临时修改
This commit is contained in:
@ -3,21 +3,37 @@ package com.xzzn.framework.manager;
|
||||
import com.xzzn.ems.service.IEmsAlarmRecordsService;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@Component
|
||||
public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallback {
|
||||
public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallbackExtended {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MqttLifecycleManager.class);
|
||||
private static final long RECONNECT_DELAY_SECONDS = 5;
|
||||
|
||||
private final MqttConnectOptions connectOptions;
|
||||
private final IEmsAlarmRecordsService iEmsAlarmRecordsService;
|
||||
private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setName("mqtt-reconnect");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
});
|
||||
private final AtomicBoolean reconnectScheduled = new AtomicBoolean(false);
|
||||
private volatile ScheduledFuture<?> reconnectFuture;
|
||||
private MqttClient mqttClient;
|
||||
private volatile boolean running = false;
|
||||
|
||||
@ -41,7 +57,9 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
|
||||
if (running) return;
|
||||
|
||||
try {
|
||||
String clientId = connectOptions.getUserName() + "-" + System.currentTimeMillis();
|
||||
String prefix = connectOptions.getUserName() == null || connectOptions.getUserName().isEmpty()
|
||||
? "mqtt-client" : connectOptions.getUserName();
|
||||
String clientId = prefix + "-" + System.currentTimeMillis();
|
||||
mqttClient = new MqttClient(
|
||||
connectOptions.getServerURIs()[0],
|
||||
clientId,
|
||||
@ -51,27 +69,28 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
|
||||
mqttClient.setCallback(this);
|
||||
mqttClient.connect(connectOptions);
|
||||
|
||||
// 重连后自动重新订阅
|
||||
resubscribeAll();
|
||||
|
||||
running = true;
|
||||
System.out.println("MQTT client connected to: " + connectOptions.getServerURIs()[0]);
|
||||
log.info("MQTT client connected to: {}", connectOptions.getServerURIs()[0]);
|
||||
} catch (MqttException e) {
|
||||
System.err.println("MQTT connection failed: " + e.getMessage());
|
||||
// 添加重试逻辑
|
||||
running = false;
|
||||
log.error("MQTT connection failed: {}", e.getMessage(), e);
|
||||
scheduleReconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
cancelReconnectTask();
|
||||
if (mqttClient != null && mqttClient.isConnected()) {
|
||||
try {
|
||||
mqttClient.disconnect();
|
||||
mqttClient.close();
|
||||
} catch (MqttException e) {
|
||||
System.err.println("Error disconnecting MQTT client: " + e.getMessage());
|
||||
log.warn("Error disconnecting MQTT client: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
reconnectExecutor.shutdownNow();
|
||||
running = false;
|
||||
}
|
||||
|
||||
@ -83,9 +102,17 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
|
||||
// MQTT 回调方法
|
||||
@Override
|
||||
public void connectionLost(Throwable cause) {
|
||||
System.err.println("MQTT connection lost: " + cause.getMessage());
|
||||
log.warn("MQTT connection lost: {}", cause == null ? "unknown" : cause.getMessage(), cause);
|
||||
running = false;
|
||||
// 自动重连由 MqttConnectOptions 处理
|
||||
scheduleReconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectComplete(boolean reconnect, String serverURI) {
|
||||
running = true;
|
||||
cancelReconnectTask();
|
||||
log.info("MQTT connection complete, reconnect: {}, serverURI: {}", reconnect, serverURI);
|
||||
resubscribeAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -106,14 +133,16 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
|
||||
try {
|
||||
if (mqttClient != null && mqttClient.isConnected()) {
|
||||
mqttClient.subscribe(topic, qos);
|
||||
log.info("MQTT subscribe success, topic: {}, qos: {}", topic, qos);
|
||||
} else {
|
||||
log.warn("MQTT subscribe deferred, client not connected, topic: {}", topic);
|
||||
}
|
||||
subscriptions.put(topic, new SubscriptionInfo(listener, qos));
|
||||
} catch (MqttException e) {
|
||||
System.err.println("Subscribe failed: " + e.getMessage());
|
||||
// 订阅失败-增加告警
|
||||
log.error("Subscribe failed, topic: {}, err: {}", topic, e.getMessage(), e);
|
||||
iEmsAlarmRecordsService.addSubFailedAlarmRecord(topic);
|
||||
scheduleReconnect();
|
||||
}
|
||||
// 订阅成功了-校验是否存在未处理或者处理中的订阅失败信息
|
||||
iEmsAlarmRecordsService.checkFailedRecord(topic);
|
||||
}
|
||||
|
||||
@ -135,12 +164,52 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
|
||||
subscriptions.forEach((topic, info) -> {
|
||||
try {
|
||||
mqttClient.subscribe(topic, info.getQos());
|
||||
log.info("MQTT resubscribe success, topic: {}, qos: {}", topic, info.getQos());
|
||||
} catch (MqttException e) {
|
||||
System.err.println("Resubscribe failed for topic: " + topic);
|
||||
log.error("Resubscribe failed for topic: {}, err: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void scheduleReconnect() {
|
||||
if (mqttClient == null || reconnectExecutor.isShutdown()) {
|
||||
return;
|
||||
}
|
||||
if (!reconnectScheduled.compareAndSet(false, true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
reconnectFuture = reconnectExecutor.scheduleWithFixedDelay(() -> {
|
||||
if (mqttClient == null) {
|
||||
cancelReconnectTask();
|
||||
return;
|
||||
}
|
||||
if (mqttClient.isConnected()) {
|
||||
cancelReconnectTask();
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info("MQTT reconnecting...");
|
||||
mqttClient.connect(connectOptions);
|
||||
running = true;
|
||||
cancelReconnectTask();
|
||||
resubscribeAll();
|
||||
log.info("MQTT reconnect success.");
|
||||
} catch (MqttException e) {
|
||||
running = false;
|
||||
log.warn("MQTT reconnect failed: {}", e.getMessage());
|
||||
}
|
||||
}, RECONNECT_DELAY_SECONDS, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void cancelReconnectTask() {
|
||||
reconnectScheduled.set(false);
|
||||
ScheduledFuture<?> future = reconnectFuture;
|
||||
if (future != null && !future.isCancelled()) {
|
||||
future.cancel(false);
|
||||
}
|
||||
}
|
||||
|
||||
// 订阅信息内部类
|
||||
private static class SubscriptionInfo {
|
||||
private final IMqttMessageListener listener;
|
||||
@ -159,4 +228,4 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
|
||||
return qos;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user