From 18ff0100b9a6af99741fd68ce0b1ab804b1d256c Mon Sep 17 00:00:00 2001 From: zq Date: Wed, 24 Dec 2025 16:19:04 +0800 Subject: [PATCH] =?UTF-8?q?modbus=E8=AF=BB=E5=8F=96=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E6=95=B0=E6=8D=AE=E5=A2=9E=E5=8A=A0=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/RedisKeyConstants.java | 4 + .../common/core/modbus/ModbusProcessor.java | 18 ++- .../com/xzzn/quartz/task/ModbusPoller.java | 136 ++++++++++++------ .../java/com/xzzn/quartz/util/CronUtils.java | 21 +++ .../ems/service/IEmsAlarmRecordsService.java | 5 +- .../impl/DeviceDataProcessServiceImpl.java | 46 +++++- .../impl/EmsAlarmRecordsServiceImpl.java | 32 ++++- 7 files changed, 201 insertions(+), 61 deletions(-) diff --git a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java index 6d124f9..82c0a75 100644 --- a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java +++ b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java @@ -96,6 +96,10 @@ public class RedisKeyConstants public static final String TOPIC_FAILED_ALRAM_RECORD = "topic_failed_"; /** topic 内没有数据设备维度告警 */ public static final String TOPIC_EMPTY_ALARM_RECORD = "topic_empty_"; + /** modbus读取 没有数据设备维度告警 */ + public static final String MODBUS_EMPTY_ALARM_RECORD = "modbus_empty_"; + /** modbus读取 设备离线告警 */ + public static final String MODBUS_OFFLINE_ALARM_RECORD = "modbus_offline_"; /** 设备信息初始化 */ public static final String INIT_DEVICE_INFO = "init_device_info"; diff --git a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java index f68610b..4cf3c4d 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java +++ b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java @@ -170,14 +170,20 @@ public class ModbusProcessor { } } + public ModbusMaster borrowMaster(DeviceConfig config) throws Exception { + ModbusMaster master = connectionManager.borrowMaster(config); + // 设置了Modbus通信的超时时间为3000毫秒(3秒)。当主设备与从设备通信时,若在3秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 + master.setTimeout(5000); + return master; + } public Map readDataFromDevice(DeviceConfig config, ModbusMaster master) { Map deviceData = new HashMap<>(); // ModbusMaster master = null; // 将master的声明提前 try { - master = connectionManager.borrowMaster(config); +// master = connectionManager.borrowMaster(config); // 设置了Modbus通信的超时时间为3000毫秒(3秒)。当主设备与从设备通信时,若在3秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 - master.setTimeout(5000); +// master.setTimeout(5000); BatchResults results = readTagValues(master, config.getSlaveId(), config.getTags()); for (TagConfig tag : config.getTags()) { if (Objects.equals(tag.getDataType(), "FOUR_BYTE_FLOAT_DBCA")){ @@ -199,7 +205,7 @@ public class ModbusProcessor { // } } } catch (Exception e) { - logger.error("Failed to borrow connection or read from devices '{}'", config.getDeviceName(), e); + logger.error("Failed read from devices '{}'", config.getDeviceName(), e); } finally { // 关键:无论成功与否,都必须将连接归还到池中 @@ -207,8 +213,8 @@ public class ModbusProcessor { connectionManager.returnMaster(config, master); } } - String deviceNumber = config.getDeviceNumber(); - redisCache.setCacheObject(deviceNumber, deviceData); +// String deviceNumber = config.getDeviceNumber(); +// redisCache.setCacheObject(deviceNumber, deviceData); return deviceData; } @@ -341,7 +347,7 @@ public class ModbusProcessor { } } return results; - }catch (Exception e){ + } catch (Exception e){ logger.error("Failed to read master '{}'", slaveId, e); throw new Exception(e); } diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java index 83d9abc..6e54173 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java @@ -8,12 +8,18 @@ import com.xzzn.common.constant.RedisKeyConstants; import com.xzzn.common.core.modbus.ModbusProcessor; import com.xzzn.common.core.modbus.domain.DeviceConfig; import com.xzzn.common.core.redis.RedisCache; +import com.xzzn.common.enums.DeviceRunningStatus; import com.xzzn.common.utils.DateUtils; +import com.xzzn.ems.domain.EmsDevicesSetting; +import com.xzzn.ems.mapper.EmsDevicesSettingMapper; import com.xzzn.ems.service.IEmsAlarmRecordsService; import com.xzzn.ems.service.impl.DeviceDataProcessServiceImpl; import com.xzzn.framework.manager.MqttLifecycleManager; import com.xzzn.framework.web.service.MqttPublisher; import com.xzzn.quartz.config.ScheduledTask; +import com.xzzn.quartz.domain.SysJob; +import com.xzzn.quartz.service.ISysJobService; +import com.xzzn.quartz.util.CronUtils; import java.io.IOException; import java.nio.file.Files; @@ -24,6 +30,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -44,14 +52,19 @@ public class ModbusPoller { private final MqttLifecycleManager mqttLifecycleManager; private final ScheduledTask scheduledTask; private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map deviceFailureCounts = new ConcurrentHashMap<>(); @Autowired private ModbusProcessor modbusProcessor; @Autowired private IEmsAlarmRecordsService iEmsAlarmRecordsService; @Autowired + private ISysJobService iSysJobService; + @Autowired private DeviceDataProcessServiceImpl deviceDataProcessServiceImpl; @Autowired + private EmsDevicesSettingMapper emsDevicesSettingMapper; + @Autowired private RedisCache redisCache; @Autowired private MqttPublisher mqttPublisher; @@ -101,16 +114,17 @@ public class ModbusPoller { } } + int interval = getScheduledTaskInterval(); // 为每个 host:port 启动一个任务 for (Map.Entry> entry : groupedConfigs.entrySet()) { String groupKey = entry.getKey(); List configs = entry.getValue(); // 取其中一个配置的时间间隔作为该组任务的执行周期 - long interval = configs.get(0).getTime(); +// long interval = configs.get(0).getTime(); scheduledTask.startTask(groupKey, () -> { for (DeviceConfig config : configs) { try { - scheduledStart(config, null); + scheduledStart(config); } catch (Exception e) { log.error("采集设备数据异常: {}", config.getDeviceName(), e); } @@ -119,52 +133,72 @@ public class ModbusPoller { } } - public void scheduledStart(DeviceConfig config, ModbusMaster master) { - try { - if (config.isEnabled()) { - log.info("Reading data from devices: {}", config.getDeviceName()); - Map data = modbusProcessor.readDataFromDevice(config, master); - // 在这里处理采集到的数据 - config.getTags().forEach(tag -> { - Object rawValue = data.get(tag.getKey()); - if (rawValue != null) { - float value = 0; - if (rawValue instanceof Number) { - value = ((Number) rawValue).floatValue(); // 安全地转换为 float - } else { - log.error("tag:{},无法将数据转换为数字: {}", tag.getKey(), rawValue); - } - value = tag.getA() * value * value + tag.getK() * value + tag.getB(); - - int intValue = (int) value; - if (tag.getBit() != null) { - log.info("tag:{},bit:{},value:{}", tag.getKey(), tag.getBit(), value); - String binary = Integer.toBinaryString(intValue); - data.put(tag.getKey(), binary); - } else { - data.put(tag.getKey(), value); - } - } else { - data.put(tag.getKey(), rawValue); - log.warn("tag:{},数据为空: {}", tag.getKey(), rawValue); - } - }); - log.info("Data from {}: {}", config.getDeviceName(), data); - String deviceNumber = config.getDeviceNumber(); - //处理数据并发送MQTT消息、保存Redis数据和数据入库 - processingData(data, deviceNumber); + public void scheduledStart(DeviceConfig config) { + if (config.isEnabled()) { + log.info("Reading data from devices: {}", config.getDeviceName()); + ModbusMaster master = null; + try { + master = modbusProcessor.borrowMaster(config); + } catch (Exception e) { + log.error("Failed to borrow connection '{}'", config.getDeviceName(), e); + // 处理设备连接失败的情况,更新设备状态为离线,添加报警记录 + addDeviceOfflineRecord(siteId, config.getDeviceNumber()); + return; } - } catch (Exception e) { - log.error("设备数据采集异常: {}", config.getDeviceName(), e); + Map data = modbusProcessor.readDataFromDevice(config, master); + // 在这里处理采集到的数据 + config.getTags().forEach(tag -> { + Object rawValue = data.get(tag.getKey()); + if (rawValue != null) { + float value = 0; + if (rawValue instanceof Number) { + value = ((Number) rawValue).floatValue(); // 安全地转换为 float + } else { + log.error("tag:{},无法将数据转换为数字: {}", tag.getKey(), rawValue); + } + value = tag.getA() * value * value + tag.getK() * value + tag.getB(); + + int intValue = (int) value; + if (tag.getBit() != null) { + log.info("tag:{},bit:{},value:{}", tag.getKey(), tag.getBit(), value); + String binary = Integer.toBinaryString(intValue); + data.put(tag.getKey(), binary); + } else { + data.put(tag.getKey(), value); + } + } else { + data.put(tag.getKey(), rawValue); + log.warn("tag:{},数据为空: {}", tag.getKey(), rawValue); + } + }); + log.info("Data from {}: {}", config.getDeviceName(), data); + String deviceNumber = config.getDeviceNumber(); + //处理数据并发送MQTT消息、保存Redis数据和数据入库 + processingData(data, deviceNumber); } } private void processingData(Map data, String deviceNumber) { if (data == null || data.size() == 0) { - // 添加设备告警 - iEmsAlarmRecordsService.addEmptyDataAlarmRecord(siteId, deviceNumber); + // 增加失败计数 + int failureCount = deviceFailureCounts.getOrDefault(deviceNumber, 0) + 1; + deviceFailureCounts.put(deviceNumber, failureCount); + + log.warn("设备 {} 数据读取失败,当前连续失败次数: {}", deviceNumber, failureCount); + + // 连续6次失败触发报警 + if (failureCount >= 6) { + addDeviceOfflineRecord(siteId, deviceNumber); + log.error("设备 {} 连续 {} 次未读取到数据,触发报警", deviceNumber, failureCount); + } return; } + + // 数据读取成功,重置计数器 + deviceFailureCounts.remove(deviceNumber); + updateDeviceStatus(siteId, deviceNumber, DeviceRunningStatus.RUNNING.getCode()); + + // 发送MQTT消息、保存Redis数据和数据入库 Long timestamp = System.currentTimeMillis(); JSONObject json = new JSONObject(); json.put("Data", data); @@ -201,4 +235,26 @@ public class ModbusPoller { deviceDataProcessServiceImpl.processingDeviceData(siteId, deviceNumber, JSON.toJSONString(data), DateUtils.convertUpdateTime(timestamp)); } + //处理设备连接失败的情况,更新设备状态为离线,添加报警记录 + private void addDeviceOfflineRecord(String siteId, String deviceNumber) { + updateDeviceStatus(siteId, deviceNumber, DeviceRunningStatus.OFFLINE.getCode()); + iEmsAlarmRecordsService.addDeviceOfflineRecord(siteId, deviceNumber); + } + + // 更新设备状态为在线或离线 + private void updateDeviceStatus(String siteId, String deviceNumber, String deviceStatus) { + EmsDevicesSetting emsDevicesSetting = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceNumber, siteId); + if (emsDevicesSetting != null && !Objects.equals(emsDevicesSetting.getDeviceStatus(), deviceStatus)) { + emsDevicesSetting.setDeviceStatus(deviceStatus); + emsDevicesSettingMapper.updateEmsDevicesSetting(emsDevicesSetting); + } + } + + private int getScheduledTaskInterval() { + SysJob query = new SysJob(); + query.setInvokeTarget("modbusPoller.pollAllDevices"); + List sysJobs = iSysJobService.selectJobList(query); + return Math.toIntExact(CronUtils.getNextExecutionIntervalMillis(sysJobs.get(0).getCronExpression())); + } + } \ No newline at end of file diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/util/CronUtils.java b/ems-quartz/src/main/java/com/xzzn/quartz/util/CronUtils.java index 052ed37..10b7764 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/util/CronUtils.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/util/CronUtils.java @@ -60,4 +60,25 @@ public class CronUtils throw new IllegalArgumentException(e.getMessage()); } } + + /** + * 返回当前时间到下一次执行时间间隔的毫秒 + */ + public static long getNextExecutionIntervalMillis(String cronExpression) + { + try + { + CronExpression cron = new CronExpression(cronExpression); + Date now = new Date(); + Date nextExecution = cron.getNextValidTimeAfter(now); + Date nextExecution2 = cron.getNextValidTimeAfter(nextExecution); + + return nextExecution2.getTime() - nextExecution.getTime(); + } + catch (ParseException e) + { + throw new IllegalArgumentException(e.getMessage()); + } + } + } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java b/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java index 38e66bf..b178467 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IEmsAlarmRecordsService.java @@ -1,10 +1,11 @@ package com.xzzn.ems.service; -import java.util.List; import com.xzzn.ems.domain.EmsAlarmRecords; import com.xzzn.ems.domain.vo.AlarmRecordListRequestVo; import com.xzzn.ems.domain.vo.AlarmRecordListResponseVo; +import java.util.List; + /** * 告警记录Service接口 * @@ -98,4 +99,6 @@ public interface IEmsAlarmRecordsService // 处理本地端同步的保护策略告警信息 public void dealSyncData(String content, String operateType); + + public void addDeviceOfflineRecord(String siteId, String deviceNumber); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java index 044e02b..0b842c8 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java @@ -31,7 +31,25 @@ import com.xzzn.ems.domain.*; import com.xzzn.ems.domain.vo.EnergyPriceTimeRange; import com.xzzn.ems.domain.vo.EnergyPriceVo; import com.xzzn.ems.enums.DeviceMatchTable; -import com.xzzn.ems.mapper.*; +import com.xzzn.ems.mapper.EmsAmmeterDataMapper; +import com.xzzn.ems.mapper.EmsBatteryClusterMapper; +import com.xzzn.ems.mapper.EmsBatteryDataMapper; +import com.xzzn.ems.mapper.EmsBatteryDataMinutesMapper; +import com.xzzn.ems.mapper.EmsBatteryGroupMapper; +import com.xzzn.ems.mapper.EmsBatteryStackMapper; +import com.xzzn.ems.mapper.EmsClusterAlarmDataMapper; +import com.xzzn.ems.mapper.EmsCoolingAlarmDataMapper; +import com.xzzn.ems.mapper.EmsCoolingDataMapper; +import com.xzzn.ems.mapper.EmsDailyChargeDataMapper; +import com.xzzn.ems.mapper.EmsDailyEnergyDataMapper; +import com.xzzn.ems.mapper.EmsDevicesSettingMapper; +import com.xzzn.ems.mapper.EmsDhDataMapper; +import com.xzzn.ems.mapper.EmsEmsDataMapper; +import com.xzzn.ems.mapper.EmsPcsAlarmDataMapper; +import com.xzzn.ems.mapper.EmsPcsBranchDataMapper; +import com.xzzn.ems.mapper.EmsPcsDataMapper; +import com.xzzn.ems.mapper.EmsStackAlarmDataMapper; +import com.xzzn.ems.mapper.EmsXfDataMapper; import com.xzzn.ems.service.IDeviceDataProcessService; import com.xzzn.ems.service.IEmsAlarmRecordsService; import com.xzzn.ems.service.IEmsDeviceSettingService; @@ -557,21 +575,31 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i } } + /** + * + * @param pointMatchList 字段匹配规则列表 + * @param obj 原始数据Map + * @param entity 目标Java对象 + * @param pointEnumMatchMap 枚举匹配规则 + */ private void saveDeviceData(List pointMatchList, Map obj, Object entity, Map> pointEnumMatchMap) { Map pointMatchMap = pointMatchList.stream() .collect(Collectors.toMap( - data -> StringUtils.toCamelCase(data.getMatchField()), - EmsPointMatch::getDataPoint, - (existing, replacement) -> replacement)); + data -> StringUtils.toCamelCase(data.getMatchField()), // 源字段名转为驼峰 + EmsPointMatch::getDataPoint, // 获取目标点位名 + (existing, replacement) -> replacement)); // 处理重复键 Field[] fields = entity.getClass().getDeclaredFields(); for (Field field : fields) { String fieldName = field.getName(); if (pointMatchMap.containsKey(fieldName)) { - field.setAccessible(true); + // 处理匹配的字段名,并设置值 + field.setAccessible(true); // 允许访问私有字段 try { + // 1. 从原始数据中获取匹配值 Object matchValue = obj.get(pointMatchMap.get(fieldName)); - //匹配枚举值转换 + + // 2. 处理枚举值转换 List pointEnumMatchList = pointEnumMatchMap.get(fieldName); if (CollectionUtils.isNotEmpty(pointEnumMatchList)) { Object finalMatchValue = matchValue; @@ -581,6 +609,8 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i matchValue = enumMatch.get().getEnumCode(); } } + + // 3. 类型转换 Class fieldType = field.getType(); if (String.class.equals(fieldType)) { matchValue = StringUtils.getString(matchValue); @@ -591,6 +621,8 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i } else if (Integer.class.equals(fieldType)) { matchValue = MapUtils.getInteger(obj, pointMatchMap.get(fieldName)); } + + // 4. 设置字段值 field.set(entity, matchValue); } catch (IllegalAccessException e) { log.warn("deviceDataProcess 设置字段值时出错", e); @@ -1001,7 +1033,7 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i if (!deviceId.contains(SiteDevice.METEGF.name()) && !deviceId.contains(SiteDevice.METE0.name())) { // 处理储能电表-METE每日充放电数据 - EmsAmmeterData yestData = dealDailyChargeDate(siteId, deviceId, dataMete); + dealDailyChargeDate(siteId, deviceId, dataMete); // 处理储能电表-METE每日数据(尖、峰、平、谷差值) // if (SiteEnum.FX.getCode().equals(siteId)) { diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java index 0d37974..4fcedad 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsAlarmRecordsServiceImpl.java @@ -1,9 +1,5 @@ package com.xzzn.ems.service.impl; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - import com.alibaba.fastjson2.JSON; import com.xzzn.common.constant.RedisKeyConstants; import com.xzzn.common.core.domain.entity.SysUser; @@ -14,17 +10,26 @@ import com.xzzn.common.enums.TicketStatus; import com.xzzn.common.utils.DateUtils; import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.EmsAlarmMatchData; +import com.xzzn.ems.domain.EmsAlarmRecords; import com.xzzn.ems.domain.EmsTicket; import com.xzzn.ems.domain.vo.AlarmRecordListRequestVo; import com.xzzn.ems.domain.vo.AlarmRecordListResponseVo; import com.xzzn.ems.mapper.EmsAlarmMatchDataMapper; +import com.xzzn.ems.mapper.EmsAlarmRecordsMapper; import com.xzzn.ems.mapper.EmsTicketMapper; +import com.xzzn.ems.service.IEmsAlarmRecordsService; import com.xzzn.system.mapper.SysUserMapper; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import com.xzzn.ems.mapper.EmsAlarmRecordsMapper; -import com.xzzn.ems.domain.EmsAlarmRecords; -import com.xzzn.ems.service.IEmsAlarmRecordsService; /** * 告警记录Service业务层处理 @@ -308,5 +313,18 @@ public class EmsAlarmRecordsServiceImpl implements IEmsAlarmRecordsService } } + @Override + public void addDeviceOfflineRecord(String siteId, String deviceId) { + EmsAlarmRecords emsAlarmRecords = redisCache.getCacheObject(RedisKeyConstants.MODBUS_OFFLINE_ALARM_RECORD + siteId + "_" + deviceId); + if (emsAlarmRecords != null) { + return; + } + emsAlarmRecords = createAlarmAtPcs(siteId, deviceId,"modbus连接设备失败", AlarmLevelStatus.EMERGENCY.getCode()); + emsAlarmRecordsMapper.insertEmsAlarmRecords(emsAlarmRecords); + + // 存redis-防止重复插入-有效期一天 + redisCache.setCacheObject(RedisKeyConstants.MODBUS_OFFLINE_ALARM_RECORD + siteId + "_" + deviceId, emsAlarmRecords,1, TimeUnit.DAYS); + } + }