diff --git a/ems-quartz/pom.xml b/ems-quartz/pom.xml
index 791c25d..df521b7 100644
--- a/ems-quartz/pom.xml
+++ b/ems-quartz/pom.xml
@@ -34,6 +34,10 @@
com.xzzn
ems-common
+
+ com.xzzn
+ ems-framework
+
diff --git a/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java
similarity index 83%
rename from ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java
rename to ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java
index 8133d7b..ea4a175 100644
--- a/ems-framework/src/main/java/com/xzzn/framework/scheduler/ModbusPoller.java
+++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/ModbusPoller.java
@@ -1,5 +1,6 @@
-package com.xzzn.framework.scheduler;
+package com.xzzn.quartz.task;
+import com.xzzn.common.enums.DeviceRunningStatus;
import com.xzzn.ems.domain.EmsDevicesSetting;
import com.xzzn.ems.mapper.EmsDevicesSettingMapper;
import com.xzzn.ems.mapper.EmsMqttMessageMapper;
@@ -11,15 +12,14 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
-@Component
-@EnableScheduling
+/**
+ * 轮询设备-通过modbus协议读取数据
+ */
+@Component("modbusPoller")
public class ModbusPoller {
private static final Logger logger = LoggerFactory.getLogger(ModbusPoller.class);
@@ -38,9 +38,7 @@ public class ModbusPoller {
public ModbusPoller(MqttLifecycleManager mqttLifecycleManager) {
this.mqttLifecycleManager = mqttLifecycleManager;
}
- // 每5分钟触发(支持cron表达式动态配置)
- @Scheduled(cron = "${modbus.poll.interval}")
- @Async("modbusTaskExecutor")
+
public void pollAllDevices() {
logger.info("开始执行Modbus设备轮询...");
@@ -48,16 +46,10 @@ public class ModbusPoller {
EmsDevicesSetting device = activeDevices.get(0);
try {
- processData(device,null);
+ //pollSingleDevice(device);
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}
- /*
- try {
- pollSingleDevice(device);
- } catch (Exception e) {
- logger.error("调度设备{}任务失败", device.getId(), e);
- }*/
/*activeDevices.forEach(device -> {
try {
CompletableFuture.runAsync(() -> pollSingleDevice(device))
@@ -99,20 +91,33 @@ public class ModbusPoller {
}
}
- // 处理获取到的数据,发到mqtt服务上
+ // 处理获取到的数据
private void processData(EmsDevicesSetting device, int[] data) throws MqttException {
- /*if (data == null || data.length == 0) {
+ if (data == null || data.length == 0) {
logger.warn("设备{}返回空数据", device.getId());
+ // 设备读取不到-设置设备故障
+ device.setRunningStatus(DeviceRunningStatus.FAULT.getCode());
return;
- }*/
+ }
- /*// 数据处理逻辑
+ // 数据处理逻辑
StringBuilder sb = new StringBuilder();
sb.append("设备[").append(device.getDeviceName()).append("]数据: ");
for (int i = 0; i < data.length; i++) {
sb.append("R").append(i).append("=").append(data[i]).append(" ");
}
- logger.info(sb.toString());*/
+
+ String message = sb.toString();
+ logger.info(sb.toString());
+ /*
+ String siteId = device.getSiteId();
+ if (siteId.startsWith("021_DDS")) {
+ ems_devices_setting
+
+ dDSDataProcessService.handleDdsData(message);
+ } else if (siteId.startsWith("021_FXX")) {
+ fXXDataProcessService.handleFxData(message);
+ }*/
// 测试发送mqtt
/* EmsMqttMessage msg = emsMqttMessageMapper.selectEmsMqttMessageById(1L);
diff --git a/ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java
similarity index 94%
rename from ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java
rename to ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java
index 235af73..b605fda 100644
--- a/ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java
+++ b/ems-quartz/src/main/java/com/xzzn/quartz/task/StrategyPoller.java
@@ -1,4 +1,4 @@
-package com.xzzn.framework.scheduler;
+package com.xzzn.quartz.task;
import com.alibaba.fastjson2.JSON;
import com.xzzn.common.utils.DateUtils;
@@ -15,9 +15,6 @@ import com.xzzn.framework.web.service.ModbusService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
@@ -28,8 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-@Component
-@EnableScheduling
+@Component("strategyPoller")
public class StrategyPoller {
private static final Logger logger = LoggerFactory.getLogger(StrategyPoller.class);
@@ -56,12 +52,10 @@ public class StrategyPoller {
public StrategyPoller(MqttLifecycleManager mqttLifecycleManager) {
this.mqttLifecycleManager = mqttLifecycleManager;
}
- // 每1分钟触发(支持cron表达式动态配置)
- @Scheduled(cron = "0 */1 * * * *")
- @Async("strategyTaskExecutor")
+
public void pollAllDevices() {
logger.info("开始执行策略数据轮询...");
- List strategyRunningVoList = emsStrategyRunningMapper.getRunningList(null);
+ List strategyRunningVoList = emsStrategyRunningMapper.getPendingPollerStrategy(null);
strategyRunningVoList.forEach(strategyVo -> {
try {
CompletableFuture.runAsync(() -> {
@@ -93,7 +87,6 @@ public class StrategyPoller {
}
// 策略数据下发-下方格式暂无
-
logger.info("策略下发结束");
}
diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java
index 8d52260..1bd6a0e 100644
--- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java
+++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyRunningMapper.java
@@ -3,6 +3,7 @@ package com.xzzn.ems.mapper;
import java.util.List;
import com.xzzn.ems.domain.EmsStrategyRunning;
import com.xzzn.ems.domain.vo.StrategyRunningVo;
+import org.apache.ibatis.annotations.Param;
/**
* 策略运行Mapper接口
@@ -68,4 +69,10 @@ public interface EmsStrategyRunningMapper
// 根据主策略id、辅助策略id、siteId 获取运行策略
public EmsStrategyRunning selectEmsStrategyRunning(EmsStrategyRunning emsStrategyRunning);
+
+ // 更新站点策略为运行
+ public void updateStatusRunning(@Param("siteId") String siteId,@Param("status") String status);
+
+ // 获取运行中的策略
+ public List getPendingPollerStrategy(String siteId);
}
diff --git a/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml
index 23c859b..6560e2e 100644
--- a/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml
+++ b/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml
@@ -114,4 +114,25 @@
and auxiliary_strategy_id = #{auxiliaryStrategyId}
and `status` = 1
+
+
+ update ems_strategy_running set `status`= #{status} where site_id = #{siteId}
+
+
+
\ No newline at end of file