From 1da986737216d7d31d91edb8fcfa67c1a8f17c70 Mon Sep 17 00:00:00 2001 From: mashili Date: Thu, 17 Jul 2025 10:13:19 +0800 Subject: [PATCH] =?UTF-8?q?modbus=E5=8D=8F=E8=AE=AE=E8=BD=AE=E8=AF=A2?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ems-admin/src/main/resources/application.yml | 18 ++ ems-framework/pom.xml | 15 +- .../framework/aspectj/RateLimiterAspect.java | 2 +- .../xzzn/framework/config/AsyncConfig.java | 27 +++ .../xzzn/framework/config/ModbusConfig.java | 38 ++++ .../manager/ModbusConnectionManager.java | 189 ++++++++++++++++++ .../manager/ModbusConnectionWrapper.java | 81 ++++++++ .../framework/scheduler/ModbusPoller.java | 126 ++++++++++++ .../framework/web/service/ModbusService.java | 101 ++++++++++ pom.xml | 17 ++ 10 files changed, 612 insertions(+), 2 deletions(-) create mode 100644 ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java create mode 100644 ems-framework/src/main/java/com/xzzn/framework/config/ModbusConfig.java create mode 100644 ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java create mode 100644 ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionWrapper.java create mode 100644 ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java create mode 100644 ems-framework/src/main/java/com/xzzn/framework/web/service/ModbusService.java diff --git a/ems-admin/src/main/resources/application.yml b/ems-admin/src/main/resources/application.yml index 7b1963f..828ca58 100644 --- a/ems-admin/src/main/resources/application.yml +++ b/ems-admin/src/main/resources/application.yml @@ -136,3 +136,21 @@ mqtt: connection-timeout: 15 keep-alive-interval: 30 automatic-reconnect: true + +modbus: + pool: + max-total: 20 + max-idle: 10 + min-idle: 3 + poll: + interval: "0 */5 * * * *" # 5分钟间隔 + timeout: 30000 # 30秒超时 + +resilience4j: + circuitbreaker: + instances: + modbusOperation: + failure-rate-threshold: 50 + minimum-number-of-calls: 5 + sliding-window-size: 10 + wait-duration-in-open-state: 10s \ No newline at end of file diff --git a/ems-framework/pom.xml b/ems-framework/pom.xml index 809fa4f..62d4c64 100644 --- a/ems-framework/pom.xml +++ b/ems-framework/pom.xml @@ -50,6 +50,20 @@ org.eclipse.paho org.eclipse.paho.client.mqttv3 + + + net.wimpi + j2mod + + + org.springframework.retry + spring-retry + + + + io.github.resilience4j + resilience4j-circuitbreaker + com.github.oshi @@ -63,5 +77,4 @@ - \ No newline at end of file diff --git a/ems-framework/src/main/java/com/xzzn/framework/aspectj/RateLimiterAspect.java b/ems-framework/src/main/java/com/xzzn/framework/aspectj/RateLimiterAspect.java index 857add2..9ade61c 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/aspectj/RateLimiterAspect.java +++ b/ems-framework/src/main/java/com/xzzn/framework/aspectj/RateLimiterAspect.java @@ -25,7 +25,7 @@ import com.xzzn.common.utils.ip.IpUtils; * @author xzzn */ @Aspect -@Component +@Component("customRateLimiterAspect") public class RateLimiterAspect { private static final Logger log = LoggerFactory.getLogger(RateLimiterAspect.class); diff --git a/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java b/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java new file mode 100644 index 0000000..d80ce10 --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java @@ -0,0 +1,27 @@ +package com.xzzn.framework.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +@EnableAsync +public class AsyncConfig { + + @Bean("modbusTaskExecutor") + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setKeepAliveSeconds(300); + executor.setThreadNamePrefix("ModbusPoller-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } +} diff --git a/ems-framework/src/main/java/com/xzzn/framework/config/ModbusConfig.java b/ems-framework/src/main/java/com/xzzn/framework/config/ModbusConfig.java new file mode 100644 index 0000000..51398b4 --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/config/ModbusConfig.java @@ -0,0 +1,38 @@ +package com.xzzn.framework.config; + +import com.xzzn.framework.manager.ModbusConnectionManager; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ModbusConfig { + + @Value("${modbus.pool.max-total:20}") + private int maxTotal; + + @Value("${modbus.pool.max-idle:10}") + private int maxIdle; + + @Value("${modbus.pool.min-idle:3}") + private int minIdle; + + @Value("${modbus.pool.max-wait:3000}") + private long maxWaitMillis; + + @Value("${modbus.pool.time-between-eviction-runs:30000}") + private long timeBetweenEvictionRunsMillis; + + @Value("${modbus.pool.min-evictable-idle-time:60000}") + private long minEvictableIdleTimeMillis; + + public ModbusConnectionManager modbusConnectionManager() { + ModbusConnectionManager manager = new ModbusConnectionManager(); + manager.setMaxTotal(maxTotal); + manager.setMaxIdle(maxIdle); + manager.setMinIdle(minIdle); + manager.setMaxWaitMillis(maxWaitMillis); + manager.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); + manager.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); + return manager; + } +} diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java new file mode 100644 index 0000000..386c919 --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionManager.java @@ -0,0 +1,189 @@ +package com.xzzn.framework.manager; + +import com.ghgande.j2mod.modbus.net.TCPMasterConnection; +import com.xzzn.ems.domain.EmsDevicesSetting; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.net.InetAddress; +import java.util.Map; +import java.util.concurrent.*; + +@Component +public class ModbusConnectionManager implements ApplicationRunner { + private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionManager.class); + + private final Map connectionPool = new ConcurrentHashMap<>(); + // 连接池配置参数 + private int maxTotal = 20; + private int maxIdle = 10; + private int minIdle = 3; + private long maxWaitMillis = 3000; + private long timeBetweenEvictionRunsMillis = 30000; + private long minEvictableIdleTimeMillis = 60000; + + + @Override + public void run(ApplicationArguments args) throws Exception { + init(); + } + + @PostConstruct + public void init() { + // 启动心跳检测线程 + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::heartbeatCheck, 1, 5, TimeUnit.MINUTES); + logger.info("Modbus连接管理器已初始化"); + } + + /** + * 获取连接(带自动创建和缓存) + */ + + public ModbusConnectionWrapper getConnection(EmsDevicesSetting device) throws Exception { + return connectionPool.compute(Math.toIntExact(device.getId()), (id, wrapper) -> { + try { + if (wrapper == null || !wrapper.isActive()) { + if (connectionPool.size() >= maxTotal) { + evictIdleConnection(); + } + logger.info("创建新连接: {}", device); + return new ModbusConnectionWrapper(createRawConnection(device)); + } + wrapper.updateLastAccess(); + return wrapper; + } catch (Exception e) { + throw new RuntimeException("连接创建失败: " + device.getId(), e); + } + }); + } + + /** + * 创建原始Modbus连接 + */ + private TCPMasterConnection createRawConnection(EmsDevicesSetting device) throws Exception { + try { + InetAddress addr = InetAddress.getByName("192.168.80.100"); + TCPMasterConnection connection = new TCPMasterConnection(addr); + connection.setPort(502); + connection.setTimeout(5000); + connection.connect(); + return connection; + } catch (Exception e) { + logger.error("创建Modbus连接失败: {}", device, e); + throw e; + } + } + + /** + * 心跳检测 + */ + private void heartbeatCheck() { + logger.debug("开始监控Modbus连接池状态"); + + connectionPool.forEach((id, wrapper) -> { + try { + if (!wrapper.isActive()) { + logger.warn("连接{}已失效,移除连接", id); + connectionPool.remove(id); + wrapper.close(); + } + } catch (Exception e) { + logger.error("心跳检测异常: {}", id, e); + } + }); + + // 维持最小空闲连接 + try { + while (connectionPool.size() < minIdle) { + preloadCriticalConnection(); + } + } catch (Exception e) { + logger.error("预加载连接失败", e); + } + } + + /** + * 预加载关键连接 + */ + + private void preloadCriticalConnection() { + // 这里应根据实际业务逻辑选择需要预加载的设备 + // 简化示例,不实现具体逻辑 + logger.debug("预加载连接: 连接池当前大小={}, 最小空闲={}", connectionPool.size(), minIdle); + } + + /** + * 移除最久未使用的空闲连接 + */ + + private void evictIdleConnection() { + ModbusConnectionWrapper oldestWrapper = null; + long oldestAccessTime = Long.MAX_VALUE; + + for (ModbusConnectionWrapper wrapper : connectionPool.values()) { + if (wrapper.isActive() && wrapper.getLastAccessTime() < oldestAccessTime) { + oldestAccessTime = wrapper.getLastAccessTime(); + oldestWrapper = wrapper; + } + } + + if (oldestWrapper != null) { + logger.info("移除空闲连接: {}", oldestWrapper.getConnection()); + connectionPool.values().remove(oldestWrapper); + oldestWrapper.close(); + } + } + + /** + * 判断是否应该移除空连接池 + */ + private boolean shouldRemoveEmptyPool(GenericObjectPool pool) { + // 可根据配置或逻辑决定是否移除空连接池 + // 这里简单实现为当连接池数量超过最大值时移除 + return connectionPool.size() > maxTotal; + } + + /** + * 关闭连接 + */ + private void closeConnection(TCPMasterConnection connection) { + try { + if (connection != null && connection.isConnected()) { + connection.close(); + } + } catch (Exception e) { + logger.error("关闭Modbus连接失败", e); + } + } + + // Getters and Setters + public void setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + } + + public void setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + } + + public void setMinIdle(int minIdle) { + this.minIdle = minIdle; + } + + public void setMaxWaitMillis(long maxWaitMillis) { + this.maxWaitMillis = maxWaitMillis; + } + + public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) { + this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; + } + + public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { + this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; + } +} diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionWrapper.java b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionWrapper.java new file mode 100644 index 0000000..8237f4f --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/manager/ModbusConnectionWrapper.java @@ -0,0 +1,81 @@ +package com.xzzn.framework.manager; + +import com.ghgande.j2mod.modbus.net.SerialConnection; +import com.ghgande.j2mod.modbus.net.TCPMasterConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +public class ModbusConnectionWrapper { + private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionWrapper.class); + + private static final AtomicInteger COUNTER = new AtomicInteger(0); + + private final Object connection; + private final int connectionId; + private volatile long lastAccessTime; + private volatile boolean active = true; + + public ModbusConnectionWrapper(Object connection) { + this.connection = connection; + this.connectionId = COUNTER.incrementAndGet(); + this.lastAccessTime = System.currentTimeMillis(); + logger.info("创建连接包装: {}", this); + } + + public boolean isActive() { + if (!active) return false; + + try { + // 检查连接是否物理上有效 + if (connection instanceof TCPMasterConnection) { + return ((TCPMasterConnection) connection).isConnected(); + } else if (connection instanceof SerialConnection) { + return ((SerialConnection) connection).isOpen(); + } + } catch (Exception e) { + logger.error("连接状态检查失败: {}", connectionId, e); + return false; + } + + // 默认检查空闲时间 + return System.currentTimeMillis() - lastAccessTime < 300000; // 5分钟 + } + + public void updateLastAccess() { + this.lastAccessTime = System.currentTimeMillis(); + } + + public Object getConnection() { + return connection; + } + + public void close() { + try { + logger.info("关闭连接: {}", this); + if (connection instanceof TCPMasterConnection) { + ((TCPMasterConnection) connection).close(); + } else if (connection instanceof SerialConnection) { + ((SerialConnection) connection).close(); + } + } catch (Exception e) { + logger.error("关闭连接失败: {}", connectionId, e); + } finally { + this.active = false; + } + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public String toString() { + return "ModbusConnectionWrapper{" + + "connectionId=" + connectionId + + ", active=" + active + + ", lastAccessTime=" + lastAccessTime + + '}'; + } +} diff --git a/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java b/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java new file mode 100644 index 0000000..94a244a --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java @@ -0,0 +1,126 @@ +package com.xzzn.framework.scheduler; + +import com.xzzn.ems.domain.EmsDevicesSetting; +import com.xzzn.ems.mapper.EmsDevicesSettingMapper; +import com.xzzn.ems.mapper.EmsMqttMessageMapper; +import com.xzzn.framework.manager.ModbusConnectionManager; +import com.xzzn.framework.manager.ModbusConnectionWrapper; +import com.xzzn.framework.manager.MqttLifecycleManager; +import com.xzzn.framework.web.service.ModbusService; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +@EnableScheduling +public class ModbusPoller { + private static final Logger logger = LoggerFactory.getLogger(ModbusPoller.class); + + private final MqttLifecycleManager mqttLifecycleManager; + + @Autowired + private ModbusConnectionManager connectionManager; + @Autowired + private ModbusService modbusService; + @Autowired + private EmsDevicesSettingMapper deviceRepo; + @Autowired + private EmsMqttMessageMapper emsMqttMessageMapper; + + @Autowired + public ModbusPoller(MqttLifecycleManager mqttLifecycleManager) { + this.mqttLifecycleManager = mqttLifecycleManager; + } + // 每5分钟触发(支持cron表达式动态配置) + @Scheduled(cron = "${modbus.poll.interval}") + @Async("modbusTaskExecutor") + public void pollAllDevices() { + logger.info("开始执行Modbus设备轮询..."); + + List activeDevices = deviceRepo.selectEmsDevicesSettingList(null); + + EmsDevicesSetting device = activeDevices.get(0); + try { + processData(device,null); + } catch (Exception e) { + logger.error("调度设备{}任务失败", device.getId(), e); + } + /* + try { + pollSingleDevice(device); + } catch (Exception e) { + logger.error("调度设备{}任务失败", device.getId(), e); + }*/ + /*activeDevices.forEach(device -> { + try { + CompletableFuture.runAsync(() -> pollSingleDevice(device)) + .exceptionally(e -> { + logger.error("设备{}轮询异常", device.getId(), e); + return null; + }); + } catch (Exception e) { + logger.error("调度设备{}任务失败", device.getId(), e); + } + });*/ + } + + private void pollSingleDevice(EmsDevicesSetting device) { + logger.debug("开始轮询设备: {}", device.getSiteId(), device.getDeviceName(), device.getId()); + + ModbusConnectionWrapper wrapper = null; + try { + // 获取连接 + wrapper = connectionManager.getConnection(device); + + // 读取保持寄存器 + int[] data = modbusService.readHoldingRegisters( + wrapper.getConnection(), + 1, //从站ID + 10 // 寄存器数量 + ); + + // 处理读取到的数据 + processData(device, data); + } catch (Exception e) { + logger.error("轮询设备{}失败: {}", device.getId(), e.getMessage()); + // 标记连接为无效 + if (wrapper != null) { + wrapper.close(); + } + // 重试机制会在这里触发 + throw new RuntimeException("轮询设备失败", e); + } + } + + // 处理获取到的数据,发到mqtt服务上 + private void processData(EmsDevicesSetting device, int[] data) throws MqttException { + /*if (data == null || data.length == 0) { + logger.warn("设备{}返回空数据", device.getId()); + return; + }*/ + + /*// 数据处理逻辑 + StringBuilder sb = new StringBuilder(); + sb.append("设备[").append(device.getDeviceName()).append("]数据: "); + for (int i = 0; i < data.length; i++) { + sb.append("R").append(i).append("=").append(data[i]).append(" "); + } + logger.info(sb.toString());*/ + + // 测试发送mqtt + /* EmsMqttMessage msg = emsMqttMessageMapper.selectEmsMqttMessageById(1L); + String dataJson = msg.getMqttMessage(); + String topic = msg.getMqttTopic(); + logger.info("topic:" + topic); + logger.info("dataJson:" + dataJson); + // 将设备数据下发到mqtt服务器上 + mqttLifecycleManager.publish(topic, dataJson, 0);*/ + } +} \ No newline at end of file diff --git a/ems-framework/src/main/java/com/xzzn/framework/web/service/ModbusService.java b/ems-framework/src/main/java/com/xzzn/framework/web/service/ModbusService.java new file mode 100644 index 0000000..5783d18 --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/web/service/ModbusService.java @@ -0,0 +1,101 @@ +package com.xzzn.framework.web.service; + +import com.ghgande.j2mod.modbus.ModbusException; +import com.ghgande.j2mod.modbus.ModbusIOException; +import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction; +import com.ghgande.j2mod.modbus.msg.ReadInputRegistersRequest; +import com.ghgande.j2mod.modbus.msg.ReadInputRegistersResponse; +import com.ghgande.j2mod.modbus.net.SerialConnection; +import com.ghgande.j2mod.modbus.net.TCPMasterConnection; +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; + +/** + * Modbus操作服务(添加重试机制) + */ +@Service +public class ModbusService { + private static final Logger logger = LoggerFactory.getLogger(ModbusService.class); + + @Retryable( + value = {ModbusException.class}, // 仅对自定义Modbus异常重试 + maxAttempts = 3, // 最大重试3次(1次原始调用 + 2次重试) + backoff = @Backoff(delay = 1000, multiplier = 2) // 退避策略:1s → 2s → 4s + ) + @CircuitBreaker(name = "modbusOperation", fallbackMethod = "readRegistersFallback") + public int[] readHoldingRegisters(Object connection, int startAddr, int count) throws ModbusException { + try { + if (connection instanceof TCPMasterConnection) { + return readTcpRegisters((TCPMasterConnection) connection, startAddr, count); + } else if (connection instanceof SerialConnection) { + return readRtuRegisters((SerialConnection) connection, startAddr, count); + } + throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName()); + } catch (ModbusIOException e) { + throw new ModbusException("通信故障", e); + } catch (Exception e) { + throw new ModbusException("系统错误", e); + } + } + + private int[] readRtuRegisters(SerialConnection connection, int startAddr, int count) { + return null; + } + + private int[] readTcpRegisters(TCPMasterConnection conn, int start, int count) throws ModbusException { + // 验证连接是否已建立 + if (!conn.isConnected()) { + throw new ModbusIOException("TCP连接未建立"); + } + // 使用正确的功能码(03 - 读取保持寄存器)ReadHoldingRegistersRequest + ReadInputRegistersRequest request = new ReadInputRegistersRequest(start, count); + ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn); + transaction.setRequest(request); + + // 设置超时避免长时间阻塞 + transaction.setRetries(2); + try { + transaction.execute(); + ReadInputRegistersResponse response = (ReadInputRegistersResponse) transaction.getResponse(); + + if (response == null) { + throw new ModbusException("Modbus异常响应: " + response.getMessage()); + } + + // 正确解析寄存器值 + return parseRegisters(response); + } catch (ModbusException e) { + // 记录详细错误信息 + logger.error("读取TCP寄存器失败: {}", e.getMessage()); + throw e; + } + } + + /** + * 解析Modbus响应中的寄存器值 + */ + private int[] parseRegisters(ReadInputRegistersResponse response) { + int byteCount = response.getByteCount(); + int[] result = new int[byteCount / 2]; + + for (int i = 0; i < result.length; i++) { + // 转换为无符号整数 + result[i] = response.getRegisterValue(i) & 0xFFFF; + } + + return result; + } + + /** + * 熔断降级方法 + */ + public int[] readRegistersFallback(Object connection, int startAddr, int count, Exception e) { + logger.warn("Modbus操作降级(原因: {}),返回空数据", e.getMessage()); + return new int[0]; + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index ee188eb..68a3378 100644 --- a/pom.xml +++ b/pom.xml @@ -165,6 +165,23 @@ spring-integration-mqtt + + + net.wimpi + j2mod + 3.1.0 + + + + io.github.resilience4j + resilience4j-circuitbreaker + 1.7.1 + + + org.springframework.retry + spring-retry + 1.3.4 + org.apache.velocity