dev #2

Merged
dashixiong merged 349 commits from dev into main 2026-02-11 01:55:46 +00:00
375 changed files with 55827 additions and 1130 deletions
Showing only changes of commit a31a1a1caa - Show all commits

View File

@ -1,66 +1,62 @@
package com.xzzn.common.core.modbus;
import com.serotonin.modbus4j.ModbusFactory;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.ip.IpParameters;
import com.xzzn.common.core.modbus.domain.DeviceConfig;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* Modbus连接管理器
* 不使用连接池,每次请求创建新连接,使用完立即销毁
* 避免复用已被服务端断开的连接导致Connection reset
*/
@Component
public class Modbus4jConnectionManager {
private static final Logger logger = LoggerFactory.getLogger(Modbus4jConnectionManager.class);
private final Map<String, GenericObjectPool<ModbusMaster>> connectionPools = new ConcurrentHashMap<>();
private final ModbusFactory modbusFactory = new ModbusFactory();
/**
* 创建新连接
*/
public ModbusMaster borrowMaster(DeviceConfig config) throws Exception {
String poolKey = getPoolKey(config);
GenericObjectPool<ModbusMaster> pool = connectionPools.computeIfAbsent(poolKey, key -> {
PooledModbusMasterFactory factory = new PooledModbusMasterFactory(config.getHost(), config.getPort());
GenericObjectPoolConfig<ModbusMaster> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(20); // 池中最大连接数
poolConfig.setMinIdle(4); // 最小空闲连接数
poolConfig.setTestOnBorrow(true); // 借用时测试连接有效性
poolConfig.setTestOnReturn(true); // 归还时测试连接有效性
poolConfig.setTestWhileIdle(true); // 空闲时测试连接有效性
poolConfig.setMaxWaitMillis(3000); // 获取连接的最大等待时间3秒
poolConfig.setTimeBetweenEvictionRunsMillis(30000); // 30秒检查一次空闲连接
poolConfig.setMinEvictableIdleTimeMillis(60000); // 空闲超过60秒的连接可以被驱逐
return new GenericObjectPool<>(factory, poolConfig);
});
return pool.borrowObject();
IpParameters params = new IpParameters();
params.setHost(config.getHost());
params.setPort(config.getPort());
params.setEncapsulated(false);
ModbusMaster master = modbusFactory.createTcpMaster(params, true);
master.init();
logger.debug("创建新Modbus连接: {}:{}", config.getHost(), config.getPort());
return master;
}
/**
* 关闭连接
*/
public void returnMaster(DeviceConfig config, ModbusMaster master) {
if (master == null) {
return;
}
String poolKey = getPoolKey(config);
GenericObjectPool<ModbusMaster> pool = connectionPools.get(poolKey);
if (pool != null) {
pool.returnObject(master);
destroyMaster(master, config);
}
/**
* 废弃连接与returnMaster相同
*/
public void invalidateMaster(DeviceConfig config, ModbusMaster master) {
destroyMaster(master, config);
}
private void destroyMaster(ModbusMaster master, DeviceConfig config) {
if (master != null) {
try {
master.destroy();
logger.debug("已关闭Modbus连接: {}:{}", config.getHost(), config.getPort());
} catch (Exception e) {
logger.warn("关闭Modbus连接异常: {}:{}", config.getHost(), config.getPort(), e);
}
}
}
private String getPoolKey(DeviceConfig config) {
return config.getHost() + ":" + config.getPort();
}
@PreDestroy
public void shutdown() {
connectionPools.values().forEach(GenericObjectPool::close);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import static com.xzzn.common.enums.RegisterType.COIL;
@ -43,8 +44,12 @@ import static com.xzzn.common.enums.RegisterType.DISCRETE_INPUT;
public class ModbusProcessor {
private static final Logger logger = LoggerFactory.getLogger(ModbusProcessor.class);
private int readTimeout = 5000;
private int writeTimeout = 3000;
@Value("${modbus.read-timeout:8000}")
private int readTimeout;
@Value("${modbus.write-timeout:5000}")
private int writeTimeout;
@Value("${modbus.read-retries:1}")
private int readRetries;
@Autowired
private RedisCache redisCache;
@ -54,20 +59,25 @@ public class ModbusProcessor {
public boolean writeDataToDevice(DeviceConfig config) {
logger.info("writeDataToDevice: {}", JSON.toJSONString(config));
ModbusMaster master = null;
boolean result;
boolean result = false;
boolean hasError = false;
try {
master = connectionManager.borrowMaster(config);
// 设置了Modbus通信的超时时间为3000毫秒3秒。当主设备与从设备通信时若在3秒内未收到响应则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。
master.setTimeout(writeTimeout);
result = writeTagValue(master, config, config.getWriteTags());
} catch (Exception e) {
logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e);
result = false;
hasError = true;
}
finally {
// 关键:无论成功与否,都必须将连接归还到池中
if (master != null) {
connectionManager.returnMaster(config, master);
if (hasError) {
// 发生异常时废弃连接,下次重新创建
connectionManager.invalidateMaster(config, master);
} else {
// 正常时归还连接
connectionManager.returnMaster(config, master);
}
}
}
return result;
@ -224,16 +234,14 @@ public class ModbusProcessor {
ModbusMaster master = connectionManager.borrowMaster(config);
// 设置了Modbus通信的超时时间为5000毫秒5秒。当主设备与从设备通信时若在5秒内未收到响应则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。
master.setTimeout(readTimeout);
master.setRetries(readRetries);
return master;
}
public Map<String, Object> readDataFromDevice(DeviceConfig config, ModbusMaster master) {
Map<String, Object> deviceData = new HashMap<>();
// ModbusMaster master = null; // 将master的声明提前
boolean hasError = false;
try {
// master = connectionManager.borrowMaster(config);
// 设置了Modbus通信的超时时间为3000毫秒3秒。当主设备与从设备通信时若在3秒内未收到响应则认为通信超时并抛出异常。这有助于避免长时间等待无响应的设备。
// master.setTimeout(5000);
BatchResults<String> results = readTagValues(master, config.getSlaveId(), config.getTags());
for (TagConfig tag : config.getTags()) {
if (Objects.equals(tag.getDataType(), "FOUR_BYTE_FLOAT_DBCA")){
@ -243,28 +251,22 @@ public class ModbusProcessor {
}else {
deviceData.put(tag.getKey(), results.getValue(tag.getKey()));
}
// try {
// Object value = readTagValue(master, config.getSlaveId(), tag);
// if (value != null) {
// deviceData.put(tag.getKey(), value);
// }
// } catch (Exception e) {
// logger.error("Failed to read tag '{}' from devices '{}'", tag.getKey(), config.getDeviceName(), e);
// }
}
} catch (Exception e) {
logger.error("Failed read from devices '{}'", config.getDeviceName(), e);
hasError = true;
}
finally {
// 关键:无论成功与否,都必须将连接归还到池中
if (master != null) {
connectionManager.returnMaster(config, master);
if (hasError) {
// 发生异常时废弃连接,下次重新创建
connectionManager.invalidateMaster(config, master);
} else {
// 正常时归还连接
connectionManager.returnMaster(config, master);
}
}
}
// String deviceNumber = config.getDeviceNumber();
// redisCache.setCacheObject(deviceNumber, deviceData);
return deviceData;
}
@ -340,77 +342,101 @@ public class ModbusProcessor {
private BatchResults<String> readTagValues(ModbusMaster master, int slaveId, List<TagConfig> tags) throws Exception {
try {
BatchRead<String> batch = new BatchRead<>();
tags.forEach(tag -> {
Map<Integer, RegisterType> type = ModBusType.REGISTER_TYPE;
Map<String, Integer> DATA_LENGTH = ModBusType.LENGTH;
int firstDigit = Integer.parseInt(tag.getAddress().substring(0, 1));
int address = 0;
int addressLength = tag.getAddress().length();
int exp = (int) Math.pow(10, addressLength-1);
if (firstDigit != 0){
int digit = Integer.parseInt(tag.getAddress());
address = digit % (exp);
}else {
address = Integer.parseInt(tag.getAddress());
}
RegisterType registerType = type.get(firstDigit);
int dataLength = DATA_LENGTH.get(tag.getDataType());
switch (registerType) {
case COIL: {
BaseLocator<Boolean> loc = BaseLocator.coilStatus(slaveId, address);
batch.addLocator(tag.getKey(), loc);
break;
}
case DISCRETE_INPUT: {
BaseLocator<Boolean> loc = BaseLocator.inputStatus(slaveId, address);
batch.addLocator(tag.getKey(), loc);
break;
}
case HOLDING_REGISTER: {
// logger.info("HOLDING_REGISTER: {}",tag.getAddress());
if (dataLength == 28){
BaseLocator<Number> locator = BaseLocator.holdingRegister(slaveId, address, 4);
batch.addLocator(tag.getKey(), locator);
}else {
BaseLocator<Number> loc = BaseLocator.holdingRegister(slaveId, address, dataLength);
batch.addLocator(tag.getKey(), loc);
}
break;
}
case INPUT_REGISTER: {
// logger.info("INPUT_REGISTER: {}",tag.getAddress());
BaseLocator<Number> loc = BaseLocator.inputRegister(slaveId, address, dataLength);
batch.addLocator(tag.getKey(), loc);
break;
}
}
});
BatchResults<String> results = master.send(batch);
List<String> logInfoList = new ArrayList<>();
for (TagConfig tag : tags){
StringBuilder logInfo = new StringBuilder();
logInfo.append(tag.getAddress());
if (tag.getBit()!=null){
// logger.info("批处理读取寄存器成功: {}",tag.getAddress() +"(" + tag.getBit() + "):" + results.getValue(tag.getKey()));
logInfo.append("(" + tag.getBit() + "):");
}else {
// logger.info("批处理读取寄存器成功: {}",tag.getAddress() + ":" + results.getValue(tag.getKey()));
logInfo.append(":");
}
logInfo.append(results.getValue(tag.getKey()));
logInfoList.add(logInfo.toString());
}
logger.info("批处理读取寄存器成功: {}", JSON.toJSONString(logInfoList));
BatchResults<String> results = sendBatchRead(master, slaveId, tags);
logBatchResults(tags, results);
return results;
} catch (Exception e){
if (isTimeoutException(e)) {
logger.warn("批量读取超时尝试降级为单点读取slaveId: {}", slaveId);
BatchResults<String> fallback = new BatchResults<>();
for (TagConfig tag : tags) {
Object value = readTagValue(master, slaveId, tag);
fallback.addResult(tag.getKey(), value);
}
logBatchResults(tags, fallback);
return fallback;
}
logger.error("Failed to read master '{}'", slaveId, e);
throw new Exception(e);
}
}
private BatchResults<String> sendBatchRead(ModbusMaster master, int slaveId, List<TagConfig> tags) throws Exception {
BatchRead<String> batch = new BatchRead<>();
tags.forEach(tag -> {
Map<Integer, RegisterType> type = ModBusType.REGISTER_TYPE;
Map<String, Integer> DATA_LENGTH = ModBusType.LENGTH;
int firstDigit = Integer.parseInt(tag.getAddress().substring(0, 1));
int address = 0;
int addressLength = tag.getAddress().length();
int exp = (int) Math.pow(10, addressLength - 1);
if (firstDigit != 0) {
int digit = Integer.parseInt(tag.getAddress());
address = digit % (exp);
} else {
address = Integer.parseInt(tag.getAddress());
}
RegisterType registerType = type.get(firstDigit);
int dataLength = DATA_LENGTH.get(tag.getDataType());
switch (registerType) {
case COIL: {
BaseLocator<Boolean> loc = BaseLocator.coilStatus(slaveId, address);
batch.addLocator(tag.getKey(), loc);
break;
}
case DISCRETE_INPUT: {
BaseLocator<Boolean> loc = BaseLocator.inputStatus(slaveId, address);
batch.addLocator(tag.getKey(), loc);
break;
}
case HOLDING_REGISTER: {
if (dataLength == 28) {
BaseLocator<Number> locator = BaseLocator.holdingRegister(slaveId, address, 4);
batch.addLocator(tag.getKey(), locator);
} else {
BaseLocator<Number> loc = BaseLocator.holdingRegister(slaveId, address, dataLength);
batch.addLocator(tag.getKey(), loc);
}
break;
}
case INPUT_REGISTER: {
BaseLocator<Number> loc = BaseLocator.inputRegister(slaveId, address, dataLength);
batch.addLocator(tag.getKey(), loc);
break;
}
}
});
return master.send(batch);
}
private void logBatchResults(List<TagConfig> tags, BatchResults<String> results) {
List<String> logInfoList = new ArrayList<>();
for (TagConfig tag : tags) {
StringBuilder logInfo = new StringBuilder();
logInfo.append(tag.getAddress());
if (tag.getBit() != null) {
logInfo.append("(").append(tag.getBit()).append("):");
} else {
logInfo.append(":");
}
logInfo.append(results.getValue(tag.getKey()));
logInfoList.add(logInfo.toString());
}
logger.info("批处理读取寄存器成功: {}", JSON.toJSONString(logInfoList));
}
private boolean isTimeoutException(Throwable e) {
Throwable current = e;
while (current != null) {
if (current instanceof com.serotonin.modbus4j.sero.messaging.TimeoutException) {
return true;
}
current = current.getCause();
}
return false;
}
public static float convertValueToFloat(Object value) {
if (!(value instanceof Number)) {
throw new IllegalArgumentException("Input must be a Number");

View File

@ -31,9 +31,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.MqttException;
@ -56,10 +58,14 @@ public class ModbusPoller {
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<String, Integer> deviceFailureCounts = new ConcurrentHashMap<>();
@Resource(name = "modbusExecutor")
private ExecutorService modbusExecutor;
@Autowired
private ModbusProcessor modbusProcessor;
@Autowired
private IEmsAlarmRecordsService iEmsAlarmRecordsService;
@Autowired
private ISysJobService iSysJobService;
@Autowired
@ -99,8 +105,8 @@ public class ModbusPoller {
return;
}
// 按 host:port 分组
Map<String, List<DeviceConfig>> groupedConfigs = new HashMap<>();
// 按主机IP分组同一网关的不同端口也归为一组避免并发访问导致Connection Reset
Map<String, List<DeviceConfig>> groupedByHost = new HashMap<>();
for (Path filePath : jsonFiles) {
DeviceConfig config = null;
try {
@ -110,51 +116,44 @@ public class ModbusPoller {
continue;
}
if (config.isEnabled()) {
String key = config.getHost() + ":" + config.getPort();
groupedConfigs.computeIfAbsent(key, k -> new ArrayList<>()).add(config);
// 只按主机IP分组确保同一网关的所有端口串行访问
String hostKey = config.getHost();
groupedByHost.computeIfAbsent(hostKey, k -> new ArrayList<>()).add(config);
}
}
int interval = getScheduledTaskInterval();
// 为每个 host:port 启动一个任务
for (Map.Entry<String, List<DeviceConfig>> entry : groupedConfigs.entrySet()) {
String groupKey = entry.getKey();
List<DeviceConfig> configs = entry.getValue();
try {
CompletableFuture.runAsync(() -> {
for (DeviceConfig config : configs) {
try {
scheduledStart(config);
} catch (Exception e) {
log.error("采集设备数据异常: {}", config.getDeviceName(), e);
}
}
})
.exceptionally(e -> {
log.error("采集设备数据{}轮询异常", groupKey, e);
return null;
})
.thenRun(() -> log.info("采集设备数据{}轮询任务执行完成", groupKey));
} catch (Exception e) {
log.error("采集设备数据{}任务失败", groupKey, e);
// 使用单线程 executor 串行执行所有主机的 Modbus 操作
// 将所有主机的设备按顺序串行处理,避免任何并发访问
modbusExecutor.submit(() -> {
for (Map.Entry<String, List<DeviceConfig>> entry : groupedByHost.entrySet()) {
String hostKey = entry.getKey();
List<DeviceConfig> configs = entry.getValue();
for (DeviceConfig config : configs) {
try {
scheduledStart(config);
// 每次读取后等待200ms给Modbus网关足够的处理时间
Thread.sleep(200);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.warn("Modbus轮询被中断");
return;
} catch (Exception e) {
log.error("采集设备数据异常: {}", config.getDeviceName(), e);
}
}
log.info("采集设备数据{}轮询任务执行完成", hostKey);
}
}
});
}
public void scheduledStart(DeviceConfig config) {
if (config.isEnabled()) {
log.info("Reading data from devices: {}", config.getDeviceName());
ModbusMaster master = null;
try {
master = modbusProcessor.borrowMaster(config);
} catch (Exception e) {
log.error("Failed to borrow connection '{}'", config.getDeviceName(), e);
// 处理设备连接失败的情况,更新设备状态为离线,添加报警记录
addDeviceOfflineRecord(siteId, config.getDeviceNumber());
return;
}
// 带重试的读取最多重试2次
Map<String, Object> data = readWithRetry(config, 2);
List<String> rawValuEmptyList = new ArrayList<>();
Map<String, Object> data = modbusProcessor.readDataFromDevice(config, master);
// 在这里处理采集到的数据空
config.getTags().forEach(tag -> {
Object rawValue = data.get(tag.getKey());
@ -191,6 +190,51 @@ public class ModbusPoller {
}
}
/**
* 带重试的读取方法
*/
private Map<String, Object> readWithRetry(DeviceConfig config, int maxRetries) {
Map<String, Object> data = new HashMap<>();
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
ModbusMaster master = modbusProcessor.borrowMaster(config);
data = modbusProcessor.readDataFromDevice(config, master);
// 如果读取成功(有数据),直接返回
if (!data.isEmpty()) {
if (attempt > 0) {
log.info("设备 {} 第 {} 次重试成功", config.getDeviceName(), attempt);
}
return data;
}
// 读取返回空数据,等待后重试
if (attempt < maxRetries) {
log.warn("设备 {} 读取返回空数据等待1秒后重试 ({}/{})",
config.getDeviceName(), attempt + 1, maxRetries);
Thread.sleep(1000);
}
} catch (Exception e) {
log.error("设备 {} 读取异常 ({}/{}): {}",
config.getDeviceName(), attempt + 1, maxRetries, e.getMessage());
if (attempt < maxRetries) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// 所有重试都失败
log.error("设备 {} 读取失败,已重试 {} 次", config.getDeviceName(), maxRetries);
return data;
}
private void processingData(Map<String, Object> data, String deviceNumber) {
if (CollectionUtils.isEmpty(data)) {
// 增加失败计数
@ -270,4 +314,4 @@ public class ModbusPoller {
return Math.toIntExact(CronUtils.getNextExecutionIntervalMillis(sysJobs.get(0).getCronExpression()));
}
}
}

View File

@ -39,8 +39,10 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
@ -85,28 +87,26 @@ public class StrategyPoller {
@Autowired
private ModbusProcessor modbusProcessor;
@Resource(name = "modbusExecutor")
private ExecutorService modbusExecutor;
public void pollAllDevices() {
logger.info("开始执行运行策略数据轮询...");
List<StrategyRunningVo> strategyRunningVoList = emsStrategyRunningMapper.getPendingPollerStrategy(null);
strategyRunningVoList.forEach(strategyVo -> {
Long strategyId = strategyVo.getId();
if (strategyLocks.putIfAbsent(strategyId, true) == null) {
try {
CompletableFuture.runAsync(() -> {
processData(strategyVo);
})
.exceptionally(e -> {
logger.error("运行策略{}轮询异常", strategyVo.getId(), e);
return null;
})
.thenRun(() -> {
logger.info("运行策略{}轮询任务执行完成,释放锁", strategyVo.getId());
strategyLocks.remove(strategyId);
});
} catch (Exception e) {
logger.error("运行策略{}任务失败", strategyVo.getId(), e);
strategyLocks.remove(strategyId);
}
// 使用共享的modbusExecutor串行执行避免与ModbusPoller并发访问导致通讯故障
modbusExecutor.submit(() -> {
try {
processData(strategyVo);
} catch (Exception e) {
logger.error("运行策略{}轮询异常", strategyVo.getId(), e);
} finally {
logger.info("运行策略{}轮询任务执行完成,释放锁", strategyVo.getId());
strategyLocks.remove(strategyId);
}
});
} else {
logger.info("策略{}已在处理中,跳过重复执行", strategyId);
}