From 725ae14548fc59bd653a7f279f0c3a5c6d00a093 Mon Sep 17 00:00:00 2001 From: mashili Date: Sat, 23 Aug 2025 00:09:02 +0800 Subject: [PATCH] =?UTF-8?q?20250808=E4=BC=98=E5=8C=96-=E5=8D=95=E4=BD=93?= =?UTF-8?q?=E7=94=B5=E6=B1=A0=E8=A1=A8=E6=8B=86=E5=88=86=E4=B8=BA=E5=88=86?= =?UTF-8?q?=E9=92=9F=EF=BC=8C=E6=97=B6=EF=BC=8C=E5=A4=A9=EF=BC=8C=E6=9C=88?= =?UTF-8?q?=E7=BA=A7=E5=9B=9B=E4=B8=AA=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-druid.yml | 2 +- .../common/constant/RedisKeyConstants.java | 5 + .../xzzn/common/core/redis/RedisCache.java | 11 + .../java/com/xzzn/common/utils/DateUtils.java | 6 + .../xzzn/ems/domain/EmsBatteryDataDay.java | 242 +++++++ .../xzzn/ems/domain/EmsBatteryDataHour.java | 242 +++++++ .../ems/domain/EmsBatteryDataMinutes.java | 226 ++++++ .../xzzn/ems/domain/EmsBatteryDataMonth.java | 242 +++++++ .../ems/mapper/EmsBatteryDataDayMapper.java | 72 ++ .../ems/mapper/EmsBatteryDataHourMapper.java | 72 ++ .../mapper/EmsBatteryDataMinutesMapper.java | 68 ++ .../ems/mapper/EmsBatteryDataMonthMapper.java | 72 ++ .../impl/DDSDataProcessServiceImpl.java | 85 ++- .../impl/FXXDataProcessServiceImpl.java | 49 +- .../utils/AbstractBatteryDataProcessor.java | 681 ++++++++++++++++++ .../mapper/ems/EmsBatteryDataDayMapper.xml | 201 ++++++ .../mapper/ems/EmsBatteryDataHourMapper.xml | 202 ++++++ .../ems/EmsBatteryDataMinutesMapper.xml | 152 ++++ .../mapper/ems/EmsBatteryDataMonthMapper.xml | 202 ++++++ 19 files changed, 2785 insertions(+), 47 deletions(-) create mode 100644 ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataDay.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataHour.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMinutes.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMonth.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataDayMapper.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataHourMapper.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMinutesMapper.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMonthMapper.java create mode 100644 ems-system/src/main/java/com/xzzn/ems/utils/AbstractBatteryDataProcessor.java create mode 100644 ems-system/src/main/resources/mapper/ems/EmsBatteryDataDayMapper.xml create mode 100644 ems-system/src/main/resources/mapper/ems/EmsBatteryDataHourMapper.xml create mode 100644 ems-system/src/main/resources/mapper/ems/EmsBatteryDataMinutesMapper.xml create mode 100644 ems-system/src/main/resources/mapper/ems/EmsBatteryDataMonthMapper.xml diff --git a/ems-admin/src/main/resources/application-druid.yml b/ems-admin/src/main/resources/application-druid.yml index 3c7fe20..c7ce773 100644 --- a/ems-admin/src/main/resources/application-druid.yml +++ b/ems-admin/src/main/resources/application-druid.yml @@ -6,7 +6,7 @@ spring: druid: # 主库数据源 master: - url: jdbc:mysql://122.51.194.184:13306/setri_ems?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 + url: jdbc:mysql://122.51.194.184:13306/setri_ems?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 username: ems password: 12345678 # 从库数据源 diff --git a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java index e16229c..58df29b 100644 --- a/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java +++ b/ems-common/src/main/java/com/xzzn/common/constant/RedisKeyConstants.java @@ -51,4 +51,9 @@ public class RedisKeyConstants * BMSD原始数据 redis key */ public static final String ORIGINAL_BMSD = "BMSD_"; + + /** + * mqtt监听的原始报文 + */ + public static final String MQTT_ORIGINAL = "mqtt_"; } diff --git a/ems-common/src/main/java/com/xzzn/common/core/redis/RedisCache.java b/ems-common/src/main/java/com/xzzn/common/core/redis/RedisCache.java index eb9dfdf..a8af534 100644 --- a/ems-common/src/main/java/com/xzzn/common/core/redis/RedisCache.java +++ b/ems-common/src/main/java/com/xzzn/common/core/redis/RedisCache.java @@ -276,4 +276,15 @@ public class RedisCache { return redisTemplate.delete(key); } + + /** + * 批量缓存 + * @param cacheMap + */ + public void multiSet(final Map cacheMap) + { + + redisTemplate.opsForValue().multiSet(cacheMap); + } + } diff --git a/ems-common/src/main/java/com/xzzn/common/utils/DateUtils.java b/ems-common/src/main/java/com/xzzn/common/utils/DateUtils.java index 786575b..a49d234 100644 --- a/ems-common/src/main/java/com/xzzn/common/utils/DateUtils.java +++ b/ems-common/src/main/java/com/xzzn/common/utils/DateUtils.java @@ -200,4 +200,10 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils long date = calendar.get(Calendar.DAY_OF_MONTH); // 月份从0开始,所以要加1 return date; } + + // LocalDateTime 转 Date(带时区) + public static Date convertToDate(LocalDateTime localDateTime) { + ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.systemDefault()); + return Date.from(zonedDateTime.toInstant()); + } } diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataDay.java b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataDay.java new file mode 100644 index 0000000..6db06fd --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataDay.java @@ -0,0 +1,242 @@ +package com.xzzn.ems.domain; + +import java.math.BigDecimal; +import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.xzzn.common.core.domain.BaseEntity; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import com.xzzn.common.annotation.Excel; + +/** + * 单体电池天级数据对象 ems_battery_data_day + * + * @author xzzn + * @date 2025-08-20 + */ +public class EmsBatteryDataDay extends BaseEntity +{ + private static final long serialVersionUID = 1L; + + /** $column.columnComment */ + private Long id; + + /** 电池堆 */ + @Excel(name = "电池堆") + private String batteryPack; + + /** 电池簇 */ + @Excel(name = "电池簇") + private String batteryCluster; + + /** 单体编号 */ + @Excel(name = "单体编号") + private String batteryCellId; + + /** 电压 (V) */ + @Excel(name = "电压 (V)") + private BigDecimal voltage; + + /** 温度 (℃) */ + @Excel(name = "温度 (℃)") + private BigDecimal temperature; + + /** SOC (%) */ + @Excel(name = "SOC (%)") + private BigDecimal soc; + + /** SOH (%) */ + @Excel(name = "SOH (%)") + private BigDecimal soh; + + /** 数据采集时间 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "数据采集时间", width = 30, dateFormat = "yyyy-MM-dd") + private Date dataTimestamp; + + /** 站点id */ + @Excel(name = "站点id") + private String siteId; + + /** 设备唯一标识符 */ + @Excel(name = "设备唯一标识符") + private String deviceId; + + /** 簇设备id */ + @Excel(name = "簇设备id") + private String clusterDeviceId; + + /** 单体电池内阻 */ + @Excel(name = "单体电池内阻") + private BigDecimal interResistance; + + /** 天级时间维度 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "天级时间维度", width = 30, dateFormat = "yyyy-MM-dd") + private Date dayTime; + + public void setId(Long id) + { + this.id = id; + } + + public Long getId() + { + return id; + } + + public void setBatteryPack(String batteryPack) + { + this.batteryPack = batteryPack; + } + + public String getBatteryPack() + { + return batteryPack; + } + + public void setBatteryCluster(String batteryCluster) + { + this.batteryCluster = batteryCluster; + } + + public String getBatteryCluster() + { + return batteryCluster; + } + + public void setBatteryCellId(String batteryCellId) + { + this.batteryCellId = batteryCellId; + } + + public String getBatteryCellId() + { + return batteryCellId; + } + + public void setVoltage(BigDecimal voltage) + { + this.voltage = voltage; + } + + public BigDecimal getVoltage() + { + return voltage; + } + + public void setTemperature(BigDecimal temperature) + { + this.temperature = temperature; + } + + public BigDecimal getTemperature() + { + return temperature; + } + + public void setSoc(BigDecimal soc) + { + this.soc = soc; + } + + public BigDecimal getSoc() + { + return soc; + } + + public void setSoh(BigDecimal soh) + { + this.soh = soh; + } + + public BigDecimal getSoh() + { + return soh; + } + + public void setDataTimestamp(Date dataTimestamp) + { + this.dataTimestamp = dataTimestamp; + } + + public Date getDataTimestamp() + { + return dataTimestamp; + } + + public void setSiteId(String siteId) + { + this.siteId = siteId; + } + + public String getSiteId() + { + return siteId; + } + + public void setDeviceId(String deviceId) + { + this.deviceId = deviceId; + } + + public String getDeviceId() + { + return deviceId; + } + + public void setClusterDeviceId(String clusterDeviceId) + { + this.clusterDeviceId = clusterDeviceId; + } + + public String getClusterDeviceId() + { + return clusterDeviceId; + } + + public void setInterResistance(BigDecimal interResistance) + { + this.interResistance = interResistance; + } + + public BigDecimal getInterResistance() + { + return interResistance; + } + + public void setDayTime(Date dayTime) + { + this.dayTime = dayTime; + } + + public Date getDayTime() + { + return dayTime; + } + + @Override + public String toString() { + return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) + .append("id", getId()) + .append("batteryPack", getBatteryPack()) + .append("batteryCluster", getBatteryCluster()) + .append("batteryCellId", getBatteryCellId()) + .append("voltage", getVoltage()) + .append("temperature", getTemperature()) + .append("soc", getSoc()) + .append("soh", getSoh()) + .append("dataTimestamp", getDataTimestamp()) + .append("createBy", getCreateBy()) + .append("createTime", getCreateTime()) + .append("updateBy", getUpdateBy()) + .append("updateTime", getUpdateTime()) + .append("remark", getRemark()) + .append("siteId", getSiteId()) + .append("deviceId", getDeviceId()) + .append("clusterDeviceId", getClusterDeviceId()) + .append("interResistance", getInterResistance()) + .append("dayTime", getDayTime()) + .toString(); + } +} diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataHour.java b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataHour.java new file mode 100644 index 0000000..859504b --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataHour.java @@ -0,0 +1,242 @@ +package com.xzzn.ems.domain; + +import java.math.BigDecimal; +import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.xzzn.common.core.domain.BaseEntity; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import com.xzzn.common.annotation.Excel; + +/** + * 单体电池小时级数据对象 ems_battery_data_hour + * + * @author xzzn + * @date 2025-08-19 + */ +public class EmsBatteryDataHour extends BaseEntity +{ + private static final long serialVersionUID = 1L; + + /** $column.columnComment */ + private Long id; + + /** 电池堆 */ + @Excel(name = "电池堆") + private String batteryPack; + + /** 电池簇 */ + @Excel(name = "电池簇") + private String batteryCluster; + + /** 单体编号 */ + @Excel(name = "单体编号") + private String batteryCellId; + + /** 电压 (V) */ + @Excel(name = "电压 (V)") + private BigDecimal voltage; + + /** 温度 (℃) */ + @Excel(name = "温度 (℃)") + private BigDecimal temperature; + + /** SOC (%) */ + @Excel(name = "SOC (%)") + private BigDecimal soc; + + /** SOH (%) */ + @Excel(name = "SOH (%)") + private BigDecimal soh; + + /** 数据采集时间 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "数据采集时间", width = 30, dateFormat = "yyyy-MM-dd") + private Date dataTimestamp; + + /** 站点id */ + @Excel(name = "站点id") + private String siteId; + + /** 设备唯一标识符 */ + @Excel(name = "设备唯一标识符") + private String deviceId; + + /** 簇设备id */ + @Excel(name = "簇设备id") + private String clusterDeviceId; + + /** 单体电池内阻 */ + @Excel(name = "单体电池内阻") + private BigDecimal interResistance; + + /** 小时级时间维度 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "小时级时间维度", width = 30, dateFormat = "yyyy-MM-dd") + private Date hourTime; + + public void setId(Long id) + { + this.id = id; + } + + public Long getId() + { + return id; + } + + public void setBatteryPack(String batteryPack) + { + this.batteryPack = batteryPack; + } + + public String getBatteryPack() + { + return batteryPack; + } + + public void setBatteryCluster(String batteryCluster) + { + this.batteryCluster = batteryCluster; + } + + public String getBatteryCluster() + { + return batteryCluster; + } + + public void setBatteryCellId(String batteryCellId) + { + this.batteryCellId = batteryCellId; + } + + public String getBatteryCellId() + { + return batteryCellId; + } + + public void setVoltage(BigDecimal voltage) + { + this.voltage = voltage; + } + + public BigDecimal getVoltage() + { + return voltage; + } + + public void setTemperature(BigDecimal temperature) + { + this.temperature = temperature; + } + + public BigDecimal getTemperature() + { + return temperature; + } + + public void setSoc(BigDecimal soc) + { + this.soc = soc; + } + + public BigDecimal getSoc() + { + return soc; + } + + public void setSoh(BigDecimal soh) + { + this.soh = soh; + } + + public BigDecimal getSoh() + { + return soh; + } + + public void setDataTimestamp(Date dataTimestamp) + { + this.dataTimestamp = dataTimestamp; + } + + public Date getDataTimestamp() + { + return dataTimestamp; + } + + public void setSiteId(String siteId) + { + this.siteId = siteId; + } + + public String getSiteId() + { + return siteId; + } + + public void setDeviceId(String deviceId) + { + this.deviceId = deviceId; + } + + public String getDeviceId() + { + return deviceId; + } + + public void setClusterDeviceId(String clusterDeviceId) + { + this.clusterDeviceId = clusterDeviceId; + } + + public String getClusterDeviceId() + { + return clusterDeviceId; + } + + public void setInterResistance(BigDecimal interResistance) + { + this.interResistance = interResistance; + } + + public BigDecimal getInterResistance() + { + return interResistance; + } + + public void setHourTime(Date hourTime) + { + this.hourTime = hourTime; + } + + public Date getHourTime() + { + return hourTime; + } + + @Override + public String toString() { + return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) + .append("id", getId()) + .append("batteryPack", getBatteryPack()) + .append("batteryCluster", getBatteryCluster()) + .append("batteryCellId", getBatteryCellId()) + .append("voltage", getVoltage()) + .append("temperature", getTemperature()) + .append("soc", getSoc()) + .append("soh", getSoh()) + .append("dataTimestamp", getDataTimestamp()) + .append("createBy", getCreateBy()) + .append("createTime", getCreateTime()) + .append("updateBy", getUpdateBy()) + .append("updateTime", getUpdateTime()) + .append("remark", getRemark()) + .append("siteId", getSiteId()) + .append("deviceId", getDeviceId()) + .append("clusterDeviceId", getClusterDeviceId()) + .append("interResistance", getInterResistance()) + .append("hourTime", getHourTime()) + .toString(); + } +} diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMinutes.java b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMinutes.java new file mode 100644 index 0000000..24ed5fe --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMinutes.java @@ -0,0 +1,226 @@ +package com.xzzn.ems.domain; + +import java.math.BigDecimal; +import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.xzzn.common.core.domain.BaseEntity; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import com.xzzn.common.annotation.Excel; + +/** + * 单体电池分钟级数据对象 ems_battery_data_minutes + * + * @author xzzn + * @date 2025-08-18 + */ +public class EmsBatteryDataMinutes extends BaseEntity +{ + private static final long serialVersionUID = 1L; + + /** $column.columnComment */ + private Long id; + + /** 电池堆 */ + @Excel(name = "电池堆") + private String batteryPack; + + /** 电池簇 */ + @Excel(name = "电池簇") + private String batteryCluster; + + /** 单体编号 */ + @Excel(name = "单体编号") + private String batteryCellId; + + /** 电压 (V) */ + @Excel(name = "电压 (V)") + private BigDecimal voltage; + + /** 温度 (℃) */ + @Excel(name = "温度 (℃)") + private BigDecimal temperature; + + /** SOC (%) */ + @Excel(name = "SOC (%)") + private BigDecimal soc; + + /** SOH (%) */ + @Excel(name = "SOH (%)") + private BigDecimal soh; + + /** 数据采集时间 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "数据采集时间", width = 30, dateFormat = "yyyy-MM-dd") + private Date dataTimestamp; + + /** 站点id */ + @Excel(name = "站点id") + private String siteId; + + /** 设备唯一标识符 */ + @Excel(name = "设备唯一标识符") + private String deviceId; + + /** 簇设备id */ + @Excel(name = "簇设备id") + private String clusterDeviceId; + + /** 单体电池内阻 */ + @Excel(name = "单体电池内阻") + private BigDecimal interResistance; + + public void setId(Long id) + { + this.id = id; + } + + public Long getId() + { + return id; + } + + public void setBatteryPack(String batteryPack) + { + this.batteryPack = batteryPack; + } + + public String getBatteryPack() + { + return batteryPack; + } + + public void setBatteryCluster(String batteryCluster) + { + this.batteryCluster = batteryCluster; + } + + public String getBatteryCluster() + { + return batteryCluster; + } + + public void setBatteryCellId(String batteryCellId) + { + this.batteryCellId = batteryCellId; + } + + public String getBatteryCellId() + { + return batteryCellId; + } + + public void setVoltage(BigDecimal voltage) + { + this.voltage = voltage; + } + + public BigDecimal getVoltage() + { + return voltage; + } + + public void setTemperature(BigDecimal temperature) + { + this.temperature = temperature; + } + + public BigDecimal getTemperature() + { + return temperature; + } + + public void setSoc(BigDecimal soc) + { + this.soc = soc; + } + + public BigDecimal getSoc() + { + return soc; + } + + public void setSoh(BigDecimal soh) + { + this.soh = soh; + } + + public BigDecimal getSoh() + { + return soh; + } + + public void setDataTimestamp(Date dataTimestamp) + { + this.dataTimestamp = dataTimestamp; + } + + public Date getDataTimestamp() + { + return dataTimestamp; + } + + public void setSiteId(String siteId) + { + this.siteId = siteId; + } + + public String getSiteId() + { + return siteId; + } + + public void setDeviceId(String deviceId) + { + this.deviceId = deviceId; + } + + public String getDeviceId() + { + return deviceId; + } + + public void setClusterDeviceId(String clusterDeviceId) + { + this.clusterDeviceId = clusterDeviceId; + } + + public String getClusterDeviceId() + { + return clusterDeviceId; + } + + public void setInterResistance(BigDecimal interResistance) + { + this.interResistance = interResistance; + } + + public BigDecimal getInterResistance() + { + return interResistance; + } + + @Override + public String toString() { + return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) + .append("id", getId()) + .append("batteryPack", getBatteryPack()) + .append("batteryCluster", getBatteryCluster()) + .append("batteryCellId", getBatteryCellId()) + .append("voltage", getVoltage()) + .append("temperature", getTemperature()) + .append("soc", getSoc()) + .append("soh", getSoh()) + .append("dataTimestamp", getDataTimestamp()) + .append("createBy", getCreateBy()) + .append("createTime", getCreateTime()) + .append("updateBy", getUpdateBy()) + .append("updateTime", getUpdateTime()) + .append("remark", getRemark()) + .append("siteId", getSiteId()) + .append("deviceId", getDeviceId()) + .append("clusterDeviceId", getClusterDeviceId()) + .append("interResistance", getInterResistance()) + .toString(); + } +} diff --git a/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMonth.java b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMonth.java new file mode 100644 index 0000000..1828042 --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/domain/EmsBatteryDataMonth.java @@ -0,0 +1,242 @@ +package com.xzzn.ems.domain; + +import java.math.BigDecimal; +import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.xzzn.common.core.domain.BaseEntity; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import com.xzzn.common.annotation.Excel; + +/** + * 单体电池月级数据对象 ems_battery_data_month + * + * @author xzzn + * @date 2025-08-22 + */ +public class EmsBatteryDataMonth extends BaseEntity +{ + private static final long serialVersionUID = 1L; + + /** $column.columnComment */ + private Long id; + + /** 电池堆 */ + @Excel(name = "电池堆") + private String batteryPack; + + /** 电池簇 */ + @Excel(name = "电池簇") + private String batteryCluster; + + /** 单体编号 */ + @Excel(name = "单体编号") + private String batteryCellId; + + /** 电压 (V) */ + @Excel(name = "电压 (V)") + private BigDecimal voltage; + + /** 温度 (℃) */ + @Excel(name = "温度 (℃)") + private BigDecimal temperature; + + /** SOC (%) */ + @Excel(name = "SOC (%)") + private BigDecimal soc; + + /** SOH (%) */ + @Excel(name = "SOH (%)") + private BigDecimal soh; + + /** 数据采集时间 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "数据采集时间", width = 30, dateFormat = "yyyy-MM-dd") + private Date dataTimestamp; + + /** 站点id */ + @Excel(name = "站点id") + private String siteId; + + /** 设备唯一标识符 */ + @Excel(name = "设备唯一标识符") + private String deviceId; + + /** 簇设备id */ + @Excel(name = "簇设备id") + private String clusterDeviceId; + + /** 单体电池内阻 */ + @Excel(name = "单体电池内阻") + private BigDecimal interResistance; + + /** 月级时间维度 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "月级时间维度", width = 30, dateFormat = "yyyy-MM-dd") + private Date monthTime; + + public void setId(Long id) + { + this.id = id; + } + + public Long getId() + { + return id; + } + + public void setBatteryPack(String batteryPack) + { + this.batteryPack = batteryPack; + } + + public String getBatteryPack() + { + return batteryPack; + } + + public void setBatteryCluster(String batteryCluster) + { + this.batteryCluster = batteryCluster; + } + + public String getBatteryCluster() + { + return batteryCluster; + } + + public void setBatteryCellId(String batteryCellId) + { + this.batteryCellId = batteryCellId; + } + + public String getBatteryCellId() + { + return batteryCellId; + } + + public void setVoltage(BigDecimal voltage) + { + this.voltage = voltage; + } + + public BigDecimal getVoltage() + { + return voltage; + } + + public void setTemperature(BigDecimal temperature) + { + this.temperature = temperature; + } + + public BigDecimal getTemperature() + { + return temperature; + } + + public void setSoc(BigDecimal soc) + { + this.soc = soc; + } + + public BigDecimal getSoc() + { + return soc; + } + + public void setSoh(BigDecimal soh) + { + this.soh = soh; + } + + public BigDecimal getSoh() + { + return soh; + } + + public void setDataTimestamp(Date dataTimestamp) + { + this.dataTimestamp = dataTimestamp; + } + + public Date getDataTimestamp() + { + return dataTimestamp; + } + + public void setSiteId(String siteId) + { + this.siteId = siteId; + } + + public String getSiteId() + { + return siteId; + } + + public void setDeviceId(String deviceId) + { + this.deviceId = deviceId; + } + + public String getDeviceId() + { + return deviceId; + } + + public void setClusterDeviceId(String clusterDeviceId) + { + this.clusterDeviceId = clusterDeviceId; + } + + public String getClusterDeviceId() + { + return clusterDeviceId; + } + + public void setInterResistance(BigDecimal interResistance) + { + this.interResistance = interResistance; + } + + public BigDecimal getInterResistance() + { + return interResistance; + } + + public void setMonthTime(Date monthTime) + { + this.monthTime = monthTime; + } + + public Date getMonthTime() + { + return monthTime; + } + + @Override + public String toString() { + return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) + .append("id", getId()) + .append("batteryPack", getBatteryPack()) + .append("batteryCluster", getBatteryCluster()) + .append("batteryCellId", getBatteryCellId()) + .append("voltage", getVoltage()) + .append("temperature", getTemperature()) + .append("soc", getSoc()) + .append("soh", getSoh()) + .append("dataTimestamp", getDataTimestamp()) + .append("createBy", getCreateBy()) + .append("createTime", getCreateTime()) + .append("updateBy", getUpdateBy()) + .append("updateTime", getUpdateTime()) + .append("remark", getRemark()) + .append("siteId", getSiteId()) + .append("deviceId", getDeviceId()) + .append("clusterDeviceId", getClusterDeviceId()) + .append("interResistance", getInterResistance()) + .append("monthTime", getMonthTime()) + .toString(); + } +} diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataDayMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataDayMapper.java new file mode 100644 index 0000000..45f7da2 --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataDayMapper.java @@ -0,0 +1,72 @@ +package com.xzzn.ems.mapper; + +import java.time.LocalDateTime; +import java.util.List; +import com.xzzn.ems.domain.EmsBatteryDataDay; +import com.xzzn.ems.domain.EmsBatteryDataHour; +import org.apache.ibatis.annotations.Param; + +/** + * 单体电池天级数据Mapper接口 + * + * @author xzzn + * @date 2025-08-20 + */ +public interface EmsBatteryDataDayMapper +{ + /** + * 查询单体电池天级数据 + * + * @param id 单体电池天级数据主键 + * @return 单体电池天级数据 + */ + public EmsBatteryDataDay selectEmsBatteryDataDayById(Long id); + + /** + * 查询单体电池天级数据列表 + * + * @param emsBatteryDataDay 单体电池天级数据 + * @return 单体电池天级数据集合 + */ + public List selectEmsBatteryDataDayList(EmsBatteryDataDay emsBatteryDataDay); + + /** + * 新增单体电池天级数据 + * + * @param emsBatteryDataDay 单体电池天级数据 + * @return 结果 + */ + public int insertEmsBatteryDataDay(EmsBatteryDataDay emsBatteryDataDay); + + /** + * 修改单体电池天级数据 + * + * @param emsBatteryDataDay 单体电池天级数据 + * @return 结果 + */ + public int updateEmsBatteryDataDay(EmsBatteryDataDay emsBatteryDataDay); + + /** + * 删除单体电池天级数据 + * + * @param id 单体电池天级数据主键 + * @return 结果 + */ + public int deleteEmsBatteryDataDayById(Long id); + + /** + * 批量删除单体电池天级数据 + * + * @param ids 需要删除的数据主键集合 + * @return 结果 + */ + public int deleteEmsBatteryDataDayByIds(Long[] ids); + + EmsBatteryDataDay findDayMaxTemp(@Param("siteId") String siteId, @Param("stackId") String stackId, + @Param("clusterId") String clusterId, @Param("batteryId") String batteryId, @Param("dayStart") LocalDateTime dayStart); + + /** + * 批量插入或更新天级数据(存在则更新,不存在则插入) + */ + void batchInsertOrUpdate(@Param("list") List dayDataList); +} diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataHourMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataHourMapper.java new file mode 100644 index 0000000..f7713fe --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataHourMapper.java @@ -0,0 +1,72 @@ +package com.xzzn.ems.mapper; + +import java.time.LocalDateTime; +import java.util.List; +import com.xzzn.ems.domain.EmsBatteryDataHour; +import org.apache.ibatis.annotations.Param; + +/** + * 单体电池小时级数据Mapper接口 + * + * @author xzzn + * @date 2025-08-19 + */ +public interface EmsBatteryDataHourMapper +{ + /** + * 查询单体电池小时级数据 + * + * @param id 单体电池小时级数据主键 + * @return 单体电池小时级数据 + */ + public EmsBatteryDataHour selectEmsBatteryDataHourById(Long id); + + /** + * 查询单体电池小时级数据列表 + * + * @param emsBatteryDataHour 单体电池小时级数据 + * @return 单体电池小时级数据集合 + */ + public List selectEmsBatteryDataHourList(EmsBatteryDataHour emsBatteryDataHour); + + /** + * 新增单体电池小时级数据 + * + * @param emsBatteryDataHour 单体电池小时级数据 + * @return 结果 + */ + public int insertEmsBatteryDataHour(EmsBatteryDataHour emsBatteryDataHour); + + /** + * 修改单体电池小时级数据 + * + * @param emsBatteryDataHour 单体电池小时级数据 + * @return 结果 + */ + public int updateEmsBatteryDataHour(EmsBatteryDataHour emsBatteryDataHour); + + /** + * 删除单体电池小时级数据 + * + * @param id 单体电池小时级数据主键 + * @return 结果 + */ + public int deleteEmsBatteryDataHourById(Long id); + + /** + * 批量删除单体电池小时级数据 + * + * @param ids 需要删除的数据主键集合 + * @return 结果 + */ + public int deleteEmsBatteryDataHourByIds(Long[] ids); + + EmsBatteryDataHour findHourMaxTemp(@Param("siteId") String siteId, @Param("stackId") String stackId, + @Param("clusterId") String clusterId, @Param("batteryId") String batteryId, @Param("hourStart") LocalDateTime hourStart); + + /** + * 批量插入或更新小时级数据(存在则更新,不存在则插入) + */ + void batchInsertOrUpdate(@Param("list") List hourDataList); + +} diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMinutesMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMinutesMapper.java new file mode 100644 index 0000000..b001591 --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMinutesMapper.java @@ -0,0 +1,68 @@ +package com.xzzn.ems.mapper; + +import java.util.Date; +import java.util.List; + +import com.xzzn.ems.domain.EmsBatteryData; +import com.xzzn.ems.domain.EmsBatteryDataMinutes; + +/** + * 单体电池分钟级数据Mapper接口 + * + * @author xzzn + * @date 2025-08-17 + */ +public interface EmsBatteryDataMinutesMapper +{ + /** + * 查询单体电池分钟级数据 + * + * @param dateDay 单体电池分钟级数据主键 + * @return 单体电池分钟级数据 + */ + public EmsBatteryDataMinutes selectEmsBatteryDataMinutesByDateDay(Date dateDay); + + /** + * 查询单体电池分钟级数据列表 + * + * @param emsBatteryDataMinutes 单体电池分钟级数据 + * @return 单体电池分钟级数据集合 + */ + public List selectEmsBatteryDataMinutesList(EmsBatteryDataMinutes emsBatteryDataMinutes); + + /** + * 新增单体电池分钟级数据 + * + * @param emsBatteryDataMinutes 单体电池分钟级数据 + * @return 结果 + */ + public int insertEmsBatteryDataMinutes(EmsBatteryDataMinutes emsBatteryDataMinutes); + + /** + * 修改单体电池分钟级数据 + * + * @param emsBatteryDataMinutes 单体电池分钟级数据 + * @return 结果 + */ + public int updateEmsBatteryDataMinutes(EmsBatteryDataMinutes emsBatteryDataMinutes); + + /** + * 删除单体电池分钟级数据 + * + * @param dateDay 单体电池分钟级数据主键 + * @return 结果 + */ + public int deleteEmsBatteryDataMinutesByDateDay(Date dateDay); + + /** + * 批量删除单体电池分钟级数据 + * + * @param dateDays 需要删除的数据主键集合 + * @return 结果 + */ + public int deleteEmsBatteryDataMinutesByDateDays(Date[] dateDays); + + int insertMinutesBatteryDataList(List emsBatteryDataList); + + public void deleteByTimeBeforeOneHour(String oneHourAgoStr); +} \ No newline at end of file diff --git a/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMonthMapper.java b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMonthMapper.java new file mode 100644 index 0000000..cc5784e --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/mapper/EmsBatteryDataMonthMapper.java @@ -0,0 +1,72 @@ +package com.xzzn.ems.mapper; + +import java.time.LocalDateTime; +import java.util.List; + +import com.xzzn.ems.domain.EmsBatteryDataMonth; +import org.apache.ibatis.annotations.Param; + +/** + * 单体电池月级数据Mapper接口 + * + * @author xzzn + * @date 2025-08-22 + */ +public interface EmsBatteryDataMonthMapper +{ + /** + * 查询单体电池月级数据 + * + * @param id 单体电池月级数据主键 + * @return 单体电池月级数据 + */ + public EmsBatteryDataMonth selectEmsBatteryDataMonthById(Long id); + + /** + * 查询单体电池月级数据列表 + * + * @param emsBatteryDataMonth 单体电池月级数据 + * @return 单体电池月级数据集合 + */ + public List selectEmsBatteryDataMonthList(EmsBatteryDataMonth emsBatteryDataMonth); + + /** + * 新增单体电池月级数据 + * + * @param emsBatteryDataMonth 单体电池月级数据 + * @return 结果 + */ + public int insertEmsBatteryDataMonth(EmsBatteryDataMonth emsBatteryDataMonth); + + /** + * 修改单体电池月级数据 + * + * @param emsBatteryDataMonth 单体电池月级数据 + * @return 结果 + */ + public int updateEmsBatteryDataMonth(EmsBatteryDataMonth emsBatteryDataMonth); + + /** + * 删除单体电池月级数据 + * + * @param id 单体电池月级数据主键 + * @return 结果 + */ + public int deleteEmsBatteryDataMonthById(Long id); + + /** + * 批量删除单体电池月级数据 + * + * @param ids 需要删除的数据主键集合 + * @return 结果 + */ + public int deleteEmsBatteryDataMonthByIds(Long[] ids); + + EmsBatteryDataMonth findMonthMaxTemp(@Param("siteId") String siteId, @Param("stackId") String stackId, + @Param("clusterId") String clusterId, @Param("batteryId") String batteryId, @Param("monthStart") LocalDateTime monthStart); + + /** + * 批量插入或更新月级数据(存在则更新,不存在则插入) + */ + void batchInsertOrUpdate(@Param("list") List monthDataList); +} diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java index 69c251f..cec9755 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/DDSDataProcessServiceImpl.java @@ -14,6 +14,7 @@ import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.*; import com.xzzn.ems.mapper.*; import com.xzzn.ems.service.IDDSDataProcessService; +import com.xzzn.ems.utils.AbstractBatteryDataProcessor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeanUtils; @@ -22,20 +23,22 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @Service -public class DDSDataProcessServiceImpl implements IDDSDataProcessService { +public class DDSDataProcessServiceImpl extends AbstractBatteryDataProcessor implements IDDSDataProcessService { private static final Log log = LogFactory.getLog(DDSDataProcessServiceImpl.class); private static final String SITE_ID = "021_DDS_01"; // 正则表达式匹配BMS设备编号和属性 private static final Pattern PATTERN = Pattern.compile("(BMSD\\d{2})(ZT|SOC|SOH|DL|DY|BDSC)"); // 匹配DTDC+数字格式的正则(提取序号) private static final Pattern DTDC_PATTERN = Pattern.compile("DTDC(\\d+)([A-Za-z]*)"); - // 初始化ObjectMapper(可以作为全局变量) - private static final ObjectMapper objectMapper = new ObjectMapper(); + @Autowired private EmsBatteryClusterMapper emsBatteryClusterMapper; @@ -65,6 +68,12 @@ public class DDSDataProcessServiceImpl implements IDDSDataProcessService { private EmsDhDataMapper emsDhDataMapper; @Autowired private EmsBatteryGroupMapper emsBatteryGroupMapper; + @Autowired + private EmsBatteryDataMinutesMapper emsBatteryDataMinutesMapper; + + public DDSDataProcessServiceImpl(ObjectMapper objectMapper) { + super(objectMapper); + } @Override public void handleDdsData(String message) { @@ -231,7 +240,6 @@ public class DDSDataProcessServiceImpl implements IDDSDataProcessService { } // 批量插入数据库 - log.info("批量插入"); if (!CollectionUtils.isEmpty(groupMap)) { List batteryGroupList = new ArrayList<>(groupMap.values()); emsBatteryGroupMapper.batchInsertGroupData(batteryGroupList); @@ -273,8 +281,17 @@ public class DDSDataProcessServiceImpl implements IDDSDataProcessService { //电池组 Map obj = JSON.parseObject(dataJson, new TypeReference>() { }); - List dailyList = new ArrayList<>(); + List dataList = new ArrayList<>(); + + // 前一个小时 + LocalDateTime oneHourAgo = LocalDateTime.now().minus(1, ChronoUnit.HOURS); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String oneHourAgoStr = oneHourAgo.format(formatter); + Map dataMap = new HashMap<>(); + Map dailyMap = new HashMap<>(); + Map minutesMap = new HashMap<>(); + for (Map.Entry entry : obj.entrySet()) { String key = entry.getKey(); @@ -302,26 +319,45 @@ public class DDSDataProcessServiceImpl implements IDDSDataProcessService { setDTDCPropertyValue(data, property, entry.getValue()); dataMap.put(batteryCellId, data); - // 每日最新数据 - EmsBatteryDataDailyLatest daily = new EmsBatteryDataDailyLatest(); + // 每日最新数据:按batteryCellId去重 + EmsBatteryDataDailyLatest daily = dailyMap.getOrDefault(batteryCellId, new EmsBatteryDataDailyLatest()); BeanUtils.copyProperties(data, daily); daily.setDateDay(DateUtils.getNowDate()); - dailyList.add(daily); + dailyMap.put(batteryCellId, daily); + + // 分钟级的表,上报的数据直接存入,数据上限是 1 个小时 + EmsBatteryDataMinutes minutes = minutesMap.getOrDefault(batteryCellId, new EmsBatteryDataMinutes()); + BeanUtils.copyProperties(data, minutes); + minutesMap.put(batteryCellId, minutes); } } - } if (!CollectionUtils.isEmpty(dataMap)) { - List dataList = new ArrayList<>(dataMap.values()); + dataList = new ArrayList<>(dataMap.values()); emsBatteryDataMapper.insertEmsBatteryDataList(new ArrayList<>(dataList)); redisCache.deleteList(RedisKeyConstants.BATTERY + SITE_ID + "_" + "BMSC01"); redisCache.setCacheList(RedisKeyConstants.BATTERY + SITE_ID + "_" + "BMSC01" , dataList); } // 批量处理每日最新数据 - if (dailyList != null && dailyList.size() > 0) { + List dailyList = new ArrayList<>(dailyMap.values()); + if (!dailyList.isEmpty()) { + dailyList = new ArrayList<>(dailyMap.values()); emsBatteryDailyLatestServiceImpl.batchProcessBatteryData(dailyList); } + + // 实时插入每分钟数据 + List minutesList = new ArrayList<>(minutesMap.values()); + if (!minutesList.isEmpty()) { + emsBatteryDataMinutesMapper.insertMinutesBatteryDataList(minutesList); + } + // 清理分钟级表里一小时前数据 + emsBatteryDataMinutesMapper.deleteByTimeBeforeOneHour(oneHourAgoStr); + + // 分片处理时级,天级,月级数据 + if (dataList.size() > 0) { + super.processBatch(dataList); + } } private void setDTDCPropertyValue(EmsBatteryData data, String property, Object value) { @@ -884,33 +920,6 @@ public class DDSDataProcessServiceImpl implements IDDSDataProcessService { return records; } - private static Map> processDataPrefix(Map rawData) { - Map> records = new HashMap<>(); - - for (Map.Entry entry : rawData.entrySet()) { - String key = entry.getKey(); - // 确保键长度足够 - if (key.length() < 3) { - continue; - } - - // 提取记录ID(前3位) - String recordId = key.substring(0, 3); - if (!recordId.startsWith("DY")) { - continue; - } - - // 提取字段类型(剩余部分) - String fieldType = key.substring(3); - - // 初始化记录 - records.putIfAbsent(recordId, new HashMap<>()); - // 存入字段值 - records.get(recordId).put(fieldType, entry.getValue()); - } - return records; - } - // 空数据不处理 private boolean checkJsonDataEmpty(String jsonData) { boolean flag = false; diff --git a/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java b/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java index ada3dd4..a740418 100644 --- a/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java +++ b/ems-system/src/main/java/com/xzzn/ems/service/impl/FXXDataProcessServiceImpl.java @@ -15,20 +15,22 @@ import com.xzzn.common.utils.StringUtils; import com.xzzn.ems.domain.*; import com.xzzn.ems.mapper.*; import com.xzzn.ems.service.IFXXDataProcessService; +import com.xzzn.ems.utils.AbstractBatteryDataProcessor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.*; @Service -public class FXXDataProcessServiceImpl implements IFXXDataProcessService { +public class FXXDataProcessServiceImpl extends AbstractBatteryDataProcessor implements IFXXDataProcessService { private static final Log log = LogFactory.getLog(FXXDataProcessServiceImpl.class); private static final String SITE_ID = "021_FXX_01"; - // 初始化ObjectMapper(可以作为全局变量) - private static final ObjectMapper objectMapper = new ObjectMapper(); @Autowired private EmsBatteryClusterMapper emsBatteryClusterMapper; @@ -55,6 +57,13 @@ public class FXXDataProcessServiceImpl implements IFXXDataProcessService { private EmsAmmeterDataMapper emsAmmeterDataMapper; @Autowired private EmsBatteryDailyLatestServiceImpl emsBatteryDailyLatestServiceImpl; + @Autowired + private EmsBatteryDataMinutesMapper emsBatteryDataMinutesMapper; + + // 构造方法(调用父类构造) + public FXXDataProcessServiceImpl(ObjectMapper objectMapper) { + super(objectMapper); + } @Override public void handleFxData(String message) { @@ -268,6 +277,11 @@ public class FXXDataProcessServiceImpl implements IFXXDataProcessService { Map> records = processData(JSON.parseObject(dataJson, new TypeReference>() {})); List list = new ArrayList<>(); List dailyList = new ArrayList<>(); + List minutesList = new ArrayList<>(); + // 前一个小时 + LocalDateTime oneHourAgo = LocalDateTime.now().minus(1, ChronoUnit.HOURS); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String oneHourAgoStr = oneHourAgo.format(formatter); //单体电池 for (Map.Entry> record : records.entrySet()) { String recordId = record.getKey(); @@ -286,11 +300,14 @@ public class FXXDataProcessServiceImpl implements IFXXDataProcessService { // 时间戳 batteryData.setDataTimestamp(new Date()); - + // 系统管理字段 + batteryData.setCreateBy("system"); + batteryData.setCreateTime(DateUtils.getNowDate()); + batteryData.setUpdateBy("system"); + batteryData.setUpdateTime(DateUtils.getNowDate()); // ID字段 batteryData.setSiteId(SITE_ID); batteryData.setClusterDeviceId(deviceId); - list.add(batteryData); // 每日最新数据 @@ -298,18 +315,34 @@ public class FXXDataProcessServiceImpl implements IFXXDataProcessService { BeanUtils.copyProperties(batteryData, daily); daily.setDateDay(DateUtils.getNowDate()); dailyList.add(daily); + + // 分钟级的表,上报的数据直接存入,数据上限是 1 个小时 + EmsBatteryDataMinutes minutes = new EmsBatteryDataMinutes(); + BeanUtils.copyProperties(batteryData, minutes); + minutesList.add(minutes); } - if (list.size() > 0 ) { + if (list.size() > 0) { emsBatteryDataMapper.insertEmsBatteryDataList(list); - redisCache.deleteList(RedisKeyConstants.BATTERY + SITE_ID + "_" +deviceId); - redisCache.setCacheList(RedisKeyConstants.BATTERY + SITE_ID + "_" +deviceId, list); + redisCache.deleteList(RedisKeyConstants.BATTERY + SITE_ID + "_" + deviceId); + redisCache.setCacheList(RedisKeyConstants.BATTERY + SITE_ID + "_" + deviceId, list); } // 批量处理每日最新数据 if (dailyList != null && dailyList.size() > 0) { emsBatteryDailyLatestServiceImpl.batchProcessBatteryData(dailyList); } + // 实时插入每分钟数据 + if (minutesList != null && minutesList.size() > 0) { + emsBatteryDataMinutesMapper.insertMinutesBatteryDataList(minutesList); + } + // 清理分钟级表里一小时前数据 + emsBatteryDataMinutesMapper.deleteByTimeBeforeOneHour(oneHourAgoStr); + + // 分片处理时级,天级,月级数据 + if (list.size() > 0) { + super.processBatch(list); + } } private void pcsDataProcess(String deviceId, String dataJson) { diff --git a/ems-system/src/main/java/com/xzzn/ems/utils/AbstractBatteryDataProcessor.java b/ems-system/src/main/java/com/xzzn/ems/utils/AbstractBatteryDataProcessor.java new file mode 100644 index 0000000..2dcc4a1 --- /dev/null +++ b/ems-system/src/main/java/com/xzzn/ems/utils/AbstractBatteryDataProcessor.java @@ -0,0 +1,681 @@ +package com.xzzn.ems.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.xzzn.common.core.redis.RedisCache; +import com.xzzn.common.utils.DateUtils; +import com.xzzn.ems.domain.EmsBatteryData; +import com.xzzn.ems.domain.EmsBatteryDataDay; +import com.xzzn.ems.domain.EmsBatteryDataHour; +import com.xzzn.ems.domain.EmsBatteryDataMonth; +import com.xzzn.ems.mapper.EmsBatteryDataDayMapper; +import com.xzzn.ems.mapper.EmsBatteryDataHourMapper; +import com.xzzn.ems.mapper.EmsBatteryDataMonthMapper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public abstract class AbstractBatteryDataProcessor { + + private static final Log log = LogFactory.getLog(AbstractBatteryDataProcessor.class); + + protected static final DateTimeFormatter HOUR_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH"); + protected static final DateTimeFormatter DAY_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + protected static final DateTimeFormatter MONTH_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM"); + // 公共缓存: + protected final Map tempCache = new ConcurrentHashMap<>(); + // 公共锁缓存: + protected final Map lockCache = new ConcurrentHashMap<>(); + protected final ObjectMapper objectMapper; + + @Autowired + private EmsBatteryDataHourMapper emsBatteryDataHourMapper; + @Autowired + private EmsBatteryDataDayMapper emsBatteryDataDayMapper; + @Autowired + private EmsBatteryDataMonthMapper emsBatteryDataMonthMapper; + @Autowired + private RedisCache redisCache; + + // 公共线程池(IO密集型,子类共享) + protected ThreadPoolTaskExecutor ioExecutor; + // 批量缓冲区(线程安全队列,按级别区分) + protected ConcurrentLinkedQueue hourBatchQueue = new ConcurrentLinkedQueue<>(); + protected ConcurrentLinkedQueue dayBatchQueue = new ConcurrentLinkedQueue<>(); + protected ConcurrentLinkedQueue monthBatchQueue = new ConcurrentLinkedQueue<>(); + // 批量提交阈值(满100条触发数据库操作) + protected static final int BATCH_THRESHOLD = 50; + // 死锁重试等待时间基数(毫秒) + private static final int RETRY_BASE_DELAY = 100; + // 随机数生成器,用于重试退避 + private static final Random random = new Random(); + + // 初始化方法(构造时初始化线程池) + public AbstractBatteryDataProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + ioExecutor = new ThreadPoolTaskExecutor(); + int coreThreads = Runtime.getRuntime().availableProcessors() * 2; + ioExecutor.setCorePoolSize(coreThreads); + ioExecutor.setMaxPoolSize(coreThreads * 2); + ioExecutor.setQueueCapacity(10000); + ioExecutor.setThreadNamePrefix("battery-io-"); + // 线程池饱和策略:使用调用者线程执行,避免任务丢失 + ioExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + ioExecutor.initialize(); + } + + // 模板方法:定义数据处理主流程(子类无需重写) + public void processBatch(List batchData) { + // 1. 数据分片(实现通用分片逻辑) + List> shards = splitIntoShards(batchData, 50); + int totalShards = shards.size(); + + // 2. 并行处理每个分片(使用父类线程池) + for (int i = 0; i < totalShards; i++) { + List shard = shards.get(i); + boolean isLastShard = (i == totalShards - 1); // 判断是否为最后一个分片 + + // 传递分片索引和是否最后一个分片的标记 + ioExecutor.execute(() -> processShard(shard, isLastShard)); + } + //shards.forEach(shard -> ioExecutor.execute(() -> processShard(shard))); + } + + /** + * 将大批量数据拆分为多个小分片 + * @param dataList 原始数据列表 + * @param shardSize 每个分片的最大大小(如100条/片) + * @return 分片后的列表(每个元素是一个子列表) + */ + protected List> splitIntoShards(List dataList, int shardSize) { + // 1. 校验参数:避免空列表或无效分片大小 + if (dataList == null || dataList.isEmpty()) { + return new ArrayList<>(); + } + if (shardSize <= 0) { + throw new IllegalArgumentException("分片大小必须大于0"); + } + + // 2. 计算总分片数:向上取整(如150条数据,分片大小100 → 2片) + int totalSize = dataList.size(); + int shardCount = (totalSize + shardSize - 1) / shardSize; + + // 3. 拆分数据为分片(使用IntStream生成分片索引) + return IntStream.range(0, shardCount) + .mapToObj(shardIndex -> { + // 计算当前分片的起始索引和结束索引 + int startIndex = shardIndex * shardSize; + int endIndex = Math.min(startIndex + shardSize, totalSize); + // 截取子列表作为一个分片 + return dataList.subList(startIndex, endIndex); + }) + .collect(Collectors.toList()); + } + + // 处理单个分片(父类定义流程,调用子类实现的具体判断和更新逻辑) + private void processShard(List shard, boolean isLastShard) { + int shardSize = shard.size(); + log.info("分片校验:" + shardSize + ""); + for (int i = 0; i < shardSize; i++) { + EmsBatteryData data = shard.get(i); + if (!isValidData(data)) continue; + + // 判断当前数据是否为最后一个分片的最后一条数据 + boolean isLastData = isLastShard && (i == shardSize - 1); + // shouldUpdate判断是否需要更新 + if (shouldUpdate(data, "hour")) { + addToHourBatch(data, isLastData); // 加入小时级批量队列 + } + if (shouldUpdate(data, "day")) { + addToDayBatch(data, isLastData); // 加入天级批量队列 + } + if (shouldUpdate(data, "month")) { + addToMonthBatch(data, isLastData); + } + } + } + + // 加入小时级批量队列,达到阈值时触发批量更新 + protected void addToHourBatch(EmsBatteryData data, boolean isLastData) { + hourBatchQueue.add(data); + if (hourBatchQueue.size() >= BATCH_THRESHOLD || isLastData) { + List batch = drainQueue(hourBatchQueue); + ioExecutor.execute(() -> batchUpdateHour(batch)); // 异步批量更新 + } + } + + // 加入天级批量队列,达到阈值时触发批量更新 + protected void addToDayBatch(EmsBatteryData data, boolean isLastData) { + dayBatchQueue.add(data); + if (dayBatchQueue.size() >= BATCH_THRESHOLD || isLastData) { + List batch = drainQueue(dayBatchQueue); + ioExecutor.execute(() -> batchUpdateDay(batch)); // 异步批量更新 + } + } + + // 加入月级批量队列,达到阈值时触发批量更新 + protected void addToMonthBatch(EmsBatteryData data, boolean isLastData) { + monthBatchQueue.add(data); + if (monthBatchQueue.size() >= BATCH_THRESHOLD || isLastData) { + List batch = drainQueue(monthBatchQueue); + ioExecutor.execute(() -> batchUpdateMonth(batch)); // 异步批量更新 + } + } + + // 实现父类的抽象方法:批量更新小时级数据 + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void batchUpdateHour(List batch) { + if (batch.isEmpty()) return; + + int maxRetry = 3; // 最多重试3次 + int retryCount = 0; + while (retryCount < maxRetry) { + try { + // 关键优化:按唯一键排序,保证所有线程处理相同记录时顺序一致 + List sortedBatch = batch.stream() + .sorted(Comparator.comparing(data -> generateCacheKey(data, "hour"))) // 用唯一键排序 + .collect(Collectors.toList()); + + // 1. 转换原始数据为小时级统计对象(分组取最高温) + List hourDataList = sortedBatch.stream() + .collect(Collectors.groupingBy( + data -> generateCacheKey(data, "hour"), // 分组键 + Collectors.collectingAndThen( + Collectors.maxBy((d1, d2) -> d1.getTemperature().compareTo(d2.getTemperature())), + maxData -> convertToHourData(maxData.get()) // 取每组最高温转换为HourData + ) + )) + .values() + .stream() + .sorted(Comparator.comparing(this::getHourDataUniqueKey)) + .collect(Collectors.toList()); + + // 2. 批量插入或更新数据库(依赖数据库唯一索引) + emsBatteryDataHourMapper.batchInsertOrUpdate(hourDataList); + + // 3. 同步更新Redis缓存(批量操作) + batchUpdateHourCache(hourDataList); + log.info("小时级批量更新成功,批次大小:" + batch.size() + ",重试次数:" + retryCount); + return; + } catch (Exception e) { + log.error("小时级批量更新失败", e); + // 使用改进的死锁判断方法,直接传入异常对象 + if (isDeadlockException(e) && retryCount < maxRetry - 1) { + retryCount++; + long sleepTime = calculateBackoffTime(retryCount); + log.info("检测到死锁,第" + retryCount + "次重试(当前批次大小:" + batch.size() + ")", e); + try { + // 指数退避策略:100ms, 200ms, 300ms... + Thread.sleep(sleepTime); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // 保留中断状态 + break; + } + } else { + log.error("小时级批量更新失败(已达最大重试次数)", e); + //retryQueue.addAll(batch); // 加入重试队列 + return; + } + } + } + } + // 辅助方法:获取小时级数据的唯一键(用于排序) + private String getHourDataUniqueKey(EmsBatteryDataHour data) { + return data.getSiteId() + "_" + data.getBatteryPack() + "_" + data.getClusterDeviceId() + + "_" + data.getDeviceId() + "_" + data.getHourTime(); + } + // 批量更新小时级数据缓存-小时级统计数据列表(已按唯一键分组并取最高温) + public void batchUpdateHourCache(List hourDataList) { + if (hourDataList.isEmpty()) return; + + // 1. 构建缓存键值对(批量操作提升效率) + Map tempCacheMap = new HashMap<>(hourDataList.size()); + for (EmsBatteryDataHour data : hourDataList) { + EmsBatteryData cacheData = new EmsBatteryData(); + BeanUtils.copyProperties(data, cacheData); + // 2. 生成唯一缓存键(与数据库唯一键一致) + String cacheKey = generateCacheKey(cacheData, "hour"); + // 3. 存储最高温度和最后更新时间(两个独立缓存项) + tempCacheMap.put(cacheKey, data.getTemperature().toString()); + } + + try { + // 4. 批量设置缓存(Redis批量操作减少网络交互) + redisCache.multiSet(tempCacheMap); + } catch (Exception e) { + log.error("批量更新小时级缓存失败", e); + // 缓存更新失败不影响主流程,但需记录日志便于排查 + } + } + // 转换为小时级数据对象 + private EmsBatteryDataHour convertToHourData(EmsBatteryData data) { + LocalDateTime localDateTime = data.getDataTimestamp().toInstant().atZone(ZoneId.of("Asia/Shanghai")) + .toLocalDateTime(); + LocalDateTime hourStart = localDateTime.truncatedTo(ChronoUnit.HOURS); + + EmsBatteryDataHour hourData = new EmsBatteryDataHour(); + BeanUtils.copyProperties(data, hourData); + hourData.setUpdateTime(new Date()); + hourData.setHourTime(DateUtils.convertToDate(hourStart)); + return hourData; + } + + // 实现父类的抽象方法:批量更新天级数据 + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void batchUpdateDay(List batch) { + if (batch.isEmpty()) return; + + int maxRetry = 3; // 最多重试3次 + int retryCount = 0; + while (retryCount < maxRetry) { + try { + // 关键优化:按唯一键排序,保证所有线程处理相同记录时顺序一致 + List sortedBatch = batch.stream() + .sorted(Comparator.comparing(data -> generateCacheKey(data, "day"))) // 用唯一键排序 + .collect(Collectors.toList()); + + // 1. 转换原始数据为天级统计对象(分组取最高温) + List dayDataList = sortedBatch.stream() + .collect(Collectors.groupingBy( + data -> generateCacheKey(data, "day"), // 分组键 + Collectors.collectingAndThen( + Collectors.maxBy((d1, d2) -> d1.getTemperature().compareTo(d2.getTemperature())), + maxData -> convertToDayData(maxData.get()) // 取每组最高温转换为HourData + ) + )) + .values() + .stream() + .sorted(Comparator.comparing(this::getDayDataUniqueKey)) + .collect(Collectors.toList()); + + // 2. 批量插入或更新数据库(依赖数据库唯一索引) + emsBatteryDataDayMapper.batchInsertOrUpdate(dayDataList); + + // 3. 同步更新Redis缓存(批量操作) + batchUpdateDayCache(dayDataList); + log.info("天级批量更新成功,批次大小:" + batch.size() + ",重试次数:" + retryCount); + return; + } catch (Exception e) { + log.error("天级批量更新失败", e); + // 使用改进的死锁判断方法,直接传入异常对象 + if (isDeadlockException(e) && retryCount < maxRetry - 1) { + retryCount++; + long sleepTime = calculateBackoffTime(retryCount); + log.info("检测到死锁,第" + retryCount + "次重试(当前批次大小:" + batch.size() + ")", e); + try { + // 指数退避策略:100ms, 200ms, 300ms... + Thread.sleep(sleepTime); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // 保留中断状态 + break; + } + } else { + log.error("天级批量更新失败(已达最大重试次数)", e); + //retryQueue.addAll(batch); // 加入重试队列 + return; + } + } + } + } + // 辅助方法:获取天级数据的唯一键(用于排序) + private String getDayDataUniqueKey(EmsBatteryDataDay data) { + return data.getSiteId() + "_" + data.getBatteryPack() + "_" + data.getClusterDeviceId() + + "_" + data.getDeviceId() + "_" + data.getDayTime(); + } + // 批量更新天级数据缓存-天级统计数据列表(已按唯一键分组并取最高温) + public void batchUpdateDayCache(List dayDataList) { + if (dayDataList.isEmpty()) return; + + // 1. 构建缓存键值对(批量操作提升效率) + Map tempCacheMap = new HashMap<>(dayDataList.size()); + for (EmsBatteryDataDay data : dayDataList) { + EmsBatteryData cacheData = new EmsBatteryData(); + BeanUtils.copyProperties(data, cacheData); + // 2. 生成唯一缓存键(与数据库唯一键一致) + String cacheKey = generateCacheKey(cacheData, "day"); + // 3. 存储最高温度和最后更新时间(两个独立缓存项) + tempCacheMap.put(cacheKey, data.getTemperature().toString()); + } + + try { + // 4. 批量设置缓存(Redis批量操作减少网络交互) + redisCache.multiSet(tempCacheMap); + } catch (Exception e) { + log.error("批量更新小时级缓存失败", e); + // 缓存更新失败不影响主流程,但需记录日志便于排查 + } + } + // 转换为天级数据对象 + private EmsBatteryDataDay convertToDayData(EmsBatteryData data) { + LocalDateTime localDateTime = data.getDataTimestamp().toInstant().atZone(ZoneId.of("Asia/Shanghai")) + .toLocalDateTime(); + LocalDateTime dayStart = localDateTime.truncatedTo(ChronoUnit.DAYS); + + EmsBatteryDataDay dayData = new EmsBatteryDataDay(); + BeanUtils.copyProperties(data, dayData); + dayData.setUpdateTime(new Date()); + dayData.setDayTime(DateUtils.convertToDate(dayStart)); + return dayData; + } + + // 实现父类的抽象方法:批量更新月级数据 + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void batchUpdateMonth(List batch) { + if (batch.isEmpty()) return; + + int maxRetry = 3; // 最多重试3次 + int retryCount = 0; + while (retryCount < maxRetry) { + try { + // 关键优化:按唯一键排序,保证所有线程处理相同记录时顺序一致 + List sortedBatch = batch.stream() + .sorted(Comparator.comparing(data -> generateCacheKey(data, "month"))) // 用唯一键排序 + .collect(Collectors.toList()); + + // 1. 转换原始数据为小时级统计对象(分组取最高温) + List monthDataList = sortedBatch.stream() + .collect(Collectors.groupingBy( + data -> generateCacheKey(data, "month"), // 分组键 + Collectors.collectingAndThen( + Collectors.maxBy((d1, d2) -> d1.getTemperature().compareTo(d2.getTemperature())), + maxData -> convertToMonthData(maxData.get()) // 取每组最高温转换为HourData + ) + )) + .values() + .stream() + .sorted(Comparator.comparing(this::getMonthDataUniqueKey)) + .collect(Collectors.toList()); + + // 2. 批量插入或更新数据库(依赖数据库唯一索引) + emsBatteryDataMonthMapper.batchInsertOrUpdate(monthDataList); + + // 3. 同步更新Redis缓存(批量操作) + batchUpdateMonthCache(monthDataList); + log.info("月级批量更新成功,批次大小:" + batch.size() + ",重试次数:" + retryCount); + return; + } catch (Exception e) { + log.error("月级批量更新失败", e); + // 使用改进的死锁判断方法,直接传入异常对象 + if (isDeadlockException(e) && retryCount < maxRetry - 1) { + retryCount++; + long sleepTime = calculateBackoffTime(retryCount); + log.info("检测到死锁,第" + retryCount + "次重试(当前批次大小:" + batch.size() + ")", e); + try { + // 指数退避策略:100ms, 200ms, 300ms... + Thread.sleep(sleepTime); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // 保留中断状态 + break; + } + } else { + log.error("月级批量更新失败(已达最大重试次数)", e); + //retryQueue.addAll(batch); // 加入重试队列 + return; + } + } + } + } + // 辅助方法:获取月级数据的唯一键(用于排序) + private String getMonthDataUniqueKey(EmsBatteryDataMonth data) { + return data.getSiteId() + "_" + data.getBatteryPack() + "_" + data.getClusterDeviceId() + + "_" + data.getDeviceId() + "_" + data.getMonthTime(); + } + // 转换为月级数据对象 + private EmsBatteryDataMonth convertToMonthData(EmsBatteryData data) { + LocalDateTime localDateTime = data.getDataTimestamp().toInstant().atZone(ZoneId.of("Asia/Shanghai")) + .toLocalDateTime(); + LocalDateTime monthStart = localDateTime.withDayOfMonth(1).withHour(0).withMinute(0) + .withSecond(0).withNano(0); + + EmsBatteryDataMonth monthData = new EmsBatteryDataMonth(); + BeanUtils.copyProperties(data, monthData); + monthData.setUpdateTime(new Date()); + monthData.setMonthTime(DateUtils.convertToDate(monthStart)); + return monthData; + } + // 批量更新月级数据缓存-月级统计数据列表(已按唯一键分组并取最高温) + public void batchUpdateMonthCache(List monthDataList) { + if (monthDataList.isEmpty()) return; + + // 1. 构建缓存键值对(批量操作提升效率) + Map tempCacheMap = new HashMap<>(monthDataList.size()); + for (EmsBatteryDataMonth data : monthDataList) { + EmsBatteryData cacheData = new EmsBatteryData(); + BeanUtils.copyProperties(data, cacheData); + // 2. 生成唯一缓存键(与数据库唯一键一致) + String cacheKey = generateCacheKey(cacheData, "month"); + // 3. 存储最高温度和最后更新时间(两个独立缓存项) + tempCacheMap.put(cacheKey, data.getTemperature().toString()); + } + + try { + // 4. 批量设置缓存(Redis批量操作减少网络交互) + redisCache.multiSet(tempCacheMap); + } catch (Exception e) { + log.error("批量更新月级缓存失败", e); + // 缓存更新失败不影响主流程,但需记录日志便于排查 + } + } + + + /** + * 校验数据有效性(公共逻辑:检查核心字段) + */ + protected boolean isValidData(EmsBatteryData data) { + return data.getDeviceId() != null + && data.getTemperature() != null + && data.getDataTimestamp() != null; + } + + /** + * 判断是否需要更新(公共缓存逻辑) + */ + protected boolean shouldUpdate(EmsBatteryData data, String granularity) { + String cacheKey = generateCacheKey(data, granularity); + if (cacheKey == null) return true; + + BigDecimal newTemp = data.getTemperature(); + CacheValue localCache = tempCache.get(cacheKey); + if (localCache != null) { + return newTemp.compareTo(localCache.maxTemp) > 0; + } + + // 本地缓存未命中:查Redis + try { + String redisValue = redisCache.getCacheObject(cacheKey); + if (redisValue != null) { + // Redis有值:转换为BigDecimal比较 + BigDecimal redisMaxTemp = new BigDecimal(redisValue); + // 同时更新本地缓存(减少下次查询Redis的开销) + tempCache.put(cacheKey, new CacheValue(redisMaxTemp, LocalDateTime.now())); + return newTemp.compareTo(redisMaxTemp) > 0; + } + } catch (Exception e) { + log.error("查询Redis缓存失败(key: " + cacheKey + ")", e); + } + + + // 数据库校验(缓存都未命中或新温度更高时) + BigDecimal dbMaxTemp = queryDbMaxTemp(data, granularity); + if (dbMaxTemp != null && newTemp.compareTo(dbMaxTemp) <= 0) { + // 数据库已有更高温度,更新缓存后返回 + CacheValue dbCache = new CacheValue(dbMaxTemp, LocalDateTime.now()); + tempCache.put(cacheKey, dbCache); + redisCache.setCacheObject(cacheKey, dbMaxTemp.toString()); // 同步到Redis + return false; + } + + return true; + } + + /** + * 查询数据库中该设备在指定时间粒度下的最大温度 + */ + private BigDecimal queryDbMaxTemp(EmsBatteryData data, String granularity) { + try { + LocalDateTime dataTime = data.getDataTimestamp().toInstant() + .atZone(ZoneId.of("Asia/Shanghai")).toLocalDateTime(); + + String siteId = data.getSiteId(); + String batteryPack = data.getBatteryPack(); + String clusterId = data.getClusterDeviceId(); + String deviceId = data.getDeviceId(); + + // 根据粒度查询对应的数据表 + switch (granularity) { + case "hour": + LocalDateTime hourStart = dataTime.truncatedTo(ChronoUnit.HOURS); + // 调用Mapper查询该小时的最大温度(需确保Mapper方法存在) + EmsBatteryDataHour hourMax = emsBatteryDataHourMapper.findHourMaxTemp( + siteId, batteryPack, clusterId, deviceId, hourStart + ); + return hourMax != null ? hourMax.getTemperature() : null; + + case "day": + LocalDateTime dayStart = dataTime.truncatedTo(ChronoUnit.DAYS); + EmsBatteryDataDay dayMax = emsBatteryDataDayMapper.findDayMaxTemp( + siteId, batteryPack, clusterId, deviceId, dayStart + ); + return dayMax != null ? dayMax.getTemperature() : null; + + case "month": + // 设为当月1号 与其他表统一格式 + LocalDateTime monthStart = dataTime.withDayOfMonth(1).withHour(0).withMinute(0) + .withSecond(0).withNano(0); + EmsBatteryDataMonth monthMax = emsBatteryDataMonthMapper.findMonthMaxTemp( + siteId, batteryPack, clusterId, deviceId, monthStart + ); + return monthMax != null ? monthMax.getTemperature() : null; + + default: + log.error("不支持的查询粒度:" + granularity); + return null; + } + } catch (Exception e) { + log.error("查询数据库最大温度失败", e); + return null; // 数据库查询失败时,暂不阻止更新(避免业务中断) + } + } + + // 生成缓存键(不同级别格式不同) + private String generateCacheKey(EmsBatteryData data, String granularity) { + String siteId = data.getSiteId(); + String stackId = data.getBatteryPack(); + String clusterId = data.getClusterDeviceId(); + String batteryId = data.getDeviceId(); + try { + LocalDateTime localDateTime = data.getDataTimestamp().toInstant().atZone(ZoneId.of("Asia/Shanghai")) + .toLocalDateTime(); + String timeStr; + // 根据级别格式化时间(小时/天/月) + switch (granularity) { + case "hour": + timeStr = localDateTime.format(HOUR_FORMATTER); + break; + case "day": + timeStr = localDateTime.format(DAY_FORMATTER); + break; + case "month": + timeStr = localDateTime.format(MONTH_FORMATTER); + break; + default: + log.info("不支持的级别:" + granularity); + return null; + } + // 缓存键格式 + return siteId + "_" + stackId + "_" + clusterId + "_" + batteryId + "_" +timeStr; + } catch (Exception e) { + log.error("生成缓存键失败", e); + return null; + } + } + + // 工具方法:从队列中取出所有元素 + protected List drainQueue(ConcurrentLinkedQueue queue) { + List result = new ArrayList<>(); + T element; + while ((element = queue.poll()) != null) { + result.add(element); + } + return result; + } + + /** + * 获取电池+粒度的专属锁(公共锁逻辑) + */ + protected ReentrantLock getLock(String siteId, String stackId, String clusterId, + String batteryId, String granularity) { + String lockKey = siteId + "_" + stackId + "_" + clusterId + "_" + batteryId + "_" + granularity; + return lockCache.computeIfAbsent(lockKey, k -> new ReentrantLock()); + } + + // 计算退避时间:指数退避 + 随机抖动,避免重试风暴 + private long calculateBackoffTime(int retryCount) { + // 100ms * 2^retry + 随机0-100ms + return (long)(RETRY_BASE_DELAY * Math.pow(2, retryCount)) + random.nextInt(100); + } + + // 判断是否为死锁异常 + private boolean isDeadlockException(Throwable e) { + // 遍历所有异常链(包括cause的cause...) + while (e != null) { + // 检查异常消息是否包含死锁关键字 + if (e.getMessage() != null && e.getMessage().contains("Deadlock found when trying to get lock")) { + return true; + } + // 深入下一层异常 + e = e.getCause(); + } + return false; + } + + /** + * 公共缓存实体(所有站点通用) + */ + protected static class CacheValue { + BigDecimal maxTemp; + LocalDateTime lastUpdateTime; + + public CacheValue(BigDecimal maxTemp, LocalDateTime lastUpdateTime) { + this.maxTemp = maxTemp; + this.lastUpdateTime = lastUpdateTime; + } + } + + // 新增定时任务,每1分钟强制刷新一次队列(防止数据长期积压) + @Scheduled(fixedRate = 60000) // 1分钟 = 60000毫秒 + protected void scheduledFlushBatch() { + if (!hourBatchQueue.isEmpty()) { + List remaining = drainQueue(hourBatchQueue); + ioExecutor.execute(() -> batchUpdateHour(remaining)); + log.info("定时任务触发,处理小时级剩余数据,数量: " + remaining.size()); + } + if (!dayBatchQueue.isEmpty()) { + List remaining = drainQueue(dayBatchQueue); + ioExecutor.execute(() -> batchUpdateDay(remaining)); + log.info("定时任务触发,处理天级剩余数据,数量: " + remaining.size()); + } + if (!monthBatchQueue.isEmpty()) { + List remaining = drainQueue(monthBatchQueue); + ioExecutor.execute(() -> batchUpdateMonth(remaining)); + log.info("定时任务触发,处理月级剩余数据,数量: " + remaining.size()); + } + } +} diff --git a/ems-system/src/main/resources/mapper/ems/EmsBatteryDataDayMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataDayMapper.xml new file mode 100644 index 0000000..d1c9bc5 --- /dev/null +++ b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataDayMapper.xml @@ -0,0 +1,201 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + select id, battery_pack, battery_cluster, battery_cell_id, voltage, temperature, soc, soh, data_timestamp, create_by, create_time, update_by, update_time, remark, site_id, device_id, cluster_device_id, inter_resistance, day_time from ems_battery_data_day + + + + + + + + insert into ems_battery_data_day + + battery_pack, + battery_cluster, + battery_cell_id, + voltage, + temperature, + soc, + soh, + data_timestamp, + create_by, + create_time, + update_by, + update_time, + remark, + site_id, + device_id, + cluster_device_id, + inter_resistance, + day_time, + + + #{batteryPack}, + #{batteryCluster}, + #{batteryCellId}, + #{voltage}, + #{temperature}, + #{soc}, + #{soh}, + #{dataTimestamp}, + #{createBy}, + #{createTime}, + #{updateBy}, + #{updateTime}, + #{remark}, + #{siteId}, + #{deviceId}, + #{clusterDeviceId}, + #{interResistance}, + #{dayTime}, + + + + + update ems_battery_data_day + + battery_pack = #{batteryPack}, + battery_cluster = #{batteryCluster}, + battery_cell_id = #{batteryCellId}, + voltage = #{voltage}, + temperature = #{temperature}, + soc = #{soc}, + soh = #{soh}, + data_timestamp = #{dataTimestamp}, + create_by = #{createBy}, + create_time = #{createTime}, + update_by = #{updateBy}, + update_time = #{updateTime}, + remark = #{remark}, + site_id = #{siteId}, + device_id = #{deviceId}, + cluster_device_id = #{clusterDeviceId}, + inter_resistance = #{interResistance}, + day_time = #{dayTime}, + + where id = #{id} + + + + delete from ems_battery_data_day where id = #{id} + + + + delete from ems_battery_data_day where id in + + #{id} + + + + + + + + INSERT INTO ems_battery_data_day ( + battery_pack, + battery_cluster, + battery_cell_id, + voltage, + temperature, + soc, + soh, + data_timestamp, + create_by, + create_time, + update_by, + update_time, + remark, + site_id, + device_id, + cluster_device_id, + inter_resistance, + day_time + ) VALUES + + ( + #{item.batteryPack}, + #{item.batteryCluster}, + #{item.batteryCellId}, + #{item.voltage}, + #{item.temperature}, + #{item.soc}, + #{item.soh}, + #{item.dataTimestamp}, + #{item.createBy}, + #{item.createTime}, #{item.updateBy}, + #{item.updateTime}, + #{item.remark}, + #{item.siteId}, + #{item.deviceId}, + #{item.clusterDeviceId}, + #{item.interResistance}, + #{item.dayTime} + ) + + ON DUPLICATE KEY UPDATE + -- 仅当新温度高于现有温度时,才更新温度和相关字段 + update_time = IF(VALUES(temperature) > temperature, NOW(), update_time), + data_timestamp = IF(VALUES(temperature) > temperature, VALUES(data_timestamp), data_timestamp), + voltage = IF(VALUES(temperature) > temperature, VALUES(voltage), voltage), + soc = IF(VALUES(temperature) > temperature, VALUES(soc), soc), + soh = IF(VALUES(temperature) > temperature, VALUES(soh), soh), + inter_resistance = IF(VALUES(temperature) > temperature, VALUES(inter_resistance), inter_resistance), + update_by = IF(VALUES(temperature) > temperature, VALUES(update_by), update_by) + temperature = IF(VALUES(temperature) > temperature, VALUES(temperature), temperature) + + diff --git a/ems-system/src/main/resources/mapper/ems/EmsBatteryDataHourMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataHourMapper.xml new file mode 100644 index 0000000..aff9a5c --- /dev/null +++ b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataHourMapper.xml @@ -0,0 +1,202 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + select id, battery_pack, battery_cluster, battery_cell_id, voltage, temperature, soc, soh, data_timestamp, create_by, create_time, update_by, update_time, remark, site_id, device_id, cluster_device_id, inter_resistance, hour_time from ems_battery_data_hour + + + + + + + + insert into ems_battery_data_hour + + battery_pack, + battery_cluster, + battery_cell_id, + voltage, + temperature, + soc, + soh, + data_timestamp, + create_by, + create_time, + update_by, + update_time, + remark, + site_id, + device_id, + cluster_device_id, + inter_resistance, + hour_time, + + + #{batteryPack}, + #{batteryCluster}, + #{batteryCellId}, + #{voltage}, + #{temperature}, + #{soc}, + #{soh}, + #{dataTimestamp}, + #{createBy}, + #{createTime}, + #{updateBy}, + #{updateTime}, + #{remark}, + #{siteId}, + #{deviceId}, + #{clusterDeviceId}, + #{interResistance}, + #{hourTime}, + + + + + update ems_battery_data_hour + + battery_pack = #{batteryPack}, + battery_cluster = #{batteryCluster}, + battery_cell_id = #{batteryCellId}, + voltage = #{voltage}, + temperature = #{temperature}, + soc = #{soc}, + soh = #{soh}, + data_timestamp = #{dataTimestamp}, + create_by = #{createBy}, + create_time = #{createTime}, + update_by = #{updateBy}, + update_time = #{updateTime}, + remark = #{remark}, + site_id = #{siteId}, + device_id = #{deviceId}, + cluster_device_id = #{clusterDeviceId}, + inter_resistance = #{interResistance}, + hour_time = #{hourTime}, + + where id = #{id} + + + + delete from ems_battery_data_hour where id = #{id} + + + + delete from ems_battery_data_hour where id in + + #{id} + + + + + + + + INSERT INTO ems_battery_data_hour ( + battery_pack, + battery_cluster, + battery_cell_id, + voltage, + temperature, + soc, + soh, + data_timestamp, + create_by, + create_time, + update_by, + update_time, + remark, + site_id, + device_id, + cluster_device_id, + inter_resistance, + hour_time + ) VALUES + + ( + #{item.batteryPack}, + #{item.batteryCluster}, + #{item.batteryCellId}, + #{item.voltage}, + #{item.temperature}, + #{item.soc}, + #{item.soh}, + #{item.dataTimestamp}, + #{item.createBy}, + #{item.createTime}, + #{item.updateBy}, + #{item.updateTime}, + #{item.remark}, + #{item.siteId}, + #{item.deviceId}, + #{item.clusterDeviceId}, + #{item.interResistance}, + #{item.hourTime} + ) + + ON DUPLICATE KEY UPDATE + -- 仅当新温度高于现有温度时,才更新温度和相关字段 + update_time = IF(VALUES(temperature) > temperature, NOW(), update_time), + data_timestamp = IF(VALUES(temperature) > temperature, VALUES(data_timestamp), data_timestamp), + voltage = IF(VALUES(temperature) > temperature, VALUES(voltage), voltage), + soc = IF(VALUES(temperature) > temperature, VALUES(soc), soc), + soh = IF(VALUES(temperature) > temperature, VALUES(soh), soh), + inter_resistance = IF(VALUES(temperature) > temperature, VALUES(inter_resistance), inter_resistance), + update_by = IF(VALUES(temperature) > temperature, VALUES(update_by), update_by) + temperature = IF(VALUES(temperature) > temperature, VALUES(temperature), temperature) + + \ No newline at end of file diff --git a/ems-system/src/main/resources/mapper/ems/EmsBatteryDataMinutesMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataMinutesMapper.xml new file mode 100644 index 0000000..3781843 --- /dev/null +++ b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataMinutesMapper.xml @@ -0,0 +1,152 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + select id, battery_pack, battery_cluster, battery_cell_id, voltage, temperature, soc, soh, data_timestamp, create_by, create_time, update_by, update_time, remark, site_id, device_id, cluster_device_id, inter_resistance from ems_battery_data_minutes + + + + + + + + insert into ems_battery_data_minutes + + battery_pack, + battery_cluster, + battery_cell_id, + voltage, + temperature, + soc, + soh, + data_timestamp, + create_by, + create_time, + update_by, + update_time, + remark, + site_id, + device_id, + cluster_device_id, + inter_resistance, + + + #{batteryPack}, + #{batteryCluster}, + #{batteryCellId}, + #{voltage}, + #{temperature}, + #{soc}, + #{soh}, + #{dataTimestamp}, + #{createBy}, + #{createTime}, + #{updateBy}, + #{updateTime}, + #{remark}, + #{siteId}, + #{deviceId}, + #{clusterDeviceId}, + #{interResistance}, + + + + + update ems_battery_data_minutes + + battery_pack = #{batteryPack}, + battery_cluster = #{batteryCluster}, + battery_cell_id = #{batteryCellId}, + voltage = #{voltage}, + temperature = #{temperature}, + soc = #{soc}, + soh = #{soh}, + data_timestamp = #{dataTimestamp}, + create_by = #{createBy}, + create_time = #{createTime}, + update_by = #{updateBy}, + update_time = #{updateTime}, + remark = #{remark}, + site_id = #{siteId}, + device_id = #{deviceId}, + cluster_device_id = #{clusterDeviceId}, + inter_resistance = #{interResistance}, + + where id = #{id} + + + + delete from ems_battery_data_minutes where date_day = #{dateDay} + + + + delete from ems_battery_data_minutes where date_day in + + #{dateDay} + + + + + INSERT INTO ems_battery_data_minutes ( + battery_pack, battery_cluster, battery_cell_id, + voltage, temperature, soc, soh, data_timestamp, + create_by, create_time, update_by, update_time, + remark, site_id, device_id, cluster_device_id, inter_resistance + ) VALUES + + ( + #{item.batteryPack}, #{item.batteryCluster}, #{item.batteryCellId}, + #{item.voltage}, #{item.temperature}, #{item.soc}, #{item.soh}, #{item.dataTimestamp}, + #{item.createBy}, #{item.createTime}, #{item.updateBy}, #{item.updateTime}, + #{item.remark}, #{item.siteId}, #{item.deviceId}, #{item.clusterDeviceId},#{item.interResistance} + ) + + + + + delete from ems_battery_data_minutes where data_timestamp < #{oneHourAgoStr} + + \ No newline at end of file diff --git a/ems-system/src/main/resources/mapper/ems/EmsBatteryDataMonthMapper.xml b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataMonthMapper.xml new file mode 100644 index 0000000..87f6b2b --- /dev/null +++ b/ems-system/src/main/resources/mapper/ems/EmsBatteryDataMonthMapper.xml @@ -0,0 +1,202 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + select id, battery_pack, battery_cluster, battery_cell_id, voltage, temperature, soc, soh, data_timestamp, create_by, create_time, update_by, update_time, remark, site_id, device_id, cluster_device_id, inter_resistance, month_time from ems_battery_data_month + + + + + + + + insert into ems_battery_data_month + + battery_pack, + battery_cluster, + battery_cell_id, + voltage, + temperature, + soc, + soh, + data_timestamp, + create_by, + create_time, + update_by, + update_time, + remark, + site_id, + device_id, + cluster_device_id, + inter_resistance, + month_time, + + + #{batteryPack}, + #{batteryCluster}, + #{batteryCellId}, + #{voltage}, + #{temperature}, + #{soc}, + #{soh}, + #{dataTimestamp}, + #{createBy}, + #{createTime}, + #{updateBy}, + #{updateTime}, + #{remark}, + #{siteId}, + #{deviceId}, + #{clusterDeviceId}, + #{interResistance}, + #{monthTime}, + + + + + update ems_battery_data_month + + battery_pack = #{batteryPack}, + battery_cluster = #{batteryCluster}, + battery_cell_id = #{batteryCellId}, + voltage = #{voltage}, + temperature = #{temperature}, + soc = #{soc}, + soh = #{soh}, + data_timestamp = #{dataTimestamp}, + create_by = #{createBy}, + create_time = #{createTime}, + update_by = #{updateBy}, + update_time = #{updateTime}, + remark = #{remark}, + site_id = #{siteId}, + device_id = #{deviceId}, + cluster_device_id = #{clusterDeviceId}, + inter_resistance = #{interResistance}, + month_time = #{monthTime}, + + where id = #{id} + + + + delete from ems_battery_data_month where id = #{id} + + + + delete from ems_battery_data_month where id in + + #{id} + + + + + + + + INSERT INTO ems_battery_data_month ( + battery_pack, + battery_cluster, + battery_cell_id, + voltage, + temperature, + soc, + soh, + data_timestamp, + create_by, + create_time, + update_by, + update_time, + remark, + site_id, + device_id, + cluster_device_id, + inter_resistance, + month_time + ) VALUES + + ( + #{item.batteryPack}, + #{item.batteryCluster}, + #{item.batteryCellId}, + #{item.voltage}, + #{item.temperature}, + #{item.soc}, + #{item.soh}, + #{item.dataTimestamp}, + #{item.createBy}, + #{item.createTime}, + #{item.updateBy}, + #{item.updateTime}, + #{item.remark}, + #{item.siteId}, + #{item.deviceId}, + #{item.clusterDeviceId}, + #{item.interResistance}, + #{item.monthTime} + ) + + ON DUPLICATE KEY UPDATE + -- 仅当新温度高于现有温度时,才更新温度和相关字段 + update_time = IF(VALUES(temperature) > temperature, NOW(), update_time), + data_timestamp = IF(VALUES(temperature) > temperature, VALUES(data_timestamp), data_timestamp), + voltage = IF(VALUES(temperature) > temperature, VALUES(voltage), voltage), + soc = IF(VALUES(temperature) > temperature, VALUES(soc), soc), + soh = IF(VALUES(temperature) > temperature, VALUES(soh), soh), + inter_resistance = IF(VALUES(temperature) > temperature, VALUES(inter_resistance), inter_resistance), + update_by = IF(VALUES(temperature) > temperature, VALUES(update_by), update_by) + temperature = IF(VALUES(temperature) > temperature, VALUES(temperature), temperature) + + \ No newline at end of file