From 3753075b101fbaa309f34206e4b9dd949985ad60 Mon Sep 17 00:00:00 2001 From: mashili Date: Mon, 17 Nov 2025 02:47:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=97=A5=E5=BF=97&=E7=9B=91?= =?UTF-8?q?=E5=90=AC=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/ems/MqttMessageController.java | 61 ++++++- .../java/com/xzzn/ems/domain/MqttSyncLog.java | 15 ++ .../xzzn/ems/service/IMqttSyncLogService.java | 8 +- .../service/impl/MqttSyncLogServiceImpl.java | 172 +++++++++++++++--- .../mapper/ems/MqttSyncLogMapper.xml | 7 +- 5 files changed, 232 insertions(+), 31 deletions(-) diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java index 006a7a6..afdaf19 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java @@ -100,8 +100,14 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { return this::handleSystemStatus; } else if (topic.contains("STRATEGY")) { return this::handleStrategyData; - } else if (topic.contains("PROTECTION_PLAN")) { + } else if (topic.equals("FAULT_PROTECTION_PLAN_UP")) { return this::handleFaultProtPlanData; + } else if (topic.equals("FAULT_ALARM_RECORD_UP")) { + return this::handleFaultAlarmData; + } else if (topic.equals("FAULT_PLAN_ISSUE_UP")) { + return this::handleFaultPlanIssueData; + } else if (topic.equals("DEVICE_CHANGE_LOG_UP")) { + return this::handleDeviceChangeLogData; } else { return this::handleDeviceData; } @@ -156,7 +162,7 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } - // 处理运行策略数据 + // 处理运行策略数据:云端-本地 private void handleStrategyData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); System.out.println("[处理运行策略数据] data: " + payload); @@ -170,20 +176,59 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } } - // 处理设备保护告警策略数据 + // 处理设备保护告警策略数据:云端-本地 private void handleFaultProtPlanData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); System.out.println("[处理设备保护告警策略数据] data: " + payload); try { // 业务处理逻辑 - MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class); - if (planLog != null) { - iMqttSyncLogService.handleMqttPlanData(planLog); - } + iMqttSyncLogService.handleMqttPlanData(payload); emsMqttMessageService.insertMqttOriginalMessage(topic,payload); } catch (Exception e) { - log.error("Failed to process strategy data message: " + e.getMessage(), e); + log.error("Failed to process fault plan data message: " + e.getMessage(), e); + } + } + + // 处理保护策略告警信息:本地-云端 + private void handleFaultAlarmData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[处理本地保护策略告警信息到云端] data: " + payload); + try { + // 业务处理逻辑 + iMqttSyncLogService.handleFaultAlarmData(payload); + + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); + } catch (Exception e) { + log.error("Failed to process fault plan alarm data message: " + e.getMessage(), e); + } + } + + // 处理保护策略下发日志:本地-云端 + private void handleFaultPlanIssueData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[处理本地保护策略下发日志到云端] data: " + payload); + try { + // 业务处理逻辑 + iMqttSyncLogService.handleFaultPlanIssueData(payload); + + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); + } catch (Exception e) { + log.error("Failed to process fault plan issue log message: " + e.getMessage(), e); + } + } + + // 处理设备状态变更日志:本地-云端 + private void handleDeviceChangeLogData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[处理本地的保护策略告警信息到云端] data: " + payload); + try { + // 业务处理逻辑 + iMqttSyncLogService.handleDeviceChangeLogData(payload); + + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); + } catch (Exception e) { + log.error("Failed to process device change log message: " + e.getMessage(), e); } } diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java b/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java index cc8b5cf..a2b72aa 100644 --- a/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java +++ b/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java @@ -46,6 +46,10 @@ public class MqttSyncLog extends BaseEntity @Excel(name = "失败原因", readConverterExp = "s=tatus=FAIL时填写") private String errorMsg; + /** 同步对象 */ + @Excel(name = "同步对象") + private String syncObject; + /** 同步目标 */ @Excel(name = "同步目标") private String target; @@ -130,6 +134,16 @@ public class MqttSyncLog extends BaseEntity return errorMsg; } + public void setSyncObject(String syncObject) + { + this.syncObject = syncObject; + } + + public String getSyncObject() + { + return syncObject; + } + public void setTarget(String target) { this.target = target; @@ -151,6 +165,7 @@ public class MqttSyncLog extends BaseEntity .append("content", getContent()) .append("status", getStatus()) .append("errorMsg", getErrorMsg()) + .append("syncObject", getSyncObject()) .append("target", getTarget()) .append("createTime", getCreateTime()) .toString(); diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java b/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java index b984e0f..0e1d51b 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java @@ -62,5 +62,11 @@ public interface IMqttSyncLogService // 处理策略信息 public void handleMqttStrategyData(String payload); // 处理设备告警保护信息 - public void handleMqttPlanData(MqttSyncLog planLog); + public void handleMqttPlanData(String payload); + // 处理设备保护策略触发的告警信息 + public void handleFaultAlarmData(String payload); + // 处理保护策略告警信息 + public void handleFaultPlanIssueData(String payload); + // 处理设备运行状态变更 + public void handleDeviceChangeLogData(String payload); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java index 8797e9e..42401b0 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java @@ -5,13 +5,13 @@ import java.util.List; import com.alibaba.fastjson2.JSON; import com.xzzn.common.utils.DateUtils; import com.xzzn.common.utils.StringUtils; -import com.xzzn.ems.service.IEmsFaultProtectionPlanService; -import com.xzzn.ems.service.IEmsStrategyService; +import com.xzzn.ems.domain.EmsSiteSetting; +import com.xzzn.ems.mapper.EmsSiteSettingMapper; +import com.xzzn.ems.service.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.xzzn.ems.mapper.MqttSyncLogMapper; import com.xzzn.ems.domain.MqttSyncLog; -import com.xzzn.ems.service.IMqttSyncLogService; /** * MQTT云上本地同步日志Service业务层处理 @@ -28,6 +28,18 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService private IEmsFaultProtectionPlanService emsFaultProtectionPlanService; @Autowired private IEmsStrategyService emsStrategyService; + @Autowired + private IEmsAlarmRecordsService emsAlarmRecordsService; + @Autowired + private EmsSiteSettingMapper emsSiteSettingMapper; + @Autowired + private IEmsFaultIssueLogService emsFaultIssueLogService; + @Autowired + private IEmsDeviceChangeLogService emsDeviceChangeLogService; + @Autowired + private IEmsStrategyTempService emsStrategyTempService; + @Autowired + private IEmsStrategyTimeConfigService emsStrategyTimeConfigService; /** * 查询MQTT云上本地同步日志 @@ -102,12 +114,103 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService return mqttSyncLogMapper.deleteMqttSyncLogById(id); } + // 校验同步目标是否该站点 + private boolean checkIsSite(String target) { + if (StringUtils.isEmpty(target)) { + return false; + } + EmsSiteSetting emsSiteSetting = emsSiteSettingMapper.selectEmsSiteSettingBySiteId(target); + if (emsSiteSetting == null) { + return false; + } + return true; + } + /** * 处理云上运行策略数据 + * 云上-本地:校验是否该站点 * @param payload */ @Override public void handleMqttStrategyData(String payload) { + MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class); + if (syncLog != null) { + // 校验是否该站点数据 + if (!checkIsSite(syncLog.getTarget())) { + return; + } + // 校验是否存在 + String syncId = syncLog.getSyncId(); + MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId); + if (existLog == null) { + try { + // 根据不同操作更新 + String operateType = syncLog.getOperateType(); + if (!StringUtils.isEmpty(operateType)) { + // 不同表操作 + String tableName = syncLog.getTableName(); + switch (tableName) { + case "ems_strategy_running": + emsStrategyService.dealStrategyRunningData(syncLog.getContent(), operateType); + break; + case "ems_strategy_temp": + emsStrategyTempService.dealStrategyTempData(syncLog.getContent(), operateType); + break; + case "ems_strategy_time_config": + emsStrategyTimeConfigService.dealStrategyTimeData(syncLog.getContent(), operateType); + break; + default: + break; + } + } + } catch (Exception e) { + syncLog.setStatus("FAIL"); + syncLog.setErrorMsg(e.getMessage()); + } + // 保存日志 + insertMqttSyncLog(syncLog); + } + } + } + + /** + * 处理云上同步的告警保护信息 + * 云上-本地:校验是否该站点 + * @param payload + */ + @Override + public void handleMqttPlanData(String payload) { + MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class); + if (planLog != null) { + // 校验是否该站点数据 + if (!checkIsSite(planLog.getTarget())) { + return; + } + + // 校验是否存在 + String syncId = planLog.getSyncId(); + MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId); + if (existLog == null) { + // 处理数据 + String operateType = planLog.getOperateType(); + if (!StringUtils.isEmpty(operateType)) { + emsFaultProtectionPlanService.dealSyncData(planLog.getContent(), operateType); + } + + // 保存日志 + insertMqttSyncLog(planLog); + } + } + + } + + /** + * 处理设备保护策略触发的告警信息 + * 本地-云上 + * @param payload + */ + @Override + public void handleFaultAlarmData(String payload) { MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class); if (syncLog != null) { // 校验是否存在 @@ -117,34 +220,61 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService // 根据不同操作更新 String operateType = syncLog.getOperateType(); if (!StringUtils.isEmpty(operateType)) { - emsStrategyService.dealStrategyData(syncLog.getContent(),operateType); + emsAlarmRecordsService.dealSyncData(syncLog.getContent(),operateType); } + + // 保存日志 + insertMqttSyncLog(syncLog); } - // 保存日志 - insertMqttSyncLog(syncLog); } } /** - * 处理云上同步的告警保护信息 - * @param planLog + * 处理保护策略告警信息 + * 本地-云上 + * @param payload */ @Override - public void handleMqttPlanData(MqttSyncLog planLog) { - // 校验是否存在 - String syncId = planLog.getSyncId(); - MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId); - if (existLog != null) { - return; - } - // 保存日志 - insertMqttSyncLog(planLog); + public void handleFaultPlanIssueData(String payload) { + MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class); + if (syncLog != null) { + // 校验是否存在 + String syncId = syncLog.getSyncId(); + MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId); + if (existLog == null) { + // 根据不同操作更新 + String operateType = syncLog.getOperateType(); + if (!StringUtils.isEmpty(operateType)) { + emsFaultIssueLogService.dealSyncData(syncLog.getContent(),operateType); + } - // 处理数据 - String operateType = planLog.getOperateType(); - if (!StringUtils.isEmpty(operateType)) { - emsFaultProtectionPlanService.dealSyncData(planLog.getContent(),operateType); + // 保存日志 + insertMqttSyncLog(syncLog); + } } + } + /** + * 处理设备运行状态变更日志 + * 本地-云上 + * @param payload + */ + @Override + public void handleDeviceChangeLogData(String payload) { + MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class); + if (syncLog == null) { + // 校验是否存在 + String syncId = syncLog.getSyncId(); + MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId); + if (existLog == null) { + // 根据不同操作更新 + String operateType = syncLog.getOperateType(); + if (!StringUtils.isEmpty(operateType)) { + emsDeviceChangeLogService.dealSyncData(syncLog.getContent(),operateType); + } + // 保存日志 + insertMqttSyncLog(syncLog); + } + } } } diff --git a/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml b/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml index 1a1a2f1..01a0bd6 100644 --- a/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml @@ -13,12 +13,13 @@ + - select id, sync_id, topic, operate_type, table_name, content, status, error_msg, target, create_time from mqtt_sync_log + select id, sync_id, topic, operate_type, table_name, content, status, error_msg, sync_object, target, create_time from mqtt_sync_log @@ -50,6 +52,7 @@ content, status, error_msg, + sync_object, target, create_time, @@ -61,6 +64,7 @@ #{content}, #{status}, #{errorMsg}, + #{syncObject}, #{target}, #{createTime}, @@ -76,6 +80,7 @@ content = #{content}, status = #{status}, error_msg = #{errorMsg}, + sync_object = #{syncObject}, target = #{target}, create_time = #{createTime},