奉贤 alarm topic接入

This commit is contained in:
2025-09-23 14:00:22 +08:00
parent 8e8c57cb64
commit 630f77a783

View File

@ -1,6 +1,5 @@
package com.xzzn.web.controller.ems;
import com.xzzn.ems.domain.EmsMqttMessage;
import com.xzzn.ems.service.IDDSDataProcessService;
import com.xzzn.ems.service.IEmsMqttMessageService;
import com.xzzn.ems.service.IFXXDataProcessService;
@ -13,9 +12,11 @@ import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Date;
@Service
public class MqttMessageController implements MqttPublisher, MqttSubscriber {
@ -46,6 +47,7 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
subscribe("021_FXX_01_RECALL", 1, this::handleDeviceData);
subscribe("021_FXX_01_DOWN", 1, this::handleDeviceData);
subscribe("021_FXX_01", 1, this::handleSystemStatus);
subscribe("021_FXX_01_ALARM_UP", 1, this::handleAlarmData);
// 订阅电动所系统状态主题
subscribe("021_DDS_01_UP", 1, this::handleDeviceData);
@ -87,6 +89,22 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
}
// 处理告警数据
private void handleAlarmData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[DEVICE] data: " + payload);
try {
// 业务处理逻辑
if (topic.startsWith("021_FXX")) {
fXXDataProcessService.handleFxAlarmData(payload);
}
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
} catch (Exception e) {
log.error("Failed to process alarm data message: " + e.getMessage(), e);
}
}
@Override
public void publish(String topic, String message) throws MqttException {
mqttLifecycleManager.publish(topic, message, 0);
@ -115,5 +133,11 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
}
private static final long FORCE_INTERVAL = 60 * 1000; // 1分钟毫秒
@Scheduled(fixedRate = FORCE_INTERVAL) // 每分钟执行一次
public void scheduledForceSave() {
System.out.println("执行定时强制存储任务:" + new Date());
}
}