定时任务迁移到quartz模块下

This commit is contained in:
2025-10-28 14:38:18 +08:00
parent d83af112e7
commit 614cca4297
5 changed files with 62 additions and 32 deletions

View File

@ -1,126 +0,0 @@
package com.xzzn.framework.scheduler;
import com.xzzn.ems.domain.EmsDevicesSetting;
import com.xzzn.ems.mapper.EmsDevicesSettingMapper;
import com.xzzn.ems.mapper.EmsMqttMessageMapper;
import com.xzzn.framework.manager.ModbusConnectionManager;
import com.xzzn.framework.manager.ModbusConnectionWrapper;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.ModbusService;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@EnableScheduling
public class ModbusPoller {
private static final Logger logger = LoggerFactory.getLogger(ModbusPoller.class);
private final MqttLifecycleManager mqttLifecycleManager;
@Autowired
private ModbusConnectionManager connectionManager;
@Autowired
private ModbusService modbusService;
@Autowired
private EmsDevicesSettingMapper deviceRepo;
@Autowired
private EmsMqttMessageMapper emsMqttMessageMapper;
@Autowired
public ModbusPoller(MqttLifecycleManager mqttLifecycleManager) {
this.mqttLifecycleManager = mqttLifecycleManager;
}
// 每5分钟触发支持cron表达式动态配置
@Scheduled(cron = "${modbus.poll.interval}")
@Async("modbusTaskExecutor")
public void pollAllDevices() {
logger.info("开始执行Modbus设备轮询...");
List<EmsDevicesSetting> activeDevices = deviceRepo.selectEmsDevicesSettingList(null);
EmsDevicesSetting device = activeDevices.get(0);
try {
processData(device,null);
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}
/*
try {
pollSingleDevice(device);
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}*/
/*activeDevices.forEach(device -> {
try {
CompletableFuture.runAsync(() -> pollSingleDevice(device))
.exceptionally(e -> {
logger.error("设备{}轮询异常", device.getId(), e);
return null;
});
} catch (Exception e) {
logger.error("调度设备{}任务失败", device.getId(), e);
}
});*/
}
private void pollSingleDevice(EmsDevicesSetting device) {
logger.debug("开始轮询设备: {}", device.getSiteId(), device.getDeviceName(), device.getId());
ModbusConnectionWrapper wrapper = null;
try {
// 获取连接
wrapper = connectionManager.getConnection(device);
// 读取保持寄存器
int[] data = modbusService.readHoldingRegisters(
wrapper.getConnection(),
1, //从站ID
10 // 寄存器数量
);
// 处理读取到的数据
processData(device, data);
} catch (Exception e) {
logger.error("轮询设备{}失败: {}", device.getId(), e.getMessage());
// 标记连接为无效
if (wrapper != null) {
wrapper.close();
connectionManager.removeConnection(Integer.parseInt(device.getDeviceId()));
}
throw new RuntimeException("轮询设备失败", e);
}
}
// 处理获取到的数据发到mqtt服务上
private void processData(EmsDevicesSetting device, int[] data) throws MqttException {
/*if (data == null || data.length == 0) {
logger.warn("设备{}返回空数据", device.getId());
return;
}*/
/*// 数据处理逻辑
StringBuilder sb = new StringBuilder();
sb.append("设备[").append(device.getDeviceName()).append("]数据: ");
for (int i = 0; i < data.length; i++) {
sb.append("R").append(i).append("=").append(data[i]).append(" ");
}
logger.info(sb.toString());*/
// 测试发送mqtt
/* EmsMqttMessage msg = emsMqttMessageMapper.selectEmsMqttMessageById(1L);
String dataJson = msg.getMqttMessage();
String topic = msg.getMqttTopic();
logger.info("topic" + topic);
logger.info("dataJson" + dataJson);
// 将设备数据下发到mqtt服务器上
mqttLifecycleManager.publish(topic, dataJson, 0);*/
}
}

View File

@ -1,170 +0,0 @@
package com.xzzn.framework.scheduler;
import com.alibaba.fastjson2.JSON;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsStrategyCurve;
import com.xzzn.ems.domain.EmsStrategyTemp;
import com.xzzn.ems.domain.EmsStrategyTimeConfig;
import com.xzzn.ems.domain.vo.StrategyPowerDataVo;
import com.xzzn.ems.domain.vo.StrategyRunningVo;
import com.xzzn.ems.mapper.*;
import com.xzzn.framework.manager.ModbusConnectionManager;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.ModbusService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Component
@EnableScheduling
public class StrategyPoller {
private static final Logger logger = LoggerFactory.getLogger(StrategyPoller.class);
private final MqttLifecycleManager mqttLifecycleManager;
@Autowired
private ModbusConnectionManager connectionManager;
@Autowired
private ModbusService modbusService;
@Autowired
private EmsDevicesSettingMapper deviceRepo;
@Autowired
private EmsMqttMessageMapper emsMqttMessageMapper;
@Autowired
private EmsStrategyRunningMapper emsStrategyRunningMapper;
@Autowired
private EmsStrategyTempMapper emsStrategyTempMapper;
@Autowired
private EmsStrategyTimeConfigMapper emsStrategyTimeConfigMapper;
@Autowired
private EmsStrategyCurveMapper emsStrategyCurveMapper;
@Autowired
public StrategyPoller(MqttLifecycleManager mqttLifecycleManager) {
this.mqttLifecycleManager = mqttLifecycleManager;
}
// 每1分钟触发支持cron表达式动态配置
@Scheduled(cron = "0 */1 * * * *")
@Async("strategyTaskExecutor")
public void pollAllDevices() {
logger.info("开始执行策略数据轮询...");
List<StrategyRunningVo> strategyRunningVoList = emsStrategyRunningMapper.getRunningList(null);
strategyRunningVoList.forEach(strategyVo -> {
try {
CompletableFuture.runAsync(() -> {
processData(strategyVo);
})
.exceptionally(e -> {
logger.error("策略{}轮询异常", strategyVo.getId(), e);
return null;
});
} catch (Exception e) {
logger.error("策略下方{}任务失败", strategyVo.getId(), e);
}
});
}
// 处理获取到的数据发到mqtt服务上
private void processData(StrategyRunningVo strategyVo) {
logger.info("策略下发数据处理开始");
// 根据运行策略获取主副策略的模板数据
Long mainStrategyId = strategyVo.getMainStrategyId();
Long auxStrategyId = strategyVo.getAuxStrategyId();
String siteId = strategyVo.getSiteId();
// 处理主策略数据
if (mainStrategyId != null && StringUtils.isNotBlank(siteId)) {
dealStrategyCurveData(mainStrategyId, siteId);
}
// 处理副策略数据
if (auxStrategyId != null && StringUtils.isNotBlank(siteId)) {
dealStrategyCurveData(auxStrategyId, siteId);
}
// 策略数据下发-下方格式暂无
logger.info("策略下发结束");
}
private void dealStrategyCurveData(Long mainStrategyId, String siteId) {
// 获取当前策略的所有模板
List<Map<String, String>> temps = emsStrategyTempMapper.getTempNameList(mainStrategyId,siteId);
if (temps != null && temps.size() > 0) {
for (Map<String, String> temp : temps) {
String tempId = temp.get("templateId");
List<EmsStrategyTimeConfig> timeConfigs = emsStrategyTimeConfigMapper.getAllTimeConfigByTempId(tempId);
if (timeConfigs != null && timeConfigs.size() > 0) {
for (EmsStrategyTimeConfig timeConfig : timeConfigs) {
EmsStrategyCurve curve = new EmsStrategyCurve();
curve.setStrategyId(mainStrategyId);
curve.setSiteId(siteId);
curve.setTemplateId(tempId);
curve.setCreateBy("system");
curve.setCreateTime(DateUtils.getNowDate());
curve.setUpdateBy("system");
curve.setUpdateTime(DateUtils.getNowDate());
// 时间设置
int month = Integer.parseInt(timeConfig.getMonth().toString());
String[] dateList= dealWithMonth(month);
curve.setMonth(Long.valueOf(month));
curve.setStartDate(DateUtils.dateTime(DateUtils.YYYY_MM_DD,dateList[0]));
curve.setEndDate(DateUtils.dateTime(DateUtils.YYYY_MM_DD,dateList[1]));
// powerData-存json格式
List<EmsStrategyTemp> powerConfig = emsStrategyTempMapper.selectStrategyTempByTempId(tempId);
List<StrategyPowerDataVo> powerDataVoList = new ArrayList<>();
for (int i = 0; i < powerConfig.size(); i++) {
EmsStrategyTemp powerTemp = powerConfig.get(i);
StrategyPowerDataVo powerDataVo = new StrategyPowerDataVo();
powerDataVo.setPowerData(powerTemp.getChargeDischargePower());
powerDataVo.setEndTime(DateUtils.parseDateToStr("HH:mm:ss",powerTemp.getEndTime()));
powerDataVo.setStartTime(DateUtils.parseDateToStr("HH:mm:ss",powerTemp.getStartTime()));
powerDataVoList.add(powerDataVo);
}
curve.setPowerData(powerDataVoList !=null ? JSON.toJSON(powerDataVoList).toString() : "");
// 记录推送记录
emsStrategyCurveMapper.insertEmsStrategyCurve(curve);
// 设置已下发
timeConfig.setIsPost(0);
emsStrategyTimeConfigMapper.updateEmsStrategyTimeConfig(timeConfig);
}
}
}
}
}
private String[] dealWithMonth(int month) {
// 获取当前年份
int currentYear = LocalDate.now().getYear();
// 创建YearMonth对象表示当年指定的月份
YearMonth yearMonth = YearMonth.of(currentYear, month);
// 获取当月的第一天和最后一天
LocalDate firstDay = yearMonth.atDay(1);
LocalDate lastDay = yearMonth.atEndOfMonth();
// 定义日期格式(年月日)
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// 格式化日期
return new String[]{
firstDay.format(formatter),
lastDay.format(formatter)
};
}
}