diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java
index 006a7a6..afdaf19 100644
--- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java
+++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java
@@ -100,8 +100,14 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
return this::handleSystemStatus;
} else if (topic.contains("STRATEGY")) {
return this::handleStrategyData;
- } else if (topic.contains("PROTECTION_PLAN")) {
+ } else if (topic.equals("FAULT_PROTECTION_PLAN_UP")) {
return this::handleFaultProtPlanData;
+ } else if (topic.equals("FAULT_ALARM_RECORD_UP")) {
+ return this::handleFaultAlarmData;
+ } else if (topic.equals("FAULT_PLAN_ISSUE_UP")) {
+ return this::handleFaultPlanIssueData;
+ } else if (topic.equals("DEVICE_CHANGE_LOG_UP")) {
+ return this::handleDeviceChangeLogData;
} else {
return this::handleDeviceData;
}
@@ -156,7 +162,7 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
}
- // 处理运行策略数据
+ // 处理运行策略数据:云端-本地
private void handleStrategyData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理运行策略数据] data: " + payload);
@@ -170,20 +176,59 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
}
}
- // 处理设备保护告警策略数据
+ // 处理设备保护告警策略数据:云端-本地
private void handleFaultProtPlanData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理设备保护告警策略数据] data: " + payload);
try {
// 业务处理逻辑
- MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class);
- if (planLog != null) {
- iMqttSyncLogService.handleMqttPlanData(planLog);
- }
+ iMqttSyncLogService.handleMqttPlanData(payload);
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) {
- log.error("Failed to process strategy data message: " + e.getMessage(), e);
+ log.error("Failed to process fault plan data message: " + e.getMessage(), e);
+ }
+ }
+
+ // 处理保护策略告警信息:本地-云端
+ private void handleFaultAlarmData(String topic, MqttMessage message) {
+ String payload = new String(message.getPayload());
+ System.out.println("[处理本地保护策略告警信息到云端] data: " + payload);
+ try {
+ // 业务处理逻辑
+ iMqttSyncLogService.handleFaultAlarmData(payload);
+
+ emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
+ } catch (Exception e) {
+ log.error("Failed to process fault plan alarm data message: " + e.getMessage(), e);
+ }
+ }
+
+ // 处理保护策略下发日志:本地-云端
+ private void handleFaultPlanIssueData(String topic, MqttMessage message) {
+ String payload = new String(message.getPayload());
+ System.out.println("[处理本地保护策略下发日志到云端] data: " + payload);
+ try {
+ // 业务处理逻辑
+ iMqttSyncLogService.handleFaultPlanIssueData(payload);
+
+ emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
+ } catch (Exception e) {
+ log.error("Failed to process fault plan issue log message: " + e.getMessage(), e);
+ }
+ }
+
+ // 处理设备状态变更日志:本地-云端
+ private void handleDeviceChangeLogData(String topic, MqttMessage message) {
+ String payload = new String(message.getPayload());
+ System.out.println("[处理本地的保护策略告警信息到云端] data: " + payload);
+ try {
+ // 业务处理逻辑
+ iMqttSyncLogService.handleDeviceChangeLogData(payload);
+
+ emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
+ } catch (Exception e) {
+ log.error("Failed to process device change log message: " + e.getMessage(), e);
}
}
diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java b/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java
index cc8b5cf..a2b72aa 100644
--- a/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java
+++ b/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java
@@ -46,6 +46,10 @@ public class MqttSyncLog extends BaseEntity
@Excel(name = "失败原因", readConverterExp = "s=tatus=FAIL时填写")
private String errorMsg;
+ /** 同步对象 */
+ @Excel(name = "同步对象")
+ private String syncObject;
+
/** 同步目标 */
@Excel(name = "同步目标")
private String target;
@@ -130,6 +134,16 @@ public class MqttSyncLog extends BaseEntity
return errorMsg;
}
+ public void setSyncObject(String syncObject)
+ {
+ this.syncObject = syncObject;
+ }
+
+ public String getSyncObject()
+ {
+ return syncObject;
+ }
+
public void setTarget(String target)
{
this.target = target;
@@ -151,6 +165,7 @@ public class MqttSyncLog extends BaseEntity
.append("content", getContent())
.append("status", getStatus())
.append("errorMsg", getErrorMsg())
+ .append("syncObject", getSyncObject())
.append("target", getTarget())
.append("createTime", getCreateTime())
.toString();
diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java b/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java
index b984e0f..0e1d51b 100644
--- a/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java
+++ b/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java
@@ -62,5 +62,11 @@ public interface IMqttSyncLogService
// 处理策略信息
public void handleMqttStrategyData(String payload);
// 处理设备告警保护信息
- public void handleMqttPlanData(MqttSyncLog planLog);
+ public void handleMqttPlanData(String payload);
+ // 处理设备保护策略触发的告警信息
+ public void handleFaultAlarmData(String payload);
+ // 处理保护策略告警信息
+ public void handleFaultPlanIssueData(String payload);
+ // 处理设备运行状态变更
+ public void handleDeviceChangeLogData(String payload);
}
diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java
index 8797e9e..42401b0 100644
--- a/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java
+++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java
@@ -5,13 +5,13 @@ import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
-import com.xzzn.ems.service.IEmsFaultProtectionPlanService;
-import com.xzzn.ems.service.IEmsStrategyService;
+import com.xzzn.ems.domain.EmsSiteSetting;
+import com.xzzn.ems.mapper.EmsSiteSettingMapper;
+import com.xzzn.ems.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.xzzn.ems.mapper.MqttSyncLogMapper;
import com.xzzn.ems.domain.MqttSyncLog;
-import com.xzzn.ems.service.IMqttSyncLogService;
/**
* MQTT云上本地同步日志Service业务层处理
@@ -28,6 +28,18 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService
private IEmsFaultProtectionPlanService emsFaultProtectionPlanService;
@Autowired
private IEmsStrategyService emsStrategyService;
+ @Autowired
+ private IEmsAlarmRecordsService emsAlarmRecordsService;
+ @Autowired
+ private EmsSiteSettingMapper emsSiteSettingMapper;
+ @Autowired
+ private IEmsFaultIssueLogService emsFaultIssueLogService;
+ @Autowired
+ private IEmsDeviceChangeLogService emsDeviceChangeLogService;
+ @Autowired
+ private IEmsStrategyTempService emsStrategyTempService;
+ @Autowired
+ private IEmsStrategyTimeConfigService emsStrategyTimeConfigService;
/**
* 查询MQTT云上本地同步日志
@@ -102,12 +114,103 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService
return mqttSyncLogMapper.deleteMqttSyncLogById(id);
}
+ // 校验同步目标是否该站点
+ private boolean checkIsSite(String target) {
+ if (StringUtils.isEmpty(target)) {
+ return false;
+ }
+ EmsSiteSetting emsSiteSetting = emsSiteSettingMapper.selectEmsSiteSettingBySiteId(target);
+ if (emsSiteSetting == null) {
+ return false;
+ }
+ return true;
+ }
+
/**
* 处理云上运行策略数据
+ * 云上-本地:校验是否该站点
* @param payload
*/
@Override
public void handleMqttStrategyData(String payload) {
+ MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
+ if (syncLog != null) {
+ // 校验是否该站点数据
+ if (!checkIsSite(syncLog.getTarget())) {
+ return;
+ }
+ // 校验是否存在
+ String syncId = syncLog.getSyncId();
+ MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
+ if (existLog == null) {
+ try {
+ // 根据不同操作更新
+ String operateType = syncLog.getOperateType();
+ if (!StringUtils.isEmpty(operateType)) {
+ // 不同表操作
+ String tableName = syncLog.getTableName();
+ switch (tableName) {
+ case "ems_strategy_running":
+ emsStrategyService.dealStrategyRunningData(syncLog.getContent(), operateType);
+ break;
+ case "ems_strategy_temp":
+ emsStrategyTempService.dealStrategyTempData(syncLog.getContent(), operateType);
+ break;
+ case "ems_strategy_time_config":
+ emsStrategyTimeConfigService.dealStrategyTimeData(syncLog.getContent(), operateType);
+ break;
+ default:
+ break;
+ }
+ }
+ } catch (Exception e) {
+ syncLog.setStatus("FAIL");
+ syncLog.setErrorMsg(e.getMessage());
+ }
+ // 保存日志
+ insertMqttSyncLog(syncLog);
+ }
+ }
+ }
+
+ /**
+ * 处理云上同步的告警保护信息
+ * 云上-本地:校验是否该站点
+ * @param payload
+ */
+ @Override
+ public void handleMqttPlanData(String payload) {
+ MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class);
+ if (planLog != null) {
+ // 校验是否该站点数据
+ if (!checkIsSite(planLog.getTarget())) {
+ return;
+ }
+
+ // 校验是否存在
+ String syncId = planLog.getSyncId();
+ MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
+ if (existLog == null) {
+ // 处理数据
+ String operateType = planLog.getOperateType();
+ if (!StringUtils.isEmpty(operateType)) {
+ emsFaultProtectionPlanService.dealSyncData(planLog.getContent(), operateType);
+ }
+
+ // 保存日志
+ insertMqttSyncLog(planLog);
+ }
+ }
+
+ }
+
+ /**
+ * 处理设备保护策略触发的告警信息
+ * 本地-云上
+ * @param payload
+ */
+ @Override
+ public void handleFaultAlarmData(String payload) {
MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
if (syncLog != null) {
// 校验是否存在
@@ -117,34 +220,61 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService
// 根据不同操作更新
String operateType = syncLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
- emsStrategyService.dealStrategyData(syncLog.getContent(),operateType);
+ emsAlarmRecordsService.dealSyncData(syncLog.getContent(),operateType);
}
+
+ // 保存日志
+ insertMqttSyncLog(syncLog);
}
- // 保存日志
- insertMqttSyncLog(syncLog);
}
}
/**
- * 处理云上同步的告警保护信息
- * @param planLog
+ * 处理保护策略告警信息
+ * 本地-云上
+ * @param payload
*/
@Override
- public void handleMqttPlanData(MqttSyncLog planLog) {
- // 校验是否存在
- String syncId = planLog.getSyncId();
- MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
- if (existLog != null) {
- return;
- }
- // 保存日志
- insertMqttSyncLog(planLog);
+ public void handleFaultPlanIssueData(String payload) {
+ MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
+ if (syncLog != null) {
+ // 校验是否存在
+ String syncId = syncLog.getSyncId();
+ MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
+ if (existLog == null) {
+ // 根据不同操作更新
+ String operateType = syncLog.getOperateType();
+ if (!StringUtils.isEmpty(operateType)) {
+ emsFaultIssueLogService.dealSyncData(syncLog.getContent(),operateType);
+ }
- // 处理数据
- String operateType = planLog.getOperateType();
- if (!StringUtils.isEmpty(operateType)) {
- emsFaultProtectionPlanService.dealSyncData(planLog.getContent(),operateType);
+ // 保存日志
+ insertMqttSyncLog(syncLog);
+ }
}
+ }
+ /**
+ * 处理设备运行状态变更日志
+ * 本地-云上
+ * @param payload
+ */
+ @Override
+ public void handleDeviceChangeLogData(String payload) {
+ MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
+ if (syncLog == null) {
+ // 校验是否存在
+ String syncId = syncLog.getSyncId();
+ MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
+ if (existLog == null) {
+ // 根据不同操作更新
+ String operateType = syncLog.getOperateType();
+ if (!StringUtils.isEmpty(operateType)) {
+ emsDeviceChangeLogService.dealSyncData(syncLog.getContent(),operateType);
+ }
+ // 保存日志
+ insertMqttSyncLog(syncLog);
+ }
+ }
}
}
diff --git a/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml b/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml
index 1a1a2f1..01a0bd6 100644
--- a/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml
+++ b/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml
@@ -13,12 +13,13 @@
+
- select id, sync_id, topic, operate_type, table_name, content, status, error_msg, target, create_time from mqtt_sync_log
+ select id, sync_id, topic, operate_type, table_name, content, status, error_msg, sync_object, target, create_time from mqtt_sync_log
@@ -50,6 +52,7 @@
content,
status,
error_msg,
+ sync_object,
target,
create_time,
@@ -61,6 +64,7 @@
#{content},
#{status},
#{errorMsg},
+ #{syncObject},
#{target},
#{createTime},
@@ -76,6 +80,7 @@
content = #{content},
status = #{status},
error_msg = #{errorMsg},
+ sync_object = #{syncObject},
target = #{target},
create_time = #{createTime},