数据20250916优化-topic订阅失败&数据空报警

This commit is contained in:
2025-09-22 00:40:38 +08:00
parent e93f9cc6b8
commit a5f1444984
9 changed files with 151 additions and 36 deletions

View File

@ -1,5 +1,6 @@
package com.xzzn.framework.manager;
import com.xzzn.ems.service.IEmsAlarmRecordsService;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
@ -16,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallback {
private final MqttConnectOptions connectOptions;
private final IEmsAlarmRecordsService iEmsAlarmRecordsService;
private MqttClient mqttClient;
private volatile boolean running = false;
@ -23,8 +25,9 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
private final ConcurrentHashMap<String, SubscriptionInfo> subscriptions = new ConcurrentHashMap<>();
@Autowired
public MqttLifecycleManager(MqttConnectOptions connectOptions) {
public MqttLifecycleManager(MqttConnectOptions connectOptions, IEmsAlarmRecordsService iEmsAlarmRecordsService) {
this.connectOptions = connectOptions;
this.iEmsAlarmRecordsService = iEmsAlarmRecordsService;
}
// Spring Boot 启动完成后执行
@ -107,7 +110,11 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle,
subscriptions.put(topic, new SubscriptionInfo(listener, qos));
} catch (MqttException e) {
System.err.println("Subscribe failed: " + e.getMessage());
// 订阅失败-增加告警
iEmsAlarmRecordsService.addSubFailedAlarmRecord(topic);
}
// 订阅成功了-校验是否存在未处理或者处理中的订阅失败信息
iEmsAlarmRecordsService.checkFailedRecord(topic);
}
// 发布方法