集成modbus连接本地设备读取数据代码-配置文件方式读取;

PCS开关机功能通过modbus连接设备发送控制命令;
This commit is contained in:
zq
2025-12-19 11:38:04 +08:00
parent 16d414f092
commit ff487f1b05
26 changed files with 1286 additions and 216 deletions

View File

@ -0,0 +1,34 @@
package com.xzzn.quartz.config;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;
@Component
public class ScheduledTask {
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
public void startTask(String deviceId, Runnable task, long period) {
stopTask(deviceId); // 如果已有同ID任务在运行先停止
ScheduledFuture<?> future = executor.scheduleAtFixedRate(task, 0, period, TimeUnit.MILLISECONDS);
futureMap.put(deviceId, future);
}
public void stopTask(String deviceId) {
ScheduledFuture<?> future = futureMap.get(deviceId);
if (future != null && !future.isDone()) {
future.cancel(true);
}
futureMap.remove(deviceId);
}
}

View File

@ -1,179 +1,204 @@
package com.xzzn.quartz.task;
import com.xzzn.common.enums.DeviceRunningStatus;
import com.xzzn.ems.domain.EmsDeviceChangeLog;
import com.xzzn.ems.domain.EmsDevicesSetting;
import com.xzzn.ems.mapper.EmsDeviceChangeLogMapper;
import com.xzzn.ems.mapper.EmsDevicesSettingMapper;
import com.xzzn.ems.mapper.EmsMqttMessageMapper;
import com.xzzn.ems.service.impl.EmsDeviceSettingServiceImpl;
import com.xzzn.framework.manager.ModbusConnectionManager;
import com.xzzn.framework.manager.ModbusConnectionWrapper;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.serotonin.modbus4j.ModbusMaster;
import com.xzzn.common.constant.RedisKeyConstants;
import com.xzzn.common.core.modbus.ModbusProcessor;
import com.xzzn.common.core.modbus.domain.DeviceConfig;
import com.xzzn.common.core.redis.RedisCache;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.ems.service.IEmsAlarmRecordsService;
import com.xzzn.ems.service.impl.DeviceDataProcessServiceImpl;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.ModbusService;
import com.xzzn.framework.web.service.MqttPublisher;
import com.xzzn.quartz.config.ScheduledTask;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.UUID;
/**
* 轮询设备-通过modbus协议读取数据
*/
@Component("modbusPoller")
public class ModbusPoller {
private static final Logger logger = LoggerFactory.getLogger(ModbusPoller.class);
private static final Logger log = LoggerFactory.getLogger(ModbusPoller.class);
private final MqttLifecycleManager mqttLifecycleManager;
private final ScheduledTask scheduledTask;
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private ModbusConnectionManager connectionManager;
private ModbusProcessor modbusProcessor;
@Autowired
private ModbusService modbusService;
private IEmsAlarmRecordsService iEmsAlarmRecordsService;
@Autowired
private EmsDevicesSettingMapper deviceRepo;
private DeviceDataProcessServiceImpl deviceDataProcessServiceImpl;
@Autowired
private EmsMqttMessageMapper emsMqttMessageMapper;
private RedisCache redisCache;
@Autowired
private EmsDeviceSettingServiceImpl emsDeviceSettingServiceImpl;
@Autowired
private EmsDeviceChangeLogMapper emsDeviceChangeLogMapper;
private MqttPublisher mqttPublisher;
@Value("${mqtt.topic}")
private String topic;
@Value("${mqtt.siteId}")
private String siteId;
@Autowired
public ModbusPoller(MqttLifecycleManager mqttLifecycleManager) {
public ModbusPoller(MqttLifecycleManager mqttLifecycleManager, ScheduledTask scheduledTask) {
this.mqttLifecycleManager = mqttLifecycleManager;
this.scheduledTask = scheduledTask;
}
public void pollAllDevices() {
logger.info("开始执行Modbus设备轮询...");
EmsDevicesSetting selectEntity = new EmsDevicesSetting();
selectEntity.setDeviceStatus(DeviceRunningStatus.RUNNING.getCode());
List<EmsDevicesSetting> activeDevices = deviceRepo.selectEmsDevicesSettingList(selectEntity);
EmsDevicesSetting device = activeDevices.get(0);
try {
//pollSingleDevice(device);
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}
/*activeDevices.forEach(device -> {
try {
CompletableFuture.runAsync(() -> pollSingleDevice(device))
.exceptionally(e -> {
logger.error("设备{}轮询异常", device.getId(), e);
return null;
});
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}
});*/
}
private void pollSingleDevice(EmsDevicesSetting device) {
logger.debug("开始轮询设备: {}", device.getSiteId(), device.getDeviceName(), device.getId());
ModbusConnectionWrapper wrapper = null;
try {
// 获取连接
wrapper = connectionManager.getConnection(device);
if(wrapper == null || !wrapper.isActive()){
logger.error("轮询设备{}连接失败: {}", device.getId());
return;
}
// 读取保持寄存器
int[] data = modbusService.readHoldingRegisters(
wrapper.getConnection(),
1, //从站ID
10 // 寄存器数量
);
// 处理读取到的数据
processData(device, data);
} catch (Exception e) {
logger.error("轮询设备{}失败: {}", device.getId(), e.getMessage());
// 标记连接为无效
if (wrapper != null) {
wrapper.close();
connectionManager.removeConnection(Integer.parseInt(device.getDeviceId()));
}
// 设备轮询不到修改运行状态
String beforeStatus = device.getDeviceStatus();
device.setDeviceStatus(DeviceRunningStatus.SHUTDOWN.getCode());
emsDeviceSettingServiceImpl.updateDevice(device);
// 轮询设备,设备状态变更日志
EmsDeviceChangeLog log = createLogEntity(beforeStatus,device);
emsDeviceChangeLogMapper.insertEmsDeviceChangeLog(log);
throw new RuntimeException("轮询设备失败", e);
}
}
// 处理获取到的数据
private void processData(EmsDevicesSetting device, int[] data) throws MqttException {
String beforeStatus = device.getDeviceStatus();
Boolean error = true;
if (data == null || data.length == 0) {
logger.warn("设备{}返回空数据", device.getId());
// 设备读取不到-设置设备故障
device.setDeviceStatus(DeviceRunningStatus.FAULT.getCode());
error = false;
} else {
// 恢复设备状态 - 运行
device.setDeviceStatus(DeviceRunningStatus.RUNNING.getCode());
}
emsDeviceSettingServiceImpl.updateDevice(device);
// 轮询设备,设备状态变更日志
EmsDeviceChangeLog log = createLogEntity(beforeStatus,device);
emsDeviceChangeLogMapper.insertEmsDeviceChangeLog(log);
// 错误数据-不处理直接返回
if (!error) {
Path devicesDir = Paths.get(System.getProperty("user.dir"), "devices");
if (!Files.exists(devicesDir)) {
log.error("Devices目录不存在: {}", devicesDir);
return;
}
// 数据处理逻辑
StringBuilder sb = new StringBuilder();
sb.append("设备[").append(device.getDeviceName()).append("]数据: ");
for (int i = 0; i < data.length; i++) {
sb.append("R").append(i).append("=").append(data[i]).append(" ");
List<Path> jsonFiles = null;
try {
jsonFiles = Files.list(devicesDir)
.filter(path -> path.toString().endsWith(".json"))
.collect(Collectors.toList());
} catch (IOException e) {
log.error("modbusPoller.loadConfigs 获取设备配置文件失败: {}", devicesDir, e);
return;
}
String message = sb.toString();
logger.info(sb.toString());
/*
String siteId = device.getSiteId();
if (siteId.startsWith("021_DDS")) {
dDSDataProcessService.handleDdsData(message);
} else if (siteId.startsWith("021_FXX")) {
fXXDataProcessService.handleFxData(message);
}*/
// 按 host:port 分组
Map<String, List<DeviceConfig>> groupedConfigs = new HashMap<>();
for (Path filePath : jsonFiles) {
DeviceConfig config = null;
try {
config = objectMapper.readValue(filePath.toFile(), DeviceConfig.class);
} catch (IOException e) {
log.error("modbusPoller.loadConfigs 解析设备配置文件失败: {}", filePath, e);
continue;
}
if (config.isEnabled()) {
String key = config.getHost() + ":" + config.getPort();
groupedConfigs.computeIfAbsent(key, k -> new ArrayList<>()).add(config);
}
}
// 测试发送mqtt
/* EmsMqttMessage msg = emsMqttMessageMapper.selectEmsMqttMessageById(1L);
String dataJson = msg.getMqttMessage();
String topic = msg.getMqttTopic();
logger.info("topic" + topic);
logger.info("dataJson" + dataJson);
// 将设备数据下发到mqtt服务器上
mqttLifecycleManager.publish(topic, dataJson, 0);*/
// 为每个 host:port 启动一个任务
for (Map.Entry<String, List<DeviceConfig>> entry : groupedConfigs.entrySet()) {
String groupKey = entry.getKey();
List<DeviceConfig> configs = entry.getValue();
// 取其中一个配置的时间间隔作为该组任务的执行周期
long interval = configs.get(0).getTime();
scheduledTask.startTask(groupKey, () -> {
for (DeviceConfig config : configs) {
try {
scheduledStart(config, null);
} catch (Exception e) {
log.error("采集设备数据异常: {}", config.getDeviceName(), e);
}
}
}, interval);
}
}
private EmsDeviceChangeLog createLogEntity(String beforeStatus, EmsDevicesSetting device) {
EmsDeviceChangeLog log = new EmsDeviceChangeLog();
log.setLogId(UUID.randomUUID().toString());
log.setLogTime(new Date());
log.setSiteId(device.getSiteId());
log.setDeviceId(device.getDeviceId());
log.setBeforeStatus(beforeStatus);
log.setAfterStatus(device.getDeviceStatus());
log.setCreateBy("sys");
log.setCreateTime(new Date());
return log;
public void scheduledStart(DeviceConfig config, ModbusMaster master) {
try {
if (config.isEnabled()) {
log.info("Reading data from devices: {}", config.getDeviceName());
Map<String, Object> data = modbusProcessor.readDataFromDevice(config, master);
// 在这里处理采集到的数据
config.getTags().forEach(tag -> {
Object rawValue = data.get(tag.getKey());
if (rawValue != null) {
float value = 0;
if (rawValue instanceof Number) {
value = ((Number) rawValue).floatValue(); // 安全地转换为 float
} else {
log.error("tag:{},无法将数据转换为数字: {}", tag.getKey(), rawValue);
}
value = tag.getA() * value * value + tag.getK() * value + tag.getB();
int intValue = (int) value;
if (tag.getBit() != null) {
log.info("tag:{},bit:{},value:{}", tag.getKey(), tag.getBit(), value);
String binary = Integer.toBinaryString(intValue);
data.put(tag.getKey(), binary);
} else {
data.put(tag.getKey(), value);
}
} else {
data.put(tag.getKey(), rawValue);
log.warn("tag:{},数据为空: {}", tag.getKey(), rawValue);
}
});
log.info("Data from {}: {}", config.getDeviceName(), data);
String deviceNumber = config.getDeviceNumber();
//处理数据并发送MQTT消息、保存Redis数据和数据入库
processingData(data, deviceNumber);
}
} catch (Exception e) {
log.error("设备数据采集异常: {}", config.getDeviceName(), e);
}
}
private void processingData(Map<String, Object> data, String deviceNumber) {
if (data == null || data.size() == 0) {
// 添加设备告警
iEmsAlarmRecordsService.addEmptyDataAlarmRecord(siteId, deviceNumber);
return;
}
Long timestamp = System.currentTimeMillis();
JSONObject json = new JSONObject();
json.put("Data", data);
json.put("timestamp", timestamp);
json.put("Device", deviceNumber);
sendMqttMsg(json);
saveRedisData(json, deviceNumber);
saveDataToDatabase(data, deviceNumber, timestamp);
}
public void sendMqttMsg(JSONObject json) {
try {
mqttPublisher.publish(topic, Collections.singletonList(json).toString(), 0);
} catch (MqttException e) {
log.error("MQTT消息发布失败: {}", json.toJSONString(), e);
}
log.info("已发送数据: {}", json.toJSONString());
}
public void saveRedisData(JSONObject obj, String deviceNumber) {
try {
// 存放mqtt原始每个设备最晚一次数据便于后面点位获取数据
redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA + siteId + "_" + deviceNumber, obj);
// 存放每次同步数据,失效时间(同同步时间)-用于判断是否正常同步数据和保护策略查询
redisCache.setCacheObject(RedisKeyConstants.SYNC_DATA_ALARM + siteId + "_" + deviceNumber, obj, 1, TimeUnit.MINUTES);
log.info("数据已成功存储在Redis: {}", deviceNumber);
} catch (Exception e) {
log.error("无法在设备的Redis中存储数据: {}", deviceNumber, e);
}
}
private void saveDataToDatabase(Map<String, Object> data, String deviceNumber, Long timestamp) {
deviceDataProcessServiceImpl.processingDeviceData(siteId, deviceNumber, JSON.toJSONString(data), DateUtils.convertUpdateTime(timestamp));
}
}