diff --git a/ems-admin/src/main/resources/application-prod.yml b/ems-admin/src/main/resources/application-prod.yml
index c380127..9172989 100644
--- a/ems-admin/src/main/resources/application-prod.yml
+++ b/ems-admin/src/main/resources/application-prod.yml
@@ -70,7 +70,7 @@ spring:
# 端口,默认为6379
port: 6379
# 数据库索引
- database: 0
+ database: 3
# 密码
password: 12345678
# 连接超时时间
@@ -92,7 +92,7 @@ spring:
druid:
# 主库数据源
master:
- url: jdbc:mysql://172.17.0.13:3306/setri_ems?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+ url: jdbc:mysql://172.17.0.13:3306/setri_ems_new?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: ems
password: Aa112211!
# 从库数据源
@@ -188,7 +188,7 @@ xss:
mqtt:
broker.url: tcp://121.5.164.6:1883
- client.id: ems-cloud
+ client.id: ems-cloud-new
username: dmbroker
password: qwer1234
connection-timeout: 15
@@ -216,7 +216,7 @@ weather:
influxdb:
enabled: true
url: http://172.17.0.7:8086/
- api-token: B_1HvHbUhubQQhLdI0XtNVw7maWS1aIjVZQ1a3PGD6b-VNg3_JUo_jHgZmjeBKYXnGATNdIqfpl-FAVbJ4UIPg==
+ api-token: l_MUXGYFs15utEaLLLgGUUkHYVA84nweimAyeiHNRIg_FWy3ACcdA85LnxDIBKA8bKxbPp2isTkrqHzrhXtZYw==
write-method: POST
read-method: GET
write-path: /api/v2/write
diff --git a/ems-admin/src/main/resources/application.yml b/ems-admin/src/main/resources/application.yml
index 01cabff..94bef80 100644
--- a/ems-admin/src/main/resources/application.yml
+++ b/ems-admin/src/main/resources/application.yml
@@ -94,7 +94,7 @@ spring:
druid:
# 主库数据源
master:
- url: jdbc:mysql://122.51.194.184:13306/setri_ems?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+ url: jdbc:mysql://122.51.194.184:13306/setri_ems_new?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: ems
password: 12345678
# 从库数据源
diff --git a/ems-admin/src/main/resources/logback.xml b/ems-admin/src/main/resources/logback.xml
index 96d503c..2c82385 100644
--- a/ems-admin/src/main/resources/logback.xml
+++ b/ems-admin/src/main/resources/logback.xml
@@ -70,6 +70,18 @@
${log.pattern}
+
+
+
+ ${log.path}/sys-influxdb.log
+
+ ${log.path}/sys-influxdb.%d{yyyy-MM-dd}.log
+ 60
+
+
+ ${log.pattern}
+
+
@@ -90,4 +102,9 @@
+
+
+
+
+
diff --git a/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java b/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java
index 4aed2c3..53a2a13 100644
--- a/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java
+++ b/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
public class InfluxPointDataWriter {
private static final Logger log = LoggerFactory.getLogger(InfluxPointDataWriter.class);
+ private static final Logger influxWriteSuccessLog = LoggerFactory.getLogger("sys-influxdb");
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Value("${influxdb.enabled:true}")
@@ -91,6 +92,7 @@ public class InfluxPointDataWriter {
}
try {
StringBuilder body = new StringBuilder();
+ List pointSummaries = new ArrayList<>();
for (PointWritePayload payload : payloads) {
if (payload == null || payload.getPointValue() == null) {
continue;
@@ -103,6 +105,12 @@ public class InfluxPointDataWriter {
.append(" value=").append(payload.getPointValue().toPlainString())
.append(" ").append(time)
.append("\n");
+ pointSummaries.add(String.format("siteId=%s, deviceId=%s, pointKey=%s, value=%s, time=%s",
+ safe(payload.getSiteId()),
+ safe(payload.getDeviceId()),
+ safe(payload.getPointKey()),
+ payload.getPointValue(),
+ new Date(time)));
}
if (body.length() == 0) {
return;
@@ -112,6 +120,8 @@ public class InfluxPointDataWriter {
log.warn("写入 InfluxDB 失败:v2 写入地址未构建成功,请检查 influxdb.org / influxdb.bucket 配置");
return;
}
+ long startMillis = System.currentTimeMillis();
+ int writeCount = countWriteLines(body);
HttpResult result = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString());
if (result.code < 200 || result.code >= 300) {
if (result.code == 404 && isV2WritePath() && isOrgOrBucketMissing(result.body)) {
@@ -119,6 +129,7 @@ public class InfluxPointDataWriter {
HttpResult retryResult = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString());
if (retryResult.code >= 200 && retryResult.code < 300) {
log.info("InfluxDB org/bucket 自动创建成功,写入已恢复");
+ logWriteSuccess(writeCount, writeUrl, startMillis, pointSummaries);
return;
}
log.warn("InfluxDB 重试写入失败,HTTP状态码: {}, url: {}, body: {}", retryResult.code, writeUrl, safeLog(retryResult.body));
@@ -126,7 +137,9 @@ public class InfluxPointDataWriter {
}
}
log.warn("写入 InfluxDB 失败,HTTP状态码: {}, url: {}, body: {}", result.code, writeUrl, safeLog(result.body));
+ return;
}
+ logWriteSuccess(writeCount, writeUrl, startMillis, pointSummaries);
} catch (Exception e) {
log.warn("写入 InfluxDB 失败: {}", e.getMessage());
}
@@ -173,7 +186,8 @@ public class InfluxPointDataWriter {
startTime.getTime(),
endTime.getTime()
);
- values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery)));
+ String regexQueryUrl = buildQueryUrl(regexQuery);
+ values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), regexQueryUrl));
return values;
} catch (Exception e) {
log.warn("查询 InfluxDB 曲线失败: {}", e.getMessage());
@@ -204,7 +218,8 @@ public class InfluxPointDataWriter {
try {
String queryUrl = buildQueryUrl(influxQl);
- return parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl));
+ List values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl));
+ return values;
} catch (Exception e) {
log.warn("按 pointKey 查询 InfluxDB 曲线失败: {}", e.getMessage());
return Collections.emptyList();
@@ -234,24 +249,9 @@ public class InfluxPointDataWriter {
);
try {
+ String queryUrl = buildQueryUrl(influxQl);
List values = parseInfluxQlResponse(
- executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(influxQl))
- );
- if (!values.isEmpty()) {
- return values.get(0);
- }
-
- String regexQuery = String.format(
- "SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"point_key\" =~ /(?i)^%s$/ " +
- "AND time >= %dms AND time <= %dms ORDER BY time DESC LIMIT 1",
- measurement,
- escapeTagValue(normalizedSiteId),
- escapeRegex(normalizedPointKey),
- startTime.getTime(),
- endTime.getTime()
- );
- values = parseInfluxQlResponse(
- executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery))
+ executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl)
);
return values.isEmpty() ? null : values.get(0);
} catch (Exception e) {
@@ -260,6 +260,21 @@ public class InfluxPointDataWriter {
}
}
+ private void logWriteSuccess(int writeCount, String writeUrl, long startMillis, List pointSummaries) {
+ influxWriteSuccessLog.info("InfluxDB写入成功, count: {}, points: {}, url: {}, costMs: {}",
+ writeCount, pointSummaries, writeUrl, System.currentTimeMillis() - startMillis);
+ }
+
+ private int countWriteLines(StringBuilder body) {
+ int count = 0;
+ for (int i = 0; i < body.length(); i++) {
+ if (body.charAt(i) == '\n') {
+ count++;
+ }
+ }
+ return count;
+ }
+
private String buildWriteUrl() {
if (isV2WritePath()) {
return buildV2WriteUrl();
diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java
index 9f7df84..4ba84bc 100644
--- a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java
+++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java
@@ -2816,15 +2816,16 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
private JSONArray parseJsonData(String message) {
try {
- JSONObject object = JSONObject.parseObject(message);
- return object.getJSONArray("payload");
- } catch (JSONException e) {
- log.info("mqtt message is not a json object: " + e.getMessage());
- try {
- return JSONArray.parseArray(message);
- } catch (Exception arrayException) {
- log.info("mqtt message is not a json array: " + e.getMessage());
+ Object parsed = JSON.parse(message);
+ if (parsed instanceof JSONArray) {
+ return (JSONArray) parsed;
}
+ if (parsed instanceof JSONObject) {
+ return ((JSONObject) parsed).getJSONArray("payload");
+ }
+ log.info("mqtt message root type is unsupported: {}", parsed == null ? "null" : parsed.getClass().getName());
+ } catch (JSONException e) {
+ log.info("mqtt message parse failed: {}", e.getMessage());
}
return null;
}
@@ -3954,15 +3955,22 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
log.info("deviceId:" + deviceId);
+ if (checkJsonDataEmpty(jsonData)) {
+ log.warn("设备告警Data为空,跳过告警处理,siteId: {},deviceId: {}", siteId, deviceId);
+ continue;
+ }
+
// 存放mqtt原始每个设备最晚一次数据,便于后面点位获取数据
redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA_ALARM + siteId + "_" + deviceId, obj);
// 存放每次同步数据,失效时间(同同步时间)-用于判断是否正常同步数据
redisCache.setCacheObject(RedisKeyConstants.SYNC_DATA_ALARM + siteId + "_" + deviceId, obj, 1, TimeUnit.MINUTES);
// 处理设备数据,根据不同设备类型进行不同的数据处理
String deviceCategory = processingDeviceAlarmData(siteId, deviceId, jsonData, dataUpdateTime);
- if (StringUtils.isEmpty(deviceCategory)) {
+ if (StringUtils.isNotEmpty(deviceCategory)) {
// 处理告警信息
alarmDataProcess(siteId, deviceId, jsonData, alarmMatchInfo, deviceCategory);
+ } else {
+ log.warn("设备告警数据未识别到设备类型,跳过告警处理,siteId: {},deviceId: {}", siteId, deviceId);
}
}
}
@@ -4097,8 +4105,21 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
private void alarmDataProcess(String siteId, String deviceId, String jsonData,
Map alarmInfoData, String category) {
+ if (StringUtils.isEmpty(category)) {
+ log.warn("设备类型为空,跳过告警处理,siteId: {},deviceId: {}", siteId, deviceId);
+ return;
+ }
+ if (alarmInfoData == null || alarmInfoData.isEmpty()) {
+ log.warn("告警匹配缓存为空,跳过告警处理,siteId: {},deviceId: {},category: {}", siteId, deviceId, category);
+ return;
+ }
Map obj = JSON.parseObject(jsonData, new TypeReference