diff --git a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java index 408dc42..eaab5be 100644 --- a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java +++ b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java @@ -56,4 +56,9 @@ public class RedisKeyConstants * 存放单个设备同步过来的原始数据 */ public static final String ORIGINAL_MQTT_DATA = "MQTT_"; + + /** 存放订阅失败告警信息 */ + public static final String TOPIC_FAILED_ALRAM_RECORD = "topic_failed_"; + /** topic 内没有数据设备维度告警 */ + public static final String TOPIC_EMPTY_ALARM_RECORD = "topic_empty_"; } diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java b/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java index 234382a..b4c05ce 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java +++ b/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java @@ -1,5 +1,6 @@ package com.xzzn.framework.manager; +import com.xzzn.ems.service.IEmsAlarmRecordsService; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; @@ -16,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap; public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallback { private final MqttConnectOptions connectOptions; + private final IEmsAlarmRecordsService iEmsAlarmRecordsService; private MqttClient mqttClient; private volatile boolean running = false; @@ -23,8 +25,9 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, private final ConcurrentHashMap subscriptions = new ConcurrentHashMap<>(); @Autowired - public MqttLifecycleManager(MqttConnectOptions connectOptions) { + public MqttLifecycleManager(MqttConnectOptions connectOptions, IEmsAlarmRecordsService iEmsAlarmRecordsService) { this.connectOptions = connectOptions; + this.iEmsAlarmRecordsService = iEmsAlarmRecordsService; } // Spring Boot 启动完成后执行 @@ -107,7 +110,11 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, subscriptions.put(topic, new SubscriptionInfo(listener, qos)); } catch (MqttException e) { System.err.println("Subscribe failed: " + e.getMessage()); + // 订阅失败-增加告警 + iEmsAlarmRecordsService.addSubFailedAlarmRecord(topic); } + // 订阅成功了-校验是否存在未处理或者处理中的订阅失败信息 + iEmsAlarmRecordsService.checkFailedRecord(topic); } // 发布方法 diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/EmsAlarmRecords.java b/ems-system/src/main/java/com/xzzn/ems/domain/EmsAlarmRecords.java index 258d3b5..72ec48a 100644 --- a/ems-system/src/main/java/com/xzzn/ems/domain/EmsAlarmRecords.java +++ b/ems-system/src/main/java/com/xzzn/ems/domain/EmsAlarmRecords.java @@ -54,10 +54,6 @@ public class EmsAlarmRecords extends BaseEntity @Excel(name = "设备唯一标识符") private String deviceId; - /** 设备名称,用于标识设备 */ - @Excel(name = "设备名称,用于标识设备") - private String deviceName; - /** 工单号(规则:T+日期+6位随机) */ @Excel(name = "工单号", readConverterExp = "规=则:T+日期+6位随机") private String ticketNo; @@ -152,16 +148,6 @@ public class EmsAlarmRecords extends BaseEntity return deviceId; } - public void setDeviceName(String deviceName) - { - this.deviceName = deviceName; - } - - public String getDeviceName() - { - return deviceName; - } - public String getTicketNo() { return ticketNo; } @@ -187,7 +173,6 @@ public class EmsAlarmRecords extends BaseEntity .append("remark", getRemark()) .append("siteId", getSiteId()) .append("deviceId", getDeviceId()) - .append("deviceName", getDeviceName()) .append("ticketNo", getTicketNo()) .toString(); } diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsAlarmRecordsMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsAlarmRecordsMapper.java index ca0ba7b..09cdcd4 100644 --- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsAlarmRecordsMapper.java +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsAlarmRecordsMapper.java @@ -90,4 +90,8 @@ public interface EmsAlarmRecordsMapper // 获取站点设备告警数量 public int getDeviceAlarmNum(@Param("siteId") String siteId, @Param("deviceId") String deviceId); + + //获取未处理的订阅失败告警 + public EmsAlarmRecords getFailedRecord(@Param("siteId")String siteId, @Param("deviceId")String deviceId, + @Param("content") String content,@Param("status")String status); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java b/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java index 2a29700..94c7743 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java @@ -76,4 +76,11 @@ public interface IEmsAlarmRecordsService * @return */ public String createTicketNo(Long id, Long userId); + // 订阅失败-增加告警 + public void addSubFailedAlarmRecord(String topic); + // 订阅成功-处理告警 + public void checkFailedRecord(String topic); + + // topic 内没有数据,按照设备维度告警 + public void addEmptyDataAlarmRecord(String siteId, String deviceId); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java index 3689c55..8efb366 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java @@ -15,6 +15,7 @@ import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.*; import com.xzzn.ems.mapper.*; import com.xzzn.ems.service.IDDSDataProcessService; +import com.xzzn.ems.service.IEmsAlarmRecordsService; import com.xzzn.ems.utils.AbstractBatteryDataProcessor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,6 +75,8 @@ public class DDSDataProcessServiceImpl extends AbstractBatteryDataProcessor impl private EmsBatteryDataMinutesMapper emsBatteryDataMinutesMapper; @Autowired private EmsDailyChargeDataMapper emsDailyChargeDataMapper; + @Autowired + private IEmsAlarmRecordsService iEmsAlarmRecordsService; public DDSDataProcessServiceImpl(ObjectMapper objectMapper) { super(objectMapper); @@ -89,6 +92,8 @@ public class DDSDataProcessServiceImpl extends AbstractBatteryDataProcessor impl log.info("deviceId:" + deviceId); boolean isEmpty = checkJsonDataEmpty(jsonData); if (isEmpty) { + // 添加设备告警 + iEmsAlarmRecordsService.addEmptyDataAlarmRecord(SITE_ID,deviceId); return; } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java index 6e38d64..f5ecb3b 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java @@ -1,10 +1,14 @@ package com.xzzn.ems.service.impl; -import java.util.List; -import java.util.Random; +import java.util.*; +import java.util.concurrent.TimeUnit; +import com.xzzn.common.constant.RedisKeyConstants; import com.xzzn.common.core.domain.entity.SysUser; import com.xzzn.common.core.domain.model.LoginUser; +import com.xzzn.common.core.redis.RedisCache; +import com.xzzn.common.enums.AlarmLevelStatus; +import com.xzzn.common.enums.AlarmStatus; import com.xzzn.common.utils.DateUtils; import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.EmsTicket; @@ -36,6 +40,13 @@ public class EmsAlarmRecordsServiceImpl implements IEmsAlarmRecordsService private EmsTicketMapper emsTicketMapper; @Autowired private SysUserMapper sysUserMapper; + @Autowired + private RedisCache redisCache; + + private Map alarmMap = new HashMap() {{ + put("021_DDS_01", "PCS"); + put("021_FXX_01", "PCS01"); + }}; /** * 查询告警记录 @@ -153,6 +164,85 @@ public class EmsAlarmRecordsServiceImpl implements IEmsAlarmRecordsService return ticketNo; } + // 订阅失败-增加告警 + @Override + public void addSubFailedAlarmRecord(String topic) { + EmsAlarmRecords emsAlarmRecords = new EmsAlarmRecords(); + String siteId = ""; + String deviceId = ""; + String content = "topic订阅失败"; + String status = AlarmLevelStatus.EMERGENCY.getCode(); + + if (topic.startsWith("021_DDS")) { + siteId = "021_DDS_01"; + deviceId = alarmMap.get("021_DDS_01"); + } else if (topic.startsWith("021_FXX")) { + siteId = "021_FXX_01"; + deviceId = alarmMap.get("021_FXX_01"); + } + // 先判断是否存在未处理的订阅失败告警 + emsAlarmRecords = redisCache.getCacheObject(RedisKeyConstants.TOPIC_FAILED_ALRAM_RECORD + siteId + "_" + deviceId); + if (emsAlarmRecords != null) { + return; + } + emsAlarmRecords = createAlarmAtPcs(siteId,deviceId,content,status); + emsAlarmRecordsMapper.insertEmsAlarmRecords(emsAlarmRecords); + + // 存redis便于订阅成功后处理 + redisCache.setCacheObject(RedisKeyConstants.TOPIC_FAILED_ALRAM_RECORD + siteId + "_" + deviceId, emsAlarmRecords); + } + + @Override + public void checkFailedRecord(String topic) { + String siteId = ""; + String deviceId = ""; + + if (topic.startsWith("021_DDS")) { + siteId = "021_DDS_01"; + deviceId = alarmMap.get("021_DDS_01"); + } else if (topic.startsWith("021_FXX")) { + siteId = "021_FXX_01"; + deviceId = alarmMap.get("021_FXX_01"); + } + EmsAlarmRecords emsAlarmRecords = redisCache.getCacheObject(RedisKeyConstants.TOPIC_FAILED_ALRAM_RECORD + siteId + "_" + deviceId); + if (emsAlarmRecords != null) { + // 存在更新为已处理,并清除redis + emsAlarmRecords.setStatus(AlarmStatus.DONE.getCode()); + emsAlarmRecords.setAlarmEndTime(new Date()); + emsAlarmRecords.setUpdateTime(DateUtils.getNowDate()); + emsAlarmRecords.setUpdateBy("system"); + emsAlarmRecordsMapper.updateEmsAlarmRecords(emsAlarmRecords); + } + } + + @Override + public void addEmptyDataAlarmRecord(String siteId, String topicDevice) { + EmsAlarmRecords emsAlarmRecords = redisCache.getCacheObject(RedisKeyConstants.TOPIC_EMPTY_ALARM_RECORD + siteId + "_" + topicDevice); + if (emsAlarmRecords != null) { + return; + } + emsAlarmRecords = createAlarmAtPcs(siteId,topicDevice,"topic内没有数据",AlarmLevelStatus.EMERGENCY.getCode()); + emsAlarmRecordsMapper.insertEmsAlarmRecords(emsAlarmRecords); + + // 存redis-防止重复插入-有效期一天 + redisCache.setCacheObject(RedisKeyConstants.TOPIC_EMPTY_ALARM_RECORD + siteId + "_" + topicDevice, emsAlarmRecords,1, TimeUnit.DAYS); + } + + + private EmsAlarmRecords createAlarmAtPcs(String siteId, String deviceId,String content,String level) { + EmsAlarmRecords emsAlarmRecords = new EmsAlarmRecords(); + emsAlarmRecords.setSiteId(siteId); + emsAlarmRecords.setDeviceId(deviceId); + emsAlarmRecords.setAlarmContent(content); + emsAlarmRecords.setAlarmLevel(level); + emsAlarmRecords.setAlarmStartTime(new Date()); + emsAlarmRecords.setStatus(AlarmStatus.WAITING.getCode()); + emsAlarmRecords.setDeviceType("TCP"); + emsAlarmRecords.setCreateBy("system"); + emsAlarmRecords.setCreateTime(new Date()); + return emsAlarmRecords; + } + private String createRandomTicketNo() { String ticketNo = ""; String nowDate = DateUtils.dateTime(); diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java index fe2d5ac..625d0cc 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java @@ -14,6 +14,7 @@ import com.xzzn.common.utils.DateUtils; import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.*; import com.xzzn.ems.mapper.*; +import com.xzzn.ems.service.IEmsAlarmRecordsService; import com.xzzn.ems.service.IFXXDataProcessService; import com.xzzn.ems.utils.AbstractBatteryDataProcessor; import org.apache.commons.logging.Log; @@ -63,6 +64,8 @@ public class FXXDataProcessServiceImpl extends AbstractBatteryDataProcessor impl private EmsDailyChargeDataMapper emsDailyChargeDataMapper; @Autowired private EmsDhDataMapper emsDhDataMapper; + @Autowired + private IEmsAlarmRecordsService iEmsAlarmRecordsService; // 构造方法(调用父类构造) public FXXDataProcessServiceImpl(ObjectMapper objectMapper) { @@ -82,6 +85,8 @@ public class FXXDataProcessServiceImpl extends AbstractBatteryDataProcessor impl log.info("deviceId:" + deviceId); boolean isEmpty = checkJsonDataEmpty(jsonData); if (isEmpty) { + // 添加设备告警 + iEmsAlarmRecordsService.addEmptyDataAlarmRecord(SITE_ID,deviceId); return; } diff --git a/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml index e47c236..f2d5952 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsAlarmRecordsMapper.xml @@ -19,12 +19,11 @@ - - select id, device_type, alarm_level, alarm_content, alarm_start_time, alarm_end_time, status, create_by, create_time, update_by, update_time, remark, site_id, device_id, device_name, ticket_no from ems_alarm_records + select id, device_type, alarm_level, alarm_content, alarm_start_time, alarm_end_time, status, create_by, create_time, update_by, update_time, remark, site_id, device_id, ticket_no from ems_alarm_records @@ -64,7 +62,6 @@ remark, site_id, device_id, - device_name, ticket_no, @@ -81,7 +78,6 @@ #{remark}, #{siteId}, #{deviceId}, - #{deviceName}, #{ticketNo}, @@ -102,7 +98,6 @@ remark = #{remark}, site_id = #{siteId}, device_id = #{deviceId}, - device_name = #{deviceName}, ticket_no = #{ticketNo}, where id = #{id} @@ -120,13 +115,13 @@ select COUNT(*) as alarmNum from ems_alarm_records where site_id = #{siteId} and device_id = #{deviceId} + and status 1 + + + \ No newline at end of file