getAsyncMqttHandle
This commit is contained in:
@ -126,34 +126,35 @@ public class MqttServiceImpl implements MqttService {
|
|||||||
String deviceName = deviceEnum == null ? device : deviceEnum.getDeviceName();
|
String deviceName = deviceEnum == null ? device : deviceEnum.getDeviceName();
|
||||||
// 遍历数据条目
|
// 遍历数据条目
|
||||||
for (String key : data.keySet()) {
|
for (String key : data.keySet()) {
|
||||||
System.out.println("key=" + key + " value=" + data.getString(key));
|
|
||||||
String valueStr = data.getString(key);
|
String valueStr = data.getString(key);
|
||||||
|
System.out.println("源 key=" + key + " value=" + data.getString(key));
|
||||||
// 过滤无效值
|
// 过滤无效值
|
||||||
if (!isValidValue(valueStr)) {
|
if (!isValidValue(valueStr)) {
|
||||||
loggger.warn("值无效,跳过: key={} raw={} @{}", key, valueStr, date);
|
loggger.warn("值无效,跳过:源 key={} raw={} @{}", key, valueStr, date);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if ("SSLL".equals(key) || "LJLL".equals(key)) {
|
||||||
|
// 场外点位(定制化)
|
||||||
|
key = device + "_" + key;
|
||||||
|
}
|
||||||
|
System.out.println("标准 key=" + key + " value=" + data.getString(key));
|
||||||
// 获取点位信息
|
// 获取点位信息
|
||||||
MPoint mPoint = getMPointCacheOrES(bizId, key);
|
MPoint mPoint = getMPointCacheOrES(bizId, key);
|
||||||
if (mPoint == null) {
|
if (mPoint == null) {
|
||||||
boolean inNotFound = notFoundKeys.contains(key);
|
boolean inNotFound = notFoundKeys.contains(key);
|
||||||
loggger.warn("点位不存在,跳过: key={} notFoundCache={} @{}", key, inNotFound, date);
|
loggger.warn("点位不存在,跳过:标准 key={} notFoundCache={} @{}", key, inNotFound, date);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// 处理数值转换、倍率计算、取反操作、推报警信息、发送Kafka消息
|
// 处理数值转换、倍率计算、取反操作、推报警信息、发送Kafka消息
|
||||||
BigDecimal value = handleValueConversion(bizId, valueStr, key, date, vueList);
|
BigDecimal value = handleValueConversion(bizId, valueStr, key, date, vueList);
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
loggger.warn("数值转换失败,跳过: key={} raw={} @{}", key, valueStr, date);
|
loggger.warn("数值转换失败,跳过:标准 key={} raw={} @{}", key, valueStr, date);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// 处理ES数据
|
// 处理ES数据
|
||||||
MPointES esPoint = buildEsPoint(mPoint, value, date);
|
MPointES esPoint = buildEsPoint(mPoint, value, date);
|
||||||
redisList.add(esPoint);
|
redisList.add(esPoint);
|
||||||
String tableName = "tb_mp_" + key;
|
String tableName = "tb_mp_" + key;
|
||||||
if ("SSLL".equals(key) || "LJLL".equals(key)) {
|
|
||||||
// 场外点位(定制化)
|
|
||||||
tableName = "tb_mp_" + device + "_" + key;
|
|
||||||
}
|
|
||||||
MPointHistory mPointHistory = new MPointHistory();
|
MPointHistory mPointHistory = new MPointHistory();
|
||||||
mPointHistory.setMeasuredt(date);
|
mPointHistory.setMeasuredt(date);
|
||||||
mPointHistory.setParmvalue(value);
|
mPointHistory.setParmvalue(value);
|
||||||
|
|||||||
Reference in New Issue
Block a user