From 614cca4297f93b29805f5b92a62d98b171bd58b5 Mon Sep 17 00:00:00 2001 From: mashili Date: Tue, 28 Oct 2025 14:38:18 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E8=BF=81?= =?UTF-8?q?=E7=A7=BB=E5=88=B0quartz=E6=A8=A1=E5=9D=97=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ems-quartz/pom.xml | 4 ++ .../com/xzzn/quartz/task}/ModbusPoller.java | 47 ++++++++++--------- .../com/xzzn/quartz/task}/StrategyPoller.java | 15 ++---- .../ems/mapper/EmsStrategyRunningMapper.java | 7 +++ .../mapper/ems/EmsStrategyRunningMapper.xml | 21 +++++++++ 5 files changed, 62 insertions(+), 32 deletions(-) rename {ems-framework/src/main/java/com/xzzn/framework/scheduler => ems-quartz/src/main/java/com/xzzn/quartz/task}/ModbusPoller.java (83%) rename {ems-framework/src/main/java/com/xzzn/framework/scheduler => ems-quartz/src/main/java/com/xzzn/quartz/task}/StrategyPoller.java (94%) diff --git a/ems-quartz/pom.xml b/ems-quartz/pom.xml index 791c25d..df521b7 100644 --- a/ems-quartz/pom.xml +++ b/ems-quartz/pom.xml @@ -34,6 +34,10 @@ com.xzzn ems-common + + com.xzzn + ems-framework + diff --git a/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java similarity index 83% rename from ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java rename to ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java index 8133d7b..ea4a175 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java @@ -1,5 +1,6 @@ -package com.xzzn.framework.scheduler; +package com.xzzn.quartz.task; +import com.xzzn.common.enums.DeviceRunningStatus; import com.xzzn.ems.domain.EmsDevicesSetting; import com.xzzn.ems.mapper.EmsDevicesSettingMapper; import com.xzzn.ems.mapper.EmsMqttMessageMapper; @@ -11,15 +12,14 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; -@Component -@EnableScheduling +/** + * 轮询设备-通过modbus协议读取数据 + */ +@Component("modbusPoller") public class ModbusPoller { private static final Logger logger = LoggerFactory.getLogger(ModbusPoller.class); @@ -38,9 +38,7 @@ public class ModbusPoller { public ModbusPoller(MqttLifecycleManager mqttLifecycleManager) { this.mqttLifecycleManager = mqttLifecycleManager; } - // 每5分钟触发(支持cron表达式动态配置) - @Scheduled(cron = "${modbus.poll.interval}") - @Async("modbusTaskExecutor") + public void pollAllDevices() { logger.info("开始执行Modbus设备轮询..."); @@ -48,16 +46,10 @@ public class ModbusPoller { EmsDevicesSetting device = activeDevices.get(0); try { - processData(device,null); + //pollSingleDevice(device); } catch (Exception e) { logger.error("调度设备{}任务失败", device.getId(), e); } - /* - try { - pollSingleDevice(device); - } catch (Exception e) { - logger.error("调度设备{}任务失败", device.getId(), e); - }*/ /*activeDevices.forEach(device -> { try { CompletableFuture.runAsync(() -> pollSingleDevice(device)) @@ -99,20 +91,33 @@ public class ModbusPoller { } } - // 处理获取到的数据,发到mqtt服务上 + // 处理获取到的数据 private void processData(EmsDevicesSetting device, int[] data) throws MqttException { - /*if (data == null || data.length == 0) { + if (data == null || data.length == 0) { logger.warn("设备{}返回空数据", device.getId()); + // 设备读取不到-设置设备故障 + device.setRunningStatus(DeviceRunningStatus.FAULT.getCode()); return; - }*/ + } - /*// 数据处理逻辑 + // 数据处理逻辑 StringBuilder sb = new StringBuilder(); sb.append("设备[").append(device.getDeviceName()).append("]数据: "); for (int i = 0; i < data.length; i++) { sb.append("R").append(i).append("=").append(data[i]).append(" "); } - logger.info(sb.toString());*/ + + String message = sb.toString(); + logger.info(sb.toString()); + /* + String siteId = device.getSiteId(); + if (siteId.startsWith("021_DDS")) { + ems_devices_setting + + dDSDataProcessService.handleDdsData(message); + } else if (siteId.startsWith("021_FXX")) { + fXXDataProcessService.handleFxData(message); + }*/ // 测试发送mqtt /* EmsMqttMessage msg = emsMqttMessageMapper.selectEmsMqttMessageById(1L); diff --git a/ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java similarity index 94% rename from ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java rename to ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java index 235af73..b605fda 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java +++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java @@ -1,4 +1,4 @@ -package com.xzzn.framework.scheduler; +package com.xzzn.quartz.task; import com.alibaba.fastjson2.JSON; import com.xzzn.common.utils.DateUtils; @@ -15,9 +15,6 @@ import com.xzzn.framework.web.service.ModbusService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.LocalDate; @@ -28,8 +25,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -@Component -@EnableScheduling +@Component("strategyPoller") public class StrategyPoller { private static final Logger logger = LoggerFactory.getLogger(StrategyPoller.class); @@ -56,12 +52,10 @@ public class StrategyPoller { public StrategyPoller(MqttLifecycleManager mqttLifecycleManager) { this.mqttLifecycleManager = mqttLifecycleManager; } - // 每1分钟触发(支持cron表达式动态配置) - @Scheduled(cron = "0 */1 * * * *") - @Async("strategyTaskExecutor") + public void pollAllDevices() { logger.info("开始执行策略数据轮询..."); - List strategyRunningVoList = emsStrategyRunningMapper.getRunningList(null); + List strategyRunningVoList = emsStrategyRunningMapper.getPendingPollerStrategy(null); strategyRunningVoList.forEach(strategyVo -> { try { CompletableFuture.runAsync(() -> { @@ -93,7 +87,6 @@ public class StrategyPoller { } // 策略数据下发-下方格式暂无 - logger.info("策略下发结束"); } diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java index 8d52260..1bd6a0e 100644 --- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java @@ -3,6 +3,7 @@ package com.xzzn.ems.mapper; import java.util.List; import com.xzzn.ems.domain.EmsStrategyRunning; import com.xzzn.ems.domain.vo.StrategyRunningVo; +import org.apache.ibatis.annotations.Param; /** * 策略运行Mapper接口 @@ -68,4 +69,10 @@ public interface EmsStrategyRunningMapper // 根据主策略id、辅助策略id、siteId 获取运行策略 public EmsStrategyRunning selectEmsStrategyRunning(EmsStrategyRunning emsStrategyRunning); + + // 更新站点策略为运行 + public void updateStatusRunning(@Param("siteId") String siteId,@Param("status") String status); + + // 获取运行中的策略 + public List getPendingPollerStrategy(String siteId); } diff --git a/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml index 23c859b..6560e2e 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml @@ -114,4 +114,25 @@ and auxiliary_strategy_id = #{auxiliaryStrategyId} and `status` = 1 + + + update ems_strategy_running set `status`= #{status} where site_id = #{siteId} + + + \ No newline at end of file