mqtt配置

This commit is contained in:
2025-06-27 12:49:14 +08:00
parent 0544929d07
commit f439228432
9 changed files with 339 additions and 1 deletions

View File

@ -0,0 +1,79 @@
package com.xzzn.web.controller.ems;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.MqttPublisher;
import com.xzzn.framework.web.service.MqttSubscriber;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class MqttMessageController implements MqttPublisher, MqttSubscriber {
private final MqttLifecycleManager mqttLifecycleManager;
@Autowired
public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) {
this.mqttLifecycleManager = mqttLifecycleManager;
}
@PostConstruct
public void init() {
// 订阅系统状态主题
subscribe("system/status", 1, this::handleSystemStatus);
// 订阅设备数据主题
subscribe("devices/data", 0, this::handleDeviceData);
}
// 处理系统状态消息
private void handleSystemStatus(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[SYSTEM] Status update: " + payload);
// 业务处理逻辑
}
// 处理设备数据
private void handleDeviceData(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
System.out.println("[DEVICE] data: " + payload);
// 业务处理逻辑
}
@Override
public void publish(String topic, String message) throws MqttException {
mqttLifecycleManager.publish(topic, message, 0);
}
@Override
public void publish(String topic, String message, int qos) throws MqttException {
mqttLifecycleManager.publish(topic, message, qos);
}
@Override
public void subscribe(String topic, int qos, IMqttMessageListener listener) {
mqttLifecycleManager.subscribe(topic, qos, listener);
}
// 发送设备控制命令
public void sendDeviceCommand(String deviceId, String command) {
try {
String topic = "devices/" + deviceId + "/commands";
publish(topic, command, 1);
} catch (MqttException e) {
System.err.println("Failed to send command to device " + deviceId);
}
}
}

View File

@ -127,3 +127,12 @@ xss:
excludes: /system/notice
# 匹配链接
urlPatterns: /system/*,/monitor/*,/tool/*
mqtt:
broker.url: tcp://122.51.194.184:1883
client.id: ems-cloud
username: admin
password: pass123
connection-timeout: 15
keep-alive-interval: 30
automatic-reconnect: true

View File

@ -46,7 +46,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<!-- 获取系统信息 -->
<dependency>
<groupId>com.github.oshi</groupId>
@ -58,6 +61,11 @@
<groupId>com.xzzn</groupId>
<artifactId>ems-system</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

View File

@ -0,0 +1,31 @@
package com.xzzn.framework.config;
import com.xzzn.framework.config.properties.MqttProperties;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class MqttConfig {
private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
@Resource
private MqttProperties mqttProperties;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});
if (!mqttProperties.getUsername().isEmpty()) options.setUserName(mqttProperties.getUsername());
if (!mqttProperties.getPassword().isEmpty()) options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
options.setCleanSession(true);
return options;
}
}

View File

@ -0,0 +1,31 @@
package com.xzzn.framework.config.properties;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
public class MqttProperties {
@Value("${mqtt.broker.url}")
private String brokerUrl;
@Value("${mqtt.client.id:}")
private String clientId;
@Value("${mqtt.username:}")
private String username;
@Value("${mqtt.password:}")
private String password;
@Value("${mqtt.connection-timeout:10}")
private int connectionTimeout;
@Value("${mqtt.keep-alive-interval:60}")
private int keepAliveInterval;
@Value("${mqtt.automatic-reconnect:true}")
private boolean automaticReconnect;
}

View File

@ -0,0 +1,155 @@
package com.xzzn.framework.manager;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallback {
private final MqttConnectOptions connectOptions;
private MqttClient mqttClient;
private volatile boolean running = false;
// 存储订阅关系: topic -> (listener, qos)
private final ConcurrentHashMap<String, SubscriptionInfo> subscriptions = new ConcurrentHashMap<>();
@Autowired
public MqttLifecycleManager(MqttConnectOptions connectOptions) {
this.connectOptions = connectOptions;
}
// Spring Boot 启动完成后执行
@Override
public void run(ApplicationArguments args) throws Exception {
start();
}
@Override
public void start() {
if (running) return;
try {
String clientId = connectOptions.getUserName() + "-" + System.currentTimeMillis();
mqttClient = new MqttClient(
connectOptions.getServerURIs()[0],
clientId,
new MemoryPersistence()
);
mqttClient.setCallback(this);
mqttClient.connect(connectOptions);
// 重连后自动重新订阅
resubscribeAll();
running = true;
System.out.println("MQTT client connected to: " + connectOptions.getServerURIs()[0]);
} catch (MqttException e) {
System.err.println("MQTT connection failed: " + e.getMessage());
// 添加重试逻辑
}
}
@Override
public void stop() {
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
System.err.println("Error disconnecting MQTT client: " + e.getMessage());
}
}
running = false;
}
@Override
public boolean isRunning() {
return running;
}
// MQTT 回调方法
@Override
public void connectionLost(Throwable cause) {
System.err.println("MQTT connection lost: " + cause.getMessage());
running = false;
// 自动重连由 MqttConnectOptions 处理
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
SubscriptionInfo info = subscriptions.get(topic);
if (info != null && info.getListener() != null) {
info.getListener().messageArrived(topic, message);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息发布完成处理
}
// 订阅方法
public void subscribe(String topic, int qos, IMqttMessageListener listener) {
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.subscribe(topic, qos);
}
subscriptions.put(topic, new SubscriptionInfo(listener, qos));
} catch (MqttException e) {
System.err.println("Subscribe failed: " + e.getMessage());
}
}
// 发布方法
public void publish(String topic, String payload, int qos) throws MqttException {
if (mqttClient != null && mqttClient.isConnected()) {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
mqttClient.publish(topic, message);
} else {
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
// 重新订阅所有主题
private void resubscribeAll() {
if (mqttClient == null || !mqttClient.isConnected()) return;
subscriptions.forEach((topic, info) -> {
try {
mqttClient.subscribe(topic, info.getQos());
} catch (MqttException e) {
System.err.println("Resubscribe failed for topic: " + topic);
}
});
}
// 订阅信息内部类
private static class SubscriptionInfo {
private final IMqttMessageListener listener;
private final int qos;
public SubscriptionInfo(IMqttMessageListener listener, int qos) {
this.listener = listener;
this.qos = qos;
}
public IMqttMessageListener getListener() {
return listener;
}
public int getQos() {
return qos;
}
}
}

View File

@ -0,0 +1,8 @@
package com.xzzn.framework.web.service;
import org.eclipse.paho.client.mqttv3.MqttException;
public interface MqttPublisher {
void publish(String topic, String message) throws MqttException;
void publish(String topic, String message, int qos) throws MqttException;
}

View File

@ -0,0 +1,7 @@
package com.xzzn.framework.web.service;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
public interface MqttSubscriber {
void subscribe(String topic, int qos, IMqttMessageListener listener);
}

10
pom.xml
View File

@ -154,6 +154,16 @@
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<!-- -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version> <!-- 检查最新版本 -->
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- velocity代码生成使用模板 -->
<dependency>