From 111631a42656bfe66ac937c2e665ee9fd972ee7d Mon Sep 17 00:00:00 2001 From: dashixiong Date: Mon, 20 Apr 2026 20:42:57 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-prod.yml | 8 +- ems-admin/src/main/resources/application.yml | 2 +- ems-admin/src/main/resources/logback.xml | 17 +++ .../ems/service/InfluxPointDataWriter.java | 53 ++++--- .../impl/DeviceDataProcessServiceImpl.java | 95 +++++++++---- .../mapper/ems/EmsAlarmRecordsMapper.xml | 11 +- .../mapper/ems/EmsDailyEnergyDataMapper.xml | 129 ++++++++++-------- 7 files changed, 206 insertions(+), 109 deletions(-) diff --git a/ems-admin/src/main/resources/application-prod.yml b/ems-admin/src/main/resources/application-prod.yml index c380127..9172989 100644 --- a/ems-admin/src/main/resources/application-prod.yml +++ b/ems-admin/src/main/resources/application-prod.yml @@ -70,7 +70,7 @@ spring: # 端口,默认为6379 port: 6379 # 数据库索引 - database: 0 + database: 3 # 密码 password: 12345678 # 连接超时时间 @@ -92,7 +92,7 @@ spring: druid: # 主库数据源 master: - url: jdbc:mysql://172.17.0.13:3306/setri_ems?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 + url: jdbc:mysql://172.17.0.13:3306/setri_ems_new?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 username: ems password: Aa112211! # 从库数据源 @@ -188,7 +188,7 @@ xss: mqtt: broker.url: tcp://121.5.164.6:1883 - client.id: ems-cloud + client.id: ems-cloud-new username: dmbroker password: qwer1234 connection-timeout: 15 @@ -216,7 +216,7 @@ weather: influxdb: enabled: true url: http://172.17.0.7:8086/ - api-token: B_1HvHbUhubQQhLdI0XtNVw7maWS1aIjVZQ1a3PGD6b-VNg3_JUo_jHgZmjeBKYXnGATNdIqfpl-FAVbJ4UIPg== + api-token: l_MUXGYFs15utEaLLLgGUUkHYVA84nweimAyeiHNRIg_FWy3ACcdA85LnxDIBKA8bKxbPp2isTkrqHzrhXtZYw== write-method: POST read-method: GET write-path: /api/v2/write diff --git a/ems-admin/src/main/resources/application.yml b/ems-admin/src/main/resources/application.yml index 01cabff..94bef80 100644 --- a/ems-admin/src/main/resources/application.yml +++ b/ems-admin/src/main/resources/application.yml @@ -94,7 +94,7 @@ spring: druid: # 主库数据源 master: - url: jdbc:mysql://122.51.194.184:13306/setri_ems?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 + url: jdbc:mysql://122.51.194.184:13306/setri_ems_new?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 username: ems password: 12345678 # 从库数据源 diff --git a/ems-admin/src/main/resources/logback.xml b/ems-admin/src/main/resources/logback.xml index 96d503c..2c82385 100644 --- a/ems-admin/src/main/resources/logback.xml +++ b/ems-admin/src/main/resources/logback.xml @@ -70,6 +70,18 @@ ${log.pattern} + + + + ${log.path}/sys-influxdb.log + + ${log.path}/sys-influxdb.%d{yyyy-MM-dd}.log + 60 + + + ${log.pattern} + + @@ -90,4 +102,9 @@ + + + + + diff --git a/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java b/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java index 4aed2c3..53a2a13 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/InfluxPointDataWriter.java @@ -28,6 +28,7 @@ import java.util.Map; public class InfluxPointDataWriter { private static final Logger log = LoggerFactory.getLogger(InfluxPointDataWriter.class); + private static final Logger influxWriteSuccessLog = LoggerFactory.getLogger("sys-influxdb"); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Value("${influxdb.enabled:true}") @@ -91,6 +92,7 @@ public class InfluxPointDataWriter { } try { StringBuilder body = new StringBuilder(); + List pointSummaries = new ArrayList<>(); for (PointWritePayload payload : payloads) { if (payload == null || payload.getPointValue() == null) { continue; @@ -103,6 +105,12 @@ public class InfluxPointDataWriter { .append(" value=").append(payload.getPointValue().toPlainString()) .append(" ").append(time) .append("\n"); + pointSummaries.add(String.format("siteId=%s, deviceId=%s, pointKey=%s, value=%s, time=%s", + safe(payload.getSiteId()), + safe(payload.getDeviceId()), + safe(payload.getPointKey()), + payload.getPointValue(), + new Date(time))); } if (body.length() == 0) { return; @@ -112,6 +120,8 @@ public class InfluxPointDataWriter { log.warn("写入 InfluxDB 失败:v2 写入地址未构建成功,请检查 influxdb.org / influxdb.bucket 配置"); return; } + long startMillis = System.currentTimeMillis(); + int writeCount = countWriteLines(body); HttpResult result = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString()); if (result.code < 200 || result.code >= 300) { if (result.code == 404 && isV2WritePath() && isOrgOrBucketMissing(result.body)) { @@ -119,6 +129,7 @@ public class InfluxPointDataWriter { HttpResult retryResult = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString()); if (retryResult.code >= 200 && retryResult.code < 300) { log.info("InfluxDB org/bucket 自动创建成功,写入已恢复"); + logWriteSuccess(writeCount, writeUrl, startMillis, pointSummaries); return; } log.warn("InfluxDB 重试写入失败,HTTP状态码: {}, url: {}, body: {}", retryResult.code, writeUrl, safeLog(retryResult.body)); @@ -126,7 +137,9 @@ public class InfluxPointDataWriter { } } log.warn("写入 InfluxDB 失败,HTTP状态码: {}, url: {}, body: {}", result.code, writeUrl, safeLog(result.body)); + return; } + logWriteSuccess(writeCount, writeUrl, startMillis, pointSummaries); } catch (Exception e) { log.warn("写入 InfluxDB 失败: {}", e.getMessage()); } @@ -173,7 +186,8 @@ public class InfluxPointDataWriter { startTime.getTime(), endTime.getTime() ); - values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery))); + String regexQueryUrl = buildQueryUrl(regexQuery); + values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), regexQueryUrl)); return values; } catch (Exception e) { log.warn("查询 InfluxDB 曲线失败: {}", e.getMessage()); @@ -204,7 +218,8 @@ public class InfluxPointDataWriter { try { String queryUrl = buildQueryUrl(influxQl); - return parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl)); + List values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl)); + return values; } catch (Exception e) { log.warn("按 pointKey 查询 InfluxDB 曲线失败: {}", e.getMessage()); return Collections.emptyList(); @@ -234,24 +249,9 @@ public class InfluxPointDataWriter { ); try { + String queryUrl = buildQueryUrl(influxQl); List values = parseInfluxQlResponse( - executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(influxQl)) - ); - if (!values.isEmpty()) { - return values.get(0); - } - - String regexQuery = String.format( - "SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"point_key\" =~ /(?i)^%s$/ " + - "AND time >= %dms AND time <= %dms ORDER BY time DESC LIMIT 1", - measurement, - escapeTagValue(normalizedSiteId), - escapeRegex(normalizedPointKey), - startTime.getTime(), - endTime.getTime() - ); - values = parseInfluxQlResponse( - executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery)) + executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl) ); return values.isEmpty() ? null : values.get(0); } catch (Exception e) { @@ -260,6 +260,21 @@ public class InfluxPointDataWriter { } } + private void logWriteSuccess(int writeCount, String writeUrl, long startMillis, List pointSummaries) { + influxWriteSuccessLog.info("InfluxDB写入成功, count: {}, points: {}, url: {}, costMs: {}", + writeCount, pointSummaries, writeUrl, System.currentTimeMillis() - startMillis); + } + + private int countWriteLines(StringBuilder body) { + int count = 0; + for (int i = 0; i < body.length(); i++) { + if (body.charAt(i) == '\n') { + count++; + } + } + return count; + } + private String buildWriteUrl() { if (isV2WritePath()) { return buildV2WriteUrl(); diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java index 9f7df84..4ba84bc 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java @@ -2816,15 +2816,16 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i private JSONArray parseJsonData(String message) { try { - JSONObject object = JSONObject.parseObject(message); - return object.getJSONArray("payload"); - } catch (JSONException e) { - log.info("mqtt message is not a json object: " + e.getMessage()); - try { - return JSONArray.parseArray(message); - } catch (Exception arrayException) { - log.info("mqtt message is not a json array: " + e.getMessage()); + Object parsed = JSON.parse(message); + if (parsed instanceof JSONArray) { + return (JSONArray) parsed; } + if (parsed instanceof JSONObject) { + return ((JSONObject) parsed).getJSONArray("payload"); + } + log.info("mqtt message root type is unsupported: {}", parsed == null ? "null" : parsed.getClass().getName()); + } catch (JSONException e) { + log.info("mqtt message parse failed: {}", e.getMessage()); } return null; } @@ -3954,15 +3955,22 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i log.info("deviceId:" + deviceId); + if (checkJsonDataEmpty(jsonData)) { + log.warn("设备告警Data为空,跳过告警处理,siteId: {},deviceId: {}", siteId, deviceId); + continue; + } + // 存放mqtt原始每个设备最晚一次数据,便于后面点位获取数据 redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA_ALARM + siteId + "_" + deviceId, obj); // 存放每次同步数据,失效时间(同同步时间)-用于判断是否正常同步数据 redisCache.setCacheObject(RedisKeyConstants.SYNC_DATA_ALARM + siteId + "_" + deviceId, obj, 1, TimeUnit.MINUTES); // 处理设备数据,根据不同设备类型进行不同的数据处理 String deviceCategory = processingDeviceAlarmData(siteId, deviceId, jsonData, dataUpdateTime); - if (StringUtils.isEmpty(deviceCategory)) { + if (StringUtils.isNotEmpty(deviceCategory)) { // 处理告警信息 alarmDataProcess(siteId, deviceId, jsonData, alarmMatchInfo, deviceCategory); + } else { + log.warn("设备告警数据未识别到设备类型,跳过告警处理,siteId: {},deviceId: {}", siteId, deviceId); } } } @@ -4097,8 +4105,21 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i private void alarmDataProcess(String siteId, String deviceId, String jsonData, Map alarmInfoData, String category) { + if (StringUtils.isEmpty(category)) { + log.warn("设备类型为空,跳过告警处理,siteId: {},deviceId: {}", siteId, deviceId); + return; + } + if (alarmInfoData == null || alarmInfoData.isEmpty()) { + log.warn("告警匹配缓存为空,跳过告警处理,siteId: {},deviceId: {},category: {}", siteId, deviceId, category); + return; + } Map obj = JSON.parseObject(jsonData, new TypeReference>() { }); + if (obj == null || obj.isEmpty()) { + log.warn("设备告警Data解析结果为空,跳过告警处理,siteId: {},deviceId: {},category: {},data: {}", + siteId, deviceId, category, jsonData); + return; + } String redisKey = RedisKeyConstants.LATEST_ALARM_RECORD + "_" + siteId + "_" + deviceId; // 获取redis里面的当前有效告警遍历添加到已存在告警key里面 @@ -4114,12 +4135,8 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i // 结合同步数据,筛选簇需要更新的告警信息 List needUpdateKeys = obj.entrySet().stream() .filter(entry -> { - Object valueObj = entry.getValue(); - if (valueObj == null) { - return false; - } - int value = Integer.parseInt(valueObj.toString()); - return value == 0 && currentAlarmKeys.contains(entry.getKey()); + Long value = convertToLong(entry.getValue()); + return value != null && value == 0L && currentAlarmKeys.contains(entry.getKey()); }) .map(Map.Entry::getKey) .collect(Collectors.toList()); @@ -4129,11 +4146,13 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i if (!needUpdateKeys.isEmpty()) { List records = iEmsAlarmRecordsService.getAllUnfinishedRecords(needUpdateKeys, siteId, deviceId); // 转为Map便于快速获取 - needUpdateMap = records.stream() - .collect(Collectors.toMap( - EmsAlarmRecords::getAlarmPoint, - record -> record - )); + if (CollectionUtils.isNotEmpty(records)) { + needUpdateMap = records.stream() + .collect(Collectors.toMap( + EmsAlarmRecords::getAlarmPoint, + record -> record + )); + } } @@ -4147,20 +4166,28 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i continue; } String key = entry.getKey(); - Long value = (Long) entry.getValue(); + Long value = convertToLong(entry.getValue()); + if (value == null) { + log.warn("告警值不是有效数字,忽略处理,siteId: {},deviceId: {},point: {},value: {}", + siteId, deviceId, key, entry.getValue()); + continue; + } Boolean isCurrentAlarm = currentAlarmKeys.contains(key); String matchRedisKey = siteId + "_" + category + "_" + key; // 默认告警值0是正常,1是异常 Long alarmData = 1L; - Object cacheObj = alarmInfoData.get(matchRedisKey); - if (cacheObj != null) { - EmsAlarmMatchData matchInfo = JSON.toJavaObject(cacheObj, EmsAlarmMatchData.class); + EmsAlarmMatchData matchInfo = alarmInfoData.get(matchRedisKey); + if (matchInfo != null) { alarmData = matchInfo.getAlarmData(); } // 处理告警 if (value.equals(alarmData) && !isCurrentAlarm) { // 上送值和匹配值相同,新增告警 - EmsAlarmMatchData matchInfo = JSON.toJavaObject(cacheObj, EmsAlarmMatchData.class); + if (matchInfo == null) { + log.warn("未找到告警匹配配置,忽略新增告警,siteId: {},deviceId: {},category: {},point: {}", + siteId, deviceId, category, key); + continue; + } EmsAlarmRecords emsAlarmRecord = convertAlarmRecord(siteId, deviceId, matchInfo); saveOrUpdateList.add(emsAlarmRecord); newAddRecordList.add(emsAlarmRecord); @@ -4219,6 +4246,24 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i } } + private Long convertToLong(Object value) { + if (value == null) { + return null; + } + if (value instanceof Number) { + return ((Number) value).longValue(); + } + String text = String.valueOf(value); + if (StringUtils.isBlank(text)) { + return null; + } + try { + return new BigDecimal(text.trim()).longValue(); + } catch (NumberFormatException ex) { + return null; + } + } + private EmsAlarmRecords convertAlarmRecord(String siteId, String deviceId, EmsAlarmMatchData matchInfo) { EmsAlarmRecords emsAlarmRecords = new EmsAlarmRecords(); emsAlarmRecords.setSiteId(siteId); diff --git a/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml index 21972bc..f67fe8c 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml @@ -125,7 +125,10 @@ t.alarm_content as alarmContent, t.ticket_no as ticketNo, t.id - from ems_alarm_records t INNER JOIN ems_devices_setting t2 on t.site_id = t2.site_id and t.device_id = t2.device_id + from ems_alarm_records t + INNER JOIN ems_devices_setting t2 + on t.site_id COLLATE utf8mb4_general_ci = t2.site_id + and t.device_id COLLATE utf8mb4_general_ci = t2.device_id where t.site_id = #{siteId} and t.status != 1 and t.alarm_level in ('C','D') @@ -156,7 +159,9 @@ t.ticket_no as ticketNo, t.id from ems_alarm_records t - LEFT JOIN ems_devices_setting t2 on t.site_id = t2.site_id and t.device_id = t2.device_id + LEFT JOIN ems_devices_setting t2 + on t.site_id = t2.site_id + and t.device_id = t2.device_id where t.site_id = #{siteId} AND t.device_type = #{deviceType} @@ -265,4 +270,4 @@ and device_id = #{deviceId} and alarm_point is not null - \ No newline at end of file + diff --git a/ems-system/src/main/resources/mapper/ems/EmsDailyEnergyDataMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsDailyEnergyDataMapper.xml index 0785e57..4601b75 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsDailyEnergyDataMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsDailyEnergyDataMapper.xml @@ -288,65 +288,80 @@