diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java index a68f394..ec1d846 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java @@ -1,23 +1,25 @@ package com.xzzn.web.controller.ems; -import com.xzzn.ems.service.IDDSDataProcessService; -import com.xzzn.ems.service.IEmsMqttMessageService; -import com.xzzn.ems.service.IFXXAlarmDataProcessService; -import com.xzzn.ems.service.IFXXDataProcessService; +import com.alibaba.fastjson2.JSON; +import com.xzzn.common.utils.StringUtils; +import com.xzzn.ems.domain.EmsMqttTopicConfig; +import com.xzzn.ems.domain.MqttSyncStrategyLog; +import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper; +import com.xzzn.ems.service.*; import com.xzzn.framework.manager.MqttLifecycleManager; import com.xzzn.framework.web.service.MqttPublisher; import com.xzzn.framework.web.service.MqttSubscriber; +import org.apache.commons.collections4.CollectionUtils; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import java.util.Date; +import java.util.List; @Service public class MqttMessageController implements MqttPublisher, MqttSubscriber { @@ -39,6 +41,11 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { @Autowired private IFXXAlarmDataProcessService fXXAlarmDataProcessService; + @Autowired + private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper; + @Autowired + private IEmsStrategyService emsStrategyService; + @Autowired public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) { this.mqttLifecycleManager = mqttLifecycleManager; @@ -46,8 +53,31 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { @PostConstruct public void init() { + List topicConfigList = emsMqttTopicConfigMapper.selectEmsMqttTopicConfigList(null); + + if (CollectionUtils.isEmpty (topicConfigList)) { + log.info ("未查询到任何 MQTT 主题配置,跳过订阅"); + return; + } + + for (EmsMqttTopicConfig topicConfig : topicConfigList) { + String topic = topicConfig.getMqttTopic(); + if (StringUtils.isEmpty(topic)) { + log.info ("主题配置 ID:" +topicConfig.getId() +"的 mqttTopic 为空,跳过订阅"); + continue; + } + + int qos = topicConfig.getQos() == null ? 1 : topicConfig.getQos(); + if (qos < 0 || qos > 2) { + log.info ("主题:" + topic +"的 QoS值"+ qos + "不合法,自动调整为1"); + qos = 1; + } + + IMqttMessageListener listener = getMqttListenerByTopic(topic, topicConfig.getId()); + subscribe(topic, qos, listener); + } // 订阅奉贤系统状态主题 - subscribe("021_FXX_01_UP", 1, this::handleDeviceData); + /*subscribe("021_FXX_01_UP", 1, this::handleDeviceData); subscribe("021_FXX_01_RECALL", 1, this::handleDeviceData); subscribe("021_FXX_01_DOWN", 1, this::handleDeviceData); subscribe("021_FXX_01", 1, this::handleSystemStatus); @@ -57,10 +87,22 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { subscribe("021_DDS_01_UP", 1, this::handleDeviceData); subscribe("021_DDS_01_RECALL", 1, this::handleDeviceData); subscribe("021_DDS_01_DOWN", 1, this::handleDeviceData); - subscribe("021_DDS_01", 1, this::handleSystemStatus); + subscribe("021_DDS_01", 1, this::handleSystemStatus);*/ } + private IMqttMessageListener getMqttListenerByTopic(String topic, Long id) { + if (topic.contains("ALARM_UP")) { + return this::handleAlarmData; + } else if (topic.endsWith("_01")) { + return this::handleSystemStatus; + } else if (topic.contains("STRATEGY")) { + return this::handleStrategyData; + } else { + return this::handleDeviceData; + } + } + // 处理系统状态消息 private void handleSystemStatus(String topic, MqttMessage message) { String payload = new String(message.getPayload()); @@ -109,6 +151,29 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } } + + // 处理运行策略数据 + private void handleStrategyData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[处理运行策略数据] data: " + payload); + try { + // 业务处理逻辑 + MqttSyncStrategyLog strategyLog = JSON.parseObject(payload, MqttSyncStrategyLog.class); + if (strategyLog != null) { + String content = strategyLog.getContent(); + // 根据不同操作更新 + String operateType = strategyLog.getOperateType(); + if (!StringUtils.isEmpty(operateType)) { + emsStrategyService.dealStrategyData(content,operateType); + } + } + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); + } catch (Exception e) { + log.error("Failed to process system status message: " + e.getMessage(), e); + } + + } + @Override public void publish(String topic, String message) throws MqttException { mqttLifecycleManager.publish(topic, message, 0); @@ -136,12 +201,4 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } } - - /*private static final long FORCE_INTERVAL = 60 * 1000; // 1分钟(毫秒) - @Scheduled(fixedRate = FORCE_INTERVAL) // 每分钟执行一次 - public void scheduledForceSave() { - System.out.println("执行定时强制存储任务:" + new Date()); - - }*/ - } \ No newline at end of file diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/EmsMqttTopicConfig.java b/ems-system/src/main/java/com/xzzn/ems/domain/EmsMqttTopicConfig.java index 0698b1b..33b36a2 100644 --- a/ems-system/src/main/java/com/xzzn/ems/domain/EmsMqttTopicConfig.java +++ b/ems-system/src/main/java/com/xzzn/ems/domain/EmsMqttTopicConfig.java @@ -14,7 +14,6 @@ import com.xzzn.common.annotation.Excel; public class EmsMqttTopicConfig extends BaseEntity { private static final long serialVersionUID = 1L; - /** 主键 */ private Long id; @@ -22,13 +21,21 @@ public class EmsMqttTopicConfig extends BaseEntity @Excel(name = "订阅topic") private String mqttTopic; + /** QoS等级(0/1/2) */ + @Excel(name = "QoS等级", readConverterExp = "0=/1/2") + private Integer qos; + /** topic描述 */ @Excel(name = "topic描述") private String topicName; /** 对应方法 */ @Excel(name = "对应方法") - private String method; + private String handleMethod; + + /** 处理器类型:(device=设备数据,system=系统状态,alarm=告警数据等) */ + @Excel(name = "处理器类型:(device=设备数据,system=系统状态,alarm=告警数据等)") + private String handleType; /** 站点id */ @Excel(name = "站点id") @@ -54,6 +61,16 @@ public class EmsMqttTopicConfig extends BaseEntity return mqttTopic; } + public void setQos(Integer qos) + { + this.qos = qos; + } + + public Integer getQos() + { + return qos; + } + public void setTopicName(String topicName) { this.topicName = topicName; @@ -64,14 +81,24 @@ public class EmsMqttTopicConfig extends BaseEntity return topicName; } - public void setMethod(String method) + public void setHandleMethod(String handleMethod) { - this.method = method; + this.handleMethod = handleMethod; } - public String getMethod() + public String getHandleMethod() { - return method; + return handleMethod; + } + + public void setHandleType(String handleType) + { + this.handleType = handleType; + } + + public String getHandleType() + { + return handleType; } public void setSiteId(String siteId) @@ -89,8 +116,10 @@ public class EmsMqttTopicConfig extends BaseEntity return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) .append("id", getId()) .append("mqttTopic", getMqttTopic()) + .append("qos", getQos()) .append("topicName", getTopicName()) - .append("method", getMethod()) + .append("handleMethod", getHandleMethod()) + .append("handleType", getHandleType()) .append("siteId", getSiteId()) .append("createBy", getCreateBy()) .append("createTime", getCreateTime()) diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsMqttTopicConfigMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsMqttTopicConfigMapper.java index 0e8c019..10a9fa0 100644 --- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsMqttTopicConfigMapper.java +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsMqttTopicConfigMapper.java @@ -58,4 +58,7 @@ public interface EmsMqttTopicConfigMapper * @return 结果 */ public int deleteEmsMqttTopicConfigByIds(Long[] ids); + + // 判断topic是否存在 + public String checkTopicIsExist(String strategyTopic); } diff --git a/ems-system/src/main/resources/mapper/ems/EmsMqttTopicConfigMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsMqttTopicConfigMapper.xml index 4e9d96f..dd0ab8d 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsMqttTopicConfigMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsMqttTopicConfigMapper.xml @@ -7,8 +7,10 @@ + - + + @@ -17,15 +19,17 @@ - select id, mqtt_topic, topic_name, method, siteId, create_by, create_time, update_by, update_time from ems_mqtt_topic_config + select id, mqtt_topic, qos, topic_name, handle_method, handle_type, siteId, create_by, create_time, update_by, update_time from ems_mqtt_topic_config @@ -39,8 +43,10 @@ insert into ems_mqtt_topic_config mqtt_topic, + qos, topic_name, - method, + handle_method, + handle_type, siteId, create_by, create_time, @@ -49,8 +55,10 @@ #{mqttTopic}, + #{qos}, #{topicName}, - #{method}, + #{handleMethod}, + #{handleType}, #{siteId}, #{createBy}, #{createTime}, @@ -63,8 +71,10 @@ update ems_mqtt_topic_config mqtt_topic = #{mqttTopic}, + qos = #{qos}, topic_name = #{topicName}, - method = #{method}, + handle_method = #{handleMethod}, + handle_type = #{handleType}, siteId = #{siteId}, create_by = #{createBy}, create_time = #{createTime}, @@ -84,4 +94,10 @@ #{id} + + \ No newline at end of file