dev #2

Merged
dashixiong merged 349 commits from dev into main 2026-02-11 01:55:46 +00:00
305 changed files with 43967 additions and 1115 deletions
Showing only changes of commit 38ade0c2ed - Show all commits

View File

@ -3,7 +3,7 @@ package com.xzzn.web.controller.ems;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.xzzn.common.utils.StringUtils; import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsMqttTopicConfig; import com.xzzn.ems.domain.EmsMqttTopicConfig;
import com.xzzn.ems.domain.MqttSyncStrategyLog; import com.xzzn.ems.domain.MqttSyncLog;
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper; import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
import com.xzzn.ems.service.*; import com.xzzn.ems.service.*;
import com.xzzn.framework.manager.MqttLifecycleManager; import com.xzzn.framework.manager.MqttLifecycleManager;
@ -45,6 +45,8 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper; private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
@Autowired @Autowired
private IEmsStrategyService emsStrategyService; private IEmsStrategyService emsStrategyService;
@Autowired
private IMqttSyncLogService iMqttSyncLogService;
@Autowired @Autowired
public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) { public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) {
@ -98,6 +100,8 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
return this::handleSystemStatus; return this::handleSystemStatus;
} else if (topic.contains("STRATEGY")) { } else if (topic.contains("STRATEGY")) {
return this::handleStrategyData; return this::handleStrategyData;
} else if (topic.contains("PROTECTION_PLAN")) {
return this::handleFaultProtPlanData;
} else { } else {
return this::handleDeviceData; return this::handleDeviceData;
} }
@ -158,20 +162,29 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
System.out.println("[处理运行策略数据] data: " + payload); System.out.println("[处理运行策略数据] data: " + payload);
try { try {
// 业务处理逻辑 // 业务处理逻辑
MqttSyncStrategyLog strategyLog = JSON.parseObject(payload, MqttSyncStrategyLog.class); iMqttSyncLogService.handleMqttStrategyData(payload);
if (strategyLog != null) {
String content = strategyLog.getContent();
// 根据不同操作更新
String operateType = strategyLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsStrategyService.dealStrategyData(content,operateType);
}
}
emsMqttMessageService.insertMqttOriginalMessage(topic,payload); emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to process system status message: " + e.getMessage(), e); log.error("Failed to process strategy data message: " + e.getMessage(), e);
} }
}
// 处理设备保护告警策略数据
private void handleFaultProtPlanData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理设备保护告警策略数据] data: " + payload);
try {
// 业务处理逻辑
MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class);
if (planLog != null) {
iMqttSyncLogService.handleMqttPlanData(planLog);
}
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) {
log.error("Failed to process strategy data message: " + e.getMessage(), e);
}
} }
@Override @Override

View File

@ -0,0 +1,262 @@
package com.xzzn.framework.aspectj;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsFaultProtectionPlan;
import com.xzzn.ems.domain.MqttSyncLog;
import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper;
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
import com.xzzn.ems.mapper.MqttSyncLogMapper;
import com.xzzn.framework.web.service.MqttPublisher;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.beans.BeanMap;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* 设备保护告警同步
*/
@Aspect
@Component
public class FaultProtPlanAspect {
@Autowired
private MqttPublisher mqttPublisher;
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String STRATEGY_TOPIC = "FAULT_PROTECTION_PLAN_UP";
private static final String TABLE_NAME = "ems_fault_protection_plan";
@Autowired
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
@Autowired
private MqttSyncLogMapper mqttSyncLogMapper;
@Autowired
private EmsFaultProtectionPlanMapper emsFaultProtectionPlanMapper;
// 用ThreadLocal暂存删除前的对象
private ThreadLocal<EmsFaultProtectionPlan> beforeDeleteThreadLocal = new ThreadLocal<>();
@Before("execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlanByIds(..)) && args(ids)")
public void beforeDelete(JoinPoint joinPoint, Long[] ids) {
// 获取删除的id
Object[] args = joinPoint.getArgs();
if (args == null || args.length == 0) {
return;
}
Long[] id = (Long[]) args[0];
// 查询删除前的数据
EmsFaultProtectionPlan faultInfo = emsFaultProtectionPlanMapper.selectEmsFaultProtectionPlanById(id[0]);
beforeDeleteThreadLocal.set(faultInfo); // 暂存
}
// 定义切点拦截策略相关表的Mapper方法
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.insertEmsFaultProtectionPlan(..)) && args(insertEntity)) ")
public void insertPointCut(EmsFaultProtectionPlan insertEntity) {
System.out.println("【新增设备保护告警】FaultProtPlanAspect 实例化");
}
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.updateEmsFaultProtectionPlan(..)) && args(updateEntity)) ")
public void updatePointCut(EmsFaultProtectionPlan updateEntity) {
System.out.println("【更新设备保护告警】FaultProtPlanAspect 实例化");
}
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlanByIds(..)) && args(ids)) ")
public void deletePointCut(Long[] ids) {
System.out.println("【删除设备保护告警】FaultProtPlanAspect 实例化");
}
// 方法执行成功后发布同步消息
@AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result")
public void afterInsert(JoinPoint joinPoint, EmsFaultProtectionPlan insertEntity, Integer result) {
System.out.println("【新增设备保护告警切面进入成功】");
if (result == 0) {
return;
}
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName);
String siteId = insertEntity.getSiteId();
// 构建日志同步消息
MqttSyncLog message = createMessageObject(operateType,siteId);
try {
// 数据转换
String content = convertEntityToJson(insertEntity);
message.setContent(content);
// 发布到MQTT主题
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
if (StringUtils.isEmpty(topic)) {
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
}
} catch (Exception e) {
message.setStatus("FAIL");
message.setErrorMsg(e.getMessage());
}
// 存储同步信息
mqttSyncLogMapper.insertMqttSyncLog(message);
}
@AfterReturning(pointcut = "updatePointCut(updateEntity)", returning = "result")
public void afterUpdate(JoinPoint joinPoint, EmsFaultProtectionPlan updateEntity, Integer result) {
System.out.println("【更新设备保护告警切面进入成功】");
if (result == 0) {
return;
}
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName);
String siteId = updateEntity.getSiteId();
// 构建日志同步消息
MqttSyncLog message = createMessageObject(operateType,siteId);
try {
// 数据转换
String content = convertEntityToJson(updateEntity);
message.setContent(content);
// 发布到MQTT主题 - 判断区分本地还是云上
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
if (StringUtils.isEmpty(topic)) {
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
}
} catch (Exception e) {
message.setStatus("FAIL");
message.setErrorMsg(e.getMessage());
}
// 存储同步信息
mqttSyncLogMapper.insertMqttSyncLog(message);
}
@AfterReturning(pointcut = "deletePointCut(id)", returning = "result")
public void afterDelete(JoinPoint joinPoint, Long[] id, Integer result) {
System.out.println("【删除设备保护告警切面进入成功】");
if (result == 0) {
return;
}
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName);
// 从ThreadLocal中获取删除前的对象
EmsFaultProtectionPlan faultInfo = beforeDeleteThreadLocal.get();
if (faultInfo == null) {
return;
}
// 构建日志同步消息
MqttSyncLog message = createMessageObject(operateType,faultInfo.getSiteId());
try {
// 数据转换
String content = convertEntityToJson(faultInfo);
message.setContent(content);
// 发布到MQTT主题
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
if (StringUtils.isEmpty(topic)) {
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
}
} catch (Exception e) {
message.setStatus("FAIL");
message.setErrorMsg(e.getMessage());
}
// 存储同步信息
mqttSyncLogMapper.insertMqttSyncLog(message);
}
// 构建同步信息
private MqttSyncLog createMessageObject(String operateType, String siteId) {
MqttSyncLog message = new MqttSyncLog();
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(TABLE_NAME);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
message.setTarget(siteId);
return message;
}
// 从方法名判断操作类型示例insert→INSERTupdate→UPDATEdelete→DELETE
private String getOperateType(String methodName) {
if (methodName.startsWith("insert")) return "INSERT";
if (methodName.startsWith("stop")) return "STOP";
if (methodName.startsWith("update") || methodName.startsWith("stop")) return "UPDATE";
if (methodName.startsWith("delete")) return "DELETE";
return "UNKNOWN";
}
// 从方法参数提取数据示例若参数是实体类转成Map
private Map<String, Object> extractDataFromParams(Object[] args) {
// 实际需反射获取实体类的字段和值如id、name等
Map<String, Object> data = new HashMap<>();
if (args == null || args.length == 0) {
return data;
}
for (int i = 0; i < args.length; i++) {
Object arg = args[i];
if (arg == null) {
continue; // 跳过null参数
}
// 处理基本类型/包装类/字符串直接作为值存入key为"param0"、"param1"等)
if (isBasicType(arg.getClass())) {
String key = "param" + i; // 基本类型参数用"param0"、"param1"作为key
data.put(key, arg);
} else {
Map<String, Object> beanMap = beanToMap(arg);
data.putAll(beanMap); // 合并实体类的字段到结果Map
}
}
return data;
}
/**
* 判断是否为基本类型或包装类或字符串
*/
private boolean isBasicType(Class<?> clazz) {
return clazz.isPrimitive() // 基本类型int、long、boolean等
|| clazz == String.class // 字符串
|| Number.class.isAssignableFrom(clazz) // 数字包装类Integer、Long等
|| clazz == Boolean.class; // 布尔包装类
}
/**
* 将实体类转换为Map字段名为key字段值为value
*/
private Map<String, Object> beanToMap(Object bean) {
Map<String, Object> map = new HashMap<>();
if (bean == null) {
return map;
}
// 方式1使用BeanMap简洁高效
BeanMap beanMap = BeanMap.create(bean);
for (Object key : beanMap.keySet()) {
map.put(key.toString(), beanMap.get(key));
}
return map;
}
// 在方法中转换
public String convertEntityToJson(EmsFaultProtectionPlan insertEntity) throws Exception {
if (insertEntity == null) {
return null; // 空对象返回空JSON
}
// 将实体类转换为JSON字符串
return objectMapper.writeValueAsString(insertEntity);
}
}

View File

@ -4,9 +4,10 @@ import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.xzzn.common.utils.StringUtils; import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsStrategyRunning; import com.xzzn.ems.domain.EmsStrategyRunning;
import com.xzzn.ems.domain.MqttSyncStrategyLog; import com.xzzn.ems.domain.MqttSyncLog;
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper; import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
import com.xzzn.ems.mapper.MqttSyncStrategyLogMapper; import com.xzzn.ems.mapper.EmsStrategyRunningMapper;
import com.xzzn.ems.mapper.MqttSyncLogMapper;
import com.xzzn.framework.web.service.MqttPublisher; import com.xzzn.framework.web.service.MqttPublisher;
import org.aspectj.lang.JoinPoint; import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.AfterReturning;
@ -21,18 +22,24 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
/**
* 策略运行切面同步
*/
@Aspect @Aspect
@Component @Component
public class StrategySyncAspect { public class StrategySyncAspect {
@Autowired @Autowired
private MqttPublisher mqttPublisher; private MqttPublisher mqttPublisher;
@Autowired
private MqttSyncStrategyLogMapper mqttSyncStrategyLogMapper;
private static final ObjectMapper objectMapper = new ObjectMapper(); private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String STRATEGY_TOPIC = "EMS_STRATEGY_UP"; private static final String STRATEGY_TOPIC = "EMS_STRATEGY_UP";
private static final String TABLE_NAME = "ems_strategy_running";
@Autowired @Autowired
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper; private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
@Autowired
private MqttSyncLogMapper mqttSyncLogMapper;
@Autowired
private EmsStrategyRunningMapper emsStrategyRunningMapper;
// 定义切点拦截策略相关表的Mapper方法 // 定义切点拦截策略相关表的Mapper方法
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.stopEmsStrategyRunning(..)) && args(id)) ") @Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.stopEmsStrategyRunning(..)) && args(id)) ")
@ -49,8 +56,8 @@ public class StrategySyncAspect {
} }
// 方法执行成功后发布同步消息 // 方法执行成功后发布同步消息
@AfterReturning(pointcut = "updatePointCut(insertEntity)", returning = "result") @AfterReturning(pointcut = "updatePointCut(updateEntity)", returning = "result")
public void afterUpdate(JoinPoint joinPoint, EmsStrategyRunning insertEntity, Integer result) { public void afterUpdate(JoinPoint joinPoint, EmsStrategyRunning updateEntity, Integer result) {
System.out.println("【更新策略切面进入成功】"); System.out.println("【更新策略切面进入成功】");
if (result == 0) { if (result == 0) {
return; return;
@ -58,33 +65,27 @@ public class StrategySyncAspect {
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名 // 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName(); String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName); String operateType = getOperateType(methodName);
String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名 String siteId = updateEntity.getSiteId();
// 构建日志同步消息 // 构建日志同步消息
MqttSyncStrategyLog message = new MqttSyncStrategyLog(); MqttSyncLog message = createMessageObject(operateType,siteId);
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(tableName);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
try { try {
// 数据转换 // 数据转换
String content = convertEntityToJson(insertEntity); String content = convertEntityToJson(updateEntity);
message.setContent(content); message.setContent(content);
// 发布到MQTT主题 // 发布到MQTT主题 - 判断区分本地还是云上
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC); String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
if (!StringUtils.isEmpty(topic)) { if (StringUtils.isEmpty(topic)) {
mqttPublisher.publish(topic, objectMapper.writeValueAsString(message), 1); mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
} }
} catch (Exception e) { } catch (Exception e) {
message.setStatus("FAIL"); message.setStatus("FAIL");
message.setErrorMsg(e.getMessage()); message.setErrorMsg(e.getMessage());
} }
// 存储同步信息 // 存储同步信息
mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message); mqttSyncLogMapper.insertMqttSyncLog(message);
} }
@AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result") @AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result")
@ -96,16 +97,10 @@ public class StrategySyncAspect {
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名 // 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName(); String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName); String operateType = getOperateType(methodName);
String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名 String siteId = insertEntity.getSiteId();
// 构建日志同步消息 // 构建日志同步消息
MqttSyncStrategyLog message = new MqttSyncStrategyLog(); MqttSyncLog message = createMessageObject(operateType, siteId);
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(tableName);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
try { try {
// 数据转换 // 数据转换
@ -113,13 +108,16 @@ public class StrategySyncAspect {
message.setContent(content); message.setContent(content);
// 发布到MQTT主题 // 发布到MQTT主题
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
if (StringUtils.isEmpty(topic)) {
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
}
} catch (Exception e) { } catch (Exception e) {
message.setStatus("FAIL"); message.setStatus("FAIL");
message.setErrorMsg(e.getMessage()); message.setErrorMsg(e.getMessage());
} }
// 存储同步信息 // 存储同步信息
mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message); mqttSyncLogMapper.insertMqttSyncLog(message);
} }
@AfterReturning(pointcut = "stopPointCut(id)", returning = "result") @AfterReturning(pointcut = "stopPointCut(id)", returning = "result")
@ -131,17 +129,11 @@ public class StrategySyncAspect {
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名 // 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName(); String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName); String operateType = getOperateType(methodName);
String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名 EmsStrategyRunning emsStrategyRunning = emsStrategyRunningMapper.selectEmsStrategyRunningById(id);
String siteId = emsStrategyRunning==null ? null : emsStrategyRunning.getSiteId();
// 构建日志同步消息 // 构建日志同步消息
MqttSyncStrategyLog message = new MqttSyncStrategyLog(); MqttSyncLog message = createMessageObject(operateType, siteId);
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(tableName);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
try { try {
// 数据转换 // 数据转换
@ -151,15 +143,30 @@ public class StrategySyncAspect {
message.setContent(content); message.setContent(content);
// 发布到MQTT主题 // 发布到MQTT主题
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1); String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
if (StringUtils.isEmpty(topic)) {
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
}
} catch (Exception e) { } catch (Exception e) {
message.setStatus("FAIL"); message.setStatus("FAIL");
message.setErrorMsg(e.getMessage()); message.setErrorMsg(e.getMessage());
} }
// 存储同步信息 // 存储同步信息
mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message); mqttSyncLogMapper.insertMqttSyncLog(message);
} }
// 构建同步信息
private MqttSyncLog createMessageObject(String operateType, String siteId) {
MqttSyncLog message = new MqttSyncLog();
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(TABLE_NAME);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
message.setTarget(siteId);
return message;
}
// 从方法名判断操作类型示例insert→INSERTupdate→UPDATEdelete→DELETE // 从方法名判断操作类型示例insert→INSERTupdate→UPDATEdelete→DELETE
private String getOperateType(String methodName) { private String getOperateType(String methodName) {
if (methodName.startsWith("insert")) return "INSERT"; if (methodName.startsWith("insert")) return "INSERT";
@ -169,12 +176,6 @@ public class StrategySyncAspect {
return "UNKNOWN"; return "UNKNOWN";
} }
// 从Mapper类名提取表名示例StrategyMapper→strategy
private String getTableNameFromMethod(String methodName) {
// 实际需通过JoinPoint获取Mapper类名再转换为表名如StrategyTemplateMapper→strategy_template
return "strategy"; // 简化示例
}
// 从方法参数提取数据示例若参数是实体类转成Map // 从方法参数提取数据示例若参数是实体类转成Map
private Map<String, Object> extractDataFromParams(Object[] args) { private Map<String, Object> extractDataFromParams(Object[] args) {
// 实际需反射获取实体类的字段和值如id、name等 // 实际需反射获取实体类的字段和值如id、name等
@ -240,4 +241,5 @@ public class StrategySyncAspect {
return objectMapper.writeValueAsString(insertEntity); return objectMapper.writeValueAsString(insertEntity);
} }
} }

View File

@ -0,0 +1,158 @@
package com.xzzn.ems.domain;
import com.xzzn.common.core.domain.BaseEntity;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.xzzn.common.annotation.Excel;
/**
* MQTT云上本地同步日志对象 mqtt_sync_log
*
* @author xzzn
* @date 2025-11-12
*/
public class MqttSyncLog extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** 主键 */
private Long id;
/** 同步消息唯一标识与消息中的syncId一致 */
@Excel(name = "同步消息唯一标识", readConverterExp = "与=消息中的syncId一致")
private String syncId;
/** MQTT主题 */
@Excel(name = "MQTT主题")
private String topic;
/** 操作类型INSERT/UPDATE/DELETE */
@Excel(name = "操作类型INSERT/UPDATE/DELETE")
private String operateType;
/** 涉及的表名 */
@Excel(name = "涉及的表名")
private String tableName;
/** 同步数据内容JSON格式 */
@Excel(name = "同步数据内容", readConverterExp = "J=SON格式")
private String content;
/** 处理状态SUCCESS/FAIL */
@Excel(name = "处理状态SUCCESS/FAIL")
private String status;
/** 失败原因status=FAIL时填写 */
@Excel(name = "失败原因", readConverterExp = "s=tatus=FAIL时填写")
private String errorMsg;
/** 同步目标 */
@Excel(name = "同步目标")
private String target;
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setSyncId(String syncId)
{
this.syncId = syncId;
}
public String getSyncId()
{
return syncId;
}
public void setTopic(String topic)
{
this.topic = topic;
}
public String getTopic()
{
return topic;
}
public void setOperateType(String operateType)
{
this.operateType = operateType;
}
public String getOperateType()
{
return operateType;
}
public void setTableName(String tableName)
{
this.tableName = tableName;
}
public String getTableName()
{
return tableName;
}
public void setContent(String content)
{
this.content = content;
}
public String getContent()
{
return content;
}
public void setStatus(String status)
{
this.status = status;
}
public String getStatus()
{
return status;
}
public void setErrorMsg(String errorMsg)
{
this.errorMsg = errorMsg;
}
public String getErrorMsg()
{
return errorMsg;
}
public void setTarget(String target)
{
this.target = target;
}
public String getTarget()
{
return target;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("syncId", getSyncId())
.append("topic", getTopic())
.append("operateType", getOperateType())
.append("tableName", getTableName())
.append("content", getContent())
.append("status", getStatus())
.append("errorMsg", getErrorMsg())
.append("target", getTarget())
.append("createTime", getCreateTime())
.toString();
}
}

View File

@ -2,6 +2,7 @@ package com.xzzn.ems.mapper;
import java.util.List; import java.util.List;
import com.xzzn.ems.domain.EmsFaultProtectionPlan; import com.xzzn.ems.domain.EmsFaultProtectionPlan;
import org.apache.ibatis.annotations.Param;
/** /**
* 故障告警保护方案Mapper接口 * 故障告警保护方案Mapper接口
@ -58,4 +59,9 @@ public interface EmsFaultProtectionPlanMapper
* @return 结果 * @return 结果
*/ */
public int deleteEmsFaultProtectionPlanByIds(Long[] ids); public int deleteEmsFaultProtectionPlanByIds(Long[] ids);
// 根据站点+faultName+faultLevel 同步删除
public void deleteEmsFaultProtectionPlan(@Param("siteId")String siteId,
@Param("faultName")String faultName,
@Param("faultLevel")Integer faultLevel);
} }

View File

@ -0,0 +1,63 @@
package com.xzzn.ems.mapper;
import java.util.List;
import com.xzzn.ems.domain.MqttSyncLog;
/**
* MQTT云上本地同步日志Mapper接口
*
* @author xzzn
* @date 2025-11-12
*/
public interface MqttSyncLogMapper
{
/**
* 查询MQTT云上本地同步日志
*
* @param id MQTT云上本地同步日志主键
* @return MQTT云上本地同步日志
*/
public MqttSyncLog selectMqttSyncLogById(Long id);
/**
* 查询MQTT云上本地同步日志列表
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return MQTT云上本地同步日志集合
*/
public List<MqttSyncLog> selectMqttSyncLogList(MqttSyncLog mqttSyncLog);
/**
* 新增MQTT云上本地同步日志
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return 结果
*/
public int insertMqttSyncLog(MqttSyncLog mqttSyncLog);
/**
* 修改MQTT云上本地同步日志
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return 结果
*/
public int updateMqttSyncLog(MqttSyncLog mqttSyncLog);
/**
* 删除MQTT云上本地同步日志
*
* @param id MQTT云上本地同步日志主键
* @return 结果
*/
public int deleteMqttSyncLogById(Long id);
/**
* 批量删除MQTT云上本地同步日志
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteMqttSyncLogByIds(Long[] ids);
public MqttSyncLog selectMqttSyncLogBySyncId(String syncId);
}

View File

@ -58,4 +58,7 @@ public interface IEmsFaultProtectionPlanService
* @return 结果 * @return 结果
*/ */
public int deleteEmsFaultProtectionPlanById(Long id); public int deleteEmsFaultProtectionPlanById(Long id);
// 处理云上同步设备保护告警信息
public void dealSyncData(String content, String operateType);
} }

View File

@ -0,0 +1,66 @@
package com.xzzn.ems.service;
import java.util.List;
import com.xzzn.ems.domain.MqttSyncLog;
/**
* MQTT云上本地同步日志Service接口
*
* @author xzzn
* @date 2025-11-12
*/
public interface IMqttSyncLogService
{
/**
* 查询MQTT云上本地同步日志
*
* @param id MQTT云上本地同步日志主键
* @return MQTT云上本地同步日志
*/
public MqttSyncLog selectMqttSyncLogById(Long id);
/**
* 查询MQTT云上本地同步日志列表
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return MQTT云上本地同步日志集合
*/
public List<MqttSyncLog> selectMqttSyncLogList(MqttSyncLog mqttSyncLog);
/**
* 新增MQTT云上本地同步日志
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return 结果
*/
public int insertMqttSyncLog(MqttSyncLog mqttSyncLog);
/**
* 修改MQTT云上本地同步日志
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return 结果
*/
public int updateMqttSyncLog(MqttSyncLog mqttSyncLog);
/**
* 批量删除MQTT云上本地同步日志
*
* @param ids 需要删除的MQTT云上本地同步日志主键集合
* @return 结果
*/
public int deleteMqttSyncLogByIds(Long[] ids);
/**
* 删除MQTT云上本地同步日志信息
*
* @param id MQTT云上本地同步日志主键
* @return 结果
*/
public int deleteMqttSyncLogById(Long id);
// 处理策略信息
public void handleMqttStrategyData(String payload);
// 处理设备告警保护信息
public void handleMqttPlanData(MqttSyncLog planLog);
}

View File

@ -1,7 +1,13 @@
package com.xzzn.ems.service.impl; package com.xzzn.ems.service.impl;
import java.util.List; import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.xzzn.common.utils.DateUtils; import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsStrategyRunning;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper; import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper;
@ -93,4 +99,44 @@ public class EmsFaultProtectionPlanServiceImpl implements IEmsFaultProtectionPla
{ {
return emsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlanById(id); return emsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlanById(id);
} }
@Override
public void dealSyncData(String content, String operateType) {
if (StringUtils.isEmpty(content)) {
return;
}
EmsFaultProtectionPlan faultProtectionPlan = JSON.parseObject(content, EmsFaultProtectionPlan.class);
switch(operateType) {
case "INSERT":
insertEmsFaultProtectionPlan(faultProtectionPlan);
break;
case "UPDATE":
updateEmsFaultProtectionPlan(faultProtectionPlan);
break;
case "DELETE":
// 根据站点+faultName+faultLevel 删除
String siteId = faultProtectionPlan.getSiteId();
String faultName = faultProtectionPlan.getFaultName();
Integer faultLevel = faultProtectionPlan.getFaultLevel();
emsFaultProtectionPlanMapper.deleteEmsFaultProtectionPlan(siteId, faultName, faultLevel);
break;
default:
break;
}
}
private static Long[] convertJsonToLongArray(String json) {
// 解析 JSON 为 JSONObject
JSONObject jsonObject = JSON.parseObject(json);
// 获取 "id" 对应的 JSONArray
JSONArray idArray = jsonObject.getJSONArray("id");
if (idArray == null) {
return new Long[0]; // 数组不存在时返回空数组
}
// 转换为 Long 数组
return idArray.toArray(new Long[0]);
}
} }

View File

@ -71,10 +71,32 @@ public class EmsStrategyServiceImpl implements IEmsStrategyService
if (StringUtils.isEmpty(content)) { if (StringUtils.isEmpty(content)) {
return; return;
} }
// switch (operateType) { EmsStrategyRunning emsStrategyRunning = JSON.parseObject(content, EmsStrategyRunning.class);
// case "INSERT": switch (operateType) {
// EmsStrategyRunning emsStrategyRunning = JSON.parseObject(content, EmsStrategyRunning.class); case "INSERT":
// emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning); emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning);
// } break;
case "STOP":
Long id = emsStrategyRunning.getId();
emsStrategyRunningMapper.stopEmsStrategyRunning(id);
break;
case "UPDATE":
String siteId = emsStrategyRunning.getSiteId();
Long mainId = emsStrategyRunning.getMainStrategyId();
Long auxId = emsStrategyRunning.getAuxiliaryStrategyId();
EmsStrategyRunning existStrategy = emsStrategyRunningMapper.getRunningStrategy(siteId);
if (existStrategy != null) { // 存在正在运行的则更新
existStrategy.setMainStrategyId(mainId);
existStrategy.setAuxiliaryStrategyId(auxId);
emsStrategyRunningMapper.updateEmsStrategyRunning(emsStrategyRunning);
} else { // 不存在着插入
emsStrategyRunning.setCreateTime(DateUtils.getNowDate());
emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning);
}
break;
default:
// 未知操作类型-不同步
break;
}
} }
} }

View File

@ -0,0 +1,150 @@
package com.xzzn.ems.service.impl;
import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.service.IEmsFaultProtectionPlanService;
import com.xzzn.ems.service.IEmsStrategyService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.xzzn.ems.mapper.MqttSyncLogMapper;
import com.xzzn.ems.domain.MqttSyncLog;
import com.xzzn.ems.service.IMqttSyncLogService;
/**
* MQTT云上本地同步日志Service业务层处理
*
* @author xzzn
* @date 2025-11-12
*/
@Service
public class MqttSyncLogServiceImpl implements IMqttSyncLogService
{
@Autowired
private MqttSyncLogMapper mqttSyncLogMapper;
@Autowired
private IEmsFaultProtectionPlanService emsFaultProtectionPlanService;
@Autowired
private IEmsStrategyService emsStrategyService;
/**
* 查询MQTT云上本地同步日志
*
* @param id MQTT云上本地同步日志主键
* @return MQTT云上本地同步日志
*/
@Override
public MqttSyncLog selectMqttSyncLogById(Long id)
{
return mqttSyncLogMapper.selectMqttSyncLogById(id);
}
/**
* 查询MQTT云上本地同步日志列表
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return MQTT云上本地同步日志
*/
@Override
public List<MqttSyncLog> selectMqttSyncLogList(MqttSyncLog mqttSyncLog)
{
return mqttSyncLogMapper.selectMqttSyncLogList(mqttSyncLog);
}
/**
* 新增MQTT云上本地同步日志
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return 结果
*/
@Override
public int insertMqttSyncLog(MqttSyncLog mqttSyncLog)
{
mqttSyncLog.setCreateTime(DateUtils.getNowDate());
return mqttSyncLogMapper.insertMqttSyncLog(mqttSyncLog);
}
/**
* 修改MQTT云上本地同步日志
*
* @param mqttSyncLog MQTT云上本地同步日志
* @return 结果
*/
@Override
public int updateMqttSyncLog(MqttSyncLog mqttSyncLog)
{
return mqttSyncLogMapper.updateMqttSyncLog(mqttSyncLog);
}
/**
* 批量删除MQTT云上本地同步日志
*
* @param ids 需要删除的MQTT云上本地同步日志主键
* @return 结果
*/
@Override
public int deleteMqttSyncLogByIds(Long[] ids)
{
return mqttSyncLogMapper.deleteMqttSyncLogByIds(ids);
}
/**
* 删除MQTT云上本地同步日志信息
*
* @param id MQTT云上本地同步日志主键
* @return 结果
*/
@Override
public int deleteMqttSyncLogById(Long id)
{
return mqttSyncLogMapper.deleteMqttSyncLogById(id);
}
/**
* 处理云上运行策略数据
* @param payload
*/
@Override
public void handleMqttStrategyData(String payload) {
MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
if (syncLog != null) {
// 校验是否存在
String syncId = syncLog.getSyncId();
MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
if (existLog == null) {
// 根据不同操作更新
String operateType = syncLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsStrategyService.dealStrategyData(syncLog.getContent(),operateType);
}
}
// 保存日志
insertMqttSyncLog(syncLog);
}
}
/**
* 处理云上同步的告警保护信息
* @param planLog
*/
@Override
public void handleMqttPlanData(MqttSyncLog planLog) {
// 校验是否存在
String syncId = planLog.getSyncId();
MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
if (existLog != null) {
return;
}
// 保存日志
insertMqttSyncLog(planLog);
// 处理数据
String operateType = planLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsFaultProtectionPlanService.dealSyncData(planLog.getContent(),operateType);
}
}
}

View File

@ -114,4 +114,11 @@
#{id} #{id}
</foreach> </foreach>
</delete> </delete>
<delete id="deleteEmsFaultProtectionPlan">
delete from ems_fault_protection_plan
where site_id = #{siteId}
and fault_name = #{faultName}
and fault_level = #{faultLevel}
</delete>
</mapper> </mapper>

View File

@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xzzn.ems.mapper.MqttSyncLogMapper">
<resultMap type="MqttSyncLog" id="MqttSyncLogResult">
<result property="id" column="id" />
<result property="syncId" column="sync_id" />
<result property="topic" column="topic" />
<result property="operateType" column="operate_type" />
<result property="tableName" column="table_name" />
<result property="content" column="content" />
<result property="status" column="status" />
<result property="errorMsg" column="error_msg" />
<result property="target" column="target" />
<result property="createTime" column="create_time" />
</resultMap>
<sql id="selectMqttSyncLogVo">
select id, sync_id, topic, operate_type, table_name, content, status, error_msg, target, create_time from mqtt_sync_log
</sql>
<select id="selectMqttSyncLogList" parameterType="MqttSyncLog" resultMap="MqttSyncLogResult">
<include refid="selectMqttSyncLogVo"/>
<where>
<if test="syncId != null and syncId != ''"> and sync_id = #{syncId}</if>
<if test="topic != null and topic != ''"> and topic = #{topic}</if>
<if test="operateType != null and operateType != ''"> and operate_type = #{operateType}</if>
<if test="tableName != null and tableName != ''"> and table_name like concat('%', #{tableName}, '%')</if>
<if test="content != null and content != ''"> and content = #{content}</if>
<if test="status != null and status != ''"> and status = #{status}</if>
<if test="errorMsg != null and errorMsg != ''"> and error_msg = #{errorMsg}</if>
<if test="target != null and target != ''"> and target = #{target}</if>
</where>
</select>
<select id="selectMqttSyncLogById" parameterType="Long" resultMap="MqttSyncLogResult">
<include refid="selectMqttSyncLogVo"/>
where id = #{id}
</select>
<insert id="insertMqttSyncLog" parameterType="MqttSyncLog" useGeneratedKeys="true" keyProperty="id">
insert into mqtt_sync_log
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="syncId != null and syncId != ''">sync_id,</if>
<if test="topic != null and topic != ''">topic,</if>
<if test="operateType != null and operateType != ''">operate_type,</if>
<if test="tableName != null and tableName != ''">table_name,</if>
<if test="content != null">content,</if>
<if test="status != null and status != ''">status,</if>
<if test="errorMsg != null">error_msg,</if>
<if test="target != null">target,</if>
<if test="createTime != null">create_time,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="syncId != null and syncId != ''">#{syncId},</if>
<if test="topic != null and topic != ''">#{topic},</if>
<if test="operateType != null and operateType != ''">#{operateType},</if>
<if test="tableName != null and tableName != ''">#{tableName},</if>
<if test="content != null">#{content},</if>
<if test="status != null and status != ''">#{status},</if>
<if test="errorMsg != null">#{errorMsg},</if>
<if test="target != null">#{target},</if>
<if test="createTime != null">#{createTime},</if>
</trim>
</insert>
<update id="updateMqttSyncLog" parameterType="MqttSyncLog">
update mqtt_sync_log
<trim prefix="SET" suffixOverrides=",">
<if test="syncId != null and syncId != ''">sync_id = #{syncId},</if>
<if test="topic != null and topic != ''">topic = #{topic},</if>
<if test="operateType != null and operateType != ''">operate_type = #{operateType},</if>
<if test="tableName != null and tableName != ''">table_name = #{tableName},</if>
<if test="content != null">content = #{content},</if>
<if test="status != null and status != ''">status = #{status},</if>
<if test="errorMsg != null">error_msg = #{errorMsg},</if>
<if test="target != null">target = #{target},</if>
<if test="createTime != null">create_time = #{createTime},</if>
</trim>
where id = #{id}
</update>
<delete id="deleteMqttSyncLogById" parameterType="Long">
delete from mqtt_sync_log where id = #{id}
</delete>
<delete id="deleteMqttSyncLogByIds" parameterType="String">
delete from mqtt_sync_log where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</delete>
<select id="selectMqttSyncLogBySyncId" parameterType="String" resultMap="MqttSyncLogResult">
<include refid="selectMqttSyncLogVo"/>
where sync_id = #{syncId}
</select>
</mapper>