From 2ffec8d3a6fb2cc2a65b356830d6802c3cd0bfa4 Mon Sep 17 00:00:00 2001 From: Timer <@> Date: Thu, 26 Feb 2026 01:08:27 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E5=88=9D=E5=A7=8B=E5=8C=96=E9=93=BE?= =?UTF-8?q?=E6=8E=A5=E8=B0=83=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/sipai/entity/scada/MPointHistory.java | 21 ++++++++ .../java/com/sipai/schedule/DataSynJob.java | 17 ++++--- .../service/mqtt/impl/MqttServiceImpl.java | 38 ++++++++++++-- .../service/scada/MPointHistoryService.java | 6 ++- .../scada/impl/MPointBzwServiceImpl.java | 13 ++--- .../scada/impl/MPointHistoryServiceImpl.java | 50 +++++++++++++------ .../service/scada/impl/MPointServiceImpl.java | 36 ++++++++----- src/main/java/com/sipai/tools/CommUtil.java | 2 +- .../com/sipai/tools/DataSourceAspect.java | 26 +++++----- .../com/sipai/tools/DataSourceTypeAnno.java | 1 + .../mapper/scada/MPointHistoryMapper.xml | 27 +++++----- .../mybatis/mapper/scada/MPointMapper.xml | 2 +- 12 files changed, 163 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/sipai/entity/scada/MPointHistory.java b/src/main/java/com/sipai/entity/scada/MPointHistory.java index 567c043..978d000 100644 --- a/src/main/java/com/sipai/entity/scada/MPointHistory.java +++ b/src/main/java/com/sipai/entity/scada/MPointHistory.java @@ -3,6 +3,7 @@ package com.sipai.entity.scada; import com.sipai.entity.base.SQLAdapter; import java.io.Serializable; import java.math.BigDecimal; +import java.util.Date; public class MPointHistory extends SQLAdapter implements Serializable{ /** @@ -151,4 +152,24 @@ public class MPointHistory extends SQLAdapter implements Serializable{ return serialVersionUID; } + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("MPointHistory{"); + sb.append("itemid=").append(itemid); + sb.append(", parmvalue=").append(parmvalue); + sb.append(", measuredt='").append(measuredt).append('\''); + sb.append(", memotype='").append(memotype).append('\''); + sb.append(", memo='").append(memo).append('\''); + sb.append(", userid='").append(userid).append('\''); + sb.append(", insdt='").append(insdt).append('\''); + sb.append(", tbName='").append(tbName).append('\''); + sb.append(", tag=").append(tag); + sb.append(", year='").append(year).append('\''); + sb.append(", month='").append(month).append('\''); + sb.append(", day='").append(day).append('\''); + sb.append(", hour='").append(hour).append('\''); + sb.append(", min='").append(min).append('\''); + sb.append('}'); + return sb.toString(); + } } \ No newline at end of file diff --git a/src/main/java/com/sipai/schedule/DataSynJob.java b/src/main/java/com/sipai/schedule/DataSynJob.java index 881b788..3ad4ea3 100644 --- a/src/main/java/com/sipai/schedule/DataSynJob.java +++ b/src/main/java/com/sipai/schedule/DataSynJob.java @@ -47,21 +47,22 @@ public class DataSynJob { } @Async -// @Scheduled(cron = "0 0/3 * * * ?")//数据转发 + @Scheduled(cron = "0 0 0/1 * * ?")//数据转发 public void job2() { if (!scheduledEnabled) return; // 手动拦截 String addstr = "zhuanfa"; System.out.println("开始定时器-----------------" + CommUtil.nowDate() + "-----------------" + addstr); - List list2 = this.mPointService.selectListByWhere("where bizid = '0791CNWS' and SignalType='AI' and MeasureDT>DATE_SUB(NOW(), INTERVAL 5 MINUTE) and source_type='auto' "); - if (list2 != null && list2.size() > 0) { - for (int j = 0; j < list2.size(); ++j) { +// List list2 = this.mPointService.selectListByWhere("where bizid = '0533JS' and SignalType='AI' and MeasureDT>DATE_SUB(NOW(), INTERVAL 5 MINUTE) and source_type='auto' "); + List list2 = this.mPointService.selectListByWhere("where bizid = '0533JS' and SignalType='AI' and source_type='auto' "); + if (list2 != null && !list2.isEmpty()) { + for (MPoint mPoint : list2) { MPointHistory mPointHistory = new MPointHistory(); - mPointHistory.setParmvalue(list2.get(j).getParmvalue()); - mPointHistory.setMeasuredt(list2.get(j).getMeasuredt()); - mPointHistory.setTbName(list2.get(j).getMpointcode()); + mPointHistory.setParmvalue(mPoint.getParmvalue()); + mPointHistory.setMeasuredt(mPoint.getMeasuredt()); + mPointHistory.setTbName(mPoint.getMpointcode()); mPointHistory.setUserid("data_job"); mPointHistory.setInsdt(CommUtil.nowDate()); - mPointHistoryService.saveByCreate(list2.get(j).getBizid(), mPointHistory); + mPointHistoryService.saveByCreate(mPoint.getBizid(), mPointHistory); } System.out.println("完成一次(有数据)转发,执行了" + list2.size() + "次点"); } else { diff --git a/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java b/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java index 4f6edda..f794fb3 100644 --- a/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java +++ b/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java @@ -2,6 +2,7 @@ package com.sipai.service.mqtt.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.sipai.dao.scada.MPointHistoryDao; import com.sipai.entity.mqtt.Mqtt; import com.sipai.entity.mqtt.MqttProperties; import com.sipai.entity.scada.MPoint; @@ -29,10 +30,9 @@ import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.math.RoundingMode; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -49,6 +49,10 @@ public class MqttServiceImpl implements MqttService { private ListenerPointService listenerPointService; @Autowired private RedissonClient redissonClient; + @Autowired + private MPointHistoryService mPointHistoryService; + @Autowired + MPointHistoryDao mPointHistoryDao; private static MqttClient mqttClient; private static String ipStr = ""; @@ -90,10 +94,13 @@ public class MqttServiceImpl implements MqttService { return mqttClient; } + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + @Async("getAsyncMqttHandle") @Override public void doHandle(String bizId, String topic, String ip4, String port, JSONArray jsonArray) { System.out.println("MQTT接收: biz=" + bizId + " topic=" + topic + " msgCount=" + jsonArray.size()); +// MPointHistoryService mPointHistoryService = (MPointHistoryService) SpringContextUtil.getBean("mPointHistoryService"); for (int i = 0; i < jsonArray.size(); i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); // 解析时间戳 @@ -135,6 +142,13 @@ public class MqttServiceImpl implements MqttService { // 处理ES数据 MPointES esPoint = buildEsPoint(mPoint, value, date); redisList.add(esPoint); + + MPointHistory mPointHistory = new MPointHistory(); + mPointHistory.setMeasuredt(date); + mPointHistory.setParmvalue(value); + mPointHistory.setTbName("tb_mp_" + key); + mPointHistory.setUserid(port); + insertInsert(mPointHistory); } // 推送前端数据 // loggger.info("准备推送前端: topic={} size={} @{}", topic, vueList.size(), date); @@ -146,6 +160,22 @@ public class MqttServiceImpl implements MqttService { } } + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) + public void insertInsert(MPointHistory mPointHistory) { + try { + int exist = mPointHistoryService.checkTableExist(mPointHistory.getTbName()); + if (exist == 0) { // 不存在就新建点位表 + loggger.info(String.format("[重要]创建了点位表:%s %s", mPointHistory.getTbName(), DataSourceContextHolder.getDataSourceType())); + mPointHistoryService.createTable(mPointHistory.getTbName()); + } + + mPointHistoryService.save("", mPointHistory); + loggger.info("保存成功一条数据到历史表:{}", mPointHistory); + } catch (Exception e) { + loggger.error("mPointHistoryService.save error :{}", e.getMessage()); + } + } + /** * 校验值是否有效(非空且长度合法) */ diff --git a/src/main/java/com/sipai/service/scada/MPointHistoryService.java b/src/main/java/com/sipai/service/scada/MPointHistoryService.java index 8cf94c1..df13112 100644 --- a/src/main/java/com/sipai/service/scada/MPointHistoryService.java +++ b/src/main/java/com/sipai/service/scada/MPointHistoryService.java @@ -5,7 +5,7 @@ import com.sipai.entity.scada.MPointHistory; import java.util.List; public interface MPointHistoryService { - int save(String bizId, MPointHistory mPointHistory); + Integer save(String bizId, MPointHistory mPointHistory); List selectListByTableAWhere(String bizId, String table, String wherestr); @@ -14,4 +14,8 @@ public interface MPointHistoryService { int saveByCreate(String bizId, MPointHistory mPointHistory); List selectIndustrialLibrary(String bizId, String IP,String MPointID, String intv, String sdt, String edt); + + void createTable(String tbName); + + int checkTableExist(String tbName); } diff --git a/src/main/java/com/sipai/service/scada/impl/MPointBzwServiceImpl.java b/src/main/java/com/sipai/service/scada/impl/MPointBzwServiceImpl.java index a3727a7..0a896e2 100644 --- a/src/main/java/com/sipai/service/scada/impl/MPointBzwServiceImpl.java +++ b/src/main/java/com/sipai/service/scada/impl/MPointBzwServiceImpl.java @@ -3,6 +3,7 @@ package com.sipai.service.scada.impl; import com.sipai.dao.scada.MPointBzwDao; import com.sipai.entity.scada.MPointBzw; import com.sipai.service.scada.MPointBzwService; +import com.sipai.tools.DataSourceEnum; import com.sipai.tools.DataSourceTypeAnno; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; @@ -22,32 +23,32 @@ public class MPointBzwServiceImpl implements MPointBzwService { private MPointBzwDao mPointBzwDao; @Override - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public MPointBzw selectByPrimaryKey(String id) { return mPointBzwDao.selectByPrimaryKey(id); } @Override - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public Integer deleteByPrimaryKey(String id) { return mPointBzwDao.deleteByPrimaryKey(id); } @Override - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public Integer insert(MPointBzw entity) { return mPointBzwDao.insert(entity); } @Async @Override - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public Integer updateByPrimaryKeySelective(MPointBzw entity) { return mPointBzwDao.updateByPrimaryKeySelective(entity); } @Override - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public List selectListByWhere(String wherestr) { MPointBzw mPoint = new MPointBzw(); mPoint.setWhere(wherestr); @@ -55,7 +56,7 @@ public class MPointBzwServiceImpl implements MPointBzwService { } @Override - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public Integer deleteByWhere(String wherestr) { return mPointBzwDao.deleteByWhere(wherestr); } diff --git a/src/main/java/com/sipai/service/scada/impl/MPointHistoryServiceImpl.java b/src/main/java/com/sipai/service/scada/impl/MPointHistoryServiceImpl.java index 5776c1e..1a8eb85 100644 --- a/src/main/java/com/sipai/service/scada/impl/MPointHistoryServiceImpl.java +++ b/src/main/java/com/sipai/service/scada/impl/MPointHistoryServiceImpl.java @@ -5,10 +5,12 @@ import com.sipai.entity.scada.MPointHistory; import com.sipai.service.scada.MPointHistoryService; import com.sipai.tools.CommUtil; import com.sipai.tools.DataSourceContextHolder; +import com.sipai.tools.DataSourceEnum; import com.sipai.tools.DataSourceTypeAnno; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import java.util.List; @@ -19,11 +21,11 @@ public class MPointHistoryServiceImpl implements MPointHistoryService { @Autowired MPointHistoryDao mPointHistoryDao; - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override - public int save(String bizId, MPointHistory mPointHistory) { + public Integer save(String bizId, MPointHistory mPointHistory) { try { - int res = mPointHistoryDao.insert(mPointHistory); + Integer res = mPointHistoryDao.insert(mPointHistory); return res; } catch (Exception e) { System.out.println(e); @@ -31,38 +33,43 @@ public class MPointHistoryServiceImpl implements MPointHistoryService { } } - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public List selectListByTableAWhere(String bizId, String table, String wherestr) { - return mPointHistoryDao.selectListByTableAWhere(table ,wherestr); + return mPointHistoryDao.selectListByTableAWhere(table, wherestr); } - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override - public int deleteByTableAWhere(String bizId,String table ,String wherestr) { - return mPointHistoryDao.deleteByTableAWhere(table ,wherestr); + public int deleteByTableAWhere(String bizId, String table, String wherestr) { + return mPointHistoryDao.deleteByTableAWhere(table, wherestr); } - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public int saveByCreate(String bizId, MPointHistory mPointHistory) { mPointHistory.setTbName(CommUtil.getMPointTableName(mPointHistory.getTbName())); - if(CommUtil.isBlank(bizId)){ + if (CommUtil.isBlank(bizId)) { return 0; } try { int i = this.mPointHistoryDao.checkTableExist(mPointHistory.getTbName()); - if(i == 0) { - log.info(String.format("[重要]创建了表:%s %s",mPointHistory.getTbName(), DataSourceContextHolder.getDataSourceType())); + if (i == 0) { + log.info(String.format("[重要]创建了表:%s %s", mPointHistory.getTbName(), DataSourceContextHolder.getDataSourceType())); mPointHistoryDao.createTable(mPointHistory.getTbName()); } // 添加时间判断 todo 将来优化 - mPointHistory.setWhere(" where MeasureDT = '" + mPointHistory.getMeasuredt() + "'"); + if (StringUtils.isEmpty(mPointHistory.getMeasuredt())) { + mPointHistory.setWhere(" where MeasureDT IS NULL"); + } else { + mPointHistory.setWhere(" where MeasureDT = '" + mPointHistory.getMeasuredt() + "'"); + } Integer count = this.mPointHistoryDao.selectCount(mPointHistory); - int result = count == 0 ? mPointHistoryDao.insert(mPointHistory) : mPointHistoryDao.updateByMeasureDt(mPointHistory); - return result; +// int result = count == 0 ? mPointHistoryDao.insert(mPointHistory) : mPointHistoryDao.updateByMeasureDt(mPointHistory); +// return result; + return 0; } catch (Exception e) { System.out.println(DataSourceContextHolder.getDataSourceType() + " | " + e); // 返回默认值 @@ -70,10 +77,21 @@ public class MPointHistoryServiceImpl implements MPointHistoryService { } } - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public List selectIndustrialLibrary(String bizId, String IP, String MPointID, String intv, String sdt, String edt) { return mPointHistoryDao.selectIndustrialLibrary(IP, MPointID, intv, sdt, edt); } + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) + @Override + public void createTable(String tbName) { + mPointHistoryDao.createTable(tbName); + } + + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) + @Override + public int checkTableExist(String tbName) { + return mPointHistoryDao.checkTableExist(tbName); + } } diff --git a/src/main/java/com/sipai/service/scada/impl/MPointServiceImpl.java b/src/main/java/com/sipai/service/scada/impl/MPointServiceImpl.java index 05da6e9..5421808 100644 --- a/src/main/java/com/sipai/service/scada/impl/MPointServiceImpl.java +++ b/src/main/java/com/sipai/service/scada/impl/MPointServiceImpl.java @@ -13,6 +13,7 @@ import com.sipai.service.scada.MPointHistoryService; import com.sipai.service.scada.MPointService; import com.sipai.tools.CommString; import com.sipai.tools.CommUtil; +import com.sipai.tools.DataSourceEnum; import com.sipai.tools.DataSourceTypeAnno; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -81,7 +82,7 @@ public class MPointServiceImpl implements MPointService { return mPointDao.selectByPrimaryKey(id); }*/ - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public MPoint selectById(String unitId, String id) { return mPointDao.selectByPrimaryKey(id); } @@ -92,6 +93,7 @@ public class MPointServiceImpl implements MPointService { * @param id * @return */ + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public MPoint selectById4Es(String id) { if (id == null || id.isEmpty()) { @@ -122,7 +124,7 @@ public class MPointServiceImpl implements MPointService { } - // @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public List selectListByWhere(String wherestr) { MPoint mpoint = new MPoint(); @@ -145,6 +147,7 @@ public class MPointServiceImpl implements MPointService { } @Override + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public List selectListByES(Map map, Integer page, Integer rows) { List mpoints = new ArrayList<>(); try { @@ -187,7 +190,7 @@ public class MPointServiceImpl implements MPointService { return mpoints; } - // @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public int update(MPoint entity) { try { @@ -199,7 +202,7 @@ public class MPointServiceImpl implements MPointService { } } - @DataSourceTypeAnno() + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public int updateValueByKey(String bizId, MPoint entity) { try { @@ -248,6 +251,7 @@ public class MPointServiceImpl implements MPointService { * @param entity * @return */ + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public int saveInfluxdb(String bizId, MPointHistory entity) { Map fields = new HashMap<>(); Map tags = new HashMap<>(); @@ -261,10 +265,10 @@ public class MPointServiceImpl implements MPointService { long time = Calendar.getInstance().getTimeInMillis(); try { fields.put("parmvalue", Float.parseFloat(entity.getParmvalue().toString())); - time = longDateFormat.parse(entity.getMeasuredt()).getTime(); +// time = longDateFormat.parse(entity.getMeasuredt()).getTime(); // influxDBConfig.insert("tb_mp_" + entity.getTbName(), time, tags, fields); // System.out.println("===插入数据成功===" + entity.getTbName() + "===" + Float.parseFloat(entity.getParmvalue().toString())); - } catch (ParseException e) { + } catch (Exception e) { e.printStackTrace(); } return 1; @@ -276,6 +280,7 @@ public class MPointServiceImpl implements MPointService { * @param mPointES */ // @Async + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public void saveAsync(List mPointES) { this.mPointRepo.batchUpdate(mPointES); @@ -286,6 +291,7 @@ public class MPointServiceImpl implements MPointService { * * @return */ + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public List selectListByWhere4Es(NativeSearchQueryBuilder nativeSearchQueryBuilder) { SearchQuery searchQuery = nativeSearchQueryBuilder.build(); Page mPage = mPointRepo.search(searchQuery); @@ -312,6 +318,7 @@ public class MPointServiceImpl implements MPointService { * @return * @throws Exception */ + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public static String formatTimeEight(String time) throws Exception { Date d = null; SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -321,6 +328,7 @@ public class MPointServiceImpl implements MPointService { return newtime; } + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) public void sendKafka4OpcUa(String id, Double value, String time) { JSONObject message = new JSONObject(); message.put("id", id); @@ -365,6 +373,7 @@ public class MPointServiceImpl implements MPointService { } } + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public void saveRedis(String mpcode, Object value, String time) { RBatch batch = redissonClient.createBatch(); @@ -376,7 +385,8 @@ public class MPointServiceImpl implements MPointService { batch.execute(); } - public void sendKafka4UDP(String unitId,String mpcode, Object value, String timePoint, int i, int length) { + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) + public void sendKafka4UDP(String unitId, String mpcode, Object value, String timePoint, int i, int length) { JSONObject message = new JSONObject(); message.put("id", mpcode); @@ -408,7 +418,7 @@ public class MPointServiceImpl implements MPointService { try { MPointHistory mPointHistory = new MPointHistory(); mPointHistory.setParmvalue(CommUtil.toBigDecimal(value)); - mPointHistory.setMeasuredt(timePoint); +// mPointHistory.setMeasuredt(timePoint); mPointHistory.setTbName(mpcode); mPointHistory.setUserid("datacollector"); mPointHistory.setInsdt(CommUtil.nowDate()); @@ -421,7 +431,7 @@ public class MPointServiceImpl implements MPointService { //更新最后一条数据到总表 if (i == length - 1) { try { - MPoint mPoint = mPointService.selectById(unitId,mpcode); + MPoint mPoint = mPointService.selectById(unitId, mpcode); if (mPoint != null) { mPoint.setParmvalue(CommUtil.toBigDecimal(value)); mPoint.setMeasuredt(timePoint); @@ -437,14 +447,15 @@ public class MPointServiceImpl implements MPointService { } // @Async + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override - public void sendKafka4MQTT(String unitId,String mpcode, Object value, String timePoint, int i, int length, String signaltype) { + public void sendKafka4MQTT(String unitId, String mpcode, Object value, String timePoint, int i, int length, String signaltype) { //写子表 if (signaltype != null && signaltype.equals("DI")) { try { MPointHistory mPointHistory = new MPointHistory(); mPointHistory.setParmvalue(CommUtil.toBigDecimal(value)); - mPointHistory.setMeasuredt(timePoint); +// mPointHistory.setMeasuredt(timePoint); mPointHistory.setTbName(mpcode); mPointHistory.setUserid("datacollector"); mPointHistory.setInsdt(CommUtil.nowDate()); @@ -457,7 +468,7 @@ public class MPointServiceImpl implements MPointService { //更新最后一条数据到总表 if (i == length - 1) { try { - MPoint mPoint = mPointService.selectById(unitId,mpcode); + MPoint mPoint = mPointService.selectById(unitId, mpcode); if (mPoint != null) { mPoint.setParmvalue(CommUtil.toBigDecimal(value)); mPoint.setMeasuredt(timePoint); @@ -470,6 +481,7 @@ public class MPointServiceImpl implements MPointService { } // @Async + @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS) @Override public void sendKafka4MQTT_Alarm(String mpcode, Object value, String timePoint, int i, int length) { JSONObject message = new JSONObject(); diff --git a/src/main/java/com/sipai/tools/CommUtil.java b/src/main/java/com/sipai/tools/CommUtil.java index 4589980..0b4e808 100644 --- a/src/main/java/com/sipai/tools/CommUtil.java +++ b/src/main/java/com/sipai/tools/CommUtil.java @@ -181,7 +181,7 @@ public class CommUtil { format = "yyyy-MM-dd HH:mm:ss"; } SimpleDateFormat sdf = new SimpleDateFormat(format); - return sdf.format(new Date(Long.valueOf(seconds + "000"))); + return sdf.format(new Date(Long.valueOf(seconds))); } /** diff --git a/src/main/java/com/sipai/tools/DataSourceAspect.java b/src/main/java/com/sipai/tools/DataSourceAspect.java index b525273..9c83552 100644 --- a/src/main/java/com/sipai/tools/DataSourceAspect.java +++ b/src/main/java/com/sipai/tools/DataSourceAspect.java @@ -29,23 +29,23 @@ public class DataSourceAspect { MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); DataSourceTypeAnno typeAnno = method.getAnnotation(DataSourceTypeAnno.class); - DataSourceEnum sourceEnum = typeAnno.value(); - + DataSourceEnum sourceEnum = typeAnno.value(); Object[] argusObjects = pjp.getArgs(); Object argus = argusObjects[0]; - -// if (argus == DataSourceEnum.master) { -// DataSourceContextHolder.setDataSourceType(DataSourceEnum.master); -// } else if (sourceEnum == DataSourceEnum.slaver) { -// DataSourceContextHolder.setDataSourceType(DataSourceEnum.slaver); -// } - - if (argus != null && !argus.equals("") && argusObjects.length >= 2) { - //DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString())); - DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString())); - } else { + String packageName = method.getDeclaringClass().getName(); + if (sourceEnum == DataSourceEnum.SCADA_JSWS || packageName.contains(".scada.")) { + DataSourceContextHolder.setDataSourceType(DataSourceEnum.SCADA_JSWS); + } else { DataSourceContextHolder.setDataSourceType(DataSourceEnum.master); } + +// if (argus != null && !argus.equals("") && argusObjects.length >= 2) { +// //DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString())); +// DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString())); +// } else { +// DataSourceContextHolder.setDataSourceType(DataSourceEnum.master); +// } + Object result = null; try { result = pjp.proceed(); diff --git a/src/main/java/com/sipai/tools/DataSourceTypeAnno.java b/src/main/java/com/sipai/tools/DataSourceTypeAnno.java index c9988bb..d38b9ce 100644 --- a/src/main/java/com/sipai/tools/DataSourceTypeAnno.java +++ b/src/main/java/com/sipai/tools/DataSourceTypeAnno.java @@ -13,4 +13,5 @@ import java.lang.annotation.Target; public @interface DataSourceTypeAnno { //使用方式在service层方法上添加@DataSourceTypeAnno(DataSourceEnum.数据源枚举类型)用于指定所使用的数据源 DataSourceEnum value() default DataSourceEnum.master; +// DataSourceEnum value() default DataSourceEnum.SCADA_JSWS; } diff --git a/src/main/resources/mybatis/mapper/scada/MPointHistoryMapper.xml b/src/main/resources/mybatis/mapper/scada/MPointHistoryMapper.xml index bc773ec..3fec638 100644 --- a/src/main/resources/mybatis/mapper/scada/MPointHistoryMapper.xml +++ b/src/main/resources/mybatis/mapper/scada/MPointHistoryMapper.xml @@ -12,7 +12,7 @@ - insert into `${tbName}` (ParmValue, MeasureDT, + insert into ${tbName} (ParmValue, MeasureDT, memotype, memo, userid, insdt) values (#{parmvalue,jdbcType=DECIMAL}, #{measuredt,jdbcType=TIMESTAMP}, @@ -39,27 +39,26 @@ - CREATE TABLE `${table}` - ( - `ItemID` bigint NOT NULL AUTO_INCREMENT, - `ParmValue` decimal(18, 4) NOT NULL, - `MeasureDT` datetime NOT NULL, - `memotype` varchar(100) NULL, - `memo` varchar(50) NULL, - `userid` varchar(50) NULL, - `insdt` datetime NULL ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`ItemID`), - INDEX `index_dt`(`MeasureDT`) + CREATE TABLE ${table} ( + ItemID bigint IDENTITY(1,1) NOT NULL, + ParmValue DECIMAL(18, 4) NOT NULL, + MeasureDT datetime NOT NULL, + memotype VARCHAR(100) NULL, + memo VARCHAR(50) NULL, + userid VARCHAR(50) NULL, + insdt datetime NULL, + CONSTRAINT PK_${table} PRIMARY KEY (ItemID) ); + CREATE INDEX index_dt ON ${table} (MeasureDT); - update `${tbName}` + update ${tbName} set ParmValue = #{parmvalue,jdbcType=DECIMAL}, memotype = #{memotype,jdbcType=VARCHAR} where MeasureDT = #{measuredt,jdbcType=TIMESTAMP} diff --git a/src/main/resources/mybatis/mapper/scada/MPointMapper.xml b/src/main/resources/mybatis/mapper/scada/MPointMapper.xml index 6f390d1..2b7a414 100644 --- a/src/main/resources/mybatis/mapper/scada/MPointMapper.xml +++ b/src/main/resources/mybatis/mapper/scada/MPointMapper.xml @@ -682,7 +682,7 @@