From 63ed2641eee15fc79cbd687f5b97d00bd6a90367 Mon Sep 17 00:00:00 2001 From: dashixiong Date: Thu, 12 Feb 2026 21:05:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ems/EmsPointMatchController.java | 71 +++ .../ems/EmsSiteConfigController.java | 40 ++ .../ems/EmsSiteMonitorController.java | 31 ++ .../controller/ems/MqttMessageController.java | 139 +++--- ems-admin/src/main/resources/application.yml | 14 + ems-admin/src/main/resources/logback.xml | 4 +- .../common/constant/RedisKeyConstants.java | 6 + .../manager/MqttLifecycleManager.java | 103 +++- .../generator/controller/GenController.java | 45 +- .../xzzn/generator/mapper/GenTableMapper.java | 7 - .../service/GenTableServiceImpl.java | 14 +- .../generator/service/IGenTableService.java | 8 - .../mapper/generator/GenTableMapper.xml | 6 +- ems-system/pom.xml | 8 +- .../com/xzzn/ems/domain/EmsSiteSetting.java | 14 + .../xzzn/ems/domain/vo/PointNameRequest.java | 9 + .../ems/mapper/EmsPointEnumMatchMapper.java | 8 + .../xzzn/ems/mapper/EmsPointMatchMapper.java | 10 + .../ems/service/IEmsDeviceSettingService.java | 15 + .../ems/service/IEmsPointMatchService.java | 49 ++ .../com/xzzn/ems/service/IEmsSiteService.java | 4 + .../ems/service/IGeneralQueryService.java | 2 +- .../impl/DeviceDataProcessServiceImpl.java | 260 +++++++++- .../impl/EmsDeviceSettingServiceImpl.java | 457 ++++++++++++++++++ .../impl/EmsPointMatchServiceImpl.java | 118 +++++ .../ems/service/impl/EmsSiteServiceImpl.java | 64 ++- .../service/impl/GeneralQueryServiceImpl.java | 169 +++++-- .../mapper/ems/EmsPointEnumMatchMapper.xml | 45 +- .../mapper/ems/EmsPointMatchMapper.xml | 72 ++- 29 files changed, 1559 insertions(+), 233 deletions(-) diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsPointMatchController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsPointMatchController.java index 8441650..fd8e697 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsPointMatchController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsPointMatchController.java @@ -7,8 +7,12 @@ import javax.validation.Valid; import org.apache.commons.collections4.CollectionUtils; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.xzzn.common.annotation.Log; @@ -19,6 +23,7 @@ import com.xzzn.ems.domain.EmsPointMatch; import com.xzzn.ems.domain.vo.DevicePointMatchExportVo; import com.xzzn.ems.domain.vo.DevicePointMatchVo; import com.xzzn.ems.domain.vo.ImportPointDataRequest; +import com.xzzn.ems.domain.vo.ImportPointTemplateRequest; import com.xzzn.ems.service.IEmsPointMatchService; import com.xzzn.common.utils.poi.ExcelUtil; import org.springframework.web.multipart.MultipartFile; @@ -49,6 +54,58 @@ public class EmsPointMatchController extends BaseController util.exportExcel(response, list, "点位匹配数据"); } + /** + * 查询点位配置列表 + */ + @GetMapping("/list") + public com.xzzn.common.core.page.TableDataInfo list(EmsPointMatch emsPointMatch) + { + startPage(); + List list = emsPointMatchService.selectPointMatchConfigList(emsPointMatch); + return getDataTable(list); + } + + /** + * 查询点位配置详情 + */ + @GetMapping(value = "/{id}") + public AjaxResult getInfo(@PathVariable("id") Long id) + { + return success(emsPointMatchService.selectPointMatchById(id)); + } + + /** + * 新增点位配置 + */ + @Log(title = "点位配置", businessType = BusinessType.INSERT) + @PostMapping + public AjaxResult add(@RequestBody EmsPointMatch emsPointMatch) + { + emsPointMatch.setCreateBy(getUsername()); + return toAjax(emsPointMatchService.insertPointMatch(emsPointMatch)); + } + + /** + * 修改点位配置 + */ + @Log(title = "点位配置", businessType = BusinessType.UPDATE) + @PutMapping + public AjaxResult edit(@RequestBody EmsPointMatch emsPointMatch) + { + emsPointMatch.setUpdateBy(getUsername()); + return toAjax(emsPointMatchService.updatePointMatch(emsPointMatch)); + } + + /** + * 删除点位配置 + */ + @Log(title = "点位配置", businessType = BusinessType.DELETE) + @DeleteMapping("/{ids}") + public AjaxResult remove(@PathVariable Long[] ids) + { + return toAjax(emsPointMatchService.deletePointMatchByIds(ids)); + } + /** * 上传点位清单 * @param file @@ -85,4 +142,18 @@ public class EmsPointMatchController extends BaseController } } + /** + * 根据站点导入模板点位配置 + * @param request 请求参数 + * @return 导入结果 + */ + @PreAuthorize("@ss.hasPermi('system:user:import')") + @Log(title = "点位配置", businessType = BusinessType.IMPORT) + @PostMapping("/importTemplateBySite") + public AjaxResult importTemplateBySite(@Valid @RequestBody ImportPointTemplateRequest request) + { + String message = emsPointMatchService.importTemplateBySite(request, getUsername()); + return success(message); + } + } diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteConfigController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteConfigController.java index aec2ad4..bc24160 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteConfigController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteConfigController.java @@ -13,6 +13,7 @@ import com.xzzn.ems.domain.vo.DeviceUpdateRequest; import com.xzzn.ems.domain.vo.DevicesSettingVo; import com.xzzn.ems.domain.vo.PointDataRequest; import com.xzzn.ems.domain.vo.PointQueryResponse; +import com.xzzn.ems.domain.vo.SiteMonitorProjectPointMappingSaveRequest; import com.xzzn.ems.domain.vo.SiteDeviceListVo; import com.xzzn.ems.service.IEmsDeviceSettingService; import com.xzzn.ems.service.IEmsSiteService; @@ -59,6 +60,26 @@ public class EmsSiteConfigController extends BaseController{ return getDataTable(list); } + /** + * 新增站点 + */ + @PostMapping("/addSite") + public AjaxResult addSite(@RequestBody EmsSiteSetting emsSiteSetting) + { + emsSiteSetting.setCreateBy(getUsername()); + return toAjax(iEmsSiteService.addSite(emsSiteSetting)); + } + + /** + * 编辑站点 + */ + @PostMapping("/updateSite") + public AjaxResult updateSite(@RequestBody EmsSiteSetting emsSiteSetting) + { + emsSiteSetting.setUpdateBy(getUsername()); + return toAjax(iEmsSiteService.updateSite(emsSiteSetting)); + } + /** * 获取设备列表-分页 */ @@ -192,6 +213,25 @@ public class EmsSiteConfigController extends BaseController{ return success(iEmsDeviceSettingService.getDeviceListBySiteAndCategory(siteId, deviceCategory)); } + /** + * 获取单站监控项目点位映射 + */ + @GetMapping("/getSingleMonitorProjectPointMapping") + public AjaxResult getSingleMonitorProjectPointMapping(@RequestParam String siteId) + { + return success(iEmsDeviceSettingService.getSiteMonitorProjectPointMapping(siteId)); + } + + /** + * 保存单站监控项目点位映射 + */ + @PostMapping("/saveSingleMonitorProjectPointMapping") + public AjaxResult saveSingleMonitorProjectPointMapping(@RequestBody SiteMonitorProjectPointMappingSaveRequest request) + { + int rows = iEmsDeviceSettingService.saveSiteMonitorProjectPointMapping(request, getUsername()); + return AjaxResult.success(rows); + } + /** * PCS设备开关机 */ diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteMonitorController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteMonitorController.java index 3e959f5..8796797 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteMonitorController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/EmsSiteMonitorController.java @@ -9,6 +9,8 @@ import com.xzzn.ems.domain.vo.BatteryDataStatsListVo; import com.xzzn.ems.domain.vo.DateSearchRequest; import com.xzzn.ems.domain.vo.RunningGraphRequest; import com.xzzn.ems.domain.vo.SiteBatteryDataList; +import com.xzzn.ems.domain.vo.SiteMonitorDataSaveRequest; +import com.xzzn.ems.service.IEmsDeviceSettingService; import com.xzzn.ems.service.IEmsSiteService; import com.xzzn.ems.service.IEmsStatsReportService; import com.xzzn.ems.service.ISingleSiteService; @@ -33,6 +35,8 @@ public class EmsSiteMonitorController extends BaseController{ private IEmsSiteService iEmsSiteService; @Autowired private IEmsStatsReportService iemsStatsReportService; + @Autowired + private IEmsDeviceSettingService iEmsDeviceSettingService; /** * 获取单站首页数据 @@ -228,4 +232,31 @@ public class EmsSiteMonitorController extends BaseController{ return error("缺少必传项"); } } + + /** + * 单站监控项目点位配置查询 + */ + @GetMapping("/getProjectPointMapping") + public AjaxResult getProjectPointMapping(@RequestParam String siteId) + { + return success(iEmsDeviceSettingService.getSiteMonitorProjectPointMapping(siteId)); + } + + /** + * 单站监控项目展示数据查询(配置字段 + 字段值) + */ + @GetMapping("/getProjectDisplayData") + public AjaxResult getProjectDisplayData(@RequestParam String siteId) + { + return success(iEmsDeviceSettingService.getSiteMonitorProjectDisplay(siteId)); + } + + /** + * 单站监控项目展示数据写入 + */ + @PostMapping("/saveProjectDisplayData") + public AjaxResult saveProjectDisplayData(@RequestBody SiteMonitorDataSaveRequest request) + { + return AjaxResult.success(iEmsDeviceSettingService.saveSiteMonitorProjectData(request, getUsername())); + } } diff --git a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java index ff74aa9..47ed489 100644 --- a/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java +++ b/ems-admin/src/main/java/com/xzzn/web/controller/ems/MqttMessageController.java @@ -8,7 +8,6 @@ import com.xzzn.ems.domain.EmsMqttTopicConfig; import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper; import com.xzzn.ems.service.IDDSDataProcessService; import com.xzzn.ems.service.IDeviceDataProcessService; -import com.xzzn.ems.service.IEmsMqttMessageService; import com.xzzn.ems.service.IEmsStrategyService; import com.xzzn.ems.service.IFXXAlarmDataProcessService; import com.xzzn.ems.service.IFXXDataProcessService; @@ -27,6 +26,7 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -38,9 +38,6 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { private final MqttLifecycleManager mqttLifecycleManager; - @Autowired - private IEmsMqttMessageService emsMqttMessageService; - @Autowired private IFXXDataProcessService fXXDataProcessService; @@ -61,6 +58,8 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { @Autowired private RedisCache redisCache; + @Autowired + private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired public MqttMessageController(MqttLifecycleManager mqttLifecycleManager) { @@ -136,51 +135,32 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { private void handleSystemStatus(String topic, MqttMessage message) { String payload = new String(message.getPayload()); System.out.println("[SYSTEM] Status update: " + payload); - - try { - emsMqttMessageService.insertMqttOriginalMessage(topic,payload); - - } catch (Exception e) { - log.error("Failed to process system status message: " + e.getMessage(), e); - } } // 处理设备数据 private void handleDeviceData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); - log.error("[DEVICE] data: " + payload); - try { - // 业务处理逻辑 -// if (topic.startsWith("021_DDS")) { -// dDSDataProcessService.handleDdsData(payload); -// } else if (topic.startsWith("021_FXX")) { -// fXXDataProcessService.handleFxData(payload); -// } - deviceDataProcessService.handleDeviceData(payload, getSiteIdByTopic(topic)); - - emsMqttMessageService.insertMqttOriginalMessage(topic, payload); - } catch (Exception e) { - log.error("Failed to process device data message: " + e.getMessage(), e); - } + threadPoolTaskExecutor.execute(() -> { + log.debug("[DEVICE] data: {}", payload); + try { + deviceDataProcessService.handleDeviceData(payload, getSiteIdByTopic(topic)); + } catch (Exception e) { + log.error("Failed to process device data message: {}", e.getMessage(), e); + } + }); } // 处理告警数据 private void handleAlarmData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); - System.out.println("[DEVICE] data: " + payload); - try { - // 业务处理逻辑 -// if (topic.startsWith("021_FXX")) { -// fXXAlarmDataProcessService.handleFxAlarmData(payload); -// } - deviceDataProcessService.handleAlarmData(payload, getSiteIdByTopic(topic)); - - emsMqttMessageService.insertMqttOriginalMessage(topic, payload); - } catch (Exception e) { - e.printStackTrace(); - log.error("Failed to process device alarm data message: " + e.getMessage(), e); - } + threadPoolTaskExecutor.execute(() -> { + try { + deviceDataProcessService.handleAlarmData(payload, getSiteIdByTopic(topic)); + } catch (Exception e) { + log.error("Failed to process device alarm data message: {}", e.getMessage(), e); + } + }); } @@ -191,78 +171,67 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { siteId = topicConfig.getSiteId(); redisCache.setCacheObject(RedisKeyConstants.SITE_ID + topic, siteId); } - log.info("当前处理数据站点:" + siteId + ",topic: " + topic); return siteId; } // 处理运行策略数据:云端-本地 private void handleStrategyData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); - System.out.println("[处理运行策略数据] data: " + payload); - try { - // 业务处理逻辑 - iMqttSyncLogService.handleMqttStrategyData(payload); - - emsMqttMessageService.insertMqttOriginalMessage(topic,payload); - } catch (Exception e) { - log.error("Failed to process strategy data message: " + e.getMessage(), e); - } + threadPoolTaskExecutor.execute(() -> { + try { + iMqttSyncLogService.handleMqttStrategyData(payload); + } catch (Exception e) { + log.error("Failed to process strategy data message: {}", e.getMessage(), e); + } + }); } // 处理设备保护告警策略数据:云端-本地 private void handleFaultProtPlanData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); - System.out.println("[处理设备保护告警策略数据] data: " + payload); - try { - // 业务处理逻辑 - iMqttSyncLogService.handleMqttPlanData(payload); - - emsMqttMessageService.insertMqttOriginalMessage(topic,payload); - } catch (Exception e) { - log.error("Failed to process fault plan data message: " + e.getMessage(), e); - } + threadPoolTaskExecutor.execute(() -> { + try { + iMqttSyncLogService.handleMqttPlanData(payload); + } catch (Exception e) { + log.error("Failed to process fault plan data message: {}", e.getMessage(), e); + } + }); } // 处理保护策略告警信息:本地-云端 private void handleFaultAlarmData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); - System.out.println("[处理本地保护策略告警信息到云端] data: " + payload); - try { - // 业务处理逻辑 - iMqttSyncLogService.handleFaultAlarmData(payload); - - emsMqttMessageService.insertMqttOriginalMessage(topic,payload); - } catch (Exception e) { - log.error("Failed to process fault plan alarm data message: " + e.getMessage(), e); - } + threadPoolTaskExecutor.execute(() -> { + try { + iMqttSyncLogService.handleFaultAlarmData(payload); + } catch (Exception e) { + log.error("Failed to process fault plan alarm data message: {}", e.getMessage(), e); + } + }); } // 处理保护策略下发日志:本地-云端 private void handleFaultPlanIssueData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); - System.out.println("[处理本地保护策略下发日志到云端] data: " + payload); - try { - // 业务处理逻辑 - iMqttSyncLogService.handleFaultPlanIssueData(payload); - - emsMqttMessageService.insertMqttOriginalMessage(topic,payload); - } catch (Exception e) { - log.error("Failed to process fault plan issue log message: " + e.getMessage(), e); - } + threadPoolTaskExecutor.execute(() -> { + try { + iMqttSyncLogService.handleFaultPlanIssueData(payload); + } catch (Exception e) { + log.error("Failed to process fault plan issue log message: {}", e.getMessage(), e); + } + }); } // 处理设备状态变更日志:本地-云端 private void handleDeviceChangeLogData(String topic, MqttMessage message) { String payload = new String(message.getPayload()); - System.out.println("[处理本地的保护策略告警信息到云端] data: " + payload); - try { - // 业务处理逻辑 - iMqttSyncLogService.handleDeviceChangeLogData(payload); - - emsMqttMessageService.insertMqttOriginalMessage(topic,payload); - } catch (Exception e) { - log.error("Failed to process device change log message: " + e.getMessage(), e); - } + threadPoolTaskExecutor.execute(() -> { + try { + iMqttSyncLogService.handleDeviceChangeLogData(payload); + } catch (Exception e) { + log.error("Failed to process device change log message: {}", e.getMessage(), e); + } + }); } @Override @@ -292,4 +261,4 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber { } } -} \ No newline at end of file +} diff --git a/ems-admin/src/main/resources/application.yml b/ems-admin/src/main/resources/application.yml index cf6c040..46df20a 100644 --- a/ems-admin/src/main/resources/application.yml +++ b/ems-admin/src/main/resources/application.yml @@ -199,6 +199,20 @@ mqtt: topic: siteId: +influxdb: + enabled: true + url: http://122.51.194.184:8086/ + api-token: F2XcmBzZsWcz90ikU2_t7UXY2fzWuf2ruVp1BkusNkIS_gwrQZuiaIjl33XQMQajm7vSI6TQSRnpPSx5CXThlA== + write-method: POST + read-method: GET + write-path: /api/v2/write + query-path: /query + org: ems + bucket: point_data + database: ems_point_data + retention-policy: autogen + measurement: mqtt_point_data + modbus: pool: max-total: 20 diff --git a/ems-admin/src/main/resources/logback.xml b/ems-admin/src/main/resources/logback.xml index 8f26067..96d503c 100644 --- a/ems-admin/src/main/resources/logback.xml +++ b/ems-admin/src/main/resources/logback.xml @@ -1,7 +1,7 @@ - + @@ -90,4 +90,4 @@ - \ No newline at end of file + diff --git a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java index 82c0a75..c627bb4 100644 --- a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java +++ b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java @@ -122,4 +122,10 @@ public class RedisKeyConstants /** 每个设备最新数据-设置失效时间-判断是否正常同步数据 */ public static final String SYNC_DATA_ALARM = "SYNC_DATA_ALARM_"; + + /** 点位配置缓存(按站点+设备) */ + public static final String POINT_CONFIG_DEVICE = "POINT_CONFIG_DEVICE_"; + + /** 单站监控最新数据(按站点+模块) */ + public static final String SITE_MONITOR_LATEST = "SITE_MONITOR_LATEST_"; } diff --git a/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java b/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java index b4c05ce..bdc0b45 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java +++ b/ems-framework/src/main/java/com/xzzn/framework/manager/MqttLifecycleManager.java @@ -3,21 +3,37 @@ package com.xzzn.framework.manager; import com.xzzn.ems.service.IEmsAlarmRecordsService; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; @Component -public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallback { +public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, MqttCallbackExtended { + + private static final Logger log = LoggerFactory.getLogger(MqttLifecycleManager.class); + private static final long RECONNECT_DELAY_SECONDS = 5; private final MqttConnectOptions connectOptions; private final IEmsAlarmRecordsService iEmsAlarmRecordsService; + private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread thread = new Thread(r); + thread.setName("mqtt-reconnect"); + thread.setDaemon(true); + return thread; + }); + private final AtomicBoolean reconnectScheduled = new AtomicBoolean(false); + private volatile ScheduledFuture reconnectFuture; private MqttClient mqttClient; private volatile boolean running = false; @@ -41,7 +57,9 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, if (running) return; try { - String clientId = connectOptions.getUserName() + "-" + System.currentTimeMillis(); + String prefix = connectOptions.getUserName() == null || connectOptions.getUserName().isEmpty() + ? "mqtt-client" : connectOptions.getUserName(); + String clientId = prefix + "-" + System.currentTimeMillis(); mqttClient = new MqttClient( connectOptions.getServerURIs()[0], clientId, @@ -51,27 +69,28 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, mqttClient.setCallback(this); mqttClient.connect(connectOptions); - // 重连后自动重新订阅 resubscribeAll(); - running = true; - System.out.println("MQTT client connected to: " + connectOptions.getServerURIs()[0]); + log.info("MQTT client connected to: {}", connectOptions.getServerURIs()[0]); } catch (MqttException e) { - System.err.println("MQTT connection failed: " + e.getMessage()); - // 添加重试逻辑 + running = false; + log.error("MQTT connection failed: {}", e.getMessage(), e); + scheduleReconnect(); } } @Override public void stop() { + cancelReconnectTask(); if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { - System.err.println("Error disconnecting MQTT client: " + e.getMessage()); + log.warn("Error disconnecting MQTT client: {}", e.getMessage(), e); } } + reconnectExecutor.shutdownNow(); running = false; } @@ -83,9 +102,17 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, // MQTT 回调方法 @Override public void connectionLost(Throwable cause) { - System.err.println("MQTT connection lost: " + cause.getMessage()); + log.warn("MQTT connection lost: {}", cause == null ? "unknown" : cause.getMessage(), cause); running = false; - // 自动重连由 MqttConnectOptions 处理 + scheduleReconnect(); + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + running = true; + cancelReconnectTask(); + log.info("MQTT connection complete, reconnect: {}, serverURI: {}", reconnect, serverURI); + resubscribeAll(); } @Override @@ -106,14 +133,16 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, try { if (mqttClient != null && mqttClient.isConnected()) { mqttClient.subscribe(topic, qos); + log.info("MQTT subscribe success, topic: {}, qos: {}", topic, qos); + } else { + log.warn("MQTT subscribe deferred, client not connected, topic: {}", topic); } subscriptions.put(topic, new SubscriptionInfo(listener, qos)); } catch (MqttException e) { - System.err.println("Subscribe failed: " + e.getMessage()); - // 订阅失败-增加告警 + log.error("Subscribe failed, topic: {}, err: {}", topic, e.getMessage(), e); iEmsAlarmRecordsService.addSubFailedAlarmRecord(topic); + scheduleReconnect(); } - // 订阅成功了-校验是否存在未处理或者处理中的订阅失败信息 iEmsAlarmRecordsService.checkFailedRecord(topic); } @@ -135,12 +164,52 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, subscriptions.forEach((topic, info) -> { try { mqttClient.subscribe(topic, info.getQos()); + log.info("MQTT resubscribe success, topic: {}, qos: {}", topic, info.getQos()); } catch (MqttException e) { - System.err.println("Resubscribe failed for topic: " + topic); + log.error("Resubscribe failed for topic: {}, err: {}", topic, e.getMessage(), e); } }); } + private void scheduleReconnect() { + if (mqttClient == null || reconnectExecutor.isShutdown()) { + return; + } + if (!reconnectScheduled.compareAndSet(false, true)) { + return; + } + + reconnectFuture = reconnectExecutor.scheduleWithFixedDelay(() -> { + if (mqttClient == null) { + cancelReconnectTask(); + return; + } + if (mqttClient.isConnected()) { + cancelReconnectTask(); + return; + } + try { + log.info("MQTT reconnecting..."); + mqttClient.connect(connectOptions); + running = true; + cancelReconnectTask(); + resubscribeAll(); + log.info("MQTT reconnect success."); + } catch (MqttException e) { + running = false; + log.warn("MQTT reconnect failed: {}", e.getMessage()); + } + }, RECONNECT_DELAY_SECONDS, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS); + } + + private void cancelReconnectTask() { + reconnectScheduled.set(false); + ScheduledFuture future = reconnectFuture; + if (future != null && !future.isCancelled()) { + future.cancel(false); + } + } + // 订阅信息内部类 private static class SubscriptionInfo { private final IMqttMessageListener listener; @@ -159,4 +228,4 @@ public class MqttLifecycleManager implements ApplicationRunner, SmartLifecycle, return qos; } } -} \ No newline at end of file +} diff --git a/ems-generator/src/main/java/com/xzzn/generator/controller/GenController.java b/ems-generator/src/main/java/com/xzzn/generator/controller/GenController.java index a3c8e2d..248b0b6 100644 --- a/ems-generator/src/main/java/com/xzzn/generator/controller/GenController.java +++ b/ems-generator/src/main/java/com/xzzn/generator/controller/GenController.java @@ -1,7 +1,6 @@ package com.xzzn.generator.controller; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -18,10 +17,6 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import com.alibaba.druid.DbType; -import com.alibaba.druid.sql.SQLUtils; -import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement; import com.xzzn.common.annotation.Log; import com.xzzn.common.core.controller.BaseController; import com.xzzn.common.core.domain.AjaxResult; @@ -29,7 +24,6 @@ import com.xzzn.common.core.page.TableDataInfo; import com.xzzn.common.core.text.Convert; import com.xzzn.common.enums.BusinessType; import com.xzzn.common.utils.SecurityUtils; -import com.xzzn.common.utils.sql.SqlUtil; import com.xzzn.generator.config.GenConfig; import com.xzzn.generator.domain.GenTable; import com.xzzn.generator.domain.GenTableColumn; @@ -121,43 +115,6 @@ public class GenController extends BaseController return success(); } - /** - * 创建表结构(保存) - */ - @PreAuthorize("@ss.hasRole('admin')") - @Log(title = "创建表", businessType = BusinessType.OTHER) - @PostMapping("/createTable") - public AjaxResult createTableSave(String sql) - { - try - { - SqlUtil.filterKeyword(sql); - List sqlStatements = SQLUtils.parseStatements(sql, DbType.mysql); - List tableNames = new ArrayList<>(); - for (SQLStatement sqlStatement : sqlStatements) - { - if (sqlStatement instanceof MySqlCreateTableStatement) - { - MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) sqlStatement; - if (genTableService.createTable(createTableStatement.toString())) - { - String tableName = createTableStatement.getTableName().replaceAll("`", ""); - tableNames.add(tableName); - } - } - } - List tableList = genTableService.selectDbTableListByNames(tableNames.toArray(new String[tableNames.size()])); - String operName = SecurityUtils.getUsername(); - genTableService.importGenTable(tableList, operName); - return AjaxResult.success(); - } - catch (Exception e) - { - logger.error(e.getMessage(), e); - return AjaxResult.error("创建表结构异常"); - } - } - /** * 修改保存代码生成业务 */ @@ -260,4 +217,4 @@ public class GenController extends BaseController response.setContentType("application/octet-stream; charset=UTF-8"); IOUtils.write(data, response.getOutputStream()); } -} \ No newline at end of file +} diff --git a/ems-generator/src/main/java/com/xzzn/generator/mapper/GenTableMapper.java b/ems-generator/src/main/java/com/xzzn/generator/mapper/GenTableMapper.java index 0bcd959..c10cd22 100644 --- a/ems-generator/src/main/java/com/xzzn/generator/mapper/GenTableMapper.java +++ b/ems-generator/src/main/java/com/xzzn/generator/mapper/GenTableMapper.java @@ -81,11 +81,4 @@ public interface GenTableMapper */ public int deleteGenTableByIds(Long[] ids); - /** - * 创建表 - * - * @param sql 表结构 - * @return 结果 - */ - public int createTable(String sql); } diff --git a/ems-generator/src/main/java/com/xzzn/generator/service/GenTableServiceImpl.java b/ems-generator/src/main/java/com/xzzn/generator/service/GenTableServiceImpl.java index 7b0ff5a..0e51245 100644 --- a/ems-generator/src/main/java/com/xzzn/generator/service/GenTableServiceImpl.java +++ b/ems-generator/src/main/java/com/xzzn/generator/service/GenTableServiceImpl.java @@ -149,18 +149,6 @@ public class GenTableServiceImpl implements IGenTableService genTableColumnMapper.deleteGenTableColumnByIds(tableIds); } - /** - * 创建表 - * - * @param sql 创建表语句 - * @return 结果 - */ - @Override - public boolean createTable(String sql) - { - return genTableMapper.createTable(sql) == 0; - } - /** * 导入表结构 * @@ -528,4 +516,4 @@ public class GenTableServiceImpl implements IGenTableService } return genPath + File.separator + VelocityUtils.getFileName(template, table); } -} \ No newline at end of file +} diff --git a/ems-generator/src/main/java/com/xzzn/generator/service/IGenTableService.java b/ems-generator/src/main/java/com/xzzn/generator/service/IGenTableService.java index f64f132..6b78595 100644 --- a/ems-generator/src/main/java/com/xzzn/generator/service/IGenTableService.java +++ b/ems-generator/src/main/java/com/xzzn/generator/service/IGenTableService.java @@ -66,14 +66,6 @@ public interface IGenTableService */ public void deleteGenTableByIds(Long[] tableIds); - /** - * 创建表 - * - * @param sql 创建表语句 - * @return 结果 - */ - public boolean createTable(String sql); - /** * 导入表结构 * diff --git a/ems-generator/src/main/resources/mapper/generator/GenTableMapper.xml b/ems-generator/src/main/resources/mapper/generator/GenTableMapper.xml index 2322914..3bf7137 100644 --- a/ems-generator/src/main/resources/mapper/generator/GenTableMapper.xml +++ b/ems-generator/src/main/resources/mapper/generator/GenTableMapper.xml @@ -171,10 +171,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" ) - - ${sql} - - update gen_table @@ -207,4 +203,4 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" - \ No newline at end of file + diff --git a/ems-system/pom.xml b/ems-system/pom.xml index 7f2b1e1..305a0f8 100644 --- a/ems-system/pom.xml +++ b/ems-system/pom.xml @@ -27,6 +27,12 @@ jaxb-runtime + + org.influxdb + influxdb-java + 2.24 + + - \ No newline at end of file + diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/EmsSiteSetting.java b/ems-system/src/main/java/com/xzzn/ems/domain/EmsSiteSetting.java index 9ef28ee..e67d0d2 100644 --- a/ems-system/src/main/java/com/xzzn/ems/domain/EmsSiteSetting.java +++ b/ems-system/src/main/java/com/xzzn/ems/domain/EmsSiteSetting.java @@ -54,6 +54,9 @@ public class EmsSiteSetting extends BaseEntity @Excel(name = "站点id") private String siteId; + /** 授权状态:true-已授权,false-未授权 */ + private Boolean authorized; + public void setId(Long id) { this.id = id; @@ -144,6 +147,16 @@ public class EmsSiteSetting extends BaseEntity return siteId; } + public void setAuthorized(Boolean authorized) + { + this.authorized = authorized; + } + + public Boolean getAuthorized() + { + return authorized; + } + @Override public String toString() { return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) @@ -161,6 +174,7 @@ public class EmsSiteSetting extends BaseEntity .append("createTime", getCreateTime()) .append("updateTime", getUpdateTime()) .append("siteId", getSiteId()) + .append("authorized", getAuthorized()) .toString(); } } diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/vo/PointNameRequest.java b/ems-system/src/main/java/com/xzzn/ems/domain/vo/PointNameRequest.java index 8552c37..9098c7a 100644 --- a/ems-system/src/main/java/com/xzzn/ems/domain/vo/PointNameRequest.java +++ b/ems-system/src/main/java/com/xzzn/ems/domain/vo/PointNameRequest.java @@ -14,6 +14,7 @@ public class PointNameRequest { private String deviceCategory; private String pointName; + private List pointNames; /** 数据分组 1-分钟 2-小时 3-天 */ private int dataUnit; @@ -50,6 +51,14 @@ public class PointNameRequest { this.pointName = pointName; } + public List getPointNames() { + return pointNames; + } + + public void setPointNames(List pointNames) { + this.pointNames = pointNames; + } + public int getDataUnit() { return dataUnit; } diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointEnumMatchMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointEnumMatchMapper.java index f517c59..829b058 100644 --- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointEnumMatchMapper.java +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointEnumMatchMapper.java @@ -67,4 +67,12 @@ public interface EmsPointEnumMatchMapper public List selectList(@Param("siteId") String siteId, @Param("deviceCategory") String deviceCategory, @Param("matchField") String matchField); + + int countBySiteId(@Param("siteId") String siteId); + + int deleteBySiteId(@Param("siteId") String siteId); + + int copyTemplateToSite(@Param("templateSiteId") String templateSiteId, + @Param("targetSiteId") String targetSiteId, + @Param("operName") String operName); } diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointMatchMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointMatchMapper.java index 0a9e0cf..622f584 100644 --- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointMatchMapper.java +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsPointMatchMapper.java @@ -172,4 +172,14 @@ public interface EmsPointMatchMapper int getDevicePointAlarmNum(@Param("siteId") String siteId, @Param("deviceId") String deviceId, @Param("deviceCategory") String deviceCategory); List selectDeviceStatusPoint(@Param("request") DeviceUpdateRequest request); + + int countBySiteId(@Param("siteId") String siteId); + + int deleteBySiteId(@Param("siteId") String siteId); + + int copyTemplateToSite(@Param("templateSiteId") String templateSiteId, + @Param("targetSiteId") String targetSiteId, + @Param("operName") String operName); + + List selectBySiteId(@Param("siteId") String siteId); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IEmsDeviceSettingService.java b/ems-system/src/main/java/com/xzzn/ems/service/IEmsDeviceSettingService.java index 554bdee..c6369bc 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IEmsDeviceSettingService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IEmsDeviceSettingService.java @@ -6,7 +6,12 @@ import com.xzzn.ems.domain.vo.DeviceUpdateRequest; import com.xzzn.ems.domain.vo.DevicesSettingVo; import com.xzzn.ems.domain.vo.PointDataRequest; import com.xzzn.ems.domain.vo.PointQueryResponse; +import com.xzzn.ems.domain.vo.SiteMonitorDataSaveRequest; +import com.xzzn.ems.domain.vo.SiteMonitorProjectDisplayVo; +import com.xzzn.ems.domain.vo.SiteMonitorProjectPointMappingSaveRequest; +import com.xzzn.ems.domain.vo.SiteMonitorProjectPointMappingVo; +import java.util.Date; import java.util.List; import java.util.Map; @@ -36,4 +41,14 @@ public interface IEmsDeviceSettingService public List> getDeviceListBySiteAndCategory(String siteId, String deviceCategory); public boolean updateDeviceStatus(DeviceUpdateRequest request); + + public List getSiteMonitorProjectPointMapping(String siteId); + + public int saveSiteMonitorProjectPointMapping(SiteMonitorProjectPointMappingSaveRequest request, String operName); + + public List getSiteMonitorProjectDisplay(String siteId); + + public int saveSiteMonitorProjectData(SiteMonitorDataSaveRequest request, String operName); + + public int syncSiteMonitorDataByMqtt(String siteId, String deviceId, String jsonData, Date valueTime); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IEmsPointMatchService.java b/ems-system/src/main/java/com/xzzn/ems/service/IEmsPointMatchService.java index 121ec50..394ceb5 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IEmsPointMatchService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IEmsPointMatchService.java @@ -6,6 +6,7 @@ import com.xzzn.ems.domain.EmsPointMatch; import com.xzzn.ems.domain.vo.DevicePointMatchExportVo; import com.xzzn.ems.domain.vo.DevicePointMatchVo; import com.xzzn.ems.domain.vo.ImportPointDataRequest; +import com.xzzn.ems.domain.vo.ImportPointTemplateRequest; /** * 点位匹配Service接口 @@ -40,4 +41,52 @@ public interface IEmsPointMatchService * @throws Exception */ public List importDataByDevice(ImportPointDataRequest request, String operName); + + /** + * 按站点导入模板点位数据 + * @param request 请求参数 + * @param operName 操作人 + * @return 导入结果信息 + */ + public String importTemplateBySite(ImportPointTemplateRequest request, String operName); + + /** + * 查询点位配置列表 + * + * @param emsPointMatch 点位配置 + * @return 点位配置列表 + */ + public List selectPointMatchConfigList(EmsPointMatch emsPointMatch); + + /** + * 查询点位配置详情 + * + * @param id 主键ID + * @return 点位配置 + */ + public EmsPointMatch selectPointMatchById(Long id); + + /** + * 新增点位配置 + * + * @param emsPointMatch 点位配置 + * @return 结果 + */ + public int insertPointMatch(EmsPointMatch emsPointMatch); + + /** + * 修改点位配置 + * + * @param emsPointMatch 点位配置 + * @return 结果 + */ + public int updatePointMatch(EmsPointMatch emsPointMatch); + + /** + * 删除点位配置 + * + * @param ids 主键集合 + * @return 结果 + */ + public int deletePointMatchByIds(Long[] ids); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IEmsSiteService.java b/ems-system/src/main/java/com/xzzn/ems/service/IEmsSiteService.java index 298266f..df32981 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IEmsSiteService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IEmsSiteService.java @@ -32,4 +32,8 @@ public interface IEmsSiteService public List> getAllPcsInfo(String siteId); public List> getParentCategoryDeviceId(String siteId, String deviceCategory); + + int addSite(EmsSiteSetting emsSiteSetting); + + int updateSite(EmsSiteSetting emsSiteSetting); } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/IGeneralQueryService.java b/ems-system/src/main/java/com/xzzn/ems/service/IGeneralQueryService.java index 325993f..c245f3d 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/IGeneralQueryService.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/IGeneralQueryService.java @@ -13,7 +13,7 @@ public interface IGeneralQueryService { // 模糊查询获取点位名称List - public List getPointNameList(PointNameRequest request); + public List getPointNameList(PointNameRequest request); // 根据条件获取点位数据变化 public List getPointValueList(PointNameRequest request); diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java index 6663e82..0f26d46 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/DeviceDataProcessServiceImpl.java @@ -48,12 +48,14 @@ import com.xzzn.ems.mapper.EmsEmsDataMapper; import com.xzzn.ems.mapper.EmsPcsAlarmDataMapper; import com.xzzn.ems.mapper.EmsPcsBranchDataMapper; import com.xzzn.ems.mapper.EmsPcsDataMapper; +import com.xzzn.ems.mapper.EmsPointConfigMapper; import com.xzzn.ems.mapper.EmsStackAlarmDataMapper; import com.xzzn.ems.mapper.EmsXfDataMapper; import com.xzzn.ems.service.IDeviceDataProcessService; import com.xzzn.ems.service.IEmsAlarmRecordsService; import com.xzzn.ems.service.IEmsDeviceSettingService; import com.xzzn.ems.service.IEmsEnergyPriceConfigService; +import com.xzzn.ems.service.InfluxPointDataWriter; import com.xzzn.ems.utils.AbstractBatteryDataProcessor; import com.xzzn.ems.utils.DevicePointMatchDataProcessor; @@ -68,15 +70,25 @@ import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,6 +103,13 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i private static final Pattern PATTERN = Pattern.compile("(BMSD\\d{2})(ZT|SOC|SOH|DL|DY|BDSC)"); // 匹配DTDC+数字格式的正则(提取序号) private static final Pattern DTDC_PATTERN = Pattern.compile("DTDC(\\d+)([A-Za-z]*)"); + private static final Pattern DB_NAME_PATTERN = Pattern.compile("^[A-Za-z0-9_]+$"); + private static final int POINT_QUEUE_CAPACITY = 100000; + private static final int POINT_FLUSH_BATCH_SIZE = 2000; + private static final int POINT_FLUSH_MAX_DRAIN_PER_RUN = 20000; + private static final int POINT_ENQUEUE_RETRY_TIMES = 3; + private static final long POINT_ENQUEUE_RETRY_WAIT_MS = 10; + private static final long POINT_FLUSH_INTERVAL_MS = 100; @Autowired private EmsBatteryClusterMapper emsBatteryClusterMapper; @@ -101,6 +120,8 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i @Autowired private EmsPcsDataMapper emsPcsDataMapper; @Autowired + private EmsPointConfigMapper emsPointConfigMapper; + @Autowired private EmsPcsBranchDataMapper emsPcsBranchDataMapper; @Autowired private EmsAmmeterDataMapper emsAmmeterDataMapper; @@ -145,17 +166,41 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i @Autowired private RedisCache redisCache; + @Autowired + private InfluxPointDataWriter influxPointDataWriter; + private final BlockingQueue pointDataQueue = new LinkedBlockingQueue<>(POINT_QUEUE_CAPACITY); + private final ScheduledExecutorService pointDataWriter = Executors.newSingleThreadScheduledExecutor(r -> { + Thread thread = new Thread(r); + thread.setName("point-data-writer"); + thread.setDaemon(true); + return thread; + }); + private final AtomicBoolean pointDataFlushing = new AtomicBoolean(false); // 构造方法(调用父类构造) public DeviceDataProcessServiceImpl(ObjectMapper objectMapper) { super(objectMapper); } + @PostConstruct + public void initPointDataBatchWriter() { + pointDataWriter.scheduleWithFixedDelay(this::flushPointDataSafely, + POINT_FLUSH_INTERVAL_MS, POINT_FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + + @PreDestroy + public void destroyPointDataBatchWriter() { + flushPointDataSafely(); + pointDataWriter.shutdown(); + } + @Override public void handleDeviceData(String message, String siteId) { JSONArray arraylist = parseJsonData(message); if (arraylist == null) return; - // 过滤掉空数据(空数据不处理,直接返回 + long startMs = System.currentTimeMillis(); + log.info("开始处理设备数据, siteId: {}, messageSize: {}", siteId, arraylist.size()); + Set deviceIds = new LinkedHashSet<>(); for (int i = 0; i < arraylist.size(); i++) { JSONObject obj = JSONObject.parseObject(arraylist.get(i).toString()); @@ -165,21 +210,210 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i Long timestamp = obj.getLong("timestamp"); Date dataUpdateTime = DateUtils.convertUpdateTime(timestamp); - log.info("deviceId:" + deviceId); - boolean isEmpty = checkJsonDataEmpty(jsonData); - if (isEmpty) { - // 添加设备告警 - iEmsAlarmRecordsService.addEmptyDataAlarmRecord(siteId, deviceId); - return; + if (StringUtils.isNotBlank(deviceId)) { + deviceIds.add(deviceId); + } + if (checkJsonDataEmpty(jsonData)) { + continue; } - // 存放mqtt原始每个设备最晚一次数据,便于后面点位获取数据 - redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA + siteId + "_" + deviceId, obj); - // 存放每次同步数据,失效时间(同同步时间)-用于判断是否正常同步数据 - redisCache.setCacheObject(RedisKeyConstants.SYNC_DATA + siteId + "_" + deviceId, obj, 1, TimeUnit.MINUTES); + try { + // 保留每个设备最新一条原始报文,供“点位最新值”从 Redis 读取 + redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA + siteId + "_" + deviceId, obj); + redisCache.setCacheObject(RedisKeyConstants.SYNC_DATA + siteId + "_" + deviceId, obj, 1, TimeUnit.MINUTES); - // 处理设备数据 - processingDeviceData(siteId, deviceId, jsonData, dataUpdateTime); + processPointConfigData(siteId, deviceId, jsonData, dataUpdateTime); + iEmsDeviceSettingService.syncSiteMonitorDataByMqtt(siteId, deviceId, jsonData, dataUpdateTime); + } catch (Exception e) { + log.warn("点位映射数据处理失败,siteId: {}, deviceId: {}, err: {}", + siteId, deviceId, e.getMessage(), e); + } + } + log.info("结束处理设备数据, siteId: {}, deviceCount: {}, deviceIds: {}, costMs: {}", + siteId, deviceIds.size(), String.join(",", deviceIds), System.currentTimeMillis() - startMs); + } + + private void processPointConfigData(String siteId, String deviceId, String jsonData, Date dataUpdateTime) { + if (StringUtils.isAnyBlank(siteId, deviceId, jsonData)) { + return; + } + if (!DB_NAME_PATTERN.matcher(siteId).matches()) { + log.warn("站点ID不合法,跳过点位映射落库,siteId: {}", siteId); + return; + } + + Map dataMap = JSON.parseObject(jsonData, new TypeReference>() { + }); + if (org.apache.commons.collections4.MapUtils.isEmpty(dataMap)) { + return; + } + + List pointConfigs = getPointConfigsWithCache(siteId, deviceId); + if (CollectionUtils.isEmpty(pointConfigs)) { + return; + } + + Map configByDataKey = pointConfigs.stream() + .filter(cfg -> StringUtils.isNotBlank(cfg.getDataKey())) + .collect(Collectors.toMap( + cfg -> cfg.getDataKey().toUpperCase(), + cfg -> cfg, + (oldValue, newValue) -> oldValue + )); + + for (Map.Entry entry : dataMap.entrySet()) { + String dataKey = entry.getKey(); + if (StringUtils.isBlank(dataKey)) { + continue; + } + EmsPointConfig pointConfig = configByDataKey.get(dataKey.toUpperCase()); + if (pointConfig == null) { + continue; + } + + BigDecimal pointValue = StringUtils.getBigDecimal(entry.getValue()); + if (pointValue == null) { + continue; + } + + // 支持按配置进行二次转换:f(x)=A*x^2 + K*x + B + pointValue = convertPointValue(pointValue, pointConfig); + + enqueuePointData(siteId, deviceId, pointConfig.getDataKey(), pointValue, dataUpdateTime); + } + } + + private void enqueuePointData(String siteId, String deviceId, String pointKey, BigDecimal pointValue, Date dataTime) { + if (StringUtils.isAnyBlank(siteId, deviceId, pointKey) || pointValue == null) { + return; + } + PointDataRecord record = new PointDataRecord(siteId, deviceId, pointKey, pointValue, dataTime); + if (pointDataQueue.offer(record)) { + if (pointDataQueue.remainingCapacity() <= POINT_FLUSH_BATCH_SIZE) { + flushPointDataSafely(); + } + return; + } + + // 队列压力较高时,优先触发冲刷并短暂重试,避免 MQTT 处理线程直接阻塞在数据库写入上。 + for (int i = 0; i < POINT_ENQUEUE_RETRY_TIMES; i++) { + flushPointDataSafely(); + try { + if (pointDataQueue.offer(record, POINT_ENQUEUE_RETRY_WAIT_MS, TimeUnit.MILLISECONDS)) { + return; + } + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + break; + } + } + + log.warn("点位写入队列持续拥塞,降级同步写入 InfluxDB,siteId: {}, queueSize: {}", + siteId, pointDataQueue.size()); + writePointDataToInflux(Collections.singletonList(record)); + } + + private BigDecimal convertPointValue(BigDecimal sourceValue, EmsPointConfig pointConfig) { + if (sourceValue == null || pointConfig == null) { + return sourceValue; + } + BigDecimal a = pointConfig.getDataA() == null ? BigDecimal.ZERO : pointConfig.getDataA(); + BigDecimal k = pointConfig.getDataK() == null ? BigDecimal.ONE : pointConfig.getDataK(); + BigDecimal b = pointConfig.getDataB() == null ? BigDecimal.ZERO : pointConfig.getDataB(); + return a.multiply(sourceValue).multiply(sourceValue) + .add(k.multiply(sourceValue)) + .add(b); + } + + private void flushPointDataSafely() { + if (!pointDataFlushing.compareAndSet(false, true)) { + return; + } + try { + int drainedCount = 0; + while (drainedCount < POINT_FLUSH_MAX_DRAIN_PER_RUN) { + List batch = new ArrayList<>(POINT_FLUSH_BATCH_SIZE); + pointDataQueue.drainTo(batch, POINT_FLUSH_BATCH_SIZE); + if (batch.isEmpty()) { + break; + } + drainedCount += batch.size(); + writePointDataToInflux(batch); + } + + if (pointDataQueue.remainingCapacity() <= POINT_FLUSH_BATCH_SIZE) { + log.warn("点位写入队列积压较高,queueSize: {}", pointDataQueue.size()); + } + } catch (Exception e) { + log.error("批量写入点位数据失败: {}", e.getMessage(), e); + } finally { + pointDataFlushing.set(false); + } + } + + private void writePointDataToInflux(List records) { + if (CollectionUtils.isEmpty(records)) { + return; + } + List payloads = records.stream() + .map(item -> new InfluxPointDataWriter.PointWritePayload( + item.getSiteId(), + item.getDeviceId(), + item.getPointKey(), + item.getPointValue(), + item.getDataTime())) + .collect(Collectors.toList()); + influxPointDataWriter.writeBatch(payloads); + } + + private List getPointConfigsWithCache(String siteId, String deviceId) { + String cacheKey = RedisKeyConstants.POINT_CONFIG_DEVICE + siteId + "_" + deviceId; + List cached = redisCache.getCacheObject(cacheKey); + if (cached != null) { + return cached; + } + EmsPointConfig query = new EmsPointConfig(); + query.setSiteId(siteId); + query.setDeviceId(deviceId); + List latest = emsPointConfigMapper.selectEmsPointConfigList(query); + List cacheValue = latest == null ? new ArrayList<>() : latest; + redisCache.setCacheObject(cacheKey, cacheValue); + return cacheValue; + } + + private static class PointDataRecord { + private final String siteId; + private final String deviceId; + private final String pointKey; + private final BigDecimal pointValue; + private final Date dataTime; + + private PointDataRecord(String siteId, String deviceId, String pointKey, BigDecimal pointValue, Date dataTime) { + this.siteId = siteId; + this.deviceId = deviceId; + this.pointKey = pointKey; + this.pointValue = pointValue; + this.dataTime = dataTime; + } + + private String getSiteId() { + return siteId; + } + + private String getDeviceId() { + return deviceId; + } + + private String getPointKey() { + return pointKey; + } + + private BigDecimal getPointValue() { + return pointValue; + } + + private Date getDataTime() { + return dataTime; } } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsDeviceSettingServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsDeviceSettingServiceImpl.java index b9f1ab0..f950484 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsDeviceSettingServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsDeviceSettingServiceImpl.java @@ -17,15 +17,26 @@ import com.xzzn.common.exception.ServiceException; import com.xzzn.common.utils.DateUtils; import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.EmsDevicesSetting; +import com.xzzn.ems.domain.EmsPointMatch; import com.xzzn.ems.domain.EmsPcsSetting; +import com.xzzn.ems.domain.EmsSiteMonitorItem; +import com.xzzn.ems.domain.EmsSiteMonitorPointMatch; import com.xzzn.ems.domain.vo.DeviceUpdateRequest; import com.xzzn.ems.domain.vo.DevicesSettingVo; import com.xzzn.ems.domain.vo.PointDataRequest; import com.xzzn.ems.domain.vo.PointQueryResponse; +import com.xzzn.ems.domain.vo.SiteMonitorDataSaveItemVo; +import com.xzzn.ems.domain.vo.SiteMonitorDataSaveRequest; +import com.xzzn.ems.domain.vo.SiteMonitorProjectDisplayVo; +import com.xzzn.ems.domain.vo.SiteMonitorProjectPointMappingSaveRequest; +import com.xzzn.ems.domain.vo.SiteMonitorProjectPointMappingVo; import com.xzzn.ems.mapper.EmsBatteryDataMinutesMapper; import com.xzzn.ems.mapper.EmsDevicesSettingMapper; import com.xzzn.ems.mapper.EmsPcsSettingMapper; import com.xzzn.ems.mapper.EmsPointMatchMapper; +import com.xzzn.ems.mapper.EmsSiteMonitorDataMapper; +import com.xzzn.ems.mapper.EmsSiteMonitorItemMapper; +import com.xzzn.ems.mapper.EmsSiteMonitorPointMatchMapper; import com.xzzn.ems.service.IEmsDeviceSettingService; import java.math.BigDecimal; @@ -37,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -56,6 +68,12 @@ public class EmsDeviceSettingServiceImpl implements IEmsDeviceSettingService private static final Logger log = LoggerFactory.getLogger(EmsDeviceSettingServiceImpl.class); private static final String DDS_SITE_ID = "021_DDS_01"; + private static final String MODULE_HOME = "HOME"; + private static final String MODULE_SBJK = "SBJK"; + private static final String MODULE_TJBB = "TJBB"; + private static final String HISTORY_TABLE_HOME = "ems_site_monitor_data_home_his"; + private static final String HISTORY_TABLE_SBJK = "ems_site_monitor_data_sbjk_his"; + private static final String HISTORY_TABLE_TJBB = "ems_site_monitor_data_tjbb_his"; @Autowired private EmsDevicesSettingMapper emsDevicesMapper; @Autowired @@ -70,6 +88,12 @@ public class EmsDeviceSettingServiceImpl implements IEmsDeviceSettingService private EmsBatteryClusterServiceImpl emsBatteryClusterServiceImpl; @Autowired private ModbusProcessor modbusProcessor; + @Autowired + private EmsSiteMonitorItemMapper emsSiteMonitorItemMapper; + @Autowired + private EmsSiteMonitorPointMatchMapper emsSiteMonitorPointMatchMapper; + @Autowired + private EmsSiteMonitorDataMapper emsSiteMonitorDataMapper; /** * 获取设备详细信息 @@ -376,6 +400,439 @@ public class EmsDeviceSettingServiceImpl implements IEmsDeviceSettingService return emsDevicesMapper.getAllDeviceCategoryBySiteId(siteId); } + @Override + public List getSiteMonitorProjectPointMapping(String siteId) { + List result = new ArrayList<>(); + if (StringUtils.isBlank(siteId)) { + return result; + } + List itemList = emsSiteMonitorItemMapper.selectEnabledList(); + if (itemList == null || itemList.isEmpty()) { + return result; + } + List mappingList = emsSiteMonitorPointMatchMapper.selectBySiteId(siteId); + Map pointMap = mappingList.stream() + .filter(item -> StringUtils.isNotBlank(item.getFieldCode())) + .collect(Collectors.toMap(EmsSiteMonitorPointMatch::getFieldCode, EmsSiteMonitorPointMatch::getDataPoint, (a, b) -> b)); + + itemList.forEach(item -> { + SiteMonitorProjectPointMappingVo vo = new SiteMonitorProjectPointMappingVo(); + vo.setModuleCode(item.getModuleCode()); + vo.setModuleName(item.getModuleName()); + vo.setMenuCode(item.getMenuCode()); + vo.setMenuName(item.getMenuName()); + vo.setSectionName(item.getSectionName()); + vo.setFieldCode(item.getFieldCode()); + vo.setFieldName(item.getFieldName()); + vo.setDataPoint(pointMap.getOrDefault(item.getFieldCode(), "")); + result.add(vo); + }); + return result; + } + + @Override + public int saveSiteMonitorProjectPointMapping(SiteMonitorProjectPointMappingSaveRequest request, String operName) { + if (request == null || StringUtils.isBlank(request.getSiteId())) { + throw new ServiceException("站点ID不能为空"); + } + String siteId = request.getSiteId(); + List itemList = emsSiteMonitorItemMapper.selectEnabledList(); + if (itemList == null || itemList.isEmpty()) { + throw new ServiceException("单站监控配置项为空,请先初始化 ems_site_monitor_item"); + } + Set fieldCodeSet = itemList.stream().map(EmsSiteMonitorItem::getFieldCode).collect(Collectors.toSet()); + emsSiteMonitorPointMatchMapper.deleteBySiteId(siteId); + + if (request.getMappings() == null || request.getMappings().isEmpty()) { + return 0; + } + List saveList = new ArrayList<>(); + for (SiteMonitorProjectPointMappingVo mapping : request.getMappings()) { + if (mapping == null || StringUtils.isBlank(mapping.getFieldCode()) || StringUtils.isBlank(mapping.getDataPoint())) { + continue; + } + String fieldCode = mapping.getFieldCode().trim(); + if (!fieldCodeSet.contains(fieldCode)) { + continue; + } + EmsSiteMonitorPointMatch pointMatch = new EmsSiteMonitorPointMatch(); + pointMatch.setSiteId(siteId); + pointMatch.setFieldCode(fieldCode); + pointMatch.setDataPoint(mapping.getDataPoint().trim()); + pointMatch.setCreateBy(operName); + pointMatch.setUpdateBy(operName); + saveList.add(pointMatch); + } + if (saveList.isEmpty()) { + return 0; + } + return emsSiteMonitorPointMatchMapper.insertBatch(saveList); + } + + @Override + public List getSiteMonitorProjectDisplay(String siteId) { + List mappingList = getSiteMonitorProjectPointMapping(siteId); + if (mappingList.isEmpty()) { + return new ArrayList<>(); + } + Map homeLatestMap = safeRedisMap(redisCache.getCacheMap(buildSiteMonitorLatestRedisKey(siteId, MODULE_HOME))); + Map sbjkLatestMap = safeRedisMap(redisCache.getCacheMap(buildSiteMonitorLatestRedisKey(siteId, MODULE_SBJK))); + Map tjbbLatestMap = safeRedisMap(redisCache.getCacheMap(buildSiteMonitorLatestRedisKey(siteId, MODULE_TJBB))); + + List result = new ArrayList<>(); + for (SiteMonitorProjectPointMappingVo mapping : mappingList) { + SiteMonitorProjectDisplayVo vo = new SiteMonitorProjectDisplayVo(); + BeanUtils.copyProperties(mapping, vo); + Object cacheObj = null; + if (MODULE_HOME.equals(mapping.getModuleCode())) { + cacheObj = homeLatestMap.get(mapping.getFieldCode()); + } else if (MODULE_SBJK.equals(mapping.getModuleCode())) { + cacheObj = sbjkLatestMap.get(mapping.getFieldCode()); + } else if (MODULE_TJBB.equals(mapping.getModuleCode())) { + cacheObj = tjbbLatestMap.get(mapping.getFieldCode()); + } + if (cacheObj != null) { + JSONObject snapshot = parseFieldSnapshot(cacheObj); + vo.setFieldValue(snapshot.getString("fieldValue")); + vo.setValueTime(parseValueTime(snapshot.get("valueTime"))); + } + result.add(vo); + } + return result; + } + + @Override + public int saveSiteMonitorProjectData(SiteMonitorDataSaveRequest request, String operName) { + if (request == null || StringUtils.isBlank(request.getSiteId())) { + throw new ServiceException("站点ID不能为空"); + } + if (request.getItems() == null || request.getItems().isEmpty()) { + return 0; + } + List itemList = emsSiteMonitorItemMapper.selectEnabledList(); + if (itemList == null || itemList.isEmpty()) { + throw new ServiceException("单站监控配置项为空,请先初始化 ems_site_monitor_item"); + } + Set fieldCodeSet = itemList.stream().map(EmsSiteMonitorItem::getFieldCode).collect(Collectors.toSet()); + + Map itemMap = itemList.stream() + .collect(Collectors.toMap(EmsSiteMonitorItem::getFieldCode, item -> item, (a, b) -> a)); + + Date now = DateUtils.getNowDate(); + Map homeLatestUpdates = new HashMap<>(); + Map sbjkLatestUpdates = new HashMap<>(); + Map tjbbLatestUpdates = new HashMap<>(); + Map homeHistoryByMinute = new HashMap<>(); + Map sbjkHistoryByMinute = new HashMap<>(); + Map tjbbHistoryByMinute = new HashMap<>(); + for (SiteMonitorDataSaveItemVo item : request.getItems()) { + if (item == null || StringUtils.isBlank(item.getFieldCode())) { + continue; + } + String fieldCode = item.getFieldCode().trim(); + if (!fieldCodeSet.contains(fieldCode)) { + continue; + } + EmsSiteMonitorItem itemDef = itemMap.get(fieldCode); + if (itemDef == null || StringUtils.isBlank(itemDef.getModuleCode())) { + continue; + } + Date valueTime = item.getValueTime() == null ? now : item.getValueTime(); + JSONObject snapshot = buildFieldSnapshot(item.getFieldValue(), valueTime); + Date statisMinute = truncateToMinute(valueTime); + if (MODULE_HOME.equals(itemDef.getModuleCode())) { + homeLatestUpdates.put(fieldCode, snapshot); + mergeHistorySnapshot(homeHistoryByMinute, statisMinute, fieldCode, snapshot); + } else if (MODULE_SBJK.equals(itemDef.getModuleCode())) { + sbjkLatestUpdates.put(fieldCode, snapshot); + mergeHistorySnapshot(sbjkHistoryByMinute, statisMinute, fieldCode, snapshot); + } else if (MODULE_TJBB.equals(itemDef.getModuleCode())) { + tjbbLatestUpdates.put(fieldCode, snapshot); + mergeHistorySnapshot(tjbbHistoryByMinute, statisMinute, fieldCode, snapshot); + } + } + if (homeLatestUpdates.isEmpty() && sbjkLatestUpdates.isEmpty() && tjbbLatestUpdates.isEmpty()) { + return 0; + } + int rows = 0; + if (!homeLatestUpdates.isEmpty()) { + upsertLatestToRedis(request.getSiteId(), MODULE_HOME, homeLatestUpdates); + rows += upsertHistoryByMinute(HISTORY_TABLE_HOME, request.getSiteId(), homeHistoryByMinute, operName); + } + if (!sbjkLatestUpdates.isEmpty()) { + upsertLatestToRedis(request.getSiteId(), MODULE_SBJK, sbjkLatestUpdates); + rows += upsertHistoryByMinute(HISTORY_TABLE_SBJK, request.getSiteId(), sbjkHistoryByMinute, operName); + } + if (!tjbbLatestUpdates.isEmpty()) { + upsertLatestToRedis(request.getSiteId(), MODULE_TJBB, tjbbLatestUpdates); + rows += upsertHistoryByMinute(HISTORY_TABLE_TJBB, request.getSiteId(), tjbbHistoryByMinute, operName); + } + return rows; + } + + @Override + public int syncSiteMonitorDataByMqtt(String siteId, String deviceId, String jsonData, Date valueTime) { + if (StringUtils.isAnyBlank(siteId, jsonData)) { + return 0; + } + Map dataMap = JSON.parseObject(jsonData, new TypeReference>() {}); + if (dataMap == null || dataMap.isEmpty()) { + return 0; + } + Map upperDataMap = new HashMap<>(); + dataMap.forEach((k, v) -> { + if (StringUtils.isNotBlank(k)) { + upperDataMap.put(k.toUpperCase(), v); + } + }); + + List mappingList = emsSiteMonitorPointMatchMapper.selectBySiteId(siteId); + if (mappingList == null || mappingList.isEmpty()) { + return 0; + } + List itemList = emsSiteMonitorItemMapper.selectEnabledList(); + if (itemList == null || itemList.isEmpty()) { + return 0; + } + Map itemMap = itemList.stream() + .filter(item -> StringUtils.isNotBlank(item.getFieldCode())) + .collect(Collectors.toMap(EmsSiteMonitorItem::getFieldCode, item -> item, (a, b) -> a)); + + Date actualValueTime = valueTime == null ? DateUtils.getNowDate() : valueTime; + Date statisMinute = truncateToMinute(actualValueTime); + Map homeLatestUpdates = new HashMap<>(); + Map sbjkLatestUpdates = new HashMap<>(); + Map tjbbLatestUpdates = new HashMap<>(); + Map homeHistoryByMinute = new HashMap<>(); + Map sbjkHistoryByMinute = new HashMap<>(); + Map tjbbHistoryByMinute = new HashMap<>(); + + for (EmsSiteMonitorPointMatch mapping : mappingList) { + if (mapping == null || StringUtils.isAnyBlank(mapping.getFieldCode(), mapping.getDataPoint())) { + continue; + } + EmsSiteMonitorItem itemDef = itemMap.get(mapping.getFieldCode()); + if (itemDef == null || StringUtils.isBlank(itemDef.getModuleCode())) { + continue; + } + + String point = mapping.getDataPoint().trim(); + Object value = upperDataMap.get(point.toUpperCase()); + if (value == null && StringUtils.isNotBlank(deviceId)) { + value = upperDataMap.get((deviceId + point).toUpperCase()); + } + if (value == null) { + continue; + } + + JSONObject snapshot = buildFieldSnapshot(String.valueOf(value), actualValueTime); + + if (MODULE_HOME.equals(itemDef.getModuleCode())) { + homeLatestUpdates.put(mapping.getFieldCode(), snapshot); + mergeHistorySnapshot(homeHistoryByMinute, statisMinute, mapping.getFieldCode(), snapshot); + } else if (MODULE_SBJK.equals(itemDef.getModuleCode())) { + sbjkLatestUpdates.put(mapping.getFieldCode(), snapshot); + mergeHistorySnapshot(sbjkHistoryByMinute, statisMinute, mapping.getFieldCode(), snapshot); + } else if (MODULE_TJBB.equals(itemDef.getModuleCode())) { + tjbbLatestUpdates.put(mapping.getFieldCode(), snapshot); + mergeHistorySnapshot(tjbbHistoryByMinute, statisMinute, mapping.getFieldCode(), snapshot); + } + } + + int rows = 0; + if (!homeLatestUpdates.isEmpty()) { + upsertLatestToRedis(siteId, MODULE_HOME, homeLatestUpdates); + rows += upsertHistoryByMinute(HISTORY_TABLE_HOME, siteId, homeHistoryByMinute, "mqtt"); + } + if (!sbjkLatestUpdates.isEmpty()) { + upsertLatestToRedis(siteId, MODULE_SBJK, sbjkLatestUpdates); + rows += upsertHistoryByMinute(HISTORY_TABLE_SBJK, siteId, sbjkHistoryByMinute, "mqtt"); + } + if (!tjbbLatestUpdates.isEmpty()) { + upsertLatestToRedis(siteId, MODULE_TJBB, tjbbLatestUpdates); + rows += upsertHistoryByMinute(HISTORY_TABLE_TJBB, siteId, tjbbHistoryByMinute, "mqtt"); + } + return rows; + } + + private int upsertHistoryByMinute(String tableName, String siteId, Map minuteSnapshotMap, String operName) { + int rows = 0; + for (Map.Entry entry : minuteSnapshotMap.entrySet()) { + Date statisMinute = entry.getKey(); + JSONObject merged = mergeHistoryRecord(tableName, siteId, statisMinute, entry.getValue()); + rows += emsSiteMonitorDataMapper.upsertHistoryJsonByMinute( + tableName, + siteId, + statisMinute, + merged.toJSONString(), + operName + ); + Map hotColumns = extractHotColumns(merged); + rows += emsSiteMonitorDataMapper.updateHistoryHotColumns( + tableName, + siteId, + statisMinute, + hotColumns.get("hotSoc"), + hotColumns.get("hotTotalActivePower"), + hotColumns.get("hotTotalReactivePower"), + hotColumns.get("hotDayChargedCap"), + hotColumns.get("hotDayDisChargedCap"), + operName + ); + } + return rows; + } + + private JSONObject mergeHistoryRecord(String tableName, String siteId, Date statisMinute, JSONObject updates) { + String existingJson = emsSiteMonitorDataMapper.selectHistoryJsonByMinute(tableName, siteId, statisMinute); + JSONObject merged = StringUtils.isBlank(existingJson) ? new JSONObject() : JSON.parseObject(existingJson); + if (merged == null) { + merged = new JSONObject(); + } + if (updates != null) { + merged.putAll(updates); + } + return merged; + } + + private void upsertLatestToRedis(String siteId, String moduleCode, Map latestUpdates) { + if (latestUpdates == null || latestUpdates.isEmpty()) { + return; + } + String redisKey = buildSiteMonitorLatestRedisKey(siteId, moduleCode); + Map redisMap = new HashMap<>(); + latestUpdates.forEach((fieldCode, snapshot) -> redisMap.put(fieldCode, snapshot.toJSONString())); + redisCache.setAllCacheMapValue(redisKey, redisMap); + redisCache.expire(redisKey, 7, TimeUnit.DAYS); + } + + private void mergeHistorySnapshot(Map historyByMinute, Date statisMinute, String fieldCode, JSONObject snapshot) { + JSONObject minuteObj = historyByMinute.computeIfAbsent(statisMinute, k -> new JSONObject()); + minuteObj.put(fieldCode, snapshot); + } + + private Map safeRedisMap(Map source) { + return source == null ? new HashMap<>() : source; + } + + private Map extractHotColumns(JSONObject merged) { + Map result = new HashMap<>(); + result.put("hotSoc", extractFieldValue(merged, + new String[]{"SBJK_SSYX__soc", "TJBB_DCDQX__soc_stat", "HOME__avgSoc"}, + new String[]{"__soc", "__soc_stat", "__avgSoc"})); + result.put("hotTotalActivePower", extractFieldValue(merged, + new String[]{"SBJK_SSYX__totalActivePower"}, + new String[]{"__totalActivePower", "__activePower", "__activePower_stat"})); + result.put("hotTotalReactivePower", extractFieldValue(merged, + new String[]{"SBJK_SSYX__totalReactivePower"}, + new String[]{"__totalReactivePower", "__reactivePower", "__reactivePower_stat"})); + result.put("hotDayChargedCap", extractFieldValue(merged, + new String[]{"HOME__dayChargedCap", "SBJK_SSYX__dayChargedCap_rt", "TJBB_GLTJ__chargedCap_stat"}, + new String[]{"__dayChargedCap", "__dayChargedCap_rt", "__chargedCap_stat"})); + result.put("hotDayDisChargedCap", extractFieldValue(merged, + new String[]{"HOME__dayDisChargedCap", "SBJK_SSYX__dayDisChargedCap_rt", "TJBB_GLTJ__disChargedCap_stat"}, + new String[]{"__dayDisChargedCap", "__dayDisChargedCap_rt", "__disChargedCap_stat"})); + return result; + } + + private String extractFieldValue(JSONObject merged, String[] exactKeys, String[] suffixKeys) { + if (merged == null || merged.isEmpty()) { + return null; + } + if (exactKeys != null) { + for (String key : exactKeys) { + String value = getSnapshotFieldValue(merged.get(key)); + if (StringUtils.isNotBlank(value)) { + return value; + } + } + } + if (suffixKeys != null) { + for (Map.Entry entry : merged.entrySet()) { + String fieldCode = entry.getKey(); + if (StringUtils.isBlank(fieldCode)) { + continue; + } + for (String suffix : suffixKeys) { + if (StringUtils.isNotBlank(suffix) && fieldCode.endsWith(suffix)) { + String value = getSnapshotFieldValue(entry.getValue()); + if (StringUtils.isNotBlank(value)) { + return value; + } + } + } + } + } + return null; + } + + private String getSnapshotFieldValue(Object snapshotObj) { + if (snapshotObj == null) { + return null; + } + if (snapshotObj instanceof JSONObject) { + return ((JSONObject) snapshotObj).getString("fieldValue"); + } + if (snapshotObj instanceof Map) { + Object val = ((Map) snapshotObj).get("fieldValue"); + return val == null ? null : String.valueOf(val); + } + return String.valueOf(snapshotObj); + } + + private JSONObject buildFieldSnapshot(String fieldValue, Date valueTime) { + JSONObject snapshot = new JSONObject(); + snapshot.put("fieldValue", fieldValue); + snapshot.put("valueTime", valueTime == null ? null : valueTime.getTime()); + return snapshot; + } + + private JSONObject parseFieldSnapshot(Object cacheObj) { + if (cacheObj == null) { + return new JSONObject(); + } + if (cacheObj instanceof JSONObject) { + return (JSONObject) cacheObj; + } + if (cacheObj instanceof Map) { + return new JSONObject((Map) cacheObj); + } + try { + JSONObject snapshot = JSON.parseObject(String.valueOf(cacheObj)); + return snapshot == null ? new JSONObject() : snapshot; + } catch (Exception e) { + return new JSONObject(); + } + } + + private Date parseValueTime(Object valueTimeObj) { + if (valueTimeObj == null) { + return null; + } + if (valueTimeObj instanceof Date) { + return (Date) valueTimeObj; + } + if (valueTimeObj instanceof Number) { + return new Date(((Number) valueTimeObj).longValue()); + } + try { + return DateUtils.parseDate(valueTimeObj.toString()); + } catch (Exception e) { + return null; + } + } + + private Date truncateToMinute(Date date) { + Date source = date == null ? DateUtils.getNowDate() : date; + long millis = source.getTime(); + long minuteMillis = 60_000L; + return new Date((millis / minuteMillis) * minuteMillis); + } + + private String buildSiteMonitorLatestRedisKey(String siteId, String moduleCode) { + return RedisKeyConstants.SITE_MONITOR_LATEST + siteId + "_" + moduleCode; + } + // 辅助方法:根据值查找对应的对象(用于比较器中获取完整对象) private PointQueryResponse findByValue(List list, Object value) { return list.stream() diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsPointMatchServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsPointMatchServiceImpl.java index 22231a3..6e712a3 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsPointMatchServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsPointMatchServiceImpl.java @@ -12,6 +12,7 @@ import com.xzzn.ems.domain.EmsPointMatch; import com.xzzn.ems.domain.vo.DevicePointMatchExportVo; import com.xzzn.ems.domain.vo.DevicePointMatchVo; import com.xzzn.ems.domain.vo.ImportPointDataRequest; +import com.xzzn.ems.domain.vo.ImportPointTemplateRequest; import com.xzzn.ems.enums.DeviceMatchTable; import com.xzzn.ems.mapper.EmsPointEnumMatchMapper; import com.xzzn.ems.mapper.EmsPointMatchMapper; @@ -20,8 +21,10 @@ import com.xzzn.ems.utils.DevicePointMatchDataProcessor; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import javax.validation.Validator; @@ -213,6 +216,99 @@ public class EmsPointMatchServiceImpl implements IEmsPointMatchService { return errorList; } + /** + * 按站点导入模板点位 + * + * @param request + * @param operName + * @return + */ + @Override + @Transactional(rollbackFor = Exception.class) + public String importTemplateBySite(ImportPointTemplateRequest request, String operName) { + String targetSiteId = request.getSiteId(); + if (SITE_ID.equals(targetSiteId)) { + throw new ServiceException("模板站点不支持作为导入目标站点"); + } + + int templatePointCount = emsPointMatchMapper.countBySiteId(SITE_ID); + if (templatePointCount <= 0) { + throw new ServiceException("模板点位数据不存在,无法导入"); + } + + boolean overwrite = Boolean.TRUE.equals(request.getOverwrite()); + int targetPointCount = emsPointMatchMapper.countBySiteId(targetSiteId); + if (targetPointCount > 0 && !overwrite) { + throw new ServiceException("目标站点已存在点位数据,请勾选“覆盖已存在数据”后重试"); + } + + if (targetPointCount > 0) { + emsPointMatchMapper.deleteBySiteId(targetSiteId); + } + + int targetEnumCount = emsPointEnumMatchMapper.countBySiteId(targetSiteId); + if (targetEnumCount > 0) { + emsPointEnumMatchMapper.deleteBySiteId(targetSiteId); + } + + int importPointCount = emsPointMatchMapper.copyTemplateToSite(SITE_ID, targetSiteId, operName); + int importEnumCount = 0; + if (emsPointEnumMatchMapper.countBySiteId(SITE_ID) > 0) { + importEnumCount = emsPointEnumMatchMapper.copyTemplateToSite(SITE_ID, targetSiteId, operName); + } + + syncSiteToRedis(targetSiteId); + return String.format("导入成功:站点 %s,点位 %d 条,枚举 %d 条", targetSiteId, importPointCount, importEnumCount); + } + + @Override + public List selectPointMatchConfigList(EmsPointMatch emsPointMatch) { + return emsPointMatchMapper.selectEmsPointMatchList(emsPointMatch); + } + + @Override + public EmsPointMatch selectPointMatchById(Long id) { + return emsPointMatchMapper.selectEmsPointMatchById(id); + } + + @Override + public int insertPointMatch(EmsPointMatch emsPointMatch) { + int rows = emsPointMatchMapper.insertEmsPointMatch(emsPointMatch); + if (rows > 0 && StringUtils.isNotBlank(emsPointMatch.getSiteId()) && StringUtils.isNotBlank(emsPointMatch.getDeviceCategory())) { + syncToRedis(emsPointMatch.getSiteId(), emsPointMatch.getDeviceId(), emsPointMatch.getDeviceCategory()); + } + return rows; + } + + @Override + public int updatePointMatch(EmsPointMatch emsPointMatch) { + int rows = emsPointMatchMapper.updateEmsPointMatch(emsPointMatch); + if (rows > 0 && StringUtils.isNotBlank(emsPointMatch.getSiteId()) && StringUtils.isNotBlank(emsPointMatch.getDeviceCategory())) { + syncToRedis(emsPointMatch.getSiteId(), emsPointMatch.getDeviceId(), emsPointMatch.getDeviceCategory()); + } + return rows; + } + + @Override + public int deletePointMatchByIds(Long[] ids) { + List deletedList = new ArrayList<>(); + for (Long id : ids) { + EmsPointMatch pointMatch = emsPointMatchMapper.selectEmsPointMatchById(id); + if (pointMatch != null) { + deletedList.add(pointMatch); + } + } + int rows = emsPointMatchMapper.deleteEmsPointMatchByIds(ids); + if (rows > 0) { + deletedList.forEach(pointMatch -> { + if (StringUtils.isNotBlank(pointMatch.getSiteId()) && StringUtils.isNotBlank(pointMatch.getDeviceCategory())) { + syncToRedis(pointMatch.getSiteId(), pointMatch.getDeviceId(), pointMatch.getDeviceCategory()); + } + }); + } + return rows; + } + private boolean validDevicePointMatch(DevicePointMatchVo pointMatch, List errorList) { StringBuilder errorMsg = new StringBuilder(); if (StringUtils.isBlank(pointMatch.getMatchField())) { @@ -238,6 +334,9 @@ public class EmsPointMatchServiceImpl implements IEmsPointMatchService { List pointMatchData = emsPointMatchMapper.getDevicePointMatchList(siteId, deviceId, deviceCategory); // log.info("同步点位匹配数据到Redis key:{} data:{}", pointMatchKey, pointMatchData); if (CollectionUtils.isEmpty(pointMatchData)) { + if (redisCache.hasKey(pointMatchKey)) { + redisCache.deleteObject(pointMatchKey); + } return; } if (redisCache.hasKey(pointMatchKey)) { @@ -247,6 +346,10 @@ public class EmsPointMatchServiceImpl implements IEmsPointMatchService { log.info("点位匹配数据同步完成 data:{}", JSON.toJSONString(redisCache.getCacheList(pointMatchKey))); // 点位枚举匹配数据同步到Redis + syncPointEnumToRedis(siteId, deviceCategory); + } + + private void syncPointEnumToRedis(String siteId, String deviceCategory) { String pointEnumMatchKey = DevicePointMatchDataProcessor.getPointEnumMacthCacheKey(siteId, deviceCategory); List pointEnumMatchList = emsPointEnumMatchMapper.selectList(siteId, deviceCategory, null); if (!CollectionUtils.isEmpty(pointEnumMatchList)) { @@ -256,7 +359,22 @@ public class EmsPointMatchServiceImpl implements IEmsPointMatchService { redisCache.setCacheList(pointEnumMatchKey, pointEnumMatchList); log.info("点位枚举匹配数据同步完成 data:{}", JSON.toJSONString(redisCache.getCacheList(pointEnumMatchKey))); } + } + private void syncSiteToRedis(String siteId) { + List sitePointList = emsPointMatchMapper.selectBySiteId(siteId); + if (CollectionUtils.isEmpty(sitePointList)) { + return; + } + Set categorySet = new HashSet<>(); + for (EmsPointMatch pointMatch : sitePointList) { + categorySet.add(pointMatch.getDeviceCategory()); + if (StringUtils.isBlank(pointMatch.getDeviceId())) { + continue; + } + syncToRedis(siteId, pointMatch.getDeviceId(), pointMatch.getDeviceCategory()); + } + categorySet.forEach(category -> syncPointEnumToRedis(siteId, category)); } private void savePointMatchEnum(String matchFieldEnum, String dataEnum, EmsPointMatch savePoint) { diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsSiteServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsSiteServiceImpl.java index 1abb914..e391e8c 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsSiteServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/EmsSiteServiceImpl.java @@ -2,7 +2,10 @@ package com.xzzn.ems.service.impl; import com.xzzn.common.constant.RedisKeyConstants; import com.xzzn.common.core.redis.RedisCache; +import com.xzzn.common.exception.ServiceException; import com.xzzn.common.enums.DeviceCategory; +import com.xzzn.common.utils.DateUtils; +import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.EmsBatteryData; import com.xzzn.ems.domain.EmsSiteSetting; import com.xzzn.ems.domain.vo.SiteDeviceListVo; @@ -12,9 +15,11 @@ import com.xzzn.ems.mapper.EmsSiteSettingMapper; import com.xzzn.ems.service.IEmsSiteService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; /** * 站点信息 服务层实现 @@ -23,6 +28,7 @@ import java.util.Map; @Service public class EmsSiteServiceImpl implements IEmsSiteService { + private static final Pattern SITE_ID_PATTERN = Pattern.compile("^[A-Za-z0-9_]+$"); @Autowired private EmsSiteSettingMapper emsSiteMapper; @@ -70,7 +76,14 @@ public class EmsSiteServiceImpl implements IEmsSiteService */ @Override public List getAllSiteInfoList(String siteName, String startTime, String endTime) { - return emsSiteMapper.getSiteInfoList(siteName,startTime,endTime); + List list = emsSiteMapper.getSiteInfoList(siteName,startTime,endTime); + if (list == null || list.isEmpty()) { + return list; + } + for (EmsSiteSetting site : list) { + site.setAuthorized(Boolean.TRUE); + } + return list; } /** @@ -145,4 +158,53 @@ public class EmsSiteServiceImpl implements IEmsSiteService List> deviceIdList = emsDevicesMapper.getDeviceInfosBySiteIdAndCategory(siteId, parentCategory); return deviceIdList; } + + @Override + @Transactional(rollbackFor = Exception.class) + public int addSite(EmsSiteSetting emsSiteSetting) { + validateSiteSetting(emsSiteSetting, true); + String siteId = emsSiteSetting.getSiteId(); + EmsSiteSetting existing = emsSiteMapper.selectEmsSiteSettingBySiteId(siteId); + if (existing != null) { + throw new ServiceException("站点ID已存在,请更换后重试"); + } + emsSiteSetting.setCreateTime(DateUtils.getNowDate()); + return emsSiteMapper.insertEmsSiteSetting(emsSiteSetting); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public int updateSite(EmsSiteSetting emsSiteSetting) { + if (emsSiteSetting == null || emsSiteSetting.getId() == null) { + throw new ServiceException("站点主键不能为空"); + } + validateSiteSetting(emsSiteSetting, false); + EmsSiteSetting dbData = emsSiteMapper.selectEmsSiteSettingById(emsSiteSetting.getId()); + if (dbData == null) { + throw new ServiceException("站点不存在"); + } + if (StringUtils.isNotEmpty(emsSiteSetting.getSiteId()) + && !StringUtils.equals(emsSiteSetting.getSiteId(), dbData.getSiteId())) { + throw new ServiceException("不支持修改站点ID"); + } + emsSiteSetting.setSiteId(dbData.getSiteId()); + emsSiteSetting.setUpdateTime(DateUtils.getNowDate()); + return emsSiteMapper.updateEmsSiteSetting(emsSiteSetting); + } + + private void validateSiteSetting(EmsSiteSetting emsSiteSetting, boolean requireSiteId) { + if (emsSiteSetting == null) { + throw new ServiceException("参数不能为空"); + } + if (StringUtils.isBlank(emsSiteSetting.getSiteName())) { + throw new ServiceException("站点名称不能为空"); + } + if (requireSiteId && StringUtils.isBlank(emsSiteSetting.getSiteId())) { + throw new ServiceException("站点ID不能为空"); + } + if (StringUtils.isNotBlank(emsSiteSetting.getSiteId()) && !SITE_ID_PATTERN.matcher(emsSiteSetting.getSiteId()).matches()) { + throw new ServiceException("站点ID仅支持字母、数字、下划线"); + } + } + } diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/GeneralQueryServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/GeneralQueryServiceImpl.java index 70b2170..8e634fd 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/GeneralQueryServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/GeneralQueryServiceImpl.java @@ -2,12 +2,15 @@ package com.xzzn.ems.service.impl; import com.xzzn.common.enums.DeviceCategory; import com.xzzn.common.utils.DateUtils; +import com.xzzn.ems.domain.EmsPointConfig; import com.xzzn.ems.domain.EmsPointMatch; import com.xzzn.ems.domain.vo.*; import com.xzzn.ems.mapper.EmsBatteryDataMonthMapper; +import com.xzzn.ems.mapper.EmsPointConfigMapper; import com.xzzn.ems.mapper.EmsDevicesSettingMapper; import com.xzzn.ems.mapper.EmsPointMatchMapper; import com.xzzn.ems.service.IGeneralQueryService; +import com.xzzn.ems.service.InfluxPointDataWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -20,8 +23,6 @@ import java.time.temporal.TemporalAdjusters; import java.util.*; import java.util.stream.Collectors; -import javax.lang.model.util.ElementScanner6; - /** * 综合查询 服务层实现 * @@ -34,23 +35,29 @@ public class GeneralQueryServiceImpl implements IGeneralQueryService @Autowired private EmsPointMatchMapper emsPointMatchMapper; @Autowired + private EmsPointConfigMapper emsPointConfigMapper; + @Autowired private EmsDevicesSettingMapper emsDevicesSettingMapper; @Autowired private EmsBatteryDataMonthMapper emsBatteryDataMonthMapper; + @Autowired + private InfluxPointDataWriter influxPointDataWriter; @Override - public List getPointNameList(PointNameRequest request) { + public List getPointNameList(PointNameRequest request) { List siteIds = request.getSiteIds(); if (siteIds == null || siteIds.isEmpty()) { return Collections.emptyList(); } String deviceCategory = request.getDeviceCategory(); - if (deviceCategory == null) { + String deviceId = request.getDeviceId(); + if ((deviceCategory == null || "".equals(deviceCategory.trim())) + && (deviceId == null || "".equals(deviceId.trim()))) { return Collections.emptyList(); } - return emsPointMatchMapper.getPointNameList(siteIds,deviceCategory,request.getPointName()); + return emsPointConfigMapper.getPointNameList(siteIds, deviceCategory, deviceId, request.getPointName()); } @Override @@ -81,48 +88,142 @@ public class GeneralQueryServiceImpl implements IGeneralQueryService @Override public List getPointValueList(PointNameRequest request){ - List result = new ArrayList<>(); - List querySiteIds = new ArrayList<>(); - List siteIds = request.getSiteIds(); - String deviceCategory = request.getDeviceCategory(); - // 根据入参获取点位对应的表和字段 - List matchInfo = emsPointMatchMapper.getMatchInfo(siteIds,deviceCategory,request.getPointName()); - if (matchInfo == null || matchInfo.size() == 0) { - return result; - } else { - for (EmsPointMatch emsPointMatch : matchInfo) { - querySiteIds.add(emsPointMatch.getSiteId()); - } + if (siteIds == null || siteIds.isEmpty()) { + return Collections.emptyList(); } - // 单体电池特殊校验 - Map> siteDeviceMap = request.getSiteDeviceMap(); - if (DeviceCategory.BATTERY.getCode().equals(deviceCategory) && (siteDeviceMap == null || siteDeviceMap.size() == 0)) { - return result; + String deviceCategory = request.getDeviceCategory(); + String requestDeviceId = request.getDeviceId(); + if ((deviceCategory == null || "".equals(deviceCategory.trim())) + && (requestDeviceId == null || "".equals(requestDeviceId.trim())) + ) { + return Collections.emptyList(); + } + + List pointNames = resolvePointNames(request); + if (pointNames.isEmpty()) { + return Collections.emptyList(); } // 处理时间范围,如果未传根据数据单位设默认值 dealDataTime(request); + Date startDate = DateUtils.dateTime(DateUtils.YYYY_MM_DD_HH_MM_SS, request.getStartDate()); + Date endDate = DateUtils.dateTime(DateUtils.YYYY_MM_DD_HH_MM_SS, request.getEndDate()); + if (request.getDataUnit() == 3) { + endDate = DateUtils.adjustToEndOfDay(request.getEndDate()); + } - try { - // 不同的site_id根据设备类型和字段,默认取第一个匹配到的表和表字段只会有一个, - String tableName = matchInfo.get(0).getMatchTable(); - String tableField = matchInfo.get(0).getMatchField(); - Long dataType = matchInfo.get(0).getDataType(); - if (DeviceCategory.BATTERY.getCode().equals(deviceCategory)) { - // 单体电池数据特殊处理 - result = generalQueryBatteryData(querySiteIds,tableName,tableField,request,deviceCategory,dataType); - } else { - // 其他设备数据 - result = generalQueryCommonData(querySiteIds,tableName,tableField,request,deviceCategory,dataType); + List selectedDeviceIds = resolveSelectedDeviceIds(request); + List pointConfigs = emsPointConfigMapper.getConfigListForGeneralQuery( + siteIds, deviceCategory, pointNames, selectedDeviceIds + ); + if (pointConfigs == null || pointConfigs.isEmpty()) { + return Collections.emptyList(); + } + + List dataVoList = new ArrayList<>(); + for (EmsPointConfig pointConfig : pointConfigs) { + dataVoList.addAll(queryPointCurve(pointConfig, request.getDataUnit(), startDate, endDate)); + } + + if (dataVoList.isEmpty()) { + return Collections.emptyList(); + } + + if (request.getDataUnit() == 1) { + try { + dataVoList = dealWithMinutesData(siteIds, dataVoList, deviceCategory, request.getStartDate(), request.getEndDate()); + } catch (ParseException e) { + throw new RuntimeException(e); } - } catch (ParseException e) { - throw new RuntimeException(e); + } + return convertCommonToResultList(dataVoList, 1L); + } + + private List resolvePointNames(PointNameRequest request) { + List names = new ArrayList<>(); + if (request.getPointNames() != null && !request.getPointNames().isEmpty()) { + names.addAll(request.getPointNames()); + } else if (request.getPointName() != null && !"".equals(request.getPointName().trim())) { + names.addAll(Arrays.stream(request.getPointName().split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList())); + } + return names.stream().distinct().collect(Collectors.toList()); + } + + private List resolveSelectedDeviceIds(PointNameRequest request) { + List selected = new ArrayList<>(); + if (request.getDeviceId() != null && !"".equals(request.getDeviceId().trim())) { + selected.add(request.getDeviceId().trim()); + } + Map> siteDeviceMap = request.getSiteDeviceMap(); + if (siteDeviceMap != null && !siteDeviceMap.isEmpty()) { + for (List devices : siteDeviceMap.values()) { + if (devices == null) { + continue; + } + for (String deviceId : devices) { + if (deviceId != null && !"".equals(deviceId.trim())) { + selected.add(deviceId.trim()); + } + } + } + } + return selected.stream().distinct().collect(Collectors.toList()); + } + + private List queryPointCurve(EmsPointConfig config, int dataUnit, Date startDate, Date endDate) { + if (config == null || config.getSiteId() == null || config.getDeviceId() == null || config.getDataKey() == null) { + return Collections.emptyList(); + } + List values = influxPointDataWriter.queryCurveData( + config.getSiteId(), config.getDeviceId(), config.getDataKey(), startDate, endDate + ); + if (values == null || values.isEmpty()) { + return Collections.emptyList(); + } + + // 每个时间桶取该桶最后一条值 + Map latestByBucket = new LinkedHashMap<>(); + for (InfluxPointDataWriter.PointValue value : values) { + latestByBucket.put(formatByDataUnit(value.getDataTime(), dataUnit), value.getPointValue()); + } + + List result = new ArrayList<>(); + String displayDeviceId = buildDisplayDeviceId(config); + for (Map.Entry entry : latestByBucket.entrySet()) { + GeneralQueryDataVo vo = new GeneralQueryDataVo(); + vo.setSiteId(config.getSiteId()); + vo.setDeviceId(displayDeviceId); + vo.setValueDate(entry.getKey()); + vo.setPointValue(entry.getValue()); + result.add(vo); } return result; } + private String buildDisplayDeviceId(EmsPointConfig config) { + String pointName = config.getPointName() == null || "".equals(config.getPointName().trim()) + ? config.getDataKey() : config.getPointName().trim(); + return config.getDeviceId() + "-" + pointName; + } + + private String formatByDataUnit(Date dataTime, int dataUnit) { + if (dataTime == null) { + return ""; + } + if (dataUnit == 3) { + return new SimpleDateFormat("yyyy-MM-dd").format(dataTime); + } + if (dataUnit == 2) { + return new SimpleDateFormat("yyyy-MM-dd HH:00").format(dataTime); + } + return new SimpleDateFormat("yyyy-MM-dd HH:mm:00").format(dataTime); + } + private List generalQueryCommonData(List querySiteIds, String tableName, String tableField, PointNameRequest request, String deviceCategory, Long dataType) throws ParseException { diff --git a/ems-system/src/main/resources/mapper/ems/EmsPointEnumMatchMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsPointEnumMatchMapper.xml index 7ba0254..2f12e8d 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsPointEnumMatchMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsPointEnumMatchMapper.xml @@ -119,4 +119,47 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" - \ No newline at end of file + + + + delete from ems_point_enum_match + where site_id = #{siteId} + + + + insert into ems_point_enum_match ( + match_field, + site_id, + device_category, + enum_code, + enum_name, + enum_desc, + data_enum_code, + create_by, + create_time, + update_by, + update_time, + remark + ) + select + match_field, + #{targetSiteId}, + device_category, + enum_code, + enum_name, + enum_desc, + data_enum_code, + #{operName}, + now(), + #{operName}, + now(), + remark + from ems_point_enum_match + where site_id = #{templateSiteId} + + + diff --git a/ems-system/src/main/resources/mapper/ems/EmsPointMatchMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsPointMatchMapper.xml index 6abab7a..1a964e4 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsPointMatchMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsPointMatchMapper.xml @@ -570,4 +570,74 @@ - \ No newline at end of file + + + + delete from ems_point_match + where site_id = #{siteId} + + + + insert into ems_point_match ( + point_name, + match_table, + match_field, + site_id, + device_category, + data_point, + data_point_name, + data_device, + data_unit, + ip_address, + ip_port, + data_type, + need_diff_device_id, + create_by, + create_time, + update_by, + update_time, + remark, + is_alarm, + device_id, + a, + k, + b + ) + select + point_name, + match_table, + match_field, + #{targetSiteId}, + device_category, + data_point, + data_point_name, + data_device, + data_unit, + ip_address, + ip_port, + data_type, + need_diff_device_id, + #{operName}, + now(), + #{operName}, + now(), + remark, + is_alarm, + device_id, + a, + k, + b + from ems_point_match + where site_id = #{templateSiteId} + + + + +