@ -2,17 +2,22 @@ package com.xzzn.framework.manager;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection ;
import com.xzzn.ems.domain.EmsDevicesSetting ;
import com.xzzn.ems.mapper.EmsDevicesSettingMapper ;
import org.apache.commons.pool2.impl.GenericObjectPool ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.boot.ApplicationArguments ;
import org.springframework.boot.ApplicationRunner ;
import org.springframework.stereotype.Component ;
import javax.annotation.PostConstruct ;
import javax.annotation.PreDestroy ;
import java.net.InetAddress ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import java.util.concurrent.* ;
import java.util.stream.Collectors ;
@Component
public class ModbusConnectionManager implements ApplicationRunner {
@ -27,16 +32,18 @@ public class ModbusConnectionManager implements ApplicationRunner {
private long timeBetweenEvictionRunsMillis = 30000 ;
private long minEvictableIdleTimeMillis = 60000 ;
private ScheduledExecutorService scheduler ;
@Autowired
private EmsDevicesSettingMapper deviceRepo ;
@Override
public void run ( ApplicationArguments args ) throws Exception {
init ( ) ;
}
@PostConstruct
public void init ( ) {
// 启动心跳检测线程
ScheduledExecutorService scheduler = Executors . newSingleThreadScheduledExecutor ( ) ;
scheduler = Executors . newSingleThreadScheduledExecutor ( ) ;
scheduler . scheduleAtFixedRate ( this : : heartbeatCheck , 1 , 5 , TimeUnit . MINUTES ) ;
logger . info ( " Modbus连接管理器已初始化 " ) ;
}
@ -84,27 +91,49 @@ public class ModbusConnectionManager implements ApplicationRunner {
* 心跳检测
*/
private void heartbeatCheck ( ) {
logger . debug ( " 开始监控Modbus连接池状态" ) ;
logger . info ( " 开始监控Modbus连接池状态,当前连接数: {} " , connectionPool . size ( ) ) ;
connectionPool . forEach ( ( id , wrapper ) - > {
// 步骤1: 获取所有活跃设备列表( 与轮询逻辑共用同一批设备)
List < EmsDevicesSetting > activeDevices = null ;
if ( activeDevices = = null | | activeDevices . isEmpty ( ) ) {
logger . warn ( " 无活跃设备,心跳检测仅清理无效连接 " ) ;
}
// 步骤2: 清理无效连接( 遍历连接池, 移除已失效的连接)
List < Integer > invalidDeviceIds = new ArrayList < > ( ) ;
connectionPool . forEach ( ( deviceId , wrapper ) - > {
try {
if ( ! wrapper . isActive ( ) ) {
logger . warn ( " 连接{}已失效,移除连接 " , i d) ;
connectionPool . remove ( i d) ;
logger . info ( " 连接{}已失效,移除连接 " , deviceI d) ;
invalidDeviceIds . add ( deviceI d) ;
wrapper . close ( ) ;
}
} catch ( Exception e ) {
logger . error ( " 心跳检测异常: {} " , i d, e ) ;
logger . error ( " 心跳检测异常: {} " , deviceI d, e ) ;
}
} ) ;
// 维持最小空闲连接
try {
while ( connectionPool . size ( ) < minIdle ) {
preloadCriticalConnection ( ) ;
// 批量移除无效连接(避免边遍历边修改)
invalidDeviceIds . forEach ( connectionPool : : remove ) ;
logger . debug ( " 移除无效连接后,连接池大小: {} " , connectionPool . size ( ) ) ;
// 步骤3: 补充关键设备的连接( 优先保障活跃设备的连接存在)
if ( ! activeDevices . isEmpty ( ) ) {
// 3.1 先为所有活跃设备预加载连接(确保需要轮询的设备有连接)
preloadCriticalConnection ( activeDevices ) ;
// 3.2 若连接数仍不足minIdle, 补充额外连接( 可选, 避免连接池过小)
int currentSize = connectionPool . size ( ) ;
if ( currentSize < minIdle ) {
logger . info ( " 连接数{}不足最小空闲数{},补充额外连接 " , currentSize , minIdle ) ;
// 从活跃设备中选未创建连接的设备补充(避免重复创建)
List < EmsDevicesSetting > needMoreDevices = activeDevices . stream ( )
. filter ( device - > ! connectionPool . containsKey ( Math . toIntExact ( device . getId ( ) ) ) )
. limit ( minIdle - currentSize ) // 只补充差额
. collect ( Collectors . toList ( ) ) ;
preloadCriticalConnection ( needMoreDevices ) ; // 复用预加载方法
}
} catch ( Exception e ) {
logger . error ( " 预加载连接失败 " , e ) ;
}
}
@ -112,17 +141,29 @@ public class ModbusConnectionManager implements ApplicationRunner {
* 预加载关键连接
*/
private void preloadCriticalConnection ( ) {
// 这里应根据实际业务逻辑选择需要预加载的设备
private void preloadCriticalConnection ( List < EmsDevicesSetting > devices ) {
// 简化示例,不实现具体逻辑
logger . debug ( " 预加载连接: 连接池当前大小={}, 最小空闲={} " , connectionPool . size ( ) , minIdle ) ;
logger . info ( " 预加载连接: 连接池当前大小={}, 最小空闲={} " , connectionPool . size ( ) , minIdle ) ;
devices . forEach ( device - > {
try {
Integer deviceId = Math . toIntExact ( device . getId ( ) ) ;
if ( ! connectionPool . containsKey ( deviceId ) ) {
getConnection ( device ) ; // 复用已有创建逻辑
}
} catch ( Exception e ) {
logger . warn ( " 预加载设备{}连接失败 " , device . getId ( ) , e ) ;
}
} ) ;
}
/**
* 移除最久未使用的空闲连接
*/
private void evictIdleConnection ( ) {
if ( connectionPool . isEmpty ( ) ) {
return ;
}
ModbusConnectionWrapper oldestWrapper = null ;
long oldestAccessTime = Long . MAX_VALUE ;
@ -140,6 +181,15 @@ public class ModbusConnectionManager implements ApplicationRunner {
}
}
// 移除指定设备连接
public void removeConnection ( Integer deviceId ) {
ModbusConnectionWrapper wrapper = connectionPool . remove ( deviceId ) ;
if ( wrapper ! = null ) {
wrapper . close ( ) ; // 双重保障,确保连接关闭
logger . info ( " 连接池主动移除设备{}的连接 " , deviceId ) ;
}
}
/**
* 判断是否应该移除空连接池
*/
@ -162,6 +212,21 @@ public class ModbusConnectionManager implements ApplicationRunner {
}
}
// 容器销毁时关闭线程池
@PreDestroy
public void destroy ( ) {
if ( scheduler ! = null ) {
scheduler . shutdown ( ) ;
try {
if ( ! scheduler . awaitTermination ( 5 , TimeUnit . SECONDS ) ) {
scheduler . shutdownNow ( ) ;
}
} catch ( InterruptedException e ) {
scheduler . shutdownNow ( ) ;
}
}
}
// Getters and Setters
public void setMaxTotal ( int maxTotal ) {
this . maxTotal = maxTotal ;