diff --git a/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java b/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java index b4a901e..ec5f7a4 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java +++ b/ems-common/src/main/java/com/xzzn/common/core/modbus/Modbus4jConnectionManager.java @@ -7,23 +7,64 @@ import com.xzzn.common.core.modbus.domain.DeviceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * Modbus连接管理器 - * 不使用连接池,每次请求创建新连接,使用完立即销毁 - * 避免复用已被服务端断开的连接导致Connection reset + * 使用长连接模式,维护连接缓存,复用已有连接 */ @Component public class Modbus4jConnectionManager { private static final Logger logger = LoggerFactory.getLogger(Modbus4jConnectionManager.class); private final ModbusFactory modbusFactory = new ModbusFactory(); + + private final Map connectionCache = new ConcurrentHashMap<>(); /** - * 创建新连接 + * 获取或创建连接(长连接模式) */ public ModbusMaster borrowMaster(DeviceConfig config) throws Exception { + String key = buildConnectionKey(config); + ModbusMaster master = connectionCache.get(key); + + if (master == null) { + synchronized (this) { + master = connectionCache.get(key); + if (master == null) { + master = createNewConnection(config); + connectionCache.put(key, master); + logger.info("创建新Modbus长连接: {}:{}", config.getHost(), config.getPort()); + } + } + } + + + return master; + } + + /** + * 归还连接(长连接模式,不关闭) + */ + public void returnMaster(DeviceConfig config, ModbusMaster master) { + } + + /** + * 废弃连接(发生异常时关闭并移除) + */ + public void invalidateMaster(DeviceConfig config, ModbusMaster master) { + String key = buildConnectionKey(config); + connectionCache.remove(key); + destroyMaster(master, config); + logger.warn("废弃并移除Modbus连接: {}:{}", config.getHost(), config.getPort()); + } + + private ModbusMaster createNewConnection(DeviceConfig config) throws Exception { IpParameters params = new IpParameters(); params.setHost(config.getHost()); params.setPort(config.getPort()); @@ -31,24 +72,10 @@ public class Modbus4jConnectionManager { ModbusMaster master = modbusFactory.createTcpMaster(params, true); master.init(); - logger.debug("创建新Modbus连接: {}:{}", config.getHost(), config.getPort()); + return master; } - /** - * 关闭连接 - */ - public void returnMaster(DeviceConfig config, ModbusMaster master) { - destroyMaster(master, config); - } - - /** - * 废弃连接(与returnMaster相同) - */ - public void invalidateMaster(DeviceConfig config, ModbusMaster master) { - destroyMaster(master, config); - } - private void destroyMaster(ModbusMaster master, DeviceConfig config) { if (master != null) { try { @@ -59,4 +86,26 @@ public class Modbus4jConnectionManager { } } } + + private String buildConnectionKey(DeviceConfig config) { + return config.getHost() + ":" + config.getPort(); + } + + /** + * 关闭所有连接 + */ + public void closeAllConnections() { + for (Map.Entry entry : connectionCache.entrySet()) { + try { + if (entry.getValue() != null) { + entry.getValue().destroy(); + } + } catch (Exception e) { + logger.warn("关闭Modbus连接异常: {}", entry.getKey(), e); + } + } + connectionCache.clear(); + logger.info("已关闭所有Modbus连接"); + } + } diff --git a/ems-framework/src/main/java/com/xzzn/framework/config/ModbusConfig.java b/ems-framework/src/main/java/com/xzzn/framework/config/ModbusConfig.java deleted file mode 100644 index 51398b4..0000000 --- a/ems-framework/src/main/java/com/xzzn/framework/config/ModbusConfig.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.xzzn.framework.config; - -import com.xzzn.framework.manager.ModbusConnectionManager; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class ModbusConfig { - - @Value("${modbus.pool.max-total:20}") - private int maxTotal; - - @Value("${modbus.pool.max-idle:10}") - private int maxIdle; - - @Value("${modbus.pool.min-idle:3}") - private int minIdle; - - @Value("${modbus.pool.max-wait:3000}") - private long maxWaitMillis; - - @Value("${modbus.pool.time-between-eviction-runs:30000}") - private long timeBetweenEvictionRunsMillis; - - @Value("${modbus.pool.min-evictable-idle-time:60000}") - private long minEvictableIdleTimeMillis; - - public ModbusConnectionManager modbusConnectionManager() { - ModbusConnectionManager manager = new ModbusConnectionManager(); - manager.setMaxTotal(maxTotal); - manager.setMaxIdle(maxIdle); - manager.setMinIdle(minIdle); - manager.setMaxWaitMillis(maxWaitMillis); - manager.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); - manager.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); - return manager; - } -} diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java deleted file mode 100644 index 3831757..0000000 --- a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java +++ /dev/null @@ -1,298 +0,0 @@ -package com.xzzn.framework.manager; - -import com.ghgande.j2mod.modbus.net.SerialConnection; -import com.ghgande.j2mod.modbus.net.TCPMasterConnection; -import com.ghgande.j2mod.modbus.util.SerialParameters; -import com.xzzn.common.enums.DeviceType; -import com.xzzn.ems.domain.EmsDevicesSetting; -import com.xzzn.ems.mapper.EmsDevicesSettingMapper; -import com.xzzn.ems.service.IEmsAlarmRecordsService; -import com.xzzn.ems.service.IEmsDeviceSettingService; -import com.xzzn.ems.service.IEmsEnergyPriceConfigService; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import javax.annotation.PreDestroy; - -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Component; - -@Component -public class ModbusConnectionManager implements ApplicationRunner { - private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionManager.class); - - private final Map connectionPool = new ConcurrentHashMap<>(); - // 连接池配置参数 - private int maxTotal = 20; - private int maxIdle = 10; - private int minIdle = 3; - private long maxWaitMillis = 3000; - private long timeBetweenEvictionRunsMillis = 30000; - private long minEvictableIdleTimeMillis = 60000; - private int connectTimeOut = 5000; - - private ScheduledExecutorService scheduler; - @Autowired - private EmsDevicesSettingMapper deviceRepo; - @Autowired - private IEmsAlarmRecordsService iEmsAlarmRecordsService; - @Autowired - private IEmsEnergyPriceConfigService iEmsEnergyPriceConfigService; - @Autowired - private IEmsDeviceSettingService iEmsDeviceSettingService; - - @Override - public void run(ApplicationArguments args) throws Exception { - init(); - } - - public void init() { - // 启动心跳检测线程 - scheduler = Executors.newSingleThreadScheduledExecutor(); - scheduler.scheduleAtFixedRate(this::heartbeatCheck, 1, 5, TimeUnit.MINUTES); - logger.info("Modbus连接管理器已初始化"); - - // 初始数据工作 - initData(); - } - - private void initData() { - // 初始化-设备信息 - iEmsDeviceSettingService.initDeviceInfo(); - // 初始化-告警数据 - iEmsAlarmRecordsService.initAlarmMatchInfo(); - // 初始化当月电价 - iEmsEnergyPriceConfigService.initCurrentMonthPrice(); - } - /** - * 获取连接(带自动创建和缓存) - */ - - public ModbusConnectionWrapper getConnection(EmsDevicesSetting device) throws Exception { - return connectionPool.compute(Math.toIntExact(device.getId()), (id, wrapper) -> { - try { - if (wrapper == null || !wrapper.isActive()) { - if (connectionPool.size() >= maxTotal) { - evictIdleConnection(); - } - logger.info("创建新连接: {}", device); - return new ModbusConnectionWrapper(createRawConnection(device)); - } - wrapper.updateLastAccess(); - return wrapper; - } catch (Exception e) { - throw new RuntimeException("连接创建失败: " + device.getId(), e); - } - }); - } - - /** - * 创建原始Modbus连接 - */ - private Object createRawConnection(EmsDevicesSetting device) throws Exception { - try { - if (DeviceType.TCP.name().equals(device.getDeviceType())) { - InetAddress addr = InetAddress.getByName(device.getIpAddress()); - TCPMasterConnection connection = new TCPMasterConnection(addr); - connection.setPort(device.getIpPort().intValue()); - connection.setTimeout(connectTimeOut); - connection.connect(); - return connection; - } else if (DeviceType.RTU.name().equals(device.getDeviceType())) { - SerialParameters parameters = new SerialParameters(); - parameters.setPortName(device.getSerialPort()); - parameters.setBaudRate(device.getBaudRate().intValue()); - parameters.setDatabits(device.getDataBits().intValue()); - parameters.setStopbits(device.getStopBits().intValue()); - parameters.setParity(device.getParity()); - SerialConnection connection = new SerialConnection(parameters); - connection.setTimeout(connectTimeOut); - connection.open(); - return connection; - } else { - throw new IllegalArgumentException("不支持的设备类型: " + device.getDeviceType()); - } - } catch (Exception e) { - logger.error("创建Modbus连接失败: {}", device, e); - throw e; - } - } - - /** - * 心跳检测 - */ - private void heartbeatCheck() { - logger.info("开始监控Modbus连接池状态,当前连接数: {}", connectionPool.size()); - - // 步骤1:获取所有活跃设备列表(与轮询逻辑共用同一批设备) - List activeDevices = null; - if (activeDevices == null || activeDevices.isEmpty()) { - logger.warn("无活跃设备,心跳检测仅清理无效连接"); - } - - // 步骤2:清理无效连接(遍历连接池,移除已失效的连接) - List invalidDeviceIds = new ArrayList<>(); - connectionPool.forEach((deviceId, wrapper) -> { - try { - if (!wrapper.isActive()) { - logger.info("连接{}已失效,移除连接", deviceId); - invalidDeviceIds.add(deviceId); - wrapper.close(); - } - } catch (Exception e) { - logger.error("心跳检测异常: {}", deviceId, e); - } - }); - - // 批量移除无效连接(避免边遍历边修改) - invalidDeviceIds.forEach(connectionPool::remove); - logger.debug("移除无效连接后,连接池大小: {}", connectionPool.size()); - - // 步骤3:补充关键设备的连接(优先保障活跃设备的连接存在) - if (!activeDevices.isEmpty()) { - // 3.1 先为所有活跃设备预加载连接(确保需要轮询的设备有连接) - preloadCriticalConnection(activeDevices); - - // 3.2 若连接数仍不足minIdle,补充额外连接(可选,避免连接池过小) - int currentSize = connectionPool.size(); - if (currentSize < minIdle) { - logger.info("连接数{}不足最小空闲数{},补充额外连接", currentSize, minIdle); - // 从活跃设备中选未创建连接的设备补充(避免重复创建) - List needMoreDevices = activeDevices.stream() - .filter(device -> !connectionPool.containsKey(Math.toIntExact(device.getId()))) - .limit(minIdle - currentSize) // 只补充差额 - .collect(Collectors.toList()); - - preloadCriticalConnection(needMoreDevices); // 复用预加载方法 - } - } - } - - /** - * 预加载关键连接 - */ - - private void preloadCriticalConnection(List devices) { - // 简化示例,不实现具体逻辑 - logger.info("预加载连接: 连接池当前大小={}, 最小空闲={}", connectionPool.size(), minIdle); - devices.forEach(device -> { - try { - Integer deviceId = Math.toIntExact(device.getId()); - if (!connectionPool.containsKey(deviceId)) { - getConnection(device); // 复用已有创建逻辑 - } - } catch (Exception e) { - logger.warn("预加载设备{}连接失败", device.getId(), e); - } - }); - } - - /** - * 移除最久未使用的空闲连接 - */ - private void evictIdleConnection() { - if (connectionPool.isEmpty()) { - return; - } - - ModbusConnectionWrapper oldestWrapper = null; - long oldestAccessTime = Long.MAX_VALUE; - - for (ModbusConnectionWrapper wrapper : connectionPool.values()) { - if (wrapper.isActive() && wrapper.getLastAccessTime() < oldestAccessTime) { - oldestAccessTime = wrapper.getLastAccessTime(); - oldestWrapper = wrapper; - } - } - - if (oldestWrapper != null) { - logger.info("移除空闲连接: {}", oldestWrapper.getConnection()); - connectionPool.values().remove(oldestWrapper); - oldestWrapper.close(); - } - } - - // 移除指定设备连接 - public void removeConnection(Integer deviceId) { - ModbusConnectionWrapper wrapper = connectionPool.remove(deviceId); - if (wrapper != null) { - wrapper.close(); // 双重保障,确保连接关闭 - logger.info("连接池主动移除设备{}的连接", deviceId); - } - } - - /** - * 判断是否应该移除空连接池 - */ - private boolean shouldRemoveEmptyPool(GenericObjectPool pool) { - // 可根据配置或逻辑决定是否移除空连接池 - // 这里简单实现为当连接池数量超过最大值时移除 - return connectionPool.size() > maxTotal; - } - - /** - * 关闭连接 - */ - private void closeConnection(TCPMasterConnection connection) { - try { - if (connection != null && connection.isConnected()) { - connection.close(); - } - } catch (Exception e) { - logger.error("关闭Modbus连接失败", e); - } - } - - // 容器销毁时关闭线程池 - @PreDestroy - public void destroy() { - if (scheduler != null) { - scheduler.shutdown(); - try { - if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { - scheduler.shutdownNow(); - } - } catch (InterruptedException e) { - scheduler.shutdownNow(); - } - } - } - - // Getters and Setters - public void setMaxTotal(int maxTotal) { - this.maxTotal = maxTotal; - } - - public void setMaxIdle(int maxIdle) { - this.maxIdle = maxIdle; - } - - public void setMinIdle(int minIdle) { - this.minIdle = minIdle; - } - - public void setMaxWaitMillis(long maxWaitMillis) { - this.maxWaitMillis = maxWaitMillis; - } - - public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) { - this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; - } - - public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { - this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; - } -} diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionWrapper.java b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionWrapper.java deleted file mode 100644 index 8237f4f..0000000 --- a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionWrapper.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.xzzn.framework.manager; - -import com.ghgande.j2mod.modbus.net.SerialConnection; -import com.ghgande.j2mod.modbus.net.TCPMasterConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicInteger; - -public class ModbusConnectionWrapper { - private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionWrapper.class); - - private static final AtomicInteger COUNTER = new AtomicInteger(0); - - private final Object connection; - private final int connectionId; - private volatile long lastAccessTime; - private volatile boolean active = true; - - public ModbusConnectionWrapper(Object connection) { - this.connection = connection; - this.connectionId = COUNTER.incrementAndGet(); - this.lastAccessTime = System.currentTimeMillis(); - logger.info("创建连接包装: {}", this); - } - - public boolean isActive() { - if (!active) return false; - - try { - // 检查连接是否物理上有效 - if (connection instanceof TCPMasterConnection) { - return ((TCPMasterConnection) connection).isConnected(); - } else if (connection instanceof SerialConnection) { - return ((SerialConnection) connection).isOpen(); - } - } catch (Exception e) { - logger.error("连接状态检查失败: {}", connectionId, e); - return false; - } - - // 默认检查空闲时间 - return System.currentTimeMillis() - lastAccessTime < 300000; // 5分钟 - } - - public void updateLastAccess() { - this.lastAccessTime = System.currentTimeMillis(); - } - - public Object getConnection() { - return connection; - } - - public void close() { - try { - logger.info("关闭连接: {}", this); - if (connection instanceof TCPMasterConnection) { - ((TCPMasterConnection) connection).close(); - } else if (connection instanceof SerialConnection) { - ((SerialConnection) connection).close(); - } - } catch (Exception e) { - logger.error("关闭连接失败: {}", connectionId, e); - } finally { - this.active = false; - } - } - - public long getLastAccessTime() { - return lastAccessTime; - } - - @Override - public String toString() { - return "ModbusConnectionWrapper{" + - "connectionId=" + connectionId + - ", active=" + active + - ", lastAccessTime=" + lastAccessTime + - '}'; - } -} diff --git a/ems-framework/src/main/java/com/xzzn/framework/web/service/ModbusService.java b/ems-framework/src/main/java/com/xzzn/framework/web/service/ModbusService.java deleted file mode 100644 index ed7ac59..0000000 --- a/ems-framework/src/main/java/com/xzzn/framework/web/service/ModbusService.java +++ /dev/null @@ -1,279 +0,0 @@ -package com.xzzn.framework.web.service; - -import com.ghgande.j2mod.modbus.ModbusException; -import com.ghgande.j2mod.modbus.ModbusIOException; -import com.ghgande.j2mod.modbus.io.ModbusSerialTransaction; -import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction; -import com.ghgande.j2mod.modbus.msg.ReadInputRegistersRequest; -import com.ghgande.j2mod.modbus.msg.ReadInputRegistersResponse; -import com.ghgande.j2mod.modbus.msg.WriteMultipleRegistersRequest; -import com.ghgande.j2mod.modbus.msg.WriteSingleRegisterRequest; -import com.ghgande.j2mod.modbus.net.SerialConnection; -import com.ghgande.j2mod.modbus.net.TCPMasterConnection; -import com.ghgande.j2mod.modbus.procimg.Register; -import com.ghgande.j2mod.modbus.procimg.SimpleRegister; -import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.retry.annotation.Backoff; -import org.springframework.retry.annotation.Retryable; -import org.springframework.stereotype.Service; - -/** - * Modbus操作服务(添加重试机制) - */ -@Service -public class ModbusService { - private static final Logger logger = LoggerFactory.getLogger(ModbusService.class); - - @Retryable( - value = {ModbusException.class}, // 仅对自定义Modbus异常重试 - maxAttempts = 3, // 最大重试3次(1次原始调用 + 2次重试) - backoff = @Backoff(delay = 1000, multiplier = 2) // 退避策略:1s → 2s → 4s - ) - @CircuitBreaker(name = "modbusOperation", fallbackMethod = "readRegistersFallback") - public int[] readHoldingRegisters(Object connection, int startAddr, int count) throws ModbusException { - try { - if (connection instanceof TCPMasterConnection) { - return readTcpRegisters((TCPMasterConnection) connection, startAddr, count); - } else if (connection instanceof SerialConnection) { - return readRtuRegisters((SerialConnection) connection, startAddr, count); - } - throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName()); - } catch (ModbusIOException e) { - throw new ModbusException("通信故障", e); - } catch (Exception e) { - throw new ModbusException("系统错误", e); - } - } - - private int[] readRtuRegisters(SerialConnection connection, int startAddr, int count) throws ModbusException { - if (!connection.isOpen()) { - throw new ModbusIOException("RTU连接未建立"); - } - ReadInputRegistersRequest request = new ReadInputRegistersRequest(startAddr, count); - ModbusSerialTransaction transaction = new ModbusSerialTransaction(connection); - transaction.setRequest(request); - transaction.setRetries(2); - - try { - transaction.execute(); - ReadInputRegistersResponse response = (ReadInputRegistersResponse) transaction.getResponse(); - if (response == null) { - throw new ModbusException("RTU响应为空"); - } - return parseRegisters(response); - } catch (ModbusException e) { - logger.error("读取RTU寄存器失败: {}", e.getMessage()); - throw e; - } - } - - private int[] readTcpRegisters(TCPMasterConnection conn, int start, int count) throws ModbusException { - // 验证连接是否已建立 - if (!conn.isConnected()) { - throw new ModbusIOException("TCP连接未建立"); - } - // 使用正确的功能码(03 - 读取保持寄存器)ReadHoldingRegistersRequest - ReadInputRegistersRequest request = new ReadInputRegistersRequest(start, count); - ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn); - transaction.setRequest(request); - - // 设置超时避免长时间阻塞 - transaction.setRetries(2); - try { - transaction.execute(); - ReadInputRegistersResponse response = (ReadInputRegistersResponse) transaction.getResponse(); - - if (response == null) { - throw new ModbusException("Modbus异常响应: " + response.getMessage()); - } - - // 正确解析寄存器值 - return parseRegisters(response); - } catch (ModbusException e) { - // 记录详细错误信息 - logger.error("读取TCP寄存器失败: {}", e.getMessage()); - throw e; - } - } - - /** - * 解析Modbus响应中的寄存器值 - */ - private int[] parseRegisters(ReadInputRegistersResponse response) { - int byteCount = response.getByteCount(); - int[] result = new int[byteCount / 2]; - - for (int i = 0; i < result.length; i++) { - // 转换为无符号整数 - result[i] = response.getRegisterValue(i) & 0xFFFF; - } - - return result; - } - - /** - * 熔断降级方法 - */ - public int[] readRegistersFallback(Object connection, int startAddr, int count, Exception e) { - logger.warn("Modbus操作降级(原因: {}),返回空数据", e.getMessage()); - return new int[0]; - } - - /** - * 写入单个寄存器(支持TCP/RTU) - * @param connection 连接对象(TCPMasterConnection 或 SerialConnection) - * @param registerAddr 寄存器地址 - * @param value 要写入的值(16位整数) - */ - @Retryable( - value = {ModbusException.class}, - maxAttempts = 3, - backoff = @Backoff(delay = 1000, multiplier = 2) - ) - @CircuitBreaker(name = "modbusOperation", fallbackMethod = "writeRegisterFallback") - public boolean writeSingleRegister(Object connection, int registerAddr, int value) throws ModbusException { - try { - if (connection instanceof TCPMasterConnection) { - return writeTcpSingleRegister((TCPMasterConnection) connection, registerAddr, value); - } else if (connection instanceof SerialConnection) { - return writeRtuSingleRegister((SerialConnection) connection, registerAddr, value); - } - throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName()); - } catch (ModbusIOException e) { - throw new ModbusException("写入通信故障", e); - } catch (Exception e) { - throw new ModbusException("写入系统错误", e); - } - } - - /** - * 写入多个寄存器(支持TCP/RTU) - * @param connection 连接对象 - * @param startAddr 起始寄存器地址 - * @param values 要写入的值数组(每个值为16位整数) - */ - @Retryable( - value = {ModbusException.class}, - maxAttempts = 3, - backoff = @Backoff(delay = 1000, multiplier = 2) - ) - @CircuitBreaker(name = "modbusOperation", fallbackMethod = "writeRegisterFallback") - public boolean writeMultipleRegisters(Object connection, int startAddr, int[] values) throws ModbusException { - try { - if (connection instanceof TCPMasterConnection) { - return writeTcpMultipleRegisters((TCPMasterConnection) connection, startAddr, values); - } else if (connection instanceof SerialConnection) { - return writeRtuMultipleRegisters((SerialConnection) connection, startAddr, values); - } - throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName()); - } catch (ModbusIOException e) { - throw new ModbusException("写入通信故障", e); - } catch (Exception e) { - throw new ModbusException("写入系统错误", e); - } - } - - // ==================== TCP写入实现 ==================== - private boolean writeTcpSingleRegister(TCPMasterConnection conn, int registerAddr, int value) throws ModbusException { - if (!conn.isConnected()) { - throw new ModbusIOException("TCP连接未建立,无法写入"); - } - // 创建写入单个寄存器的请求(功能码06) - WriteSingleRegisterRequest request = new WriteSingleRegisterRequest(registerAddr, new SimpleRegister(value)); - ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn); - transaction.setRequest(request); - transaction.setRetries(2); - - try { - transaction.execute(); - logger.info("TCP写入单个寄存器成功,地址:{},值:{}", registerAddr, value); - return true; - } catch (ModbusException e) { - logger.error("TCP写入单个寄存器失败,地址:{},值:{}", registerAddr, value, e); - throw e; - } - } - - private boolean writeTcpMultipleRegisters(TCPMasterConnection conn, int startAddr, int[] values) throws ModbusException { - if (!conn.isConnected()) { - throw new ModbusIOException("TCP连接未建立,无法写入"); - } - // 转换值数组为寄存器数组 - Register[] registers = new Register[values.length]; - for (int i = 0; i < values.length; i++) { - registers[i] = new SimpleRegister(values[i]); - } - // 创建写入多个寄存器的请求(功能码16) - WriteMultipleRegistersRequest request = new WriteMultipleRegistersRequest(startAddr, registers); - ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn); - transaction.setRequest(request); - transaction.setRetries(2); - - try { - transaction.execute(); - logger.info("TCP写入多个寄存器成功,起始地址:{},数量:{}", startAddr, values.length); - return true; - } catch (ModbusException e) { - logger.error("TCP写入多个寄存器失败,起始地址:{}", startAddr, e); - throw e; - } - } - - - // ==================== RTU写入实现 ==================== - private boolean writeRtuSingleRegister(SerialConnection connection, int registerAddr, int value) throws ModbusException { - if (!connection.isOpen()) { - throw new ModbusIOException("RTU串口未打开,请先建立连接"); - } - WriteSingleRegisterRequest request = new WriteSingleRegisterRequest(registerAddr, new SimpleRegister(value)); - ModbusSerialTransaction transaction = new ModbusSerialTransaction(connection); - transaction.setRequest(request); - transaction.setRetries(2); - - try { - transaction.execute(); - logger.info("RTU写入单个寄存器成功,地址:{},值:{}", registerAddr, value); - return true; - } catch (ModbusException e) { - logger.error("RTU写入单个寄存器失败,地址:{},值:{}", registerAddr, value, e); - throw e; - } - } - - private boolean writeRtuMultipleRegisters(SerialConnection connection, int startAddr, int[] values) throws ModbusException { - if (!connection.isOpen()) { - throw new ModbusIOException("RTU串口未打开,请先建立连接"); - } - Register[] registers = new Register[values.length]; - for (int i = 0; i < values.length; i++) { - registers[i] = new SimpleRegister(values[i]); - } - WriteMultipleRegistersRequest request = new WriteMultipleRegistersRequest(startAddr, registers); - ModbusSerialTransaction transaction = new ModbusSerialTransaction(connection); - transaction.setRequest(request); - transaction.setRetries(2); - - try { - transaction.execute(); - logger.info("RTU写入多个寄存器成功,起始地址:{},数量:{}", startAddr, values.length); - return true; - } catch (ModbusException e) { - logger.error("RTU写入多个寄存器失败,起始地址:{}", startAddr, e); - throw e; - } - } - - - // ==================== 写入操作的降级方法 ==================== - public boolean writeRegisterFallback(Object connection, int addr, int value, Exception e) { - logger.warn("写入单个寄存器降级(原因: {}),地址:{}", e.getMessage(), addr); - return false; - } - - public boolean writeRegisterFallback(Object connection, int startAddr, int[] values, Exception e) { - logger.warn("写入多个寄存器降级(原因: {}),起始地址:{}", e.getMessage(), startAddr); - return false; - } -} \ No newline at end of file diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/task/ProtectionPlanTask.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/ProtectionPlanTask.java index dc65f1a..9842bf4 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/task/ProtectionPlanTask.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/ProtectionPlanTask.java @@ -25,9 +25,9 @@ import com.xzzn.ems.mapper.EmsFaultIssueLogMapper; import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper; import com.xzzn.ems.mapper.EmsStrategyRunningMapper; import com.xzzn.ems.service.IEmsFaultProtectionPlanService; -import com.xzzn.framework.manager.ModbusConnectionManager; -import com.xzzn.framework.manager.ModbusConnectionWrapper; -import com.xzzn.framework.web.service.ModbusService; +import com.xzzn.common.core.modbus.ModbusProcessor; +import com.xzzn.common.core.modbus.domain.DeviceConfig; +import com.xzzn.common.core.modbus.domain.WriteTagConfig; import java.math.BigDecimal; import java.util.ArrayList; @@ -75,9 +75,7 @@ public class ProtectionPlanTask { @Autowired private EmsDevicesSettingMapper emsDevicesSettingMapper; @Autowired - private ModbusConnectionManager connectionManager; - @Autowired - private ModbusService modbusService; + private ModbusProcessor modbusProcessor; @Autowired private EmsFaultIssueLogMapper emsFaultIssueLogMapper; @@ -248,25 +246,33 @@ public class ProtectionPlanTask { // 获取设备地址信息 EmsDevicesSetting device = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceId, siteId); if (device == null || StringUtils.isEmpty(device.getIpAddress()) || device.getIpPort()==null) { - return; - } - // 获取设备连接 - ModbusConnectionWrapper wrapper = connectionManager.getConnection(device); - if (wrapper == null || !wrapper.isActive()) { - logger.info("<设备连接无效>"); + logger.warn("设备信息不完整,deviceId:{}", deviceId); return; } - // 写入寄存器 - boolean success = modbusService.writeSingleRegister( - wrapper.getConnection(), - 1, - plan.getValue().intValue()); + // 构建设备配置 + DeviceConfig config = new DeviceConfig(); + config.setHost(device.getIpAddress()); + config.setPort(device.getIpPort().intValue()); + config.setSlaveId(device.getSlaveId().intValue()); + config.setDeviceName(device.getDeviceName()); + config.setDeviceNumber(device.getDeviceId()); + + // 构建写入标签配置 + WriteTagConfig writeTag = new WriteTagConfig(); + writeTag.setAddress(plan.getPoint()); + writeTag.setValue(plan.getValue()); + + List writeTags = new ArrayList<>(); + writeTags.add(writeTag); + config.setWriteTags(writeTags); + + // 写入数据到设备 + boolean success = modbusProcessor.writeDataToDeviceWithRetry(config); if (!success) { logger.error("写入失败,设备地址:{}", device.getIpAddress()); } - } // 校验释放值是否取消方案