This commit is contained in:
2026-04-20 20:42:57 +08:00
parent 7153c00d0c
commit 111631a426
7 changed files with 206 additions and 109 deletions

View File

@ -28,6 +28,7 @@ import java.util.Map;
public class InfluxPointDataWriter {
private static final Logger log = LoggerFactory.getLogger(InfluxPointDataWriter.class);
private static final Logger influxWriteSuccessLog = LoggerFactory.getLogger("sys-influxdb");
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Value("${influxdb.enabled:true}")
@ -91,6 +92,7 @@ public class InfluxPointDataWriter {
}
try {
StringBuilder body = new StringBuilder();
List<String> pointSummaries = new ArrayList<>();
for (PointWritePayload payload : payloads) {
if (payload == null || payload.getPointValue() == null) {
continue;
@ -103,6 +105,12 @@ public class InfluxPointDataWriter {
.append(" value=").append(payload.getPointValue().toPlainString())
.append(" ").append(time)
.append("\n");
pointSummaries.add(String.format("siteId=%s, deviceId=%s, pointKey=%s, value=%s, time=%s",
safe(payload.getSiteId()),
safe(payload.getDeviceId()),
safe(payload.getPointKey()),
payload.getPointValue(),
new Date(time)));
}
if (body.length() == 0) {
return;
@ -112,6 +120,8 @@ public class InfluxPointDataWriter {
log.warn("写入 InfluxDB 失败v2 写入地址未构建成功,请检查 influxdb.org / influxdb.bucket 配置");
return;
}
long startMillis = System.currentTimeMillis();
int writeCount = countWriteLines(body);
HttpResult result = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString());
if (result.code < 200 || result.code >= 300) {
if (result.code == 404 && isV2WritePath() && isOrgOrBucketMissing(result.body)) {
@ -119,6 +129,7 @@ public class InfluxPointDataWriter {
HttpResult retryResult = executeRequest(methodOrDefault(writeMethod, "POST"), writeUrl, body.toString());
if (retryResult.code >= 200 && retryResult.code < 300) {
log.info("InfluxDB org/bucket 自动创建成功,写入已恢复");
logWriteSuccess(writeCount, writeUrl, startMillis, pointSummaries);
return;
}
log.warn("InfluxDB 重试写入失败HTTP状态码: {}, url: {}, body: {}", retryResult.code, writeUrl, safeLog(retryResult.body));
@ -126,7 +137,9 @@ public class InfluxPointDataWriter {
}
}
log.warn("写入 InfluxDB 失败HTTP状态码: {}, url: {}, body: {}", result.code, writeUrl, safeLog(result.body));
return;
}
logWriteSuccess(writeCount, writeUrl, startMillis, pointSummaries);
} catch (Exception e) {
log.warn("写入 InfluxDB 失败: {}", e.getMessage());
}
@ -173,7 +186,8 @@ public class InfluxPointDataWriter {
startTime.getTime(),
endTime.getTime()
);
values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery)));
String regexQueryUrl = buildQueryUrl(regexQuery);
values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), regexQueryUrl));
return values;
} catch (Exception e) {
log.warn("查询 InfluxDB 曲线失败: {}", e.getMessage());
@ -204,7 +218,8 @@ public class InfluxPointDataWriter {
try {
String queryUrl = buildQueryUrl(influxQl);
return parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl));
List<PointValue> values = parseInfluxQlResponse(executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl));
return values;
} catch (Exception e) {
log.warn("按 pointKey 查询 InfluxDB 曲线失败: {}", e.getMessage());
return Collections.emptyList();
@ -234,24 +249,9 @@ public class InfluxPointDataWriter {
);
try {
String queryUrl = buildQueryUrl(influxQl);
List<PointValue> values = parseInfluxQlResponse(
executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(influxQl))
);
if (!values.isEmpty()) {
return values.get(0);
}
String regexQuery = String.format(
"SELECT \"value\" FROM \"%s\" WHERE \"site_id\" = '%s' AND \"point_key\" =~ /(?i)^%s$/ " +
"AND time >= %dms AND time <= %dms ORDER BY time DESC LIMIT 1",
measurement,
escapeTagValue(normalizedSiteId),
escapeRegex(normalizedPointKey),
startTime.getTime(),
endTime.getTime()
);
values = parseInfluxQlResponse(
executeRequestWithResponse(methodOrDefault(readMethod, "GET"), buildQueryUrl(regexQuery))
executeRequestWithResponse(methodOrDefault(readMethod, "GET"), queryUrl)
);
return values.isEmpty() ? null : values.get(0);
} catch (Exception e) {
@ -260,6 +260,21 @@ public class InfluxPointDataWriter {
}
}
/**
 * Emits a success entry to the dedicated InfluxDB write log ("sys-influxdb").
 *
 * @param writeCount     number of line-protocol records contained in the request body
 * @param writeUrl       the write endpoint that accepted the request
 * @param startMillis    wall-clock time (ms) captured just before the HTTP request was issued
 * @param pointSummaries human-readable summaries of each written point, included verbatim in the log line
 */
private void logWriteSuccess(int writeCount, String writeUrl, long startMillis, List<String> pointSummaries) {
    // Compute elapsed time up front so the log call stays a single readable statement.
    long costMs = System.currentTimeMillis() - startMillis;
    influxWriteSuccessLog.info("InfluxDB写入成功, count: {}, points: {}, url: {}, costMs: {}",
            writeCount, pointSummaries, writeUrl, costMs);
}
/**
 * Counts how many line-protocol records the request body contains.
 * Each record is appended with a trailing '\n', so the record count equals
 * the number of newline characters in the buffer.
 *
 * @param body the accumulated InfluxDB line-protocol payload
 * @return the number of '\n' characters found in {@code body}
 */
private int countWriteLines(StringBuilder body) {
    int newlineCount = 0;
    // Scan via indexOf rather than a per-character loop; behavior is identical.
    int at = body.indexOf("\n");
    while (at >= 0) {
        newlineCount++;
        at = body.indexOf("\n", at + 1);
    }
    return newlineCount;
}
private String buildWriteUrl() {
if (isV2WritePath()) {
return buildV2WriteUrl();

View File

@ -2816,15 +2816,16 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
private JSONArray parseJsonData(String message) {
try {
JSONObject object = JSONObject.parseObject(message);
return object.getJSONArray("payload");
} catch (JSONException e) {
log.info("mqtt message is not a json object: " + e.getMessage());
try {
return JSONArray.parseArray(message);
} catch (Exception arrayException) {
log.info("mqtt message is not a json array: " + e.getMessage());
Object parsed = JSON.parse(message);
if (parsed instanceof JSONArray) {
return (JSONArray) parsed;
}
if (parsed instanceof JSONObject) {
return ((JSONObject) parsed).getJSONArray("payload");
}
log.info("mqtt message root type is unsupported: {}", parsed == null ? "null" : parsed.getClass().getName());
} catch (JSONException e) {
log.info("mqtt message parse failed: {}", e.getMessage());
}
return null;
}
@ -3954,15 +3955,22 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
log.info("deviceId:" + deviceId);
if (checkJsonDataEmpty(jsonData)) {
log.warn("设备告警Data为空跳过告警处理siteId: {}deviceId: {}", siteId, deviceId);
continue;
}
// 存放mqtt原始每个设备最晚一次数据便于后面点位获取数据
redisCache.setCacheObject(RedisKeyConstants.ORIGINAL_MQTT_DATA_ALARM + siteId + "_" + deviceId, obj);
// 存放每次同步数据,失效时间(同同步时间)-用于判断是否正常同步数据
redisCache.setCacheObject(RedisKeyConstants.SYNC_DATA_ALARM + siteId + "_" + deviceId, obj, 1, TimeUnit.MINUTES);
// 处理设备数据,根据不同设备类型进行不同的数据处理
String deviceCategory = processingDeviceAlarmData(siteId, deviceId, jsonData, dataUpdateTime);
if (StringUtils.isEmpty(deviceCategory)) {
if (StringUtils.isNotEmpty(deviceCategory)) {
// 处理告警信息
alarmDataProcess(siteId, deviceId, jsonData, alarmMatchInfo, deviceCategory);
} else {
log.warn("设备告警数据未识别到设备类型跳过告警处理siteId: {}deviceId: {}", siteId, deviceId);
}
}
}
@ -4097,8 +4105,21 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
private void alarmDataProcess(String siteId, String deviceId, String jsonData,
Map<String, EmsAlarmMatchData> alarmInfoData, String category) {
if (StringUtils.isEmpty(category)) {
log.warn("设备类型为空跳过告警处理siteId: {}deviceId: {}", siteId, deviceId);
return;
}
if (alarmInfoData == null || alarmInfoData.isEmpty()) {
log.warn("告警匹配缓存为空跳过告警处理siteId: {}deviceId: {}category: {}", siteId, deviceId, category);
return;
}
Map<String, Object> obj = JSON.parseObject(jsonData, new TypeReference<Map<String, Object>>() {
});
if (obj == null || obj.isEmpty()) {
log.warn("设备告警Data解析结果为空跳过告警处理siteId: {}deviceId: {}category: {}data: {}",
siteId, deviceId, category, jsonData);
return;
}
String redisKey = RedisKeyConstants.LATEST_ALARM_RECORD + "_" + siteId + "_" + deviceId;
// 获取redis里面的当前有效告警遍历添加到已存在告警key里面
@ -4114,12 +4135,8 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
// 结合同步数据,筛选簇需要更新的告警信息
List<String> needUpdateKeys = obj.entrySet().stream()
.filter(entry -> {
Object valueObj = entry.getValue();
if (valueObj == null) {
return false;
}
int value = Integer.parseInt(valueObj.toString());
return value == 0 && currentAlarmKeys.contains(entry.getKey());
Long value = convertToLong(entry.getValue());
return value != null && value == 0L && currentAlarmKeys.contains(entry.getKey());
})
.map(Map.Entry::getKey)
.collect(Collectors.toList());
@ -4129,11 +4146,13 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
if (!needUpdateKeys.isEmpty()) {
List<EmsAlarmRecords> records = iEmsAlarmRecordsService.getAllUnfinishedRecords(needUpdateKeys, siteId, deviceId);
// 转为Map便于快速获取
needUpdateMap = records.stream()
.collect(Collectors.toMap(
EmsAlarmRecords::getAlarmPoint,
record -> record
));
if (CollectionUtils.isNotEmpty(records)) {
needUpdateMap = records.stream()
.collect(Collectors.toMap(
EmsAlarmRecords::getAlarmPoint,
record -> record
));
}
}
@ -4147,20 +4166,28 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
continue;
}
String key = entry.getKey();
Long value = (Long) entry.getValue();
Long value = convertToLong(entry.getValue());
if (value == null) {
log.warn("告警值不是有效数字忽略处理siteId: {}deviceId: {}point: {}value: {}",
siteId, deviceId, key, entry.getValue());
continue;
}
Boolean isCurrentAlarm = currentAlarmKeys.contains(key);
String matchRedisKey = siteId + "_" + category + "_" + key;
// 默认告警值0是正常1是异常
Long alarmData = 1L;
Object cacheObj = alarmInfoData.get(matchRedisKey);
if (cacheObj != null) {
EmsAlarmMatchData matchInfo = JSON.toJavaObject(cacheObj, EmsAlarmMatchData.class);
EmsAlarmMatchData matchInfo = alarmInfoData.get(matchRedisKey);
if (matchInfo != null) {
alarmData = matchInfo.getAlarmData();
}
// 处理告警
if (value.equals(alarmData) && !isCurrentAlarm) {
// 上送值和匹配值相同,新增告警
EmsAlarmMatchData matchInfo = JSON.toJavaObject(cacheObj, EmsAlarmMatchData.class);
if (matchInfo == null) {
log.warn("未找到告警匹配配置忽略新增告警siteId: {}deviceId: {}category: {}point: {}",
siteId, deviceId, category, key);
continue;
}
EmsAlarmRecords emsAlarmRecord = convertAlarmRecord(siteId, deviceId, matchInfo);
saveOrUpdateList.add(emsAlarmRecord);
newAddRecordList.add(emsAlarmRecord);
@ -4219,6 +4246,24 @@ public class DeviceDataProcessServiceImpl extends AbstractBatteryDataProcessor i
}
}
/**
 * Best-effort conversion of an arbitrary payload value to a {@link Long}.
 * <p>
 * Numbers are narrowed/widened via {@link Number#longValue()}; any other
 * non-blank value is parsed through {@code new BigDecimal(String)} (which also
 * accepts decimal and scientific notation) and truncated toward zero.
 *
 * @param value the raw value from the parsed MQTT payload; may be {@code null}
 * @return the long value, or {@code null} when the input is null, blank,
 *         or not a parseable number
 */
private Long convertToLong(Object value) {
    // instanceof is null-safe, so the Number fast-path can come first.
    if (value instanceof Number) {
        return ((Number) value).longValue();
    }
    if (value == null) {
        return null;
    }
    String text = String.valueOf(value);
    if (StringUtils.isBlank(text)) {
        return null;
    }
    try {
        // BigDecimal(String) throws NumberFormatException on malformed input.
        return new BigDecimal(text.trim()).longValue();
    } catch (NumberFormatException nfe) {
        // Non-numeric text is deliberately tolerated — caller treats null as "skip".
        return null;
    }
}
private EmsAlarmRecords convertAlarmRecord(String siteId, String deviceId, EmsAlarmMatchData matchInfo) {
EmsAlarmRecords emsAlarmRecords = new EmsAlarmRecords();
emsAlarmRecords.setSiteId(siteId);

View File

@ -125,7 +125,10 @@
t.alarm_content as alarmContent,
t.ticket_no as ticketNo,
t.id
from ems_alarm_records t INNER JOIN ems_devices_setting t2 on t.site_id = t2.site_id and t.device_id = t2.device_id
from ems_alarm_records t
INNER JOIN ems_devices_setting t2
on t.site_id COLLATE utf8mb4_general_ci = t2.site_id
and t.device_id COLLATE utf8mb4_general_ci = t2.device_id
where t.site_id = #{siteId}
and t.status != 1
and t.alarm_level in ('C','D')
@ -156,7 +159,9 @@
t.ticket_no as ticketNo,
t.id
from ems_alarm_records t
LEFT JOIN ems_devices_setting t2 on t.site_id = t2.site_id and t.device_id = t2.device_id
LEFT JOIN ems_devices_setting t2
on t.site_id = t2.site_id
and t.device_id = t2.device_id
where t.site_id = #{siteId}
<if test="deviceType != null and deviceType != ''">
AND t.device_type = #{deviceType}
@ -265,4 +270,4 @@
and device_id = #{deviceId}
and alarm_point is not null
</select>
</mapper>
</mapper>

View File

@ -288,65 +288,80 @@
<select id="getRevenueDataBySiteId" resultType="com.xzzn.ems.domain.vo.AmmeterRevenueStatisListVo">
select
DATE_FORMAT(t.data_date, '%Y-%m-%d') as dataTime,
COALESCE(c.is_workday, CASE WHEN WEEKDAY(t.data_date) &lt; 5 THEN 1 ELSE 0 END) as isWorkday,
CASE
WHEN COALESCE(c.is_workday, CASE WHEN WEEKDAY(t.data_date) &lt; 5 THEN 1 ELSE 0 END) = 1 THEN '工作日'
ELSE '节假日'
END as dayType,
COALESCE(NULLIF(TRIM(w.weather_desc), ''), '--') as weatherDesc,
ROUND(SUM(IFNULL(t.peak_charge_diff, 0) * IFNULL(pc.peak, 0)), 3) as activePeakPrice,
ROUND(SUM(IFNULL(t.peak_discharge_diff, 0) * IFNULL(pc.peak, 0)), 3) as reActivePeakPrice,
ROUND(SUM(IFNULL(t.high_charge_diff, 0) * IFNULL(pc.high, 0)), 3) as activeHighPrice,
ROUND(SUM(IFNULL(t.high_discharge_diff, 0) * IFNULL(pc.high, 0)), 3) as reActiveHighPrice,
ROUND(SUM(IFNULL(t.flat_charge_diff, 0) * IFNULL(pc.flat, 0)), 3) as activeFlatPrice,
ROUND(SUM(IFNULL(t.flat_discharge_diff, 0) * IFNULL(pc.flat, 0)), 3) as reActiveFlatPrice,
ROUND(SUM(IFNULL(t.valley_charge_diff, 0) * IFNULL(pc.valley, 0)), 3) as activeValleyPrice,
ROUND(SUM(IFNULL(t.valley_discharge_diff, 0) * IFNULL(pc.valley, 0)), 3) as reActiveValleyPrice,
ROUND(
SUM(
(IFNULL(t.peak_discharge_diff, 0) - IFNULL(t.peak_charge_diff, 0)) * IFNULL(pc.peak, 0)
+ (IFNULL(t.high_discharge_diff, 0) - IFNULL(t.high_charge_diff, 0)) * IFNULL(pc.high, 0)
+ (IFNULL(t.flat_discharge_diff, 0) - IFNULL(t.flat_charge_diff, 0)) * IFNULL(pc.flat, 0)
+ (IFNULL(t.valley_discharge_diff, 0) - IFNULL(t.valley_charge_diff, 0)) * IFNULL(pc.valley, 0)
revenue.dataTime as dataTime,
revenue.isWorkday as isWorkday,
case
when revenue.isWorkday = 1 then '工作日'
else '节假日'
end as dayType,
revenue.weatherDesc as weatherDesc,
revenue.activePeakPrice as activePeakPrice,
revenue.reActivePeakPrice as reActivePeakPrice,
revenue.activeHighPrice as activeHighPrice,
revenue.reActiveHighPrice as reActiveHighPrice,
revenue.activeFlatPrice as activeFlatPrice,
revenue.reActiveFlatPrice as reActiveFlatPrice,
revenue.activeValleyPrice as activeValleyPrice,
revenue.reActiveValleyPrice as reActiveValleyPrice,
revenue.actualRevenue as actualRevenue
from (
select
DATE_FORMAT(t.data_date, '%Y-%m-%d') as dataTime,
COALESCE(c.is_workday, CASE WHEN WEEKDAY(t.data_date) &lt; 5 THEN 1 ELSE 0 END) as isWorkday,
COALESCE(NULLIF(TRIM(w.weather_desc), ''), '--') as weatherDesc,
ROUND(SUM(IFNULL(t.peak_charge_diff, 0) * IFNULL(pc.peak, 0)), 3) as activePeakPrice,
ROUND(SUM(IFNULL(t.peak_discharge_diff, 0) * IFNULL(pc.peak, 0)), 3) as reActivePeakPrice,
ROUND(SUM(IFNULL(t.high_charge_diff, 0) * IFNULL(pc.high, 0)), 3) as activeHighPrice,
ROUND(SUM(IFNULL(t.high_discharge_diff, 0) * IFNULL(pc.high, 0)), 3) as reActiveHighPrice,
ROUND(SUM(IFNULL(t.flat_charge_diff, 0) * IFNULL(pc.flat, 0)), 3) as activeFlatPrice,
ROUND(SUM(IFNULL(t.flat_discharge_diff, 0) * IFNULL(pc.flat, 0)), 3) as reActiveFlatPrice,
ROUND(SUM(IFNULL(t.valley_charge_diff, 0) * IFNULL(pc.valley, 0)), 3) as activeValleyPrice,
ROUND(SUM(IFNULL(t.valley_discharge_diff, 0) * IFNULL(pc.valley, 0)), 3) as reActiveValleyPrice,
ROUND(
SUM(
(IFNULL(t.peak_discharge_diff, 0) - IFNULL(t.peak_charge_diff, 0)) * IFNULL(pc.peak, 0)
+ (IFNULL(t.high_discharge_diff, 0) - IFNULL(t.high_charge_diff, 0)) * IFNULL(pc.high, 0)
+ (IFNULL(t.flat_discharge_diff, 0) - IFNULL(t.flat_charge_diff, 0)) * IFNULL(pc.flat, 0)
+ (IFNULL(t.valley_discharge_diff, 0) - IFNULL(t.valley_charge_diff, 0)) * IFNULL(pc.valley, 0)
),
3
) as actualRevenue
from ems_daily_energy_data t
left join ems_calendar_day c on c.calendar_date = t.data_date
left join ems_site_weather_day w on w.site_id = t.site_id and w.calendar_date = t.data_date
left join ems_energy_price_config pc on pc.id = COALESCE(
(
select p.id
from ems_energy_price_config p
where p.site_id = t.site_id
and STR_TO_DATE(CONCAT(p.year, '-', LPAD(p.month, 2, '0'), '-01'), '%Y-%m-%d') &lt;= DATE_FORMAT(t.data_date, '%Y-%m-01')
order by STR_TO_DATE(CONCAT(p.year, '-', LPAD(p.month, 2, '0'), '-01'), '%Y-%m-%d') desc
limit 1
),
3
) as actualRevenue
from ems_daily_energy_data t
left join ems_calendar_day c on c.calendar_date = t.data_date
left join ems_site_weather_day w on w.site_id = t.site_id and w.calendar_date = t.data_date
left join ems_energy_price_config pc on pc.id = COALESCE(
(
select p.id
from ems_energy_price_config p
where p.site_id = t.site_id
and STR_TO_DATE(CONCAT(p.year, '-', LPAD(p.month, 2, '0'), '-01'), '%Y-%m-%d') &lt;= DATE_FORMAT(t.data_date, '%Y-%m-01')
order by STR_TO_DATE(CONCAT(p.year, '-', LPAD(p.month, 2, '0'), '-01'), '%Y-%m-%d') desc
limit 1
),
(
select p2.id
from ems_energy_price_config p2
where p2.site_id = t.site_id
order by STR_TO_DATE(CONCAT(p2.year, '-', LPAD(p2.month, 2, '0'), '-01'), '%Y-%m-%d') asc
limit 1
(
select p2.id
from ems_energy_price_config p2
where p2.site_id = t.site_id
order by STR_TO_DATE(CONCAT(p2.year, '-', LPAD(p2.month, 2, '0'), '-01'), '%Y-%m-%d') asc
limit 1
)
)
)
where 1=1
and t.data_hour is not null
<if test="siteId != null">
and t.site_id = #{siteId}
</if>
<if test="startTime != null">
and t.data_date &gt;= #{startTime}
</if>
<if test="endTime != null">
and t.data_date &lt;= #{endTime}
</if>
group by t.data_date,
COALESCE(c.is_workday, CASE WHEN WEEKDAY(t.data_date) &lt; 5 THEN 1 ELSE 0 END),
COALESCE(NULLIF(TRIM(w.weather_desc), ''), '--')
order by t.data_date asc
where 1=1
and t.data_hour is not null
<if test="siteId != null">
and t.site_id = #{siteId}
</if>
<if test="startTime != null">
and t.data_date &gt;= #{startTime}
</if>
<if test="endTime != null">
and t.data_date &lt;= #{endTime}
</if>
group by t.data_date,
COALESCE(c.is_workday, CASE WHEN WEEKDAY(t.data_date) &lt; 5 THEN 1 ELSE 0 END),
COALESCE(NULLIF(TRIM(w.weather_desc), ''), '--')
) revenue
order by revenue.dataTime asc
</select>
</mapper>