同步日志&监听处理

This commit is contained in:
2025-11-17 02:47:43 +08:00
parent aab2b1d94e
commit 3753075b10
5 changed files with 232 additions and 31 deletions

View File

@ -100,8 +100,14 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
return this::handleSystemStatus;
} else if (topic.contains("STRATEGY")) {
return this::handleStrategyData;
} else if (topic.contains("PROTECTION_PLAN")) {
} else if (topic.equals("FAULT_PROTECTION_PLAN_UP")) {
return this::handleFaultProtPlanData;
} else if (topic.equals("FAULT_ALARM_RECORD_UP")) {
return this::handleFaultAlarmData;
} else if (topic.equals("FAULT_PLAN_ISSUE_UP")) {
return this::handleFaultPlanIssueData;
} else if (topic.equals("DEVICE_CHANGE_LOG_UP")) {
return this::handleDeviceChangeLogData;
} else {
return this::handleDeviceData;
}
@ -156,7 +162,7 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
}
// 处理运行策略数据
// 处理运行策略数据:云端-本地
private void handleStrategyData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理运行策略数据] data: " + payload);
@ -170,20 +176,59 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
}
}
// 处理设备保护告警策略数据
// 处理设备保护告警策略数据:云端-本地
private void handleFaultProtPlanData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理设备保护告警策略数据] data: " + payload);
try {
// 业务处理逻辑
MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class);
if (planLog != null) {
iMqttSyncLogService.handleMqttPlanData(planLog);
}
iMqttSyncLogService.handleMqttPlanData(payload);
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) {
log.error("Failed to process strategy data message: " + e.getMessage(), e);
log.error("Failed to process fault plan data message: " + e.getMessage(), e);
}
}
// 处理保护策略告警信息:本地-云端
private void handleFaultAlarmData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理本地保护策略告警信息到云端] data: " + payload);
try {
// 业务处理逻辑
iMqttSyncLogService.handleFaultAlarmData(payload);
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) {
log.error("Failed to process fault plan alarm data message: " + e.getMessage(), e);
}
}
// 处理保护策略下发日志:本地-云端
private void handleFaultPlanIssueData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理本地保护策略下发日志到云端] data: " + payload);
try {
// 业务处理逻辑
iMqttSyncLogService.handleFaultPlanIssueData(payload);
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) {
log.error("Failed to process fault plan issue log message: " + e.getMessage(), e);
}
}
// 处理设备状态变更日志:本地-云端
private void handleDeviceChangeLogData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[处理本地的保护策略告警信息到云端] data: " + payload);
try {
// 业务处理逻辑
iMqttSyncLogService.handleDeviceChangeLogData(payload);
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) {
log.error("Failed to process device change log message: " + e.getMessage(), e);
}
}

View File

@ -46,6 +46,10 @@ public class MqttSyncLog extends BaseEntity
@Excel(name = "失败原因", readConverterExp = "s=tatus=FAIL时填写")
private String errorMsg;
/** 同步对象 */
@Excel(name = "同步对象")
private String syncObject;
/** 同步目标 */
@Excel(name = "同步目标")
private String target;
@ -130,6 +134,16 @@ public class MqttSyncLog extends BaseEntity
return errorMsg;
}
public void setSyncObject(String syncObject)
{
this.syncObject = syncObject;
}
public String getSyncObject()
{
return syncObject;
}
public void setTarget(String target)
{
this.target = target;
@ -151,6 +165,7 @@ public class MqttSyncLog extends BaseEntity
.append("content", getContent())
.append("status", getStatus())
.append("errorMsg", getErrorMsg())
.append("syncObject", getSyncObject())
.append("target", getTarget())
.append("createTime", getCreateTime())
.toString();

View File

@ -62,5 +62,11 @@ public interface IMqttSyncLogService
// 处理策略信息
public void handleMqttStrategyData(String payload);
// 处理设备告警保护信息
public void handleMqttPlanData(MqttSyncLog planLog);
public void handleMqttPlanData(String payload);
// 处理设备保护策略触发的告警信息
public void handleFaultAlarmData(String payload);
// 处理保护策略告警信息
public void handleFaultPlanIssueData(String payload);
// 处理设备运行状态变更
public void handleDeviceChangeLogData(String payload);
}

View File

@ -5,13 +5,13 @@ import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.service.IEmsFaultProtectionPlanService;
import com.xzzn.ems.service.IEmsStrategyService;
import com.xzzn.ems.domain.EmsSiteSetting;
import com.xzzn.ems.mapper.EmsSiteSettingMapper;
import com.xzzn.ems.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.xzzn.ems.mapper.MqttSyncLogMapper;
import com.xzzn.ems.domain.MqttSyncLog;
import com.xzzn.ems.service.IMqttSyncLogService;
/**
* MQTT云上本地同步日志Service业务层处理
@ -28,6 +28,18 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService
private IEmsFaultProtectionPlanService emsFaultProtectionPlanService;
@Autowired
private IEmsStrategyService emsStrategyService;
@Autowired
private IEmsAlarmRecordsService emsAlarmRecordsService;
@Autowired
private EmsSiteSettingMapper emsSiteSettingMapper;
@Autowired
private IEmsFaultIssueLogService emsFaultIssueLogService;
@Autowired
private IEmsDeviceChangeLogService emsDeviceChangeLogService;
@Autowired
private IEmsStrategyTempService emsStrategyTempService;
@Autowired
private IEmsStrategyTimeConfigService emsStrategyTimeConfigService;
/**
* 查询MQTT云上本地同步日志
@ -102,12 +114,103 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService
return mqttSyncLogMapper.deleteMqttSyncLogById(id);
}
// 校验同步目标是否该站点
private boolean checkIsSite(String target) {
if (StringUtils.isEmpty(target)) {
return false;
}
EmsSiteSetting emsSiteSetting = emsSiteSettingMapper.selectEmsSiteSettingBySiteId(target);
if (emsSiteSetting == null) {
return false;
}
return true;
}
/**
* 处理云上运行策略数据
* 云上-本地:校验是否该站点
* @param payload
*/
@Override
public void handleMqttStrategyData(String payload) {
MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
if (syncLog != null) {
// 校验是否该站点数据
if (!checkIsSite(syncLog.getTarget())) {
return;
}
// 校验是否存在
String syncId = syncLog.getSyncId();
MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
if (existLog == null) {
try {
// 根据不同操作更新
String operateType = syncLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
// 不同表操作
String tableName = syncLog.getTableName();
switch (tableName) {
case "ems_strategy_running":
emsStrategyService.dealStrategyRunningData(syncLog.getContent(), operateType);
break;
case "ems_strategy_temp":
emsStrategyTempService.dealStrategyTempData(syncLog.getContent(), operateType);
break;
case "ems_strategy_time_config":
emsStrategyTimeConfigService.dealStrategyTimeData(syncLog.getContent(), operateType);
break;
default:
break;
}
}
} catch (Exception e) {
syncLog.setStatus("FAIL");
syncLog.setErrorMsg(e.getMessage());
}
// 保存日志
insertMqttSyncLog(syncLog);
}
}
}
/**
* 处理云上同步的告警保护信息
* 云上-本地:校验是否该站点
* @param payload
*/
@Override
public void handleMqttPlanData(String payload) {
MqttSyncLog planLog = JSON.parseObject(payload, MqttSyncLog.class);
if (planLog != null) {
// 校验是否该站点数据
if (!checkIsSite(planLog.getTarget())) {
return;
}
// 校验是否存在
String syncId = planLog.getSyncId();
MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
if (existLog == null) {
// 处理数据
String operateType = planLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsFaultProtectionPlanService.dealSyncData(planLog.getContent(), operateType);
}
// 保存日志
insertMqttSyncLog(planLog);
}
}
}
/**
* 处理设备保护策略触发的告警信息
* 本地-云上
* @param payload
*/
@Override
public void handleFaultAlarmData(String payload) {
MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
if (syncLog != null) {
// 校验是否存在
@ -117,34 +220,61 @@ public class MqttSyncLogServiceImpl implements IMqttSyncLogService
// 根据不同操作更新
String operateType = syncLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsStrategyService.dealStrategyData(syncLog.getContent(),operateType);
emsAlarmRecordsService.dealSyncData(syncLog.getContent(),operateType);
}
// 保存日志
insertMqttSyncLog(syncLog);
}
// 保存日志
insertMqttSyncLog(syncLog);
}
}
/**
* 处理云上同步的告警保护信息
* @param planLog
* 处理保护策略告警信息
* 本地-云上
* @param payload
*/
@Override
public void handleMqttPlanData(MqttSyncLog planLog) {
// 校验是否存在
String syncId = planLog.getSyncId();
MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
if (existLog != null) {
return;
}
// 保存日志
insertMqttSyncLog(planLog);
public void handleFaultPlanIssueData(String payload) {
MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
if (syncLog != null) {
// 校验是否存在
String syncId = syncLog.getSyncId();
MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
if (existLog == null) {
// 根据不同操作更新
String operateType = syncLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsFaultIssueLogService.dealSyncData(syncLog.getContent(),operateType);
}
// 处理数据
String operateType = planLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsFaultProtectionPlanService.dealSyncData(planLog.getContent(),operateType);
// 保存日志
insertMqttSyncLog(syncLog);
}
}
}
/**
* 处理设备运行状态变更日志
* 本地-云上
* @param payload
*/
@Override
public void handleDeviceChangeLogData(String payload) {
MqttSyncLog syncLog = JSON.parseObject(payload, MqttSyncLog.class);
if (syncLog == null) {
// 校验是否存在
String syncId = syncLog.getSyncId();
MqttSyncLog existLog = mqttSyncLogMapper.selectMqttSyncLogBySyncId(syncId);
if (existLog == null) {
// 根据不同操作更新
String operateType = syncLog.getOperateType();
if (!StringUtils.isEmpty(operateType)) {
emsDeviceChangeLogService.dealSyncData(syncLog.getContent(),operateType);
}
// 保存日志
insertMqttSyncLog(syncLog);
}
}
}
}

View File

@ -13,12 +13,13 @@
<result property="content" column="content" />
<result property="status" column="status" />
<result property="errorMsg" column="error_msg" />
<result property="syncObject" column="sync_object" />
<result property="target" column="target" />
<result property="createTime" column="create_time" />
</resultMap>
<sql id="selectMqttSyncLogVo">
select id, sync_id, topic, operate_type, table_name, content, status, error_msg, target, create_time from mqtt_sync_log
select id, sync_id, topic, operate_type, table_name, content, status, error_msg, sync_object, target, create_time from mqtt_sync_log
</sql>
<select id="selectMqttSyncLogList" parameterType="MqttSyncLog" resultMap="MqttSyncLogResult">
@ -31,6 +32,7 @@
<if test="content != null and content != ''"> and content = #{content}</if>
<if test="status != null and status != ''"> and status = #{status}</if>
<if test="errorMsg != null and errorMsg != ''"> and error_msg = #{errorMsg}</if>
<if test="syncObject != null and syncObject != ''"> and sync_object = #{syncObject}</if>
<if test="target != null and target != ''"> and target = #{target}</if>
</where>
</select>
@ -50,6 +52,7 @@
<if test="content != null">content,</if>
<if test="status != null and status != ''">status,</if>
<if test="errorMsg != null">error_msg,</if>
<if test="syncObject != null">sync_object,</if>
<if test="target != null">target,</if>
<if test="createTime != null">create_time,</if>
</trim>
@ -61,6 +64,7 @@
<if test="content != null">#{content},</if>
<if test="status != null and status != ''">#{status},</if>
<if test="errorMsg != null">#{errorMsg},</if>
<if test="syncObject != null">#{syncObject},</if>
<if test="target != null">#{target},</if>
<if test="createTime != null">#{createTime},</if>
</trim>
@ -76,6 +80,7 @@
<if test="content != null">content = #{content},</if>
<if test="status != null and status != ''">status = #{status},</if>
<if test="errorMsg != null">error_msg = #{errorMsg},</if>
<if test="syncObject != null">sync_object = #{syncObject},</if>
<if test="target != null">target = #{target},</if>
<if test="createTime != null">create_time = #{createTime},</if>
</trim>