奉贤 alarm topic接入

This commit is contained in:
2025-09-23 13:59:45 +08:00
parent a5f1444984
commit 8e8c57cb64
14 changed files with 593 additions and 15 deletions

View File

@ -0,0 +1,117 @@
package com.xzzn.ems.domain;
import com.xzzn.common.core.domain.BaseEntity;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.xzzn.common.annotation.Excel;
/**
* 告警点位匹配数据对象 ems_alarm_match_data
*
* @author xzzn
* @date 2025-09-22
*/
public class EmsAlarmMatchData extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** $column.columnComment */
private Long id;
/** 告警点位 */
@Excel(name = "告警点位")
private String point;
/** 告警值 */
@Excel(name = "告警值")
private Long alarmData;
/** 告警描述 */
@Excel(name = "告警描述")
private String alarmDescription;
/** 站点id */
@Excel(name = "站点id")
private String siteId;
/** 设备类型 */
@Excel(name = "设备类型")
private String deviceCategory;
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setPoint(String point)
{
this.point = point;
}
public String getPoint()
{
return point;
}
public void setAlarmData(Long alarmData)
{
this.alarmData = alarmData;
}
public Long getAlarmData()
{
return alarmData;
}
public void setAlarmDescription(String alarmDescription)
{
this.alarmDescription = alarmDescription;
}
public String getAlarmDescription()
{
return alarmDescription;
}
public void setSiteId(String siteId)
{
this.siteId = siteId;
}
public String getSiteId()
{
return siteId;
}
public void setDeviceCategory(String deviceCategory)
{
this.deviceCategory = deviceCategory;
}
public String getDeviceCategory()
{
return deviceCategory;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("point", getPoint())
.append("alarmData", getAlarmData())
.append("alarmDescription", getAlarmDescription())
.append("siteId", getSiteId())
.append("deviceCategory", getDeviceCategory())
.append("createBy", getCreateBy())
.append("createTime", getCreateTime())
.append("updateBy", getUpdateBy())
.append("updateTime", getUpdateTime())
.append("remark", getRemark())
.toString();
}
}

View File

@ -24,8 +24,8 @@ public class EmsAlarmRecords extends BaseEntity
@Excel(name = "设备类型")
private String deviceType;
/** 告警等级 */
@Excel(name = "告警等级")
/** 告警等级A-提示 B-一般 C-严重 D紧急 */
@Excel(name = "告警等级A-提示 B-一般 C-严重 D紧急")
private String alarmLevel;
/** 告警内容 */
@ -42,8 +42,12 @@ public class EmsAlarmRecords extends BaseEntity
@Excel(name = "告警结束时间", width = 30, dateFormat = "yyyy-MM-dd")
private Date alarmEndTime;
/** 状态 */
@Excel(name = "状态")
/** 告警点位 */
@Excel(name = "告警点位")
private String alarmPoint;
/** 状态:0-待处理 1-已处理 2-处理中 */
@Excel(name = "状态:0-待处理 1-已处理 2-处理中")
private String status;
/** 站点id */
@ -118,6 +122,16 @@ public class EmsAlarmRecords extends BaseEntity
return alarmEndTime;
}
public void setAlarmPoint(String alarmPoint)
{
this.alarmPoint = alarmPoint;
}
public String getAlarmPoint()
{
return alarmPoint;
}
public void setStatus(String status)
{
this.status = status;
@ -148,12 +162,14 @@ public class EmsAlarmRecords extends BaseEntity
return deviceId;
}
public String getTicketNo() {
return ticketNo;
public void setTicketNo(String ticketNo)
{
this.ticketNo = ticketNo;
}
public void setTicketNo(String ticketNo) {
this.ticketNo = ticketNo;
public String getTicketNo()
{
return ticketNo;
}
@Override
@ -165,6 +181,7 @@ public class EmsAlarmRecords extends BaseEntity
.append("alarmContent", getAlarmContent())
.append("alarmStartTime", getAlarmStartTime())
.append("alarmEndTime", getAlarmEndTime())
.append("alarmPoint", getAlarmPoint())
.append("status", getStatus())
.append("createBy", getCreateBy())
.append("createTime", getCreateTime())
@ -176,4 +193,4 @@ public class EmsAlarmRecords extends BaseEntity
.append("ticketNo", getTicketNo())
.toString();
}
}
}

View File

@ -0,0 +1,63 @@
package com.xzzn.ems.mapper;
import java.util.List;
import java.util.Map;
import com.xzzn.ems.domain.EmsAlarmMatchData;
/**
* 告警点位匹配数据Mapper接口
*
* @author xzzn
* @date 2025-09-22
*/
public interface EmsAlarmMatchDataMapper
{
/**
* 查询告警点位匹配数据
*
* @param id 告警点位匹配数据主键
* @return 告警点位匹配数据
*/
public EmsAlarmMatchData selectEmsAlarmMatchDataById(Long id);
/**
* 查询告警点位匹配数据列表
*
* @param emsAlarmMatchData 告警点位匹配数据
* @return 告警点位匹配数据集合
*/
public List<EmsAlarmMatchData> selectEmsAlarmMatchDataList(EmsAlarmMatchData emsAlarmMatchData);
/**
* 新增告警点位匹配数据
*
* @param emsAlarmMatchData 告警点位匹配数据
* @return 结果
*/
public int insertEmsAlarmMatchData(EmsAlarmMatchData emsAlarmMatchData);
/**
* 修改告警点位匹配数据
*
* @param emsAlarmMatchData 告警点位匹配数据
* @return 结果
*/
public int updateEmsAlarmMatchData(EmsAlarmMatchData emsAlarmMatchData);
/**
* 删除告警点位匹配数据
*
* @param id 告警点位匹配数据主键
* @return 结果
*/
public int deleteEmsAlarmMatchDataById(Long id);
/**
* 批量删除告警点位匹配数据
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteEmsAlarmMatchDataByIds(Long[] ids);
}

View File

@ -94,4 +94,10 @@ public interface EmsAlarmRecordsMapper
//获取未处理的订阅失败告警
public EmsAlarmRecords getFailedRecord(@Param("siteId")String siteId, @Param("deviceId")String deviceId,
@Param("content") String content,@Param("status")String status);
// 批量处理告警数据
public void batchUpsert(@Param("recordsList")List<EmsAlarmRecords> recordsList);
// 获取所有没有处理完成的告警记录
public List<EmsAlarmRecords> getAllUnfinishedRecords(@Param("needUpdateKeys")List<String> needUpdateKeys,@Param("siteId") String siteId);
}

View File

@ -83,4 +83,13 @@ public interface IEmsAlarmRecordsService
// topic 内没有数据,按照设备维度告警
public void addEmptyDataAlarmRecord(String siteId, String deviceId);
// 告警字段和告警信息
public void initAlarmMatchInfo();
// 获取所有没有处理完成的告警记录
public List<EmsAlarmRecords> getAllUnfinishedRecords(List<String> needUpdateKeys, String siteId);
// 批量处理告警数据
public void batchProcessAlarmRecords(List<EmsAlarmRecords> recordsList);
}

View File

@ -4,4 +4,5 @@ public interface IFXXDataProcessService {
public void handleFxData(String message);
public void handleFxAlarmData(String message);
}

View File

@ -2,18 +2,20 @@ package com.xzzn.ems.service.impl;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.xzzn.common.constant.RedisKeyConstants;
import com.xzzn.common.core.domain.entity.SysUser;
import com.xzzn.common.core.domain.model.LoginUser;
import com.xzzn.common.core.redis.RedisCache;
import com.xzzn.common.enums.AlarmLevelStatus;
import com.xzzn.common.enums.AlarmStatus;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsAlarmMatchData;
import com.xzzn.ems.domain.EmsTicket;
import com.xzzn.ems.domain.vo.AlarmRecordListRequestVo;
import com.xzzn.ems.domain.vo.AlarmRecordListResponseVo;
import com.xzzn.ems.mapper.EmsAlarmMatchDataMapper;
import com.xzzn.ems.mapper.EmsTicketMapper;
import com.xzzn.ems.service.IEmsTicketService;
import com.xzzn.system.mapper.SysUserMapper;
@ -35,8 +37,6 @@ public class EmsAlarmRecordsServiceImpl implements IEmsAlarmRecordsService
@Autowired
private EmsAlarmRecordsMapper emsAlarmRecordsMapper;
@Autowired
private IEmsTicketService emsTicketService;
@Autowired
private EmsTicketMapper emsTicketMapper;
@Autowired
private SysUserMapper sysUserMapper;
@ -47,6 +47,8 @@ public class EmsAlarmRecordsServiceImpl implements IEmsAlarmRecordsService
put("021_DDS_01", "PCS");
put("021_FXX_01", "PCS01");
}};
@Autowired
private EmsAlarmMatchDataMapper emsAlarmMatchDataMapper;
/**
* 查询告警记录
@ -228,7 +230,6 @@ public class EmsAlarmRecordsServiceImpl implements IEmsAlarmRecordsService
redisCache.setCacheObject(RedisKeyConstants.TOPIC_EMPTY_ALARM_RECORD + siteId + "_" + topicDevice, emsAlarmRecords,1, TimeUnit.DAYS);
}
private EmsAlarmRecords createAlarmAtPcs(String siteId, String deviceId,String content,String level) {
EmsAlarmRecords emsAlarmRecords = new EmsAlarmRecords();
emsAlarmRecords.setSiteId(siteId);
@ -251,4 +252,28 @@ public class EmsAlarmRecordsServiceImpl implements IEmsAlarmRecordsService
}
@Override
public void initAlarmMatchInfo() {
List<EmsAlarmMatchData> alarmPointList = emsAlarmMatchDataMapper.selectEmsAlarmMatchDataList(null);
Map<String, EmsAlarmMatchData> resultMap = alarmPointList.stream()
.collect(Collectors.toMap(
data->data.getDeviceCategory()+"_"+data.getPoint(),
data->data,
(existing, replacement) -> replacement
));
// 存redis
redisCache.setCacheObject(RedisKeyConstants.ALARM_MATCH_INFO,resultMap, 1, TimeUnit.DAYS);
}
@Override
public List<EmsAlarmRecords> getAllUnfinishedRecords(List<String> needUpdateKeys, String siteId) {
return emsAlarmRecordsMapper.getAllUnfinishedRecords(needUpdateKeys,siteId);
}
@Override
public void batchProcessAlarmRecords(List<EmsAlarmRecords> recordsList) {
emsAlarmRecordsMapper.batchUpsert(recordsList);
}
}

View File

@ -27,6 +27,7 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
@Service
public class FXXDataProcessServiceImpl extends AbstractBatteryDataProcessor implements IFXXDataProcessService {
@ -767,6 +768,145 @@ public class FXXDataProcessServiceImpl extends AbstractBatteryDataProcessor impl
return records;
}
@Override
public void handleFxAlarmData(String message) {
JSONArray arraylist = JSONArray.parseArray(message);
// 获取redis缓存-告警信息
Map<String, EmsAlarmMatchData> alarmMatchInfo= redisCache.getCacheObject(RedisKeyConstants.ALARM_MATCH_INFO);
for (int i = 0; i < arraylist.size(); i++) {
JSONObject obj = JSONObject.parseObject(arraylist.get(i).toString());
String deviceId = obj.get("Device").toString();
String jsonData = obj.get("Data").toString();
log.info("deviceId:" + deviceId);
if (deviceId.contains("BMSD")) {
alarmDataProcess(deviceId, jsonData, alarmMatchInfo, DeviceCategory.STACK.getCode());
} else if (deviceId.contains("BMSC")) {
alarmDataProcess(deviceId, jsonData, alarmMatchInfo, DeviceCategory.CLUSTER.getCode());
} else if (deviceId.contains("PCS")) {
alarmDataProcess(deviceId, jsonData, alarmMatchInfo, DeviceCategory.PCS.getCode());
}
}
}
private void alarmDataProcess(String deviceId, String jsonData,
Map<String, EmsAlarmMatchData> alarmInfoData, String category) {
Map<String, Object> obj = JSON.parseObject(jsonData, new TypeReference<Map<String, Object>>() {
});
String redisKey = RedisKeyConstants.LATEST_ALARM_RECORD + "_" + SITE_ID +"_" + deviceId;
// 获取redis里面的当前有效告警遍历添加到已存在告警key里面
Map<String, Object> currentAlarm = redisCache.getCacheMap(redisKey);
final Set<String> currentAlarmKeys = new HashSet<>();
if (currentAlarm != null && !currentAlarm.isEmpty()) {
currentAlarm.keySet().stream()
.filter(Objects::nonNull)
.map(Object::toString)
.forEach(currentAlarmKeys::add);
}
// 结合同步数据,筛选簇需要更新的告警信息
List<String> needUpdateKeys = obj.entrySet().stream()
.filter(entry -> {
Object valueObj = entry.getValue();
if (valueObj == null) {
return false;
}
int value = Integer.parseInt(valueObj.toString());
return value == 0 && currentAlarmKeys.contains(entry.getKey());
})
.map(Map.Entry::getKey)
.collect(Collectors.toList());
// 批量查询数据库-需要更新的数据
Map<String, EmsAlarmRecords> needUpdateMap = new HashMap<>();
if (!needUpdateKeys.isEmpty()) {
List<EmsAlarmRecords> records = iEmsAlarmRecordsService.getAllUnfinishedRecords(needUpdateKeys,SITE_ID);
// 转为Map便于快速获取
needUpdateMap = records.stream()
.collect(Collectors.toMap(
EmsAlarmRecords::getAlarmPoint,
record -> record
));
}
List<EmsAlarmRecords> saveOrUpdateList = new ArrayList<>();
List<EmsAlarmRecords> newAddRecordList = new ArrayList<>();
List<String> toRemoveFromRedis = new ArrayList<>();
// 遍历数据map
for (Map.Entry<String, Object> entry : obj.entrySet()) {
String key = entry.getKey();
Integer value = (Integer) entry.getValue();
Boolean isCurrentAlarm = currentAlarmKeys.contains(key);
// 值为 1且不在当前告警里面 - 新增告警
if (value == 1 && !isCurrentAlarm) {
String matchRedisKey = category + "_" + key;
Object cacheObj = alarmInfoData.get(matchRedisKey);
if (cacheObj == null) {
// 处理空数据逻辑
return;
}
EmsAlarmMatchData matchInfo = JSON.toJavaObject(cacheObj, EmsAlarmMatchData.class);
EmsAlarmRecords emsAlarmRecord = convertAlarmRecord(deviceId,matchInfo);
saveOrUpdateList.add(emsAlarmRecord);
newAddRecordList.add(emsAlarmRecord);
} else if (value == 0 && isCurrentAlarm) {// 值为 0且在当前告警里面 - 更新告警已处理
EmsAlarmRecords existingAlarm = needUpdateMap.get(key);
if (existingAlarm != null) {
existingAlarm.setStatus(AlarmStatus.DONE.getCode());
existingAlarm.setUpdateTime(new Date());
existingAlarm.setAlarmEndTime(new Date());
saveOrUpdateList.add(existingAlarm);
toRemoveFromRedis.add(key);
}
}
}
// 批量处理插入和更新操作
if (saveOrUpdateList != null && saveOrUpdateList.size() > 0) {
iEmsAlarmRecordsService.batchProcessAlarmRecords(saveOrUpdateList);
}
// 已处理的从redis里面删除
if (!toRemoveFromRedis.isEmpty()) {
redisCache.deleteAllCacheMapValue(RedisKeyConstants.LATEST_ALARM_RECORD + SITE_ID,toRemoveFromRedis.toArray());
}
// 批量添加新增的告警到Redis
Map<String, EmsAlarmRecords> newAlarms = newAddRecordList.stream()
.filter(a -> !AlarmStatus.DONE.getCode().equals(a.getStatus()))
.collect(Collectors.toMap(EmsAlarmRecords::getAlarmPoint, a -> a));
if (!newAlarms.isEmpty()) {
// 本次新增的放入redis
redisCache.setAllCacheMapValue(redisKey, newAlarms);
}
}
private EmsAlarmRecords convertAlarmRecord(String deviceId, EmsAlarmMatchData matchInfo) {
EmsAlarmRecords emsAlarmRecords = new EmsAlarmRecords();
emsAlarmRecords.setSiteId(SITE_ID);
emsAlarmRecords.setDeviceId(deviceId);
emsAlarmRecords.setDeviceType(DeviceType.TCP.toString());
emsAlarmRecords.setAlarmLevel(AlarmLevelStatus.GENERAL.getCode());
emsAlarmRecords.setStatus(AlarmStatus.WAITING.getCode());
emsAlarmRecords.setAlarmStartTime(DateUtils.getNowDate());
emsAlarmRecords.setCreateTime(DateUtils.getNowDate());
emsAlarmRecords.setCreateBy("system");
emsAlarmRecords.setUpdateTime(DateUtils.getNowDate());
emsAlarmRecords.setUpdateBy("system");
if (matchInfo != null) {
emsAlarmRecords.setAlarmPoint(matchInfo.getPoint());
emsAlarmRecords.setAlarmContent(matchInfo.getAlarmDescription());
}
return emsAlarmRecords;
}
// 空数据不处理
private boolean checkJsonDataEmpty(String jsonData) {
boolean flag = false;