diff --git a/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java b/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java index 0234a14..a6b626e 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java +++ b/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java @@ -26,9 +26,14 @@ public class Modbus4jConnectionManager { GenericObjectPool pool = connectionPools.computeIfAbsent(poolKey, key -> { PooledModbusMasterFactory factory = new PooledModbusMasterFactory(config.getHost(), config.getPort()); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); - poolConfig.setMaxTotal(5); // 池中最大连接数 - poolConfig.setMinIdle(1); // 最小空闲连接数 + poolConfig.setMaxTotal(20); // 池中最大连接数 + poolConfig.setMinIdle(4); // 最小空闲连接数 poolConfig.setTestOnBorrow(true); // 借用时测试连接有效性 + poolConfig.setTestOnReturn(true); // 归还时测试连接有效性 + poolConfig.setTestWhileIdle(true); // 空闲时测试连接有效性 + poolConfig.setMaxWaitMillis(3000); // 获取连接的最大等待时间3秒 + poolConfig.setTimeBetweenEvictionRunsMillis(30000); // 30秒检查一次空闲连接 + poolConfig.setMinEvictableIdleTimeMillis(60000); // 空闲超过60秒的连接可以被驱逐 return new GenericObjectPool<>(factory, poolConfig); }); diff --git a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java index b30f749..7047d74 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java +++ b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java @@ -43,6 +43,8 @@ import static com.xzzn.common.enums.RegisterType.DISCRETE_INPUT; public class ModbusProcessor { private static final Logger logger = LoggerFactory.getLogger(ModbusProcessor.class); + private int readTimeout = 5000; + private int writeTimeout = 3000; @Autowired private RedisCache redisCache; @@ -55,8 +57,8 @@ public class ModbusProcessor { boolean result; try { master = connectionManager.borrowMaster(config); - // 设置了Modbus通信的超时时间为1000毫秒(1秒)。当主设备与从设备通信时,若在1秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 - master.setTimeout(1000); + // 设置了Modbus通信的超时时间为3000毫秒(3秒)。当主设备与从设备通信时,若在3秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 + master.setTimeout(writeTimeout); result = writeTagValue(master, config, config.getWriteTags()); } catch (Exception e) { logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e); @@ -78,7 +80,7 @@ public class ModbusProcessor { try { master = connectionManager.borrowMaster(config); - master.setTimeout(1000); // 设置超时时间 + master.setTimeout(writeTimeout); // 设置超时时间 // 使用重试装饰器 ModbusMaster finalMaster = master; @@ -221,7 +223,7 @@ public class ModbusProcessor { public ModbusMaster borrowMaster(DeviceConfig config) throws Exception { ModbusMaster master = connectionManager.borrowMaster(config); // 设置了Modbus通信的超时时间为5000毫秒(5秒)。当主设备与从设备通信时,若在5秒内未收到响应,则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。 - master.setTimeout(5000); + master.setTimeout(readTimeout); return master; } diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java index cf968c9..3831757 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java +++ b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java @@ -1,11 +1,27 @@ package com.xzzn.framework.manager; +import com.ghgande.j2mod.modbus.net.SerialConnection; import com.ghgande.j2mod.modbus.net.TCPMasterConnection; +import com.ghgande.j2mod.modbus.util.SerialParameters; +import com.xzzn.common.enums.DeviceType; import com.xzzn.ems.domain.EmsDevicesSetting; import com.xzzn.ems.mapper.EmsDevicesSettingMapper; import com.xzzn.ems.service.IEmsAlarmRecordsService; import com.xzzn.ems.service.IEmsDeviceSettingService; import com.xzzn.ems.service.IEmsEnergyPriceConfigService; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.annotation.PreDestroy; + import org.apache.commons.pool2.impl.GenericObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,14 +30,6 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; -import javax.annotation.PreDestroy; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; -import java.util.stream.Collectors; - @Component public class ModbusConnectionManager implements ApplicationRunner { private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionManager.class); @@ -34,6 +42,7 @@ public class ModbusConnectionManager implements ApplicationRunner { private long maxWaitMillis = 3000; private long timeBetweenEvictionRunsMillis = 30000; private long minEvictableIdleTimeMillis = 60000; + private int connectTimeOut = 5000; private ScheduledExecutorService scheduler; @Autowired @@ -93,14 +102,29 @@ public class ModbusConnectionManager implements ApplicationRunner { /** * 创建原始Modbus连接 */ - private TCPMasterConnection createRawConnection(EmsDevicesSetting device) throws Exception { + private Object createRawConnection(EmsDevicesSetting device) throws Exception { try { - InetAddress addr = InetAddress.getByName("10.1.0.230"); - TCPMasterConnection connection = new TCPMasterConnection(addr); - connection.setPort(502); - connection.setTimeout(5000); - connection.connect(); - return connection; + if (DeviceType.TCP.name().equals(device.getDeviceType())) { + InetAddress addr = InetAddress.getByName(device.getIpAddress()); + TCPMasterConnection connection = new TCPMasterConnection(addr); + connection.setPort(device.getIpPort().intValue()); + connection.setTimeout(connectTimeOut); + connection.connect(); + return connection; + } else if (DeviceType.RTU.name().equals(device.getDeviceType())) { + SerialParameters parameters = new SerialParameters(); + parameters.setPortName(device.getSerialPort()); + parameters.setBaudRate(device.getBaudRate().intValue()); + parameters.setDatabits(device.getDataBits().intValue()); + parameters.setStopbits(device.getStopBits().intValue()); + parameters.setParity(device.getParity()); + SerialConnection connection = new SerialConnection(parameters); + connection.setTimeout(connectTimeOut); + connection.open(); + return connection; + } else { + throw new IllegalArgumentException("不支持的设备类型: " + device.getDeviceType()); + } } catch (Exception e) { logger.error("创建Modbus连接失败: {}", device, e); throw e; diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java index 6c572be..113462d 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -81,7 +82,6 @@ public class ModbusPoller { this.scheduledTask = scheduledTask; } - public void pollAllDevices() { Path devicesDir = Paths.get(System.getProperty("user.dir"), "devices"); if (!Files.exists(devicesDir)) { @@ -120,17 +120,24 @@ public class ModbusPoller { for (Map.Entry> entry : groupedConfigs.entrySet()) { String groupKey = entry.getKey(); List configs = entry.getValue(); - // 取其中一个配置的时间间隔作为该组任务的执行周期 -// long interval = configs.get(0).getTime(); - scheduledTask.startTask(groupKey, () -> { - for (DeviceConfig config : configs) { - try { - scheduledStart(config); - } catch (Exception e) { - log.error("采集设备数据异常: {}", config.getDeviceName(), e); - } - } - }, interval); + try { + CompletableFuture.runAsync(() -> { + for (DeviceConfig config : configs) { + try { + scheduledStart(config); + } catch (Exception e) { + log.error("采集设备数据异常: {}", config.getDeviceName(), e); + } + } + }) + .exceptionally(e -> { + log.error("采集设备数据{}轮询异常", groupKey, e); + return null; + }) + .thenRun(() -> log.info("采集设备数据{}轮询任务执行完成", groupKey)); + } catch (Exception e) { + log.error("采集设备数据{}任务失败", groupKey, e); + } } }