diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java new file mode 100644 index 0000000..aec61ae --- /dev/null +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java @@ -0,0 +1,79 @@ +package com.xzzn.web.controller.ems; + +import com.xzzn.framework.manager.MqttLifecycleManager; +import com.xzzn.framework.web.service.MqttPublisher; +import com.xzzn.framework.web.service.MqttSubscriber; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +@Service +public class MqttMessageController implements MqttPublisher, MqttSubscriber { + + private final MqttLifecycleManager mqttLifecycleManager; + + @Autowired + public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) { + this.mqttLifecycleManager = mqttLifecycleManager; + } + + @PostConstruct + public void init() { + // 订阅系统状态主题 + subscribe("system/status", 1, this::handleSystemStatus); + + // 订阅设备数据主题 + subscribe("devices/data", 0, this::handleDeviceData); + } + + // 处理系统状态消息 + private void handleSystemStatus(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[SYSTEM] Status update: " + payload); + // 业务处理逻辑 + + + } + + // 处理设备数据 + private void handleDeviceData(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + System.out.println("[DEVICE] data: " + payload); + // 业务处理逻辑 + + + + + } + + @Override + public void publish(String topic, String message) throws MqttException { + mqttLifecycleManager.publish(topic, message, 0); + } + + @Override + public void publish(String topic, String message, int qos) throws MqttException { + mqttLifecycleManager.publish(topic, message, qos); + } + + @Override + public void subscribe(String topic, int qos, IMqttMessageListener listener) { + mqttLifecycleManager.subscribe(topic, qos, listener); + + + } + + // 发送设备控制命令 + public void sendDeviceCommand(String deviceId, String command) { + try { + String topic = "devices/" + deviceId + "/commands"; + publish(topic, command, 1); + } catch (MqttException e) { + System.err.println("Failed to send command to device " + deviceId); + } + } +} \ No newline at end of file diff --git a/ems-admin/src/main/resources/application.yml b/ems-admin/src/main/resources/application.yml index 58d7acb..9a02bd4 100644 --- a/ems-admin/src/main/resources/application.yml +++ b/ems-admin/src/main/resources/application.yml @@ -127,3 +127,12 @@ xss: excludes: /system/notice # 匹配链接 urlPatterns: /system/*,/monitor/*,/tool/* + +mqtt: + broker.url: tcp://122.51.194.184:1883 + client.id: ems-cloud + username: admin + password: pass123 + connection-timeout: 15 + keep-alive-interval: 30 + automatic-reconnect: true \ No newline at end of file diff --git a/ems-framework/pom.xml b/ems-framework/pom.xml index bc71b9a..06a1274 100644 --- a/ems-framework/pom.xml +++ b/ems-framework/pom.xml @@ -46,7 +46,10 @@ - + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + com.github.oshi @@ -58,6 +61,11 @@ com.xzzn ems-system + + org.projectlombok + lombok + provided + diff --git a/ems-framework/src/main/java/com/xzzn/framework/config/MqttConfig.java b/ems-framework/src/main/java/com/xzzn/framework/config/MqttConfig.java new file mode 100644 index 0000000..53cd2d5 --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/config/MqttConfig.java @@ -0,0 +1,31 @@ +package com.xzzn.framework.config; + +import com.xzzn.framework.config.properties.MqttProperties; +import org.eclipse.paho.client.mqttv3.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.Resource; + +@Configuration +public class MqttConfig { + private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class); + + @Resource + private MqttProperties mqttProperties; + @Bean + public MqttConnectOptions mqttConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()}); + if (!mqttProperties.getUsername().isEmpty()) options.setUserName(mqttProperties.getUsername()); + if (!mqttProperties.getPassword().isEmpty()) options.setPassword(mqttProperties.getPassword().toCharArray()); + options.setConnectionTimeout(mqttProperties.getConnectionTimeout()); + options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); + options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect()); + options.setCleanSession(true); + return options; + } + +} diff --git a/ems-framework/src/main/java/com/xzzn/framework/config/properties/MqttProperties.java b/ems-framework/src/main/java/com/xzzn/framework/config/properties/MqttProperties.java new file mode 100644 index 0000000..f751596 --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/config/properties/MqttProperties.java @@ -0,0 +1,31 @@ +package com.xzzn.framework.config.properties; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +public class MqttProperties { + @Value("${mqtt.broker.url}") + private String brokerUrl; + + @Value("${mqtt.client.id:}") + private String clientId; + + @Value("${mqtt.username:}") + private String username; + + @Value("${mqtt.password:}") + private String password; + + @Value("${mqtt.connection-timeout:10}") + private int connectionTimeout; + + @Value("${mqtt.keep-alive-interval:60}") + private int keepAliveInterval; + + @Value("${mqtt.automatic-reconnect:true}") + private boolean automaticReconnect; + +} diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java b/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java new file mode 100644 index 0000000..234382a --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java @@ -0,0 +1,155 @@ +package com.xzzn.framework.manager; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.SmartLifecycle; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallback { + + private final MqttConnectOptions connectOptions; + private MqttClient mqttClient; + private volatile boolean running = false; + + // 存储订阅关系: topic -> (listener, qos) + private final ConcurrentHashMap subscriptions = new ConcurrentHashMap<>(); + + @Autowired + public MqttLifecycleManager(MqttConnectOptions connectOptions) { + this.connectOptions = connectOptions; + } + + // Spring Boot 启动完成后执行 + @Override + public void run(ApplicationArguments args) throws Exception { + start(); + } + + @Override + public void start() { + if (running) return; + + try { + String clientId = connectOptions.getUserName() + "-" + System.currentTimeMillis(); + mqttClient = new MqttClient( + connectOptions.getServerURIs()[0], + clientId, + new MemoryPersistence() + ); + + mqttClient.setCallback(this); + mqttClient.connect(connectOptions); + + // 重连后自动重新订阅 + resubscribeAll(); + + running = true; + System.out.println("MQTT client connected to: " + connectOptions.getServerURIs()[0]); + } catch (MqttException e) { + System.err.println("MQTT connection failed: " + e.getMessage()); + // 添加重试逻辑 + } + } + + @Override + public void stop() { + if (mqttClient != null && mqttClient.isConnected()) { + try { + mqttClient.disconnect(); + mqttClient.close(); + } catch (MqttException e) { + System.err.println("Error disconnecting MQTT client: " + e.getMessage()); + } + } + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + // MQTT 回调方法 + @Override + public void connectionLost(Throwable cause) { + System.err.println("MQTT connection lost: " + cause.getMessage()); + running = false; + // 自动重连由 MqttConnectOptions 处理 + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + SubscriptionInfo info = subscriptions.get(topic); + if (info != null && info.getListener() != null) { + info.getListener().messageArrived(topic, message); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // 消息发布完成处理 + } + + // 订阅方法 + public void subscribe(String topic, int qos, IMqttMessageListener listener) { + try { + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.subscribe(topic, qos); + } + subscriptions.put(topic, new SubscriptionInfo(listener, qos)); + } catch (MqttException e) { + System.err.println("Subscribe failed: " + e.getMessage()); + } + } + + // 发布方法 + public void publish(String topic, String payload, int qos) throws MqttException { + if (mqttClient != null && mqttClient.isConnected()) { + MqttMessage message = new MqttMessage(payload.getBytes()); + message.setQos(qos); + mqttClient.publish(topic, message); + } else { + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); + } + } + + // 重新订阅所有主题 + private void resubscribeAll() { + if (mqttClient == null || !mqttClient.isConnected()) return; + + subscriptions.forEach((topic, info) -> { + try { + mqttClient.subscribe(topic, info.getQos()); + } catch (MqttException e) { + System.err.println("Resubscribe failed for topic: " + topic); + } + }); + } + + // 订阅信息内部类 + private static class SubscriptionInfo { + private final IMqttMessageListener listener; + private final int qos; + + public SubscriptionInfo(IMqttMessageListener listener, int qos) { + this.listener = listener; + this.qos = qos; + } + + public IMqttMessageListener getListener() { + return listener; + } + + public int getQos() { + return qos; + } + } +} \ No newline at end of file diff --git a/ems-framework/src/main/java/com/xzzn/framework/web/service/MqttPublisher.java b/ems-framework/src/main/java/com/xzzn/framework/web/service/MqttPublisher.java new file mode 100644 index 0000000..927d44e --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/web/service/MqttPublisher.java @@ -0,0 +1,8 @@ +package com.xzzn.framework.web.service; + +import org.eclipse.paho.client.mqttv3.MqttException; + +public interface MqttPublisher { + void publish(String topic, String message) throws MqttException; + void publish(String topic, String message, int qos) throws MqttException; +} diff --git a/ems-framework/src/main/java/com/xzzn/framework/web/service/MqttSubscriber.java b/ems-framework/src/main/java/com/xzzn/framework/web/service/MqttSubscriber.java new file mode 100644 index 0000000..bf2c3be --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/web/service/MqttSubscriber.java @@ -0,0 +1,7 @@ +package com.xzzn.framework.web.service; + +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; + +public interface MqttSubscriber { + void subscribe(String topic, int qos, IMqttMessageListener listener); +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index dfc25b0..ee188eb 100644 --- a/pom.xml +++ b/pom.xml @@ -154,6 +154,16 @@ poi-ooxml ${poi.version} + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + org.springframework.integration + spring-integration-mqtt +