diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java index ec1d846..006a7a6 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java @@ -3,7 +3,7 @@ package com.xzzn.web.controller.ems; import com.alibaba.fastjson2.JSON; import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.EmsMqttTopicConfig; -import com.xzzn.ems.domain.MqttSyncStrategyLog; +import com.xzzn.ems.domain.MqttSyncLog; import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper; import com.xzzn.ems.service.*; import com.xzzn.framework.manager.MqttLifecycleManager; @@ -45,6 +45,8 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper; @Autowired private IEmsStrategyService emsStrategyService; + @Autowired + private IMqttSyncLogService iMqttSyncLogService; @Autowired public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) { @@ -98,6 +100,8 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { return this::handleSystemStatus; } else if (topic.contains("STRATEGY")) { return this::handleStrategyData; + } else if (topic.contains("PROTECTION_PLAN")) { + return this::handleFaultProtPlanData; } else { return this::handleDeviceData; } @@ -158,20 +162,29 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { System.out.println("[处理运行策略数据] data: " + payload); try { // 业务处理逻辑 - MqttSyncStrategyLog strategyLog = JSON.parseObject(payload, MqttSyncStrategyLog.class); - if (strategyLog != null) { - String content = strategyLog.getContent(); - // 根据不同操作更新 - String operateType = strategyLog.getOperateType(); - if (!StringUtils.isEmpty(operateType)) { - emsStrategyService.dealStrategyData(content,operateType); - } - } + iMqttSyncLogService.handleMqttStrategyData(payload); + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); } catch (Exception e) { - log.error("Failed to process system status message: " + e.getMessage(), e); + log.error("Failed to process strategy data message: " + e.getMessage(), e); } + } + // 处理设备保护告警策略数据 + private void handleFaultProtPlanData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[处理设备保护告警策略数据] data: " + payload); + try { + // 业务处理逻辑 + MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class); + if (planLog != null) { + iMqttSyncLogService.handleMqttPlanData(planLog); + } + + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); + } catch (Exception e) { + log.error("Failed to process strategy data message: " + e.getMessage(), e); + } } @Override diff --git a/ems-framework/src/main/java/com/xzzn/framework/aspectj/FaultProtPlanAspect.java b/ems-framework/src/main/java/com/xzzn/framework/aspectj/FaultProtPlanAspect.java new file mode 100644 index 0000000..8352f5a --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/aspectj/FaultProtPlanAspect.java @@ -0,0 +1,262 @@ +package com.xzzn.framework.aspectj; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.xzzn.common.utils.StringUtils; +import com.xzzn.ems.domain.EmsFaultProtectionPlan; +import com.xzzn.ems.domain.MqttSyncLog; +import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper; +import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper; +import com.xzzn.ems.mapper.MqttSyncLogMapper; +import com.xzzn.framework.web.service.MqttPublisher; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.AfterReturning; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.aspectj.lang.annotation.Pointcut; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cglib.beans.BeanMap; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * 设备保护告警同步 + */ +@Aspect +@Component +public class FaultProtPlanAspect { + @Autowired + private MqttPublisher mqttPublisher; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String STRATEGY_TOPIC = "FAULT_PROTECTION_PLAN_UP"; + private static final String TABLE_NAME = "ems_fault_protection_plan"; + @Autowired + private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper; + @Autowired + private MqttSyncLogMapper mqttSyncLogMapper; + @Autowired + private EmsFaultProtectionPlanMapper emsFaultProtectionPlanMapper; + + // 用ThreadLocal暂存删除前的对象 + private ThreadLocal beforeDeleteThreadLocal = new ThreadLocal<>(); + @Before("execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlanByIds(..)) && args(ids)") + public void beforeDelete(JoinPoint joinPoint, Long[] ids) { + // 获取删除的id + Object[] args = joinPoint.getArgs(); + if (args == null || args.length == 0) { + return; + } + Long[] id = (Long[]) args[0]; + + // 查询删除前的数据 + EmsFaultProtectionPlan faultInfo = emsFaultProtectionPlanMapper.selectEmsFaultProtectionPlanById(id[0]); + beforeDeleteThreadLocal.set(faultInfo); // 暂存 + } + + // 定义切点:拦截策略相关表的Mapper方法 + @Pointcut("(execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.insertEmsFaultProtectionPlan(..)) && args(insertEntity)) ") + public void insertPointCut(EmsFaultProtectionPlan insertEntity) { + System.out.println("【新增设备保护告警】FaultProtPlanAspect 实例化"); + } + @Pointcut("(execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.updateEmsFaultProtectionPlan(..)) && args(updateEntity)) ") + public void updatePointCut(EmsFaultProtectionPlan updateEntity) { + System.out.println("【更新设备保护告警】FaultProtPlanAspect 实例化"); + } + @Pointcut("(execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlanByIds(..)) && args(ids)) ") + public void deletePointCut(Long[] ids) { + System.out.println("【删除设备保护告警】FaultProtPlanAspect 实例化"); + } + + // 方法执行成功后发布同步消息 + @AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result") + public void afterInsert(JoinPoint joinPoint, EmsFaultProtectionPlan insertEntity, Integer result) { + System.out.println("【新增设备保护告警切面进入成功】"); + if (result == 0) { + return; + } + // 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名 + String methodName = joinPoint.getSignature().getName(); + String operateType = getOperateType(methodName); + String siteId = insertEntity.getSiteId(); + + // 构建日志同步消息 + MqttSyncLog message = createMessageObject(operateType,siteId); + + try { + // 数据转换 + String content = convertEntityToJson(insertEntity); + message.setContent(content); + + // 发布到MQTT主题 + String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC); + if (StringUtils.isEmpty(topic)) { + mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); + } + } catch (Exception e) { + message.setStatus("FAIL"); + message.setErrorMsg(e.getMessage()); + } + // 存储同步信息 + mqttSyncLogMapper.insertMqttSyncLog(message); + } + @AfterReturning(pointcut = "updatePointCut(updateEntity)", returning = "result") + public void afterUpdate(JoinPoint joinPoint, EmsFaultProtectionPlan updateEntity, Integer result) { + System.out.println("【更新设备保护告警切面进入成功】"); + if (result == 0) { + return; + } + // 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名 + String methodName = joinPoint.getSignature().getName(); + String operateType = getOperateType(methodName); + String siteId = updateEntity.getSiteId(); + + // 构建日志同步消息 + MqttSyncLog message = createMessageObject(operateType,siteId); + + try { + // 数据转换 + String content = convertEntityToJson(updateEntity); + message.setContent(content); + + // 发布到MQTT主题 - 判断区分本地还是云上 + String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC); + if (StringUtils.isEmpty(topic)) { + mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); + } + } catch (Exception e) { + message.setStatus("FAIL"); + message.setErrorMsg(e.getMessage()); + } + // 存储同步信息 + mqttSyncLogMapper.insertMqttSyncLog(message); + } + @AfterReturning(pointcut = "deletePointCut(id)", returning = "result") + public void afterDelete(JoinPoint joinPoint, Long[] id, Integer result) { + System.out.println("【删除设备保护告警切面进入成功】"); + if (result == 0) { + return; + } + // 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名 + String methodName = joinPoint.getSignature().getName(); + String operateType = getOperateType(methodName); + + // 从ThreadLocal中获取删除前的对象 + EmsFaultProtectionPlan faultInfo = beforeDeleteThreadLocal.get(); + if (faultInfo == null) { + return; + } + + // 构建日志同步消息 + MqttSyncLog message = createMessageObject(operateType,faultInfo.getSiteId()); + + try { + // 数据转换 + String content = convertEntityToJson(faultInfo); + message.setContent(content); + + // 发布到MQTT主题 + String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC); + if (StringUtils.isEmpty(topic)) { + mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); + } + } catch (Exception e) { + message.setStatus("FAIL"); + message.setErrorMsg(e.getMessage()); + } + // 存储同步信息 + mqttSyncLogMapper.insertMqttSyncLog(message); + } + + // 构建同步信息 + private MqttSyncLog createMessageObject(String operateType, String siteId) { + MqttSyncLog message = new MqttSyncLog(); + message.setSyncId(UUID.randomUUID().toString()); + message.setOperateType(operateType); + message.setTableName(TABLE_NAME); + message.setCreateTime(new Date()); + message.setTopic(STRATEGY_TOPIC); + message.setStatus("SUCCESS"); + message.setTarget(siteId); + return message; + } + + // 从方法名判断操作类型(示例:insert→INSERT,update→UPDATE,delete→DELETE) + private String getOperateType(String methodName) { + if (methodName.startsWith("insert")) return "INSERT"; + if (methodName.startsWith("stop")) return "STOP"; + if (methodName.startsWith("update") || methodName.startsWith("stop")) return "UPDATE"; + if (methodName.startsWith("delete")) return "DELETE"; + return "UNKNOWN"; + } + + // 从方法参数提取数据(示例:若参数是实体类,转成Map) + private Map extractDataFromParams(Object[] args) { + // 实际需反射获取实体类的字段和值(如id、name等) + Map data = new HashMap<>(); + if (args == null || args.length == 0) { + return data; + } + + for (int i = 0; i < args.length; i++) { + Object arg = args[i]; + if (arg == null) { + continue; // 跳过null参数 + } + + // 处理基本类型/包装类/字符串(直接作为值存入,key为"param0"、"param1"等) + if (isBasicType(arg.getClass())) { + String key = "param" + i; // 基本类型参数用"param0"、"param1"作为key + data.put(key, arg); + } else { + Map beanMap = beanToMap(arg); + data.putAll(beanMap); // 合并实体类的字段到结果Map + } + } + + return data; + } + + /** + * 判断是否为基本类型或包装类或字符串 + */ + private boolean isBasicType(Class clazz) { + return clazz.isPrimitive() // 基本类型(int、long、boolean等) + || clazz == String.class // 字符串 + || Number.class.isAssignableFrom(clazz) // 数字包装类(Integer、Long等) + || clazz == Boolean.class; // 布尔包装类 + } + + /** + * 将实体类转换为Map(字段名为key,字段值为value) + */ + private Map beanToMap(Object bean) { + Map map = new HashMap<>(); + if (bean == null) { + return map; + } + + // 方式1:使用BeanMap(简洁高效) + BeanMap beanMap = BeanMap.create(bean); + for (Object key : beanMap.keySet()) { + map.put(key.toString(), beanMap.get(key)); + } + + return map; + } + + // 在方法中转换 + public String convertEntityToJson(EmsFaultProtectionPlan insertEntity) throws Exception { + if (insertEntity == null) { + return null; // 空对象返回空JSON + } + + // 将实体类转换为JSON字符串 + return objectMapper.writeValueAsString(insertEntity); + } + + +} \ No newline at end of file diff --git a/ems-framework/src/main/java/com/xzzn/framework/aspectj/StrategySyncAspect.java b/ems-framework/src/main/java/com/xzzn/framework/aspectj/StrategySyncAspect.java index b7eb2c7..518c214 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/aspectj/StrategySyncAspect.java +++ b/ems-framework/src/main/java/com/xzzn/framework/aspectj/StrategySyncAspect.java @@ -4,9 +4,10 @@ import com.alibaba.fastjson2.JSON; import com.fasterxml.jackson.databind.ObjectMapper; import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.EmsStrategyRunning; -import com.xzzn.ems.domain.MqttSyncStrategyLog; +import com.xzzn.ems.domain.MqttSyncLog; import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper; -import com.xzzn.ems.mapper.MqttSyncStrategyLogMapper; +import com.xzzn.ems.mapper.EmsStrategyRunningMapper; +import com.xzzn.ems.mapper.MqttSyncLogMapper; import com.xzzn.framework.web.service.MqttPublisher; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; @@ -21,18 +22,24 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +/** + * 策略运行切面同步 + */ @Aspect @Component public class StrategySyncAspect { @Autowired private MqttPublisher mqttPublisher; - @Autowired - private MqttSyncStrategyLogMapper mqttSyncStrategyLogMapper; private static final ObjectMapper objectMapper = new ObjectMapper(); private static final String STRATEGY_TOPIC = "EMS_STRATEGY_UP"; + private static final String TABLE_NAME = "ems_strategy_running"; @Autowired private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper; + @Autowired + private MqttSyncLogMapper mqttSyncLogMapper; + @Autowired + private EmsStrategyRunningMapper emsStrategyRunningMapper; // 定义切点:拦截策略相关表的Mapper方法 @Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.stopEmsStrategyRunning(..)) && args(id)) ") @@ -49,8 +56,8 @@ public class StrategySyncAspect { } // 方法执行成功后发布同步消息 - @AfterReturning(pointcut = "updatePointCut(insertEntity)", returning = "result") - public void afterUpdate(JoinPoint joinPoint, EmsStrategyRunning insertEntity, Integer result) { + @AfterReturning(pointcut = "updatePointCut(updateEntity)", returning = "result") + public void afterUpdate(JoinPoint joinPoint, EmsStrategyRunning updateEntity, Integer result) { System.out.println("【更新策略切面进入成功】"); if (result == 0) { return; @@ -58,33 +65,27 @@ public class StrategySyncAspect { // 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名 String methodName = joinPoint.getSignature().getName(); String operateType = getOperateType(methodName); - String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名 + String siteId = updateEntity.getSiteId(); // 构建日志同步消息 - MqttSyncStrategyLog message = new MqttSyncStrategyLog(); - message.setSyncId(UUID.randomUUID().toString()); - message.setOperateType(operateType); - message.setTableName(tableName); - message.setCreateTime(new Date()); - message.setTopic(STRATEGY_TOPIC); - message.setStatus("SUCCESS"); + MqttSyncLog message = createMessageObject(operateType,siteId); try { // 数据转换 - String content = convertEntityToJson(insertEntity); + String content = convertEntityToJson(updateEntity); message.setContent(content); - // 发布到MQTT主题 + // 发布到MQTT主题 - 判断区分本地还是云上 String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC); - if (!StringUtils.isEmpty(topic)) { - mqttPublisher.publish(topic, objectMapper.writeValueAsString(message), 1); + if (StringUtils.isEmpty(topic)) { + mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); } } catch (Exception e) { message.setStatus("FAIL"); message.setErrorMsg(e.getMessage()); } // 存储同步信息 - mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message); + mqttSyncLogMapper.insertMqttSyncLog(message); } @AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result") @@ -96,16 +97,10 @@ public class StrategySyncAspect { // 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名 String methodName = joinPoint.getSignature().getName(); String operateType = getOperateType(methodName); - String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名 + String siteId = insertEntity.getSiteId(); // 构建日志同步消息 - MqttSyncStrategyLog message = new MqttSyncStrategyLog(); - message.setSyncId(UUID.randomUUID().toString()); - message.setOperateType(operateType); - message.setTableName(tableName); - message.setCreateTime(new Date()); - message.setTopic(STRATEGY_TOPIC); - message.setStatus("SUCCESS"); + MqttSyncLog message = createMessageObject(operateType, siteId); try { // 数据转换 @@ -113,13 +108,16 @@ public class StrategySyncAspect { message.setContent(content); // 发布到MQTT主题 - mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); + String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC); + if (StringUtils.isEmpty(topic)) { + mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); + } } catch (Exception e) { message.setStatus("FAIL"); message.setErrorMsg(e.getMessage()); } // 存储同步信息 - mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message); + mqttSyncLogMapper.insertMqttSyncLog(message); } @AfterReturning(pointcut = "stopPointCut(id)", returning = "result") @@ -131,17 +129,11 @@ public class StrategySyncAspect { // 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名 String methodName = joinPoint.getSignature().getName(); String operateType = getOperateType(methodName); - String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名 - + EmsStrategyRunning emsStrategyRunning = emsStrategyRunningMapper.selectEmsStrategyRunningById(id); + String siteId = emsStrategyRunning==null ? null : emsStrategyRunning.getSiteId(); // 构建日志同步消息 - MqttSyncStrategyLog message = new MqttSyncStrategyLog(); - message.setSyncId(UUID.randomUUID().toString()); - message.setOperateType(operateType); - message.setTableName(tableName); - message.setCreateTime(new Date()); - message.setTopic(STRATEGY_TOPIC); - message.setStatus("SUCCESS"); + MqttSyncLog message = createMessageObject(operateType, siteId); try { // 数据转换 @@ -151,15 +143,30 @@ public class StrategySyncAspect { message.setContent(content); // 发布到MQTT主题 - mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); + String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC); + if (StringUtils.isEmpty(topic)) { + mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); + } } catch (Exception e) { message.setStatus("FAIL"); message.setErrorMsg(e.getMessage()); } // 存储同步信息 - mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message); + mqttSyncLogMapper.insertMqttSyncLog(message); } + // 构建同步信息 + private MqttSyncLog createMessageObject(String operateType, String siteId) { + MqttSyncLog message = new MqttSyncLog(); + message.setSyncId(UUID.randomUUID().toString()); + message.setOperateType(operateType); + message.setTableName(TABLE_NAME); + message.setCreateTime(new Date()); + message.setTopic(STRATEGY_TOPIC); + message.setStatus("SUCCESS"); + message.setTarget(siteId); + return message; + } // 从方法名判断操作类型(示例:insert→INSERT,update→UPDATE,delete→DELETE) private String getOperateType(String methodName) { if (methodName.startsWith("insert")) return "INSERT"; @@ -169,12 +176,6 @@ public class StrategySyncAspect { return "UNKNOWN"; } - // 从Mapper类名提取表名(示例:StrategyMapper→strategy) - private String getTableNameFromMethod(String methodName) { - // 实际需通过JoinPoint获取Mapper类名,再转换为表名(如StrategyTemplateMapper→strategy_template) - return "strategy"; // 简化示例 - } - // 从方法参数提取数据(示例:若参数是实体类,转成Map) private Map extractDataFromParams(Object[] args) { // 实际需反射获取实体类的字段和值(如id、name等) @@ -240,4 +241,5 @@ public class StrategySyncAspect { return objectMapper.writeValueAsString(insertEntity); } + } \ No newline at end of file diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java b/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java new file mode 100644 index 0000000..cc8b5cf --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/domain/MqttSyncLog.java @@ -0,0 +1,158 @@ +package com.xzzn.ems.domain; + +import com.xzzn.common.core.domain.BaseEntity; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import com.xzzn.common.annotation.Excel; + +/** + * MQTT云上本地同步日志对象 mqtt_sync_log + * + * @author xzzn + * @date 2025-11-12 + */ +public class MqttSyncLog extends BaseEntity +{ + private static final long serialVersionUID = 1L; + + /** 主键 */ + private Long id; + + /** 同步消息唯一标识(与消息中的syncId一致) */ + @Excel(name = "同步消息唯一标识", readConverterExp = "与=消息中的syncId一致") + private String syncId; + + /** MQTT主题 */ + @Excel(name = "MQTT主题") + private String topic; + + /** 操作类型:INSERT/UPDATE/DELETE */ + @Excel(name = "操作类型:INSERT/UPDATE/DELETE") + private String operateType; + + /** 涉及的表名 */ + @Excel(name = "涉及的表名") + private String tableName; + + /** 同步数据内容(JSON格式) */ + @Excel(name = "同步数据内容", readConverterExp = "J=SON格式") + private String content; + + /** 处理状态:SUCCESS/FAIL */ + @Excel(name = "处理状态:SUCCESS/FAIL") + private String status; + + /** 失败原因(status=FAIL时填写) */ + @Excel(name = "失败原因", readConverterExp = "s=tatus=FAIL时填写") + private String errorMsg; + + /** 同步目标 */ + @Excel(name = "同步目标") + private String target; + + public void setId(Long id) + { + this.id = id; + } + + public Long getId() + { + return id; + } + + public void setSyncId(String syncId) + { + this.syncId = syncId; + } + + public String getSyncId() + { + return syncId; + } + + public void setTopic(String topic) + { + this.topic = topic; + } + + public String getTopic() + { + return topic; + } + + public void setOperateType(String operateType) + { + this.operateType = operateType; + } + + public String getOperateType() + { + return operateType; + } + + public void setTableName(String tableName) + { + this.tableName = tableName; + } + + public String getTableName() + { + return tableName; + } + + public void setContent(String content) + { + this.content = content; + } + + public String getContent() + { + return content; + } + + public void setStatus(String status) + { + this.status = status; + } + + public String getStatus() + { + return status; + } + + public void setErrorMsg(String errorMsg) + { + this.errorMsg = errorMsg; + } + + public String getErrorMsg() + { + return errorMsg; + } + + public void setTarget(String target) + { + this.target = target; + } + + public String getTarget() + { + return target; + } + + @Override + public String toString() { + return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) + .append("id", getId()) + .append("syncId", getSyncId()) + .append("topic", getTopic()) + .append("operateType", getOperateType()) + .append("tableName", getTableName()) + .append("content", getContent()) + .append("status", getStatus()) + .append("errorMsg", getErrorMsg()) + .append("target", getTarget()) + .append("createTime", getCreateTime()) + .toString(); + } +} diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsFaultProtectionPlanMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsFaultProtectionPlanMapper.java index 35c5eba..c5e08ce 100644 --- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsFaultProtectionPlanMapper.java +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsFaultProtectionPlanMapper.java @@ -2,6 +2,7 @@ package com.xzzn.ems.mapper; import java.util.List; import com.xzzn.ems.domain.EmsFaultProtectionPlan; +import org.apache.ibatis.annotations.Param; /** * 故障告警保护方案Mapper接口 @@ -58,4 +59,9 @@ public interface EmsFaultProtectionPlanMapper * @return 结果 */ public int deleteEmsFaultProtectionPlanByIds(Long[] ids); + + // 根据站点+faultName+faultLevel 同步删除 + public void deleteEmsFaultProtectionPlan(@Param("siteId")String siteId, + @Param("faultName")String faultName, + @Param("faultLevel")Integer faultLevel); } diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/MqttSyncLogMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/MqttSyncLogMapper.java new file mode 100644 index 0000000..e8199d0 --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/MqttSyncLogMapper.java @@ -0,0 +1,63 @@ +package com.xzzn.ems.mapper; + +import java.util.List; +import com.xzzn.ems.domain.MqttSyncLog; + +/** + * MQTT云上本地同步日志Mapper接口 + * + * @author xzzn + * @date 2025-11-12 + */ +public interface MqttSyncLogMapper +{ + /** + * 查询MQTT云上本地同步日志 + * + * @param id MQTT云上本地同步日志主键 + * @return MQTT云上本地同步日志 + */ + public MqttSyncLog selectMqttSyncLogById(Long id); + + /** + * 查询MQTT云上本地同步日志列表 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return MQTT云上本地同步日志集合 + */ + public List selectMqttSyncLogList(MqttSyncLog mqttSyncLog); + + /** + * 新增MQTT云上本地同步日志 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return 结果 + */ + public int insertMqttSyncLog(MqttSyncLog mqttSyncLog); + + /** + * 修改MQTT云上本地同步日志 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return 结果 + */ + public int updateMqttSyncLog(MqttSyncLog mqttSyncLog); + + /** + * 删除MQTT云上本地同步日志 + * + * @param id MQTT云上本地同步日志主键 + * @return 结果 + */ + public int deleteMqttSyncLogById(Long id); + + /** + * 批量删除MQTT云上本地同步日志 + * + * @param ids 需要删除的数据主键集合 + * @return 结果 + */ + public int deleteMqttSyncLogByIds(Long[] ids); + + public MqttSyncLog selectMqttSyncLogBySyncId(String syncId); +} diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IEmsFaultProtectionPlanService.java b/ems-system/src/main/java/com/xzzn/ems/service/IEmsFaultProtectionPlanService.java index 8746bf2..36cbf28 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IEmsFaultProtectionPlanService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IEmsFaultProtectionPlanService.java @@ -58,4 +58,7 @@ public interface IEmsFaultProtectionPlanService * @return 结果 */ public int deleteEmsFaultProtectionPlanById(Long id); + + // 处理云上同步设备保护告警信息 + public void dealSyncData(String content, String operateType); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java b/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java new file mode 100644 index 0000000..b984e0f --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/service/IMqttSyncLogService.java @@ -0,0 +1,66 @@ +package com.xzzn.ems.service; + +import java.util.List; +import com.xzzn.ems.domain.MqttSyncLog; + +/** + * MQTT云上本地同步日志Service接口 + * + * @author xzzn + * @date 2025-11-12 + */ +public interface IMqttSyncLogService +{ + /** + * 查询MQTT云上本地同步日志 + * + * @param id MQTT云上本地同步日志主键 + * @return MQTT云上本地同步日志 + */ + public MqttSyncLog selectMqttSyncLogById(Long id); + + /** + * 查询MQTT云上本地同步日志列表 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return MQTT云上本地同步日志集合 + */ + public List selectMqttSyncLogList(MqttSyncLog mqttSyncLog); + + /** + * 新增MQTT云上本地同步日志 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return 结果 + */ + public int insertMqttSyncLog(MqttSyncLog mqttSyncLog); + + /** + * 修改MQTT云上本地同步日志 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return 结果 + */ + public int updateMqttSyncLog(MqttSyncLog mqttSyncLog); + + /** + * 批量删除MQTT云上本地同步日志 + * + * @param ids 需要删除的MQTT云上本地同步日志主键集合 + * @return 结果 + */ + public int deleteMqttSyncLogByIds(Long[] ids); + + /** + * 删除MQTT云上本地同步日志信息 + * + * @param id MQTT云上本地同步日志主键 + * @return 结果 + */ + public int deleteMqttSyncLogById(Long id); + + // 处理策略信息 + public void handleMqttStrategyData(String payload); + // 处理设备告警保护信息 + public void handleMqttPlanData(MqttSyncLog planLog); +} diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsFaultProtectionPlanServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsFaultProtectionPlanServiceImpl.java index d252b73..6da7cf1 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsFaultProtectionPlanServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsFaultProtectionPlanServiceImpl.java @@ -1,7 +1,13 @@ package com.xzzn.ems.service.impl; import java.util.List; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.xzzn.common.utils.DateUtils; +import com.xzzn.common.utils.StringUtils; +import com.xzzn.ems.domain.EmsStrategyRunning; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper; @@ -93,4 +99,44 @@ public class EmsFaultProtectionPlanServiceImpl implements IEmsFaultProtectionPla { return emsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlanById(id); } + + @Override + public void dealSyncData(String content, String operateType) { + if (StringUtils.isEmpty(content)) { + return; + } + + EmsFaultProtectionPlan faultProtectionPlan = JSON.parseObject(content, EmsFaultProtectionPlan.class); + switch(operateType) { + case "INSERT": + insertEmsFaultProtectionPlan(faultProtectionPlan); + break; + case "UPDATE": + updateEmsFaultProtectionPlan(faultProtectionPlan); + break; + case "DELETE": + // 根据站点+faultName+faultLevel 删除 + String siteId = faultProtectionPlan.getSiteId(); + String faultName = faultProtectionPlan.getFaultName(); + Integer faultLevel = faultProtectionPlan.getFaultLevel(); + emsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlan(siteId, faultName, faultLevel); + break; + default: + break; + } + } + + private static Long[] convertJsonToLongArray(String json) { + // 解析 JSON 为 JSONObject + JSONObject jsonObject = JSON.parseObject(json); + // 获取 "id" 对应的 JSONArray + JSONArray idArray = jsonObject.getJSONArray("id"); + + if (idArray == null) { + return new Long[0]; // 数组不存在时返回空数组 + } + + // 转换为 Long 数组 + return idArray.toArray(new Long[0]); + } } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsStrategyServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsStrategyServiceImpl.java index a7e3ad8..0c277be 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsStrategyServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsStrategyServiceImpl.java @@ -71,10 +71,32 @@ public class EmsStrategyServiceImpl implements IEmsStrategyService if (StringUtils.isEmpty(content)) { return; } -// switch (operateType) { -// case "INSERT": -// EmsStrategyRunning emsStrategyRunning = JSON.parseObject(content, EmsStrategyRunning.class); -// emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning); -// } + EmsStrategyRunning emsStrategyRunning = JSON.parseObject(content, EmsStrategyRunning.class); + switch (operateType) { + case "INSERT": + emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning); + break; + case "STOP": + Long id = emsStrategyRunning.getId(); + emsStrategyRunningMapper.stopEmsStrategyRunning(id); + break; + case "UPDATE": + String siteId = emsStrategyRunning.getSiteId(); + Long mainId = emsStrategyRunning.getMainStrategyId(); + Long auxId = emsStrategyRunning.getAuxiliaryStrategyId(); + EmsStrategyRunning existStrategy = emsStrategyRunningMapper.getRunningStrategy(siteId); + if (existStrategy != null) { // 存在正在运行的则更新 + existStrategy.setMainStrategyId(mainId); + existStrategy.setAuxiliaryStrategyId(auxId); + emsStrategyRunningMapper.updateEmsStrategyRunning(emsStrategyRunning); + } else { // 不存在着插入 + emsStrategyRunning.setCreateTime(DateUtils.getNowDate()); + emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning); + } + break; + default: + // 未知操作类型-不同步 + break; + } } } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java new file mode 100644 index 0000000..8797e9e --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/MqttSyncLogServiceImpl.java @@ -0,0 +1,150 @@ +package com.xzzn.ems.service.impl; + +import java.util.List; + +import com.alibaba.fastjson2.JSON; +import com.xzzn.common.utils.DateUtils; +import com.xzzn.common.utils.StringUtils; +import com.xzzn.ems.service.IEmsFaultProtectionPlanService; +import com.xzzn.ems.service.IEmsStrategyService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import com.xzzn.ems.mapper.MqttSyncLogMapper; +import com.xzzn.ems.domain.MqttSyncLog; +import com.xzzn.ems.service.IMqttSyncLogService; + +/** + * MQTT云上本地同步日志Service业务层处理 + * + * @author xzzn + * @date 2025-11-12 + */ +@Service +public class MqttSyncLogServiceImpl implements IMqttSyncLogService +{ + @Autowired + private MqttSyncLogMapper mqttSyncLogMapper; + @Autowired + private IEmsFaultProtectionPlanService emsFaultProtectionPlanService; + @Autowired + private IEmsStrategyService emsStrategyService; + + /** + * 查询MQTT云上本地同步日志 + * + * @param id MQTT云上本地同步日志主键 + * @return MQTT云上本地同步日志 + */ + @Override + public MqttSyncLog selectMqttSyncLogById(Long id) + { + return mqttSyncLogMapper.selectMqttSyncLogById(id); + } + + /** + * 查询MQTT云上本地同步日志列表 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return MQTT云上本地同步日志 + */ + @Override + public List selectMqttSyncLogList(MqttSyncLog mqttSyncLog) + { + return mqttSyncLogMapper.selectMqttSyncLogList(mqttSyncLog); + } + + /** + * 新增MQTT云上本地同步日志 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return 结果 + */ + @Override + public int insertMqttSyncLog(MqttSyncLog mqttSyncLog) + { + mqttSyncLog.setCreateTime(DateUtils.getNowDate()); + return mqttSyncLogMapper.insertMqttSyncLog(mqttSyncLog); + } + + /** + * 修改MQTT云上本地同步日志 + * + * @param mqttSyncLog MQTT云上本地同步日志 + * @return 结果 + */ + @Override + public int updateMqttSyncLog(MqttSyncLog mqttSyncLog) + { + return mqttSyncLogMapper.updateMqttSyncLog(mqttSyncLog); + } + + /** + * 批量删除MQTT云上本地同步日志 + * + * @param ids 需要删除的MQTT云上本地同步日志主键 + * @return 结果 + */ + @Override + public int deleteMqttSyncLogByIds(Long[] ids) + { + return mqttSyncLogMapper.deleteMqttSyncLogByIds(ids); + } + + /** + * 删除MQTT云上本地同步日志信息 + * + * @param id MQTT云上本地同步日志主键 + * @return 结果 + */ + @Override + public int deleteMqttSyncLogById(Long id) + { + return mqttSyncLogMapper.deleteMqttSyncLogById(id); + } + + /** + * 处理云上运行策略数据 + * @param payload + */ + @Override + public void handleMqttStrategyData(String payload) { + MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class); + if (syncLog != null) { + // 校验是否存在 + String syncId = syncLog.getSyncId(); + MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId); + if (existLog == null) { + // 根据不同操作更新 + String operateType = syncLog.getOperateType(); + if (!StringUtils.isEmpty(operateType)) { + emsStrategyService.dealStrategyData(syncLog.getContent(),operateType); + } + } + // 保存日志 + insertMqttSyncLog(syncLog); + } + } + + /** + * 处理云上同步的告警保护信息 + * @param planLog + */ + @Override + public void handleMqttPlanData(MqttSyncLog planLog) { + // 校验是否存在 + String syncId = planLog.getSyncId(); + MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId); + if (existLog != null) { + return; + } + // 保存日志 + insertMqttSyncLog(planLog); + + // 处理数据 + String operateType = planLog.getOperateType(); + if (!StringUtils.isEmpty(operateType)) { + emsFaultProtectionPlanService.dealSyncData(planLog.getContent(),operateType); + } + + } +} diff --git a/ems-system/src/main/resources/mapper/ems/EmsFaultProtectionPlanMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsFaultProtectionPlanMapper.xml index c221022..ea7ca86 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsFaultProtectionPlanMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsFaultProtectionPlanMapper.xml @@ -114,4 +114,11 @@ #{id} + + + delete from ems_fault_protection_plan + where site_id = #{siteId} + and fault_name = #{faultName} + and fault_level = #{faultLevel} + \ No newline at end of file diff --git a/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml b/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml new file mode 100644 index 0000000..1a1a2f1 --- /dev/null +++ b/ems-system/src/main/resources/mapper/ems/MqttSyncLogMapper.xml @@ -0,0 +1,100 @@ + + + + + + + + + + + + + + + + + + + select id, sync_id, topic, operate_type, table_name, content, status, error_msg, target, create_time from mqtt_sync_log + + + + + + + + insert into mqtt_sync_log + + sync_id, + topic, + operate_type, + table_name, + content, + status, + error_msg, + target, + create_time, + + + #{syncId}, + #{topic}, + #{operateType}, + #{tableName}, + #{content}, + #{status}, + #{errorMsg}, + #{target}, + #{createTime}, + + + + + update mqtt_sync_log + + sync_id = #{syncId}, + topic = #{topic}, + operate_type = #{operateType}, + table_name = #{tableName}, + content = #{content}, + status = #{status}, + error_msg = #{errorMsg}, + target = #{target}, + create_time = #{createTime}, + + where id = #{id} + + + + delete from mqtt_sync_log where id = #{id} + + + + delete from mqtt_sync_log where id in + + #{id} + + + + + \ No newline at end of file