mqtt初始化链接调试

This commit is contained in:
Timer
2026-02-26 01:08:27 +08:00
parent 0093cc70dd
commit 2ffec8d3a6
12 changed files with 163 additions and 76 deletions

View File

@ -3,6 +3,7 @@ package com.sipai.entity.scada;
import com.sipai.entity.base.SQLAdapter; import com.sipai.entity.base.SQLAdapter;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Date;
public class MPointHistory extends SQLAdapter implements Serializable{ public class MPointHistory extends SQLAdapter implements Serializable{
/** /**
@ -151,4 +152,24 @@ public class MPointHistory extends SQLAdapter implements Serializable{
return serialVersionUID; return serialVersionUID;
} }
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("MPointHistory{");
sb.append("itemid=").append(itemid);
sb.append(", parmvalue=").append(parmvalue);
sb.append(", measuredt='").append(measuredt).append('\'');
sb.append(", memotype='").append(memotype).append('\'');
sb.append(", memo='").append(memo).append('\'');
sb.append(", userid='").append(userid).append('\'');
sb.append(", insdt='").append(insdt).append('\'');
sb.append(", tbName='").append(tbName).append('\'');
sb.append(", tag=").append(tag);
sb.append(", year='").append(year).append('\'');
sb.append(", month='").append(month).append('\'');
sb.append(", day='").append(day).append('\'');
sb.append(", hour='").append(hour).append('\'');
sb.append(", min='").append(min).append('\'');
sb.append('}');
return sb.toString();
}
} }

View File

@ -47,21 +47,22 @@ public class DataSynJob {
} }
@Async @Async
// @Scheduled(cron = "0 0/3 * * * ?")//数据转发 @Scheduled(cron = "0 0 0/1 * * ?")//数据转发
public void job2() { public void job2() {
if (!scheduledEnabled) return; // 手动拦截 if (!scheduledEnabled) return; // 手动拦截
String addstr = "zhuanfa"; String addstr = "zhuanfa";
System.out.println("开始定时器-----------------" + CommUtil.nowDate() + "-----------------" + addstr); System.out.println("开始定时器-----------------" + CommUtil.nowDate() + "-----------------" + addstr);
List<MPoint> list2 = this.mPointService.selectListByWhere("where bizid = '0791CNWS' and SignalType='AI' and MeasureDT>DATE_SUB(NOW(), INTERVAL 5 MINUTE) and source_type='auto' "); // List<MPoint> list2 = this.mPointService.selectListByWhere("where bizid = '0533JS' and SignalType='AI' and MeasureDT>DATE_SUB(NOW(), INTERVAL 5 MINUTE) and source_type='auto' ");
if (list2 != null && list2.size() > 0) { List<MPoint> list2 = this.mPointService.selectListByWhere("where bizid = '0533JS' and SignalType='AI' and source_type='auto' ");
for (int j = 0; j < list2.size(); ++j) { if (list2 != null && !list2.isEmpty()) {
for (MPoint mPoint : list2) {
MPointHistory mPointHistory = new MPointHistory(); MPointHistory mPointHistory = new MPointHistory();
mPointHistory.setParmvalue(list2.get(j).getParmvalue()); mPointHistory.setParmvalue(mPoint.getParmvalue());
mPointHistory.setMeasuredt(list2.get(j).getMeasuredt()); mPointHistory.setMeasuredt(mPoint.getMeasuredt());
mPointHistory.setTbName(list2.get(j).getMpointcode()); mPointHistory.setTbName(mPoint.getMpointcode());
mPointHistory.setUserid("data_job"); mPointHistory.setUserid("data_job");
mPointHistory.setInsdt(CommUtil.nowDate()); mPointHistory.setInsdt(CommUtil.nowDate());
mPointHistoryService.saveByCreate(list2.get(j).getBizid(), mPointHistory); mPointHistoryService.saveByCreate(mPoint.getBizid(), mPointHistory);
} }
System.out.println("完成一次(有数据)转发,执行了" + list2.size() + "次点"); System.out.println("完成一次(有数据)转发,执行了" + list2.size() + "次点");
} else { } else {

View File

@ -2,6 +2,7 @@ package com.sipai.service.mqtt.impl;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.sipai.dao.scada.MPointHistoryDao;
import com.sipai.entity.mqtt.Mqtt; import com.sipai.entity.mqtt.Mqtt;
import com.sipai.entity.mqtt.MqttProperties; import com.sipai.entity.mqtt.MqttProperties;
import com.sipai.entity.scada.MPoint; import com.sipai.entity.scada.MPoint;
@ -29,10 +30,9 @@ import org.springframework.stereotype.Service;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.ArrayList; import java.time.LocalDateTime;
import java.util.List; import java.time.format.DateTimeFormatter;
import java.util.Map; import java.util.*;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -49,6 +49,10 @@ public class MqttServiceImpl implements MqttService {
private ListenerPointService listenerPointService; private ListenerPointService listenerPointService;
@Autowired @Autowired
private RedissonClient redissonClient; private RedissonClient redissonClient;
@Autowired
private MPointHistoryService mPointHistoryService;
@Autowired
MPointHistoryDao mPointHistoryDao;
private static MqttClient mqttClient; private static MqttClient mqttClient;
private static String ipStr = ""; private static String ipStr = "";
@ -90,10 +94,13 @@ public class MqttServiceImpl implements MqttService {
return mqttClient; return mqttClient;
} }
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Async("getAsyncMqttHandle") @Async("getAsyncMqttHandle")
@Override @Override
public void doHandle(String bizId, String topic, String ip4, String port, JSONArray jsonArray) { public void doHandle(String bizId, String topic, String ip4, String port, JSONArray jsonArray) {
System.out.println("MQTT接收: biz=" + bizId + " topic=" + topic + " msgCount=" + jsonArray.size()); System.out.println("MQTT接收: biz=" + bizId + " topic=" + topic + " msgCount=" + jsonArray.size());
// MPointHistoryService mPointHistoryService = (MPointHistoryService) SpringContextUtil.getBean("mPointHistoryService");
for (int i = 0; i < jsonArray.size(); i++) { for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i); JSONObject jsonObject = jsonArray.getJSONObject(i);
// 解析时间戳 // 解析时间戳
@ -135,6 +142,13 @@ public class MqttServiceImpl implements MqttService {
// 处理ES数据 // 处理ES数据
MPointES esPoint = buildEsPoint(mPoint, value, date); MPointES esPoint = buildEsPoint(mPoint, value, date);
redisList.add(esPoint); redisList.add(esPoint);
MPointHistory mPointHistory = new MPointHistory();
mPointHistory.setMeasuredt(date);
mPointHistory.setParmvalue(value);
mPointHistory.setTbName("tb_mp_" + key);
mPointHistory.setUserid(port);
insertInsert(mPointHistory);
} }
// 推送前端数据 // 推送前端数据
// loggger.info("准备推送前端: topic={} size={} @{}", topic, vueList.size(), date); // loggger.info("准备推送前端: topic={} size={} @{}", topic, vueList.size(), date);
@ -146,6 +160,22 @@ public class MqttServiceImpl implements MqttService {
} }
} }
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public void insertInsert(MPointHistory mPointHistory) {
try {
int exist = mPointHistoryService.checkTableExist(mPointHistory.getTbName());
if (exist == 0) { // 不存在就新建点位表
loggger.info(String.format("[重要]创建了点位表:%s %s", mPointHistory.getTbName(), DataSourceContextHolder.getDataSourceType()));
mPointHistoryService.createTable(mPointHistory.getTbName());
}
mPointHistoryService.save("", mPointHistory);
loggger.info("保存成功一条数据到历史表:{}", mPointHistory);
} catch (Exception e) {
loggger.error("mPointHistoryService.save error :{}", e.getMessage());
}
}
/** /**
* 校验值是否有效(非空且长度合法) * 校验值是否有效(非空且长度合法)
*/ */

View File

@ -5,7 +5,7 @@ import com.sipai.entity.scada.MPointHistory;
import java.util.List; import java.util.List;
public interface MPointHistoryService { public interface MPointHistoryService {
int save(String bizId, MPointHistory mPointHistory); Integer save(String bizId, MPointHistory mPointHistory);
List<MPointHistory> selectListByTableAWhere(String bizId, String table, String wherestr); List<MPointHistory> selectListByTableAWhere(String bizId, String table, String wherestr);
@ -14,4 +14,8 @@ public interface MPointHistoryService {
int saveByCreate(String bizId, MPointHistory mPointHistory); int saveByCreate(String bizId, MPointHistory mPointHistory);
List<MPointHistory> selectIndustrialLibrary(String bizId, String IP,String MPointID, String intv, String sdt, String edt); List<MPointHistory> selectIndustrialLibrary(String bizId, String IP,String MPointID, String intv, String sdt, String edt);
void createTable(String tbName);
int checkTableExist(String tbName);
} }

View File

@ -3,6 +3,7 @@ package com.sipai.service.scada.impl;
import com.sipai.dao.scada.MPointBzwDao; import com.sipai.dao.scada.MPointBzwDao;
import com.sipai.entity.scada.MPointBzw; import com.sipai.entity.scada.MPointBzw;
import com.sipai.service.scada.MPointBzwService; import com.sipai.service.scada.MPointBzwService;
import com.sipai.tools.DataSourceEnum;
import com.sipai.tools.DataSourceTypeAnno; import com.sipai.tools.DataSourceTypeAnno;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
@ -22,32 +23,32 @@ public class MPointBzwServiceImpl implements MPointBzwService {
private MPointBzwDao mPointBzwDao; private MPointBzwDao mPointBzwDao;
@Override @Override
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public MPointBzw selectByPrimaryKey(String id) { public MPointBzw selectByPrimaryKey(String id) {
return mPointBzwDao.selectByPrimaryKey(id); return mPointBzwDao.selectByPrimaryKey(id);
} }
@Override @Override
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public Integer deleteByPrimaryKey(String id) { public Integer deleteByPrimaryKey(String id) {
return mPointBzwDao.deleteByPrimaryKey(id); return mPointBzwDao.deleteByPrimaryKey(id);
} }
@Override @Override
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public Integer insert(MPointBzw entity) { public Integer insert(MPointBzw entity) {
return mPointBzwDao.insert(entity); return mPointBzwDao.insert(entity);
} }
@Async @Async
@Override @Override
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public Integer updateByPrimaryKeySelective(MPointBzw entity) { public Integer updateByPrimaryKeySelective(MPointBzw entity) {
return mPointBzwDao.updateByPrimaryKeySelective(entity); return mPointBzwDao.updateByPrimaryKeySelective(entity);
} }
@Override @Override
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public List<MPointBzw> selectListByWhere(String wherestr) { public List<MPointBzw> selectListByWhere(String wherestr) {
MPointBzw mPoint = new MPointBzw(); MPointBzw mPoint = new MPointBzw();
mPoint.setWhere(wherestr); mPoint.setWhere(wherestr);
@ -55,7 +56,7 @@ public class MPointBzwServiceImpl implements MPointBzwService {
} }
@Override @Override
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public Integer deleteByWhere(String wherestr) { public Integer deleteByWhere(String wherestr) {
return mPointBzwDao.deleteByWhere(wherestr); return mPointBzwDao.deleteByWhere(wherestr);
} }

View File

@ -5,10 +5,12 @@ import com.sipai.entity.scada.MPointHistory;
import com.sipai.service.scada.MPointHistoryService; import com.sipai.service.scada.MPointHistoryService;
import com.sipai.tools.CommUtil; import com.sipai.tools.CommUtil;
import com.sipai.tools.DataSourceContextHolder; import com.sipai.tools.DataSourceContextHolder;
import com.sipai.tools.DataSourceEnum;
import com.sipai.tools.DataSourceTypeAnno; import com.sipai.tools.DataSourceTypeAnno;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.List; import java.util.List;
@ -19,11 +21,11 @@ public class MPointHistoryServiceImpl implements MPointHistoryService {
@Autowired @Autowired
MPointHistoryDao mPointHistoryDao; MPointHistoryDao mPointHistoryDao;
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public int save(String bizId, MPointHistory mPointHistory) { public Integer save(String bizId, MPointHistory mPointHistory) {
try { try {
int res = mPointHistoryDao.insert(mPointHistory); Integer res = mPointHistoryDao.insert(mPointHistory);
return res; return res;
} catch (Exception e) { } catch (Exception e) {
System.out.println(e); System.out.println(e);
@ -31,38 +33,43 @@ public class MPointHistoryServiceImpl implements MPointHistoryService {
} }
} }
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public List<MPointHistory> selectListByTableAWhere(String bizId, String table, String wherestr) { public List<MPointHistory> selectListByTableAWhere(String bizId, String table, String wherestr) {
return mPointHistoryDao.selectListByTableAWhere(table ,wherestr); return mPointHistoryDao.selectListByTableAWhere(table, wherestr);
} }
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public int deleteByTableAWhere(String bizId,String table ,String wherestr) { public int deleteByTableAWhere(String bizId, String table, String wherestr) {
return mPointHistoryDao.deleteByTableAWhere(table ,wherestr); return mPointHistoryDao.deleteByTableAWhere(table, wherestr);
} }
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public int saveByCreate(String bizId, MPointHistory mPointHistory) { public int saveByCreate(String bizId, MPointHistory mPointHistory) {
mPointHistory.setTbName(CommUtil.getMPointTableName(mPointHistory.getTbName())); mPointHistory.setTbName(CommUtil.getMPointTableName(mPointHistory.getTbName()));
if(CommUtil.isBlank(bizId)){ if (CommUtil.isBlank(bizId)) {
return 0; return 0;
} }
try { try {
int i = this.mPointHistoryDao.checkTableExist(mPointHistory.getTbName()); int i = this.mPointHistoryDao.checkTableExist(mPointHistory.getTbName());
if(i == 0) { if (i == 0) {
log.info(String.format("[重要]创建了表:%s %s",mPointHistory.getTbName(), DataSourceContextHolder.getDataSourceType())); log.info(String.format("[重要]创建了表:%s %s", mPointHistory.getTbName(), DataSourceContextHolder.getDataSourceType()));
mPointHistoryDao.createTable(mPointHistory.getTbName()); mPointHistoryDao.createTable(mPointHistory.getTbName());
} }
// 添加时间判断 todo 将来优化 // 添加时间判断 todo 将来优化
mPointHistory.setWhere(" where MeasureDT = '" + mPointHistory.getMeasuredt() + "'"); if (StringUtils.isEmpty(mPointHistory.getMeasuredt())) {
mPointHistory.setWhere(" where MeasureDT IS NULL");
} else {
mPointHistory.setWhere(" where MeasureDT = '" + mPointHistory.getMeasuredt() + "'");
}
Integer count = this.mPointHistoryDao.selectCount(mPointHistory); Integer count = this.mPointHistoryDao.selectCount(mPointHistory);
int result = count == 0 ? mPointHistoryDao.insert(mPointHistory) : mPointHistoryDao.updateByMeasureDt(mPointHistory); // int result = count == 0 ? mPointHistoryDao.insert(mPointHistory) : mPointHistoryDao.updateByMeasureDt(mPointHistory);
return result; // return result;
return 0;
} catch (Exception e) { } catch (Exception e) {
System.out.println(DataSourceContextHolder.getDataSourceType() + " | " + e); System.out.println(DataSourceContextHolder.getDataSourceType() + " | " + e);
// 返回默认值 // 返回默认值
@ -70,10 +77,21 @@ public class MPointHistoryServiceImpl implements MPointHistoryService {
} }
} }
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public List<MPointHistory> selectIndustrialLibrary(String bizId, String IP, String MPointID, String intv, String sdt, String edt) { public List<MPointHistory> selectIndustrialLibrary(String bizId, String IP, String MPointID, String intv, String sdt, String edt) {
return mPointHistoryDao.selectIndustrialLibrary(IP, MPointID, intv, sdt, edt); return mPointHistoryDao.selectIndustrialLibrary(IP, MPointID, intv, sdt, edt);
} }
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override
public void createTable(String tbName) {
mPointHistoryDao.createTable(tbName);
}
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override
public int checkTableExist(String tbName) {
return mPointHistoryDao.checkTableExist(tbName);
}
} }

View File

@ -13,6 +13,7 @@ import com.sipai.service.scada.MPointHistoryService;
import com.sipai.service.scada.MPointService; import com.sipai.service.scada.MPointService;
import com.sipai.tools.CommString; import com.sipai.tools.CommString;
import com.sipai.tools.CommUtil; import com.sipai.tools.CommUtil;
import com.sipai.tools.DataSourceEnum;
import com.sipai.tools.DataSourceTypeAnno; import com.sipai.tools.DataSourceTypeAnno;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
@ -81,7 +82,7 @@ public class MPointServiceImpl implements MPointService {
return mPointDao.selectByPrimaryKey(id); return mPointDao.selectByPrimaryKey(id);
}*/ }*/
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public MPoint selectById(String unitId, String id) { public MPoint selectById(String unitId, String id) {
return mPointDao.selectByPrimaryKey(id); return mPointDao.selectByPrimaryKey(id);
} }
@ -92,6 +93,7 @@ public class MPointServiceImpl implements MPointService {
* @param id * @param id
* @return * @return
*/ */
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public MPoint selectById4Es(String id) { public MPoint selectById4Es(String id) {
if (id == null || id.isEmpty()) { if (id == null || id.isEmpty()) {
@ -122,7 +124,7 @@ public class MPointServiceImpl implements MPointService {
} }
// @DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public List<MPoint> selectListByWhere(String wherestr) { public List<MPoint> selectListByWhere(String wherestr) {
MPoint mpoint = new MPoint(); MPoint mpoint = new MPoint();
@ -145,6 +147,7 @@ public class MPointServiceImpl implements MPointService {
} }
@Override @Override
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public List<MPoint> selectListByES(Map<String, Object> map, Integer page, Integer rows) { public List<MPoint> selectListByES(Map<String, Object> map, Integer page, Integer rows) {
List<MPoint> mpoints = new ArrayList<>(); List<MPoint> mpoints = new ArrayList<>();
try { try {
@ -187,7 +190,7 @@ public class MPointServiceImpl implements MPointService {
return mpoints; return mpoints;
} }
// @DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public int update(MPoint entity) { public int update(MPoint entity) {
try { try {
@ -199,7 +202,7 @@ public class MPointServiceImpl implements MPointService {
} }
} }
@DataSourceTypeAnno() @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public int updateValueByKey(String bizId, MPoint entity) { public int updateValueByKey(String bizId, MPoint entity) {
try { try {
@ -248,6 +251,7 @@ public class MPointServiceImpl implements MPointService {
* @param entity * @param entity
* @return * @return
*/ */
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public int saveInfluxdb(String bizId, MPointHistory entity) { public int saveInfluxdb(String bizId, MPointHistory entity) {
Map<String, Object> fields = new HashMap<>(); Map<String, Object> fields = new HashMap<>();
Map<String, String> tags = new HashMap<>(); Map<String, String> tags = new HashMap<>();
@ -261,10 +265,10 @@ public class MPointServiceImpl implements MPointService {
long time = Calendar.getInstance().getTimeInMillis(); long time = Calendar.getInstance().getTimeInMillis();
try { try {
fields.put("parmvalue", Float.parseFloat(entity.getParmvalue().toString())); fields.put("parmvalue", Float.parseFloat(entity.getParmvalue().toString()));
time = longDateFormat.parse(entity.getMeasuredt()).getTime(); // time = longDateFormat.parse(entity.getMeasuredt()).getTime();
// influxDBConfig.insert("tb_mp_" + entity.getTbName(), time, tags, fields); // influxDBConfig.insert("tb_mp_" + entity.getTbName(), time, tags, fields);
// System.out.println("===插入数据成功===" + entity.getTbName() + "===" + Float.parseFloat(entity.getParmvalue().toString())); // System.out.println("===插入数据成功===" + entity.getTbName() + "===" + Float.parseFloat(entity.getParmvalue().toString()));
} catch (ParseException e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
return 1; return 1;
@ -276,6 +280,7 @@ public class MPointServiceImpl implements MPointService {
* @param mPointES * @param mPointES
*/ */
// @Async // @Async
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public void saveAsync(List<MPointES> mPointES) { public void saveAsync(List<MPointES> mPointES) {
this.mPointRepo.batchUpdate(mPointES); this.mPointRepo.batchUpdate(mPointES);
@ -286,6 +291,7 @@ public class MPointServiceImpl implements MPointService {
* *
* @return * @return
*/ */
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public List<MPoint> selectListByWhere4Es(NativeSearchQueryBuilder nativeSearchQueryBuilder) { public List<MPoint> selectListByWhere4Es(NativeSearchQueryBuilder nativeSearchQueryBuilder) {
SearchQuery searchQuery = nativeSearchQueryBuilder.build(); SearchQuery searchQuery = nativeSearchQueryBuilder.build();
Page<MPoint> mPage = mPointRepo.search(searchQuery); Page<MPoint> mPage = mPointRepo.search(searchQuery);
@ -312,6 +318,7 @@ public class MPointServiceImpl implements MPointService {
* @return * @return
* @throws Exception * @throws Exception
*/ */
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public static String formatTimeEight(String time) throws Exception { public static String formatTimeEight(String time) throws Exception {
Date d = null; Date d = null;
SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -321,6 +328,7 @@ public class MPointServiceImpl implements MPointService {
return newtime; return newtime;
} }
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public void sendKafka4OpcUa(String id, Double value, String time) { public void sendKafka4OpcUa(String id, Double value, String time) {
JSONObject message = new JSONObject(); JSONObject message = new JSONObject();
message.put("id", id); message.put("id", id);
@ -365,6 +373,7 @@ public class MPointServiceImpl implements MPointService {
} }
} }
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public void saveRedis(String mpcode, Object value, String time) { public void saveRedis(String mpcode, Object value, String time) {
RBatch batch = redissonClient.createBatch(); RBatch batch = redissonClient.createBatch();
@ -376,7 +385,8 @@ public class MPointServiceImpl implements MPointService {
batch.execute(); batch.execute();
} }
public void sendKafka4UDP(String unitId,String mpcode, Object value, String timePoint, int i, int length) { @DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
public void sendKafka4UDP(String unitId, String mpcode, Object value, String timePoint, int i, int length) {
JSONObject message = new JSONObject(); JSONObject message = new JSONObject();
message.put("id", mpcode); message.put("id", mpcode);
@ -408,7 +418,7 @@ public class MPointServiceImpl implements MPointService {
try { try {
MPointHistory mPointHistory = new MPointHistory(); MPointHistory mPointHistory = new MPointHistory();
mPointHistory.setParmvalue(CommUtil.toBigDecimal(value)); mPointHistory.setParmvalue(CommUtil.toBigDecimal(value));
mPointHistory.setMeasuredt(timePoint); // mPointHistory.setMeasuredt(timePoint);
mPointHistory.setTbName(mpcode); mPointHistory.setTbName(mpcode);
mPointHistory.setUserid("datacollector"); mPointHistory.setUserid("datacollector");
mPointHistory.setInsdt(CommUtil.nowDate()); mPointHistory.setInsdt(CommUtil.nowDate());
@ -421,7 +431,7 @@ public class MPointServiceImpl implements MPointService {
//更新最后一条数据到总表 //更新最后一条数据到总表
if (i == length - 1) { if (i == length - 1) {
try { try {
MPoint mPoint = mPointService.selectById(unitId,mpcode); MPoint mPoint = mPointService.selectById(unitId, mpcode);
if (mPoint != null) { if (mPoint != null) {
mPoint.setParmvalue(CommUtil.toBigDecimal(value)); mPoint.setParmvalue(CommUtil.toBigDecimal(value));
mPoint.setMeasuredt(timePoint); mPoint.setMeasuredt(timePoint);
@ -437,14 +447,15 @@ public class MPointServiceImpl implements MPointService {
} }
// @Async // @Async
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public void sendKafka4MQTT(String unitId,String mpcode, Object value, String timePoint, int i, int length, String signaltype) { public void sendKafka4MQTT(String unitId, String mpcode, Object value, String timePoint, int i, int length, String signaltype) {
//写子表 //写子表
if (signaltype != null && signaltype.equals("DI")) { if (signaltype != null && signaltype.equals("DI")) {
try { try {
MPointHistory mPointHistory = new MPointHistory(); MPointHistory mPointHistory = new MPointHistory();
mPointHistory.setParmvalue(CommUtil.toBigDecimal(value)); mPointHistory.setParmvalue(CommUtil.toBigDecimal(value));
mPointHistory.setMeasuredt(timePoint); // mPointHistory.setMeasuredt(timePoint);
mPointHistory.setTbName(mpcode); mPointHistory.setTbName(mpcode);
mPointHistory.setUserid("datacollector"); mPointHistory.setUserid("datacollector");
mPointHistory.setInsdt(CommUtil.nowDate()); mPointHistory.setInsdt(CommUtil.nowDate());
@ -457,7 +468,7 @@ public class MPointServiceImpl implements MPointService {
//更新最后一条数据到总表 //更新最后一条数据到总表
if (i == length - 1) { if (i == length - 1) {
try { try {
MPoint mPoint = mPointService.selectById(unitId,mpcode); MPoint mPoint = mPointService.selectById(unitId, mpcode);
if (mPoint != null) { if (mPoint != null) {
mPoint.setParmvalue(CommUtil.toBigDecimal(value)); mPoint.setParmvalue(CommUtil.toBigDecimal(value));
mPoint.setMeasuredt(timePoint); mPoint.setMeasuredt(timePoint);
@ -470,6 +481,7 @@ public class MPointServiceImpl implements MPointService {
} }
// @Async // @Async
@DataSourceTypeAnno(DataSourceEnum.SCADA_JSWS)
@Override @Override
public void sendKafka4MQTT_Alarm(String mpcode, Object value, String timePoint, int i, int length) { public void sendKafka4MQTT_Alarm(String mpcode, Object value, String timePoint, int i, int length) {
JSONObject message = new JSONObject(); JSONObject message = new JSONObject();

View File

@ -181,7 +181,7 @@ public class CommUtil {
format = "yyyy-MM-dd HH:mm:ss"; format = "yyyy-MM-dd HH:mm:ss";
} }
SimpleDateFormat sdf = new SimpleDateFormat(format); SimpleDateFormat sdf = new SimpleDateFormat(format);
return sdf.format(new Date(Long.valueOf(seconds + "000"))); return sdf.format(new Date(Long.valueOf(seconds)));
} }
/** /**

View File

@ -30,22 +30,22 @@ public class DataSourceAspect {
Method method = methodSignature.getMethod(); Method method = methodSignature.getMethod();
DataSourceTypeAnno typeAnno = method.getAnnotation(DataSourceTypeAnno.class); DataSourceTypeAnno typeAnno = method.getAnnotation(DataSourceTypeAnno.class);
DataSourceEnum sourceEnum = typeAnno.value(); DataSourceEnum sourceEnum = typeAnno.value();
Object[] argusObjects = pjp.getArgs(); Object[] argusObjects = pjp.getArgs();
Object argus = argusObjects[0]; Object argus = argusObjects[0];
String packageName = method.getDeclaringClass().getName();
// if (argus == DataSourceEnum.master) { if (sourceEnum == DataSourceEnum.SCADA_JSWS || packageName.contains(".scada.")) {
// DataSourceContextHolder.setDataSourceType(DataSourceEnum.master); DataSourceContextHolder.setDataSourceType(DataSourceEnum.SCADA_JSWS);
// } else if (sourceEnum == DataSourceEnum.slaver) { } else {
// DataSourceContextHolder.setDataSourceType(DataSourceEnum.slaver);
// }
if (argus != null && !argus.equals("") && argusObjects.length >= 2) {
//DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString()));
DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString()));
} else {
DataSourceContextHolder.setDataSourceType(DataSourceEnum.master); DataSourceContextHolder.setDataSourceType(DataSourceEnum.master);
} }
// if (argus != null && !argus.equals("") && argusObjects.length >= 2) {
// //DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString()));
// DataSourceContextHolder.setDataSourceType(DataSourceEnum.valueOf("SCADA_" + argus.toString()));
// } else {
// DataSourceContextHolder.setDataSourceType(DataSourceEnum.master);
// }
Object result = null; Object result = null;
try { try {
result = pjp.proceed(); result = pjp.proceed();

View File

@ -13,4 +13,5 @@ import java.lang.annotation.Target;
public @interface DataSourceTypeAnno { public @interface DataSourceTypeAnno {
//使用方式在service层方法上添加@DataSourceTypeAnno(DataSourceEnum.数据源枚举类型)用于指定所使用的数据源 //使用方式在service层方法上添加@DataSourceTypeAnno(DataSourceEnum.数据源枚举类型)用于指定所使用的数据源
DataSourceEnum value() default DataSourceEnum.master; DataSourceEnum value() default DataSourceEnum.master;
// DataSourceEnum value() default DataSourceEnum.SCADA_JSWS;
} }

View File

@ -12,7 +12,7 @@
</resultMap> </resultMap>
<insert id="insert" parameterType="com.sipai.entity.scada.MPointHistory"> <insert id="insert" parameterType="com.sipai.entity.scada.MPointHistory">
insert into `${tbName}` (ParmValue, MeasureDT, insert into ${tbName} (ParmValue, MeasureDT,
memotype, memo, userid, memotype, memo, userid,
insdt) insdt)
values (#{parmvalue,jdbcType=DECIMAL}, #{measuredt,jdbcType=TIMESTAMP}, values (#{parmvalue,jdbcType=DECIMAL}, #{measuredt,jdbcType=TIMESTAMP},
@ -39,27 +39,26 @@
<select id="selectCount" parameterType="com.sipai.entity.scada.MPointHistory" resultType="java.lang.Integer"> <select id="selectCount" parameterType="com.sipai.entity.scada.MPointHistory" resultType="java.lang.Integer">
select count(*) select count(*)
from `${tbName}` from ${tbName}
${where} ${where}
</select> </select>
<update id="createTable" parameterType="java.lang.String"> <update id="createTable" parameterType="java.lang.String">
CREATE TABLE `${table}` CREATE TABLE ${table} (
( ItemID bigint IDENTITY(1,1) NOT NULL,
`ItemID` bigint NOT NULL AUTO_INCREMENT, ParmValue DECIMAL(18, 4) NOT NULL,
`ParmValue` decimal(18, 4) NOT NULL, MeasureDT datetime NOT NULL,
`MeasureDT` datetime NOT NULL, memotype VARCHAR(100) NULL,
`memotype` varchar(100) NULL, memo VARCHAR(50) NULL,
`memo` varchar(50) NULL, userid VARCHAR(50) NULL,
`userid` varchar(50) NULL, insdt datetime NULL,
`insdt` datetime NULL ON UPDATE CURRENT_TIMESTAMP, CONSTRAINT PK_${table} PRIMARY KEY (ItemID)
PRIMARY KEY (`ItemID`),
INDEX `index_dt`(`MeasureDT`)
); );
CREATE INDEX index_dt ON ${table} (MeasureDT);
</update> </update>
<update id="updateByMeasureDt" parameterType="com.sipai.entity.scada.MPointHistory"> <update id="updateByMeasureDt" parameterType="com.sipai.entity.scada.MPointHistory">
update `${tbName}` update ${tbName}
set ParmValue = #{parmvalue,jdbcType=DECIMAL}, memotype = #{memotype,jdbcType=VARCHAR} set ParmValue = #{parmvalue,jdbcType=DECIMAL}, memotype = #{memotype,jdbcType=VARCHAR}
where MeasureDT = #{measuredt,jdbcType=TIMESTAMP} where MeasureDT = #{measuredt,jdbcType=TIMESTAMP}
</update> </update>

View File

@ -682,7 +682,7 @@
<select id="selectListByWhere" parameterType="java.lang.String" resultMap="BaseResultMap"> <select id="selectListByWhere" parameterType="java.lang.String" resultMap="BaseResultMap">
select select
<include refid="Base_Column_List" /> <include refid="Base_Column_List" />
from tb_measurepoint from TB_MeasurePoint
${where} ${where}
</select> </select>
<delete id="deleteByWhere" parameterType="java.lang.String"> <delete id="deleteByWhere" parameterType="java.lang.String">