平台修改意见20251120-设备点位匹配解析;上传点位清单修改;
This commit is contained in:
@ -118,6 +118,17 @@ public class EmsStatisticalReportController extends BaseController
|
||||
return getDataTable(dataList);
|
||||
}
|
||||
|
||||
/**
|
||||
* 概率统计-电表收益报表
|
||||
*/
|
||||
@GetMapping("/getAmmeterRevenueData")
|
||||
public TableDataInfo getAmmeterRevenueData(StatisAmmeterDateRequest requestVo)
|
||||
{
|
||||
startPage();
|
||||
List<AmmeterRevenueStatisListVo> dataList = ieEmsStatsReportService.getAmmeterRevenueDataResult(requestVo);
|
||||
return getDataTable(dataList);
|
||||
}
|
||||
|
||||
/**
|
||||
* 概率统计-功率曲线
|
||||
*/
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
package com.xzzn.web.controller.ems;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.xzzn.common.constant.RedisKeyConstants;
|
||||
import com.xzzn.common.core.redis.RedisCache;
|
||||
import com.xzzn.common.enums.TopicHandleType;
|
||||
import com.xzzn.common.utils.StringUtils;
|
||||
import com.xzzn.ems.domain.EmsMqttTopicConfig;
|
||||
import com.xzzn.ems.domain.MqttSyncLog;
|
||||
@ -39,6 +42,9 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
||||
@Autowired
|
||||
private IDDSDataProcessService dDSDataProcessService;
|
||||
|
||||
@Autowired
|
||||
private IDeviceDataProcessService deviceDataProcessService;
|
||||
|
||||
@Autowired
|
||||
private IFXXAlarmDataProcessService fXXAlarmDataProcessService;
|
||||
@Autowired
|
||||
@ -48,6 +54,9 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
||||
@Autowired
|
||||
private IMqttSyncLogService iMqttSyncLogService;
|
||||
|
||||
@Autowired
|
||||
private RedisCache redisCache;
|
||||
|
||||
@Autowired
|
||||
public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) {
|
||||
this.mqttLifecycleManager = mqttLifecycleManager;
|
||||
@ -75,7 +84,7 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
||||
qos = 1;
|
||||
}
|
||||
|
||||
IMqttMessageListener listener = getMqttListenerByTopic(topic, topicConfig.getId());
|
||||
IMqttMessageListener listener = getMqttListenerByTopic(topic, topicConfig.getHandleType());
|
||||
subscribe(topic, qos, listener);
|
||||
}
|
||||
// 订阅奉贤系统状态主题
|
||||
@ -93,23 +102,28 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
||||
|
||||
}
|
||||
|
||||
private IMqttMessageListener getMqttListenerByTopic(String topic, Long id) {
|
||||
if (topic.contains("ALARM_UP")) {
|
||||
return this::handleAlarmData;
|
||||
} else if (topic.endsWith("_01")) {
|
||||
return this::handleSystemStatus;
|
||||
} else if (topic.contains("STRATEGY")) {
|
||||
return this::handleStrategyData;
|
||||
} else if (topic.equals("FAULT_PROTECTION_PLAN_UP")) {
|
||||
return this::handleFaultProtPlanData;
|
||||
} else if (topic.equals("FAULT_ALARM_RECORD_UP")) {
|
||||
return this::handleFaultAlarmData;
|
||||
} else if (topic.equals("FAULT_PLAN_ISSUE_UP")) {
|
||||
return this::handleFaultPlanIssueData;
|
||||
} else if (topic.equals("DEVICE_CHANGE_LOG_UP")) {
|
||||
return this::handleDeviceChangeLogData;
|
||||
} else {
|
||||
return this::handleDeviceData;
|
||||
private IMqttMessageListener getMqttListenerByTopic(String topic, String handleType) {
|
||||
TopicHandleType topicHandleType = TopicHandleType.getEnumByCode(handleType);
|
||||
switch (topicHandleType) {
|
||||
case DEVICE:
|
||||
return this::handleDeviceData;
|
||||
case DEVICE_ALARM:
|
||||
return this::handleAlarmData;
|
||||
case STRATEGY:
|
||||
if (topic.equals("FAULT_PROTECTION_PLAN_UP")) {
|
||||
return this::handleFaultProtPlanData;
|
||||
} else if (topic.equals("FAULT_ALARM_RECORD_UP")) {
|
||||
return this::handleFaultAlarmData;
|
||||
} else if (topic.equals("FAULT_PLAN_ISSUE_UP")) {
|
||||
return this::handleFaultPlanIssueData;
|
||||
} else if (topic.equals("DEVICE_CHANGE_LOG_UP")) {
|
||||
return this::handleDeviceChangeLogData;
|
||||
} else {
|
||||
return this::handleStrategyData;
|
||||
}
|
||||
default:
|
||||
log.warn("Unknown handle type: " + handleType + ", using default handler");
|
||||
return this::handleSystemStatus;
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,18 +143,19 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
||||
// 处理设备数据
|
||||
private void handleDeviceData(String topic, MqttMessage message) {
|
||||
String payload = new String(message.getPayload());
|
||||
System.out.println("[DEVICE] data: " + payload);
|
||||
log.info("[DEVICE] data: " + payload);
|
||||
try {
|
||||
// 业务处理逻辑
|
||||
if (topic.startsWith("021_DDS")) {
|
||||
dDSDataProcessService.handleDdsData(payload);
|
||||
} else if (topic.startsWith("021_FXX")) {
|
||||
fXXDataProcessService.handleFxData(payload);
|
||||
}
|
||||
// if (topic.startsWith("021_DDS")) {
|
||||
// dDSDataProcessService.handleDdsData(payload);
|
||||
// } else if (topic.startsWith("021_FXX")) {
|
||||
// fXXDataProcessService.handleFxData(payload);
|
||||
// }
|
||||
deviceDataProcessService.handleDeviceData(payload, getSiteIdByTopic(topic));
|
||||
|
||||
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
|
||||
emsMqttMessageService.insertMqttOriginalMessage(topic, payload);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to process system status message: " + e.getMessage(), e);
|
||||
log.error("Failed to process device data message: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
}
|
||||
@ -151,17 +166,29 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
|
||||
System.out.println("[DEVICE] data: " + payload);
|
||||
try {
|
||||
// 业务处理逻辑
|
||||
if (topic.startsWith("021_FXX")) {
|
||||
fXXAlarmDataProcessService.handleFxAlarmData(payload);
|
||||
}
|
||||
// if (topic.startsWith("021_FXX")) {
|
||||
// fXXAlarmDataProcessService.handleFxAlarmData(payload);
|
||||
// }
|
||||
deviceDataProcessService.handleAlarmData(payload, getSiteIdByTopic(topic));
|
||||
|
||||
emsMqttMessageService.insertMqttOriginalMessage(topic,payload);
|
||||
emsMqttMessageService.insertMqttOriginalMessage(topic, payload);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to process alarm data message: " + e.getMessage(), e);
|
||||
log.error("Failed to process device alarm data message: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String getSiteIdByTopic(String topic) {
|
||||
String siteId = redisCache.getCacheObject(RedisKeyConstants.SITE_ID + topic);
|
||||
if (StringUtils.isEmpty(siteId)) {
|
||||
EmsMqttTopicConfig topicConfig = emsMqttTopicConfigMapper.selectOneByTopic(topic);
|
||||
siteId = topicConfig.getSiteId();
|
||||
redisCache.setCacheObject(RedisKeyConstants.SITE_ID + topic, siteId);
|
||||
}
|
||||
log.info("当前处理数据站点:" + siteId + ",topic: " + topic);
|
||||
return siteId;
|
||||
}
|
||||
|
||||
// 处理运行策略数据:云端-本地
|
||||
private void handleStrategyData(String topic, MqttMessage message) {
|
||||
String payload = new String(message.getPayload());
|
||||
|
||||
Reference in New Issue
Block a user