策略数据下发大概逻辑
This commit is contained in:
@ -24,4 +24,20 @@ public class AsyncConfig {
|
|||||||
executor.initialize();
|
executor.initialize();
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 策略下方定时任务
|
||||||
|
*/
|
||||||
|
/*@Bean("strategyTaskExecutor")
|
||||||
|
public Executor strategyTaskExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
executor.setCorePoolSize(5);
|
||||||
|
executor.setMaxPoolSize(10);
|
||||||
|
executor.setQueueCapacity(100);
|
||||||
|
executor.setKeepAliveSeconds(300);
|
||||||
|
executor.setThreadNamePrefix("StrategyPoller-");
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}*/
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,161 @@
|
|||||||
|
package com.xzzn.framework.scheduler;
|
||||||
|
|
||||||
|
import com.xzzn.common.utils.DateUtils;
|
||||||
|
import com.xzzn.common.utils.StringUtils;
|
||||||
|
import com.xzzn.ems.domain.EmsStrategyCurve;
|
||||||
|
import com.xzzn.ems.domain.EmsStrategyTimeConfig;
|
||||||
|
import com.xzzn.ems.domain.vo.StrategyRunningVo;
|
||||||
|
import com.xzzn.ems.mapper.*;
|
||||||
|
import com.xzzn.framework.manager.ModbusConnectionManager;
|
||||||
|
import com.xzzn.framework.manager.MqttLifecycleManager;
|
||||||
|
import com.xzzn.framework.web.service.ModbusService;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.YearMonth;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@EnableScheduling
|
||||||
|
public class StrategyPoller {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(StrategyPoller.class);
|
||||||
|
|
||||||
|
private final MqttLifecycleManager mqttLifecycleManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ModbusConnectionManager connectionManager;
|
||||||
|
@Autowired
|
||||||
|
private ModbusService modbusService;
|
||||||
|
@Autowired
|
||||||
|
private EmsDevicesSettingMapper deviceRepo;
|
||||||
|
@Autowired
|
||||||
|
private EmsMqttMessageMapper emsMqttMessageMapper;
|
||||||
|
@Autowired
|
||||||
|
private EmsStrategyRunningMapper emsStrategyRunningMapper;
|
||||||
|
@Autowired
|
||||||
|
private EmsStrategyTempMapper emsStrategyTempMapper;
|
||||||
|
@Autowired
|
||||||
|
private EmsStrategyTimeConfigMapper emsStrategyTimeConfigMapper;
|
||||||
|
@Autowired
|
||||||
|
private EmsStrategyCurveMapper emsStrategyCurveMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public StrategyPoller(MqttLifecycleManager mqttLifecycleManager) {
|
||||||
|
this.mqttLifecycleManager = mqttLifecycleManager;
|
||||||
|
}
|
||||||
|
// 每1分钟触发(支持cron表达式动态配置)
|
||||||
|
//@Scheduled(cron = "0 */5 * * * *")
|
||||||
|
//@Async("strategyTaskExecutor")
|
||||||
|
public void pollAllDevices() {
|
||||||
|
logger.info("开始执行策略数据轮询...");
|
||||||
|
List<StrategyRunningVo> strategyRunningVoList = emsStrategyRunningMapper.getRunningList(null);
|
||||||
|
strategyRunningVoList.forEach(strategyVo -> {
|
||||||
|
try {
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
processData(strategyVo);
|
||||||
|
})
|
||||||
|
.exceptionally(e -> {
|
||||||
|
logger.error("策略{}轮询异常", strategyVo.getId(), e);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("策略下方{}任务失败", strategyVo.getId(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
/*
|
||||||
|
try {
|
||||||
|
pollSingleDevice(device);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("调度设备{}任务失败", device.getId(), e);
|
||||||
|
}*/
|
||||||
|
/*activeDevices.forEach(device -> {
|
||||||
|
try {
|
||||||
|
CompletableFuture.runAsync(() -> processData(device))
|
||||||
|
.exceptionally(e -> {
|
||||||
|
logger.error("设备{}轮询异常", device.getId(), e);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("调度设备{}任务失败", device.getId(), e);
|
||||||
|
}
|
||||||
|
});*/
|
||||||
|
}
|
||||||
|
// 处理获取到的数据,发到mqtt服务上
|
||||||
|
private void processData(StrategyRunningVo strategyVo) {
|
||||||
|
logger.info("策略下发数据处理开始");
|
||||||
|
// 根据运行策略获取主副策略的模板数据
|
||||||
|
Long minStrategyId = strategyVo.getMainStrategyId();
|
||||||
|
Long auxStrategyId = strategyVo.getAuxStrategyId();
|
||||||
|
String siteId = strategyVo.getSiteId();
|
||||||
|
// 处理主策略数据
|
||||||
|
if (minStrategyId != null && StringUtils.isNotBlank(siteId)) {
|
||||||
|
// 获取当前策略的所有模板
|
||||||
|
List<Map<String, String>> temps = emsStrategyTempMapper.getTempNameList(minStrategyId,siteId);
|
||||||
|
if (temps != null && temps.size() > 0) {
|
||||||
|
for (Map<String, String> temp : temps) {
|
||||||
|
String tempId = temp.get("templateId");
|
||||||
|
List<EmsStrategyTimeConfig> timeConfigs = emsStrategyTimeConfigMapper.getAllTimeConfigByTempId(tempId);
|
||||||
|
if (timeConfigs != null && timeConfigs.size() > 0) {
|
||||||
|
for (EmsStrategyTimeConfig timeConfig : timeConfigs) {
|
||||||
|
EmsStrategyCurve curve = new EmsStrategyCurve();
|
||||||
|
curve.setStrategyId(minStrategyId);
|
||||||
|
curve.setSiteId(strategyVo.getSiteId());
|
||||||
|
curve.setTemplateId(tempId);
|
||||||
|
curve.setCreateBy("system");
|
||||||
|
curve.setCreateTime(DateUtils.getNowDate());
|
||||||
|
curve.setUpdateBy("system");
|
||||||
|
curve.setUpdateTime(DateUtils.getNowDate());
|
||||||
|
// 时间设置
|
||||||
|
String[] dateList= dealWithMonth(Integer.parseInt(timeConfig.getMonth().toString()));
|
||||||
|
curve.setStartDate(DateUtils.dateTime(DateUtils.YYYY_MM_DD,dateList[0]));
|
||||||
|
curve.setEndDate(DateUtils.dateTime(DateUtils.YYYY_MM_DD,dateList[1]));
|
||||||
|
// powerData暂时不处理
|
||||||
|
|
||||||
|
// 策略数据下发-下方格式暂无
|
||||||
|
|
||||||
|
// 记录推送记录
|
||||||
|
emsStrategyCurveMapper.insertEmsStrategyCurve(curve);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 处理副策略数据
|
||||||
|
if (auxStrategyId != null && StringUtils.isNotBlank(siteId)) {
|
||||||
|
|
||||||
|
}
|
||||||
|
logger.info("策略下发结束");
|
||||||
|
}
|
||||||
|
|
||||||
|
private String[] dealWithMonth(int month) {
|
||||||
|
// 获取当前年份
|
||||||
|
int currentYear = LocalDate.now().getYear();
|
||||||
|
|
||||||
|
// 创建YearMonth对象表示当年指定的月份
|
||||||
|
YearMonth yearMonth = YearMonth.of(currentYear, month);
|
||||||
|
|
||||||
|
// 获取当月的第一天和最后一天
|
||||||
|
LocalDate firstDay = yearMonth.atDay(1);
|
||||||
|
LocalDate lastDay = yearMonth.atEndOfMonth();
|
||||||
|
|
||||||
|
// 定义日期格式(年月日)
|
||||||
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
||||||
|
|
||||||
|
// 格式化日期
|
||||||
|
return new String[]{
|
||||||
|
firstDay.format(formatter),
|
||||||
|
lastDay.format(formatter)
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -65,4 +65,7 @@ public interface EmsStrategyTimeConfigMapper
|
|||||||
|
|
||||||
// 清空该月的模板信息
|
// 清空该月的模板信息
|
||||||
public void cleanTemplateId(String templateId);
|
public void cleanTemplateId(String templateId);
|
||||||
|
|
||||||
|
// 获取该策略下的时间配置
|
||||||
|
List<EmsStrategyTimeConfig> getAllTimeConfigByTempId(String templateId);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -97,8 +97,10 @@
|
|||||||
from ems_strategy_running t
|
from ems_strategy_running t
|
||||||
LEFT JOIN ems_strategy main on t.main_strategy_id = main.id
|
LEFT JOIN ems_strategy main on t.main_strategy_id = main.id
|
||||||
LEFT JOIN ems_strategy aux on t.auxiliary_strategy_id = aux.id
|
LEFT JOIN ems_strategy aux on t.auxiliary_strategy_id = aux.id
|
||||||
where t.site_id = #{siteId}
|
where t.`status` != 4
|
||||||
and t.`status` != 4
|
<if test="siteId != null and siteId != ''">
|
||||||
|
and t.site_id = #{siteId}
|
||||||
|
</if>
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
<update id="stopEmsStrategyRunning" parameterType="Long">
|
<update id="stopEmsStrategyRunning" parameterType="Long">
|
||||||
|
|||||||
@ -110,4 +110,9 @@
|
|||||||
<update id="cleanTemplateId" parameterType="String">
|
<update id="cleanTemplateId" parameterType="String">
|
||||||
update ems_strategy_time_config set template_id = '' where template_id = #{templateId}
|
update ems_strategy_time_config set template_id = '' where template_id = #{templateId}
|
||||||
</update>
|
</update>
|
||||||
|
|
||||||
|
<select id="getAllTimeConfigByTempId" parameterType="String" resultMap="EmsStrategyTimeConfigResult">
|
||||||
|
<include refid="selectEmsStrategyTimeConfigVo"/>
|
||||||
|
where template_id = #{templateId}
|
||||||
|
</select>
|
||||||
</mapper>
|
</mapper>
|
||||||
Reference in New Issue
Block a user