【验收】6-保护方案告警同步云端&& 【验收】7-下发操作日志同步云端
This commit is contained in:
@ -0,0 +1,104 @@
|
||||
package com.xzzn.framework.aspectj;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.xzzn.common.utils.StringUtils;
|
||||
import com.xzzn.ems.domain.EmsAlarmRecords;
|
||||
import com.xzzn.ems.domain.MqttSyncLog;
|
||||
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
|
||||
import com.xzzn.ems.mapper.MqttSyncLogMapper;
|
||||
import com.xzzn.framework.web.service.MqttPublisher;
|
||||
import org.apache.juli.logging.Log;
|
||||
import org.apache.juli.logging.LogFactory;
|
||||
import org.aspectj.lang.JoinPoint;
|
||||
import org.aspectj.lang.annotation.AfterReturning;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.annotation.Pointcut;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 本地保护告警信息同步云端
|
||||
* 本地 - 云端
|
||||
* @author xzzn
|
||||
*/
|
||||
@Aspect
|
||||
@Component
|
||||
public class FaultPlanAlarmAspect
|
||||
{
|
||||
|
||||
private static final Log logger = LogFactory.getLog(FaultPlanAlarmAspect.class);
|
||||
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||
@Autowired
|
||||
private MqttSyncLogMapper mqttSyncLogMapper;
|
||||
@Autowired
|
||||
private MqttPublisher mqttPublisher;
|
||||
@Autowired
|
||||
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
|
||||
|
||||
private static final String MQTT_TOPIC = "FAULT_ALARM_RECORD_UP";
|
||||
private static final String TABLE_NAME = "ems_alarm_records";
|
||||
// 切入点:拦截所有添加了@SyncAfterInsert注解的方法
|
||||
@Pointcut("@annotation(com.xzzn.common.annotation.SyncAfterInsert)")
|
||||
public void syncInsertPointcut() {}
|
||||
|
||||
// 方法执行成功后同步数据
|
||||
@AfterReturning(pointcut = "syncInsertPointcut()", returning = "result")
|
||||
public void afterInsert(JoinPoint joinPoint, Integer result) {
|
||||
logger.info("【新增保护策略告警数据切面进入成功】");
|
||||
if (result == 0) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取参数中的EmsAlarmRecords对象
|
||||
Object[] args = joinPoint.getArgs();
|
||||
EmsAlarmRecords alarmRecords = null;
|
||||
for (Object arg : args) {
|
||||
if (arg instanceof EmsAlarmRecords) {
|
||||
alarmRecords = (EmsAlarmRecords) arg;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (alarmRecords == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 构建日志同步消息
|
||||
MqttSyncLog message = createMessageObject("INSERT", alarmRecords.getSiteId());
|
||||
try {
|
||||
// 同步内容
|
||||
String content = objectMapper.writeValueAsString(alarmRecords);
|
||||
message.setContent(content);
|
||||
|
||||
// 发布到MQTT主题-同步到云端
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
}
|
||||
// 存储同步信息
|
||||
mqttSyncLogMapper.insertMqttSyncLog(message);
|
||||
}
|
||||
|
||||
// 构建同步信息
|
||||
private MqttSyncLog createMessageObject(String operateType, String siteId) {
|
||||
MqttSyncLog message = new MqttSyncLog();
|
||||
message.setSyncId(UUID.randomUUID().toString());
|
||||
message.setOperateType(operateType);
|
||||
message.setTableName(TABLE_NAME);
|
||||
message.setCreateTime(new Date());
|
||||
message.setTopic(MQTT_TOPIC);
|
||||
message.setStatus("SUCCESS");
|
||||
message.setSyncObject(siteId);
|
||||
message.setTarget("CLOUD");
|
||||
return message;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,98 @@
|
||||
package com.xzzn.framework.aspectj;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.xzzn.common.utils.StringUtils;
|
||||
import com.xzzn.ems.domain.EmsFaultIssueLog;
|
||||
import com.xzzn.ems.domain.MqttSyncLog;
|
||||
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
|
||||
import com.xzzn.ems.mapper.MqttSyncLogMapper;
|
||||
import com.xzzn.framework.web.service.MqttPublisher;
|
||||
import org.apache.juli.logging.Log;
|
||||
import org.apache.juli.logging.LogFactory;
|
||||
import org.aspectj.lang.JoinPoint;
|
||||
import org.aspectj.lang.annotation.AfterReturning;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.annotation.Pointcut;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 保护告警方案下发日志同步
|
||||
* 本地 - 云端
|
||||
*/
|
||||
@Aspect
|
||||
@Component
|
||||
public class FaultPlanIssueAspect {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(FaultPlanIssueAspect.class);
|
||||
@Autowired
|
||||
private MqttPublisher mqttPublisher;
|
||||
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private static final String MQTT_TOPIC = "FAULT_PLAN_ISSUE_UP";
|
||||
private static final String TABLE_NAME = "ems_fault_issue_log";
|
||||
@Autowired
|
||||
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
|
||||
@Autowired
|
||||
private MqttSyncLogMapper mqttSyncLogMapper;
|
||||
|
||||
// 定义切点:拦截策略相关表的Mapper方法
|
||||
@Pointcut("(execution(* com.xzzn.ems.mapper.EmsFaultIssueLogMapper.insertEmsFaultIssueLog(..)) && args(insertEntity)) ")
|
||||
public void insertPointCut(EmsFaultIssueLog insertEntity) {
|
||||
logger.info("【新增保护告警策略下发数据】FaultPlanIssueAspect 实例化");
|
||||
}
|
||||
|
||||
// 方法执行成功后发布同步消息
|
||||
@AfterReturning(pointcut = "insertPointCut(insertEntity)", returning = "result")
|
||||
public void afterInsert(JoinPoint joinPoint, EmsFaultIssueLog insertEntity, Integer result) {
|
||||
logger.info("【新增保护告警策略下发数据切面进入成功】");
|
||||
if (result == 0 || insertEntity == null) {
|
||||
return;
|
||||
}
|
||||
// 校验是否配置监听topic-监听则不发布
|
||||
String topic = emsMqttTopicConfigMapper.checkTopicIsExist(MQTT_TOPIC);
|
||||
if (!StringUtils.isEmpty(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析方法名,获取操作类型(INSERT/UPDATE/DELETE)和表名
|
||||
String operateType = "INSERT";
|
||||
String siteId = insertEntity.getSiteId();
|
||||
|
||||
// 构建日志同步消息
|
||||
MqttSyncLog message = createMessageObject(operateType,siteId);
|
||||
|
||||
try {
|
||||
// 数据转换
|
||||
String content = objectMapper.writeValueAsString(insertEntity);
|
||||
message.setContent(content);
|
||||
|
||||
// mqtt同步到云端
|
||||
mqttPublisher.publish(MQTT_TOPIC, objectMapper.writeValueAsString(message), 1);
|
||||
} catch (Exception e) {
|
||||
message.setStatus("FAIL");
|
||||
message.setErrorMsg(e.getMessage());
|
||||
}
|
||||
// 存储同步信息
|
||||
mqttSyncLogMapper.insertMqttSyncLog(message);
|
||||
}
|
||||
|
||||
// 构建同步信息
|
||||
private MqttSyncLog createMessageObject(String operateType, String siteId) {
|
||||
MqttSyncLog message = new MqttSyncLog();
|
||||
message.setSyncId(UUID.randomUUID().toString());
|
||||
message.setOperateType(operateType);
|
||||
message.setTableName(TABLE_NAME);
|
||||
message.setCreateTime(new Date());
|
||||
message.setTopic(MQTT_TOPIC);
|
||||
message.setStatus("SUCCESS");
|
||||
message.setSyncObject(siteId);
|
||||
message.setTarget("CLOUD");
|
||||
return message;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user