修正 modbus 长连接

This commit is contained in:
2026-01-20 21:22:35 +08:00
parent 939bcbe950
commit fec45dac03
6 changed files with 91 additions and 732 deletions

View File

@ -7,12 +7,16 @@ import com.xzzn.common.core.modbus.domain.DeviceConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* Modbus连接管理器 * Modbus连接管理器
* 使用连接池,每次请求创建新连接,使用完立即销毁 * 使用连接模式,维护连接缓存,复用已有连接
* 避免复用已被服务端断开的连接导致Connection reset
*/ */
@Component @Component
public class Modbus4jConnectionManager { public class Modbus4jConnectionManager {
@ -20,10 +24,47 @@ public class Modbus4jConnectionManager {
private final ModbusFactory modbusFactory = new ModbusFactory(); private final ModbusFactory modbusFactory = new ModbusFactory();
private final Map<String, ModbusMaster> connectionCache = new ConcurrentHashMap<>();
/** /**
* 创建新连接 * 获取或创建连接(长连接模式)
*/ */
public ModbusMaster borrowMaster(DeviceConfig config) throws Exception { public ModbusMaster borrowMaster(DeviceConfig config) throws Exception {
String key = buildConnectionKey(config);
ModbusMaster master = connectionCache.get(key);
if (master == null) {
synchronized (this) {
master = connectionCache.get(key);
if (master == null) {
master = createNewConnection(config);
connectionCache.put(key, master);
logger.info("创建新Modbus长连接: {}:{}", config.getHost(), config.getPort());
}
}
}
return master;
}
/**
* 归还连接(长连接模式,不关闭)
*/
public void returnMaster(DeviceConfig config, ModbusMaster master) {
}
/**
* 废弃连接(发生异常时关闭并移除)
*/
public void invalidateMaster(DeviceConfig config, ModbusMaster master) {
String key = buildConnectionKey(config);
connectionCache.remove(key);
destroyMaster(master, config);
logger.warn("废弃并移除Modbus连接: {}:{}", config.getHost(), config.getPort());
}
private ModbusMaster createNewConnection(DeviceConfig config) throws Exception {
IpParameters params = new IpParameters(); IpParameters params = new IpParameters();
params.setHost(config.getHost()); params.setHost(config.getHost());
params.setPort(config.getPort()); params.setPort(config.getPort());
@ -31,24 +72,10 @@ public class Modbus4jConnectionManager {
ModbusMaster master = modbusFactory.createTcpMaster(params, true); ModbusMaster master = modbusFactory.createTcpMaster(params, true);
master.init(); master.init();
logger.debug("创建新Modbus连接: {}:{}", config.getHost(), config.getPort());
return master; return master;
} }
/**
* 关闭连接
*/
public void returnMaster(DeviceConfig config, ModbusMaster master) {
destroyMaster(master, config);
}
/**
* 废弃连接与returnMaster相同
*/
public void invalidateMaster(DeviceConfig config, ModbusMaster master) {
destroyMaster(master, config);
}
private void destroyMaster(ModbusMaster master, DeviceConfig config) { private void destroyMaster(ModbusMaster master, DeviceConfig config) {
if (master != null) { if (master != null) {
try { try {
@ -59,4 +86,26 @@ public class Modbus4jConnectionManager {
} }
} }
} }
private String buildConnectionKey(DeviceConfig config) {
return config.getHost() + ":" + config.getPort();
}
/**
* 关闭所有连接
*/
public void closeAllConnections() {
for (Map.Entry<String, ModbusMaster> entry : connectionCache.entrySet()) {
try {
if (entry.getValue() != null) {
entry.getValue().destroy();
}
} catch (Exception e) {
logger.warn("关闭Modbus连接异常: {}", entry.getKey(), e);
}
}
connectionCache.clear();
logger.info("已关闭所有Modbus连接");
}
} }

View File

@ -1,38 +0,0 @@
package com.xzzn.framework.config;
import com.xzzn.framework.manager.ModbusConnectionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ModbusConfig {
@Value("${modbus.pool.max-total:20}")
private int maxTotal;
@Value("${modbus.pool.max-idle:10}")
private int maxIdle;
@Value("${modbus.pool.min-idle:3}")
private int minIdle;
@Value("${modbus.pool.max-wait:3000}")
private long maxWaitMillis;
@Value("${modbus.pool.time-between-eviction-runs:30000}")
private long timeBetweenEvictionRunsMillis;
@Value("${modbus.pool.min-evictable-idle-time:60000}")
private long minEvictableIdleTimeMillis;
public ModbusConnectionManager modbusConnectionManager() {
ModbusConnectionManager manager = new ModbusConnectionManager();
manager.setMaxTotal(maxTotal);
manager.setMaxIdle(maxIdle);
manager.setMinIdle(minIdle);
manager.setMaxWaitMillis(maxWaitMillis);
manager.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
manager.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
return manager;
}
}

View File

@ -1,298 +0,0 @@
package com.xzzn.framework.manager;
import com.ghgande.j2mod.modbus.net.SerialConnection;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import com.ghgande.j2mod.modbus.util.SerialParameters;
import com.xzzn.common.enums.DeviceType;
import com.xzzn.ems.domain.EmsDevicesSetting;
import com.xzzn.ems.mapper.EmsDevicesSettingMapper;
import com.xzzn.ems.service.IEmsAlarmRecordsService;
import com.xzzn.ems.service.IEmsDeviceSettingService;
import com.xzzn.ems.service.IEmsEnergyPriceConfigService;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class ModbusConnectionManager implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionManager.class);
private final Map<Integer, ModbusConnectionWrapper> connectionPool = new ConcurrentHashMap<>();
// 连接池配置参数
private int maxTotal = 20;
private int maxIdle = 10;
private int minIdle = 3;
private long maxWaitMillis = 3000;
private long timeBetweenEvictionRunsMillis = 30000;
private long minEvictableIdleTimeMillis = 60000;
private int connectTimeOut = 5000;
private ScheduledExecutorService scheduler;
@Autowired
private EmsDevicesSettingMapper deviceRepo;
@Autowired
private IEmsAlarmRecordsService iEmsAlarmRecordsService;
@Autowired
private IEmsEnergyPriceConfigService iEmsEnergyPriceConfigService;
@Autowired
private IEmsDeviceSettingService iEmsDeviceSettingService;
@Override
public void run(ApplicationArguments args) throws Exception {
init();
}
public void init() {
// 启动心跳检测线程
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::heartbeatCheck, 1, 5, TimeUnit.MINUTES);
logger.info("Modbus连接管理器已初始化");
// 初始数据工作
initData();
}
private void initData() {
// 初始化-设备信息
iEmsDeviceSettingService.initDeviceInfo();
// 初始化-告警数据
iEmsAlarmRecordsService.initAlarmMatchInfo();
// 初始化当月电价
iEmsEnergyPriceConfigService.initCurrentMonthPrice();
}
/**
* 获取连接(带自动创建和缓存)
*/
public ModbusConnectionWrapper getConnection(EmsDevicesSetting device) throws Exception {
return connectionPool.compute(Math.toIntExact(device.getId()), (id, wrapper) -> {
try {
if (wrapper == null || !wrapper.isActive()) {
if (connectionPool.size() >= maxTotal) {
evictIdleConnection();
}
logger.info("创建新连接: {}", device);
return new ModbusConnectionWrapper(createRawConnection(device));
}
wrapper.updateLastAccess();
return wrapper;
} catch (Exception e) {
throw new RuntimeException("连接创建失败: " + device.getId(), e);
}
});
}
/**
* 创建原始Modbus连接
*/
private Object createRawConnection(EmsDevicesSetting device) throws Exception {
try {
if (DeviceType.TCP.name().equals(device.getDeviceType())) {
InetAddress addr = InetAddress.getByName(device.getIpAddress());
TCPMasterConnection connection = new TCPMasterConnection(addr);
connection.setPort(device.getIpPort().intValue());
connection.setTimeout(connectTimeOut);
connection.connect();
return connection;
} else if (DeviceType.RTU.name().equals(device.getDeviceType())) {
SerialParameters parameters = new SerialParameters();
parameters.setPortName(device.getSerialPort());
parameters.setBaudRate(device.getBaudRate().intValue());
parameters.setDatabits(device.getDataBits().intValue());
parameters.setStopbits(device.getStopBits().intValue());
parameters.setParity(device.getParity());
SerialConnection connection = new SerialConnection(parameters);
connection.setTimeout(connectTimeOut);
connection.open();
return connection;
} else {
throw new IllegalArgumentException("不支持的设备类型: " + device.getDeviceType());
}
} catch (Exception e) {
logger.error("创建Modbus连接失败: {}", device, e);
throw e;
}
}
/**
* 心跳检测
*/
private void heartbeatCheck() {
logger.info("开始监控Modbus连接池状态当前连接数: {}", connectionPool.size());
// 步骤1获取所有活跃设备列表与轮询逻辑共用同一批设备
List<EmsDevicesSetting> activeDevices = null;
if (activeDevices == null || activeDevices.isEmpty()) {
logger.warn("无活跃设备,心跳检测仅清理无效连接");
}
// 步骤2清理无效连接遍历连接池移除已失效的连接
List<Integer> invalidDeviceIds = new ArrayList<>();
connectionPool.forEach((deviceId, wrapper) -> {
try {
if (!wrapper.isActive()) {
logger.info("连接{}已失效,移除连接", deviceId);
invalidDeviceIds.add(deviceId);
wrapper.close();
}
} catch (Exception e) {
logger.error("心跳检测异常: {}", deviceId, e);
}
});
// 批量移除无效连接(避免边遍历边修改)
invalidDeviceIds.forEach(connectionPool::remove);
logger.debug("移除无效连接后,连接池大小: {}", connectionPool.size());
// 步骤3补充关键设备的连接优先保障活跃设备的连接存在
if (!activeDevices.isEmpty()) {
// 3.1 先为所有活跃设备预加载连接(确保需要轮询的设备有连接)
preloadCriticalConnection(activeDevices);
// 3.2 若连接数仍不足minIdle补充额外连接可选避免连接池过小
int currentSize = connectionPool.size();
if (currentSize < minIdle) {
logger.info("连接数{}不足最小空闲数{},补充额外连接", currentSize, minIdle);
// 从活跃设备中选未创建连接的设备补充(避免重复创建)
List<EmsDevicesSetting> needMoreDevices = activeDevices.stream()
.filter(device -> !connectionPool.containsKey(Math.toIntExact(device.getId())))
.limit(minIdle - currentSize) // 只补充差额
.collect(Collectors.toList());
preloadCriticalConnection(needMoreDevices); // 复用预加载方法
}
}
}
/**
* 预加载关键连接
*/
private void preloadCriticalConnection(List<EmsDevicesSetting> devices) {
// 简化示例,不实现具体逻辑
logger.info("预加载连接: 连接池当前大小={}, 最小空闲={}", connectionPool.size(), minIdle);
devices.forEach(device -> {
try {
Integer deviceId = Math.toIntExact(device.getId());
if (!connectionPool.containsKey(deviceId)) {
getConnection(device); // 复用已有创建逻辑
}
} catch (Exception e) {
logger.warn("预加载设备{}连接失败", device.getId(), e);
}
});
}
/**
* 移除最久未使用的空闲连接
*/
private void evictIdleConnection() {
if (connectionPool.isEmpty()) {
return;
}
ModbusConnectionWrapper oldestWrapper = null;
long oldestAccessTime = Long.MAX_VALUE;
for (ModbusConnectionWrapper wrapper : connectionPool.values()) {
if (wrapper.isActive() && wrapper.getLastAccessTime() < oldestAccessTime) {
oldestAccessTime = wrapper.getLastAccessTime();
oldestWrapper = wrapper;
}
}
if (oldestWrapper != null) {
logger.info("移除空闲连接: {}", oldestWrapper.getConnection());
connectionPool.values().remove(oldestWrapper);
oldestWrapper.close();
}
}
// 移除指定设备连接
public void removeConnection(Integer deviceId) {
ModbusConnectionWrapper wrapper = connectionPool.remove(deviceId);
if (wrapper != null) {
wrapper.close(); // 双重保障,确保连接关闭
logger.info("连接池主动移除设备{}的连接", deviceId);
}
}
/**
* 判断是否应该移除空连接池
*/
private boolean shouldRemoveEmptyPool(GenericObjectPool<?> pool) {
// 可根据配置或逻辑决定是否移除空连接池
// 这里简单实现为当连接池数量超过最大值时移除
return connectionPool.size() > maxTotal;
}
/**
* 关闭连接
*/
private void closeConnection(TCPMasterConnection connection) {
try {
if (connection != null && connection.isConnected()) {
connection.close();
}
} catch (Exception e) {
logger.error("关闭Modbus连接失败", e);
}
}
// 容器销毁时关闭线程池
@PreDestroy
public void destroy() {
if (scheduler != null) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
}
}
}
// Getters and Setters
public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}
public void setMaxWaitMillis(long maxWaitMillis) {
this.maxWaitMillis = maxWaitMillis;
}
public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}
public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}
}

View File

@ -1,81 +0,0 @@
package com.xzzn.framework.manager;
import com.ghgande.j2mod.modbus.net.SerialConnection;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class ModbusConnectionWrapper {
private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionWrapper.class);
private static final AtomicInteger COUNTER = new AtomicInteger(0);
private final Object connection;
private final int connectionId;
private volatile long lastAccessTime;
private volatile boolean active = true;
public ModbusConnectionWrapper(Object connection) {
this.connection = connection;
this.connectionId = COUNTER.incrementAndGet();
this.lastAccessTime = System.currentTimeMillis();
logger.info("创建连接包装: {}", this);
}
public boolean isActive() {
if (!active) return false;
try {
// 检查连接是否物理上有效
if (connection instanceof TCPMasterConnection) {
return ((TCPMasterConnection) connection).isConnected();
} else if (connection instanceof SerialConnection) {
return ((SerialConnection) connection).isOpen();
}
} catch (Exception e) {
logger.error("连接状态检查失败: {}", connectionId, e);
return false;
}
// 默认检查空闲时间
return System.currentTimeMillis() - lastAccessTime < 300000; // 5分钟
}
public void updateLastAccess() {
this.lastAccessTime = System.currentTimeMillis();
}
public Object getConnection() {
return connection;
}
public void close() {
try {
logger.info("关闭连接: {}", this);
if (connection instanceof TCPMasterConnection) {
((TCPMasterConnection) connection).close();
} else if (connection instanceof SerialConnection) {
((SerialConnection) connection).close();
}
} catch (Exception e) {
logger.error("关闭连接失败: {}", connectionId, e);
} finally {
this.active = false;
}
}
public long getLastAccessTime() {
return lastAccessTime;
}
@Override
public String toString() {
return "ModbusConnectionWrapper{" +
"connectionId=" + connectionId +
", active=" + active +
", lastAccessTime=" + lastAccessTime +
'}';
}
}

View File

@ -1,279 +0,0 @@
package com.xzzn.framework.web.service;
import com.ghgande.j2mod.modbus.ModbusException;
import com.ghgande.j2mod.modbus.ModbusIOException;
import com.ghgande.j2mod.modbus.io.ModbusSerialTransaction;
import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction;
import com.ghgande.j2mod.modbus.msg.ReadInputRegistersRequest;
import com.ghgande.j2mod.modbus.msg.ReadInputRegistersResponse;
import com.ghgande.j2mod.modbus.msg.WriteMultipleRegistersRequest;
import com.ghgande.j2mod.modbus.msg.WriteSingleRegisterRequest;
import com.ghgande.j2mod.modbus.net.SerialConnection;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import com.ghgande.j2mod.modbus.procimg.Register;
import com.ghgande.j2mod.modbus.procimg.SimpleRegister;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
/**
* Modbus操作服务添加重试机制
*/
@Service
public class ModbusService {
private static final Logger logger = LoggerFactory.getLogger(ModbusService.class);
@Retryable(
value = {ModbusException.class}, // 仅对自定义Modbus异常重试
maxAttempts = 3, // 最大重试3次1次原始调用 + 2次重试
backoff = @Backoff(delay = 1000, multiplier = 2) // 退避策略1s → 2s → 4s
)
@CircuitBreaker(name = "modbusOperation", fallbackMethod = "readRegistersFallback")
public int[] readHoldingRegisters(Object connection, int startAddr, int count) throws ModbusException {
try {
if (connection instanceof TCPMasterConnection) {
return readTcpRegisters((TCPMasterConnection) connection, startAddr, count);
} else if (connection instanceof SerialConnection) {
return readRtuRegisters((SerialConnection) connection, startAddr, count);
}
throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName());
} catch (ModbusIOException e) {
throw new ModbusException("通信故障", e);
} catch (Exception e) {
throw new ModbusException("系统错误", e);
}
}
private int[] readRtuRegisters(SerialConnection connection, int startAddr, int count) throws ModbusException {
if (!connection.isOpen()) {
throw new ModbusIOException("RTU连接未建立");
}
ReadInputRegistersRequest request = new ReadInputRegistersRequest(startAddr, count);
ModbusSerialTransaction transaction = new ModbusSerialTransaction(connection);
transaction.setRequest(request);
transaction.setRetries(2);
try {
transaction.execute();
ReadInputRegistersResponse response = (ReadInputRegistersResponse) transaction.getResponse();
if (response == null) {
throw new ModbusException("RTU响应为空");
}
return parseRegisters(response);
} catch (ModbusException e) {
logger.error("读取RTU寄存器失败: {}", e.getMessage());
throw e;
}
}
private int[] readTcpRegisters(TCPMasterConnection conn, int start, int count) throws ModbusException {
// 验证连接是否已建立
if (!conn.isConnected()) {
throw new ModbusIOException("TCP连接未建立");
}
// 使用正确的功能码03 - 读取保持寄存器ReadHoldingRegistersRequest
ReadInputRegistersRequest request = new ReadInputRegistersRequest(start, count);
ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn);
transaction.setRequest(request);
// 设置超时避免长时间阻塞
transaction.setRetries(2);
try {
transaction.execute();
ReadInputRegistersResponse response = (ReadInputRegistersResponse) transaction.getResponse();
if (response == null) {
throw new ModbusException("Modbus异常响应: " + response.getMessage());
}
// 正确解析寄存器值
return parseRegisters(response);
} catch (ModbusException e) {
// 记录详细错误信息
logger.error("读取TCP寄存器失败: {}", e.getMessage());
throw e;
}
}
/**
* 解析Modbus响应中的寄存器值
*/
private int[] parseRegisters(ReadInputRegistersResponse response) {
int byteCount = response.getByteCount();
int[] result = new int[byteCount / 2];
for (int i = 0; i < result.length; i++) {
// 转换为无符号整数
result[i] = response.getRegisterValue(i) & 0xFFFF;
}
return result;
}
/**
* 熔断降级方法
*/
public int[] readRegistersFallback(Object connection, int startAddr, int count, Exception e) {
logger.warn("Modbus操作降级原因: {}),返回空数据", e.getMessage());
return new int[0];
}
/**
* 写入单个寄存器支持TCP/RTU
* @param connection 连接对象TCPMasterConnection 或 SerialConnection
* @param registerAddr 寄存器地址
* @param value 要写入的值16位整数
*/
@Retryable(
value = {ModbusException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
@CircuitBreaker(name = "modbusOperation", fallbackMethod = "writeRegisterFallback")
public boolean writeSingleRegister(Object connection, int registerAddr, int value) throws ModbusException {
try {
if (connection instanceof TCPMasterConnection) {
return writeTcpSingleRegister((TCPMasterConnection) connection, registerAddr, value);
} else if (connection instanceof SerialConnection) {
return writeRtuSingleRegister((SerialConnection) connection, registerAddr, value);
}
throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName());
} catch (ModbusIOException e) {
throw new ModbusException("写入通信故障", e);
} catch (Exception e) {
throw new ModbusException("写入系统错误", e);
}
}
/**
* 写入多个寄存器支持TCP/RTU
* @param connection 连接对象
* @param startAddr 起始寄存器地址
* @param values 要写入的值数组每个值为16位整数
*/
@Retryable(
value = {ModbusException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
@CircuitBreaker(name = "modbusOperation", fallbackMethod = "writeRegisterFallback")
public boolean writeMultipleRegisters(Object connection, int startAddr, int[] values) throws ModbusException {
try {
if (connection instanceof TCPMasterConnection) {
return writeTcpMultipleRegisters((TCPMasterConnection) connection, startAddr, values);
} else if (connection instanceof SerialConnection) {
return writeRtuMultipleRegisters((SerialConnection) connection, startAddr, values);
}
throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName());
} catch (ModbusIOException e) {
throw new ModbusException("写入通信故障", e);
} catch (Exception e) {
throw new ModbusException("写入系统错误", e);
}
}
// ==================== TCP写入实现 ====================
private boolean writeTcpSingleRegister(TCPMasterConnection conn, int registerAddr, int value) throws ModbusException {
if (!conn.isConnected()) {
throw new ModbusIOException("TCP连接未建立无法写入");
}
// 创建写入单个寄存器的请求功能码06
WriteSingleRegisterRequest request = new WriteSingleRegisterRequest(registerAddr, new SimpleRegister(value));
ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn);
transaction.setRequest(request);
transaction.setRetries(2);
try {
transaction.execute();
logger.info("TCP写入单个寄存器成功地址:{},值:{}", registerAddr, value);
return true;
} catch (ModbusException e) {
logger.error("TCP写入单个寄存器失败地址:{},值:{}", registerAddr, value, e);
throw e;
}
}
private boolean writeTcpMultipleRegisters(TCPMasterConnection conn, int startAddr, int[] values) throws ModbusException {
if (!conn.isConnected()) {
throw new ModbusIOException("TCP连接未建立无法写入");
}
// 转换值数组为寄存器数组
Register[] registers = new Register[values.length];
for (int i = 0; i < values.length; i++) {
registers[i] = new SimpleRegister(values[i]);
}
// 创建写入多个寄存器的请求功能码16
WriteMultipleRegistersRequest request = new WriteMultipleRegistersRequest(startAddr, registers);
ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn);
transaction.setRequest(request);
transaction.setRetries(2);
try {
transaction.execute();
logger.info("TCP写入多个寄存器成功起始地址:{},数量:{}", startAddr, values.length);
return true;
} catch (ModbusException e) {
logger.error("TCP写入多个寄存器失败起始地址:{}", startAddr, e);
throw e;
}
}
// ==================== RTU写入实现 ====================
private boolean writeRtuSingleRegister(SerialConnection connection, int registerAddr, int value) throws ModbusException {
if (!connection.isOpen()) {
throw new ModbusIOException("RTU串口未打开请先建立连接");
}
WriteSingleRegisterRequest request = new WriteSingleRegisterRequest(registerAddr, new SimpleRegister(value));
ModbusSerialTransaction transaction = new ModbusSerialTransaction(connection);
transaction.setRequest(request);
transaction.setRetries(2);
try {
transaction.execute();
logger.info("RTU写入单个寄存器成功地址:{},值:{}", registerAddr, value);
return true;
} catch (ModbusException e) {
logger.error("RTU写入单个寄存器失败地址:{},值:{}", registerAddr, value, e);
throw e;
}
}
private boolean writeRtuMultipleRegisters(SerialConnection connection, int startAddr, int[] values) throws ModbusException {
if (!connection.isOpen()) {
throw new ModbusIOException("RTU串口未打开请先建立连接");
}
Register[] registers = new Register[values.length];
for (int i = 0; i < values.length; i++) {
registers[i] = new SimpleRegister(values[i]);
}
WriteMultipleRegistersRequest request = new WriteMultipleRegistersRequest(startAddr, registers);
ModbusSerialTransaction transaction = new ModbusSerialTransaction(connection);
transaction.setRequest(request);
transaction.setRetries(2);
try {
transaction.execute();
logger.info("RTU写入多个寄存器成功起始地址:{},数量:{}", startAddr, values.length);
return true;
} catch (ModbusException e) {
logger.error("RTU写入多个寄存器失败起始地址:{}", startAddr, e);
throw e;
}
}
// ==================== 写入操作的降级方法 ====================
public boolean writeRegisterFallback(Object connection, int addr, int value, Exception e) {
logger.warn("写入单个寄存器降级(原因: {}),地址:{}", e.getMessage(), addr);
return false;
}
public boolean writeRegisterFallback(Object connection, int startAddr, int[] values, Exception e) {
logger.warn("写入多个寄存器降级(原因: {}),起始地址:{}", e.getMessage(), startAddr);
return false;
}
}

View File

@ -25,9 +25,9 @@ import com.xzzn.ems.mapper.EmsFaultIssueLogMapper;
import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper; import com.xzzn.ems.mapper.EmsFaultProtectionPlanMapper;
import com.xzzn.ems.mapper.EmsStrategyRunningMapper; import com.xzzn.ems.mapper.EmsStrategyRunningMapper;
import com.xzzn.ems.service.IEmsFaultProtectionPlanService; import com.xzzn.ems.service.IEmsFaultProtectionPlanService;
import com.xzzn.framework.manager.ModbusConnectionManager; import com.xzzn.common.core.modbus.ModbusProcessor;
import com.xzzn.framework.manager.ModbusConnectionWrapper; import com.xzzn.common.core.modbus.domain.DeviceConfig;
import com.xzzn.framework.web.service.ModbusService; import com.xzzn.common.core.modbus.domain.WriteTagConfig;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList; import java.util.ArrayList;
@ -75,9 +75,7 @@ public class ProtectionPlanTask {
@Autowired @Autowired
private EmsDevicesSettingMapper emsDevicesSettingMapper; private EmsDevicesSettingMapper emsDevicesSettingMapper;
@Autowired @Autowired
private ModbusConnectionManager connectionManager; private ModbusProcessor modbusProcessor;
@Autowired
private ModbusService modbusService;
@Autowired @Autowired
private EmsFaultIssueLogMapper emsFaultIssueLogMapper; private EmsFaultIssueLogMapper emsFaultIssueLogMapper;
@ -248,25 +246,33 @@ public class ProtectionPlanTask {
// 获取设备地址信息 // 获取设备地址信息
EmsDevicesSetting device = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceId, siteId); EmsDevicesSetting device = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceId, siteId);
if (device == null || StringUtils.isEmpty(device.getIpAddress()) || device.getIpPort()==null) { if (device == null || StringUtils.isEmpty(device.getIpAddress()) || device.getIpPort()==null) {
return; logger.warn("设备信息不完整deviceId:{}", deviceId);
}
// 获取设备连接
ModbusConnectionWrapper wrapper = connectionManager.getConnection(device);
if (wrapper == null || !wrapper.isActive()) {
logger.info("<设备连接无效>");
return; return;
} }
// 写入寄存器 // 构建设备配置
boolean success = modbusService.writeSingleRegister( DeviceConfig config = new DeviceConfig();
wrapper.getConnection(), config.setHost(device.getIpAddress());
1, config.setPort(device.getIpPort().intValue());
plan.getValue().intValue()); config.setSlaveId(device.getSlaveId().intValue());
config.setDeviceName(device.getDeviceName());
config.setDeviceNumber(device.getDeviceId());
// 构建写入标签配置
WriteTagConfig writeTag = new WriteTagConfig();
writeTag.setAddress(plan.getPoint());
writeTag.setValue(plan.getValue());
List<WriteTagConfig> writeTags = new ArrayList<>();
writeTags.add(writeTag);
config.setWriteTags(writeTags);
// 写入数据到设备
boolean success = modbusProcessor.writeDataToDeviceWithRetry(config);
if (!success) { if (!success) {
logger.error("写入失败,设备地址:{}", device.getIpAddress()); logger.error("写入失败,设备地址:{}", device.getIpAddress());
} }
} }
// 校验释放值是否取消方案 // 校验释放值是否取消方案