奉贤数据接入

This commit is contained in:
2025-06-27 17:08:26 +08:00
parent 31dc8e72c6
commit 365dd819b5
5 changed files with 407 additions and 0 deletions

View File

@ -1,7 +1,17 @@
package com.xzzn.web.controller.ems;
import com.alibaba.druid.support.json.JSONParser;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.xzzn.ems.domain.EmsBatteryData;
import com.xzzn.ems.domain.EmsMqttMessage;
import com.xzzn.ems.domain.EmsPcsData;
import com.xzzn.ems.service.IEmsBatteryDataService;
import com.xzzn.ems.service.IEmsMqttMessageService;
import com.xzzn.ems.service.IEmsPcsDataService;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.MqttPublisher;
import com.xzzn.framework.web.service.MqttSubscriber;
@ -15,6 +25,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class MqttMessageController implements MqttPublisher, MqttSubscriber {
@ -28,6 +43,11 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
@Autowired
private IEmsMqttMessageService emsMqttMessageService;
@Autowired
private IEmsBatteryDataService emsBatteryDataService;
@Autowired
private IEmsPcsDataService emsPcsDataService;
@Autowired
public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) {
@ -70,6 +90,8 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
System.out.println("[DEVICE] data: " + payload);
try {
// 业务处理逻辑
// handleFxData(JSONArray.parseArray(payload).get(0).toString());
EmsMqttMessage mqttMessage = new EmsMqttMessage();
mqttMessage.setMqttTopic(topic);
mqttMessage.setMqttMessage(payload);
@ -110,4 +132,73 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
System.err.println("Failed to send command to device " + deviceId);
}
}
private void handleFxData(String message) {
List<String> upAll = JSON.parseObject(message, new TypeReference<ArrayList<String>>() {});
for (int i = 0; i < upAll.size(); i++) {
JSONObject obj = JSONObject.parseObject(upAll.get(i));
String deviceId = obj.get("Device").toString();
String jsonData = obj.get("Data").toString();
if (deviceId.contains("BMS")) {
Map<String, Map<String, Object>> records = processData(JSON.parseObject(jsonData, new TypeReference<Map<String, Object>>() {}));
//单体电池
for (Map.Entry<String, Map<String, Object>> record : records.entrySet()) {
String recordId = record.getKey();
Map<String, Object> fields = record.getValue();
EmsBatteryData data = new EmsBatteryData();
data.setDeviceId(Long.parseLong(recordId));
data.setBatteryCellId(recordId);
data.setSoc(new BigDecimal(fields.get("DTSOC").toString()));
data.setSoh(new BigDecimal(fields.get("DTSOH").toString()));
data.setTemperature(new BigDecimal(fields.get("DTWD").toString()));
data.setVoltage(new BigDecimal(fields.get("DTDY").toString()));
data.setBatteryCluster(deviceId);
data.setBatteryPack(deviceId);
emsBatteryDataService.insertEmsBatteryData(data);
}
} else if (deviceId.contains("PCS")) {
//pcs
EmsPcsData data = new EmsPcsData();
emsPcsDataService.insertEmsPcsData(data);
}
}
}
// 数据分组处理
private static Map<String, Map<String, Object>> processData(Map<String, Object> rawData) {
Map<String, Map<String, Object>> records = new HashMap<>();
for (Map.Entry<String, Object> entry : rawData.entrySet()) {
String key = entry.getKey();
// 提取记录ID最后3位
String recordId = key.substring(key.length() - 3);
// 提取字段类型(前缀)
String fieldType = key.substring(0, key.length() - 3);
// 初始化记录
records.putIfAbsent(recordId, new HashMap<>());
// 存入字段值
records.get(recordId).put(fieldType, entry.getValue());
}
return records;
}
}