diff --git a/ems-common/pom.xml b/ems-common/pom.xml index 9bade5d..c28364f 100644 --- a/ems-common/pom.xml +++ b/ems-common/pom.xml @@ -119,7 +119,7 @@ javax.servlet-api - + com.infiniteautomation modbus4j @@ -127,4 +127,4 @@ - \ No newline at end of file + diff --git a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java index 91ce835..1ea73d9 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java +++ b/ems-common/src/main/java/com/xzzn/common/core/modbus/ModbusProcessor.java @@ -50,6 +50,8 @@ public class ModbusProcessor { private int writeTimeout; @Value("${modbus.read-retries:1}") private int readRetries; + @Value("${modbus.write-retries:1}") + private int writeRetries; @Autowired private RedisCache redisCache; @@ -64,6 +66,7 @@ public class ModbusProcessor { try { master = connectionManager.borrowMaster(config); master.setTimeout(writeTimeout); + master.setRetries(0); result = writeTagValue(master, config, config.getWriteTags()); } catch (Exception e) { logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e); @@ -91,12 +94,13 @@ public class ModbusProcessor { try { master = connectionManager.borrowMaster(config); master.setTimeout(writeTimeout); // 设置超时时间 + master.setRetries(0); // 使用重试装饰器 ModbusMaster finalMaster = master; result = RetryableModbusOperation.executeWithRetry(() -> { return writeTagValue(finalMaster, config, config.getWriteTags()); - }, 1); // 最大重试1次(共2次尝试) + }, writeRetries); // 最大重试次数由配置控制 } catch (Exception e) { logger.error("Failed to borrow connection or write to devices '{}'", config.getDeviceName(), e); diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java index dcc2098..32a2141 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java @@ -33,7 +33,9 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Resource; import java.util.stream.Collectors; @@ -57,6 +59,7 @@ public class ModbusPoller { private final ScheduledTask scheduledTask; private final ObjectMapper objectMapper = new ObjectMapper(); private final Map deviceFailureCounts = new ConcurrentHashMap<>(); + private final AtomicBoolean polling = new AtomicBoolean(false); @Resource(name = "modbusExecutor") private ExecutorService modbusExecutor; @@ -89,9 +92,14 @@ public class ModbusPoller { } public void pollAllDevices() { + if (!polling.compareAndSet(false, true)) { + log.warn("上一次轮询尚未完成,本次轮询跳过"); + return; + } Path devicesDir = Paths.get(System.getProperty("user.dir"), "devices"); if (!Files.exists(devicesDir)) { log.error("Devices目录不存在: {}", devicesDir); + polling.set(false); return; } @@ -102,6 +110,7 @@ public class ModbusPoller { .collect(Collectors.toList()); } catch (IOException e) { log.error("modbusPoller.loadConfigs 获取设备配置文件失败: {}", devicesDir, e); + polling.set(false); return; } @@ -124,7 +133,7 @@ public class ModbusPoller { // 使用单线程 executor 串行执行所有主机的 Modbus 操作 // 将所有主机的设备按顺序串行处理,避免任何并发访问 - modbusExecutor.submit(() -> { + Future future = modbusExecutor.submit(() -> { for (Map.Entry> entry : groupedByHost.entrySet()) { String hostKey = entry.getKey(); List configs = entry.getValue(); @@ -144,6 +153,16 @@ public class ModbusPoller { log.info("采集设备数据{}轮询任务执行完成", hostKey); } }); + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Modbus轮询任务等待中断"); + } catch (Exception e) { + log.error("Modbus轮询任务执行异常", e); + } finally { + polling.set(false); + } } public void scheduledStart(DeviceConfig config) { diff --git a/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java index e24f16d..c06902d 100644 --- a/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java @@ -418,7 +418,7 @@ public class StrategyPoller { if (deviceConfig == null) { continue; } - boolean result = modbusProcessor.writeDataToDevice(deviceConfig); + boolean result = modbusProcessor.writeDataToDeviceWithRetry(deviceConfig); if (!result) { logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceId, chargeStatus.getInfo()); continue; @@ -445,7 +445,7 @@ public class StrategyPoller { if (deviceConfig == null) { return false; } - boolean result = modbusProcessor.writeDataToDevice(deviceConfig); + boolean result = modbusProcessor.writeDataToDeviceWithRetry(deviceConfig); if (!result) { logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceConfig, workStatus.getInfo()); } @@ -491,4 +491,4 @@ public class StrategyPoller { return true; } -} \ No newline at end of file +}