From a31a1a1caa77614bd5037eba81f98ab7bd4b0074 Mon Sep 17 00:00:00 2001 From: dashixiong Date: Tue, 20 Jan 2026 17:19:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=20modbus=20=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modbus/Modbus4jConnectionManager.java | 86 ++++---- .../common/core/modbus/ModbusProcessor.java | 204 ++++++++++-------- .../com/xzzn/quartz/task/ModbusPoller.java | 122 +++++++---- .../com/xzzn/quartz/task/StrategyPoller.java | 34 +-- 4 files changed, 256 insertions(+), 190 deletions(-) diff --git a/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java b/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java index a6b626e..b4a901e 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java +++ b/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java @@ -1,66 +1,62 @@ package com.xzzn.common.core.modbus; +import com.serotonin.modbus4j.ModbusFactory; import com.serotonin.modbus4j.ModbusMaster; +import com.serotonin.modbus4j.ip.IpParameters; import com.xzzn.common.core.modbus.domain.DeviceConfig; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.annotation.PreDestroy; - -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +/** + * Modbus连接管理器 + * 不使用连接池,每次请求创建新连接,使用完立即销毁 + * 避免复用已被服务端断开的连接导致Connection reset + */ @Component public class Modbus4jConnectionManager { private static final Logger logger = LoggerFactory.getLogger(Modbus4jConnectionManager.class); - private final Map> connectionPools = new ConcurrentHashMap<>(); - + private final ModbusFactory modbusFactory = new ModbusFactory(); + /** + * 创建新连接 + */ public ModbusMaster borrowMaster(DeviceConfig config) throws Exception { - String poolKey = getPoolKey(config); - GenericObjectPool pool = connectionPools.computeIfAbsent(poolKey, key -> { - PooledModbusMasterFactory factory = new PooledModbusMasterFactory(config.getHost(), config.getPort()); - GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); - poolConfig.setMaxTotal(20); // 池中最大连接数 - poolConfig.setMinIdle(4); // 最小空闲连接数 - poolConfig.setTestOnBorrow(true); // 借用时测试连接有效性 - poolConfig.setTestOnReturn(true); // 归还时测试连接有效性 - poolConfig.setTestWhileIdle(true); // 空闲时测试连接有效性 - poolConfig.setMaxWaitMillis(3000); // 获取连接的最大等待时间3秒 - poolConfig.setTimeBetweenEvictionRunsMillis(30000); // 30秒检查一次空闲连接 - poolConfig.setMinEvictableIdleTimeMillis(60000); // 空闲超过60秒的连接可以被驱逐 - - return new GenericObjectPool<>(factory, poolConfig); - }); - return pool.borrowObject(); + IpParameters params = new IpParameters(); + params.setHost(config.getHost()); + params.setPort(config.getPort()); + params.setEncapsulated(false); + + ModbusMaster master = modbusFactory.createTcpMaster(params, true); + master.init(); + logger.debug("创建新Modbus连接: {}:{}", config.getHost(), config.getPort()); + return master; } - - - + /** + * 关闭连接 + */ public void returnMaster(DeviceConfig config, ModbusMaster master) { - if (master == null) { - return; - } - String poolKey = getPoolKey(config); - GenericObjectPool pool = connectionPools.get(poolKey); - if (pool != null) { - pool.returnObject(master); + destroyMaster(master, config); + } + + /** + * 废弃连接(与returnMaster相同) + */ + public void invalidateMaster(DeviceConfig config, ModbusMaster master) { + destroyMaster(master, config); + } + + private void destroyMaster(ModbusMaster master, DeviceConfig config) { + if (master != null) { + try { + master.destroy(); + logger.debug("已关闭Modbus连接: {}:{}", config.getHost(), config.getPort()); + } catch (Exception e) { + logger.warn("关闭Modbus连接异常: {}:{}", config.getHost(), config.getPort(), e); + } } } - - private String getPoolKey(DeviceConfig config) { - return config.getHost() + ":" + config.getPort(); - } - - @PreDestroy - public void shutdown() { - connectionPools.values().forEach(GenericObjectPool::close); - } - } diff --git a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java index 7047d74..91ce835 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java +++ b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java @@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import static com.xzzn.common.enums.RegisterType.COIL; @@ -43,8 +44,12 @@ import static com.xzzn.common.enums.RegisterType.DISCRETE_INPUT; public class ModbusProcessor { private static final Logger logger = LoggerFactory.getLogger(ModbusProcessor.class); - private int readTimeout = 5000; - private int writeTimeout = 3000; + @Value("${modbus.read-timeout:8000}") + private int readTimeout; + @Value("${modbus.write-timeout:5000}") + private int writeTimeout; + @Value("${modbus.read-retries:1}") + private int readRetries; @Autowired private RedisCache redisCache; @@ -54,20 +59,25 @@ public class ModbusProcessor { public boolean writeDataToDevice(DeviceConfig config) { logger.info("writeDataToDevice: {}", JSON.toJSONString(config)); ModbusMaster master = null; - boolean result; + boolean result = false; + boolean hasError = false; try { master = connectionManager.borrowMaster(config); - // 设置了Modbus通信的超时时间为3000毫秒(3秒)。当主设备与从设备通信时,若在3秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 master.setTimeout(writeTimeout); result = writeTagValue(master, config, config.getWriteTags()); } catch (Exception e) { logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e); - result = false; + hasError = true; } finally { - // 关键:无论成功与否,都必须将连接归还到池中 if (master != null) { - connectionManager.returnMaster(config, master); + if (hasError) { + // 发生异常时废弃连接,下次重新创建 + connectionManager.invalidateMaster(config, master); + } else { + // 正常时归还连接 + connectionManager.returnMaster(config, master); + } } } return result; @@ -224,16 +234,14 @@ public class ModbusProcessor { ModbusMaster master = connectionManager.borrowMaster(config); // 设置了Modbus通信的超时时间为5000毫秒(5秒)。当主设备与从设备通信时,若在5秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 master.setTimeout(readTimeout); + master.setRetries(readRetries); return master; } public Map readDataFromDevice(DeviceConfig config, ModbusMaster master) { Map deviceData = new HashMap<>(); -// ModbusMaster master = null; // 将master的声明提前 + boolean hasError = false; try { -// master = connectionManager.borrowMaster(config); -// 设置了Modbus通信的超时时间为3000毫秒(3秒)。当主设备与从设备通信时,若在3秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 -// master.setTimeout(5000); BatchResults results = readTagValues(master, config.getSlaveId(), config.getTags()); for (TagConfig tag : config.getTags()) { if (Objects.equals(tag.getDataType(), "FOUR_BYTE_FLOAT_DBCA")){ @@ -243,28 +251,22 @@ public class ModbusProcessor { }else { deviceData.put(tag.getKey(), results.getValue(tag.getKey())); } - - -// try { -// Object value = readTagValue(master, config.getSlaveId(), tag); -// if (value != null) { -// deviceData.put(tag.getKey(), value); -// } -// } catch (Exception e) { -// logger.error("Failed to read tag '{}' from devices '{}'", tag.getKey(), config.getDeviceName(), e); -// } } } catch (Exception e) { logger.error("Failed read from devices '{}'", config.getDeviceName(), e); + hasError = true; } finally { - // 关键:无论成功与否,都必须将连接归还到池中 if (master != null) { - connectionManager.returnMaster(config, master); + if (hasError) { + // 发生异常时废弃连接,下次重新创建 + connectionManager.invalidateMaster(config, master); + } else { + // 正常时归还连接 + connectionManager.returnMaster(config, master); + } } } -// String deviceNumber = config.getDeviceNumber(); -// redisCache.setCacheObject(deviceNumber, deviceData); return deviceData; } @@ -340,77 +342,101 @@ public class ModbusProcessor { private BatchResults readTagValues(ModbusMaster master, int slaveId, List tags) throws Exception { try { - BatchRead batch = new BatchRead<>(); - tags.forEach(tag -> { - Map type = ModBusType.REGISTER_TYPE; - Map DATA_LENGTH = ModBusType.LENGTH; - int firstDigit = Integer.parseInt(tag.getAddress().substring(0, 1)); - int address = 0; - int addressLength = tag.getAddress().length(); - int exp = (int) Math.pow(10, addressLength-1); - if (firstDigit != 0){ - int digit = Integer.parseInt(tag.getAddress()); - address = digit % (exp); - }else { - address = Integer.parseInt(tag.getAddress()); - } - RegisterType registerType = type.get(firstDigit); - int dataLength = DATA_LENGTH.get(tag.getDataType()); - switch (registerType) { - case COIL: { - BaseLocator loc = BaseLocator.coilStatus(slaveId, address); - batch.addLocator(tag.getKey(), loc); - break; - } - case DISCRETE_INPUT: { - BaseLocator loc = BaseLocator.inputStatus(slaveId, address); - batch.addLocator(tag.getKey(), loc); - break; - } - case HOLDING_REGISTER: { -// logger.info("HOLDING_REGISTER: {}",tag.getAddress()); - if (dataLength == 28){ - BaseLocator locator = BaseLocator.holdingRegister(slaveId, address, 4); - batch.addLocator(tag.getKey(), locator); - }else { - BaseLocator loc = BaseLocator.holdingRegister(slaveId, address, dataLength); - batch.addLocator(tag.getKey(), loc); - } - break; - } - case INPUT_REGISTER: { -// logger.info("INPUT_REGISTER: {}",tag.getAddress()); - BaseLocator loc = BaseLocator.inputRegister(slaveId, address, dataLength); - batch.addLocator(tag.getKey(), loc); - break; - } - } - }); - - BatchResults results = master.send(batch); - - List logInfoList = new ArrayList<>(); - for (TagConfig tag : tags){ - StringBuilder logInfo = new StringBuilder(); - logInfo.append(tag.getAddress()); - if (tag.getBit()!=null){ -// logger.info("批处理读取寄存器成功: {}",tag.getAddress() +"(" + tag.getBit() + "):" + results.getValue(tag.getKey())); - logInfo.append("(" + tag.getBit() + "):"); - }else { -// logger.info("批处理读取寄存器成功: {}",tag.getAddress() + ":" + results.getValue(tag.getKey())); - logInfo.append(":"); - } - logInfo.append(results.getValue(tag.getKey())); - logInfoList.add(logInfo.toString()); - } - logger.info("批处理读取寄存器成功: {}", JSON.toJSONString(logInfoList)); + BatchResults results = sendBatchRead(master, slaveId, tags); + logBatchResults(tags, results); return results; } catch (Exception e){ + if (isTimeoutException(e)) { + logger.warn("批量读取超时,尝试降级为单点读取,slaveId: {}", slaveId); + BatchResults fallback = new BatchResults<>(); + for (TagConfig tag : tags) { + Object value = readTagValue(master, slaveId, tag); + fallback.addResult(tag.getKey(), value); + } + logBatchResults(tags, fallback); + return fallback; + } logger.error("Failed to read master '{}'", slaveId, e); throw new Exception(e); } } + private BatchResults sendBatchRead(ModbusMaster master, int slaveId, List tags) throws Exception { + BatchRead batch = new BatchRead<>(); + tags.forEach(tag -> { + Map type = ModBusType.REGISTER_TYPE; + Map DATA_LENGTH = ModBusType.LENGTH; + int firstDigit = Integer.parseInt(tag.getAddress().substring(0, 1)); + int address = 0; + int addressLength = tag.getAddress().length(); + int exp = (int) Math.pow(10, addressLength - 1); + if (firstDigit != 0) { + int digit = Integer.parseInt(tag.getAddress()); + address = digit % (exp); + } else { + address = Integer.parseInt(tag.getAddress()); + } + RegisterType registerType = type.get(firstDigit); + int dataLength = DATA_LENGTH.get(tag.getDataType()); + switch (registerType) { + case COIL: { + BaseLocator loc = BaseLocator.coilStatus(slaveId, address); + batch.addLocator(tag.getKey(), loc); + break; + } + case DISCRETE_INPUT: { + BaseLocator loc = BaseLocator.inputStatus(slaveId, address); + batch.addLocator(tag.getKey(), loc); + break; + } + case HOLDING_REGISTER: { + if (dataLength == 28) { + BaseLocator locator = BaseLocator.holdingRegister(slaveId, address, 4); + batch.addLocator(tag.getKey(), locator); + } else { + BaseLocator loc = BaseLocator.holdingRegister(slaveId, address, dataLength); + batch.addLocator(tag.getKey(), loc); + } + break; + } + case INPUT_REGISTER: { + BaseLocator loc = BaseLocator.inputRegister(slaveId, address, dataLength); + batch.addLocator(tag.getKey(), loc); + break; + } + } + }); + + return master.send(batch); + } + + private void logBatchResults(List tags, BatchResults results) { + List logInfoList = new ArrayList<>(); + for (TagConfig tag : tags) { + StringBuilder logInfo = new StringBuilder(); + logInfo.append(tag.getAddress()); + if (tag.getBit() != null) { + logInfo.append("(").append(tag.getBit()).append("):"); + } else { + logInfo.append(":"); + } + logInfo.append(results.getValue(tag.getKey())); + logInfoList.add(logInfo.toString()); + } + logger.info("批处理读取寄存器成功: {}", JSON.toJSONString(logInfoList)); + } + + private boolean isTimeoutException(Throwable e) { + Throwable current = e; + while (current != null) { + if (current instanceof com.serotonin.modbus4j.sero.messaging.TimeoutException) { + return true; + } + current = current.getCause(); + } + return false; + } + public static float convertValueToFloat(Object value) { if (!(value instanceof Number)) { throw new IllegalArgumentException("Input must be a Number"); diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java index 113462d..dcc2098 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java @@ -31,9 +31,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; + +import javax.annotation.Resource; import java.util.stream.Collectors; import org.eclipse.paho.client.mqttv3.MqttException; @@ -56,10 +58,14 @@ public class ModbusPoller { private final ObjectMapper objectMapper = new ObjectMapper(); private final Map deviceFailureCounts = new ConcurrentHashMap<>(); + @Resource(name = "modbusExecutor") + private ExecutorService modbusExecutor; + @Autowired private ModbusProcessor modbusProcessor; @Autowired private IEmsAlarmRecordsService iEmsAlarmRecordsService; + @Autowired private ISysJobService iSysJobService; @Autowired @@ -99,8 +105,8 @@ public class ModbusPoller { return; } - // 按 host:port 分组 - Map> groupedConfigs = new HashMap<>(); + // 按主机IP分组(同一网关的不同端口也归为一组,避免并发访问导致Connection Reset) + Map> groupedByHost = new HashMap<>(); for (Path filePath : jsonFiles) { DeviceConfig config = null; try { @@ -110,51 +116,44 @@ public class ModbusPoller { continue; } if (config.isEnabled()) { - String key = config.getHost() + ":" + config.getPort(); - groupedConfigs.computeIfAbsent(key, k -> new ArrayList<>()).add(config); + // 只按主机IP分组,确保同一网关的所有端口串行访问 + String hostKey = config.getHost(); + groupedByHost.computeIfAbsent(hostKey, k -> new ArrayList<>()).add(config); } } - int interval = getScheduledTaskInterval(); - // 为每个 host:port 启动一个任务 - for (Map.Entry> entry : groupedConfigs.entrySet()) { - String groupKey = entry.getKey(); - List configs = entry.getValue(); - try { - CompletableFuture.runAsync(() -> { - for (DeviceConfig config : configs) { - try { - scheduledStart(config); - } catch (Exception e) { - log.error("采集设备数据异常: {}", config.getDeviceName(), e); - } - } - }) - .exceptionally(e -> { - log.error("采集设备数据{}轮询异常", groupKey, e); - return null; - }) - .thenRun(() -> log.info("采集设备数据{}轮询任务执行完成", groupKey)); - } catch (Exception e) { - log.error("采集设备数据{}任务失败", groupKey, e); + // 使用单线程 executor 串行执行所有主机的 Modbus 操作 + // 将所有主机的设备按顺序串行处理,避免任何并发访问 + modbusExecutor.submit(() -> { + for (Map.Entry> entry : groupedByHost.entrySet()) { + String hostKey = entry.getKey(); + List configs = entry.getValue(); + for (DeviceConfig config : configs) { + try { + scheduledStart(config); + // 每次读取后等待200ms,给Modbus网关足够的处理时间 + Thread.sleep(200); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.warn("Modbus轮询被中断"); + return; + } catch (Exception e) { + log.error("采集设备数据异常: {}", config.getDeviceName(), e); + } + } + log.info("采集设备数据{}轮询任务执行完成", hostKey); } - } + }); } public void scheduledStart(DeviceConfig config) { if (config.isEnabled()) { log.info("Reading data from devices: {}", config.getDeviceName()); - ModbusMaster master = null; - try { - master = modbusProcessor.borrowMaster(config); - } catch (Exception e) { - log.error("Failed to borrow connection '{}'", config.getDeviceName(), e); - // 处理设备连接失败的情况,更新设备状态为离线,添加报警记录 - addDeviceOfflineRecord(siteId, config.getDeviceNumber()); - return; - } + + // 带重试的读取,最多重试2次 + Map data = readWithRetry(config, 2); + List rawValuEmptyList = new ArrayList<>(); - Map data = modbusProcessor.readDataFromDevice(config, master); // 在这里处理采集到的数据空 config.getTags().forEach(tag -> { Object rawValue = data.get(tag.getKey()); @@ -191,6 +190,51 @@ public class ModbusPoller { } } + /** + * 带重试的读取方法 + */ + private Map readWithRetry(DeviceConfig config, int maxRetries) { + Map data = new HashMap<>(); + + for (int attempt = 0; attempt <= maxRetries; attempt++) { + try { + ModbusMaster master = modbusProcessor.borrowMaster(config); + data = modbusProcessor.readDataFromDevice(config, master); + + // 如果读取成功(有数据),直接返回 + if (!data.isEmpty()) { + if (attempt > 0) { + log.info("设备 {} 第 {} 次重试成功", config.getDeviceName(), attempt); + } + return data; + } + + // 读取返回空数据,等待后重试 + if (attempt < maxRetries) { + log.warn("设备 {} 读取返回空数据,等待1秒后重试 ({}/{})", + config.getDeviceName(), attempt + 1, maxRetries); + Thread.sleep(1000); + } + } catch (Exception e) { + log.error("设备 {} 读取异常 ({}/{}): {}", + config.getDeviceName(), attempt + 1, maxRetries, e.getMessage()); + + if (attempt < maxRetries) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + // 所有重试都失败 + log.error("设备 {} 读取失败,已重试 {} 次", config.getDeviceName(), maxRetries); + return data; + } + private void processingData(Map data, String deviceNumber) { if (CollectionUtils.isEmpty(data)) { // 增加失败计数 @@ -270,4 +314,4 @@ public class ModbusPoller { return Math.toIntExact(CronUtils.getNextExecutionIntervalMillis(sysJobs.get(0).getCronExpression())); } -} \ No newline at end of file +} diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java index 938878f..e24f16d 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java @@ -39,8 +39,10 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +import javax.annotation.Resource; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; @@ -85,28 +87,26 @@ public class StrategyPoller { @Autowired private ModbusProcessor modbusProcessor; + @Resource(name = "modbusExecutor") + private ExecutorService modbusExecutor; + public void pollAllDevices() { logger.info("开始执行运行策略数据轮询..."); List strategyRunningVoList = emsStrategyRunningMapper.getPendingPollerStrategy(null); strategyRunningVoList.forEach(strategyVo -> { Long strategyId = strategyVo.getId(); if (strategyLocks.putIfAbsent(strategyId, true) == null) { - try { - CompletableFuture.runAsync(() -> { - processData(strategyVo); - }) - .exceptionally(e -> { - logger.error("运行策略{}轮询异常", strategyVo.getId(), e); - return null; - }) - .thenRun(() -> { - logger.info("运行策略{}轮询任务执行完成,释放锁", strategyVo.getId()); - strategyLocks.remove(strategyId); - }); - } catch (Exception e) { - logger.error("运行策略{}任务失败", strategyVo.getId(), e); - strategyLocks.remove(strategyId); - } + // 使用共享的modbusExecutor串行执行,避免与ModbusPoller并发访问导致通讯故障 + modbusExecutor.submit(() -> { + try { + processData(strategyVo); + } catch (Exception e) { + logger.error("运行策略{}轮询异常", strategyVo.getId(), e); + } finally { + logger.info("运行策略{}轮询任务执行完成,释放锁", strategyVo.getId()); + strategyLocks.remove(strategyId); + } + }); } else { logger.info("策略{}已在处理中,跳过重复执行", strategyId); }