diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java index 734a155..c72c44d 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java @@ -1,6 +1,5 @@ package com.xzzn.web.controller.ems; -import com.xzzn.ems.domain.EmsMqttMessage; import com.xzzn.ems.service.IDDSDataProcessService; import com.xzzn.ems.service.IEmsMqttMessageService; import com.xzzn.ems.service.IFXXDataProcessService; @@ -13,9 +12,11 @@ import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import java.util.Date; @Service public class MqttMessageController implements MqttPublisher, MqttSubscriber { @@ -46,6 +47,7 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { subscribe("021_FXX_01_RECALL", 1, this::handleDeviceData); subscribe("021_FXX_01_DOWN", 1, this::handleDeviceData); subscribe("021_FXX_01", 1, this::handleSystemStatus); + subscribe("021_FXX_01_ALARM_UP", 1, this::handleAlarmData); // 订阅电动所系统状态主题 subscribe("021_DDS_01_UP", 1, this::handleDeviceData); @@ -87,6 +89,22 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } + // 处理告警数据 + private void handleAlarmData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[DEVICE] data: " + payload); + try { + // 业务处理逻辑 + if (topic.startsWith("021_FXX")) { + fXXDataProcessService.handleFxAlarmData(payload); + } + + emsMqttMessageService.insertMqttOriginalMessage(topic,payload); + } catch (Exception e) { + log.error("Failed to process alarm data message: " + e.getMessage(), e); + } + + } @Override public void publish(String topic, String message) throws MqttException { mqttLifecycleManager.publish(topic, message, 0); @@ -115,5 +133,11 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } + private static final long FORCE_INTERVAL = 60 * 1000; // 1分钟(毫秒) + @Scheduled(fixedRate = FORCE_INTERVAL) // 每分钟执行一次 + public void scheduledForceSave() { + System.out.println("执行定时强制存储任务:" + new Date()); + + } } \ No newline at end of file