Files
sipaiis_datacollector/src/main/java/com/sipai/config/UDPServer.java
2026-01-29 17:06:01 +08:00

859 lines
41 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.sipai.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.sipai.entity.data.DeviceData;
import com.sipai.entity.data.PipelineEquipment;
import com.sipai.entity.data.PipelineEquipmentMpoint;
import com.sipai.entity.data.SensorData;
import com.sipai.entity.mqtt.MqttConfig;
import com.sipai.entity.scada.MPoint;
import com.sipai.entity.scada.MPointES;
import com.sipai.entity.scada.MPointHistory;
import com.sipai.service.data.PipelineEquipmentMpointService;
import com.sipai.service.data.PipelineEquipmentService;
import com.sipai.service.mqtt.MqttConfigService;
import com.sipai.service.scada.MPointHistoryService;
import com.sipai.service.scada.MPointService;
import com.sipai.tools.AlarmType;
import com.sipai.tools.CommString;
import com.sipai.tools.CommUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.redisson.api.RBatch;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* 南昌项目upd接收数据处理
* sj 2025-05-21
*/
@Component
public class UDPServer implements CommandLineRunner {
@Value("${udp.server.port}") //端口
private int port;
@Value("${udp.server.enabled:true}") // 默认值为true
private boolean enabled;
private static final int BUFFER_SIZE = 1024;
@Autowired
private PipelineEquipmentService pipelineEquipmentService;
@Autowired
private PipelineEquipmentMpointService pipelineEquipmentMpointService;
@Autowired
private MPointService mPointService;
@Autowired
private MPointHistoryService mPointHistoryService;
@Autowired
private MqttConfigService mqttConfigService;
//缓存点位
private static final Map<String, String> dataCache = new ConcurrentHashMap<>();
@Override
public void run(String... args) throws Exception {
if (!enabled) {
System.out.println("UDP服务未启用");
return;
}
try (DatagramSocket socket = new DatagramSocket(port)) {
System.out.println("UDP服务端启动监听端口: " + port);
byte[] buffer = new byte[BUFFER_SIZE];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
while (true) {
socket.receive(packet);
// System.out.println(CommUtil.nowDate() + "======收到来自 " + packet.getAddress() + ":" + packet.getPort() + " 的数据");
// 发送应答
String response = "@222\r\n";
byte[] responseData = response.getBytes(StandardCharsets.UTF_8);
DatagramPacket responsePacket = new DatagramPacket(
responseData, responseData.length,
packet.getAddress(), packet.getPort());
socket.send(responsePacket);
// 解析数据包
byte[] receivedData = new byte[packet.getLength()];
System.arraycopy(packet.getData(), 0, receivedData, 0, packet.getLength());
// 打印原始字节数组(十六进制格式)
// System.out.println("原始数据包(十六进制): " + bytesToHex(receivedData));
DeviceData data = parseDeviceData(Unpooled.wrappedBuffer(receivedData));
if (data != null) {
String jsonData = CommUtil.toJson(data);
JSONObject jsonObject = JSON.parseObject(jsonData);
System.out.println("站号收到数据:" + jsonObject.get("stationId")+""+jsonObject.get("startTime"));
JSONArray jsonArray = JSONArray.parseArray(jsonObject.get("sensorDataList").toString());
if (jsonArray != null && !jsonArray.isEmpty()) {
processSensorData(jsonObject, jsonArray, bytesToHex(receivedData));
}
}
}
} catch (IOException e) {
Logger logger = LoggerFactory.getLogger(UDPServer.class);
logger.error("UDP服务端启动异常", e);
e.printStackTrace();
}
}
//每次收到的报文存到D盘下 - 用于调试
private void logs(String type, String bytes, String path) {
try {
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
// 定义文件名(保持原来的格式)
String fileName = DateTimeFormatter
.ofPattern("yyyyMMdd_HHmmss")
.format(now) + "_" + type + ".txt";
// 定义文件路径D:/data_logs/年-月-日/自定义路径/
String dateDir = DateTimeFormatter.ofPattern("yyyy-MM-dd").format(now);
Path dirPath = Paths.get("D:/data_logs_new/" + dateDir + "/" + path + "/");
if (!Files.exists(dirPath)) {
Files.createDirectories(dirPath); // 如果目录不存在,则创建
}
Path filePath = dirPath.resolve(fileName); // 完整路径
Files.write(filePath, bytes.getBytes()); // 写入内容到文件
} catch (IOException e) {
e.printStackTrace();
}
}
//分发到各个业务处理
private void processSensorData(JSONObject jsonObject, JSONArray jsonArray, String bytes) {
String stationId = jsonObject.get("stationId").toString();
switch (stationId.charAt(1)) {
case '1'://井盖
logs(stationId, bytes, "jinggai");
processManholeCoverData(jsonObject, jsonArray);
break;
case '2'://流量计
logs(stationId, bytes, "liuliang");
processFlowMeterData(jsonObject, jsonArray);
break;
case '3'://液位计
logs(stationId, bytes, "yewei");
processLevelMeterData(jsonObject, jsonArray);
break;
case '4'://电导率
logs(stationId, bytes, "diandao");
processConductivityMeterData(jsonObject, jsonArray);
break;
case '5'://雨量计
logs(stationId, bytes, "yuliang");
processRainGaugeData(jsonObject, jsonArray);
break;
case '6'://内涝监测
logs(stationId, bytes, "neilao");
processWaterloggingMonitoringData(jsonObject, jsonArray);
break;
default:
System.out.println("未知设备类型");
}
}
// 井盖处理逻辑
private void processManholeCoverData(JSONObject jsonObject, JSONArray jsonArray) {
// System.out.println("开始处理井盖数据==============================================");
int dataCount = jsonObject.getIntValue("dataCount");
String startTimeStr = jsonObject.getString("startTime");
int intervalMinutes = jsonObject.getIntValue("intervalMinutes");
String stationId = jsonObject.getString("stationId");
double batteryVoltage = jsonObject.getDouble("batteryVoltage"); // 获取电池电压
// System.out.println("数据个数:" + dataCount);
// System.out.println("开始时间:" + startTimeStr);
// System.out.println("间隔分钟:" + intervalMinutes);
//单独处理电压
String channelName = stationId + "_V1";
String mpcode = getMpcodeFromCacheOrDB(stationId, channelName, "V1");
if (!mpcode.equals("未配置")) {
//推送kafka
mPointService.sendKafka4UDP("",mpcode, batteryVoltage, startTimeStr, 0, 1);
//推送mqtt
JSONArray jsonArray_mqtt = new JSONArray();
JSONObject jsonObject_mqtt = new JSONObject();
jsonObject_mqtt.put("mpcode", mpcode);
jsonObject_mqtt.put("value", batteryVoltage);
jsonObject_mqtt.put("time", startTimeStr);
jsonArray_mqtt.add(jsonObject_mqtt);
mqttConfigService.doSendView(jsonArray_mqtt,CommString.Mqtt_Topic_DATA);
}
// 生成时间点序列
List<String> timePoints = generateTimePoints(startTimeStr, dataCount, intervalMinutes);
// 处理模拟量数据 (type=4)
// System.out.println("\n模拟量数据:");
Map<Integer, String> analogMappings = new HashMap<>();
analogMappings.put(1, "A1"); // 信号强度
analogMappings.put(2, "A2"); // 基站序号
analogMappings.put(3, "A3"); // 井盖角度
analogMappings.put(4, "A4"); // 电阻值1
analogMappings.put(5, "A5"); // 电阻值2
processSensorData(stationId, jsonArray, 4, analogMappings, "analogValues", timePoints);
// 创建新的JSONArray来存储处理后的报警数据
JSONArray processedAlarmData = new JSONArray();
// 处理报警量数据 (type=5)
// System.out.println("\n报警量数据:");
for (Object obj : jsonArray) {
JSONObject sensorData = (JSONObject) obj;
if (sensorData.getIntValue("type") == 5) {
int qValue = sensorData.getIntValue("qValue");
// 提取各报警位状态(0/1)
int voltageAlarm = (qValue & 0x0001) != 0 ? 1 : 0; // Q10
int waterAlarm = (qValue & 0x0002) != 0 ? 1 : 0; // Q11
int coverAlarm = (qValue & 0x0004) != 0 ? 1 : 0; // Q12
// 为每个报警位创建单独的数据对象
JSONObject voltageAlarmData = new JSONObject();
voltageAlarmData.put("type", 5);
voltageAlarmData.put("channel", 0); // Q10对应channel 0
voltageAlarmData.put("qValues", Collections.nCopies(dataCount, voltageAlarm));
JSONObject waterAlarmData = new JSONObject();
waterAlarmData.put("type", 5);
waterAlarmData.put("channel", 1); // Q11对应channel 1
waterAlarmData.put("qValues", Collections.nCopies(dataCount, waterAlarm));
JSONObject coverAlarmData = new JSONObject();
coverAlarmData.put("type", 5);
coverAlarmData.put("channel", 2); // Q12对应channel 2
coverAlarmData.put("qValues", Collections.nCopies(dataCount, coverAlarm));
// 添加到处理后的报警数据数组
processedAlarmData.add(voltageAlarmData);
processedAlarmData.add(waterAlarmData);
processedAlarmData.add(coverAlarmData);
} else {
// 非报警数据直接保留
processedAlarmData.add(sensorData);
}
}
// 处理报警数据存储
Map<Integer, String> alarmMappings = new HashMap<>();
alarmMappings.put(0, "Q10"); // 电压报警
alarmMappings.put(1, "Q11"); // 水浸报警
alarmMappings.put(2, "Q12"); // 开盖报警
processSensorData(stationId, processedAlarmData, 5, alarmMappings, "qValues", timePoints);
}
// 流量计处理逻辑
private void processFlowMeterData(JSONObject jsonObject, JSONArray jsonArray) {
// System.out.println("开始处理流量计数据==============================================");
int dataCount = jsonObject.getIntValue("dataCount");
String startTimeStr = jsonObject.getString("startTime");
int intervalMinutes = jsonObject.getIntValue("intervalMinutes");
String stationId = jsonObject.getString("stationId");
// System.out.println("数据个数:" + dataCount);
// System.out.println("开始时间:" + startTimeStr);
// System.out.println("间隔分钟:" + intervalMinutes);
// 生成时间点序列
List<String> timePoints = generateTimePoints(startTimeStr, dataCount, intervalMinutes);
// 处理脉冲量数据 (type=1)
// System.out.println("\n脉冲量数据:");
Map<Integer, String> pulseMappings = new HashMap<>();
pulseMappings.put(1, "P1");
pulseMappings.put(2, "P2");
pulseMappings.put(3, "P3");
processSensorData(stationId, jsonArray, 1, pulseMappings, "accumulatedFlows", timePoints);
// 处理模拟量数据 (type=4)
// System.out.println("\n模拟量数据:");
Map<Integer, String> analogMappings = new HashMap<>();
analogMappings.put(1, "A1");
analogMappings.put(2, "A2");
analogMappings.put(4, "A4");
analogMappings.put(7, "A7");
analogMappings.put(8, "A8");
processSensorData(stationId, jsonArray, 4, analogMappings, "analogValues", timePoints);
}
// 液位计处理逻辑
private void processLevelMeterData(JSONObject jsonObject, JSONArray jsonArray) {
// System.out.println("开始处理液位计数据==============================================");
int dataCount = jsonObject.getIntValue("dataCount");
String startTimeStr = jsonObject.getString("startTime");
int intervalMinutes = jsonObject.getIntValue("intervalMinutes");
String stationId = jsonObject.getString("stationId");
// System.out.println("数据个数:" + dataCount);
// System.out.println("开始时间:" + startTimeStr);
// System.out.println("间隔分钟:" + intervalMinutes);
// 生成时间点序列
List<String> timePoints = generateTimePoints(startTimeStr, dataCount, intervalMinutes);
// 处理模拟量数据 (type=4)
// System.out.println("\n液位计模拟量数据:");
Map<Integer, String> analogMappings = new HashMap<>();
analogMappings.put(1, "A1"); // 信号接收功率(dBm)
analogMappings.put(2, "A2"); // 信噪比
analogMappings.put(3, "A3"); // 小区ID
analogMappings.put(4, "A4"); // 倾斜角度(°)
analogMappings.put(5, "A5"); // 电池电压(V)
analogMappings.put(6, "A6"); // 投入式液位(m)
analogMappings.put(8, "A8"); // 雷达水位(m)
analogMappings.put(10, "A10"); // 综合液位(m)
processSensorData(stationId, jsonArray, 4, analogMappings, "analogValues", timePoints);
}
// 电导率仪处理逻辑
private void processConductivityMeterData(JSONObject jsonObject, JSONArray jsonArray) {
// System.out.println("开始处理电导率仪数据==============================================");
int dataCount = jsonObject.getIntValue("dataCount");
String startTimeStr = jsonObject.getString("startTime");
int intervalMinutes = jsonObject.getIntValue("intervalMinutes");
String stationId = jsonObject.getString("stationId");
double batteryVoltage = jsonObject.getDouble("batteryVoltage");
// System.out.println("数据个数:" + dataCount);
// System.out.println("开始时间:" + startTimeStr);
// System.out.println("间隔分钟:" + intervalMinutes);
//单独处理电压
String channelName = stationId + "_V";
String mpcode = getMpcodeFromCacheOrDB(stationId, channelName, "V");
if (!mpcode.equals("未配置")) {
mPointService.sendKafka4UDP("",mpcode, batteryVoltage, startTimeStr, 0, 1);
//推送mqtt
JSONArray jsonArray_mqtt = new JSONArray();
JSONObject jsonObject_mqtt = new JSONObject();
jsonObject_mqtt.put("mpcode", mpcode);
jsonObject_mqtt.put("value", batteryVoltage);
jsonObject_mqtt.put("time", startTimeStr);
jsonArray_mqtt.add(jsonObject_mqtt);
mqttConfigService.doSendView(jsonArray_mqtt,CommString.Mqtt_Topic_DATA);
}
// 生成时间点序列
List<String> timePoints = generateTimePoints(startTimeStr, dataCount, intervalMinutes);
// 处理模拟量数据 (type=4)
// System.out.println("\n电导率仪数据:");
Map<Integer, String> analogMappings = new HashMap<>();
analogMappings.put(7, "A7"); // 电导率值
analogMappings.put(8, "A8"); // 温度值
processSensorData(stationId, jsonArray, 4, analogMappings, "analogValues", timePoints);
}
// 雨量计处理逻辑
private void processRainGaugeData(JSONObject jsonObject, JSONArray jsonArray) {
// System.out.println("开始处理雨量计数据==============================================");
int dataCount = jsonObject.getIntValue("dataCount");
String startTimeStr = jsonObject.getString("startTime");
int intervalMinutes = jsonObject.getIntValue("intervalMinutes");
String stationId = jsonObject.getString("stationId");
// System.out.println("数据个数:" + dataCount);
// System.out.println("开始时间:" + startTimeStr);
// System.out.println("间隔分钟:" + intervalMinutes);
// 生成时间点序列
List<String> timePoints = generateTimePoints(startTimeStr, dataCount, intervalMinutes);
// 处理模拟量数据 (type=4)
// System.out.println("\n雨量计数据:");
Map<Integer, String> analogMappings = new HashMap<>();
analogMappings.put(1, "A1"); // 雨量值
processSensorData(stationId, jsonArray, 4, analogMappings, "analogValues", timePoints);
}
// 内涝监测处理逻辑
private void processWaterloggingMonitoringData(JSONObject jsonObject, JSONArray jsonArray) {
// System.out.println("开始处理内涝监测数据==============================================");
int dataCount = jsonObject.getIntValue("dataCount");
String startTimeStr = jsonObject.getString("startTime");
int intervalMinutes = jsonObject.getIntValue("intervalMinutes");
String stationId = jsonObject.getString("stationId");
// System.out.println("数据个数:" + dataCount);
// System.out.println("开始时间:" + startTimeStr);
// System.out.println("间隔分钟:" + intervalMinutes);
// 生成时间点序列
List<String> timePoints = generateTimePoints(startTimeStr, dataCount, intervalMinutes);
// 处理模拟量数据 (type=4)
// System.out.println("\n水位数据:");
Map<Integer, String> analogMappings = new HashMap<>();
analogMappings.put(13, "A13"); // 水位值(0x0D)
// analogMappings.put(14, "A14"); // 状态值(0x0D)
processSensorData(stationId, jsonArray, 4, analogMappings, "analogValues", timePoints);
// 处理脉冲量数据 (type=1)
// System.out.println("\n状态数据:");
Map<Integer, String> pulseMappings = new HashMap<>();
pulseMappings.put(14, "P14"); // 状态值(0x0E)
processSensorData(stationId, jsonArray, 1, pulseMappings, "accumulatedFlows", timePoints);
}
private DeviceData parseDeviceData(ByteBuf buf) {
// 查找起始符位置
int startIndex = -1;
for (int i = 0; i < buf.readableBytes() - 1; i++) {
if (buf.getByte(i) == (byte) 0xAB && buf.getByte(i + 1) == (byte) 0xCD) {
startIndex = i;
break;
}
}
if (startIndex == -1) {
// 没找到起始符,丢弃所有数据
buf.skipBytes(buf.readableBytes());
return null;
}
// 跳过起始符前的所有字节
buf.skipBytes(startIndex);
// 标记当前位置
buf.markReaderIndex();
// 验证起始符
byte b1 = buf.readByte();
byte b2 = buf.readByte();
if (b1 != (byte) 0xAB || b2 != (byte) 0xCD) {
buf.resetReaderIndex();
buf.skipBytes(1);
return null;
}
// 确保有足够数据读取长度
if (buf.readableBytes() < 2) {
buf.resetReaderIndex();
return null;
}
// 读取长度
int dataLength = buf.readUnsignedShort();
// 计算完整包长度
int totalLength = 2 + 2 + dataLength + 2; // AB CD + len + data + 结束符
// 确保有完整数据包
if (buf.readableBytes() < dataLength + 2) {
buf.resetReaderIndex();
return null;
}
// 读取完整数据包
ByteBuf frame = buf.readRetainedSlice(dataLength + 2);
DeviceData data = new DeviceData();
// 1. 基础信息解析
data.setStationId(frame.readUnsignedShort());
byte[] timeBytes = new byte[5];
frame.readBytes(timeBytes);
data.setStartTime(parseBcdTime(timeBytes));
data.setDataCount(frame.readUnsignedByte());
data.setIntervalMinutes(frame.readUnsignedByte());
data.setBatteryVoltage(frame.readUnsignedShort() / 100.0f);
data.setUploadCount(frame.readUnsignedShort());
//设备信息处理
int infoLength = frame.readUnsignedByte();
// 读取实际设备信息固定20字节
byte[] deviceInfo = new byte[20];
frame.readBytes(deviceInfo);
data.setDeviceInfo(new String(deviceInfo, StandardCharsets.US_ASCII).trim());
int bytesToSkip = 8; // 跳过预留的8字节
frame.skipBytes(bytesToSkip);
//解析传感器数据
List<SensorData> sensorDataList = new ArrayList<>();
while (frame.readableBytes() >= 2) {
int type = frame.readUnsignedByte();
int channel = frame.readUnsignedByte();
// System.out.printf("Parsing sensor: type=%02X, channel=%02X at pos=%d%n", type, channel, frame.readerIndex() - 2);
SensorData sensor = new SensorData();
sensor.setType(type);
sensor.setChannel(channel);
try {
switch (type) {
case 0x01: // 脉冲量
long yesterdayFlow = frame.readUnsignedIntLE();
sensor.setYesterdayFlow(yesterdayFlow);
List<Long> flows = new ArrayList<>();
for (int j = 0; j < data.getDataCount(); j++) {
if (frame.readableBytes() < 4) {
break;
}
flows.add(frame.readUnsignedIntLE());
}
sensor.setAccumulatedFlows(flows);
break;
case 0x02: // 开关量/报警量
// 每个通道只读取1字节状态值
if (frame.readableBytes() >= 1) {
int status = frame.readUnsignedByte();
List<Integer> statusList = new ArrayList<>();
statusList.add(status);
sensor.setDescription("开关量状态");
}
break;
case 0x41: // 不知道啥类型
// 不知道啥类型
if (frame.readableBytes() >= 1) {
}
break;
case 0x42: // 不知道啥类型
// 不知道啥类型
if (frame.readableBytes() >= 1) {
}
break;
case 0x43: // 不知道啥类型
// 不知道啥类型
if (frame.readableBytes() >= 1) {
}
break;
case 0x04: // 模拟量
List<Float> analogs = new ArrayList<>();
for (int j = 0; j < data.getDataCount(); j++) {
if (frame.readableBytes() < 4) {
break;
}
float value = 0.0f;
try {
// 智能井盖
if (String.valueOf(data.getStationId()).charAt(1) == '1') {
// 尝试不同的解析方式,根据通道类型选择合适的解析方法
switch (channel) {
case 1: // A1 信号强度 (dBm)
byte signalByte = frame.readByte();
frame.skipBytes(3); // 跳过剩余3字节
value = (float) signalByte;
break;
case 2: // A2 基站序号
value = (float) frame.readUnsignedIntLE();
break;
case 3: // A3 井盖角度
int angleInt = frame.readInt(); // 使用readInt()读取大端序
value = Float.intBitsToFloat(angleInt);
// 验证是否为合理的角度值
if (value < 0 || value > 360) {
frame.resetReaderIndex();
frame.skipBytes(j * 4);
int angleFixed = frame.readUnsignedShort();
frame.skipBytes(2);
value = (float) angleFixed / 100.0f;
}
break;
case 4: // A4 电阻值1
case 5: // A5 电阻值2
case 7: //
case 8: //
int resistanceInt1 = frame.readInt(); // 使用readInt()读取大端序
value = Float.intBitsToFloat(resistanceInt1);
break;
default: // 其他模拟量,默认使用浮点数解析
int intValue = frame.readIntLE();
value = Float.intBitsToFloat(intValue);
break;
}
}
// 流量计
if (String.valueOf(data.getStationId()).charAt(1) == '2') {
// 统一使用大端序读取IEEE754浮点数
int intValue = frame.readInt(); // 改为readInt()而不是readIntLE()
value = Float.intBitsToFloat(intValue);
// 保留3位小数
value = (float) (Math.round(value * 1000) / 1000.0);
// 格式化为3位小数不足补零
String formattedValue = String.format("%.3f", value); // 例如 7.4 → "7.400"
value = Float.parseFloat(formattedValue); // 转回float可选
}
// 液位计
if (String.valueOf(data.getStationId()).charAt(1) == '3') {
// 统一使用大端序读取IEEE754浮点数
int intValue = frame.readInt(); // 改为readInt()而不是readIntLE()
value = Float.intBitsToFloat(intValue);
// 保留3位小数
value = (float) (Math.round(value * 1000) / 1000.0);
// 格式化为3位小数不足补零
String formattedValue = String.format("%.3f", value); // 例如 7.4 → "7.400"
value = Float.parseFloat(formattedValue); // 转回float可选
}
// 电导率
/*if (String.valueOf(data.getStationId()).charAt(1) == '4') {
int intValue = frame.readIntLE();
value = Float.intBitsToFloat(intValue);
// 保留2位小数
value = (float) (Math.round(value * 100) / 100.0);
}*/
// 电导率
if (String.valueOf(data.getStationId()).charAt(1) == '4') {
// 改为使用大端序读取
int intValue = frame.readInt(); // 改为readInt()而不是readIntLE()
value = Float.intBitsToFloat(intValue);
// 保留2位小数
value = (float) (Math.round(value * 100) / 100.0);
}
// 雨量计
if (String.valueOf(data.getStationId()).charAt(1) == '5') {
// 改为使用大端序读取
int intValue = frame.readInt(); // 改为readInt()而不是readIntLE()
value = Float.intBitsToFloat(intValue);
// 保留2位小数
value = (float) (Math.round(value * 100) / 100.0);
}
// 内涝监测设备
if (String.valueOf(data.getStationId()).charAt(1) == '6') {
// 改为使用大端序读取
int intValue = frame.readInt(); // 改为readInt()而不是readIntLE()
value = Float.intBitsToFloat(intValue);
// 保留2位小数
value = (float) (Math.round(value * 100) / 100.0);
}
// 过滤异常值
if (Float.isNaN(value) || Float.isInfinite(value) || value < -1000000.0f || value > 1000000.0f) {
value = 0.0f;
}
} catch (Exception e) {
frame.skipBytes(4);
value = 0.0f;
}
analogs.add(value);
}
sensor.setAnalogValues(analogs);
// 智能井盖特定通道处理
if (String.valueOf(data.getStationId()).charAt(1) == '1') {
switch (channel) {
case 1: // A1信号强度
sensor.setDescription("A1信号强度");
break;
case 2: // A2基站序号
sensor.setDescription("A2基站序号");
break;
case 3: // A3井盖角度
sensor.setDescription("A3井盖角度");
break;
case 4: // A4电阻值1
sensor.setDescription("A4电阻值1");
break;
case 5: // A5电阻值2
sensor.setDescription("A5电阻值2");
break;
}
}
break;
case 0x05: // Q通道数据数字量输出
int qValue = frame.readUnsignedShort();
// 校验 qValue 范围0x0000 ~ 0xFFFF
if (qValue < 0 || qValue > 0xFFFF) {
throw new IllegalArgumentException("Invalid Q channel value: " + qValue);
}
// 解析各报警位状态
Map<AlarmType, Boolean> alarmStatus = new EnumMap<>(AlarmType.class);
for (AlarmType alarm : AlarmType.values()) {
alarmStatus.put(alarm, alarm.isTriggered(qValue));
}
// 构建报警描述
StringBuilder alarmDesc = new StringBuilder("Q通道状态: ");
for (AlarmType alarm : AlarmType.values()) {
alarmDesc.append(alarm.getDescription())
.append(alarmStatus.get(alarm) ? "触发" : "正常")
.append(", ");
}
sensor.setDescription(alarmDesc.toString().replaceAll(", $", ""));
// 存储原始报警值(可选)
sensor.setqValue(qValue);
sensor.setAlarmStatus(alarmStatus);
// 记录报警日志
if (alarmStatus.get(AlarmType.COVER_ALARM)) {
LoggerFactory.getLogger(UDPServer.class)
.warn("井盖开盖报警触发!站号: {}, Q值: {}", data.getStationId(), qValue);
}
break;
default:
int bytesToSkip2 = data.getDataCount() * 4;
if (frame.readableBytes() >= bytesToSkip2) {
frame.skipBytes(bytesToSkip2);
} else {
// System.err.printf("Not enough readable bytes to skip %d bytes. Current readable bytes: %d%n",
// bytesToSkip2, frame.readableBytes());
// 可选择抛出异常、记录日志或者做其他处理
}
continue;
}
sensorDataList.add(sensor);
} catch (Exception e) {
System.err.println("Error parsing sensor");
e.printStackTrace();
break;
}
// 检查是否到达结束符
if (frame.readableBytes() >= 2 && frame.getByte(frame.readerIndex()) == 0x0D && frame.getByte(frame.readerIndex() + 1) == 0x0A) {
// 跳过结束符
frame.skipBytes(2);
break;
}
}
data.setSensorDataList(sensorDataList);
return data;
}
private String parseBcdTime(byte[] bytes) {
// bytes: 19 05 29 10 47
int year = 2000 + ((bytes[0] >> 4) & 0xF) * 10 + (bytes[0] & 0xF);
int month = ((bytes[1] >> 4) & 0xF) * 10 + (bytes[1] & 0xF);
int day = ((bytes[2] >> 4) & 0xF) * 10 + (bytes[2] & 0xF);
int hour = ((bytes[3] >> 4) & 0xF) * 10 + (bytes[3] & 0xF);
int minute = ((bytes[4] >> 4) & 0xF) * 10 + (bytes[4] & 0xF);
return String.format("%04d-%02d-%02d %02d:%02d", year, month, day, hour, minute);
}
private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString();
}
/**
* 通用传感器数据处理方法
*
* @param stationId 站点id 用于redis组合key
* @param jsonArray 传感器数据数组
* @param type 要处理的数据类型
* @param channelMappings 通道映射
* @param valueFieldName 值字段名
* @param timePoints 时间点列表
*/
private void processSensorData(String stationId, JSONArray jsonArray, int type,
Map<Integer, String> channelMappings,
String valueFieldName,
List<String> timePoints) {
try {
for (Object obj : jsonArray) {
JSONObject sensorData = (JSONObject) obj;
if (sensorData.getIntValue("type") == type) {
int channel = sensorData.getIntValue("channel");
if (channelMappings.containsKey(channel)) {
String channelName = stationId + "_" + channelMappings.get(channel);
JSONArray values = sensorData.getJSONArray(valueFieldName);
if (values != null && !values.isEmpty()) {
//获取mpcode (优先从缓存拿)
String mpcode = getMpcodeFromCacheOrDB(stationId, channelName, channelMappings.get(channel));
if (!mpcode.equals("未配置")) {
JSONObject dataCache = new JSONObject();
JSONArray jsonArray_mqtt = new JSONArray();
for (int i = 0; i < values.size(); i++) {
String timePoint = timePoints.get(i);
timePoint = timePoint.length() == 16 ? timePoint + ":00" : timePoint;
Object value = values.get(i);
dataCache.put(timePoint, value);
//推送至kafka
mPointService.sendKafka4UDP("",mpcode, value, timePoint, i, values.size());
//添加到mqtt数组
JSONObject jsonObject_mqtt = new JSONObject();
jsonObject_mqtt.put("mpcode", mpcode);
jsonObject_mqtt.put("value", value);
jsonObject_mqtt.put("time", timePoint);
jsonArray_mqtt.add(jsonObject_mqtt);
}
//批量推送mqtt
mqttConfigService.doSendView(jsonArray_mqtt, CommString.Mqtt_Topic_DATA);
} else {
System.out.println("站点:" + stationId + " 标识:" + channelMappings.get(channel) + " 未配置");
}
}
}
}
}
} catch (JSONException e) {
e.printStackTrace();
}
}
// 从缓存或数据库获取mpcode
private String getMpcodeFromCacheOrDB(String stationId, String channelName, String tag) {
// 先查缓存
String mpcode = dataCache.get(channelName);
if (mpcode != null) {
return mpcode;
}
// 缓存没有则查询数据库
List<PipelineEquipment> equipmentList = pipelineEquipmentService.selectListByWhere(
"where station_code = '" + stationId + "'");
if (equipmentList != null && !equipmentList.isEmpty()) {
List<PipelineEquipmentMpoint> mpointList = pipelineEquipmentMpointService.selectListByWhere(
"where pid = '" + equipmentList.get(0).getId() + "' and tag = '" + tag + "'");
if (mpointList != null && !mpointList.isEmpty()) {
mpcode = mpointList.get(0).getMpcode();
// 存入缓存
dataCache.put(channelName, mpcode);
return mpcode;
}
}
return "未配置"; // 或根据业务需求返回默认值
}
//时间生成方法
private List<String> generateTimePoints(String startTimeStr, int count, int intervalMinutes) {
List<String> timePoints = new ArrayList<>();
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
Date date = sdf.parse(startTimeStr);
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
for (int i = 0; i < count; i++) {
timePoints.add(sdf.format(calendar.getTime()));
calendar.add(Calendar.MINUTE, intervalMinutes);
}
} catch (ParseException e) {
e.printStackTrace();
// 如果解析失败,使用简单的时间格式
for (int i = 0; i < count; i++) {
timePoints.add(startTimeStr + " +" + (i * intervalMinutes) + "分钟");
}
}
return timePoints;
}
}