Files
emsback/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java
2026-03-18 10:06:42 +08:00

698 lines
26 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.xzzn.ems.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
@Component
public class InfluxPointDataWriter {
private static final Logger log = LoggerFactory.getLogger(InfluxPointDataWriter.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Value("${influxdb.enabled:true}")
private boolean enabled;
@Value("${influxdb.url:}")
private String url;
@Value("${influxdb.username:}")
private String username;
@Value("${influxdb.password:}")
private String password;
@Value("${influxdb.api-token:}")
private String apiToken;
@Value("${influxdb.database:ems_point_data}")
private String database;
@Value("${influxdb.retention-policy:autogen}")
private String retentionPolicy;
@Value("${influxdb.measurement:mqtt_point_data}")
private String measurement;
@Value("${influxdb.write-method:POST}")
private String writeMethod;
@Value("${influxdb.read-method:GET}")
private String readMethod;
@Value("${influxdb.write-path:/write}")
private String writePath;
@Value("${influxdb.query-path:/query}")
private String queryPath;
@Value("${influxdb.org:}")
private String org;
@Value("${influxdb.bucket:}")
private String bucket;
@PostConstruct
public void init() {
if (!enabled) {
log.info("InfluxDB 写入已禁用");
return;
}
if (url == null || url.trim().isEmpty()) {
log.warn("InfluxDB URL 未配置,跳过初始化");
return;
}
log.info("InfluxDB 已启用 HTTP 接入, url: {}, database: {}", url, database);
}
public void writeBatch(List<PointWritePayload> payloads) {
if (!enabled || payloads == null || payloads.isEmpty()) {
return;
}
try {
StringBuilder body = new StringBuilder();
for (PointWritePayload payload : payloads) {
if (payload == null || payload.getPointValue() == null) {
continue;
}
long time = payload.getDataTime() == null ? System.currentTimeMillis() : payload.getDataTime().getTime();
body.append(measurement)
.append(",site_id=").append(escapeLineTag(payload.getSiteId()))
.append(",device_id=").append(escapeLineTag(payload.getDeviceId()))
.append(",point_key=").append(escapeLineTag(payload.getPointKey()))
.append(" value=").append(payload.getPointValue().toPlainString())
.append(" ").append(time)
.append("\n");
}
if (body.length() == 0) {
return;
}
String writeUrl = buildWriteUrl();
if (isBlank(writeUrl)) {
log.warn("写入 InfluxDB 失败v2 写入地址未构建成功,请检查 influxdb.org / influxdb.bucket 配置");
return;
}
HttpResult result = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString());
if (result.code < 200 || result.code >= 300) {
if (result.code == 404 && isV2WritePath() && isOrgOrBucketMissing(result.body)) {
if (ensureV2OrgAndBucket()) {
HttpResult retryResult = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString());
if (retryResult.code >= 200 && retryResult.code < 300) {
log.info("InfluxDB org/bucket 自动创建成功,写入已恢复");
return;
}
log.warn("InfluxDB 重试写入失败HTTP状态码: {}, url: {}, body: {}", retryResult.code, writeUrl, safeLog(retryResult.body));
return;
}
}
log.warn("写入 InfluxDB 失败HTTP状态码: {}, url: {}, body: {}", result.code, writeUrl, safeLog(result.body));
}
} catch (Exception e) {
log.warn("写入 InfluxDB 失败: {}", e.getMessage());
}
}
public List<PointValue> queryCurveData(String siteId, String deviceId, String pointKey, Date startTime, Date endTime) {
if (!enabled) {
return Collections.emptyList();
}
if (isBlank(siteId) || isBlank(deviceId) || isBlank(pointKey) || startTime == null || endTime == null) {
return Collections.emptyList();
}
String normalizedSiteId = siteId.trim();
String normalizedDeviceId = deviceId.trim();
String normalizedPointKey = pointKey.trim();
String influxQl = String.format(
"SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"device_id\" = '%s' AND \"point_key\" = '%s' " +
"AND time >= %dms AND time <= %dms ORDER BY time ASC",
measurement,
escapeTagValue(normalizedSiteId),
escapeTagValue(normalizedDeviceId),
escapeTagValue(normalizedPointKey),
startTime.getTime(),
endTime.getTime()
);
try {
String queryUrl = buildQueryUrl(influxQl);
List<PointValue> values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl));
if (!values.isEmpty()) {
return values;
}
// 兼容 dataKey 大小写差异,避免点位 key 字母大小写不一致导致查不到曲线
String regexQuery = String.format(
"SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"device_id\" = '%s' AND \"point_key\" =~ /(?i)^%s$/ " +
"AND time >= %dms AND time <= %dms ORDER BY time ASC",
measurement,
escapeTagValue(normalizedSiteId),
escapeTagValue(normalizedDeviceId),
escapeRegex(normalizedPointKey),
startTime.getTime(),
endTime.getTime()
);
values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery)));
return values;
} catch (Exception e) {
log.warn("查询 InfluxDB 曲线失败: {}", e.getMessage());
return Collections.emptyList();
}
}
public List<PointValue> queryCurveDataByPointKey(String siteId, String pointKey, Date startTime, Date endTime) {
if (!enabled) {
return Collections.emptyList();
}
if (isBlank(siteId) || isBlank(pointKey) || startTime == null || endTime == null) {
return Collections.emptyList();
}
String normalizedSiteId = siteId.trim();
String normalizedPointKey = pointKey.trim();
String influxQl = String.format(
"SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"point_key\" = '%s' " +
"AND time >= %dms AND time <= %dms ORDER BY time ASC",
measurement,
escapeTagValue(normalizedSiteId),
escapeTagValue(normalizedPointKey),
startTime.getTime(),
endTime.getTime()
);
try {
String queryUrl = buildQueryUrl(influxQl);
return parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl));
} catch (Exception e) {
log.warn("按 pointKey 查询 InfluxDB 曲线失败: {}", e.getMessage());
return Collections.emptyList();
}
}
public PointValue queryLatestPointValueByPointKey(String siteId, String pointKey, Date startTime, Date endTime) {
if (!enabled) {
return null;
}
if (isBlank(siteId) || isBlank(pointKey) || startTime == null || endTime == null) {
return null;
}
String normalizedSiteId = siteId.trim();
String normalizedPointKey = pointKey.trim();
String influxQl = String.format(
"SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"point_key\" = '%s' " +
"AND time >= %dms AND time <= %dms ORDER BY time DESC LIMIT 1",
measurement,
escapeTagValue(normalizedSiteId),
escapeTagValue(normalizedPointKey),
startTime.getTime(),
endTime.getTime()
);
try {
List<PointValue> values = parseInfluxQlResponse(
executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(influxQl))
);
if (!values.isEmpty()) {
return values.get(0);
}
String regexQuery = String.format(
"SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"point_key\" =~ /(?i)^%s$/ " +
"AND time >= %dms AND time <= %dms ORDER BY time DESC LIMIT 1",
measurement,
escapeTagValue(normalizedSiteId),
escapeRegex(normalizedPointKey),
startTime.getTime(),
endTime.getTime()
);
values = parseInfluxQlResponse(
executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery))
);
return values.isEmpty() ? null : values.get(0);
} catch (Exception e) {
log.warn("按 pointKey 查询 InfluxDB 最新值失败: {}", e.getMessage());
return null;
}
}
private String buildWriteUrl() {
if (isV2WritePath()) {
return buildV2WriteUrl();
}
StringBuilder sb = new StringBuilder(trimTrailingSlash(url));
sb.append(normalizePath(writePath)).append("?db=").append(urlEncode(database));
if (!isBlank(retentionPolicy)) {
sb.append("&rp=").append(urlEncode(retentionPolicy));
}
sb.append("&precision=ms");
return sb.toString();
}
private String buildQueryUrl(String influxQl) {
String queryDb = database;
if (isV2WritePath() && !isBlank(bucket)) {
queryDb = bucket;
}
StringBuilder queryUrl = new StringBuilder(trimTrailingSlash(url))
.append(normalizePath(queryPath))
.append("?db=").append(urlEncode(queryDb))
.append("&epoch=ms&q=").append(urlEncode(influxQl));
if (!isBlank(retentionPolicy)) {
queryUrl.append("&rp=").append(urlEncode(retentionPolicy));
}
return queryUrl.toString();
}
private String buildV2WriteUrl() {
String currentBucket = isBlank(bucket) ? database : bucket;
if (isBlank(org) || isBlank(currentBucket)) {
return null;
}
return trimTrailingSlash(url)
+ "/api/v2/write?org=" + urlEncode(org)
+ "&bucket=" + urlEncode(currentBucket)
+ "&precision=ms";
}
private HttpResult executeRequest(String method, String requestUrl, String body) throws Exception {
HttpURLConnection connection = openConnection(method, requestUrl, "text/plain; charset=UTF-8");
try {
if (body != null) {
connection.setDoOutput(true);
try (OutputStream os = connection.getOutputStream()) {
os.write(body.getBytes(StandardCharsets.UTF_8));
}
}
int code = connection.getResponseCode();
InputStream is = (code >= 200 && code < 300) ? connection.getInputStream() : connection.getErrorStream();
return new HttpResult(code, readStream(is));
} finally {
connection.disconnect();
}
}
private String executeRequestWithResponse(String method, String requestUrl) throws Exception {
HttpURLConnection connection = openConnection(method, requestUrl, "text/plain; charset=UTF-8");
try {
int code = connection.getResponseCode();
InputStream is = (code >= 200 && code < 300) ? connection.getInputStream() : connection.getErrorStream();
return readStream(is);
} finally {
connection.disconnect();
}
}
private String readStream(InputStream is) throws Exception {
if (is == null) {
return "";
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line);
}
return sb.toString();
}
}
private HttpResult executeJsonRequest(String method, String requestUrl, String jsonBody) throws Exception {
HttpURLConnection connection = openConnection(method, requestUrl, "application/json; charset=UTF-8");
try {
if (jsonBody != null) {
connection.setDoOutput(true);
try (OutputStream os = connection.getOutputStream()) {
os.write(jsonBody.getBytes(StandardCharsets.UTF_8));
}
}
int code = connection.getResponseCode();
InputStream is = (code >= 200 && code < 300) ? connection.getInputStream() : connection.getErrorStream();
return new HttpResult(code, readStream(is));
} finally {
connection.disconnect();
}
}
private HttpURLConnection openConnection(String method, String requestUrl, String contentType) throws Exception {
HttpURLConnection connection = (HttpURLConnection) new java.net.URL(requestUrl).openConnection();
connection.setRequestMethod(method);
connection.setConnectTimeout(5000);
connection.setReadTimeout(8000);
connection.setRequestProperty("Accept", "application/json");
connection.setRequestProperty("Content-Type", contentType);
if (!isBlank(apiToken)) {
connection.setRequestProperty("Authorization", "Token " + apiToken.trim());
} else if (!isBlank(username) || !isBlank(password)) {
String basic = java.util.Base64.getEncoder()
.encodeToString((safe(username) + ":" + safe(password)).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + basic);
}
return connection;
}
private String escapeLineTag(String value) {
if (value == null) {
return "";
}
return value.replace("\\", "\\\\")
.replace(",", "\\,")
.replace(" ", "\\ ")
.replace("=", "\\=");
}
private String trimTrailingSlash(String v) {
if (v == null) {
return "";
}
String result = v.trim();
while (result.endsWith("/")) {
result = result.substring(0, result.length() - 1);
}
return result;
}
private String urlEncode(String value) {
try {
return URLEncoder.encode(safe(value), StandardCharsets.UTF_8.name());
} catch (Exception e) {
return safe(value);
}
}
private String safe(String value) {
return value == null ? "" : value;
}
private boolean isV2WritePath() {
return "/api/v2/write".equals(normalizePath(writePath));
}
private boolean isOrgOrBucketMissing(String responseBody) {
if (isBlank(responseBody)) {
return false;
}
String lower = responseBody.toLowerCase();
return (lower.contains("organization") && lower.contains("not found"))
|| (lower.contains("bucket") && lower.contains("not found"));
}
private boolean ensureV2OrgAndBucket() {
try {
if (isBlank(org)) {
log.warn("InfluxDB 自动创建 organization 失败org 配置为空");
return false;
}
String currentBucket = isBlank(bucket) ? database : bucket;
String orgId = queryOrgId(org);
if (isBlank(orgId)) {
orgId = createOrg(org);
}
if (isBlank(orgId)) {
log.warn("InfluxDB 自动创建 organization 失败org={}", org);
return false;
}
if (!bucketExists(org, currentBucket)) {
if (!createBucket(orgId, currentBucket)) {
log.warn("InfluxDB 自动创建 bucket 失败org={}, bucket={}", org, currentBucket);
return false;
}
}
return true;
} catch (Exception ex) {
log.warn("InfluxDB 自动创建 org/bucket 异常: {}", ex.getMessage());
return false;
}
}
private String queryOrgId(String orgName) throws Exception {
String requestUrl = trimTrailingSlash(url) + "/api/v2/orgs?org=" + urlEncode(orgName);
HttpResult result = executeJsonRequest("GET", requestUrl, null);
if (result.code < 200 || result.code >= 300 || isBlank(result.body)) {
log.warn("查询 organization 失败status={}, org={}, body={}", result.code, orgName, safeLog(result.body));
return null;
}
JsonNode root = OBJECT_MAPPER.readTree(result.body);
JsonNode orgs = root.path("orgs");
if (orgs.isArray() && orgs.size() > 0) {
JsonNode first = orgs.get(0);
if (first != null) {
String id = first.path("id").asText(null);
if (!isBlank(id)) {
return id;
}
}
}
return null;
}
private String createOrg(String orgName) throws Exception {
String requestUrl = trimTrailingSlash(url) + "/api/v2/orgs";
Map<String, Object> payload = new HashMap<>();
payload.put("name", orgName);
String body = OBJECT_MAPPER.writeValueAsString(payload);
HttpResult result = executeJsonRequest("POST", requestUrl, body);
if ((result.code < 200 || result.code >= 300) || isBlank(result.body)) {
log.warn("创建 organization 失败status={}, org={}, body={}", result.code, orgName, safeLog(result.body));
return null;
}
JsonNode root = OBJECT_MAPPER.readTree(result.body);
String id = root.path("id").asText(null);
return isBlank(id) ? null : id;
}
private boolean bucketExists(String orgName, String bucketName) throws Exception {
String requestUrl = trimTrailingSlash(url)
+ "/api/v2/buckets?name=" + urlEncode(bucketName)
+ "&org=" + urlEncode(orgName);
HttpResult result = executeJsonRequest("GET", requestUrl, null);
if (result.code < 200 || result.code >= 300 || isBlank(result.body)) {
log.warn("查询 bucket 失败status={}, org={}, bucket={}, body={}", result.code, orgName, bucketName, safeLog(result.body));
return false;
}
JsonNode root = OBJECT_MAPPER.readTree(result.body);
JsonNode buckets = root.path("buckets");
return buckets.isArray() && buckets.size() > 0;
}
private boolean createBucket(String orgId, String bucketName) throws Exception {
String requestUrl = trimTrailingSlash(url) + "/api/v2/buckets";
Map<String, Object> payload = new HashMap<>();
payload.put("orgID", orgId);
payload.put("name", bucketName);
String body = OBJECT_MAPPER.writeValueAsString(payload);
HttpResult result = executeJsonRequest("POST", requestUrl, body);
if (result.code < 200 || result.code >= 300) {
log.warn("创建 bucket 失败status={}, orgId={}, bucket={}, body={}", result.code, orgId, bucketName, safeLog(result.body));
}
return result.code >= 200 && result.code < 300;
}
private String methodOrDefault(String method, String defaultMethod) {
return isBlank(method) ? defaultMethod : method.trim().toUpperCase();
}
private String normalizePath(String path) {
if (isBlank(path)) {
return "/";
}
String p = path.trim();
return p.startsWith("/") ? p : "/" + p;
}
private String safeLog(String body) {
if (body == null) {
return "";
}
return body.length() > 200 ? body.substring(0, 200) : body;
}
private Date parseInfluxTime(Object timeObject) {
if (timeObject == null) {
return null;
}
if (timeObject instanceof Number) {
return new Date(((Number) timeObject).longValue());
}
try {
return Date.from(Instant.parse(String.valueOf(timeObject)));
} catch (Exception e) {
return null;
}
}
private BigDecimal toBigDecimal(Object valueObject) {
if (valueObject == null) {
return null;
}
if (valueObject instanceof BigDecimal) {
return (BigDecimal) valueObject;
}
if (valueObject instanceof Number) {
return new BigDecimal(valueObject.toString());
}
try {
return new BigDecimal(String.valueOf(valueObject));
} catch (Exception e) {
return null;
}
}
private String escapeTagValue(String value) {
return value == null ? "" : value.replace("\\", "\\\\").replace("'", "\\'");
}
private String escapeRegex(String value) {
if (value == null) {
return "";
}
return value.replace("\\", "\\\\")
.replace("/", "\\/")
.replace(".", "\\.")
.replace("(", "\\(")
.replace(")", "\\)")
.replace("[", "\\[")
.replace("]", "\\]")
.replace("{", "\\{")
.replace("}", "\\}")
.replace("^", "\\^")
.replace("$", "\\$")
.replace("*", "\\*")
.replace("+", "\\+")
.replace("?", "\\?")
.replace("|", "\\|");
}
private List<PointValue> parseInfluxQlResponse(String response) throws Exception {
if (isBlank(response)) {
return Collections.emptyList();
}
List<PointValue> values = new ArrayList<>();
JsonNode root = OBJECT_MAPPER.readTree(response);
JsonNode resultsNode = root.path("results");
if (!resultsNode.isArray()) {
return values;
}
for (JsonNode result : resultsNode) {
JsonNode seriesArray = result.path("series");
if (!seriesArray.isArray()) {
continue;
}
for (JsonNode series : seriesArray) {
JsonNode rows = series.path("values");
if (!rows.isArray()) {
continue;
}
for (JsonNode row : rows) {
if (!row.isArray() || row.size() < 2) {
continue;
}
Date dataTime = parseInfluxTime(row.get(0).isNumber() ? row.get(0).asLong() : row.get(0).asText());
BigDecimal pointValue = toBigDecimal(row.get(1).isNumber() ? row.get(1).asText() : row.get(1).asText(null));
if (dataTime == null || pointValue == null) {
continue;
}
values.add(new PointValue(dataTime, pointValue));
}
}
}
return values;
}
private boolean isBlank(String value) {
return value == null || value.trim().isEmpty();
}
public static class PointWritePayload {
private final String siteId;
private final String deviceId;
private final String pointKey;
private final BigDecimal pointValue;
private final Date dataTime;
public PointWritePayload(String siteId, String deviceId, String pointKey, BigDecimal pointValue, Date dataTime) {
this.siteId = siteId;
this.deviceId = deviceId;
this.pointKey = pointKey;
this.pointValue = pointValue;
this.dataTime = dataTime;
}
public String getSiteId() {
return siteId;
}
public String getDeviceId() {
return deviceId;
}
public String getPointKey() {
return pointKey;
}
public BigDecimal getPointValue() {
return pointValue;
}
public Date getDataTime() {
return dataTime;
}
}
public static class PointValue {
private final Date dataTime;
private final BigDecimal pointValue;
public PointValue(Date dataTime, BigDecimal pointValue) {
this.dataTime = dataTime;
this.pointValue = pointValue;
}
public Date getDataTime() {
return dataTime;
}
public BigDecimal getPointValue() {
return pointValue;
}
}
private static class HttpResult {
private final int code;
private final String body;
private HttpResult(int code, String body) {
this.code = code;
this.body = body;
}
}
}