mqtt配置修改
This commit is contained in:
@ -1,23 +1,25 @@
|
|||||||
package com.xzzn.web.controller.ems;
|
package com.xzzn.web.controller.ems;
|
||||||
|
|
||||||
import com.xzzn.ems.service.IDDSDataProcessService;
|
import com.alibaba.fastjson2.JSON;
|
||||||
import com.xzzn.ems.service.IEmsMqttMessageService;
|
import com.xzzn.common.utils.StringUtils;
|
||||||
import com.xzzn.ems.service.IFXXAlarmDataProcessService;
|
import com.xzzn.ems.domain.EmsMqttTopicConfig;
|
||||||
import com.xzzn.ems.service.IFXXDataProcessService;
|
import com.xzzn.ems.domain.MqttSyncStrategyLog;
|
||||||
|
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
|
||||||
|
import com.xzzn.ems.service.*;
|
||||||
import com.xzzn.framework.manager.MqttLifecycleManager;
|
import com.xzzn.framework.manager.MqttLifecycleManager;
|
||||||
import com.xzzn.framework.web.service.MqttPublisher;
|
import com.xzzn.framework.web.service.MqttPublisher;
|
||||||
import com.xzzn.framework.web.service.MqttSubscriber;
|
import com.xzzn.framework.web.service.MqttSubscriber;
|
||||||
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.juli.logging.Log;
|
import org.apache.juli.logging.Log;
|
||||||
import org.apache.juli.logging.LogFactory;
|
import org.apache.juli.logging.LogFactory;
|
||||||
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
|
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.util.Date;
|
import java.util.List;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
||||||
@ -39,6 +41,11 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
|||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private IFXXAlarmDataProcessService fXXAlarmDataProcessService;
|
private IFXXAlarmDataProcessService fXXAlarmDataProcessService;
|
||||||
|
@Autowired
|
||||||
|
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
|
||||||
|
@Autowired
|
||||||
|
private IEmsStrategyService emsStrategyService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) {
|
public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) {
|
||||||
this.mqttLifecycleManager = mqttLifecycleManager;
|
this.mqttLifecycleManager = mqttLifecycleManager;
|
||||||
@ -46,8 +53,31 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
|||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
|
List<EmsMqttTopicConfig> topicConfigList = emsMqttTopicConfigMapper.selectEmsMqttTopicConfigList(null);
|
||||||
|
|
||||||
|
if (CollectionUtils.isEmpty (topicConfigList)) {
|
||||||
|
log.info ("未查询到任何 MQTT 主题配置,跳过订阅");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (EmsMqttTopicConfig topicConfig : topicConfigList) {
|
||||||
|
String topic = topicConfig.getMqttTopic();
|
||||||
|
if (StringUtils.isEmpty(topic)) {
|
||||||
|
log.info ("主题配置 ID:" +topicConfig.getId() +"的 mqttTopic 为空,跳过订阅");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int qos = topicConfig.getQos() == null ? 1 : topicConfig.getQos();
|
||||||
|
if (qos < 0 || qos > 2) {
|
||||||
|
log.info ("主题:" + topic +"的 QoS值"+ qos + "不合法,自动调整为1");
|
||||||
|
qos = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
IMqttMessageListener listener = getMqttListenerByTopic(topic, topicConfig.getId());
|
||||||
|
subscribe(topic, qos, listener);
|
||||||
|
}
|
||||||
// 订阅奉贤系统状态主题
|
// 订阅奉贤系统状态主题
|
||||||
subscribe("021_FXX_01_UP", 1, this::handleDeviceData);
|
/*subscribe("021_FXX_01_UP", 1, this::handleDeviceData);
|
||||||
subscribe("021_FXX_01_RECALL", 1, this::handleDeviceData);
|
subscribe("021_FXX_01_RECALL", 1, this::handleDeviceData);
|
||||||
subscribe("021_FXX_01_DOWN", 1, this::handleDeviceData);
|
subscribe("021_FXX_01_DOWN", 1, this::handleDeviceData);
|
||||||
subscribe("021_FXX_01", 1, this::handleSystemStatus);
|
subscribe("021_FXX_01", 1, this::handleSystemStatus);
|
||||||
@ -57,10 +87,22 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
|||||||
subscribe("021_DDS_01_UP", 1, this::handleDeviceData);
|
subscribe("021_DDS_01_UP", 1, this::handleDeviceData);
|
||||||
subscribe("021_DDS_01_RECALL", 1, this::handleDeviceData);
|
subscribe("021_DDS_01_RECALL", 1, this::handleDeviceData);
|
||||||
subscribe("021_DDS_01_DOWN", 1, this::handleDeviceData);
|
subscribe("021_DDS_01_DOWN", 1, this::handleDeviceData);
|
||||||
subscribe("021_DDS_01", 1, this::handleSystemStatus);
|
subscribe("021_DDS_01", 1, this::handleSystemStatus);*/
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private IMqttMessageListener getMqttListenerByTopic(String topic, Long id) {
|
||||||
|
if (topic.contains("ALARM_UP")) {
|
||||||
|
return this::handleAlarmData;
|
||||||
|
} else if (topic.endsWith("_01")) {
|
||||||
|
return this::handleSystemStatus;
|
||||||
|
} else if (topic.contains("STRATEGY")) {
|
||||||
|
return this::handleStrategyData;
|
||||||
|
} else {
|
||||||
|
return this::handleDeviceData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 处理系统状态消息
|
// 处理系统状态消息
|
||||||
private void handleSystemStatus(String topic, MqttMessage message) {
|
private void handleSystemStatus(String topic, MqttMessage message) {
|
||||||
String payload = new String(message.getPayload());
|
String payload = new String(message.getPayload());
|
||||||
@ -109,6 +151,29 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 处理运行策略数据
|
||||||
|
private void handleStrategyData(String topic, MqttMessage message) {
|
||||||
|
String payload = new String(message.getPayload());
|
||||||
|
System.out.println("[处理运行策略数据] data: " + payload);
|
||||||
|
try {
|
||||||
|
// 业务处理逻辑
|
||||||
|
MqttSyncStrategyLog strategyLog = JSON.parseObject(payload, MqttSyncStrategyLog.class);
|
||||||
|
if (strategyLog != null) {
|
||||||
|
String content = strategyLog.getContent();
|
||||||
|
// 根据不同操作更新
|
||||||
|
String operateType = strategyLog.getOperateType();
|
||||||
|
if (!StringUtils.isEmpty(operateType)) {
|
||||||
|
emsStrategyService.dealStrategyData(content,operateType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to process system status message: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void publish(String topic, String message) throws MqttException {
|
public void publish(String topic, String message) throws MqttException {
|
||||||
mqttLifecycleManager.publish(topic, message, 0);
|
mqttLifecycleManager.publish(topic, message, 0);
|
||||||
@ -136,12 +201,4 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*private static final long FORCE_INTERVAL = 60 * 1000; // 1分钟(毫秒)
|
|
||||||
@Scheduled(fixedRate = FORCE_INTERVAL) // 每分钟执行一次
|
|
||||||
public void scheduledForceSave() {
|
|
||||||
System.out.println("执行定时强制存储任务:" + new Date());
|
|
||||||
|
|
||||||
}*/
|
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -14,7 +14,6 @@ import com.xzzn.common.annotation.Excel;
|
|||||||
public class EmsMqttTopicConfig extends BaseEntity
|
public class EmsMqttTopicConfig extends BaseEntity
|
||||||
{
|
{
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
/** 主键 */
|
/** 主键 */
|
||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
@ -22,13 +21,21 @@ public class EmsMqttTopicConfig extends BaseEntity
|
|||||||
@Excel(name = "订阅topic")
|
@Excel(name = "订阅topic")
|
||||||
private String mqttTopic;
|
private String mqttTopic;
|
||||||
|
|
||||||
|
/** QoS等级(0/1/2) */
|
||||||
|
@Excel(name = "QoS等级", readConverterExp = "0=/1/2")
|
||||||
|
private Integer qos;
|
||||||
|
|
||||||
/** topic描述 */
|
/** topic描述 */
|
||||||
@Excel(name = "topic描述")
|
@Excel(name = "topic描述")
|
||||||
private String topicName;
|
private String topicName;
|
||||||
|
|
||||||
/** 对应方法 */
|
/** 对应方法 */
|
||||||
@Excel(name = "对应方法")
|
@Excel(name = "对应方法")
|
||||||
private String method;
|
private String handleMethod;
|
||||||
|
|
||||||
|
/** 处理器类型:(device=设备数据,system=系统状态,alarm=告警数据等) */
|
||||||
|
@Excel(name = "处理器类型:(device=设备数据,system=系统状态,alarm=告警数据等)")
|
||||||
|
private String handleType;
|
||||||
|
|
||||||
/** 站点id */
|
/** 站点id */
|
||||||
@Excel(name = "站点id")
|
@Excel(name = "站点id")
|
||||||
@ -54,6 +61,16 @@ public class EmsMqttTopicConfig extends BaseEntity
|
|||||||
return mqttTopic;
|
return mqttTopic;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setQos(Integer qos)
|
||||||
|
{
|
||||||
|
this.qos = qos;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getQos()
|
||||||
|
{
|
||||||
|
return qos;
|
||||||
|
}
|
||||||
|
|
||||||
public void setTopicName(String topicName)
|
public void setTopicName(String topicName)
|
||||||
{
|
{
|
||||||
this.topicName = topicName;
|
this.topicName = topicName;
|
||||||
@ -64,14 +81,24 @@ public class EmsMqttTopicConfig extends BaseEntity
|
|||||||
return topicName;
|
return topicName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMethod(String method)
|
public void setHandleMethod(String handleMethod)
|
||||||
{
|
{
|
||||||
this.method = method;
|
this.handleMethod = handleMethod;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getMethod()
|
public String getHandleMethod()
|
||||||
{
|
{
|
||||||
return method;
|
return handleMethod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHandleType(String handleType)
|
||||||
|
{
|
||||||
|
this.handleType = handleType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHandleType()
|
||||||
|
{
|
||||||
|
return handleType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setSiteId(String siteId)
|
public void setSiteId(String siteId)
|
||||||
@ -89,8 +116,10 @@ public class EmsMqttTopicConfig extends BaseEntity
|
|||||||
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
|
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
|
||||||
.append("id", getId())
|
.append("id", getId())
|
||||||
.append("mqttTopic", getMqttTopic())
|
.append("mqttTopic", getMqttTopic())
|
||||||
|
.append("qos", getQos())
|
||||||
.append("topicName", getTopicName())
|
.append("topicName", getTopicName())
|
||||||
.append("method", getMethod())
|
.append("handleMethod", getHandleMethod())
|
||||||
|
.append("handleType", getHandleType())
|
||||||
.append("siteId", getSiteId())
|
.append("siteId", getSiteId())
|
||||||
.append("createBy", getCreateBy())
|
.append("createBy", getCreateBy())
|
||||||
.append("createTime", getCreateTime())
|
.append("createTime", getCreateTime())
|
||||||
|
|||||||
@ -58,4 +58,7 @@ public interface EmsMqttTopicConfigMapper
|
|||||||
* @return 结果
|
* @return 结果
|
||||||
*/
|
*/
|
||||||
public int deleteEmsMqttTopicConfigByIds(Long[] ids);
|
public int deleteEmsMqttTopicConfigByIds(Long[] ids);
|
||||||
|
|
||||||
|
// 判断topic是否存在
|
||||||
|
public String checkTopicIsExist(String strategyTopic);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,8 +7,10 @@
|
|||||||
<resultMap type="EmsMqttTopicConfig" id="EmsMqttTopicConfigResult">
|
<resultMap type="EmsMqttTopicConfig" id="EmsMqttTopicConfigResult">
|
||||||
<result property="id" column="id" />
|
<result property="id" column="id" />
|
||||||
<result property="mqttTopic" column="mqtt_topic" />
|
<result property="mqttTopic" column="mqtt_topic" />
|
||||||
|
<result property="qos" column="qos" />
|
||||||
<result property="topicName" column="topic_name" />
|
<result property="topicName" column="topic_name" />
|
||||||
<result property="method" column="method" />
|
<result property="handleMethod" column="handle_method" />
|
||||||
|
<result property="handleType" column="handle_type" />
|
||||||
<result property="siteId" column="siteId" />
|
<result property="siteId" column="siteId" />
|
||||||
<result property="createBy" column="create_by" />
|
<result property="createBy" column="create_by" />
|
||||||
<result property="createTime" column="create_time" />
|
<result property="createTime" column="create_time" />
|
||||||
@ -17,15 +19,17 @@
|
|||||||
</resultMap>
|
</resultMap>
|
||||||
|
|
||||||
<sql id="selectEmsMqttTopicConfigVo">
|
<sql id="selectEmsMqttTopicConfigVo">
|
||||||
select id, mqtt_topic, topic_name, method, siteId, create_by, create_time, update_by, update_time from ems_mqtt_topic_config
|
select id, mqtt_topic, qos, topic_name, handle_method, handle_type, siteId, create_by, create_time, update_by, update_time from ems_mqtt_topic_config
|
||||||
</sql>
|
</sql>
|
||||||
|
|
||||||
<select id="selectEmsMqttTopicConfigList" parameterType="EmsMqttTopicConfig" resultMap="EmsMqttTopicConfigResult">
|
<select id="selectEmsMqttTopicConfigList" parameterType="EmsMqttTopicConfig" resultMap="EmsMqttTopicConfigResult">
|
||||||
<include refid="selectEmsMqttTopicConfigVo"/>
|
<include refid="selectEmsMqttTopicConfigVo"/>
|
||||||
<where>
|
<where>
|
||||||
<if test="mqttTopic != null and mqttTopic != ''"> and mqtt_topic like concat('%', #{mqttTopic}, '%')</if>
|
<if test="mqttTopic != null and mqttTopic != ''"> and mqtt_topic = #{mqttTopic}</if>
|
||||||
|
<if test="qos != null "> and qos = #{qos}</if>
|
||||||
<if test="topicName != null and topicName != ''"> and topic_name like concat('%', #{topicName}, '%')</if>
|
<if test="topicName != null and topicName != ''"> and topic_name like concat('%', #{topicName}, '%')</if>
|
||||||
<if test="method != null and method != ''"> and method = #{method}</if>
|
<if test="handleMethod != null and handleMethod != ''"> and handle_method = #{handleMethod}</if>
|
||||||
|
<if test="handleType != null and handleType != ''"> and handle_type = #{handleType}</if>
|
||||||
<if test="siteId != null and siteId != ''"> and siteId = #{siteId}</if>
|
<if test="siteId != null and siteId != ''"> and siteId = #{siteId}</if>
|
||||||
</where>
|
</where>
|
||||||
</select>
|
</select>
|
||||||
@ -39,8 +43,10 @@
|
|||||||
insert into ems_mqtt_topic_config
|
insert into ems_mqtt_topic_config
|
||||||
<trim prefix="(" suffix=")" suffixOverrides=",">
|
<trim prefix="(" suffix=")" suffixOverrides=",">
|
||||||
<if test="mqttTopic != null">mqtt_topic,</if>
|
<if test="mqttTopic != null">mqtt_topic,</if>
|
||||||
|
<if test="qos != null">qos,</if>
|
||||||
<if test="topicName != null">topic_name,</if>
|
<if test="topicName != null">topic_name,</if>
|
||||||
<if test="method != null">method,</if>
|
<if test="handleMethod != null">handle_method,</if>
|
||||||
|
<if test="handleType != null">handle_type,</if>
|
||||||
<if test="siteId != null">siteId,</if>
|
<if test="siteId != null">siteId,</if>
|
||||||
<if test="createBy != null">create_by,</if>
|
<if test="createBy != null">create_by,</if>
|
||||||
<if test="createTime != null">create_time,</if>
|
<if test="createTime != null">create_time,</if>
|
||||||
@ -49,8 +55,10 @@
|
|||||||
</trim>
|
</trim>
|
||||||
<trim prefix="values (" suffix=")" suffixOverrides=",">
|
<trim prefix="values (" suffix=")" suffixOverrides=",">
|
||||||
<if test="mqttTopic != null">#{mqttTopic},</if>
|
<if test="mqttTopic != null">#{mqttTopic},</if>
|
||||||
|
<if test="qos != null">#{qos},</if>
|
||||||
<if test="topicName != null">#{topicName},</if>
|
<if test="topicName != null">#{topicName},</if>
|
||||||
<if test="method != null">#{method},</if>
|
<if test="handleMethod != null">#{handleMethod},</if>
|
||||||
|
<if test="handleType != null">#{handleType},</if>
|
||||||
<if test="siteId != null">#{siteId},</if>
|
<if test="siteId != null">#{siteId},</if>
|
||||||
<if test="createBy != null">#{createBy},</if>
|
<if test="createBy != null">#{createBy},</if>
|
||||||
<if test="createTime != null">#{createTime},</if>
|
<if test="createTime != null">#{createTime},</if>
|
||||||
@ -63,8 +71,10 @@
|
|||||||
update ems_mqtt_topic_config
|
update ems_mqtt_topic_config
|
||||||
<trim prefix="SET" suffixOverrides=",">
|
<trim prefix="SET" suffixOverrides=",">
|
||||||
<if test="mqttTopic != null">mqtt_topic = #{mqttTopic},</if>
|
<if test="mqttTopic != null">mqtt_topic = #{mqttTopic},</if>
|
||||||
|
<if test="qos != null">qos = #{qos},</if>
|
||||||
<if test="topicName != null">topic_name = #{topicName},</if>
|
<if test="topicName != null">topic_name = #{topicName},</if>
|
||||||
<if test="method != null">method = #{method},</if>
|
<if test="handleMethod != null">handle_method = #{handleMethod},</if>
|
||||||
|
<if test="handleType != null">handle_type = #{handleType},</if>
|
||||||
<if test="siteId != null">siteId = #{siteId},</if>
|
<if test="siteId != null">siteId = #{siteId},</if>
|
||||||
<if test="createBy != null">create_by = #{createBy},</if>
|
<if test="createBy != null">create_by = #{createBy},</if>
|
||||||
<if test="createTime != null">create_time = #{createTime},</if>
|
<if test="createTime != null">create_time = #{createTime},</if>
|
||||||
@ -84,4 +94,10 @@
|
|||||||
#{id}
|
#{id}
|
||||||
</foreach>
|
</foreach>
|
||||||
</delete>
|
</delete>
|
||||||
|
|
||||||
|
<select id="checkTopicIsExist" parameterType="String" resultType="String">
|
||||||
|
select mqtt_topic as topic
|
||||||
|
from ems_mqtt_topic_config
|
||||||
|
where mqtt_topic = #{strategyTopic}
|
||||||
|
</select>
|
||||||
</mapper>
|
</mapper>
|
||||||
Reference in New Issue
Block a user