验收2-同步策略运行

This commit is contained in:
2025-11-11 10:37:46 +08:00
parent 5381cd597c
commit 49a83fd420
3 changed files with 261 additions and 0 deletions

View File

@ -0,0 +1,243 @@
package com.xzzn.framework.aspectj;
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsStrategyRunning;
import com.xzzn.ems.domain.MqttSyncStrategyLog;
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
import com.xzzn.ems.mapper.MqttSyncStrategyLogMapper;
import com.xzzn.framework.web.service.MqttPublisher;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.beans.BeanMap;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Aspect
@Component
public class StrategySyncAspect {
@Autowired
private MqttPublisher mqttPublisher;
@Autowired
private MqttSyncStrategyLogMapper mqttSyncStrategyLogMapper;
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String STRATEGY_TOPIC = "EMS_STRATEGY_UP";
@Autowired
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
// 定义切点拦截策略相关表的Mapper方法
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.stopEmsStrategyRunning(..)) && args(id)) ")
public void stopPointCut(Long id) {
System.out.println("【停止策略切面】StrategyAspect 被实例化");
}
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.insertEmsStrategyRunning(..)) && args(insertEntity)) ")
public void insertPointCut(EmsStrategyRunning insertEntity) {
System.out.println("【新增策略切面】StrategyAspect 被实例化");
}
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.updateEmsStrategyRunning(..)) && args(updateEntity)) ")
public void updatePointCut(EmsStrategyRunning updateEntity) {
System.out.println("【更新策略切面】StrategyAspect 被实例化");
}
// 方法执行成功后发布同步消息
@AfterReturning(pointcut = "updatePointCut(insertEntity)", returning = "result")
public void afterUpdate(JoinPoint joinPoint, EmsStrategyRunning insertEntity, Integer result) {
System.out.println("【更新策略切面进入成功】");
if (result == 0) {
return;
}
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName);
String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名
// 构建日志同步消息
MqttSyncStrategyLog message = new MqttSyncStrategyLog();
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(tableName);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
try {
// 数据转换
String content = convertEntityToJson(insertEntity);
message.setContent(content);
// 发布到MQTT主题
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
if (!StringUtils.isEmpty(topic)) {
mqttPublisher.publish(topic, objectMapper.writeValueAsString(message), 1);
}
} catch (Exception e) {
message.setStatus("FAIL");
message.setErrorMsg(e.getMessage());
}
// 存储同步信息
mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message);
}
@AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result")
public void afterInsert(JoinPoint joinPoint, EmsStrategyRunning insertEntity, Integer result) {
System.out.println("【新增策略切面进入成功】");
if (result == 0) {
return;
}
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName);
String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名
// 构建日志同步消息
MqttSyncStrategyLog message = new MqttSyncStrategyLog();
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(tableName);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
try {
// 数据转换
String content = convertEntityToJson(insertEntity);
message.setContent(content);
// 发布到MQTT主题
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
} catch (Exception e) {
message.setStatus("FAIL");
message.setErrorMsg(e.getMessage());
}
// 存储同步信息
mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message);
}
@AfterReturning(pointcut = "stopPointCut(id)", returning = "result")
public void afterStop(JoinPoint joinPoint, Long id, Integer result) {
System.out.println("【停止策略切面进入成功】");
if (result == 0) {
return;
}
// 解析方法名获取操作类型INSERT/UPDATE/DELETE和表名
String methodName = joinPoint.getSignature().getName();
String operateType = getOperateType(methodName);
String tableName = getTableNameFromMethod(methodName); // 从Mapper类名提取表名
// 构建日志同步消息
MqttSyncStrategyLog message = new MqttSyncStrategyLog();
message.setSyncId(UUID.randomUUID().toString());
message.setOperateType(operateType);
message.setTableName(tableName);
message.setCreateTime(new Date());
message.setTopic(STRATEGY_TOPIC);
message.setStatus("SUCCESS");
try {
// 数据转换
Map<String, Object> idMap = new HashMap<>();
idMap.put("id", id); // 手动将参数值映射到"id"字段
String content = JSON.toJSONString(idMap);
message.setContent(content);
// 发布到MQTT主题
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
} catch (Exception e) {
message.setStatus("FAIL");
message.setErrorMsg(e.getMessage());
}
// 存储同步信息
mqttSyncStrategyLogMapper.insertMqttSyncStrategyLog(message);
}
// 从方法名判断操作类型示例insert→INSERTupdate→UPDATEdelete→DELETE
private String getOperateType(String methodName) {
if (methodName.startsWith("insert")) return "INSERT";
if (methodName.startsWith("stop")) return "STOP";
if (methodName.startsWith("update") || methodName.startsWith("stop")) return "UPDATE";
if (methodName.startsWith("delete")) return "DELETE";
return "UNKNOWN";
}
// 从Mapper类名提取表名示例StrategyMapper→strategy
private String getTableNameFromMethod(String methodName) {
// 实际需通过JoinPoint获取Mapper类名再转换为表名如StrategyTemplateMapper→strategy_template
return "strategy"; // 简化示例
}
// 从方法参数提取数据示例若参数是实体类转成Map
private Map<String, Object> extractDataFromParams(Object[] args) {
// 实际需反射获取实体类的字段和值如id、name等
Map<String, Object> data = new HashMap<>();
if (args == null || args.length == 0) {
return data;
}
for (int i = 0; i < args.length; i++) {
Object arg = args[i];
if (arg == null) {
continue; // 跳过null参数
}
// 处理基本类型/包装类/字符串直接作为值存入key为"param0"、"param1"等)
if (isBasicType(arg.getClass())) {
String key = "param" + i; // 基本类型参数用"param0"、"param1"作为key
data.put(key, arg);
} else {
Map<String, Object> beanMap = beanToMap(arg);
data.putAll(beanMap); // 合并实体类的字段到结果Map
}
}
return data;
}
/**
* 判断是否为基本类型或包装类或字符串
*/
private boolean isBasicType(Class<?> clazz) {
return clazz.isPrimitive() // 基本类型int、long、boolean等
|| clazz == String.class // 字符串
|| Number.class.isAssignableFrom(clazz) // 数字包装类Integer、Long等
|| clazz == Boolean.class; // 布尔包装类
}
/**
* 将实体类转换为Map字段名为key字段值为value
*/
private Map<String, Object> beanToMap(Object bean) {
Map<String, Object> map = new HashMap<>();
if (bean == null) {
return map;
}
// 方式1使用BeanMap简洁高效
BeanMap beanMap = BeanMap.create(bean);
for (Object key : beanMap.keySet()) {
map.put(key.toString(), beanMap.get(key));
}
return map;
}
// 在方法中转换
public String convertEntityToJson(EmsStrategyRunning insertEntity) throws Exception {
if (insertEntity == null) {
return null; // 空对象返回空JSON
}
// 将实体类转换为JSON字符串
return objectMapper.writeValueAsString(insertEntity);
}
}

View File

@ -27,4 +27,7 @@ public interface IEmsStrategyService
public List<EmsStrategy> getAuxStrategyList();
public int configStrategy(EmsStrategyRunning emsStrategyRunning);
// 接收云上运行策略配置
public void dealStrategyData(String content, String operateType);
}

View File

@ -1,7 +1,10 @@
package com.xzzn.ems.service.impl;
import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsStrategyRunning;
import com.xzzn.ems.domain.vo.StrategyRunningVo;
import com.xzzn.ems.mapper.EmsStrategyRunningMapper;
@ -62,4 +65,16 @@ public class EmsStrategyServiceImpl implements IEmsStrategyService
emsStrategyRunning.setCreateTime(DateUtils.getNowDate());
return emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning);
}
@Override
public void dealStrategyData(String content, String operateType) {
if (StringUtils.isEmpty(content)) {
return;
}
// switch (operateType) {
// case "INSERT":
// EmsStrategyRunning emsStrategyRunning = JSON.parseObject(content, EmsStrategyRunning.class);
// emsStrategyRunningMapper.insertEmsStrategyRunning(emsStrategyRunning);
// }
}
}