mqtt配置

This commit is contained in:
2025-06-27 14:07:45 +08:00
parent da894c26d1
commit 31dc8e72c6
6 changed files with 413 additions and 9 deletions

View File

@ -1,8 +1,13 @@
package com.xzzn.web.controller.ems;
import com.xzzn.ems.domain.EmsMqttMessage;
import com.xzzn.ems.service.IEmsMqttMessageService;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.MqttPublisher;
import com.xzzn.framework.web.service.MqttSubscriber;
import org.apache.coyote.http11.Http11NioProtocol;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@ -14,8 +19,16 @@ import javax.annotation.PostConstruct;
@Service
public class MqttMessageController implements MqttPublisher, MqttSubscriber {
private static final Log log = LogFactory.getLog(MqttMessageController.class);
private final MqttLifecycleManager mqttLifecycleManager;
@Autowired
private IEmsMqttMessageService emsMqttMessageService;
@Autowired
public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) {
this.mqttLifecycleManager = mqttLifecycleManager;
@ -24,29 +37,50 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
@PostConstruct
public void init() {
// 订阅系统状态主题
subscribe("system/status", 1, this::handleSystemStatus);
subscribe("021_FXX_01_UP", 1, this::handleDeviceData);
subscribe("021_FXX_01_RECALL", 1, this::handleDeviceData);
subscribe("021_FXX_01_DOWN", 1, this::handleDeviceData);
subscribe("021_FXX_01", 1, this::handleSystemStatus);
// 订阅设备数据主题
subscribe("devices/data", 0, this::handleDeviceData);
}
// 处理系统状态消息
private void handleSystemStatus(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[SYSTEM] Status update: " + payload);
// 业务处理逻辑
try {
// 业务处理逻辑
EmsMqttMessage mqttMessage = new EmsMqttMessage();
mqttMessage.setMqttTopic(topic);
mqttMessage.setMqttMessage(payload);
mqttMessage.setCreateTime(new java.util.Date());
mqttMessage.setUpdateTime(new java.util.Date());
mqttMessage.setCreateBy("system");
mqttMessage.setUpdateBy("system");
emsMqttMessageService.insertEmsMqttMessage(mqttMessage);
} catch (Exception e) {
log.error("Failed to process system status message: " + e.getMessage(), e);
}
}
// 处理设备数据
private void handleDeviceData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[DEVICE] data: " + payload);
// 业务处理逻辑
try {
// 业务处理逻辑
EmsMqttMessage mqttMessage = new EmsMqttMessage();
mqttMessage.setMqttTopic(topic);
mqttMessage.setMqttMessage(payload);
mqttMessage.setCreateTime(new java.util.Date());
mqttMessage.setUpdateTime(new java.util.Date());
mqttMessage.setCreateBy("system");
mqttMessage.setUpdateBy("system");
emsMqttMessageService.insertEmsMqttMessage(mqttMessage);
} catch (Exception e) {
log.error("Failed to process system status message: " + e.getMessage(), e);
}
}