修正 modbus 写入超时问题
This commit is contained in:
@ -119,7 +119,7 @@
|
|||||||
<artifactId>javax.servlet-api</artifactId>
|
<artifactId>javax.servlet-api</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- modbus -->
|
<!-- modbus4j (保留兼容) -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.infiniteautomation</groupId>
|
<groupId>com.infiniteautomation</groupId>
|
||||||
<artifactId>modbus4j</artifactId>
|
<artifactId>modbus4j</artifactId>
|
||||||
@ -127,4 +127,4 @@
|
|||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|||||||
@ -50,6 +50,8 @@ public class ModbusProcessor {
|
|||||||
private int writeTimeout;
|
private int writeTimeout;
|
||||||
@Value("${modbus.read-retries:1}")
|
@Value("${modbus.read-retries:1}")
|
||||||
private int readRetries;
|
private int readRetries;
|
||||||
|
@Value("${modbus.write-retries:1}")
|
||||||
|
private int writeRetries;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisCache redisCache;
|
private RedisCache redisCache;
|
||||||
@ -64,6 +66,7 @@ public class ModbusProcessor {
|
|||||||
try {
|
try {
|
||||||
master = connectionManager.borrowMaster(config);
|
master = connectionManager.borrowMaster(config);
|
||||||
master.setTimeout(writeTimeout);
|
master.setTimeout(writeTimeout);
|
||||||
|
master.setRetries(0);
|
||||||
result = writeTagValue(master, config, config.getWriteTags());
|
result = writeTagValue(master, config, config.getWriteTags());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e);
|
logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e);
|
||||||
@ -91,12 +94,13 @@ public class ModbusProcessor {
|
|||||||
try {
|
try {
|
||||||
master = connectionManager.borrowMaster(config);
|
master = connectionManager.borrowMaster(config);
|
||||||
master.setTimeout(writeTimeout); // 设置超时时间
|
master.setTimeout(writeTimeout); // 设置超时时间
|
||||||
|
master.setRetries(0);
|
||||||
|
|
||||||
// 使用重试装饰器
|
// 使用重试装饰器
|
||||||
ModbusMaster finalMaster = master;
|
ModbusMaster finalMaster = master;
|
||||||
result = RetryableModbusOperation.executeWithRetry(() -> {
|
result = RetryableModbusOperation.executeWithRetry(() -> {
|
||||||
return writeTagValue(finalMaster, config, config.getWriteTags());
|
return writeTagValue(finalMaster, config, config.getWriteTags());
|
||||||
}, 1); // 最大重试1次(共2次尝试)
|
}, writeRetries); // 最大重试次数由配置控制
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e);
|
logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e);
|
||||||
|
|||||||
@ -33,7 +33,9 @@ import java.util.Map;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -57,6 +59,7 @@ public class ModbusPoller {
|
|||||||
private final ScheduledTask scheduledTask;
|
private final ScheduledTask scheduledTask;
|
||||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
private final Map<String, Integer> deviceFailureCounts = new ConcurrentHashMap<>();
|
private final Map<String, Integer> deviceFailureCounts = new ConcurrentHashMap<>();
|
||||||
|
private final AtomicBoolean polling = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Resource(name = "modbusExecutor")
|
@Resource(name = "modbusExecutor")
|
||||||
private ExecutorService modbusExecutor;
|
private ExecutorService modbusExecutor;
|
||||||
@ -89,9 +92,14 @@ public class ModbusPoller {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void pollAllDevices() {
|
public void pollAllDevices() {
|
||||||
|
if (!polling.compareAndSet(false, true)) {
|
||||||
|
log.warn("上一次轮询尚未完成,本次轮询跳过");
|
||||||
|
return;
|
||||||
|
}
|
||||||
Path devicesDir = Paths.get(System.getProperty("user.dir"), "devices");
|
Path devicesDir = Paths.get(System.getProperty("user.dir"), "devices");
|
||||||
if (!Files.exists(devicesDir)) {
|
if (!Files.exists(devicesDir)) {
|
||||||
log.error("Devices目录不存在: {}", devicesDir);
|
log.error("Devices目录不存在: {}", devicesDir);
|
||||||
|
polling.set(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,6 +110,7 @@ public class ModbusPoller {
|
|||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("modbusPoller.loadConfigs 获取设备配置文件失败: {}", devicesDir, e);
|
log.error("modbusPoller.loadConfigs 获取设备配置文件失败: {}", devicesDir, e);
|
||||||
|
polling.set(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,7 +133,7 @@ public class ModbusPoller {
|
|||||||
|
|
||||||
// 使用单线程 executor 串行执行所有主机的 Modbus 操作
|
// 使用单线程 executor 串行执行所有主机的 Modbus 操作
|
||||||
// 将所有主机的设备按顺序串行处理,避免任何并发访问
|
// 将所有主机的设备按顺序串行处理,避免任何并发访问
|
||||||
modbusExecutor.submit(() -> {
|
Future<?> future = modbusExecutor.submit(() -> {
|
||||||
for (Map.Entry<String, List<DeviceConfig>> entry : groupedByHost.entrySet()) {
|
for (Map.Entry<String, List<DeviceConfig>> entry : groupedByHost.entrySet()) {
|
||||||
String hostKey = entry.getKey();
|
String hostKey = entry.getKey();
|
||||||
List<DeviceConfig> configs = entry.getValue();
|
List<DeviceConfig> configs = entry.getValue();
|
||||||
@ -144,6 +153,16 @@ public class ModbusPoller {
|
|||||||
log.info("采集设备数据{}轮询任务执行完成", hostKey);
|
log.info("采集设备数据{}轮询任务执行完成", hostKey);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
log.warn("Modbus轮询任务等待中断");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Modbus轮询任务执行异常", e);
|
||||||
|
} finally {
|
||||||
|
polling.set(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void scheduledStart(DeviceConfig config) {
|
public void scheduledStart(DeviceConfig config) {
|
||||||
|
|||||||
@ -418,7 +418,7 @@ public class StrategyPoller {
|
|||||||
if (deviceConfig == null) {
|
if (deviceConfig == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
boolean result = modbusProcessor.writeDataToDevice(deviceConfig);
|
boolean result = modbusProcessor.writeDataToDeviceWithRetry(deviceConfig);
|
||||||
if (!result) {
|
if (!result) {
|
||||||
logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceId, chargeStatus.getInfo());
|
logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceId, chargeStatus.getInfo());
|
||||||
continue;
|
continue;
|
||||||
@ -445,7 +445,7 @@ public class StrategyPoller {
|
|||||||
if (deviceConfig == null) {
|
if (deviceConfig == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
boolean result = modbusProcessor.writeDataToDevice(deviceConfig);
|
boolean result = modbusProcessor.writeDataToDeviceWithRetry(deviceConfig);
|
||||||
if (!result) {
|
if (!result) {
|
||||||
logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceConfig, workStatus.getInfo());
|
logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceConfig, workStatus.getInfo());
|
||||||
}
|
}
|
||||||
@ -491,4 +491,4 @@ public class StrategyPoller {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user