diff --git a/ems-admin/src/main/resources/application.yml b/ems-admin/src/main/resources/application.yml index 828ca58..afad4c3 100644 --- a/ems-admin/src/main/resources/application.yml +++ b/ems-admin/src/main/resources/application.yml @@ -145,12 +145,3 @@ modbus: poll: interval: "0 */5 * * * *" # 5分钟间隔 timeout: 30000 # 30秒超时 - -resilience4j: - circuitbreaker: - instances: - modbusOperation: - failure-rate-threshold: 50 - minimum-number-of-calls: 5 - sliding-window-size: 10 - wait-duration-in-open-state: 10s \ No newline at end of file diff --git a/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java b/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java index d6bead0..f0271d7 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java +++ b/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java @@ -1,3 +1,4 @@ +/* package com.xzzn.framework.config; import org.springframework.context.annotation.Bean; @@ -25,10 +26,13 @@ public class AsyncConfig { return executor; } - /** + */ +/** * 策略下方定时任务 - */ - /*@Bean("strategyTaskExecutor") + *//* + + */ +/*@Bean("strategyTaskExecutor") public Executor strategyTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); @@ -39,5 +43,7 @@ public class AsyncConfig { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; - }*/ + }*//* + } +*/ diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java index 386c919..9529d6d 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java +++ b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java @@ -2,17 +2,22 @@ package com.xzzn.framework.manager; import com.ghgande.j2mod.modbus.net.TCPMasterConnection; import com.xzzn.ems.domain.EmsDevicesSetting; +import com.xzzn.ems.mapper.EmsDevicesSettingMapper; import org.apache.commons.pool2.impl.GenericObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.*; +import java.util.stream.Collectors; @Component public class ModbusConnectionManager implements ApplicationRunner { @@ -27,16 +32,18 @@ public class ModbusConnectionManager implements ApplicationRunner { private long timeBetweenEvictionRunsMillis = 30000; private long minEvictableIdleTimeMillis = 60000; + private ScheduledExecutorService scheduler; + @Autowired + private EmsDevicesSettingMapper deviceRepo; @Override - public void run(ApplicationArguments args) throws Exception { + public void run(ApplicationArguments args) throws Exception { init(); } - @PostConstruct public void init() { // 启动心跳检测线程 - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(this::heartbeatCheck, 1, 5, TimeUnit.MINUTES); logger.info("Modbus连接管理器已初始化"); } @@ -84,27 +91,49 @@ public class ModbusConnectionManager implements ApplicationRunner { * 心跳检测 */ private void heartbeatCheck() { - logger.debug("开始监控Modbus连接池状态"); + logger.info("开始监控Modbus连接池状态,当前连接数: {}", connectionPool.size()); - connectionPool.forEach((id, wrapper) -> { + // 步骤1:获取所有活跃设备列表(与轮询逻辑共用同一批设备) + List activeDevices = null; + if (activeDevices == null || activeDevices.isEmpty()) { + logger.warn("无活跃设备,心跳检测仅清理无效连接"); + } + + // 步骤2:清理无效连接(遍历连接池,移除已失效的连接) + List invalidDeviceIds = new ArrayList<>(); + connectionPool.forEach((deviceId, wrapper) -> { try { if (!wrapper.isActive()) { - logger.warn("连接{}已失效,移除连接", id); - connectionPool.remove(id); + logger.info("连接{}已失效,移除连接", deviceId); + invalidDeviceIds.add(deviceId); wrapper.close(); } } catch (Exception e) { - logger.error("心跳检测异常: {}", id, e); + logger.error("心跳检测异常: {}", deviceId, e); } }); - // 维持最小空闲连接 - try { - while (connectionPool.size() < minIdle) { - preloadCriticalConnection(); + // 批量移除无效连接(避免边遍历边修改) + invalidDeviceIds.forEach(connectionPool::remove); + logger.debug("移除无效连接后,连接池大小: {}", connectionPool.size()); + + // 步骤3:补充关键设备的连接(优先保障活跃设备的连接存在) + if (!activeDevices.isEmpty()) { + // 3.1 先为所有活跃设备预加载连接(确保需要轮询的设备有连接) + preloadCriticalConnection(activeDevices); + + // 3.2 若连接数仍不足minIdle,补充额外连接(可选,避免连接池过小) + int currentSize = connectionPool.size(); + if (currentSize < minIdle) { + logger.info("连接数{}不足最小空闲数{},补充额外连接", currentSize, minIdle); + // 从活跃设备中选未创建连接的设备补充(避免重复创建) + List needMoreDevices = activeDevices.stream() + .filter(device -> !connectionPool.containsKey(Math.toIntExact(device.getId()))) + .limit(minIdle - currentSize) // 只补充差额 + .collect(Collectors.toList()); + + preloadCriticalConnection(needMoreDevices); // 复用预加载方法 } - } catch (Exception e) { - logger.error("预加载连接失败", e); } } @@ -112,17 +141,29 @@ public class ModbusConnectionManager implements ApplicationRunner { * 预加载关键连接 */ - private void preloadCriticalConnection() { - // 这里应根据实际业务逻辑选择需要预加载的设备 + private void preloadCriticalConnection(List devices) { // 简化示例,不实现具体逻辑 - logger.debug("预加载连接: 连接池当前大小={}, 最小空闲={}", connectionPool.size(), minIdle); + logger.info("预加载连接: 连接池当前大小={}, 最小空闲={}", connectionPool.size(), minIdle); + devices.forEach(device -> { + try { + Integer deviceId = Math.toIntExact(device.getId()); + if (!connectionPool.containsKey(deviceId)) { + getConnection(device); // 复用已有创建逻辑 + } + } catch (Exception e) { + logger.warn("预加载设备{}连接失败", device.getId(), e); + } + }); } /** * 移除最久未使用的空闲连接 */ - private void evictIdleConnection() { + if (connectionPool.isEmpty()) { + return; + } + ModbusConnectionWrapper oldestWrapper = null; long oldestAccessTime = Long.MAX_VALUE; @@ -140,6 +181,15 @@ public class ModbusConnectionManager implements ApplicationRunner { } } + // 移除指定设备连接 + public void removeConnection(Integer deviceId) { + ModbusConnectionWrapper wrapper = connectionPool.remove(deviceId); + if (wrapper != null) { + wrapper.close(); // 双重保障,确保连接关闭 + logger.info("连接池主动移除设备{}的连接", deviceId); + } + } + /** * 判断是否应该移除空连接池 */ @@ -162,6 +212,21 @@ public class ModbusConnectionManager implements ApplicationRunner { } } + // 容器销毁时关闭线程池 + @PreDestroy + public void destroy() { + if (scheduler != null) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } + } + } + // Getters and Setters public void setMaxTotal(int maxTotal) { this.maxTotal = maxTotal; diff --git a/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java b/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java index 94a244a..8133d7b 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java +++ b/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java @@ -93,9 +93,9 @@ public class ModbusPoller { // 标记连接为无效 if (wrapper != null) { wrapper.close(); + connectionManager.removeConnection(Integer.parseInt(device.getDeviceId())); } - // 重试机制会在这里触发 - throw new RuntimeException("轮询设备失败", e); + throw new RuntimeException("轮询设备失败", e); } }