dev #2

Merged
dashixiong merged 349 commits from dev into main 2026-02-11 01:55:46 +00:00
184 changed files with 16345 additions and 934 deletions
Showing only changes of commit 1da9867372 - Show all commits

View File

@ -136,3 +136,21 @@ mqtt:
connection-timeout: 15
keep-alive-interval: 30
automatic-reconnect: true
modbus:
pool:
max-total: 20
max-idle: 10
min-idle: 3
poll:
interval: "0 */5 * * * *" # 5分钟间隔
timeout: 30000 # 30秒超时
resilience4j:
circuitbreaker:
instances:
modbusOperation:
failure-rate-threshold: 50
minimum-number-of-calls: 5
sliding-window-size: 10
wait-duration-in-open-state: 10s

View File

@ -50,6 +50,20 @@
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<!-- 轮询 -->
<dependency>
<groupId>net.wimpi</groupId>
<artifactId>j2mod</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
</dependency>
<!-- 获取系统信息 -->
<dependency>
<groupId>com.github.oshi</groupId>
@ -63,5 +77,4 @@
</dependency>
</dependencies>
</project>

View File

@ -25,7 +25,7 @@ import com.xzzn.common.utils.ip.IpUtils;
* @author xzzn
*/
@Aspect
@Component
@Component("customRateLimiterAspect")
public class RateLimiterAspect
{
private static final Logger log = LoggerFactory.getLogger(RateLimiterAspect.class);

View File

@ -0,0 +1,27 @@
package com.xzzn.framework.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("modbusTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(300);
executor.setThreadNamePrefix("ModbusPoller-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,38 @@
package com.xzzn.framework.config;
import com.xzzn.framework.manager.ModbusConnectionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ModbusConfig {
@Value("${modbus.pool.max-total:20}")
private int maxTotal;
@Value("${modbus.pool.max-idle:10}")
private int maxIdle;
@Value("${modbus.pool.min-idle:3}")
private int minIdle;
@Value("${modbus.pool.max-wait:3000}")
private long maxWaitMillis;
@Value("${modbus.pool.time-between-eviction-runs:30000}")
private long timeBetweenEvictionRunsMillis;
@Value("${modbus.pool.min-evictable-idle-time:60000}")
private long minEvictableIdleTimeMillis;
public ModbusConnectionManager modbusConnectionManager() {
ModbusConnectionManager manager = new ModbusConnectionManager();
manager.setMaxTotal(maxTotal);
manager.setMaxIdle(maxIdle);
manager.setMinIdle(minIdle);
manager.setMaxWaitMillis(maxWaitMillis);
manager.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
manager.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
return manager;
}
}

View File

@ -0,0 +1,189 @@
package com.xzzn.framework.manager;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import com.xzzn.ems.domain.EmsDevicesSetting;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.*;
@Component
public class ModbusConnectionManager implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionManager.class);
private final Map<Integer, ModbusConnectionWrapper> connectionPool = new ConcurrentHashMap<>();
// 连接池配置参数
private int maxTotal = 20;
private int maxIdle = 10;
private int minIdle = 3;
private long maxWaitMillis = 3000;
private long timeBetweenEvictionRunsMillis = 30000;
private long minEvictableIdleTimeMillis = 60000;
@Override
public void run(ApplicationArguments args) throws Exception {
init();
}
@PostConstruct
public void init() {
// 启动心跳检测线程
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::heartbeatCheck, 1, 5, TimeUnit.MINUTES);
logger.info("Modbus连接管理器已初始化");
}
/**
* 获取连接(带自动创建和缓存)
*/
public ModbusConnectionWrapper getConnection(EmsDevicesSetting device) throws Exception {
return connectionPool.compute(Math.toIntExact(device.getId()), (id, wrapper) -> {
try {
if (wrapper == null || !wrapper.isActive()) {
if (connectionPool.size() >= maxTotal) {
evictIdleConnection();
}
logger.info("创建新连接: {}", device);
return new ModbusConnectionWrapper(createRawConnection(device));
}
wrapper.updateLastAccess();
return wrapper;
} catch (Exception e) {
throw new RuntimeException("连接创建失败: " + device.getId(), e);
}
});
}
/**
* 创建原始Modbus连接
*/
private TCPMasterConnection createRawConnection(EmsDevicesSetting device) throws Exception {
try {
InetAddress addr = InetAddress.getByName("192.168.80.100");
TCPMasterConnection connection = new TCPMasterConnection(addr);
connection.setPort(502);
connection.setTimeout(5000);
connection.connect();
return connection;
} catch (Exception e) {
logger.error("创建Modbus连接失败: {}", device, e);
throw e;
}
}
/**
* 心跳检测
*/
private void heartbeatCheck() {
logger.debug("开始监控Modbus连接池状态");
connectionPool.forEach((id, wrapper) -> {
try {
if (!wrapper.isActive()) {
logger.warn("连接{}已失效,移除连接", id);
connectionPool.remove(id);
wrapper.close();
}
} catch (Exception e) {
logger.error("心跳检测异常: {}", id, e);
}
});
// 维持最小空闲连接
try {
while (connectionPool.size() < minIdle) {
preloadCriticalConnection();
}
} catch (Exception e) {
logger.error("预加载连接失败", e);
}
}
/**
* 预加载关键连接
*/
private void preloadCriticalConnection() {
// 这里应根据实际业务逻辑选择需要预加载的设备
// 简化示例,不实现具体逻辑
logger.debug("预加载连接: 连接池当前大小={}, 最小空闲={}", connectionPool.size(), minIdle);
}
/**
* 移除最久未使用的空闲连接
*/
private void evictIdleConnection() {
ModbusConnectionWrapper oldestWrapper = null;
long oldestAccessTime = Long.MAX_VALUE;
for (ModbusConnectionWrapper wrapper : connectionPool.values()) {
if (wrapper.isActive() && wrapper.getLastAccessTime() < oldestAccessTime) {
oldestAccessTime = wrapper.getLastAccessTime();
oldestWrapper = wrapper;
}
}
if (oldestWrapper != null) {
logger.info("移除空闲连接: {}", oldestWrapper.getConnection());
connectionPool.values().remove(oldestWrapper);
oldestWrapper.close();
}
}
/**
* 判断是否应该移除空连接池
*/
private boolean shouldRemoveEmptyPool(GenericObjectPool<?> pool) {
// 可根据配置或逻辑决定是否移除空连接池
// 这里简单实现为当连接池数量超过最大值时移除
return connectionPool.size() > maxTotal;
}
/**
* 关闭连接
*/
private void closeConnection(TCPMasterConnection connection) {
try {
if (connection != null && connection.isConnected()) {
connection.close();
}
} catch (Exception e) {
logger.error("关闭Modbus连接失败", e);
}
}
// Getters and Setters
public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}
public void setMaxWaitMillis(long maxWaitMillis) {
this.maxWaitMillis = maxWaitMillis;
}
public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}
public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}
}

View File

@ -0,0 +1,81 @@
package com.xzzn.framework.manager;
import com.ghgande.j2mod.modbus.net.SerialConnection;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class ModbusConnectionWrapper {
private static final Logger logger = LoggerFactory.getLogger(ModbusConnectionWrapper.class);
private static final AtomicInteger COUNTER = new AtomicInteger(0);
private final Object connection;
private final int connectionId;
private volatile long lastAccessTime;
private volatile boolean active = true;
public ModbusConnectionWrapper(Object connection) {
this.connection = connection;
this.connectionId = COUNTER.incrementAndGet();
this.lastAccessTime = System.currentTimeMillis();
logger.info("创建连接包装: {}", this);
}
public boolean isActive() {
if (!active) return false;
try {
// 检查连接是否物理上有效
if (connection instanceof TCPMasterConnection) {
return ((TCPMasterConnection) connection).isConnected();
} else if (connection instanceof SerialConnection) {
return ((SerialConnection) connection).isOpen();
}
} catch (Exception e) {
logger.error("连接状态检查失败: {}", connectionId, e);
return false;
}
// 默认检查空闲时间
return System.currentTimeMillis() - lastAccessTime < 300000; // 5分钟
}
public void updateLastAccess() {
this.lastAccessTime = System.currentTimeMillis();
}
public Object getConnection() {
return connection;
}
public void close() {
try {
logger.info("关闭连接: {}", this);
if (connection instanceof TCPMasterConnection) {
((TCPMasterConnection) connection).close();
} else if (connection instanceof SerialConnection) {
((SerialConnection) connection).close();
}
} catch (Exception e) {
logger.error("关闭连接失败: {}", connectionId, e);
} finally {
this.active = false;
}
}
public long getLastAccessTime() {
return lastAccessTime;
}
@Override
public String toString() {
return "ModbusConnectionWrapper{" +
"connectionId=" + connectionId +
", active=" + active +
", lastAccessTime=" + lastAccessTime +
'}';
}
}

View File

@ -0,0 +1,126 @@
package com.xzzn.framework.scheduler;
import com.xzzn.ems.domain.EmsDevicesSetting;
import com.xzzn.ems.mapper.EmsDevicesSettingMapper;
import com.xzzn.ems.mapper.EmsMqttMessageMapper;
import com.xzzn.framework.manager.ModbusConnectionManager;
import com.xzzn.framework.manager.ModbusConnectionWrapper;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.ModbusService;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@EnableScheduling
public class ModbusPoller {
private static final Logger logger = LoggerFactory.getLogger(ModbusPoller.class);
private final MqttLifecycleManager mqttLifecycleManager;
@Autowired
private ModbusConnectionManager connectionManager;
@Autowired
private ModbusService modbusService;
@Autowired
private EmsDevicesSettingMapper deviceRepo;
@Autowired
private EmsMqttMessageMapper emsMqttMessageMapper;
@Autowired
public ModbusPoller(MqttLifecycleManager mqttLifecycleManager) {
this.mqttLifecycleManager = mqttLifecycleManager;
}
// 每5分钟触发支持cron表达式动态配置
@Scheduled(cron = "${modbus.poll.interval}")
@Async("modbusTaskExecutor")
public void pollAllDevices() {
logger.info("开始执行Modbus设备轮询...");
List<EmsDevicesSetting> activeDevices = deviceRepo.selectEmsDevicesSettingList(null);
EmsDevicesSetting device = activeDevices.get(0);
try {
processData(device,null);
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}
/*
try {
pollSingleDevice(device);
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}*/
/*activeDevices.forEach(device -> {
try {
CompletableFuture.runAsync(() -> pollSingleDevice(device))
.exceptionally(e -> {
logger.error("设备{}轮询异常", device.getId(), e);
return null;
});
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}
});*/
}
private void pollSingleDevice(EmsDevicesSetting device) {
logger.debug("开始轮询设备: {}", device.getSiteId(), device.getDeviceName(), device.getId());
ModbusConnectionWrapper wrapper = null;
try {
// 获取连接
wrapper = connectionManager.getConnection(device);
// 读取保持寄存器
int[] data = modbusService.readHoldingRegisters(
wrapper.getConnection(),
1, //从站ID
10 // 寄存器数量
);
// 处理读取到的数据
processData(device, data);
} catch (Exception e) {
logger.error("轮询设备{}失败: {}", device.getId(), e.getMessage());
// 标记连接为无效
if (wrapper != null) {
wrapper.close();
}
// 重试机制会在这里触发
throw new RuntimeException("轮询设备失败", e);
}
}
// 处理获取到的数据发到mqtt服务上
private void processData(EmsDevicesSetting device, int[] data) throws MqttException {
/*if (data == null || data.length == 0) {
logger.warn("设备{}返回空数据", device.getId());
return;
}*/
/*// 数据处理逻辑
StringBuilder sb = new StringBuilder();
sb.append("设备[").append(device.getDeviceName()).append("]数据: ");
for (int i = 0; i < data.length; i++) {
sb.append("R").append(i).append("=").append(data[i]).append(" ");
}
logger.info(sb.toString());*/
// 测试发送mqtt
/* EmsMqttMessage msg = emsMqttMessageMapper.selectEmsMqttMessageById(1L);
String dataJson = msg.getMqttMessage();
String topic = msg.getMqttTopic();
logger.info("topic" + topic);
logger.info("dataJson" + dataJson);
// 将设备数据下发到mqtt服务器上
mqttLifecycleManager.publish(topic, dataJson, 0);*/
}
}

View File

@ -0,0 +1,101 @@
package com.xzzn.framework.web.service;
import com.ghgande.j2mod.modbus.ModbusException;
import com.ghgande.j2mod.modbus.ModbusIOException;
import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction;
import com.ghgande.j2mod.modbus.msg.ReadInputRegistersRequest;
import com.ghgande.j2mod.modbus.msg.ReadInputRegistersResponse;
import com.ghgande.j2mod.modbus.net.SerialConnection;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
/**
* Modbus操作服务添加重试机制
*/
@Service
public class ModbusService {
private static final Logger logger = LoggerFactory.getLogger(ModbusService.class);
@Retryable(
value = {ModbusException.class}, // 仅对自定义Modbus异常重试
maxAttempts = 3, // 最大重试3次1次原始调用 + 2次重试
backoff = @Backoff(delay = 1000, multiplier = 2) // 退避策略1s → 2s → 4s
)
@CircuitBreaker(name = "modbusOperation", fallbackMethod = "readRegistersFallback")
public int[] readHoldingRegisters(Object connection, int startAddr, int count) throws ModbusException {
try {
if (connection instanceof TCPMasterConnection) {
return readTcpRegisters((TCPMasterConnection) connection, startAddr, count);
} else if (connection instanceof SerialConnection) {
return readRtuRegisters((SerialConnection) connection, startAddr, count);
}
throw new IllegalArgumentException("不支持的连接类型: " + connection.getClass().getName());
} catch (ModbusIOException e) {
throw new ModbusException("通信故障", e);
} catch (Exception e) {
throw new ModbusException("系统错误", e);
}
}
private int[] readRtuRegisters(SerialConnection connection, int startAddr, int count) {
return null;
}
private int[] readTcpRegisters(TCPMasterConnection conn, int start, int count) throws ModbusException {
// 验证连接是否已建立
if (!conn.isConnected()) {
throw new ModbusIOException("TCP连接未建立");
}
// 使用正确的功能码03 - 读取保持寄存器ReadHoldingRegistersRequest
ReadInputRegistersRequest request = new ReadInputRegistersRequest(start, count);
ModbusTCPTransaction transaction = new ModbusTCPTransaction(conn);
transaction.setRequest(request);
// 设置超时避免长时间阻塞
transaction.setRetries(2);
try {
transaction.execute();
ReadInputRegistersResponse response = (ReadInputRegistersResponse) transaction.getResponse();
if (response == null) {
throw new ModbusException("Modbus异常响应: " + response.getMessage());
}
// 正确解析寄存器值
return parseRegisters(response);
} catch (ModbusException e) {
// 记录详细错误信息
logger.error("读取TCP寄存器失败: {}", e.getMessage());
throw e;
}
}
/**
* 解析Modbus响应中的寄存器值
*/
private int[] parseRegisters(ReadInputRegistersResponse response) {
int byteCount = response.getByteCount();
int[] result = new int[byteCount / 2];
for (int i = 0; i < result.length; i++) {
// 转换为无符号整数
result[i] = response.getRegisterValue(i) & 0xFFFF;
}
return result;
}
/**
* 熔断降级方法
*/
public int[] readRegistersFallback(Object connection, int startAddr, int count, Exception e) {
logger.warn("Modbus操作降级原因: {}),返回空数据", e.getMessage());
return new int[0];
}
}

17
pom.xml
View File

@ -165,6 +165,23 @@
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- modbus -->
<dependency>
<groupId>net.wimpi</groupId>
<artifactId>j2mod</artifactId>
<version>3.1.0</version> <!-- 提供TCPMasterConnection、SerialConnection等类 -->
</dependency>
<!-- Resilience4j 断路器核心 -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.3.4</version> <!-- 根据你的 Spring 版本选择兼容的版本 -->
</dependency>
<!-- velocity代码生成使用模板 -->
<dependency>
<groupId>org.apache.velocity</groupId>