diff --git a/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java b/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java index d80ce10..d6bead0 100644 --- a/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java +++ b/ems-framework/src/main/java/com/xzzn/framework/config/AsyncConfig.java @@ -24,4 +24,20 @@ public class AsyncConfig { executor.initialize(); return executor; } + + /** + * 策略下方定时任务 + */ + /*@Bean("strategyTaskExecutor") + public Executor strategyTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setKeepAliveSeconds(300); + executor.setThreadNamePrefix("StrategyPoller-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + }*/ } diff --git a/ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java b/ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java new file mode 100644 index 0000000..a8b19d1 --- /dev/null +++ b/ems-framework/src/main/java/com/xzzn/framework/scheduler/StrategyPoller.java @@ -0,0 +1,161 @@ +package com.xzzn.framework.scheduler; + +import com.xzzn.common.utils.DateUtils; +import com.xzzn.common.utils.StringUtils; +import com.xzzn.ems.domain.EmsStrategyCurve; +import com.xzzn.ems.domain.EmsStrategyTimeConfig; +import com.xzzn.ems.domain.vo.StrategyRunningVo; +import com.xzzn.ems.mapper.*; +import com.xzzn.framework.manager.ModbusConnectionManager; +import com.xzzn.framework.manager.MqttLifecycleManager; +import com.xzzn.framework.web.service.ModbusService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.YearMonth; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Component +@EnableScheduling +public class StrategyPoller { + private static final Logger logger = LoggerFactory.getLogger(StrategyPoller.class); + + private final MqttLifecycleManager mqttLifecycleManager; + + @Autowired + private ModbusConnectionManager connectionManager; + @Autowired + private ModbusService modbusService; + @Autowired + private EmsDevicesSettingMapper deviceRepo; + @Autowired + private EmsMqttMessageMapper emsMqttMessageMapper; + @Autowired + private EmsStrategyRunningMapper emsStrategyRunningMapper; + @Autowired + private EmsStrategyTempMapper emsStrategyTempMapper; + @Autowired + private EmsStrategyTimeConfigMapper emsStrategyTimeConfigMapper; + @Autowired + private EmsStrategyCurveMapper emsStrategyCurveMapper; + + @Autowired + public StrategyPoller(MqttLifecycleManager mqttLifecycleManager) { + this.mqttLifecycleManager = mqttLifecycleManager; + } + // 每1分钟触发(支持cron表达式动态配置) + //@Scheduled(cron = "0 */5 * * * *") + //@Async("strategyTaskExecutor") + public void pollAllDevices() { + logger.info("开始执行策略数据轮询..."); + List strategyRunningVoList = emsStrategyRunningMapper.getRunningList(null); + strategyRunningVoList.forEach(strategyVo -> { + try { + CompletableFuture.runAsync(() -> { + processData(strategyVo); + }) + .exceptionally(e -> { + logger.error("策略{}轮询异常", strategyVo.getId(), e); + return null; + }); + } catch (Exception e) { + logger.error("策略下方{}任务失败", strategyVo.getId(), e); + } + }); + /* + try { + pollSingleDevice(device); + } catch (Exception e) { + logger.error("调度设备{}任务失败", device.getId(), e); + }*/ + /*activeDevices.forEach(device -> { + try { + CompletableFuture.runAsync(() -> processData(device)) + .exceptionally(e -> { + logger.error("设备{}轮询异常", device.getId(), e); + return null; + }); + } catch (Exception e) { + logger.error("调度设备{}任务失败", device.getId(), e); + } + });*/ + } + // 处理获取到的数据,发到mqtt服务上 + private void processData(StrategyRunningVo strategyVo) { + logger.info("策略下发数据处理开始"); + // 根据运行策略获取主副策略的模板数据 + Long minStrategyId = strategyVo.getMainStrategyId(); + Long auxStrategyId = strategyVo.getAuxStrategyId(); + String siteId = strategyVo.getSiteId(); + // 处理主策略数据 + if (minStrategyId != null && StringUtils.isNotBlank(siteId)) { + // 获取当前策略的所有模板 + List> temps = emsStrategyTempMapper.getTempNameList(minStrategyId,siteId); + if (temps != null && temps.size() > 0) { + for (Map temp : temps) { + String tempId = temp.get("templateId"); + List timeConfigs = emsStrategyTimeConfigMapper.getAllTimeConfigByTempId(tempId); + if (timeConfigs != null && timeConfigs.size() > 0) { + for (EmsStrategyTimeConfig timeConfig : timeConfigs) { + EmsStrategyCurve curve = new EmsStrategyCurve(); + curve.setStrategyId(minStrategyId); + curve.setSiteId(strategyVo.getSiteId()); + curve.setTemplateId(tempId); + curve.setCreateBy("system"); + curve.setCreateTime(DateUtils.getNowDate()); + curve.setUpdateBy("system"); + curve.setUpdateTime(DateUtils.getNowDate()); + // 时间设置 + String[] dateList= dealWithMonth(Integer.parseInt(timeConfig.getMonth().toString())); + curve.setStartDate(DateUtils.dateTime(DateUtils.YYYY_MM_DD,dateList[0])); + curve.setEndDate(DateUtils.dateTime(DateUtils.YYYY_MM_DD,dateList[1])); + // powerData暂时不处理 + + // 策略数据下发-下方格式暂无 + + // 记录推送记录 + emsStrategyCurveMapper.insertEmsStrategyCurve(curve); + } + } + } + } + } + // 处理副策略数据 + if (auxStrategyId != null && StringUtils.isNotBlank(siteId)) { + + } + logger.info("策略下发结束"); + } + + private String[] dealWithMonth(int month) { + // 获取当前年份 + int currentYear = LocalDate.now().getYear(); + + // 创建YearMonth对象表示当年指定的月份 + YearMonth yearMonth = YearMonth.of(currentYear, month); + + // 获取当月的第一天和最后一天 + LocalDate firstDay = yearMonth.atDay(1); + LocalDate lastDay = yearMonth.atEndOfMonth(); + + // 定义日期格式(年月日) + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + // 格式化日期 + return new String[]{ + firstDay.format(formatter), + lastDay.format(formatter) + }; + + } + +} \ No newline at end of file diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyTimeConfigMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyTimeConfigMapper.java index 8b0d4d4..174b2ae 100644 --- a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyTimeConfigMapper.java +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsStrategyTimeConfigMapper.java @@ -65,4 +65,7 @@ public interface EmsStrategyTimeConfigMapper // 清空该月的模板信息 public void cleanTemplateId(String templateId); + + // 获取该策略下的时间配置 + List getAllTimeConfigByTempId(String templateId); } diff --git a/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml index 030e983..23c859b 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsStrategyRunningMapper.xml @@ -97,8 +97,10 @@ from ems_strategy_running t LEFT JOIN ems_strategy main on t.main_strategy_id = main.id LEFT JOIN ems_strategy aux on t.auxiliary_strategy_id = aux.id - where t.site_id = #{siteId} - and t.`status` != 4 + where t.`status` != 4 + + and t.site_id = #{siteId} + diff --git a/ems-system/src/main/resources/mapper/ems/EmsStrategyTimeConfigMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsStrategyTimeConfigMapper.xml index 08707c1..701e00f 100644 --- a/ems-system/src/main/resources/mapper/ems/EmsStrategyTimeConfigMapper.xml +++ b/ems-system/src/main/resources/mapper/ems/EmsStrategyTimeConfigMapper.xml @@ -110,4 +110,9 @@ update ems_strategy_time_config set template_id = '' where template_id = #{templateId} + + \ No newline at end of file