【验收】2 运行策略配置同步
This commit is contained in:
@ -5,10 +5,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.xzzn.common.utils.StringUtils;
|
||||
import com.xzzn.ems.domain.EmsStrategyRunning;
|
||||
import com.xzzn.ems.domain.MqttSyncLog;
|
||||
import com.xzzn.ems.domain.vo.StrategyRunningVo;
|
||||
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
|
||||
import com.xzzn.ems.mapper.EmsStrategyRunningMapper;
|
||||
import com.xzzn.ems.mapper.MqttSyncLogMapper;
|
||||
import com.xzzn.framework.web.service.MqttPublisher;
|
||||
import org.apache.juli.logging.Log;
|
||||
import org.apache.juli.logging.LogFactory;
|
||||
import org.aspectj.lang.JoinPoint;
|
||||
import org.aspectj.lang.annotation.AfterReturning;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
@ -17,22 +20,22 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cglib.beans.BeanMap;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 策略运行切面同步
|
||||
* 云端 - 本地
|
||||
*/
|
||||
@Aspect
|
||||
@Component
|
||||
public class StrategySyncAspect {
|
||||
public class StrategyRunningSyncAspect {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(StrategyRunningSyncAspect.class);
|
||||
@Autowired
|
||||
private MqttPublisher mqttPublisher;
|
||||
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private static final String STRATEGY_TOPIC = "EMS_STRATEGY_UP";
|
||||
private static final String MQTT_TOPIC = "EMS_STRATEGY_UP";
|
||||
private static final String TABLE_NAME = "ems_strategy_running";
|
||||
@Autowired
|
||||
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
|
||||
@ -41,27 +44,32 @@ public class StrategySyncAspect {
|
||||
@Autowired
|
||||
private EmsStrategyRunningMapper emsStrategyRunningMapper;
|
||||
|
||||
// 定义切点:拦截策略相关表的Mapper方法
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.stopEmsStrategyRunning(..)) && args(id)) ")
|
||||
public void stopPointCut(Long id) {
|
||||
System.out.println("【停止策略切面】StrategyAspect 被实例化");
|
||||
logger.info("【停止策略切面】StrategyAspect 被实例化");
|
||||
}
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.insertEmsStrategyRunning(..)) && args(insertEntity)) ")
|
||||
public void insertPointCut(EmsStrategyRunning insertEntity) {
|
||||
System.out.println("【新增策略切面】StrategyAspect 被实例化");
|
||||
logger.info("【新增策略切面】StrategyAspect 被实例化");
|
||||
}
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyRunningMapper.updateEmsStrategyRunning(..)) && args(updateEntity)) ")
|
||||
public void updatePointCut(EmsStrategyRunning updateEntity) {
|
||||
System.out.println("【更新策略切面】StrategyAspect 被实例化");
|
||||
logger.info("【更新策略切面】StrategyAspect 被实例化");
|
||||
}
|
||||
|
||||
// 方法执行成功后发布同步消息
|
||||
@AfterReturning(pointcut = "updatePointCut(updateEntity)", returning = "result")
|
||||
public void afterUpdate(JoinPoint joinPoint, EmsStrategyRunning updateEntity, Integer result) {
|
||||
System.out.println("【更新策略切面进入成功】");
|
||||
if (result == 0) {
|
||||
logger.info("【更新策略切面进入成功】");
|
||||
if (result == 0 || updateEntity == null) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String methodName = joinPoint.getSignature().getName();
|
||||
String operateType = getOperateType(methodName);
|
||||
@ -72,14 +80,16 @@ public class StrategySyncAspect {
|
||||
|
||||
try {
|
||||
// 数据转换
|
||||
String content = convertEntityToJson(updateEntity);
|
||||
List<StrategyRunningVo> runningVos = emsStrategyRunningMapper.getRunningList(siteId);
|
||||
if (runningVos == null || runningVos.size() == 0) {
|
||||
return;
|
||||
}
|
||||
StrategyRunningVo runningVo = runningVos.get(0);
|
||||
String content = objectMapper.writeValueAsString(runningVo);
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题 - 判断区分本地还是云上
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
|
||||
if (StringUtils.isEmpty(topic)) {
|
||||
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
}
|
||||
// 发布到MQTT主题
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
@ -90,10 +100,16 @@ public class StrategySyncAspect {
|
||||
|
||||
@AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result")
|
||||
public void afterInsert(JoinPoint joinPoint, EmsStrategyRunning insertEntity, Integer result) {
|
||||
System.out.println("【新增策略切面进入成功】");
|
||||
if (result == 0) {
|
||||
logger.info("【新增策略切面进入成功】");
|
||||
if (result == 0 || insertEntity == null) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String methodName = joinPoint.getSignature().getName();
|
||||
String operateType = getOperateType(methodName);
|
||||
@ -104,14 +120,16 @@ public class StrategySyncAspect {
|
||||
|
||||
try {
|
||||
// 数据转换
|
||||
String content = convertEntityToJson(insertEntity);
|
||||
List<StrategyRunningVo> runningVos = emsStrategyRunningMapper.getRunningList(siteId);
|
||||
if (runningVos == null || runningVos.size() == 0) {
|
||||
return;
|
||||
}
|
||||
StrategyRunningVo runningVo = runningVos.get(0);
|
||||
String content = objectMapper.writeValueAsString(runningVo);
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
|
||||
if (StringUtils.isEmpty(topic)) {
|
||||
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
}
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
@ -122,10 +140,16 @@ public class StrategySyncAspect {
|
||||
|
||||
@AfterReturning(pointcut = "stopPointCut(id)", returning = "result")
|
||||
public void afterStop(JoinPoint joinPoint, Long id, Integer result) {
|
||||
System.out.println("【停止策略切面进入成功】");
|
||||
if (result == 0) {
|
||||
logger.info("【停止策略切面进入成功】");
|
||||
if (result == 0 || id == null) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String methodName = joinPoint.getSignature().getName();
|
||||
String operateType = getOperateType(methodName);
|
||||
@ -143,10 +167,7 @@ public class StrategySyncAspect {
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(STRATEGY_TOPIC);
|
||||
if (StringUtils.isEmpty(topic)) {
|
||||
mqttPublisher.publish(STRATEGY_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
}
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
@ -162,8 +183,9 @@ public class StrategySyncAspect {
|
||||
message.setOperateType(operateType);
|
||||
message.setTableName(TABLE_NAME);
|
||||
message.setCreateTime(new Date());
|
||||
message.setTopic(STRATEGY_TOPIC);
|
||||
message.setTopic(MQTT_TOPIC);
|
||||
message.setStatus("SUCCESS");
|
||||
message.setSyncObject("CLOUD");
|
||||
message.setTarget(siteId);
|
||||
return message;
|
||||
}
|
||||
@ -0,0 +1,250 @@
|
||||
package com.xzzn.framework.aspectj;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.xzzn.common.utils.StringUtils;
|
||||
import com.xzzn.common.utils.bean.BeanUtils;
|
||||
import com.xzzn.ems.domain.*;
|
||||
import com.xzzn.ems.domain.vo.SyncStrategyTempVo;
|
||||
import com.xzzn.ems.mapper.*;
|
||||
import com.xzzn.framework.web.service.MqttPublisher;
|
||||
import org.apache.juli.logging.Log;
|
||||
import org.apache.juli.logging.LogFactory;
|
||||
import org.aspectj.lang.JoinPoint;
|
||||
import org.aspectj.lang.annotation.AfterReturning;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.annotation.Before;
|
||||
import org.aspectj.lang.annotation.Pointcut;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cglib.beans.BeanMap;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 策略模板数据同步
|
||||
* 云端 - 本地
|
||||
*/
|
||||
@Aspect
|
||||
@Component
|
||||
public class StrategyTempSyncAspect {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(StrategyTempSyncAspect.class);
|
||||
@Autowired
|
||||
private MqttPublisher mqttPublisher;
|
||||
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private static final String MQTT_TOPIC = "EMS_STRATEGY_UP";
|
||||
private static final String TABLE_NAME = "ems_strategy_temp";
|
||||
@Autowired
|
||||
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
|
||||
@Autowired
|
||||
private MqttSyncLogMapper mqttSyncLogMapper;
|
||||
@Autowired
|
||||
private EmsStrategyMapper emsStrategyMapper;
|
||||
@Autowired
|
||||
private EmsStrategyTempMapper emsStrategyTempMapper;
|
||||
|
||||
// 用ThreadLocal暂存删除前的对象
|
||||
private ThreadLocal<EmsStrategyTemp> beforeDeleteThreadLocal = new ThreadLocal<>();
|
||||
@Before("execution(* com.xzzn.ems.mapper.EmsStrategyTempMapper.deleteEmsStrategyTempById(..)) && args(templateId)")
|
||||
public void beforeDelete(JoinPoint joinPoint, String templateId) {
|
||||
// 查询删除前的数据-仅存一获取siteId
|
||||
List<EmsStrategyTemp> tempList = emsStrategyTempMapper.selectStrategyTempByTempId(templateId);
|
||||
if (tempList != null && tempList.size() > 0) {
|
||||
beforeDeleteThreadLocal.set(tempList.get(0)); // 暂存
|
||||
}
|
||||
}
|
||||
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyTempMapper.deleteEmsStrategyTempById(..)) && args(templateId)) ")
|
||||
public void deletePointCut(String templateId) {
|
||||
logger.info("【删除策略模版切面】StrategyTempSyncAspect 被实例化");
|
||||
}
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyTempMapper.insertEmsStrategyTemp(..)) && args(insertEntity)) ")
|
||||
public void insertPointCut(EmsStrategyTemp insertEntity) {
|
||||
logger.info("【新增策略模版切面】StrategyTempSyncAspect 被实例化");
|
||||
}
|
||||
|
||||
// 方法执行成功后发布同步消息
|
||||
@AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result")
|
||||
public void afterInsert(JoinPoint joinPoint, EmsStrategyTemp insertEntity, Integer result) {
|
||||
logger.info("【新增策略切面进入成功】");
|
||||
if (result == 0 || insertEntity == null) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String methodName = joinPoint.getSignature().getName();
|
||||
String operateType = getOperateType(methodName);
|
||||
String siteId = insertEntity.getSiteId();
|
||||
|
||||
// 构建日志同步消息
|
||||
MqttSyncLog message = createMessageObject(operateType, siteId);
|
||||
|
||||
try {
|
||||
// 校验策略id是否存在
|
||||
Long strategyId = insertEntity.getStrategyId();
|
||||
if (strategyId == null) {
|
||||
return;
|
||||
}
|
||||
// 数据转换
|
||||
SyncStrategyTempVo tempVo = convertEntity(insertEntity);
|
||||
String content = objectMapper.writeValueAsString(tempVo);
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
}
|
||||
// 存储同步信息
|
||||
mqttSyncLogMapper.insertMqttSyncLog(message);
|
||||
}
|
||||
private SyncStrategyTempVo convertEntity(EmsStrategyTemp insertEntity) {
|
||||
SyncStrategyTempVo tempVo = new SyncStrategyTempVo();
|
||||
BeanUtils.copyProperties(insertEntity, tempVo);
|
||||
EmsStrategy strategy = emsStrategyMapper.selectEmsStrategyById(insertEntity.getStrategyId());
|
||||
if (strategy != null) {
|
||||
tempVo.setStrategyName(strategy.getStrategyName());
|
||||
tempVo.setStrategyType(strategy.getStrategyType());
|
||||
}
|
||||
return tempVo;
|
||||
}
|
||||
|
||||
@AfterReturning(pointcut = "deletePointCut(templateId)", returning = "result")
|
||||
public void afterDelete(JoinPoint joinPoint, String templateId, Integer result) {
|
||||
logger.info("【删除策略模版切面进入成功】");
|
||||
if (result == 0 || StringUtils.isEmpty(templateId)) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String methodName = joinPoint.getSignature().getName();
|
||||
String operateType = getOperateType(methodName);
|
||||
// 从ThreadLocal中获取删除前的对象
|
||||
EmsStrategyTemp strategyTemp = beforeDeleteThreadLocal.get();
|
||||
String siteId = "";
|
||||
if (strategyTemp != null) {
|
||||
siteId = strategyTemp.getSiteId();
|
||||
}
|
||||
|
||||
// 构建日志同步消息
|
||||
MqttSyncLog message = createMessageObject(operateType, siteId);
|
||||
|
||||
try {
|
||||
// 数据转换
|
||||
Map<String, Object> idMap = new HashMap<>();
|
||||
idMap.put("templateId", templateId); // 手动将参数值映射到"id"字段
|
||||
String content = JSON.toJSONString(idMap);
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
}
|
||||
// 存储同步信息
|
||||
mqttSyncLogMapper.insertMqttSyncLog(message);
|
||||
}
|
||||
|
||||
// 构建同步信息
|
||||
private MqttSyncLog createMessageObject(String operateType, String siteId) {
|
||||
MqttSyncLog message = new MqttSyncLog();
|
||||
message.setSyncId(UUID.randomUUID().toString());
|
||||
message.setOperateType(operateType);
|
||||
message.setTableName(TABLE_NAME);
|
||||
message.setCreateTime(new Date());
|
||||
message.setTopic(MQTT_TOPIC);
|
||||
message.setStatus("SUCCESS");
|
||||
message.setSyncObject("CLOUD");
|
||||
message.setTarget(siteId);
|
||||
return message;
|
||||
}
|
||||
// 从方法名判断操作类型(示例:insert→INSERT,update→UPDATE,delete→DELETE)
|
||||
private String getOperateType(String methodName) {
|
||||
if (methodName.startsWith("insert")) return "INSERT";
|
||||
if (methodName.startsWith("stop")) return "STOP";
|
||||
if (methodName.startsWith("update") || methodName.startsWith("stop")) return "UPDATE";
|
||||
if (methodName.startsWith("delete")) return "DELETE";
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
// 从方法参数提取数据(示例:若参数是实体类,转成Map)
|
||||
private Map<String, Object> extractDataFromParams(Object[] args) {
|
||||
// 实际需反射获取实体类的字段和值(如id、name等)
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
if (args == null || args.length == 0) {
|
||||
return data;
|
||||
}
|
||||
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
Object arg = args[i];
|
||||
if (arg == null) {
|
||||
continue; // 跳过null参数
|
||||
}
|
||||
|
||||
// 处理基本类型/包装类/字符串(直接作为值存入,key为"param0"、"param1"等)
|
||||
if (isBasicType(arg.getClass())) {
|
||||
String key = "param" + i; // 基本类型参数用"param0"、"param1"作为key
|
||||
data.put(key, arg);
|
||||
} else {
|
||||
Map<String, Object> beanMap = beanToMap(arg);
|
||||
data.putAll(beanMap); // 合并实体类的字段到结果Map
|
||||
}
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为基本类型或包装类或字符串
|
||||
*/
|
||||
private boolean isBasicType(Class<?> clazz) {
|
||||
return clazz.isPrimitive() // 基本类型(int、long、boolean等)
|
||||
|| clazz == String.class // 字符串
|
||||
|| Number.class.isAssignableFrom(clazz) // 数字包装类(Integer、Long等)
|
||||
|| clazz == Boolean.class; // 布尔包装类
|
||||
}
|
||||
|
||||
/**
|
||||
* 将实体类转换为Map(字段名为key,字段值为value)
|
||||
*/
|
||||
private Map<String, Object> beanToMap(Object bean) {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
if (bean == null) {
|
||||
return map;
|
||||
}
|
||||
|
||||
// 方式1:使用BeanMap(简洁高效)
|
||||
BeanMap beanMap = BeanMap.create(bean);
|
||||
for (Object key : beanMap.keySet()) {
|
||||
map.put(key.toString(), beanMap.get(key));
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
// 在方法中转换
|
||||
public String convertEntityToJson(EmsStrategyRunning insertEntity) throws Exception {
|
||||
if (insertEntity == null) {
|
||||
return null; // 空对象返回空JSON
|
||||
}
|
||||
|
||||
// 将实体类转换为JSON字符串
|
||||
return objectMapper.writeValueAsString(insertEntity);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,231 @@
|
||||
package com.xzzn.framework.aspectj;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.xzzn.common.utils.StringUtils;
|
||||
import com.xzzn.common.utils.bean.BeanUtils;
|
||||
import com.xzzn.ems.domain.*;
|
||||
import com.xzzn.ems.domain.vo.SyncStrategyTimeConfigVo;
|
||||
import com.xzzn.ems.mapper.*;
|
||||
import com.xzzn.framework.web.service.MqttPublisher;
|
||||
import org.apache.juli.logging.Log;
|
||||
import org.apache.juli.logging.LogFactory;
|
||||
import org.aspectj.lang.JoinPoint;
|
||||
import org.aspectj.lang.annotation.AfterReturning;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.annotation.Pointcut;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cglib.beans.BeanMap;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 策略时间配置同步
|
||||
* 云端 - 本地
|
||||
*/
|
||||
@Aspect
|
||||
@Component
|
||||
public class StrategyTimeConfigSyncAspect {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(StrategyTimeConfigSyncAspect.class);
|
||||
@Autowired
|
||||
private MqttPublisher mqttPublisher;
|
||||
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private static final String MQTT_TOPIC = "EMS_STRATEGY_UP";
|
||||
private static final String TABLE_NAME = "ems_strategy_temp";
|
||||
@Autowired
|
||||
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
|
||||
@Autowired
|
||||
private MqttSyncLogMapper mqttSyncLogMapper;
|
||||
@Autowired
|
||||
private EmsStrategyMapper emsStrategyMapper;
|
||||
@Autowired
|
||||
private EmsStrategyTempMapper emsStrategyTempMapper;
|
||||
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyTimeConfigMapper.insertEmsStrategyTimeConfig(..)) && args(insertEntity)) ")
|
||||
public void insertPointCut(EmsStrategyTimeConfig insertEntity) {
|
||||
logger.info("【新增策略模版时间配置切面】StrategyTimeConfigSyncAspect 被实例化");
|
||||
}
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsStrategyTimeConfigMapper.updateEmsStrategyTimeConfig(..)) && args(updateEntity)) ")
|
||||
public void updatePointCut(EmsStrategyTimeConfig updateEntity) {
|
||||
logger.info("【更新策略模版时间配置切面】StrategyTimeConfigSyncAspect 被实例化");
|
||||
}
|
||||
|
||||
// 方法执行成功后发布同步消息
|
||||
@AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result")
|
||||
public void afterInsert(JoinPoint joinPoint, EmsStrategyTimeConfig insertEntity, Integer result) {
|
||||
logger.info("【新增策略模版时间切面进入成功】");
|
||||
if (result == 0 || insertEntity == null) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String methodName = joinPoint.getSignature().getName();
|
||||
String operateType = getOperateType(methodName);
|
||||
String siteId = insertEntity.getSiteId();
|
||||
|
||||
// 构建日志同步消息
|
||||
MqttSyncLog message = createMessageObject(operateType, siteId);
|
||||
|
||||
try {
|
||||
// 校验策略id是否存在
|
||||
Long strategyId = insertEntity.getStrategyId();
|
||||
if (strategyId == null) {
|
||||
return;
|
||||
}
|
||||
// 数据转换
|
||||
SyncStrategyTimeConfigVo timeConfigVo = convertEntity(insertEntity);
|
||||
String content = objectMapper.writeValueAsString(timeConfigVo);
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
}
|
||||
// 存储同步信息
|
||||
mqttSyncLogMapper.insertMqttSyncLog(message);
|
||||
}
|
||||
private SyncStrategyTimeConfigVo convertEntity(EmsStrategyTimeConfig insertEntity) {
|
||||
SyncStrategyTimeConfigVo timeConfigVo = new SyncStrategyTimeConfigVo();
|
||||
BeanUtils.copyProperties(insertEntity, timeConfigVo);
|
||||
EmsStrategy strategy = emsStrategyMapper.selectEmsStrategyById(insertEntity.getStrategyId());
|
||||
if (strategy != null) {
|
||||
timeConfigVo.setStrategyName(strategy.getStrategyName());
|
||||
timeConfigVo.setStrategyType(strategy.getStrategyType());
|
||||
}
|
||||
return timeConfigVo;
|
||||
}
|
||||
|
||||
@AfterReturning(pointcut = "updatePointCut(updateEntity)", returning = "result")
|
||||
public void afterUpdate(JoinPoint joinPoint, EmsStrategyTimeConfig updateEntity, Integer result) {
|
||||
logger.info("【删除策略模版切面进入成功】");
|
||||
if (result == 0 || updateEntity == null) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String methodName = joinPoint.getSignature().getName();
|
||||
String operateType = getOperateType(methodName);
|
||||
String siteId = updateEntity.getSiteId();
|
||||
|
||||
// 构建日志同步消息
|
||||
MqttSyncLog message = createMessageObject(operateType, siteId);
|
||||
|
||||
try {
|
||||
// 数据转换
|
||||
SyncStrategyTimeConfigVo timeConfigVo = convertEntity(updateEntity);
|
||||
String content = objectMapper.writeValueAsString(timeConfigVo);
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
}
|
||||
// 存储同步信息
|
||||
mqttSyncLogMapper.insertMqttSyncLog(message);
|
||||
}
|
||||
|
||||
// 构建同步信息
|
||||
private MqttSyncLog createMessageObject(String operateType, String siteId) {
|
||||
MqttSyncLog message = new MqttSyncLog();
|
||||
message.setSyncId(UUID.randomUUID().toString());
|
||||
message.setOperateType(operateType);
|
||||
message.setTableName(TABLE_NAME);
|
||||
message.setCreateTime(new Date());
|
||||
message.setTopic(MQTT_TOPIC);
|
||||
message.setStatus("SUCCESS");
|
||||
message.setSyncObject("CLOUD");
|
||||
message.setTarget(siteId);
|
||||
return message;
|
||||
}
|
||||
// 从方法名判断操作类型(示例:insert→INSERT,update→UPDATE,delete→DELETE)
|
||||
private String getOperateType(String methodName) {
|
||||
if (methodName.startsWith("insert")) return "INSERT";
|
||||
if (methodName.startsWith("stop")) return "STOP";
|
||||
if (methodName.startsWith("update") || methodName.startsWith("stop")) return "UPDATE";
|
||||
if (methodName.startsWith("delete")) return "DELETE";
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
// 从方法参数提取数据(示例:若参数是实体类,转成Map)
|
||||
private Map<String, Object> extractDataFromParams(Object[] args) {
|
||||
// 实际需反射获取实体类的字段和值(如id、name等)
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
if (args == null || args.length == 0) {
|
||||
return data;
|
||||
}
|
||||
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
Object arg = args[i];
|
||||
if (arg == null) {
|
||||
continue; // 跳过null参数
|
||||
}
|
||||
|
||||
// 处理基本类型/包装类/字符串(直接作为值存入,key为"param0"、"param1"等)
|
||||
if (isBasicType(arg.getClass())) {
|
||||
String key = "param" + i; // 基本类型参数用"param0"、"param1"作为key
|
||||
data.put(key, arg);
|
||||
} else {
|
||||
Map<String, Object> beanMap = beanToMap(arg);
|
||||
data.putAll(beanMap); // 合并实体类的字段到结果Map
|
||||
}
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为基本类型或包装类或字符串
|
||||
*/
|
||||
private boolean isBasicType(Class<?> clazz) {
|
||||
return clazz.isPrimitive() // 基本类型(int、long、boolean等)
|
||||
|| clazz == String.class // 字符串
|
||||
|| Number.class.isAssignableFrom(clazz) // 数字包装类(Integer、Long等)
|
||||
|| clazz == Boolean.class; // 布尔包装类
|
||||
}
|
||||
|
||||
/**
|
||||
* 将实体类转换为Map(字段名为key,字段值为value)
|
||||
*/
|
||||
private Map<String, Object> beanToMap(Object bean) {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
if (bean == null) {
|
||||
return map;
|
||||
}
|
||||
|
||||
// 方式1:使用BeanMap(简洁高效)
|
||||
BeanMap beanMap = BeanMap.create(bean);
|
||||
for (Object key : beanMap.keySet()) {
|
||||
map.put(key.toString(), beanMap.get(key));
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
// 在方法中转换
|
||||
public String convertEntityToJson(EmsStrategyRunning insertEntity) throws Exception {
|
||||
if (insertEntity == null) {
|
||||
return null; // 空对象返回空JSON
|
||||
}
|
||||
|
||||
// 将实体类转换为JSON字符串
|
||||
return objectMapper.writeValueAsString(insertEntity);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user