From 630f77a78343f09de29fceb8341031e2d321541c Mon Sep 17 00:00:00 2001 From: mashili Date: Tue, 23 Sep 2025 14:00:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A5=89=E8=B4=A4=20alarm=20topic=E6=8E=A5?= =?UTF-8?q?=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/ems/MqttMessageController.java | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java index 734a155..c72c44d 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java @@ -1,6 +1,5 @@ package com.xzzn.web.controller.ems; -import com.xzzn.ems.domain.EmsMqttMessage; import com.xzzn.ems.service.IDDSDataProcessService; import com.xzzn.ems.service.IEmsMqttMessageService; import com.xzzn.ems.service.IFXXDataProcessService; @@ -13,9 +12,11 @@ import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import java.util.Date; @Service public class MqttMessageController implements MqttPublisher, MqttSubscriber { @@ -46,6 +47,7 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { subscribe("021_FXX_01_RECALL", 1, this::handleDeviceData); subscribe("021_FXX_01_DOWN", 1, this::handleDeviceData); subscribe("021_FXX_01", 1, this::handleSystemStatus); + subscribe("021_FXX_01_ALARM_UP", 1, this::handleAlarmData); // 订阅电动所系统状态主题 subscribe("021_DDS_01_UP", 1, this::handleDeviceData); @@ -87,6 +89,22 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } + // 处理告警数据 + private void handleAlarmData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[DEVICE] data: " + payload); + try { + // 业务处理逻辑 + if (topic.startsWith("021_FXX")) { + fXXDataProcessService.handleFxAlarmData(payload); + } + + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); + } catch (Exception e) { + log.error("Failed to process alarm data message: " + e.getMessage(), e); + } + + } @Override public void publish(String topic, String message) throws MqttException { mqttLifecycleManager.publish(topic, message, 0); @@ -115,5 +133,11 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } + private static final long FORCE_INTERVAL = 60 * 1000; // 1分钟(毫秒) + @Scheduled(fixedRate = FORCE_INTERVAL) // 每分钟执行一次 + public void scheduledForceSave() { + System.out.println("执行定时强制存储任务:" + new Date()); + + } } \ No newline at end of file