mqtt getAsyncMqttHandle

This commit is contained in:
Timer
2026-03-04 19:28:03 +08:00
parent 95e4de1539
commit 96f454f5d2

View File

@ -103,6 +103,7 @@ public class MqttServiceImpl implements MqttService {
// MPointHistoryService mPointHistoryService = (MPointHistoryService) SpringContextUtil.getBean("mPointHistoryService"); // MPointHistoryService mPointHistoryService = (MPointHistoryService) SpringContextUtil.getBean("mPointHistoryService");
for (int i = 0; i < jsonArray.size(); i++) { for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i); JSONObject jsonObject = jsonArray.getJSONObject(i);
System.out.println(jsonObject);
// 解析时间戳 // 解析时间戳
String timestamp = jsonObject.getString("timestamp"); String timestamp = jsonObject.getString("timestamp");
String[] times = timestamp.split("\\."); String[] times = timestamp.split("\\.");
@ -120,6 +121,7 @@ public class MqttServiceImpl implements MqttService {
// 遍历数据条目 // 遍历数据条目
for (String key : data.keySet()) { for (String key : data.keySet()) {
System.out.println("key=" + key + " value=" + data.getString(key));
String valueStr = data.getString(key); String valueStr = data.getString(key);
// 过滤无效值 // 过滤无效值
if (!isValidValue(valueStr)) { if (!isValidValue(valueStr)) {
@ -558,12 +560,14 @@ public class MqttServiceImpl implements MqttService {
// 查询缓存是否存在 // 查询缓存是否存在
if (mPointCache.containsKey(key)) { if (mPointCache.containsKey(key)) {
MPoint mp = mPointCache.get(key); MPoint mp = mPointCache.get(key);
// loggger.debug("缓存命中: key={} id={}", key, mp.getId()); System.out.println("缓存命中: key=" + key + " id=" + mp.getId());
loggger.debug("缓存命中: key={} id={}", key, mp.getId());
return mp; return mp;
} }
// 点位不存在 新加的点位需要定时清理缓存的时候才会进入缓存不然频繁去查询es // 点位不存在 新加的点位需要定时清理缓存的时候才会进入缓存不然频繁去查询es
if (notFoundKeys.contains(key)) { if (notFoundKeys.contains(key)) {
// loggger.debug("已标记未找到跳过ES查询: key={}", key); System.out.println("已标记未找到跳过ES查询: key=" + key);
loggger.debug("已标记未找到跳过ES查询: key={}", key);
return null; return null;
} }
// 从ES查询 // 从ES查询
@ -571,11 +575,13 @@ public class MqttServiceImpl implements MqttService {
if (mPoint != null) { if (mPoint != null) {
//添加缓存 //添加缓存
mPointCache.put(key, mPoint); mPointCache.put(key, mPoint);
// loggger.debug("ES查询成功并缓存: key={} id={}", key, mPoint.getId()); System.out.println("ES查询成功并缓存: key=" + key + " id=" + mPoint.getId());
loggger.debug("ES查询成功并缓存: key={} id={}", key, mPoint.getId());
} else { } else {
//没有该点位 //没有该点位
notFoundKeys.add(key); notFoundKeys.add(key);
// loggger.warn("ES未找到点位加入未找到集合: key={}", key); System.out.println("ES未找到点位加入未找到集合: key=" + key);
loggger.warn("ES未找到点位加入未找到集合: key={}", key);
} }
return mPoint; return mPoint;
} }