dev #3

Merged
dashixiong merged 11 commits from dev into waibao 2026-04-09 01:31:06 +00:00
107 changed files with 11483 additions and 3368 deletions
Showing only changes of commit 5ab2cb8f90 - Show all commits

View File

@ -12,7 +12,9 @@ import com.xzzn.ems.domain.vo.ClusterStatisListVo;
import com.xzzn.ems.domain.vo.DateSearchRequest;
import com.xzzn.ems.domain.vo.StatisAmmeterDateRequest;
import com.xzzn.ems.domain.vo.StatisClusterDateRequest;
import com.xzzn.ems.domain.vo.WeatherSyncResultVo;
import com.xzzn.ems.service.IEmsStatsReportService;
import com.xzzn.ems.service.IEmsWeatherSyncService;
import java.util.ArrayList;
import java.util.List;
@ -38,6 +40,8 @@ public class EmsStatisticalReportController extends BaseController
@Autowired
private IEmsStatsReportService ieEmsStatsReportService;
@Autowired
private IEmsWeatherSyncService iEmsWeatherSyncService;
/**
* 概率统计-收益指标查询
@ -176,4 +180,19 @@ public class EmsStatisticalReportController extends BaseController
}
}
/**
* 手动触发天气同步
*/
@PostMapping("/syncWeatherByDateRange")
public AjaxResult syncWeatherByDateRange(StatisAmmeterDateRequest requestVo)
{
if (StringUtils.isEmpty(requestVo.getSiteId())
|| StringUtils.isEmpty(requestVo.getStartTime())
|| StringUtils.isEmpty(requestVo.getEndTime())) {
return error("缺少必传项: siteId/startTime/endTime");
}
WeatherSyncResultVo resultVo = iEmsWeatherSyncService.syncWeatherByDateRange(requestVo);
return success(resultVo);
}
}

View File

@ -6,11 +6,7 @@ import com.xzzn.common.enums.TopicHandleType;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsMqttTopicConfig;
import com.xzzn.ems.mapper.EmsMqttTopicConfigMapper;
import com.xzzn.ems.service.IDDSDataProcessService;
import com.xzzn.ems.service.IDeviceDataProcessService;
import com.xzzn.ems.service.IEmsStrategyService;
import com.xzzn.ems.service.IFXXAlarmDataProcessService;
import com.xzzn.ems.service.IFXXDataProcessService;
import com.xzzn.ems.service.IMqttSyncLogService;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.MqttPublisher;
@ -38,22 +34,13 @@ public class MqttMessageController implements MqttPublisher, MqttSubscriber {
private final MqttLifecycleManager mqttLifecycleManager;
@Autowired
private IFXXDataProcessService fXXDataProcessService;
@Autowired
private IDDSDataProcessService dDSDataProcessService;
@Autowired
private IDeviceDataProcessService deviceDataProcessService;
@Autowired
private IFXXAlarmDataProcessService fXXAlarmDataProcessService;
@Autowired
private EmsMqttTopicConfigMapper emsMqttTopicConfigMapper;
@Autowired
private IEmsStrategyService emsStrategyService;
@Autowired
private IMqttSyncLogService iMqttSyncLogService;
@Autowired

View File

@ -205,3 +205,10 @@ modbus:
poll:
interval: "0 */5 * * * *" # 5分钟间隔
timeout: 30000 # 30秒超时
weather:
api:
enabled: true
base-url: https://archive-api.open-meteo.com/v1/archive
api-key:
timezone: Asia/Shanghai

View File

@ -205,3 +205,10 @@ modbus:
poll:
interval: "0 */5 * * * *" # 5分钟间隔
timeout: 30000 # 30秒超时
weather:
api:
enabled: true
base-url: https://archive-api.open-meteo.com/v1/archive
api-key:
timezone: Asia/Shanghai

View File

@ -221,3 +221,10 @@ modbus:
poll:
interval: "0 */5 * * * *" # 5分钟间隔
timeout: 30000 # 30秒超时
weather:
api:
enabled: true
base-url: https://archive-api.open-meteo.com/v1/archive
api-key:
timezone: Asia/Shanghai

View File

@ -103,6 +103,9 @@ public class RedisKeyConstants
/** 设备信息初始化 */
public static final String INIT_DEVICE_INFO = "init_device_info";
/** 设备配置缓存(按站点+设备) */
public static final String DEVICE_SETTING = "DEVICE_SETTING_";
/** 告警匹配信息 */
public static final String ALARM_MATCH_INFO = "alarm_message_info";
@ -126,6 +129,9 @@ public class RedisKeyConstants
/** 点位配置缓存(按站点+设备) */
public static final String POINT_CONFIG_DEVICE = "POINT_CONFIG_DEVICE_";
/** 点位配置缓存(按站点+pointId */
public static final String POINT_CONFIG_POINT = "POINT_CONFIG_POINT_";
/** 单站监控最新数据(按站点+模块) */
public static final String SITE_MONITOR_LATEST = "SITE_MONITOR_LATEST_";

View File

@ -2,31 +2,25 @@ package com.xzzn.quartz.task;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.serotonin.modbus4j.ModbusMaster;
import com.xzzn.common.constant.RedisKeyConstants;
import com.xzzn.common.core.modbus.ModbusProcessor;
import com.xzzn.common.core.modbus.domain.DeviceConfig;
import com.xzzn.common.core.modbus.domain.TagConfig;
import com.xzzn.common.core.redis.RedisCache;
import com.xzzn.common.enums.DeviceRunningStatus;
import com.xzzn.common.utils.DateUtils;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.ems.domain.EmsDevicesSetting;
import com.xzzn.ems.domain.EmsPointConfig;
import com.xzzn.ems.mapper.EmsDevicesSettingMapper;
import com.xzzn.ems.mapper.EmsPointConfigMapper;
import com.xzzn.ems.service.IEmsAlarmRecordsService;
import com.xzzn.ems.service.impl.DeviceDataProcessServiceImpl;
import com.xzzn.framework.manager.MqttLifecycleManager;
import com.xzzn.framework.web.service.MqttPublisher;
import com.xzzn.quartz.config.ScheduledTask;
import com.xzzn.quartz.domain.SysJob;
import com.xzzn.quartz.service.ISysJobService;
import com.xzzn.quartz.util.CronUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -54,10 +48,8 @@ import org.springframework.util.CollectionUtils;
@Component("modbusPoller")
public class ModbusPoller {
private static final Logger log = LoggerFactory.getLogger(ModbusPoller.class);
private static final int SITE_DEVICE_OFFLINE_THRESHOLD = 6;
private final MqttLifecycleManager mqttLifecycleManager;
private final ScheduledTask scheduledTask;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<String, Integer> deviceFailureCounts = new ConcurrentHashMap<>();
private final AtomicBoolean polling = new AtomicBoolean(false);
@ -69,77 +61,46 @@ public class ModbusPoller {
@Autowired
private IEmsAlarmRecordsService iEmsAlarmRecordsService;
@Autowired
private ISysJobService iSysJobService;
@Autowired
private DeviceDataProcessServiceImpl deviceDataProcessServiceImpl;
@Autowired
private EmsDevicesSettingMapper emsDevicesSettingMapper;
@Autowired
private EmsPointConfigMapper emsPointConfigMapper;
@Autowired
private RedisCache redisCache;
@Autowired
private MqttPublisher mqttPublisher;
@Value("${mqtt.topic}")
private String topic;
@Value("${mqtt.siteId}")
private String siteId;
@Autowired
public ModbusPoller(MqttLifecycleManager mqttLifecycleManager, ScheduledTask scheduledTask) {
this.mqttLifecycleManager = mqttLifecycleManager;
this.scheduledTask = scheduledTask;
}
public void pollAllDevices() {
if (!polling.compareAndSet(false, true)) {
log.warn("上一次轮询尚未完成,本次轮询跳过");
return;
}
Path devicesDir = Paths.get(System.getProperty("user.dir"), "devices");
if (!Files.exists(devicesDir)) {
log.error("Devices目录不存在: {}", devicesDir);
List<PollingTask> pollingTasks = buildPollingTasks();
if (CollectionUtils.isEmpty(pollingTasks)) {
log.warn("未查询到可用的Modbus采集点位配置跳过本轮轮询");
polling.set(false);
return;
}
List<Path> jsonFiles = null;
try {
jsonFiles = Files.list(devicesDir)
.filter(path -> path.toString().endsWith(".json"))
.collect(Collectors.toList());
} catch (IOException e) {
log.error("modbusPoller.loadConfigs 获取设备配置文件失败: {}", devicesDir, e);
polling.set(false);
return;
}
// 按主机IP分组同一网关串行访问避免连接抖动
Map<String, List<PollingTask>> groupedByHost = pollingTasks.stream()
.collect(Collectors.groupingBy(
task -> task.getDeviceConfig().getHost(),
HashMap::new,
Collectors.toList()));
// 按主机IP分组同一网关的不同端口也归为一组避免并发访问导致Connection Reset
Map<String, List<DeviceConfig>> groupedByHost = new HashMap<>();
for (Path filePath : jsonFiles) {
DeviceConfig config = null;
try {
config = objectMapper.readValue(filePath.toFile(), DeviceConfig.class);
} catch (IOException e) {
log.error("modbusPoller.loadConfigs 解析设备配置文件失败: {}", filePath, e);
continue;
}
if (config.isEnabled()) {
// 只按主机IP分组确保同一网关的所有端口串行访问
String hostKey = config.getHost();
groupedByHost.computeIfAbsent(hostKey, k -> new ArrayList<>()).add(config);
}
}
// 使用单线程 executor 串行执行所有主机的 Modbus 操作
// 将所有主机的设备按顺序串行处理,避免任何并发访问
Future<?> future = modbusExecutor.submit(() -> {
for (Map.Entry<String, List<DeviceConfig>> entry : groupedByHost.entrySet()) {
for (Map.Entry<String, List<PollingTask>> entry : groupedByHost.entrySet()) {
String hostKey = entry.getKey();
List<DeviceConfig> configs = entry.getValue();
for (DeviceConfig config : configs) {
List<PollingTask> tasks = entry.getValue();
for (PollingTask task : tasks) {
try {
scheduledStart(config);
scheduledStart(task.getSiteId(), task.getDeviceConfig());
// 每次读取后等待200ms给Modbus网关足够的处理时间
Thread.sleep(200);
} catch (InterruptedException ie) {
@ -147,7 +108,8 @@ public class ModbusPoller {
log.warn("Modbus轮询被中断");
return;
} catch (Exception e) {
log.error("采集设备数据异常: {}", config.getDeviceName(), e);
log.error("采集设备数据异常: siteId={}, deviceId={}",
task.getSiteId(), task.getDeviceConfig().getDeviceNumber(), e);
}
}
log.info("采集设备数据{}轮询任务执行完成", hostKey);
@ -165,7 +127,139 @@ public class ModbusPoller {
}
}
public void scheduledStart(DeviceConfig config) {
private List<PollingTask> buildPollingTasks() {
List<EmsPointConfig> pointConfigs = emsPointConfigMapper.selectModbusCollectPointConfigs(null);
if (CollectionUtils.isEmpty(pointConfigs)) {
return Collections.emptyList();
}
List<EmsDevicesSetting> allDevices = emsDevicesSettingMapper.selectEmsDevicesSettingList(null);
Map<String, EmsDevicesSetting> deviceMap = allDevices.stream()
.filter(Objects::nonNull)
.filter(device -> StringUtils.isNoneBlank(device.getSiteId(), device.getDeviceId()))
.collect(Collectors.toMap(
this::buildSiteDeviceKey,
device -> device,
(left, right) -> left));
Map<String, List<EmsPointConfig>> pointsByDevice = pointConfigs.stream()
.filter(Objects::nonNull)
.filter(point -> StringUtils.isNoneBlank(point.getSiteId(), point.getDeviceId()))
.collect(Collectors.groupingBy(
point -> point.getSiteId() + "_" + point.getDeviceId(),
HashMap::new,
Collectors.toList()));
List<PollingTask> tasks = new ArrayList<>();
for (Map.Entry<String, List<EmsPointConfig>> entry : pointsByDevice.entrySet()) {
String siteDeviceKey = entry.getKey();
EmsDevicesSetting device = deviceMap.get(siteDeviceKey);
if (device == null) {
log.warn("未找到设备连接配置,跳过采集: key={}", siteDeviceKey);
continue;
}
DeviceConfig deviceConfig = buildDeviceConfig(device, entry.getValue());
if (deviceConfig == null) {
continue;
}
tasks.add(new PollingTask(device.getSiteId(), deviceConfig));
}
return tasks;
}
private DeviceConfig buildDeviceConfig(EmsDevicesSetting device, List<EmsPointConfig> pointConfigs) {
if (device == null || CollectionUtils.isEmpty(pointConfigs)) {
return null;
}
if (StringUtils.isBlank(device.getIpAddress()) || device.getIpPort() == null || device.getSlaveId() == null) {
log.warn("设备连接参数不完整,跳过采集: siteId={}, deviceId={}", device.getSiteId(), device.getDeviceId());
return null;
}
List<TagConfig> tags = pointConfigs.stream()
.sorted(Comparator.comparing(point -> point.getModbusReadOrder() == null ? 0 : point.getModbusReadOrder()))
.map(this::toTagConfig)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(tags)) {
log.warn("设备无有效Modbus点位配置跳过采集: siteId={}, deviceId={}", device.getSiteId(), device.getDeviceId());
return null;
}
DeviceConfig deviceConfig = new DeviceConfig();
deviceConfig.setEnabled(true);
deviceConfig.setDeviceName(device.getDeviceName());
deviceConfig.setDeviceNumber(device.getDeviceId());
deviceConfig.setHost(device.getIpAddress());
deviceConfig.setPort(device.getIpPort().intValue());
deviceConfig.setSlaveId(device.getSlaveId().intValue());
deviceConfig.setTags(tags);
return deviceConfig;
}
private TagConfig toTagConfig(EmsPointConfig pointConfig) {
if (pointConfig == null) {
return null;
}
if (StringUtils.isBlank(pointConfig.getDataKey())) {
return null;
}
String address = normalizeAddress(pointConfig.getRegisterAddress(), pointConfig.getModbusRegisterType());
if (StringUtils.isBlank(address)) {
return null;
}
if (StringUtils.isBlank(pointConfig.getModbusDataType())) {
return null;
}
TagConfig tag = new TagConfig();
tag.setKey(pointConfig.getDataKey().trim());
tag.setAddress(address);
tag.setDataType(pointConfig.getModbusDataType().trim());
tag.setA(pointConfig.getDataA() == null ? 0F : pointConfig.getDataA().floatValue());
tag.setK(pointConfig.getDataK() == null ? 1F : pointConfig.getDataK().floatValue());
tag.setB(pointConfig.getDataB() == null ? 0F : pointConfig.getDataB().floatValue());
tag.setBit(pointConfig.getDataBit());
return tag;
}
private String normalizeAddress(String registerAddress, String registerType) {
if (StringUtils.isBlank(registerAddress)) {
return null;
}
String normalizedAddress = registerAddress.trim();
if (!normalizedAddress.chars().allMatch(Character::isDigit)) {
log.warn("寄存器地址必须为数字,当前值: {}", normalizedAddress);
return null;
}
if (normalizedAddress.length() > 1) {
char first = normalizedAddress.charAt(0);
if (first >= '0' && first <= '4') {
return normalizedAddress;
}
}
return getRegisterPrefix(registerType) + normalizedAddress;
}
private String getRegisterPrefix(String registerType) {
String normalized = StringUtils.defaultString(registerType).trim().toUpperCase();
switch (normalized) {
case "COIL":
return "0";
case "DISCRETE_INPUT":
return "1";
case "INPUT_REGISTER":
return "3";
case "HOLDING_REGISTER":
default:
return "4";
}
}
private String buildSiteDeviceKey(EmsDevicesSetting device) {
return device.getSiteId() + "_" + device.getDeviceId();
}
public void scheduledStart(String siteId, DeviceConfig config) {
if (config.isEnabled()) {
log.info("Reading data from devices: {}", config.getDeviceName());
@ -205,7 +299,7 @@ public class ModbusPoller {
log.info("Data from {}: {}", config.getDeviceName(), data);
String deviceNumber = config.getDeviceNumber();
//处理数据并发送MQTT消息、保存Redis数据和数据入库
processingData(data, deviceNumber);
processingData(siteId, data, deviceNumber);
}
}
@ -254,24 +348,25 @@ public class ModbusPoller {
return data;
}
private void processingData(Map<String, Object> data, String deviceNumber) {
private void processingData(String siteId, Map<String, Object> data, String deviceNumber) {
String siteDeviceKey = siteId + "_" + deviceNumber;
if (CollectionUtils.isEmpty(data)) {
// 增加失败计数
int failureCount = deviceFailureCounts.getOrDefault(deviceNumber, 0) + 1;
deviceFailureCounts.put(deviceNumber, failureCount);
int failureCount = deviceFailureCounts.getOrDefault(siteDeviceKey, 0) + 1;
deviceFailureCounts.put(siteDeviceKey, failureCount);
log.warn("设备 {} 数据读取失败,当前连续失败次数: {}", deviceNumber, failureCount);
log.warn("设备 {} 数据读取失败,当前连续失败次数: {}", siteDeviceKey, failureCount);
// 连续6次失败触发报警
if (failureCount >= 6) {
if (failureCount >= SITE_DEVICE_OFFLINE_THRESHOLD) {
addDeviceOfflineRecord(siteId, deviceNumber);
log.error("设备 {} 连续 {} 次未读取到数据,触发报警", deviceNumber, failureCount);
log.error("设备 {} 连续 {} 次未读取到数据,触发报警", siteDeviceKey, failureCount);
}
return;
}
// 数据读取成功,重置计数器
deviceFailureCounts.remove(deviceNumber);
deviceFailureCounts.remove(siteDeviceKey);
// 读取到数据后告警自恢复
deleteDeviceOfflineRecord(siteId, deviceNumber);
@ -281,9 +376,42 @@ public class ModbusPoller {
json.put("Data", data);
json.put("timestamp", timestamp);
json.put("Device", deviceNumber);
sendMqttMsg(json);
saveRedisData(json, deviceNumber);
saveDataToDatabase(data, deviceNumber, timestamp);
if (shouldSendMqttOnChange(siteId, deviceNumber, data)) {
sendMqttMsg(json);
} else {
sendMqttHeartbeat(deviceNumber, timestamp);
log.info("设备 {} 数据无变化已发送心跳MQTT", siteDeviceKey);
}
saveRedisData(siteId, json, deviceNumber);
saveDataToDatabase(siteId, data, deviceNumber, timestamp);
}
/**
* 逢变上送仅当Data发生变化时才发送MQTT
*/
private boolean shouldSendMqttOnChange(String siteId, String deviceNumber, Map<String, Object> currentData) {
JSONObject lastPayload = redisCache.getCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA + siteId + "_" + deviceNumber);
if (lastPayload == null) {
return true;
}
Object lastDataObj = lastPayload.get("Data");
if (lastDataObj == null) {
return true;
}
JSONObject lastData = JSON.parseObject(JSON.toJSONString(lastDataObj));
JSONObject currentDataJson = JSON.parseObject(JSON.toJSONString(currentData));
return !Objects.equals(lastData, currentDataJson);
}
private void sendMqttHeartbeat(String deviceNumber, Long timestamp) {
JSONObject heartbeat = new JSONObject();
heartbeat.put("Device", deviceNumber);
heartbeat.put("timestamp", timestamp);
heartbeat.put("Heartbeat", 1);
heartbeat.put("Data", new JSONObject());
sendMqttMsg(heartbeat);
}
public void sendMqttMsg(JSONObject json) {
@ -296,7 +424,7 @@ public class ModbusPoller {
}
public void saveRedisData(JSONObject obj, String deviceNumber) {
public void saveRedisData(String siteId, JSONObject obj, String deviceNumber) {
try {
// 存放mqtt原始每个设备最晚一次数据便于后面点位获取数据
redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA + siteId + "_" + deviceNumber, obj);
@ -308,8 +436,12 @@ public class ModbusPoller {
}
}
private void saveDataToDatabase(Map<String, Object> data, String deviceNumber, Long timestamp) {
deviceDataProcessServiceImpl.processingDeviceData(siteId, deviceNumber, JSON.toJSONString(data), DateUtils.convertUpdateTime(timestamp));
private void saveDataToDatabase(String siteId, Map<String, Object> data, String deviceNumber, Long timestamp) {
JSONObject payload = new JSONObject();
payload.put("Device", deviceNumber);
payload.put("Data", JSON.toJSONString(data));
payload.put("timestamp", timestamp);
deviceDataProcessServiceImpl.handleDeviceData(Collections.singletonList(payload).toString(), siteId);
}
//处理设备连接失败的情况,更新设备状态为离线,添加报警记录
@ -333,11 +465,22 @@ public class ModbusPoller {
}
}
private int getScheduledTaskInterval() {
SysJob query = new SysJob();
query.setInvokeTarget("modbusPoller.pollAllDevices");
List<SysJob> sysJobs = iSysJobService.selectJobList(query);
return Math.toIntExact(CronUtils.getNextExecutionIntervalMillis(sysJobs.get(0).getCronExpression()));
private static class PollingTask {
private final String siteId;
private final DeviceConfig deviceConfig;
private PollingTask(String siteId, DeviceConfig deviceConfig) {
this.siteId = siteId;
this.deviceConfig = deviceConfig;
}
public String getSiteId() {
return siteId;
}
public DeviceConfig getDeviceConfig() {
return deviceConfig;
}
}
}

View File

@ -7,8 +7,11 @@ import com.xzzn.common.core.modbus.ModbusProcessor;
import com.xzzn.common.core.modbus.domain.DeviceConfig;
import com.xzzn.common.core.modbus.domain.WriteTagConfig;
import com.xzzn.common.core.redis.RedisCache;
import com.xzzn.common.enums.BusinessStatus;
import com.xzzn.common.enums.BusinessType;
import com.xzzn.common.enums.ChargeStatus;
import com.xzzn.common.enums.DeviceCategory;
import com.xzzn.common.enums.OperatorType;
import com.xzzn.common.enums.SiteDevice;
import com.xzzn.common.enums.SocLimit;
import com.xzzn.common.enums.WorkStatus;
@ -33,6 +36,8 @@ import com.xzzn.ems.mapper.EmsStrategyRuntimeConfigMapper;
import com.xzzn.ems.mapper.EmsStrategyRunningMapper;
import com.xzzn.ems.mapper.EmsStrategyTempMapper;
import com.xzzn.ems.mapper.EmsStrategyTimeConfigMapper;
import com.xzzn.system.domain.SysOperLog;
import com.xzzn.system.service.ISysOperLogService;
import java.math.BigDecimal;
import java.math.RoundingMode;
@ -115,6 +120,8 @@ public class StrategyPoller {
private RedisCache redisCache;
@Autowired
private ModbusProcessor modbusProcessor;
@Autowired
private ISysOperLogService operLogService;
@Resource(name = "modbusExecutor")
private ExecutorService modbusExecutor;
@ -511,8 +518,10 @@ public class StrategyPoller {
boolean result = modbusProcessor.writeDataToDeviceWithRetry(deviceConfig);
if (!result) {
logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceId, chargeStatus.getInfo());
recordDeviceOperationLog(siteId, deviceId, "写功率", chargeDischargePower, false, chargeStatus.getInfo() + "功率下发失败");
continue;
} else {
recordDeviceOperationLog(siteId, deviceId, "写功率", chargeDischargePower, true, null);
if (ChargeStatus.STANDBY.equals(chargeStatus)) {
// 待机,先写功率值,再关机
if (!switchDevice(pcsDevice, pcsSetting, WorkStatus.STOP)) {
@ -541,10 +550,42 @@ public class StrategyPoller {
if (!result) {
pcsDevice.setWorkStatus(originalWorkStatus);
logger.info("当前站点: {}, PCS设备: {} modbus控制设备{}指令发送失败", siteId, deviceId, workStatus.getInfo());
recordDeviceOperationLog(siteId, deviceId, "开关机", workStatus.getInfo(), false, workStatus.getInfo() + "指令发送失败");
} else {
recordDeviceOperationLog(siteId, deviceId, "开关机", workStatus.getInfo(), true, null);
}
return result;
}
private void recordDeviceOperationLog(String siteId, String deviceId, String action, Object param, boolean success, String errorMsg) {
try {
SysOperLog operLog = new SysOperLog();
operLog.setTitle("策略设备控制-" + action);
operLog.setBusinessType(BusinessType.UPDATE.ordinal());
operLog.setMethod(this.getClass().getName() + "." + action);
operLog.setRequestMethod("SCHEDULE");
operLog.setOperatorType(OperatorType.OTHER.ordinal());
operLog.setOperName("system");
operLog.setOperIp("127.0.0.1");
operLog.setOperUrl("/quartz/strategyPoller");
operLog.setOperTime(DateUtils.getNowDate());
Map<String, Object> operParam = new HashMap<>();
operParam.put("siteId", siteId);
operParam.put("deviceId", deviceId);
operParam.put("action", action);
operParam.put("param", param);
operLog.setOperParam(StringUtils.substring(JSON.toJSONString(operParam), 0, 2000));
operLog.setJsonResult(StringUtils.substring(JSON.toJSONString(Collections.singletonMap("success", success)), 0, 2000));
operLog.setStatus(success ? BusinessStatus.SUCCESS.ordinal() : BusinessStatus.FAIL.ordinal());
if (!success) {
operLog.setErrorMsg(StringUtils.substring(errorMsg, 0, 2000));
}
operLogService.insertOperlog(operLog);
} catch (Exception e) {
logger.error("记录sys_oper_log失败, siteId={}, deviceId={}, action={}", siteId, deviceId, action, e);
}
}
private ProtectionConstraintVo getProtectionConstraint(String siteId) {
ProtectionConstraintVo constraint = redisCache.getCacheObject(RedisKeyConstants.PROTECTION_CONSTRAINT + siteId);
if (constraint == null) {

View File

@ -0,0 +1,61 @@
package com.xzzn.ems.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "weather.api")
public class WeatherApiProperties {
/**
* 是否启用天气同步
*/
private boolean enabled = false;
/**
* 天气接口基础地址
*/
private String baseUrl = "https://archive-api.open-meteo.com/v1/archive";
/**
* 可选接口鉴权字段
*/
private String apiKey;
/**
* 时区参数
*/
private String timezone = "Asia/Shanghai";
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getBaseUrl() {
return baseUrl;
}
public void setBaseUrl(String baseUrl) {
this.baseUrl = baseUrl;
}
public String getApiKey() {
return apiKey;
}
public void setApiKey(String apiKey) {
this.apiKey = apiKey;
}
public String getTimezone() {
return timezone;
}
public void setTimezone(String timezone) {
this.timezone = timezone;
}
}

View File

@ -63,6 +63,24 @@ public class EmsPointConfig extends BaseEntity {
@Excel(name = "计算表达式")
private String calcExpression;
@Excel(name = "是否启用采集", readConverterExp = "0=否,1=是")
private Integer collectEnabled;
@Excel(name = "采集来源")
private String collectSource;
@Excel(name = "Modbus寄存器类型")
private String modbusRegisterType;
@Excel(name = "Modbus数据类型")
private String modbusDataType;
@Excel(name = "Modbus读取顺序")
private Integer modbusReadOrder;
@Excel(name = "Modbus分组")
private String modbusGroup;
public Long getId() {
return id;
}
@ -199,6 +217,54 @@ public class EmsPointConfig extends BaseEntity {
this.calcExpression = calcExpression;
}
public Integer getCollectEnabled() {
return collectEnabled;
}
public void setCollectEnabled(Integer collectEnabled) {
this.collectEnabled = collectEnabled;
}
public String getCollectSource() {
return collectSource;
}
public void setCollectSource(String collectSource) {
this.collectSource = collectSource;
}
public String getModbusRegisterType() {
return modbusRegisterType;
}
public void setModbusRegisterType(String modbusRegisterType) {
this.modbusRegisterType = modbusRegisterType;
}
public String getModbusDataType() {
return modbusDataType;
}
public void setModbusDataType(String modbusDataType) {
this.modbusDataType = modbusDataType;
}
public Integer getModbusReadOrder() {
return modbusReadOrder;
}
public void setModbusReadOrder(Integer modbusReadOrder) {
this.modbusReadOrder = modbusReadOrder;
}
public String getModbusGroup() {
return modbusGroup;
}
public void setModbusGroup(String modbusGroup) {
this.modbusGroup = modbusGroup;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
@ -219,6 +285,12 @@ public class EmsPointConfig extends BaseEntity {
.append("isAlarm", getIsAlarm())
.append("pointType", getPointType())
.append("calcExpression", getCalcExpression())
.append("collectEnabled", getCollectEnabled())
.append("collectSource", getCollectSource())
.append("modbusRegisterType", getModbusRegisterType())
.append("modbusDataType", getModbusDataType())
.append("modbusReadOrder", getModbusReadOrder())
.append("modbusGroup", getModbusGroup())
.append("createBy", getCreateBy())
.append("createTime", getCreateTime())
.append("updateBy", getUpdateBy())

View File

@ -11,6 +11,15 @@ public class AmmeterRevenueStatisListVo {
/** 类别 */
private String dataTime;
/** 是否工作日1-工作日 0-节假日 */
private Integer isWorkday;
/** 日期类型 */
private String dayType;
/** 天气情况 */
private String weatherDesc;
/** 组合有功-总 */
private BigDecimal activeTotalPrice = BigDecimal.ZERO;
@ -52,6 +61,30 @@ public class AmmeterRevenueStatisListVo {
this.dataTime = dataTime;
}
public Integer getIsWorkday() {
return isWorkday;
}
public void setIsWorkday(Integer isWorkday) {
this.isWorkday = isWorkday;
}
public String getDayType() {
return dayType;
}
public void setDayType(String dayType) {
this.dayType = dayType;
}
public String getWeatherDesc() {
return weatherDesc;
}
public void setWeatherDesc(String weatherDesc) {
this.weatherDesc = weatherDesc;
}
public BigDecimal getActiveTotalPrice() {
return activeTotalPrice;
}

View File

@ -24,11 +24,25 @@ public class DevicePointDataList
private BigDecimal avgValue;
// 差值max - min
private BigDecimal diffValue;
// 第一四分位数
private BigDecimal q1;
// 中位数
private BigDecimal median;
// 第三四分位数
private BigDecimal q3;
public DevicePointDataList(String deviceId, List<GeneralQueryDataVo> pointValueList,String parentDeviceId,
BigDecimal maxValue, BigDecimal minValue,
BigDecimal avgValue, BigDecimal diffValue,
String maxDate, String minDate) {
this(deviceId, pointValueList, parentDeviceId, maxValue, minValue, avgValue, diffValue, maxDate, minDate, null, null, null);
}
public DevicePointDataList(String deviceId, List<GeneralQueryDataVo> pointValueList,String parentDeviceId,
BigDecimal maxValue, BigDecimal minValue,
BigDecimal avgValue, BigDecimal diffValue,
String maxDate, String minDate,
BigDecimal q1, BigDecimal median, BigDecimal q3) {
this.deviceId = deviceId;
this.pointValueList = pointValueList;
this.parentDeviceId = parentDeviceId;
@ -38,6 +52,9 @@ public class DevicePointDataList
this.diffValue = diffValue;
this.maxDate = maxDate;
this.minDate = minDate;
this.q1 = q1;
this.median = median;
this.q3 = q3;
}
public DevicePointDataList(String deviceId, String parentDeviceId, List<GeneralQueryDataVo> pointValueList) {
@ -121,4 +138,28 @@ public class DevicePointDataList
public void setDiffValue(BigDecimal diffValue) {
this.diffValue = diffValue;
}
public BigDecimal getQ1() {
return q1;
}
public void setQ1(BigDecimal q1) {
this.q1 = q1;
}
public BigDecimal getMedian() {
return median;
}
public void setMedian(BigDecimal median) {
this.median = median;
}
public BigDecimal getQ3() {
return q3;
}
public void setQ3(BigDecimal q3) {
this.q3 = q3;
}
}

View File

@ -0,0 +1,76 @@
package com.xzzn.ems.domain.vo;
public class WeatherSyncResultVo {
private String siteId;
private String startTime;
private String endTime;
private int totalDays;
private int successDays;
private int insertedDays;
private int updatedDays;
private String message;
public String getSiteId() {
return siteId;
}
public void setSiteId(String siteId) {
this.siteId = siteId;
}
public String getStartTime() {
return startTime;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getEndTime() {
return endTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
public int getTotalDays() {
return totalDays;
}
public void setTotalDays(int totalDays) {
this.totalDays = totalDays;
}
public int getSuccessDays() {
return successDays;
}
public void setSuccessDays(int successDays) {
this.successDays = successDays;
}
public int getInsertedDays() {
return insertedDays;
}
public void setInsertedDays(int insertedDays) {
this.insertedDays = insertedDays;
}
public int getUpdatedDays() {
return updatedDays;
}
public void setUpdatedDays(int updatedDays) {
this.updatedDays = updatedDays;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -47,4 +47,6 @@ public interface EmsPointConfigMapper {
List<EmsPointConfig> selectBySiteIdAndPointIds(@Param("siteId") String siteId,
@Param("pointIds") List<String> pointIds);
List<EmsPointConfig> selectModbusCollectPointConfigs(@Param("siteId") String siteId);
}

View File

@ -0,0 +1,19 @@
package com.xzzn.ems.mapper;
import org.apache.ibatis.annotations.Param;
public interface EmsSiteWeatherDayMapper {
int updateWeatherDesc(@Param("siteId") String siteId,
@Param("calendarDate") String calendarDate,
@Param("weatherDesc") String weatherDesc,
@Param("weatherCode") Integer weatherCode);
int selectCountBySiteAndDate(@Param("siteId") String siteId, @Param("calendarDate") String calendarDate);
int insertSiteWeatherDay(@Param("siteId") String siteId,
@Param("calendarDate") String calendarDate,
@Param("weatherDesc") String weatherDesc,
@Param("weatherCode") Integer weatherCode,
@Param("source") String source);
}

View File

@ -1,7 +0,0 @@
package com.xzzn.ems.service;
public interface IDDSDataProcessService {
public void handleDdsData(String message);
}

View File

@ -0,0 +1,8 @@
package com.xzzn.ems.service;
import com.xzzn.ems.domain.vo.StatisAmmeterDateRequest;
import com.xzzn.ems.domain.vo.WeatherSyncResultVo;
public interface IEmsWeatherSyncService {
WeatherSyncResultVo syncWeatherByDateRange(StatisAmmeterDateRequest requestVo);
}

View File

@ -1,6 +0,0 @@
package com.xzzn.ems.service;
public interface IFXXDataProcessService {
public void handleFxData(String message);
}

View File

@ -49,6 +49,7 @@ import com.xzzn.ems.mapper.EmsPcsAlarmDataMapper;
import com.xzzn.ems.mapper.EmsPcsBranchDataMapper;
import com.xzzn.ems.mapper.EmsPcsDataMapper;
import com.xzzn.ems.mapper.EmsPointConfigMapper;
import com.xzzn.ems.mapper.EmsSiteMonitorPointMatchMapper;
import com.xzzn.ems.mapper.EmsStackAlarmDataMapper;
import com.xzzn.ems.mapper.EmsXfDataMapper;
import com.xzzn.ems.service.IDeviceDataProcessService;
@ -118,6 +119,25 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
private static final long POINT_ENQUEUE_RETRY_WAIT_MS = 10;
private static final long POINT_FLUSH_INTERVAL_MS = 100;
private static final String SITE_LEVEL_CALC_DEVICE_ID = "SITE_CALC";
private static final int MONITOR_POINT_MATCH_REDIS_TTL_SECONDS = 300;
private static final String DELETED_FIELD_MARK = "__DELETED__";
private static final String DAILY_ENERGY_RAW_CACHE_PREFIX = "DAILY_ENERGY_RAW_";
private static final String ENERGY_METRIC_PEAK_CHARGE = "peakCharge";
private static final String ENERGY_METRIC_PEAK_DISCHARGE = "peakDischarge";
private static final String ENERGY_METRIC_HIGH_CHARGE = "highCharge";
private static final String ENERGY_METRIC_HIGH_DISCHARGE = "highDischarge";
private static final String ENERGY_METRIC_FLAT_CHARGE = "flatCharge";
private static final String ENERGY_METRIC_FLAT_DISCHARGE = "flatDischarge";
private static final String ENERGY_METRIC_VALLEY_CHARGE = "valleyCharge";
private static final String ENERGY_METRIC_VALLEY_DISCHARGE = "valleyDischarge";
private static final String[] FIELD_SUFFIX_PEAK_CHARGE = {"activePeakKwh", "peakChargeDiff", "peakCharge"};
private static final String[] FIELD_SUFFIX_PEAK_DISCHARGE = {"reActivePeakKwh", "peakDischargeDiff", "peakDischarge"};
private static final String[] FIELD_SUFFIX_HIGH_CHARGE = {"activeHighKwh", "highChargeDiff", "highCharge"};
private static final String[] FIELD_SUFFIX_HIGH_DISCHARGE = {"reActiveHighKwh", "highDischargeDiff", "highDischarge"};
private static final String[] FIELD_SUFFIX_FLAT_CHARGE = {"activeFlatKwh", "flatChargeDiff", "flatCharge"};
private static final String[] FIELD_SUFFIX_FLAT_DISCHARGE = {"reActiveFlatKwh", "flatDischargeDiff", "flatDischarge"};
private static final String[] FIELD_SUFFIX_VALLEY_CHARGE = {"activeValleyKwh", "valleyChargeDiff", "valleyCharge"};
private static final String[] FIELD_SUFFIX_VALLEY_DISCHARGE = {"reActiveValleyKwh", "valleyDischargeDiff", "valleyDischarge"};
@Autowired
private EmsBatteryClusterMapper emsBatteryClusterMapper;
@ -151,6 +171,8 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
private EmsBatteryGroupMapper emsBatteryGroupMapper;
@Autowired
private EmsEmsDataMapper emsEmsDataMapper;
@Autowired
private EmsSiteMonitorPointMatchMapper emsSiteMonitorPointMatchMapper;
@Autowired
private EmsCoolingAlarmDataMapper emsCoolingAlarmDataMapper;
@ -233,15 +255,18 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
}
try {
String mergedJsonData = mergeWithLatestRedisData(siteId, deviceId, jsonData);
obj.put("Data", mergedJsonData);
// 保留每个设备最新一条原始报文,供“点位最新值”从 Redis 读取
redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA + siteId + "_" + deviceId, obj);
redisCache.setCacheObject(RedisKeyConstants.SYNC_DATA + siteId + "_" + deviceId, obj, 1, TimeUnit.MINUTES);
String deviceCategory = resolveDeviceCategory(siteId, deviceId, deviceCategoryMap);
Map<String, BigDecimal> pointIdValueMap = processPointConfigData(siteId, deviceId, deviceCategory, jsonData, dataUpdateTime);
// 旧设备表落库链路已下线MQTT 数据仅走点位映射与站点监控数据同步。
Map<String, BigDecimal> pointIdValueMap = processPointConfigData(siteId, deviceId, deviceCategory, mergedJsonData, dataUpdateTime);
iEmsDeviceSettingService.syncSiteMonitorDataByMqtt(siteId, deviceId, JSON.toJSONString(pointIdValueMap), dataUpdateTime);
} catch (Exception e) {
log.warn("点位映射数据处理失败siteId: {}, deviceId: {}, err: {}",
log.warn("设备数据处理失败siteId: {}, deviceId: {}, err: {}",
siteId, deviceId, e.getMessage(), e);
}
}
@ -318,9 +343,468 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
enqueuePointData(siteId, deviceId, pointId, convertedValue, dataUpdateTime);
pointIdValueMap.put(pointId, convertedValue);
}
updateDailyEnergyDataByMqtt(siteId, deviceId, deviceCategory, pointIdValueMap, dataUpdateTime);
return pointIdValueMap;
}
private String mergeWithLatestRedisData(String siteId, String deviceId, String currentJsonData) {
Map<String, Object> currentDataMap = parseDataJsonToMap(currentJsonData);
if (StringUtils.isAnyBlank(siteId, deviceId)) {
return JSON.toJSONString(currentDataMap);
}
String redisKey = RedisKeyConstants.ORIGINAL_MQTT_DATA + siteId + "_" + deviceId;
Object latestObj = redisCache.getCacheObject(redisKey);
Map<String, Object> latestDataMap = extractDataMapFromLatest(latestObj);
if (latestDataMap.isEmpty()) {
return JSON.toJSONString(currentDataMap);
}
Map<String, Object> merged = new LinkedHashMap<>(latestDataMap);
currentDataMap.forEach((key, value) -> {
if (StringUtils.isBlank(key) || value == null) {
return;
}
merged.put(key, value);
});
return JSON.toJSONString(merged);
}
private Map<String, Object> extractDataMapFromLatest(Object latestObj) {
if (latestObj == null) {
return new LinkedHashMap<>();
}
JSONObject latestJson;
if (latestObj instanceof JSONObject) {
latestJson = (JSONObject) latestObj;
} else if (latestObj instanceof String) {
latestJson = JSON.parseObject((String) latestObj);
} else if (latestObj instanceof Map) {
latestJson = new JSONObject((Map<String, Object>) latestObj);
} else {
latestJson = JSON.parseObject(JSON.toJSONString(latestObj));
}
if (latestJson == null || latestJson.isEmpty()) {
return new LinkedHashMap<>();
}
Object dataObj = latestJson.get("Data");
if (dataObj == null) {
return new LinkedHashMap<>();
}
if (dataObj instanceof JSONObject) {
return new LinkedHashMap<>(((JSONObject) dataObj));
}
if (dataObj instanceof Map) {
return new LinkedHashMap<>((Map<String, Object>) dataObj);
}
if (dataObj instanceof String) {
return parseDataJsonToMap((String) dataObj);
}
return parseDataJsonToMap(JSON.toJSONString(dataObj));
}
private Map<String, Object> parseDataJsonToMap(String jsonData) {
if (StringUtils.isBlank(jsonData)) {
return new LinkedHashMap<>();
}
Map<String, Object> parsed = JSON.parseObject(jsonData, new TypeReference<Map<String, Object>>() {
});
return parsed == null ? new LinkedHashMap<>() : new LinkedHashMap<>(parsed);
}
private void updateDailyEnergyDataByMqtt(String siteId, String deviceId, String deviceCategory,
Map<String, BigDecimal> pointIdValueMap,
Date dataUpdateTime) {
if (StringUtils.isBlank(siteId) || StringUtils.isBlank(deviceId)
|| org.apache.commons.collections4.MapUtils.isEmpty(pointIdValueMap)) {
return;
}
// 仅处理储能电表,避免其它设备误触发日报计算
if (!SiteDevice.METE.name().equalsIgnoreCase(deviceId)
&& !DeviceCategory.AMMETER.getCode().equalsIgnoreCase(StringUtils.defaultString(deviceCategory))) {
return;
}
Map<String, EmsSiteMonitorPointMatch> mappingByFieldAndDevice = getMonitorPointMappingByFieldAndDevice(siteId);
if (mappingByFieldAndDevice.isEmpty()) {
return;
}
Map<String, BigDecimal> normalizedPointValueMap = new HashMap<>();
for (Map.Entry<String, BigDecimal> entry : pointIdValueMap.entrySet()) {
if (StringUtils.isBlank(entry.getKey()) || entry.getValue() == null) {
continue;
}
normalizedPointValueMap.put(entry.getKey().trim().toUpperCase(), entry.getValue());
}
if (normalizedPointValueMap.isEmpty()) {
return;
}
String peakChargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_PEAK_CHARGE);
String peakDischargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_PEAK_DISCHARGE);
String highChargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_HIGH_CHARGE);
String highDischargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_HIGH_DISCHARGE);
String flatChargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_FLAT_CHARGE);
String flatDischargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_FLAT_DISCHARGE);
String valleyChargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_VALLEY_CHARGE);
String valleyDischargePointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, FIELD_SUFFIX_VALLEY_DISCHARGE);
Map<String, BigDecimal> currentTotals = new HashMap<>();
currentTotals.put(ENERGY_METRIC_PEAK_CHARGE, getPointValueById(normalizedPointValueMap, peakChargePointId));
currentTotals.put(ENERGY_METRIC_PEAK_DISCHARGE, getPointValueById(normalizedPointValueMap, peakDischargePointId));
currentTotals.put(ENERGY_METRIC_HIGH_CHARGE, getPointValueById(normalizedPointValueMap, highChargePointId));
currentTotals.put(ENERGY_METRIC_HIGH_DISCHARGE, getPointValueById(normalizedPointValueMap, highDischargePointId));
currentTotals.put(ENERGY_METRIC_FLAT_CHARGE, getPointValueById(normalizedPointValueMap, flatChargePointId));
currentTotals.put(ENERGY_METRIC_FLAT_DISCHARGE, getPointValueById(normalizedPointValueMap, flatDischargePointId));
currentTotals.put(ENERGY_METRIC_VALLEY_CHARGE, getPointValueById(normalizedPointValueMap, valleyChargePointId));
currentTotals.put(ENERGY_METRIC_VALLEY_DISCHARGE, getPointValueById(normalizedPointValueMap, valleyDischargePointId));
log.info("日电量分时映射命中, siteId: {}, deviceId: {}, points[尖充:{} 尖放:{} 峰充:{} 峰放:{} 平充:{} 平放:{} 谷充:{} 谷放:{}]",
siteId, deviceId,
peakChargePointId, peakDischargePointId,
highChargePointId, highDischargePointId,
flatChargePointId, flatDischargePointId,
valleyChargePointId, valleyDischargePointId);
boolean hasAnyCurrentTotal = currentTotals.values().stream().anyMatch(Objects::nonNull);
if (!hasAnyCurrentTotal) {
log.info("日电量累计未匹配到有效点位值, siteId: {}, deviceId: {}, pointKeys: {}",
siteId, deviceId, normalizedPointValueMap.keySet());
return;
}
log.info("日电量累计当前值, siteId: {}, deviceId: {}, currentTotals: {}",
siteId, deviceId, currentTotals);
Map<String, BigDecimal> lastTotals = getCachedDailyEnergyRawTotals(siteId, deviceId);
if (org.apache.commons.collections4.MapUtils.isEmpty(lastTotals)) {
lastTotals = loadYesterdayAmmeterTotals(siteId, deviceId);
if (!org.apache.commons.collections4.MapUtils.isEmpty(lastTotals)) {
log.info("日电量累计使用昨日末值作为基线, siteId: {}, deviceId: {}, lastTotals: {}",
siteId, deviceId, lastTotals);
}
}
if (org.apache.commons.collections4.MapUtils.isEmpty(lastTotals)) {
log.info("日电量累计缺少上一条基线, 仅缓存当前值等待下一次计算, siteId: {}, deviceId: {}, currentTotals: {}",
siteId, deviceId, currentTotals);
String redisKey = buildDailyEnergyRawCacheKey(siteId, deviceId);
redisCache.setCacheObject(redisKey, currentTotals, 2, TimeUnit.DAYS);
return;
}
EmsDailyEnergyData energyData = initEnergyData(siteId);
accumulateDailyEnergyDiff(energyData, currentTotals, lastTotals, siteId, deviceId);
recalculateDailyEnergyRevenue(siteId, energyData);
energyData.setCalcTime(dataUpdateTime == null ? DateUtils.getNowDate() : dataUpdateTime);
emsDailyEnergyDataMapper.insertOrUpdateData(energyData);
log.info("日电量累计落库完成, siteId: {}, deviceId: {}, diff[尖充:{} 尖放:{} 峰充:{} 峰放:{} 平充:{} 平放:{} 谷充:{} 谷放:{}], dayRevenue: {}, totalRevenue: {}",
siteId, deviceId,
energyData.getPeakChargeDiff(), energyData.getPeakDischargeDiff(),
energyData.getHighChargeDiff(), energyData.getHighDischargeDiff(),
energyData.getFlatChargeDiff(), energyData.getFlatDischargeDiff(),
energyData.getValleyChargeDiff(), energyData.getValleyDischargeDiff(),
energyData.getDayRevenue(), energyData.getTotalRevenue());
String redisKey = buildDailyEnergyRawCacheKey(siteId, deviceId);
redisCache.setCacheObject(redisKey, currentTotals, 2, TimeUnit.DAYS);
}
private Map<String, BigDecimal> loadYesterdayAmmeterTotals(String siteId, String deviceId) {
if (StringUtils.isAnyBlank(siteId, deviceId)) {
return Collections.emptyMap();
}
String yestDate = DateUtils.getYesterdayDate();
EmsAmmeterData yestData = emsAmmeterDataMapper.getYestLatestDate(siteId, deviceId, yestDate);
if (yestData == null) {
return Collections.emptyMap();
}
Map<String, BigDecimal> lastTotals = new HashMap<>();
lastTotals.put(ENERGY_METRIC_PEAK_CHARGE, safeBigDecimal(yestData.getCurrentForwardActivePeak()));
lastTotals.put(ENERGY_METRIC_PEAK_DISCHARGE, safeBigDecimal(yestData.getCurrentReverseActivePeak()));
lastTotals.put(ENERGY_METRIC_HIGH_CHARGE, safeBigDecimal(yestData.getCurrentForwardActiveHigh()));
lastTotals.put(ENERGY_METRIC_HIGH_DISCHARGE, safeBigDecimal(yestData.getCurrentReverseActiveHigh()));
lastTotals.put(ENERGY_METRIC_FLAT_CHARGE, safeBigDecimal(yestData.getCurrentForwardActiveFlat()));
lastTotals.put(ENERGY_METRIC_FLAT_DISCHARGE, safeBigDecimal(yestData.getCurrentReverseActiveFlat()));
lastTotals.put(ENERGY_METRIC_VALLEY_CHARGE, safeBigDecimal(yestData.getCurrentForwardActiveValley()));
lastTotals.put(ENERGY_METRIC_VALLEY_DISCHARGE, safeBigDecimal(yestData.getCurrentReverseActiveValley()));
return lastTotals;
}
private void accumulateDailyEnergyDiff(EmsDailyEnergyData energyData,
Map<String, BigDecimal> currentTotals,
Map<String, BigDecimal> lastTotals,
String siteId,
String deviceId) {
if (energyData == null || org.apache.commons.collections4.MapUtils.isEmpty(currentTotals)) {
return;
}
addMetricIncrement(energyData, ENERGY_METRIC_PEAK_CHARGE, currentTotals, lastTotals, siteId, deviceId);
addMetricIncrement(energyData, ENERGY_METRIC_PEAK_DISCHARGE, currentTotals, lastTotals, siteId, deviceId);
addMetricIncrement(energyData, ENERGY_METRIC_HIGH_CHARGE, currentTotals, lastTotals, siteId, deviceId);
addMetricIncrement(energyData, ENERGY_METRIC_HIGH_DISCHARGE, currentTotals, lastTotals, siteId, deviceId);
addMetricIncrement(energyData, ENERGY_METRIC_FLAT_CHARGE, currentTotals, lastTotals, siteId, deviceId);
addMetricIncrement(energyData, ENERGY_METRIC_FLAT_DISCHARGE, currentTotals, lastTotals, siteId, deviceId);
addMetricIncrement(energyData, ENERGY_METRIC_VALLEY_CHARGE, currentTotals, lastTotals, siteId, deviceId);
addMetricIncrement(energyData, ENERGY_METRIC_VALLEY_DISCHARGE, currentTotals, lastTotals, siteId, deviceId);
}
private void addMetricIncrement(EmsDailyEnergyData energyData,
String metric,
Map<String, BigDecimal> currentTotals,
Map<String, BigDecimal> lastTotals,
String siteId,
String deviceId) {
BigDecimal current = currentTotals.get(metric);
if (current == null) {
return;
}
BigDecimal last = lastTotals == null ? null : lastTotals.get(metric);
if (last == null) {
return;
}
BigDecimal delta = current.subtract(last);
if (delta.compareTo(BigDecimal.ZERO) < 0) {
log.warn("日电量累计值回退跳过本次增量siteId: {}, deviceId: {}, metric: {}, current: {}, last: {}",
siteId, deviceId, metric, current, last);
return;
}
if (ENERGY_METRIC_PEAK_CHARGE.equals(metric)) {
energyData.setPeakChargeDiff(safeBigDecimal(energyData.getPeakChargeDiff()).add(delta));
return;
}
if (ENERGY_METRIC_PEAK_DISCHARGE.equals(metric)) {
energyData.setPeakDischargeDiff(safeBigDecimal(energyData.getPeakDischargeDiff()).add(delta));
return;
}
if (ENERGY_METRIC_HIGH_CHARGE.equals(metric)) {
energyData.setHighChargeDiff(safeBigDecimal(energyData.getHighChargeDiff()).add(delta));
return;
}
if (ENERGY_METRIC_HIGH_DISCHARGE.equals(metric)) {
energyData.setHighDischargeDiff(safeBigDecimal(energyData.getHighDischargeDiff()).add(delta));
return;
}
if (ENERGY_METRIC_FLAT_CHARGE.equals(metric)) {
energyData.setFlatChargeDiff(safeBigDecimal(energyData.getFlatChargeDiff()).add(delta));
return;
}
if (ENERGY_METRIC_FLAT_DISCHARGE.equals(metric)) {
energyData.setFlatDischargeDiff(safeBigDecimal(energyData.getFlatDischargeDiff()).add(delta));
return;
}
if (ENERGY_METRIC_VALLEY_CHARGE.equals(metric)) {
energyData.setValleyChargeDiff(safeBigDecimal(energyData.getValleyChargeDiff()).add(delta));
return;
}
if (ENERGY_METRIC_VALLEY_DISCHARGE.equals(metric)) {
energyData.setValleyDischargeDiff(safeBigDecimal(energyData.getValleyDischargeDiff()).add(delta));
}
}
private void recalculateDailyEnergyRevenue(String siteId, EmsDailyEnergyData energyData) {
if (StringUtils.isBlank(siteId) || energyData == null) {
return;
}
String priceKey = RedisKeyConstants.ENERGY_PRICE_TIME + siteId + "_" + LocalDate.now().getYear() + LocalDate.now().getMonthValue();
EnergyPriceVo priceVo = redisCache.getCacheObject(priceKey);
if (priceVo == null) {
priceVo = emsEnergyPriceConfigService.getCurrentMonthPrice(siteId);
redisCache.setCacheObject(priceKey, priceVo, 31, TimeUnit.DAYS);
}
if (priceVo == null) {
energyData.setDayRevenue(BigDecimal.ZERO);
energyData.setTotalRevenue(getYestLastData(siteId));
return;
}
BigDecimal peakRevenue = safeBigDecimal(energyData.getPeakDischargeDiff())
.subtract(safeBigDecimal(energyData.getPeakChargeDiff()))
.multiply(safeBigDecimal(priceVo.getPeak()));
BigDecimal highRevenue = safeBigDecimal(energyData.getHighDischargeDiff())
.subtract(safeBigDecimal(energyData.getHighChargeDiff()))
.multiply(safeBigDecimal(priceVo.getHigh()));
BigDecimal flatRevenue = safeBigDecimal(energyData.getFlatDischargeDiff())
.subtract(safeBigDecimal(energyData.getFlatChargeDiff()))
.multiply(safeBigDecimal(priceVo.getFlat()));
BigDecimal valleyRevenue = safeBigDecimal(energyData.getValleyDischargeDiff())
.subtract(safeBigDecimal(energyData.getValleyChargeDiff()))
.multiply(safeBigDecimal(priceVo.getValley()));
BigDecimal dayRevenue = peakRevenue.add(highRevenue).add(flatRevenue).add(valleyRevenue);
energyData.setDayRevenue(dayRevenue);
energyData.setTotalRevenue(getYestLastData(siteId).add(dayRevenue));
}
private BigDecimal resolveEnergyMetricValue(Map<String, EmsSiteMonitorPointMatch> mappingByFieldAndDevice,
String deviceId,
Map<String, BigDecimal> pointIdValueMap,
String[] fieldSuffixes) {
String pointId = firstNonBlankPointByFieldSuffix(mappingByFieldAndDevice, deviceId, fieldSuffixes);
if (StringUtils.isBlank(pointId)) {
return null;
}
return pointIdValueMap.get(pointId.trim().toUpperCase());
}
private BigDecimal getPointValueById(Map<String, BigDecimal> pointIdValueMap, String pointId) {
if (pointIdValueMap == null || pointIdValueMap.isEmpty() || StringUtils.isBlank(pointId)) {
return null;
}
return pointIdValueMap.get(pointId.trim().toUpperCase());
}
private String firstNonBlankPointByFieldSuffix(Map<String, EmsSiteMonitorPointMatch> mappingByFieldAndDevice,
String deviceId,
String[] fieldSuffixes) {
if (mappingByFieldAndDevice == null || fieldSuffixes == null || fieldSuffixes.length == 0) {
return null;
}
String normalizedDeviceId = StringUtils.defaultString(deviceId).trim();
for (String suffix : fieldSuffixes) {
if (StringUtils.isBlank(suffix)) {
continue;
}
String normalizedSuffix = suffix.trim().toLowerCase();
EmsSiteMonitorPointMatch exactMatch = resolvePointMatchByFieldSuffix(mappingByFieldAndDevice, normalizedDeviceId, normalizedSuffix);
if (exactMatch != null && StringUtils.isNotBlank(exactMatch.getDataPoint())) {
return exactMatch.getDataPoint().trim();
}
EmsSiteMonitorPointMatch fallbackMatch = resolvePointMatchByFieldSuffix(mappingByFieldAndDevice, "", normalizedSuffix);
if (fallbackMatch != null && StringUtils.isNotBlank(fallbackMatch.getDataPoint())) {
return fallbackMatch.getDataPoint().trim();
}
}
return null;
}
private EmsSiteMonitorPointMatch resolvePointMatchByFieldSuffix(Map<String, EmsSiteMonitorPointMatch> mappingByFieldAndDevice,
String deviceId,
String suffixLowerCase) {
if (mappingByFieldAndDevice == null || StringUtils.isBlank(suffixLowerCase)) {
return null;
}
for (Map.Entry<String, EmsSiteMonitorPointMatch> entry : mappingByFieldAndDevice.entrySet()) {
EmsSiteMonitorPointMatch mapping = entry.getValue();
if (mapping == null || StringUtils.isBlank(mapping.getFieldCode())) {
continue;
}
String mappingDeviceId = StringUtils.defaultString(mapping.getDeviceId()).trim();
if (!StringUtils.equals(mappingDeviceId, StringUtils.defaultString(deviceId).trim())) {
continue;
}
String dataPoint = StringUtils.defaultString(mapping.getDataPoint()).trim();
if (StringUtils.isBlank(dataPoint) || DELETED_FIELD_MARK.equalsIgnoreCase(dataPoint)) {
continue;
}
String fieldCode = mapping.getFieldCode().trim().toLowerCase();
if (fieldCode.endsWith(suffixLowerCase)) {
return mapping;
}
}
return null;
}
private Map<String, EmsSiteMonitorPointMatch> getMonitorPointMappingByFieldAndDevice(String siteId) {
List<EmsSiteMonitorPointMatch> mappingList = getPointMatchesBySiteId(siteId);
if (CollectionUtils.isEmpty(mappingList)) {
return Collections.emptyMap();
}
return mappingList.stream()
.filter(item -> item != null && StringUtils.isNotBlank(item.getFieldCode()))
.collect(Collectors.toMap(
item -> buildFieldDeviceKey(item.getFieldCode(), item.getDeviceId()),
item -> item,
(a, b) -> b
));
}
private List<EmsSiteMonitorPointMatch> getPointMatchesBySiteId(String siteId) {
if (StringUtils.isBlank(siteId)) {
return Collections.emptyList();
}
String normalizedSiteId = siteId.trim();
String redisKey = RedisKeyConstants.SITE_MONITOR_POINT_MATCH + normalizedSiteId;
Object cacheObj = redisCache.getCacheObject(redisKey);
List<EmsSiteMonitorPointMatch> cached = parsePointMatchCache(cacheObj);
if (cached != null) {
return cached;
}
List<EmsSiteMonitorPointMatch> latest = emsSiteMonitorPointMatchMapper.selectBySiteId(normalizedSiteId);
if (latest == null) {
latest = Collections.emptyList();
}
redisCache.setCacheObject(redisKey, latest, MONITOR_POINT_MATCH_REDIS_TTL_SECONDS, TimeUnit.SECONDS);
return latest;
}
private List<EmsSiteMonitorPointMatch> parsePointMatchCache(Object cacheObj) {
if (cacheObj == null) {
return null;
}
try {
if (cacheObj instanceof String) {
return JSON.parseArray((String) cacheObj, EmsSiteMonitorPointMatch.class);
}
if (cacheObj instanceof List) {
List<?> cacheList = (List<?>) cacheObj;
if (cacheList.isEmpty()) {
return Collections.emptyList();
}
if (cacheList.get(0) instanceof EmsSiteMonitorPointMatch) {
return (List<EmsSiteMonitorPointMatch>) cacheList;
}
return JSON.parseArray(JSON.toJSONString(cacheObj), EmsSiteMonitorPointMatch.class);
}
return JSON.parseArray(JSON.toJSONString(cacheObj), EmsSiteMonitorPointMatch.class);
} catch (Exception ex) {
log.warn("解析单站监控点位映射缓存失败key类型={}, err={}",
cacheObj.getClass().getName(), ex.getMessage());
return null;
}
}
private String buildFieldDeviceKey(String fieldCode, String deviceId) {
return StringUtils.defaultString(fieldCode).trim() + "|" + StringUtils.defaultString(deviceId).trim();
}
private Map<String, BigDecimal> getCachedDailyEnergyRawTotals(String siteId, String deviceId) {
String redisKey = buildDailyEnergyRawCacheKey(siteId, deviceId);
Object cacheObj = redisCache.getCacheObject(redisKey);
if (cacheObj == null) {
return Collections.emptyMap();
}
try {
if (cacheObj instanceof Map) {
Map<?, ?> mapObj = (Map<?, ?>) cacheObj;
Map<String, BigDecimal> parsed = new HashMap<>();
for (Map.Entry<?, ?> entry : mapObj.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
String key = String.valueOf(entry.getKey()).trim();
if (StringUtils.isBlank(key)) {
continue;
}
BigDecimal value = StringUtils.getBigDecimal(entry.getValue());
if (value != null) {
parsed.put(key, value);
}
}
return parsed;
}
return JSON.parseObject(JSON.toJSONString(cacheObj), new TypeReference<Map<String, BigDecimal>>() {
});
} catch (Exception e) {
log.warn("解析日电量原始累计缓存失败siteId: {}, deviceId: {}, err: {}",
siteId, deviceId, e.getMessage());
return Collections.emptyMap();
}
}
private String buildDailyEnergyRawCacheKey(String siteId, String deviceId) {
return DAILY_ENERGY_RAW_CACHE_PREFIX + StringUtils.defaultString(siteId).trim() + "_" + StringUtils.defaultString(deviceId).trim();
}
private BigDecimal safeBigDecimal(BigDecimal value) {
return value == null ? BigDecimal.ZERO : value;
}
private Map<String, String> buildDeviceCategoryMap(String siteId, JSONArray arraylist) {
Map<String, String> deviceCategoryMap = new HashMap<>();
if (arraylist == null || arraylist.isEmpty()) {
@ -1372,51 +1856,8 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
return null;
}
public void processingDeviceData(String siteId, String deviceId, String jsonData, Date dataUpdateTime) {
// 判断设备类型,并调用对应的方法处理数据
String deviceCategory = getDeviceCategory(siteId, deviceId);
if (deviceId.contains(SiteDevice.BMSD.name())) {
batteryStackDataProcess(siteId, deviceId, jsonData, dataUpdateTime);
if (SiteEnum.DDS.getCode().equals(siteId)) {
batteryGroupDataProcess(siteId, deviceId, jsonData);
batteryDataProcessFromBmsd(siteId, deviceId, jsonData, dataUpdateTime);
}
} else if (deviceId.contains(SiteDevice.BMSC.name())) {
batteryClusterDataProcess(siteId, deviceId, jsonData, dataUpdateTime);
batteryDataProcessFromBmsc(siteId, deviceId, jsonData, dataUpdateTime);
} else if (deviceId.contains(SiteDevice.PCS.name())
|| DeviceCategory.PCS.getCode().equals(deviceCategory)) {
pcsDataProcess(siteId, deviceId, jsonData, dataUpdateTime);
pcsBranchDataProcess(siteId, deviceId, jsonData);
if (SiteEnum.DDS.getCode().equals(siteId)) {
batteryClusterDataProcess(siteId, jsonData, dataUpdateTime);
}
} else if (deviceId.contains(SiteDevice.LOAD.name())
|| deviceId.contains(SiteDevice.METEGF.name())
|| deviceId.contains(SiteDevice.METE.name())
|| deviceId.contains(SiteDevice.METE0.name())
|| DeviceCategory.AMMETER.getCode().equals(deviceCategory)) {
meteDataProcess(siteId, deviceId, jsonData, dataUpdateTime);
} else if (deviceId.contains(SiteDevice.XF.name())
|| DeviceCategory.XF.getCode().equals(deviceCategory)) {
meteXFProcess(siteId, deviceId, jsonData, dataUpdateTime);
} else if (deviceId.contains(SiteDevice.DH.name())
|| deviceId.contains(SiteDevice.donghuan.name())
|| DeviceCategory.DH.getCode().equals(deviceCategory)) {
dhDataProcess(siteId, deviceId, jsonData, dataUpdateTime);
} else if (deviceId.contains(SiteDevice.ZSLQ.name())
|| DeviceCategory.COOLING.getCode().equals(deviceCategory)) {
coolingDataProcess(siteId, deviceId, jsonData, dataUpdateTime);
} else if (deviceId.contains(SiteDevice.EMS.name())
|| DeviceCategory.EMS.getCode().equals(deviceCategory)) {
emsDataProcess(siteId, deviceId, jsonData, dataUpdateTime);
} else {
log.info("未找到匹配的设备类型无法处理数据siteId: " + siteId + "deviceId: " + deviceId);
}
}
private String getDeviceCategory(String siteId, String deviceId) {
EmsDevicesSetting emsDevicesSetting = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceId, siteId);
EmsDevicesSetting emsDevicesSetting = getDeviceSettingWithCache(siteId, deviceId);
if (emsDevicesSetting != null) {
return emsDevicesSetting.getDeviceCategory();
}
@ -1508,10 +1949,13 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
redisCache.setCacheObject(RedisKeyConstants.XF + siteId + "_" + deviceId, xfData);
// 状态枚举还没有提供
EmsDevicesSetting emsDevicesSetting = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceId, siteId);
emsDevicesSetting.setCommunicationStatus(StringUtils.getString(xfData.getDczt()));
emsDevicesSetting.setUpdatedAt(DateUtils.getNowDate());
emsDevicesSettingMapper.updateEmsDevicesSetting(emsDevicesSetting);
EmsDevicesSetting emsDevicesSetting = getDeviceSettingWithCache(siteId, deviceId);
if (emsDevicesSetting != null) {
emsDevicesSetting.setCommunicationStatus(StringUtils.getString(xfData.getDczt()));
emsDevicesSetting.setUpdatedAt(DateUtils.getNowDate());
emsDevicesSettingMapper.updateEmsDevicesSetting(emsDevicesSetting);
cacheDeviceSetting(siteId, deviceId, emsDevicesSetting);
}
}
private void dhDataProcess(String siteId, String deviceId, String dataJson, Date dateUpdateTime) {
@ -2104,6 +2548,9 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
});
// 点位匹配数据
List<EmsPointMatch> pointMatchList = devicePointMatchDataProcessor.getDevicePointMatch(siteId, deviceId, DeviceMatchTable.AMMETER.getCode());
if (CollectionUtils.isEmpty(pointMatchList)) {
pointMatchList = devicePointMatchDataProcessor.getDeviceDefaultPointMatch(siteId, DeviceMatchTable.AMMETER.getCode());
}
if (CollectionUtils.isEmpty(pointMatchList)) {
log.info("未找到匹配的点位数据无法处理LOAD总表数据siteId: " + siteId + "deviceId: " + deviceId);
return;
@ -2210,10 +2657,59 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
// workStatus = enumMatch.get().getEnumCode();
// }
// }
EmsDevicesSetting emsDevicesSetting = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceId, siteId);
EmsDevicesSetting emsDevicesSetting = getDeviceSettingWithCache(siteId, deviceId);
if (emsDevicesSetting == null) {
return;
}
emsDevicesSetting.setWorkStatus(workStatus);
emsDevicesSetting.setUpdatedAt(DateUtils.getNowDate());
emsDevicesSettingMapper.updateEmsDevicesSetting(emsDevicesSetting);
cacheDeviceSetting(siteId, deviceId, emsDevicesSetting);
}
private EmsDevicesSetting getDeviceSettingWithCache(String siteId, String deviceId) {
if (StringUtils.isAnyBlank(siteId, deviceId)) {
return null;
}
String cacheKey = RedisKeyConstants.DEVICE_SETTING + siteId + "_" + deviceId;
EmsDevicesSetting cached = redisCache.getCacheObject(cacheKey);
if (cached != null) {
return cached;
}
EmsDevicesSetting fromInitCache = getDeviceSettingFromInitCache(siteId, deviceId);
if (fromInitCache != null) {
cacheDeviceSetting(siteId, deviceId, fromInitCache);
return fromInitCache;
}
EmsDevicesSetting fromDb = emsDevicesSettingMapper.getDeviceBySiteAndDeviceId(deviceId, siteId);
if (fromDb != null) {
cacheDeviceSetting(siteId, deviceId, fromDb);
}
return fromDb;
}
private EmsDevicesSetting getDeviceSettingFromInitCache(String siteId, String deviceId) {
Map<String, List<EmsDevicesSetting>> map = redisCache.getCacheObject(RedisKeyConstants.INIT_DEVICE_INFO);
if (map == null || map.isEmpty()) {
return null;
}
List<EmsDevicesSetting> list = map.get(siteId);
if (CollectionUtils.isEmpty(list)) {
return null;
}
for (EmsDevicesSetting item : list) {
if (item != null && deviceId.equals(item.getDeviceId())) {
return item;
}
}
return null;
}
private void cacheDeviceSetting(String siteId, String deviceId, EmsDevicesSetting setting) {
if (StringUtils.isAnyBlank(siteId, deviceId) || setting == null) {
return;
}
redisCache.setCacheObject(RedisKeyConstants.DEVICE_SETTING + siteId + "_" + deviceId, setting, 10, TimeUnit.MINUTES);
}
private String getDeviceParent(String siteId, String deviceId) {
@ -2276,6 +2772,9 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
});
// 点位匹配数据
List<EmsPointMatch> pointMatchList = devicePointMatchDataProcessor.getDevicePointMatch(siteId, deviceId, DeviceMatchTable.AMMETER.getCode());
if (CollectionUtils.isEmpty(pointMatchList)) {
pointMatchList = devicePointMatchDataProcessor.getDeviceDefaultPointMatch(siteId, DeviceMatchTable.AMMETER.getCode());
}
if (CollectionUtils.isEmpty(pointMatchList)) {
log.info("未找到匹配的点位数据无法处理电表数据siteId: " + siteId + "deviceId: " + deviceId);
return;
@ -2531,6 +3030,9 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
}
private void dealAmmeterDailyDate(String siteId, EmsAmmeterData currentData, Date dataUpdateTime, EmsAmmeterData lastData) {
EmsDailyEnergyData energyData = initEnergyData(siteId);
energyData.setCalcTime(DateUtils.getNowDate());
// 先获取当月电价配置redis没有这查数据库都没有则返回
String priceKey = RedisKeyConstants.ENERGY_PRICE_TIME + siteId + "_" + LocalDate.now().getYear() + LocalDate.now().getMonthValue();
EnergyPriceVo priceVo = redisCache.getCacheObject(priceKey);
@ -2538,11 +3040,13 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
priceVo = emsEnergyPriceConfigService.getCurrentMonthPrice(siteId);
redisCache.setCacheObject(priceKey, priceVo, 31, TimeUnit.DAYS);
if (priceVo == null) {
emsDailyEnergyDataMapper.insertOrUpdateData(energyData);
return;
}
}
List<EnergyPriceTimeRange> timeRanges = priceVo.getRange();
if (timeRanges == null) {
emsDailyEnergyDataMapper.insertOrUpdateData(energyData);
return;
}
@ -2557,12 +3061,10 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
}
}
if (StringUtils.isEmpty(costType)) {
emsDailyEnergyDataMapper.insertOrUpdateData(energyData);
return;
}
//初始化电表每日差值对象
EmsDailyEnergyData energyData = initEnergyData(siteId);
// 根据 costType计算本次与上次数据差值累加到对应的数据类型里面
setDiffByCostType(siteId, costType, energyData, lastData, currentData, priceVo);

View File

@ -9,8 +9,11 @@ import com.xzzn.common.core.modbus.ModbusProcessor;
import com.xzzn.common.core.modbus.domain.DeviceConfig;
import com.xzzn.common.core.modbus.domain.WriteTagConfig;
import com.xzzn.common.core.redis.RedisCache;
import com.xzzn.common.enums.BusinessStatus;
import com.xzzn.common.enums.BusinessType;
import com.xzzn.common.enums.DeviceCategory;
import com.xzzn.common.enums.DeviceType;
import com.xzzn.common.enums.OperatorType;
import com.xzzn.common.enums.PointType;
import com.xzzn.common.enums.WorkStatus;
import com.xzzn.common.exception.ServiceException;
@ -45,6 +48,8 @@ import com.xzzn.ems.mapper.EmsSiteMonitorPointMatchMapper;
import com.xzzn.ems.service.IEmsDeviceSettingService;
import com.xzzn.ems.service.InfluxPointDataWriter;
import com.xzzn.ems.utils.DevicePointMatchDataProcessor;
import com.xzzn.system.domain.SysOperLog;
import com.xzzn.system.service.ISysOperLogService;
import java.math.BigDecimal;
import java.util.ArrayList;
@ -144,6 +149,8 @@ public class EmsDeviceSettingServiceImpl implements IEmsDeviceSettingService
private EmsSiteMonitorDataMapper emsSiteMonitorDataMapper;
@Autowired
private InfluxPointDataWriter influxPointDataWriter;
@Autowired
private ISysOperLogService operLogService;
private volatile List<EmsSiteMonitorItem> monitorItemCache = Collections.emptyList();
private volatile long monitorItemCacheExpireAt = 0L;
@ -2119,11 +2126,37 @@ public class EmsDeviceSettingServiceImpl implements IEmsDeviceSettingService
log.info("设备控制指令发送数据: {}", JSON.toJSONString(deviceConfig));
boolean result = modbusProcessor.writeDataToDevice(deviceConfig);
if (!result) {
recordDeviceOperationLog(request, false, "设备控制指令发送失败");
throw new ServiceException("设备控制指令发送失败");
}
recordDeviceOperationLog(request, true, null);
return true;
}
private void recordDeviceOperationLog(DeviceUpdateRequest request, boolean success, String errorMsg) {
try {
SysOperLog operLog = new SysOperLog();
operLog.setTitle("设备控制-开关机/写功率");
operLog.setBusinessType(BusinessType.UPDATE.ordinal());
operLog.setMethod(this.getClass().getName() + ".updateDeviceStatus");
operLog.setRequestMethod("SERVICE");
operLog.setOperatorType(OperatorType.OTHER.ordinal());
operLog.setOperName("system");
operLog.setOperIp("127.0.0.1");
operLog.setOperUrl("/ems/siteConfig/updateDeviceStatus");
operLog.setOperTime(DateUtils.getNowDate());
operLog.setOperParam(StringUtils.substring(JSON.toJSONString(request), 0, 2000));
operLog.setJsonResult(StringUtils.substring(JSON.toJSONString(Collections.singletonMap("success", success)), 0, 2000));
operLog.setStatus(success ? BusinessStatus.SUCCESS.ordinal() : BusinessStatus.FAIL.ordinal());
if (!success) {
operLog.setErrorMsg(StringUtils.substring(errorMsg, 0, 2000));
}
operLogService.insertOperlog(operLog);
} catch (Exception e) {
log.error("记录sys_oper_log失败, siteId={}, deviceId={}", request.getSiteId(), request.getDeviceId(), e);
}
}
public DeviceConfig getDeviceConfig(EmsDevicesSetting device) {
DeviceConfig deviceConfig = new DeviceConfig();
deviceConfig.setDeviceNumber(device.getDeviceId());

View File

@ -398,6 +398,10 @@ public class EmsPointConfigServiceImpl implements IEmsPointConfigService {
if (keys != null && !keys.isEmpty()) {
redisCache.deleteObject(keys);
}
Collection<String> pointKeys = redisCache.keys(RedisKeyConstants.POINT_CONFIG_POINT + siteId + "_*");
if (pointKeys != null && !pointKeys.isEmpty()) {
redisCache.deleteObject(pointKeys);
}
}
private JSONObject toJsonObject(Object raw) {
@ -501,6 +505,11 @@ public class EmsPointConfigServiceImpl implements IEmsPointConfigService {
if (StringUtils.isAnyBlank(siteId, pointId)) {
return null;
}
String cacheKey = RedisKeyConstants.POINT_CONFIG_POINT + siteId + "_" + pointId;
EmsPointConfig cached = redisCache.getCacheObject(cacheKey);
if (cached != null) {
return cached;
}
EmsPointConfig query = new EmsPointConfig();
query.setSiteId(siteId);
query.setPointId(pointId);
@ -508,7 +517,9 @@ public class EmsPointConfigServiceImpl implements IEmsPointConfigService {
if (CollectionUtils.isEmpty(pointConfigs)) {
return null;
}
return pointConfigs.get(0);
EmsPointConfig latest = pointConfigs.get(0);
redisCache.setCacheObject(cacheKey, latest);
return latest;
}
private BigDecimal convertPointValue(BigDecimal sourceValue, EmsPointConfig pointConfig) {

View File

@ -379,20 +379,90 @@ public class EmsStatsReportServiceImpl implements IEmsStatsReportService
@Override
public List<AmmeterRevenueStatisListVo> getAmmeterRevenueDataResult(StatisAmmeterDateRequest requestVo) {
List<AmmeterRevenueStatisListVo> resultList = emsDailyEnergyDataMapper.getRevenueDataBySiteId(requestVo.getSiteId(),requestVo.getStartTime(),requestVo.getEndTime());
log.info("收益报表查询开始, siteId: {}, startTime: {}, endTime: {}",
requestVo.getSiteId(), requestVo.getStartTime(), requestVo.getEndTime());
List<AmmeterRevenueStatisListVo> resultList = emsDailyEnergyDataMapper.getRevenueDataBySiteId(
requestVo.getSiteId(), requestVo.getStartTime(), requestVo.getEndTime());
if (CollectionUtils.isEmpty(resultList)) {
log.warn("收益报表查询结果为空, siteId: {}, startTime: {}, endTime: {}",
requestVo.getSiteId(), requestVo.getStartTime(), requestVo.getEndTime());
return Collections.emptyList();
}
//计算每天总收益和当日实际收益(放电总-充电总)
resultList.forEach(ammeterRevenue -> {
ammeterRevenue.setActiveTotalPrice(ammeterRevenue.getActivePeakPrice().add(ammeterRevenue.getActiveHighPrice()).add(ammeterRevenue.getActiveFlatPrice()).add(ammeterRevenue.getActiveValleyPrice()));
ammeterRevenue.setReActiveTotalPrice(ammeterRevenue.getReActivePeakPrice().add(ammeterRevenue.getReActiveHighPrice()).add(ammeterRevenue.getReActiveFlatPrice()).add(ammeterRevenue.getReActiveValleyPrice()));
ammeterRevenue.setActualRevenue(ammeterRevenue.getReActiveTotalPrice().subtract(ammeterRevenue.getActiveTotalPrice()));
BigDecimal activePeakPrice = nz(ammeterRevenue.getActivePeakPrice());
BigDecimal activeHighPrice = nz(ammeterRevenue.getActiveHighPrice());
BigDecimal activeFlatPrice = nz(ammeterRevenue.getActiveFlatPrice());
BigDecimal activeValleyPrice = nz(ammeterRevenue.getActiveValleyPrice());
BigDecimal reActivePeakPrice = nz(ammeterRevenue.getReActivePeakPrice());
BigDecimal reActiveHighPrice = nz(ammeterRevenue.getReActiveHighPrice());
BigDecimal reActiveFlatPrice = nz(ammeterRevenue.getReActiveFlatPrice());
BigDecimal reActiveValleyPrice = nz(ammeterRevenue.getReActiveValleyPrice());
ammeterRevenue.setActiveTotalPrice(activePeakPrice.add(activeHighPrice).add(activeFlatPrice).add(activeValleyPrice));
ammeterRevenue.setReActiveTotalPrice(reActivePeakPrice.add(reActiveHighPrice).add(reActiveFlatPrice).add(reActiveValleyPrice));
// 实际收益=放电价格(尖峰平谷)-充电价格(尖峰平谷)
ammeterRevenue.setActualRevenue(
reActivePeakPrice.subtract(activePeakPrice)
.add(reActiveHighPrice.subtract(activeHighPrice))
.add(reActiveFlatPrice.subtract(activeFlatPrice))
.add(reActiveValleyPrice.subtract(activeValleyPrice))
);
});
int weatherMissingCount = 0;
int allPriceZeroCount = 0;
for (AmmeterRevenueStatisListVo row : resultList) {
String weatherDesc = row.getWeatherDesc();
if (weatherDesc == null || weatherDesc.trim().isEmpty() || "--".equals(weatherDesc.trim())) {
weatherMissingCount++;
}
boolean allPriceZero = safeCompareToZero(row.getActivePeakPrice())
&& safeCompareToZero(row.getActiveHighPrice())
&& safeCompareToZero(row.getActiveFlatPrice())
&& safeCompareToZero(row.getActiveValleyPrice())
&& safeCompareToZero(row.getReActivePeakPrice())
&& safeCompareToZero(row.getReActiveHighPrice())
&& safeCompareToZero(row.getReActiveFlatPrice())
&& safeCompareToZero(row.getReActiveValleyPrice());
if (allPriceZero) {
allPriceZeroCount++;
}
log.info("收益报表明细, date: {}, dayType: {}, weather: {}, charge[尖:{} 峰:{} 平:{} 谷:{} 总:{}], discharge[尖:{} 峰:{} 平:{} 谷:{} 总:{}], actualRevenue: {}",
row.getDataTime(),
row.getDayType(),
row.getWeatherDesc(),
row.getActivePeakPrice(),
row.getActiveHighPrice(),
row.getActiveFlatPrice(),
row.getActiveValleyPrice(),
row.getActiveTotalPrice(),
row.getReActivePeakPrice(),
row.getReActiveHighPrice(),
row.getReActiveFlatPrice(),
row.getReActiveValleyPrice(),
row.getReActiveTotalPrice(),
row.getActualRevenue());
}
log.info("收益报表查询完成, siteId: {}, startTime: {}, endTime: {}, totalRows: {}, weatherMissingRows: {}, allPriceZeroRows: {}",
requestVo.getSiteId(),
requestVo.getStartTime(),
requestVo.getEndTime(),
resultList.size(),
weatherMissingCount,
allPriceZeroCount);
return resultList;
}
private boolean safeCompareToZero(BigDecimal value) {
return value == null || value.compareTo(BigDecimal.ZERO) == 0;
}
private BigDecimal nz(BigDecimal value) {
return value == null ? BigDecimal.ZERO : value;
}
public static AmmeterRevenueStatisListVo calculateDailyBill(AmmeterStatisListVo ammeter, List<EnergyPriceConfigVo> priceList) {
AmmeterRevenueStatisListVo ammeterRevenue = new AmmeterRevenueStatisListVo();
ammeterRevenue.setDataTime(ammeter.getDataTime());
@ -798,19 +868,19 @@ public class EmsStatsReportServiceImpl implements IEmsStatsReportService
Row row1 = sheet.createRow(0);
Cell cell1 = row1.createCell(0);
cell1.setCellValue("汇总");
Cell cell2 = row1.createCell(1);
Cell cell2 = row1.createCell(3);
cell2.setCellValue("充电价格");
Cell cell3 = row1.createCell(6);
Cell cell3 = row1.createCell(8);
cell3.setCellValue("放电价格");
Cell cell4 = row1.createCell(11);
Cell cell4 = row1.createCell(13);
cell4.setCellValue("");
// 合并充电量列
CellRangeAddress mergeRegion1 = new CellRangeAddress(0, 0, 1, 5);
CellRangeAddress mergeRegion1 = new CellRangeAddress(0, 0, 3, 7);
sheet.addMergedRegion(mergeRegion1);
// 合并放电量列
CellRangeAddress mergeRegion2 = new CellRangeAddress(0, 0, 6, 10);
CellRangeAddress mergeRegion2 = new CellRangeAddress(0, 0, 8, 12);
sheet.addMergedRegion(mergeRegion2);
// 设置第二行
@ -818,27 +888,31 @@ public class EmsStatsReportServiceImpl implements IEmsStatsReportService
Cell cell5 = row2.createCell(0);
cell5.setCellValue("日期");
Cell cell6 = row2.createCell(1);
cell6.setCellValue("");
cell6.setCellValue("日期类型");
Cell cell7 = row2.createCell(2);
cell7.setCellValue("");
cell7.setCellValue("天气情况");
Cell cell8 = row2.createCell(3);
cell8.setCellValue("");
cell8.setCellValue("");
Cell cell9 = row2.createCell(4);
cell9.setCellValue("");
cell9.setCellValue("");
Cell cell10 = row2.createCell(5);
cell10.setCellValue("");
cell10.setCellValue("");
Cell cell11 = row2.createCell(6);
cell11.setCellValue("");
cell11.setCellValue("");
Cell cell12 = row2.createCell(7);
cell12.setCellValue("");
cell12.setCellValue("");
Cell cell13 = row2.createCell(8);
cell13.setCellValue("");
cell13.setCellValue("");
Cell cell14 = row2.createCell(9);
cell14.setCellValue("");
cell14.setCellValue("");
Cell cell15 = row2.createCell(10);
cell15.setCellValue("");
cell15.setCellValue("");
Cell cell16 = row2.createCell(11);
cell16.setCellValue("实际收益");
cell16.setCellValue("");
Cell cell17 = row2.createCell(12);
cell17.setCellValue("");
Cell cell18 = row2.createCell(13);
cell18.setCellValue("实际收益");
// 设置背景颜色
CellStyle headerStyle = workbook.createCellStyle();
@ -908,27 +982,31 @@ public class EmsStatsReportServiceImpl implements IEmsStatsReportService
Cell dataCell1 = dataRow.createCell(0);
dataCell1.setCellValue(ammeterRevenueStatisVo.getDataTime());
Cell dataCell2 = dataRow.createCell(1);
dataCell2.setCellValue(ammeterRevenueStatisVo.getActivePeakPrice().doubleValue());
dataCell2.setCellValue(ammeterRevenueStatisVo.getDayType() == null ? "" : ammeterRevenueStatisVo.getDayType());
Cell dataCell3 = dataRow.createCell(2);
dataCell3.setCellValue(ammeterRevenueStatisVo.getActiveHighPrice().doubleValue());
dataCell3.setCellValue(ammeterRevenueStatisVo.getWeatherDesc() == null ? "" : ammeterRevenueStatisVo.getWeatherDesc());
Cell dataCell4 = dataRow.createCell(3);
dataCell4.setCellValue(ammeterRevenueStatisVo.getActiveFlatPrice().doubleValue());
dataCell4.setCellValue(ammeterRevenueStatisVo.getActivePeakPrice().doubleValue());
Cell dataCell5 = dataRow.createCell(4);
dataCell5.setCellValue(ammeterRevenueStatisVo.getActiveValleyPrice().doubleValue());
dataCell5.setCellValue(ammeterRevenueStatisVo.getActiveHighPrice().doubleValue());
Cell dataCell6 = dataRow.createCell(5);
dataCell6.setCellValue(ammeterRevenueStatisVo.getActiveTotalPrice().doubleValue());
dataCell6.setCellValue(ammeterRevenueStatisVo.getActiveFlatPrice().doubleValue());
Cell dataCell7 = dataRow.createCell(6);
dataCell7.setCellValue(ammeterRevenueStatisVo.getReActivePeakPrice().doubleValue());
dataCell7.setCellValue(ammeterRevenueStatisVo.getActiveValleyPrice().doubleValue());
Cell dataCell8 = dataRow.createCell(7);
dataCell8.setCellValue(ammeterRevenueStatisVo.getReActiveHighPrice().doubleValue());
dataCell8.setCellValue(ammeterRevenueStatisVo.getActiveTotalPrice().doubleValue());
Cell dataCell9 = dataRow.createCell(8);
dataCell9.setCellValue(ammeterRevenueStatisVo.getReActiveFlatPrice().doubleValue());
dataCell9.setCellValue(ammeterRevenueStatisVo.getReActivePeakPrice().doubleValue());
Cell dataCell10 = dataRow.createCell(9);
dataCell10.setCellValue(ammeterRevenueStatisVo.getReActiveValleyPrice().doubleValue());
dataCell10.setCellValue(ammeterRevenueStatisVo.getReActiveHighPrice().doubleValue());
Cell dataCell11 = dataRow.createCell(10);
dataCell11.setCellValue(ammeterRevenueStatisVo.getReActiveValleyPrice().doubleValue());
dataCell11.setCellValue(ammeterRevenueStatisVo.getReActiveFlatPrice().doubleValue());
Cell dataCell12 = dataRow.createCell(11);
dataCell12.setCellValue(ammeterRevenueStatisVo.getActualRevenue().doubleValue());
dataCell12.setCellValue(ammeterRevenueStatisVo.getReActiveValleyPrice().doubleValue());
Cell dataCell13 = dataRow.createCell(12);
dataCell13.setCellValue(ammeterRevenueStatisVo.getReActiveTotalPrice().doubleValue());
Cell dataCell14 = dataRow.createCell(13);
dataCell14.setCellValue(ammeterRevenueStatisVo.getActualRevenue().doubleValue());
// 根据行号设置背景色
if (i % 2 == 0) {
@ -961,27 +1039,31 @@ public class EmsStatsReportServiceImpl implements IEmsStatsReportService
Cell lastRowCell1 = lastRow.createCell(0);
lastRowCell1.setCellValue("合计");
Cell lastRowCell2 = lastRow.createCell(1);
lastRowCell2.setCellValue(activePeakPrice.doubleValue());
lastRowCell2.setCellValue("-");
Cell lastRowCell3 = lastRow.createCell(2);
lastRowCell3.setCellValue(activeHighPrice.doubleValue());
lastRowCell3.setCellValue("-");
Cell lastRowCell4 = lastRow.createCell(3);
lastRowCell4.setCellValue(activeFlatPrice.doubleValue());
lastRowCell4.setCellValue(activePeakPrice.doubleValue());
Cell lastRowCell5 = lastRow.createCell(4);
lastRowCell5.setCellValue(activeValleyPrice.doubleValue());
lastRowCell5.setCellValue(activeHighPrice.doubleValue());
Cell lastRowCell6 = lastRow.createCell(5);
lastRowCell6.setCellValue(activeTotalPrice.doubleValue());
lastRowCell6.setCellValue(activeFlatPrice.doubleValue());
Cell lastRowCell7 = lastRow.createCell(6);
lastRowCell7.setCellValue(reActivePeakPrice.doubleValue());
lastRowCell7.setCellValue(activeValleyPrice.doubleValue());
Cell lastRowCell8 = lastRow.createCell(7);
lastRowCell8.setCellValue(reActiveHighPrice.doubleValue());
lastRowCell8.setCellValue(activeTotalPrice.doubleValue());
Cell lastRowCell9 = lastRow.createCell(8);
lastRowCell9.setCellValue(reActiveFlatPrice.doubleValue());
lastRowCell9.setCellValue(reActivePeakPrice.doubleValue());
Cell lastRowCell10 = lastRow.createCell(9);
lastRowCell10.setCellValue(reActiveValleyPrice.doubleValue());
lastRowCell10.setCellValue(reActiveHighPrice.doubleValue());
Cell lastRowCell11 = lastRow.createCell(10);
lastRowCell11.setCellValue(reActiveTotalPrice.doubleValue());
lastRowCell11.setCellValue(reActiveFlatPrice.doubleValue());
Cell lastRowCell12 = lastRow.createCell(11);
lastRowCell12.setCellValue(actualRevenue.doubleValue());
lastRowCell12.setCellValue(reActiveValleyPrice.doubleValue());
Cell lastRowCell13 = lastRow.createCell(12);
lastRowCell13.setCellValue(reActiveTotalPrice.doubleValue());
Cell lastRowCell14 = lastRow.createCell(13);
lastRowCell14.setCellValue(actualRevenue.doubleValue());
Iterator<Cell> lastRowCellIterator = lastRow.cellIterator();
while (lastRowCellIterator.hasNext()) {
int i = lastRowCellIterator.next().getColumnIndex();

View File

@ -0,0 +1,199 @@
package com.xzzn.ems.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.xzzn.common.exception.ServiceException;
import com.xzzn.common.utils.StringUtils;
import com.xzzn.common.utils.http.HttpUtils;
import com.xzzn.ems.config.WeatherApiProperties;
import com.xzzn.ems.domain.EmsSiteSetting;
import com.xzzn.ems.domain.vo.StatisAmmeterDateRequest;
import com.xzzn.ems.domain.vo.WeatherSyncResultVo;
import com.xzzn.ems.mapper.EmsSiteSettingMapper;
import com.xzzn.ems.mapper.EmsSiteWeatherDayMapper;
import com.xzzn.ems.service.IEmsWeatherSyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
@Service
public class EmsWeatherSyncServiceImpl implements IEmsWeatherSyncService {
private static final Logger log = LoggerFactory.getLogger(EmsWeatherSyncServiceImpl.class);
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
@Autowired
private WeatherApiProperties weatherApiProperties;
@Autowired
private EmsSiteSettingMapper emsSiteSettingMapper;
@Autowired
private EmsSiteWeatherDayMapper emsSiteWeatherDayMapper;
@Override
public WeatherSyncResultVo syncWeatherByDateRange(StatisAmmeterDateRequest requestVo) {
if (!weatherApiProperties.isEnabled()) {
throw new ServiceException("天气同步未启用,请先设置 weather.api.enabled=true");
}
if (requestVo == null || StringUtils.isEmpty(requestVo.getSiteId())
|| StringUtils.isEmpty(requestVo.getStartTime())
|| StringUtils.isEmpty(requestVo.getEndTime())) {
throw new ServiceException("siteId、startTime、endTime 不能为空");
}
LocalDate startDate = parseDate(requestVo.getStartTime(), "startTime");
LocalDate endDate = parseDate(requestVo.getEndTime(), "endTime");
if (endDate.isBefore(startDate)) {
throw new ServiceException("endTime 不能早于 startTime");
}
EmsSiteSetting siteSetting = emsSiteSettingMapper.selectEmsSiteSettingBySiteId(requestVo.getSiteId());
if (siteSetting == null) {
throw new ServiceException("未找到站点配置siteId=" + requestVo.getSiteId());
}
BigDecimal latitude = siteSetting.getLatitude();
BigDecimal longitude = siteSetting.getLongitude();
if (latitude == null || longitude == null) {
throw new ServiceException("站点缺少经纬度配置siteId=" + requestVo.getSiteId());
}
String url = buildWeatherUrl(latitude, longitude, startDate, endDate);
String response = HttpUtils.sendGet(url);
if (StringUtils.isEmpty(response)) {
throw new ServiceException("调用天气接口失败,返回为空");
}
JSONObject root = JSONObject.parseObject(response);
JSONObject daily = root == null ? null : root.getJSONObject("daily");
JSONArray dates = daily == null ? null : daily.getJSONArray("time");
JSONArray codes = daily == null ? null : daily.getJSONArray("weather_code");
if (dates == null || codes == null || dates.size() == 0 || dates.size() != codes.size()) {
throw new ServiceException("天气接口返回格式异常daily.time/weather_code 不可用");
}
WeatherSyncResultVo resultVo = new WeatherSyncResultVo();
resultVo.setSiteId(requestVo.getSiteId());
resultVo.setStartTime(requestVo.getStartTime());
resultVo.setEndTime(requestVo.getEndTime());
resultVo.setTotalDays(dates.size());
int updated = 0;
int inserted = 0;
for (int i = 0; i < dates.size(); i++) {
String dateStr = dates.getString(i);
int weatherCode = codes.getIntValue(i);
String weatherDesc = mapWeatherDesc(weatherCode);
int updateRows = emsSiteWeatherDayMapper.updateWeatherDesc(
requestVo.getSiteId(), dateStr, weatherDesc, weatherCode);
if (updateRows > 0) {
updated++;
continue;
}
int count = emsSiteWeatherDayMapper.selectCountBySiteAndDate(requestVo.getSiteId(), dateStr);
if (count == 0) {
emsSiteWeatherDayMapper.insertSiteWeatherDay(
requestVo.getSiteId(), dateStr, weatherDesc, weatherCode, "open-meteo");
inserted++;
continue;
}
// 理论上不会到这里,防御性兜底再更新一次
updated += emsSiteWeatherDayMapper.updateWeatherDesc(
requestVo.getSiteId(), dateStr, weatherDesc, weatherCode);
}
resultVo.setUpdatedDays(updated);
resultVo.setInsertedDays(inserted);
resultVo.setSuccessDays(updated + inserted);
resultVo.setMessage("天气同步完成");
log.info("天气同步完成, siteId: {}, startTime: {}, endTime: {}, totalDays: {}, updatedDays: {}, insertedDays: {}",
requestVo.getSiteId(), requestVo.getStartTime(), requestVo.getEndTime(),
resultVo.getTotalDays(), updated, inserted);
return resultVo;
}
private LocalDate parseDate(String value, String fieldName) {
try {
return LocalDate.parse(value, DATE_FORMATTER);
} catch (Exception e) {
throw new ServiceException(fieldName + " 格式错误,应为 yyyy-MM-dd");
}
}
private String buildWeatherUrl(BigDecimal latitude, BigDecimal longitude, LocalDate startDate, LocalDate endDate) {
StringBuilder builder = new StringBuilder();
builder.append(weatherApiProperties.getBaseUrl())
.append("?latitude=").append(latitude.stripTrailingZeros().toPlainString())
.append("&longitude=").append(longitude.stripTrailingZeros().toPlainString())
.append("&start_date=").append(startDate.format(DATE_FORMATTER))
.append("&end_date=").append(endDate.format(DATE_FORMATTER))
.append("&daily=weather_code")
.append("&timezone=").append(urlEncode(weatherApiProperties.getTimezone()));
if (StringUtils.isNotEmpty(weatherApiProperties.getApiKey())) {
builder.append("&apikey=").append(urlEncode(weatherApiProperties.getApiKey()));
}
return builder.toString();
}
private String urlEncode(String value) {
try {
return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
throw new ServiceException("URL 编码失败");
}
}
private String mapWeatherDesc(int weatherCode) {
switch (weatherCode) {
case 0:
return "";
case 1:
case 2:
case 3:
return "多云";
case 45:
case 48:
return "";
case 51:
case 53:
case 55:
return "毛毛雨";
case 56:
case 57:
return "冻毛毛雨";
case 61:
case 63:
case 65:
return "";
case 66:
case 67:
return "冻雨";
case 71:
case 73:
case 75:
return "";
case 77:
return "雪粒";
case 80:
case 81:
case 82:
return "阵雨";
case 85:
case 86:
return "阵雪";
case 95:
return "雷暴";
case 96:
case 99:
return "雷暴伴冰雹";
default:
return "未知(" + weatherCode + ")";
}
}
}

View File

@ -477,7 +477,8 @@ public class GeneralQueryServiceImpl implements IGeneralQueryService
// 4. 构建DeviceItem
return new DevicePointDataList(deviceId, pointValueList,parentDeviceId,
stats.max, stats.min,stats.avg,stats.diff,stats.maxDate,stats.minDate);
stats.max, stats.min,stats.avg,stats.diff,stats.maxDate,stats.minDate,
stats.q1, stats.median, stats.q3);
})// 关键排序步骤先按deviceId升序再按parentDeviceId升序
.sorted(
Comparator.comparing(DevicePointDataList::getDeviceId) // 第一排序键deviceId
@ -513,7 +514,8 @@ public class GeneralQueryServiceImpl implements IGeneralQueryService
Stats stats = clacStats(deviceDataList);
return new DevicePointDataList(deviceId, deviceDataList,null,
stats.max, stats.min,stats.avg,stats.diff,stats.maxDate,stats.minDate);
stats.max, stats.min,stats.avg,stats.diff,stats.maxDate,stats.minDate,
stats.q1, stats.median, stats.q3);
})
.collect(Collectors.toList());
@ -536,7 +538,7 @@ public class GeneralQueryServiceImpl implements IGeneralQueryService
.collect(Collectors.toList());
if (validPairs.isEmpty()) {
return new Stats(null, null, null, null, null, null);
return new Stats(null, null, null, null, null, null, null, null, null);
}
// 计算最大最小值
Optional<ValueTimePair> maxPair = validPairs.stream().max((p1, p2) -> p1.value.compareTo(p2.value));
@ -546,8 +548,17 @@ public class GeneralQueryServiceImpl implements IGeneralQueryService
BigDecimal avgValue = sum.divide(BigDecimal.valueOf(validPairs.size()), 4, BigDecimal.ROUND_HALF_UP);
// 增量数据,计算差值
BigDecimal diff = maxPair.get().value.subtract(minPair.get().value);
// 计算分位数统计值
List<BigDecimal> sortedValues = validPairs.stream()
.map(pair -> pair.value)
.sorted()
.collect(Collectors.toList());
BoxPlotData boxStats = calculateBoxPlotData(sortedValues);
return new Stats(maxPair.get().value,minPair.get().value,avgValue,diff,maxPair.get().time,minPair.get().time);
return new Stats(maxPair.get().value,minPair.get().value,avgValue,diff,maxPair.get().time,minPair.get().time,
boxStats == null ? null : boxStats.q1,
boxStats == null ? null : boxStats.median,
boxStats == null ? null : boxStats.q3);
}
private BigDecimal convertToBigDecimal(Object pointValue) {
@ -800,15 +811,22 @@ public class GeneralQueryServiceImpl implements IGeneralQueryService
private final String minDate;
private final BigDecimal avg;
private final BigDecimal diff;
private final BigDecimal q1;
private final BigDecimal median;
private final BigDecimal q3;
public Stats(BigDecimal max, BigDecimal min, BigDecimal avg, BigDecimal diff,
String maxDate, String minDate) {
String maxDate, String minDate,
BigDecimal q1, BigDecimal median, BigDecimal q3) {
this.max = max;
this.maxDate = maxDate;
this.min = min;
this.minDate = minDate;
this.avg = avg;
this.diff = diff;
this.q1 = q1;
this.median = median;
this.q3 = q3;
}
}

View File

@ -512,10 +512,16 @@
<select id="getStackPointByMinute" parameterType="com.xzzn.ems.domain.vo.DateSearchRequest" resultType="com.xzzn.ems.domain.vo.StackPointVo">
WITH ranked AS (
SELECT
*,
DATE_FORMAT(DATE_ADD(DATE_FORMAT(p.update_time, '%Y-%m-%d %H:00:00'), INTERVAL CEIL(MINUTE(p.update_time) / 5) * 5 MINUTE)
, '%Y-%m-%d %H:%i:%s') AS group_time,
ROW_NUMBER() OVER (PARTITION BY p.device_id, date_format(p.update_time, '%Y-%m-%d %H:00:00'), CEIL(MINUTE(p.update_time) / 5) ORDER BY p.data_update_time DESC) as rn
p.site_id,
p.device_id,
p.stack_soc,
p.stack_soh,
p.avg_temperature,
FROM_UNIXTIME(((UNIX_TIMESTAMP(p.update_time) + 299) DIV 300) * 300) AS group_time,
ROW_NUMBER() OVER (
PARTITION BY p.device_id, ((UNIX_TIMESTAMP(p.update_time) + 299) DIV 300)
ORDER BY p.data_update_time DESC
) as rn
FROM
ems_battery_stack p
<include refid="statisCommonFilter"/>
@ -528,16 +534,27 @@
avg(t.avg_temperature) as avgTemp
FROM
ranked as t
where t.rn = 1
GROUP BY t.site_id, t.group_time
</select>
<select id="getStackDataByMinute" parameterType="com.xzzn.ems.domain.vo.DateSearchRequest" resultType="com.xzzn.ems.domain.vo.StackStatisListVo">
WITH ranked AS (
SELECT
*,
DATE_FORMAT(DATE_ADD(DATE_FORMAT(p.update_time, '%Y-%m-%d %H:00:00'), INTERVAL CEIL(MINUTE(p.update_time) / 5) * 5 MINUTE)
, '%Y-%m-%d %H:%i:%s') AS group_time,
ROW_NUMBER() OVER (PARTITION BY p.device_id, date_format(p.update_time, '%Y-%m-%d %H:00:00'), CEIL(MINUTE(p.update_time) / 5) ORDER BY p.data_update_time DESC) as rn
p.site_id,
p.device_id,
p.stack_soc,
p.stack_soh,
p.avg_temperature,
p.operating_temp,
p.stack_current,
p.stack_voltage,
p.data_update_time,
FROM_UNIXTIME(((UNIX_TIMESTAMP(p.update_time) + 299) DIV 300) * 300) AS group_time,
ROW_NUMBER() OVER (
PARTITION BY p.device_id, ((UNIX_TIMESTAMP(p.update_time) + 299) DIV 300)
ORDER BY p.data_update_time DESC
) as rn
FROM
ems_battery_stack p
<include refid="statisCommonFilter"/>
@ -574,4 +591,4 @@
ranked
GROUP BY site_id,groupTime
</select>
</mapper>
</mapper>

View File

@ -236,26 +236,40 @@
<select id="getRevenueDataBySiteId" resultType="com.xzzn.ems.domain.vo.AmmeterRevenueStatisListVo">
select
t.data_date as dataTime,
ROUND(t.peak_charge_diff * pc.peak, 3) as activePeakPrice,
ROUND(t.peak_discharge_diff * pc.peak, 3) as reActivePeakPrice,
ROUND(t.high_charge_diff * pc.high, 3) as activeHighPrice,
ROUND(t.high_discharge_diff * pc.high, 3) as reActiveHighPrice,
ROUND(t.flat_charge_diff * pc.flat, 3) as activeFlatPrice,
ROUND(t.flat_discharge_diff * pc.flat, 3) as reActiveFlatPrice,
ROUND(t.valley_charge_diff * pc.valley, 3) as activeValleyPrice,
ROUND(t.valley_discharge_diff * pc.valley, 3) as reActiveValleyPrice
COALESCE(c.is_workday, CASE WHEN WEEKDAY(t.data_date) &lt; 5 THEN 1 ELSE 0 END) as isWorkday,
CASE
WHEN COALESCE(c.is_workday, CASE WHEN WEEKDAY(t.data_date) &lt; 5 THEN 1 ELSE 0 END) = 1 THEN '工作日'
ELSE '节假日'
END as dayType,
COALESCE(NULLIF(TRIM(w.weather_desc), ''), '--') as weatherDesc,
ROUND(IFNULL(t.peak_charge_diff, 0) * IFNULL(pc.peak, 0), 3) as activePeakPrice,
ROUND(IFNULL(t.peak_discharge_diff, 0) * IFNULL(pc.peak, 0), 3) as reActivePeakPrice,
ROUND(IFNULL(t.high_charge_diff, 0) * IFNULL(pc.high, 0), 3) as activeHighPrice,
ROUND(IFNULL(t.high_discharge_diff, 0) * IFNULL(pc.high, 0), 3) as reActiveHighPrice,
ROUND(IFNULL(t.flat_charge_diff, 0) * IFNULL(pc.flat, 0), 3) as activeFlatPrice,
ROUND(IFNULL(t.flat_discharge_diff, 0) * IFNULL(pc.flat, 0), 3) as reActiveFlatPrice,
ROUND(IFNULL(t.valley_charge_diff, 0) * IFNULL(pc.valley, 0), 3) as activeValleyPrice,
ROUND(IFNULL(t.valley_discharge_diff, 0) * IFNULL(pc.valley, 0), 3) as reActiveValleyPrice
from ems_daily_energy_data t
left join (
select
id, site_id, peak, high, flat, valley,
CONCAT(year, '-', LPAD(month, 2, '0')) as yearMonth
from ems_energy_price_config
where 1=1
<if test="siteId != null">
and site_id = #{siteId}
</if>
order by year, month
) pc on pc.yearMonth = DATE_FORMAT(t.data_date, '%Y-%m')
left join ems_calendar_day c on c.calendar_date = t.data_date
left join ems_site_weather_day w on w.site_id = t.site_id and w.calendar_date = t.data_date
left join ems_energy_price_config pc on pc.id = COALESCE(
(
select p.id
from ems_energy_price_config p
where p.site_id = t.site_id
and STR_TO_DATE(CONCAT(p.year, '-', LPAD(p.month, 2, '0'), '-01'), '%Y-%m-%d') &lt;= DATE_FORMAT(t.data_date, '%Y-%m-01')
order by STR_TO_DATE(CONCAT(p.year, '-', LPAD(p.month, 2, '0'), '-01'), '%Y-%m-%d') desc
limit 1
),
(
select p2.id
from ems_energy_price_config p2
where p2.site_id = t.site_id
order by STR_TO_DATE(CONCAT(p2.year, '-', LPAD(p2.month, 2, '0'), '-01'), '%Y-%m-%d') asc
limit 1
)
)
where 1=1
<if test="siteId != null">
and t.site_id = #{siteId}
@ -266,6 +280,7 @@
<if test="endTime != null">
and t.data_date &lt;= #{endTime}
</if>
order by t.data_date desc
</select>
</mapper>
</mapper>

View File

@ -22,6 +22,12 @@
<result property="isAlarm" column="is_alarm"/>
<result property="pointType" column="point_type"/>
<result property="calcExpression" column="calc_expression"/>
<result property="collectEnabled" column="collect_enabled"/>
<result property="collectSource" column="collect_source"/>
<result property="modbusRegisterType" column="modbus_register_type"/>
<result property="modbusDataType" column="modbus_data_type"/>
<result property="modbusReadOrder" column="modbus_read_order"/>
<result property="modbusGroup" column="modbus_group"/>
<result property="createBy" column="create_by"/>
<result property="createTime" column="create_time"/>
<result property="updateBy" column="update_by"/>
@ -32,6 +38,7 @@
<sql id="selectEmsPointConfigVo">
select id, point_id, site_id, device_category, device_id, point_name, data_key, point_desc, register_address,
data_unit, data_a, data_k, data_b, data_bit, is_alarm, point_type, calc_expression,
collect_enabled, collect_source, modbus_register_type, modbus_data_type, modbus_read_order, modbus_group,
create_by, create_time, update_by, update_time, remark
from ems_point_config
</sql>
@ -51,6 +58,10 @@
<if test="dataKey != null and dataKey != ''">and data_key like concat('%', #{dataKey}, '%')</if>
<if test="pointDesc != null and pointDesc != ''">and point_desc like concat('%', #{pointDesc}, '%')</if>
<if test="pointType != null and pointType != ''">and point_type = #{pointType}</if>
<if test="collectEnabled != null">and collect_enabled = #{collectEnabled}</if>
<if test="collectSource != null and collectSource != ''">and collect_source = #{collectSource}</if>
<if test="modbusRegisterType != null and modbusRegisterType != ''">and modbus_register_type = #{modbusRegisterType}</if>
<if test="modbusDataType != null and modbusDataType != ''">and modbus_data_type = #{modbusDataType}</if>
<if test="registerAddress != null and registerAddress != ''">and register_address = #{registerAddress}</if>
</where>
order by update_time desc, id desc
@ -75,6 +86,12 @@
<if test="isAlarm != null">is_alarm,</if>
<if test="pointType != null">point_type,</if>
<if test="calcExpression != null">calc_expression,</if>
<if test="collectEnabled != null">collect_enabled,</if>
<if test="collectSource != null">collect_source,</if>
<if test="modbusRegisterType != null">modbus_register_type,</if>
<if test="modbusDataType != null">modbus_data_type,</if>
<if test="modbusReadOrder != null">modbus_read_order,</if>
<if test="modbusGroup != null">modbus_group,</if>
<if test="createBy != null">create_by,</if>
<if test="createTime != null">create_time,</if>
<if test="updateBy != null">update_by,</if>
@ -98,6 +115,12 @@
<if test="isAlarm != null">#{isAlarm},</if>
<if test="pointType != null">#{pointType},</if>
<if test="calcExpression != null">#{calcExpression},</if>
<if test="collectEnabled != null">#{collectEnabled},</if>
<if test="collectSource != null">#{collectSource},</if>
<if test="modbusRegisterType != null">#{modbusRegisterType},</if>
<if test="modbusDataType != null">#{modbusDataType},</if>
<if test="modbusReadOrder != null">#{modbusReadOrder},</if>
<if test="modbusGroup != null">#{modbusGroup},</if>
<if test="createBy != null">#{createBy},</if>
<if test="createTime != null">#{createTime},</if>
<if test="updateBy != null">#{updateBy},</if>
@ -110,6 +133,7 @@
insert into ems_point_config (
point_id, site_id, device_category, device_id, point_name, data_key, point_desc, register_address,
data_unit, data_a, data_k, data_b, data_bit, is_alarm, point_type, calc_expression,
collect_enabled, collect_source, modbus_register_type, modbus_data_type, modbus_read_order, modbus_group,
create_by, create_time, update_by, update_time, remark
)
values
@ -131,6 +155,12 @@
#{item.isAlarm},
#{item.pointType},
#{item.calcExpression},
#{item.collectEnabled},
#{item.collectSource},
#{item.modbusRegisterType},
#{item.modbusDataType},
#{item.modbusReadOrder},
#{item.modbusGroup},
#{item.createBy},
now(),
#{item.updateBy},
@ -159,6 +189,12 @@
<if test="isAlarm != null">is_alarm = #{isAlarm},</if>
<if test="pointType != null">point_type = #{pointType},</if>
<if test="calcExpression != null">calc_expression = #{calcExpression},</if>
<if test="collectEnabled != null">collect_enabled = #{collectEnabled},</if>
<if test="collectSource != null">collect_source = #{collectSource},</if>
<if test="modbusRegisterType != null">modbus_register_type = #{modbusRegisterType},</if>
<if test="modbusDataType != null">modbus_data_type = #{modbusDataType},</if>
<if test="modbusReadOrder != null">modbus_read_order = #{modbusReadOrder},</if>
<if test="modbusGroup != null">modbus_group = #{modbusGroup},</if>
<if test="updateBy != null">update_by = #{updateBy},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
<if test="remark != null">remark = #{remark},</if>
@ -192,11 +228,13 @@
insert into ems_point_config (
point_id, site_id, device_category, device_id, point_name, data_key, point_desc, register_address,
data_unit, data_a, data_k, data_b, data_bit, is_alarm, point_type, calc_expression,
collect_enabled, collect_source, modbus_register_type, modbus_data_type, modbus_read_order, modbus_group,
create_by, create_time, update_by, update_time, remark
)
select
concat('PT_', replace(uuid(), '-', '')), #{targetSiteId}, device_category, device_id, point_name, data_key, point_desc, register_address,
data_unit, data_a, data_k, data_b, data_bit, is_alarm, point_type, calc_expression,
collect_enabled, collect_source, modbus_register_type, modbus_data_type, modbus_read_order, modbus_group,
#{operName}, now(), #{operName}, now(), remark
from ems_point_config
where site_id = #{templateSiteId}
@ -287,4 +325,19 @@
</foreach>
</select>
<select id="selectModbusCollectPointConfigs" resultMap="EmsPointConfigResult">
<include refid="selectEmsPointConfigVo"/>
where collect_enabled = 1
and collect_source = 'MODBUS'
and (point_type is null or point_type <> 'calc')
and (is_alarm is null or is_alarm = 0)
and register_address is not null and register_address <> ''
and modbus_register_type is not null and modbus_register_type <> ''
and modbus_data_type is not null and modbus_data_type <> ''
<if test="siteId != null and siteId != ''">
and site_id = #{siteId}
</if>
order by site_id asc, device_id asc, modbus_read_order asc, id asc
</select>
</mapper>

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xzzn.ems.mapper.EmsSiteWeatherDayMapper">
<update id="updateWeatherDesc">
update ems_site_weather_day
set weather_desc = #{weatherDesc},
weather_code = #{weatherCode},
update_time = now()
where site_id = #{siteId}
and calendar_date = #{calendarDate}
</update>
<select id="selectCountBySiteAndDate" resultType="int">
select count(1)
from ems_site_weather_day
where site_id = #{siteId}
and calendar_date = #{calendarDate}
</select>
<insert id="insertSiteWeatherDay">
insert into ems_site_weather_day(site_id, calendar_date, weather_desc, weather_code, source, create_time, update_time)
values (#{siteId}, #{calendarDate}, #{weatherDesc}, #{weatherCode}, #{source}, now(), now())
</insert>
</mapper>