modbus读取本地设备数据增加告警逻辑
This commit is contained in:
@ -8,12 +8,18 @@ import com.xzzn.common.constant.RedisKeyConstants;
|
||||
import com.xzzn.common.core.modbus.ModbusProcessor;
|
||||
import com.xzzn.common.core.modbus.domain.DeviceConfig;
|
||||
import com.xzzn.common.core.redis.RedisCache;
|
||||
import com.xzzn.common.enums.DeviceRunningStatus;
|
||||
import com.xzzn.common.utils.DateUtils;
|
||||
import com.xzzn.ems.domain.EmsDevicesSetting;
|
||||
import com.xzzn.ems.mapper.EmsDevicesSettingMapper;
|
||||
import com.xzzn.ems.service.IEmsAlarmRecordsService;
|
||||
import com.xzzn.ems.service.impl.DeviceDataProcessServiceImpl;
|
||||
import com.xzzn.framework.manager.MqttLifecycleManager;
|
||||
import com.xzzn.framework.web.service.MqttPublisher;
|
||||
import com.xzzn.quartz.config.ScheduledTask;
|
||||
import com.xzzn.quartz.domain.SysJob;
|
||||
import com.xzzn.quartz.service.ISysJobService;
|
||||
import com.xzzn.quartz.util.CronUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@ -24,6 +30,8 @@ import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -44,14 +52,19 @@ public class ModbusPoller {
|
||||
private final MqttLifecycleManager mqttLifecycleManager;
|
||||
private final ScheduledTask scheduledTask;
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private final Map<String, Integer> deviceFailureCounts = new ConcurrentHashMap<>();
|
||||
|
||||
@Autowired
|
||||
private ModbusProcessor modbusProcessor;
|
||||
@Autowired
|
||||
private IEmsAlarmRecordsService iEmsAlarmRecordsService;
|
||||
@Autowired
|
||||
private ISysJobService iSysJobService;
|
||||
@Autowired
|
||||
private DeviceDataProcessServiceImpl deviceDataProcessServiceImpl;
|
||||
@Autowired
|
||||
private EmsDevicesSettingMapper emsDevicesSettingMapper;
|
||||
@Autowired
|
||||
private RedisCache redisCache;
|
||||
@Autowired
|
||||
private MqttPublisher mqttPublisher;
|
||||
@ -101,16 +114,17 @@ public class ModbusPoller {
|
||||
}
|
||||
}
|
||||
|
||||
int interval = getScheduledTaskInterval();
|
||||
// 为每个 host:port 启动一个任务
|
||||
for (Map.Entry<String, List<DeviceConfig>> entry : groupedConfigs.entrySet()) {
|
||||
String groupKey = entry.getKey();
|
||||
List<DeviceConfig> configs = entry.getValue();
|
||||
// 取其中一个配置的时间间隔作为该组任务的执行周期
|
||||
long interval = configs.get(0).getTime();
|
||||
// long interval = configs.get(0).getTime();
|
||||
scheduledTask.startTask(groupKey, () -> {
|
||||
for (DeviceConfig config : configs) {
|
||||
try {
|
||||
scheduledStart(config, null);
|
||||
scheduledStart(config);
|
||||
} catch (Exception e) {
|
||||
log.error("采集设备数据异常: {}", config.getDeviceName(), e);
|
||||
}
|
||||
@ -119,52 +133,72 @@ public class ModbusPoller {
|
||||
}
|
||||
}
|
||||
|
||||
public void scheduledStart(DeviceConfig config, ModbusMaster master) {
|
||||
try {
|
||||
if (config.isEnabled()) {
|
||||
log.info("Reading data from devices: {}", config.getDeviceName());
|
||||
Map<String, Object> data = modbusProcessor.readDataFromDevice(config, master);
|
||||
// 在这里处理采集到的数据
|
||||
config.getTags().forEach(tag -> {
|
||||
Object rawValue = data.get(tag.getKey());
|
||||
if (rawValue != null) {
|
||||
float value = 0;
|
||||
if (rawValue instanceof Number) {
|
||||
value = ((Number) rawValue).floatValue(); // 安全地转换为 float
|
||||
} else {
|
||||
log.error("tag:{},无法将数据转换为数字: {}", tag.getKey(), rawValue);
|
||||
}
|
||||
value = tag.getA() * value * value + tag.getK() * value + tag.getB();
|
||||
|
||||
int intValue = (int) value;
|
||||
if (tag.getBit() != null) {
|
||||
log.info("tag:{},bit:{},value:{}", tag.getKey(), tag.getBit(), value);
|
||||
String binary = Integer.toBinaryString(intValue);
|
||||
data.put(tag.getKey(), binary);
|
||||
} else {
|
||||
data.put(tag.getKey(), value);
|
||||
}
|
||||
} else {
|
||||
data.put(tag.getKey(), rawValue);
|
||||
log.warn("tag:{},数据为空: {}", tag.getKey(), rawValue);
|
||||
}
|
||||
});
|
||||
log.info("Data from {}: {}", config.getDeviceName(), data);
|
||||
String deviceNumber = config.getDeviceNumber();
|
||||
//处理数据并发送MQTT消息、保存Redis数据和数据入库
|
||||
processingData(data, deviceNumber);
|
||||
public void scheduledStart(DeviceConfig config) {
|
||||
if (config.isEnabled()) {
|
||||
log.info("Reading data from devices: {}", config.getDeviceName());
|
||||
ModbusMaster master = null;
|
||||
try {
|
||||
master = modbusProcessor.borrowMaster(config);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to borrow connection '{}'", config.getDeviceName(), e);
|
||||
// 处理设备连接失败的情况,更新设备状态为离线,添加报警记录
|
||||
addDeviceOfflineRecord(siteId, config.getDeviceNumber());
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("设备数据采集异常: {}", config.getDeviceName(), e);
|
||||
Map<String, Object> data = modbusProcessor.readDataFromDevice(config, master);
|
||||
// 在这里处理采集到的数据
|
||||
config.getTags().forEach(tag -> {
|
||||
Object rawValue = data.get(tag.getKey());
|
||||
if (rawValue != null) {
|
||||
float value = 0;
|
||||
if (rawValue instanceof Number) {
|
||||
value = ((Number) rawValue).floatValue(); // 安全地转换为 float
|
||||
} else {
|
||||
log.error("tag:{},无法将数据转换为数字: {}", tag.getKey(), rawValue);
|
||||
}
|
||||
value = tag.getA() * value * value + tag.getK() * value + tag.getB();
|
||||
|
||||
int intValue = (int) value;
|
||||
if (tag.getBit() != null) {
|
||||
log.info("tag:{},bit:{},value:{}", tag.getKey(), tag.getBit(), value);
|
||||
String binary = Integer.toBinaryString(intValue);
|
||||
data.put(tag.getKey(), binary);
|
||||
} else {
|
||||
data.put(tag.getKey(), value);
|
||||
}
|
||||
} else {
|
||||
data.put(tag.getKey(), rawValue);
|
||||
log.warn("tag:{},数据为空: {}", tag.getKey(), rawValue);
|
||||
}
|
||||
});
|
||||
log.info("Data from {}: {}", config.getDeviceName(), data);
|
||||
String deviceNumber = config.getDeviceNumber();
|
||||
//处理数据并发送MQTT消息、保存Redis数据和数据入库
|
||||
processingData(data, deviceNumber);
|
||||
}
|
||||
}
|
||||
|
||||
private void processingData(Map<String, Object> data, String deviceNumber) {
|
||||
if (data == null || data.size() == 0) {
|
||||
// 添加设备告警
|
||||
iEmsAlarmRecordsService.addEmptyDataAlarmRecord(siteId, deviceNumber);
|
||||
// 增加失败计数
|
||||
int failureCount = deviceFailureCounts.getOrDefault(deviceNumber, 0) + 1;
|
||||
deviceFailureCounts.put(deviceNumber, failureCount);
|
||||
|
||||
log.warn("设备 {} 数据读取失败,当前连续失败次数: {}", deviceNumber, failureCount);
|
||||
|
||||
// 连续6次失败触发报警
|
||||
if (failureCount >= 6) {
|
||||
addDeviceOfflineRecord(siteId, deviceNumber);
|
||||
log.error("设备 {} 连续 {} 次未读取到数据,触发报警", deviceNumber, failureCount);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 数据读取成功,重置计数器
|
||||
deviceFailureCounts.remove(deviceNumber);
|
||||
updateDeviceStatus(siteId, deviceNumber, DeviceRunningStatus.RUNNING.getCode());
|
||||
|
||||
// 发送MQTT消息、保存Redis数据和数据入库
|
||||
Long timestamp = System.currentTimeMillis();
|
||||
JSONObject json = new JSONObject();
|
||||
json.put("Data", data);
|
||||
@ -201,4 +235,26 @@ public class ModbusPoller {
|
||||
deviceDataProcessServiceImpl.processingDeviceData(siteId, deviceNumber, JSON.toJSONString(data), DateUtils.convertUpdateTime(timestamp));
|
||||
}
|
||||
|
||||
//处理设备连接失败的情况,更新设备状态为离线,添加报警记录
|
||||
private void addDeviceOfflineRecord(String siteId, String deviceNumber) {
|
||||
updateDeviceStatus(siteId, deviceNumber, DeviceRunningStatus.OFFLINE.getCode());
|
||||
iEmsAlarmRecordsService.addDeviceOfflineRecord(siteId, deviceNumber);
|
||||
}
|
||||
|
||||
// 更新设备状态为在线或离线
|
||||
private void updateDeviceStatus(String siteId, String deviceNumber, String deviceStatus) {
|
||||
EmsDevicesSetting emsDevicesSetting = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceNumber, siteId);
|
||||
if (emsDevicesSetting != null && !Objects.equals(emsDevicesSetting.getDeviceStatus(), deviceStatus)) {
|
||||
emsDevicesSetting.setDeviceStatus(deviceStatus);
|
||||
emsDevicesSettingMapper.updateEmsDevicesSetting(emsDevicesSetting);
|
||||
}
|
||||
}
|
||||
|
||||
private int getScheduledTaskInterval() {
|
||||
SysJob query = new SysJob();
|
||||
query.setInvokeTarget("modbusPoller.pollAllDevices");
|
||||
List<SysJob> sysJobs = iSysJobService.selectJobList(query);
|
||||
return Math.toIntExact(CronUtils.getNextExecutionIntervalMillis(sysJobs.get(0).getCronExpression()));
|
||||
}
|
||||
|
||||
}
|
||||
@ -60,4 +60,25 @@ public class CronUtils
|
||||
throw new IllegalArgumentException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回当前时间到下一次执行时间间隔的毫秒
|
||||
*/
|
||||
public static long getNextExecutionIntervalMillis(String cronExpression)
|
||||
{
|
||||
try
|
||||
{
|
||||
CronExpression cron = new CronExpression(cronExpression);
|
||||
Date now = new Date();
|
||||
Date nextExecution = cron.getNextValidTimeAfter(now);
|
||||
Date nextExecution2 = cron.getNextValidTimeAfter(nextExecution);
|
||||
|
||||
return nextExecution2.getTime() - nextExecution.getTime();
|
||||
}
|
||||
catch (ParseException e)
|
||||
{
|
||||
throw new IllegalArgumentException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user