@ -28,6 +28,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentLinkedQueue ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.stream.Collectors ;
import java.util.stream.IntStream ;
@ -95,7 +96,6 @@ public abstract class AbstractBatteryDataProcessor {
// 传递分片索引和是否最后一个分片的标记
ioExecutor . execute ( ( ) - > processShard ( shard , isLastShard ) ) ;
}
//shards.forEach(shard -> ioExecutor.execute(() -> processShard(shard)));
}
/**
@ -245,20 +245,11 @@ public abstract class AbstractBatteryDataProcessor {
public void batchUpdateHourCache ( List < EmsBatteryDataHour > hourDataList ) {
if ( hourDataList . isEmpty ( ) ) return ;
// 1. 构建缓存键值对(批量操作提升效率)
Map < String , String > tempCacheMap = new HashMap < > ( hourDataList . size ( ) ) ;
for ( EmsBatteryDataHour data : hourDataList ) {
EmsBatteryData cacheData = new EmsBatteryData ( ) ;
BeanUtils . copyProperties ( data , cacheData ) ;
// 2. 生成唯一缓存键(与数据库唯一键一致)
String cacheKey = generateCacheKey ( cacheData , " hour " ) ;
// 3. 存储最高温度和最后更新时间(两个独立缓存项)
tempCacheMap . put ( cacheKey , data . getTemperature ( ) . toString ( ) ) ;
}
Map < String , String > tempCacheMap = buildCacheMap ( hourDataList , " hour " ) ;
try {
// 4. 批量设置缓存( Redis批量操作减少网络交互)
redisCache . multiSet ( tempCacheMap ) ;
// 批量设置缓存
redisCache . multiSetWithExpire ( tempCacheMap , 1 , TimeUnit . HOURS );
} catch ( Exception e ) {
log . error ( " 批量更新小时级缓存失败 " , e ) ;
// 缓存更新失败不影响主流程,但需记录日志便于排查
@ -343,20 +334,11 @@ public abstract class AbstractBatteryDataProcessor {
public void batchUpdateDayCache ( List < EmsBatteryDataDay > dayDataList ) {
if ( dayDataList . isEmpty ( ) ) return ;
// 1. 构建缓存键值对(批量操作提升效率)
Map < String , String > tempCacheMap = new HashMap < > ( dayDataList . size ( ) ) ;
for ( EmsBatteryDataDay data : dayDataList ) {
EmsBatteryData cacheData = new EmsBatteryData ( ) ;
BeanUtils . copyProperties ( data , cacheData ) ;
// 2. 生成唯一缓存键(与数据库唯一键一致)
String cacheKey = generateCacheKey ( cacheData , " day " ) ;
// 3. 存储最高温度和最后更新时间(两个独立缓存项)
tempCacheMap . put ( cacheKey , data . getTemperature ( ) . toString ( ) ) ;
}
Map < String , String > tempCacheMap = buildCacheMap ( dayDataList , " hour " ) ;
try {
// 4. 批量设置缓存( Redis批量操作减少网络交互)
redisCache . multiSet ( tempCacheMap ) ;
// 批量设置缓存
redisCache . multiSetWithExpire ( tempCacheMap , 24 , TimeUnit . HOURS );
} catch ( Exception e ) {
log . error ( " 批量更新小时级缓存失败 " , e ) ;
// 缓存更新失败不影响主流程,但需记录日志便于排查
@ -454,20 +436,11 @@ public abstract class AbstractBatteryDataProcessor {
public void batchUpdateMonthCache ( List < EmsBatteryDataMonth > monthDataList ) {
if ( monthDataList . isEmpty ( ) ) return ;
// 1. 构建缓存键值对(批量操作提升效率)
Map < String , String > tempCacheMap = new HashMap < > ( monthDataList . size ( ) ) ;
for ( EmsBatteryDataMonth data : monthDataList ) {
EmsBatteryData cacheData = new EmsBatteryData ( ) ;
BeanUtils . copyProperties ( data , cacheData ) ;
// 2. 生成唯一缓存键(与数据库唯一键一致)
String cacheKey = generateCacheKey ( cacheData , " month " ) ;
// 3. 存储最高温度和最后更新时间(两个独立缓存项)
tempCacheMap . put ( cacheKey , data . getTemperature ( ) . toString ( ) ) ;
}
Map < String , String > tempCacheMap = buildCacheMap ( monthDataList , " hour " ) ;
try {
// 4. 批量设置缓存( Redis批量操作减少网络交互)
redisCache . multiSet ( tempCacheMap ) ;
// 批量设置缓存
redisCache . multiSetWithExpire ( tempCacheMap , 24 * 30 , TimeUnit . HOURS );
} catch ( Exception e ) {
log . error ( " 批量更新月级缓存失败 " , e ) ;
// 缓存更新失败不影响主流程,但需记录日志便于排查
@ -659,6 +632,30 @@ public abstract class AbstractBatteryDataProcessor {
}
}
// 通用构建缓存键值对的方法(根据不同数据类型适配)
private < T > Map < String , String > buildCacheMap ( List < T > dataList , String granularity ) {
Map < String , String > cacheMap = new HashMap < > ( dataList . size ( ) ) ;
for ( T data : dataList ) {
EmsBatteryData cacheData = new EmsBatteryData ( ) ;
BeanUtils . copyProperties ( data , cacheData ) ;
String cacheKey = generateCacheKey ( cacheData , granularity ) ;
// 根据数据类型获取温度值(实际项目中可通过接口或反射统一处理)
String temperature ;
if ( data instanceof EmsBatteryDataHour ) {
temperature = ( ( EmsBatteryDataHour ) data ) . getTemperature ( ) . toString ( ) ;
} else if ( data instanceof EmsBatteryDataDay ) {
temperature = ( ( EmsBatteryDataDay ) data ) . getTemperature ( ) . toString ( ) ;
} else if ( data instanceof EmsBatteryDataMonth ) {
temperature = ( ( EmsBatteryDataMonth ) data ) . getTemperature ( ) . toString ( ) ;
} else {
throw new IllegalArgumentException ( " 不支持的数据类型 " ) ;
}
cacheMap . put ( cacheKey , temperature ) ;
}
return cacheMap ;
}
// 新增定时任务, 每1分钟强制刷新一次队列( 防止数据长期积压)
@Scheduled ( fixedRate = 60000 ) // 1分钟 = 60000毫秒
protected void scheduledFlushBatch ( ) {