selectListByPid(String pid);
+}
diff --git a/src/main/java/com/sipai/service/mqtt/MqttService.java b/src/main/java/com/sipai/service/mqtt/MqttService.java
new file mode 100644
index 0000000..fc9c722
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/MqttService.java
@@ -0,0 +1,57 @@
+package com.sipai.service.mqtt;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.springframework.scheduling.annotation.Scheduled;
+
+public interface MqttService {
+ /**
+ * 鐢ㄤ簬鎻愪緵浜戝钩鍙 缁檖lc鍙戝竷鎸囦护鎺ュ彛
+ * @param json
+ * @param topic
+ * @param userId
+ * @param bizId
+ * @return
+ */
+ public abstract int doPublish(JSONObject json, String topic, String userId, String bizId);
+
+ /**
+ * 澶勭悊mqtt璁㈤槄鍒扮殑鏁版嵁
+ * @param bizId
+ * @param topic
+ * @param ip4
+ * @param port
+ * @param jsonArray
+ */
+ public abstract void doHandle(String bizId, String topic, String ip4, String port, JSONArray jsonArray);
+
+ /**
+ * 澶勭悊mqtt璁㈤槄鍒扮殑鏁版嵁 -- 锛堣ˉ鍙戠殑鏁版嵁 1.涓嶆姤璀 2.涓嶆洿鏂颁富琛 3.AI鐐3鍒嗛挓瀛樹竴娆 DI鐐规瘡娆¢兘瀛橈級
+ * @param bizId
+ * @param topic
+ * @param ip4
+ * @param port
+ * @param jsonArray
+ */
+ public abstract void doHandleHis(String bizId, String topic, String ip4, String port, JSONArray jsonArray);
+
+ /**
+ * 缃戝叧鏁版嵁鍏ㄩ儴鍙洖
+ * @param bizId
+ * @return
+ */
+ public abstract void doRecall(String bizId,JSONObject jsonObject);
+
+ /**
+ * 鍙戦 rabbitmq
+ *
+ * @return
+ */
+ public abstract void sentRabbitmq(String exchange, String key, String value_BigDecimal, String date);
+
+ /**
+ * 娓呯悊娴嬮噺鐐瑰璞$紦瀛
+ */
+ @Scheduled(fixedRate = 3600000) // 姣忓皬鏃舵竻鐞嗕竴娆
+ void cleanCache();
+}
diff --git a/src/main/java/com/sipai/service/mqtt/PushCallback.java b/src/main/java/com/sipai/service/mqtt/PushCallback.java
new file mode 100644
index 0000000..d5c07ab
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/PushCallback.java
@@ -0,0 +1,229 @@
+package com.sipai.service.mqtt;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.sipai.entity.user.Company;
+import com.sipai.service.rabbitmq.MQService;
+import com.sipai.service.user.CompanyService;
+import com.sipai.tools.CommUtil;
+import com.sipai.tools.SpringContextUtil;
+import org.eclipse.paho.client.mqttv3.*;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.Query;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.lang.management.ManagementFactory;
+import java.math.BigDecimal;
+import java.net.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * 鍙戝竷娑堟伅鐨勫洖璋冪被
+ *
+ * 蹇呴』瀹炵幇MqttCallback鐨勬帴鍙e苟瀹炵幇瀵瑰簲鐨勭浉鍏虫帴鍙f柟娉旵allBack 绫诲皢瀹炵幇 MqttCallBack銆
+ * 姣忎釜瀹㈡埛鏈烘爣璇嗛兘闇瑕佷竴涓洖璋冨疄渚嬨傚湪姝ょず渚嬩腑锛屾瀯閫犲嚱鏁颁紶閫掑鎴锋満鏍囪瘑浠ュ彟瀛樹负瀹炰緥鏁版嵁銆
+ * 鍦ㄥ洖璋冧腑锛屽皢瀹冪敤鏉ユ爣璇嗗凡缁忓惎鍔ㄤ簡璇ュ洖璋冪殑鍝釜瀹炰緥銆
+ * 蹇呴』鍦ㄥ洖璋冪被涓疄鐜颁笁涓柟娉曪細
+ *
+ * public void messageArrived(MqttTopic topic, MqttMessage message)鎺ユ敹宸茬粡棰勮鐨勫彂甯冦
+ *
+ * public void connectionLost(Throwable cause)鍦ㄦ柇寮杩炴帴鏃惰皟鐢ㄣ
+ *
+ * public void deliveryComplete(MqttDeliveryToken token))
+ * 鎺ユ敹鍒板凡缁忓彂甯冪殑 QoS 1 鎴 QoS 2 娑堟伅鐨勪紶閫掍护鐗屾椂璋冪敤銆
+ * 鐢 MqttClient.connect 婵娲绘鍥炶皟銆
+ */
+
+public class PushCallback implements MqttCallbackExtended {
+
+ InetAddress ip4;//ip
+
+ {
+ try {
+ ip4 = Inet4Address.getLocalHost();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
+
+ MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+ Set objectNames;
+ String port = "";//绔彛
+
+ {
+ try {
+ objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"),
+ Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));
+// port = objectNames.iterator().next().getKeyProperty("port");
+ port = "8080";
+ } catch (MalformedObjectNameException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private MqttClient client;
+ private MqttConnectOptions options;
+ private String[] topic;
+ private int[] qos;
+
+ public PushCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
+ this.client = client;
+ this.options = options;
+ this.topic = topic;
+ this.qos = qos;
+ }
+
+ /**
+ * 杩炴帴鏂紑鍥炶皟鏂规硶
+ */
+ public void connectionLost(Throwable cause) {
+ System.out.println("===杩炴帴鏂紑鍥炶皟鏂规硶");
+ try {
+ if (null != client && !client.isConnected()) {
+ System.out.println("灏濊瘯閲嶆柊杩炴帴");
+ client.connect(options);
+ } else {
+ System.out.println("灏濊瘯寤虹珛鏂拌繛鎺");
+ client.connect(options);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 杩炴帴鎴愬姛鍥炶皟鏂规硶
+ */
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ System.out.println("===杩炴帴鎴愬姛鍥炶皟鏂规硶");
+ try {
+ if (null != topic && null != qos) {
+ if (client.isConnected()) {
+ client.subscribe(topic, qos);
+ System.out.println("mqtt杩炴帴鎴愬姛");
+ } else {
+ System.out.println("mqtt杩炴帴澶辫触");
+ }
+ }
+ } catch (Exception e) {
+ System.out.println("mqtt璁㈤槄涓婚寮傚父:" + e);
+ }
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+// System.out.println("deliveryComplete---------" + token.isComplete());
+ }
+
+ public void messageArrived(String topic, MqttMessage message) {
+ try {
+ JSONArray jsonArray = JSONArray.parseArray(message.toString());
+// System.out.println(topic + "===寮濮===" + CommUtil.nowDate() + "===" + jsonArray.size());
+ String unitId = topic.substring(0, 4);//鎴彇topic鐨勫墠闈㈤儴鍒嗕綔涓哄巶id
+ MqttService mqttService = (MqttService) SpringContextUtil.getBean("mqttService");
+ //姝e父鐨勪富棰 -- 鏁版嵁澶勭悊
+ mqttService.doHandle(unitId, topic, ip4.toString(), port, jsonArray);
+ } catch (Exception e) {
+ System.out.println(topic + "鎵ц澶辫触" + e);
+ }
+ }
+
+
+ /**
+ * 鏃ユ湡鏍煎紡瀛楃涓茶浆鎹㈡垚鏃堕棿鎴
+ *
+ * @param date_str 瀛楃涓叉棩鏈
+ * @param format 濡傦細yyyy-MM-dd HH:mm:ss
+ * @return
+ */
+ public static String date2TimeStamp(String date_str, String format) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat(format);
+ return String.valueOf(sdf.parse(date_str).getTime() / 1000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return "";
+ }
+
+ /**
+ * 鍙戦 rabbitmq
+ *
+ * @param key
+ * @param value_BigDecimal
+ * @param date
+ */
+ public static void sentRabbitmq(String exchange, String key, String value_BigDecimal, String date) {
+ JSONObject jsonObject1 = new JSONObject();
+ jsonObject1.put("id", key);
+ jsonObject1.put("value", value_BigDecimal);
+ jsonObject1.put("time", date);
+ MQService mqService = (MQService) SpringContextUtil.getBean("mQService");
+ mqService.sendMQ(exchange, jsonObject1.toString());
+ }
+
+ public static void sentRabbitMqMpoint(String exchange, String json) {
+ MQService mqService = (MQService) SpringContextUtil.getBean("mQService");
+ mqService.sendMQ(exchange, json);
+ }
+
+ public String doPost(String URL) {
+ OutputStreamWriter out = null;
+ BufferedReader in = null;
+ StringBuilder result = new StringBuilder();
+ HttpURLConnection conn = null;
+ try {
+ java.net.URL url = new URL(URL);
+ conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ //鍙戦丳OST璇锋眰蹇呴』璁剧疆涓簍rue
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ //璁剧疆杩炴帴瓒呮椂鏃堕棿鍜岃鍙栬秴鏃舵椂闂
+ conn.setConnectTimeout(30000);
+ conn.setReadTimeout(10000);
+ conn.setRequestProperty("Content-Type", "application/json");
+ conn.setRequestProperty("Accept", "application/json");
+ //鑾峰彇杈撳嚭娴
+ out = new OutputStreamWriter(conn.getOutputStream());
+ String jsonStr = "{\"qry_by\":\"name\", \"name\":\"Tim\"}";
+ out.write(jsonStr);
+ out.flush();
+ out.close();
+ //鍙栧緱杈撳叆娴侊紝骞朵娇鐢≧eader璇诲彇
+ if (200 == conn.getResponseCode()) {
+ in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
+ String line;
+ while ((line = in.readLine()) != null) {
+ result.append(line);
+ }
+ } else {
+ System.out.println("ResponseCode is an error code:" + conn.getResponseCode());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (out != null) {
+ out.close();
+ }
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+ return result.toString();
+ }
+
+}
diff --git a/src/main/java/com/sipai/service/mqtt/impl/MqttConfigServiceImpl.java b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigServiceImpl.java
new file mode 100644
index 0000000..57016ea
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigServiceImpl.java
@@ -0,0 +1,348 @@
+package com.sipai.service.mqtt.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.sipai.entity.mqtt.Mqtt;
+import com.sipai.entity.mqtt.MqttProperties;
+import com.sipai.entity.scada.MPoint;
+import com.sipai.service.mqtt.PushCallback;
+import com.sipai.dao.mqtt.MqttConfigDao;
+import com.sipai.entity.mqtt.MqttConfig;
+import com.sipai.entity.mqtt.MqttConfigTopic;
+import com.sipai.service.mqtt.MqttConfigService;
+import com.sipai.service.mqtt.MqttConfigTopicService;
+import com.sipai.tools.CommUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created with IntelliJ IDEA.
+ *
+ * @Auther: sj
+ * @Date: 2021/03/24/16:45
+ * @Description:
+ */
+@Service
+public class MqttConfigServiceImpl implements MqttConfigService {
+ private final MqttProperties mqttProperties;
+ @Autowired
+ private MqttConfigDao mqttConfigDao;
+ @Autowired
+ private MqttConfigTopicService mqttConfigTopicService;
+
+ private static MqttClient mqttClient1;
+
+ // 鏋勯犲櫒娉ㄥ叆
+ @Autowired
+ public MqttConfigServiceImpl(MqttProperties mqttProperties) {
+ this.mqttProperties = mqttProperties;
+ }
+
+ private static MqttClient connect(String brokeraddress, String clientId, String userName, String password) throws MqttException {
+ MemoryPersistence persistence = new MemoryPersistence();
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setUserName(userName);
+ connOpts.setPassword(password.toCharArray());
+ connOpts.setConnectionTimeout(1000);
+ connOpts.setKeepAliveInterval(1000);
+ mqttClient1 = new MqttClient(brokeraddress, clientId, persistence);
+ mqttClient1.connect(connOpts);
+ return mqttClient1;
+ }
+
+ @Override
+ public MqttConfig selectByPrimaryKey(String id) {
+ return mqttConfigDao.selectByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer deleteByPrimaryKey(String id) {
+ return mqttConfigDao.deleteByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer insert(MqttConfig entity) {
+ return mqttConfigDao.insert(entity);
+ }
+
+ @Override
+ public Integer updateByPrimaryKeySelective(MqttConfig entity) {
+ return mqttConfigDao.insert(entity);
+ }
+
+ @Override
+ public List selectListByWhere(String whereStr) {
+ MqttConfig entity = new MqttConfig();
+ entity.setWhere(whereStr);
+ return mqttConfigDao.selectListByWhere(entity);
+ }
+
+ @Override
+ public Integer deleteByWhere(String whereStr) {
+ return mqttConfigDao.deleteByWhere(whereStr);
+ }
+
+ @Override
+ public String connect(String id) {
+ String clientId = "";
+ MqttConfig mqttConfig = mqttConfigDao.selectByPrimaryKey(id);
+ if (mqttConfig != null) {
+ clientId = mqttConfig.getTomcatPort() + "-" + CommUtil.getUUID();
+ System.out.println(clientId);
+ String sql = "where id = '" + mqttConfig.getId() + "'";
+ mqttConfig.setWhere(sql);
+ List list_config = mqttConfigDao.selectListByWhere(mqttConfig);
+ List topic_str = new ArrayList<>();
+
+ //姣忎釜瀹㈡埛绔牴鎹厤缃殑tomcat绔彛鍙疯繘琛屽垎鍖鸿繛鎺
+ if (list_config != null && list_config.size() > 0) {
+ //寰幆閰嶇疆涓昏〃
+ for (int i = 0; i < list_config.size(); i++) {
+ String sql_detail = "where pid = '" + list_config.get(i).getId() + "'";
+ List list_topic = mqttConfigTopicService.selectListByWhere(sql_detail);
+ if (list_topic != null && list_topic.size() > 0) {
+ //寰幆閰嶇疆闄勮〃锛堣幏鍙栨墍鏈塼opic锛
+ for (int j = 0; j < list_topic.size(); j++) {
+ //閰嶇疆鐨刜UP涓婚
+// topic_str.add("$queue/" + list_topic.get(j).getTopic());
+ topic_str.add(list_topic.get(j).getTopic());
+ }
+ }
+ }
+ }
+
+ //閰嶇疆鐨勬潯鏁板嵆涓烘暟缁勭殑闀垮害
+ String[] topic = new String[topic_str.size()];
+ System.out.println("璁㈤槄涓婚锛" + topic_str);
+ int[] qos = new int[topic_str.size()];
+ for (int j = 0; j < topic_str.size(); j++) {
+ topic[j] = topic_str.get(j);
+ qos[j] = 0;
+ }
+ try {
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setConnectionTimeout(1000);
+ //姣忛殧1.5*10绉掔殑鏃堕棿鍚戝鎴风鍙戦佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾
+ connOpts.setKeepAliveInterval(1000);
+ //鑷姩閲嶈繛
+ connOpts.setAutomaticReconnect(true);
+ connOpts.setUserName(mqttProperties.getUsername());
+ connOpts.setPassword(mqttProperties.getPassword().toCharArray());
+
+ if (mqttClient1 != null) {
+ System.out.println("宸茶繛鎺");
+ //鍥炶皟
+ mqttClient1.setCallback(new PushCallback(mqttClient1, connOpts, topic, qos));
+ //璁㈤槄
+ mqttClient1.subscribe(topic, qos);
+ } else {
+ System.out.println("閲嶆柊杩炴帴");
+ //杩炴帴
+ mqttClient1 = connect(mqttConfig.getBrokerIp(), clientId, mqttProperties.getUsername(), mqttProperties.getPassword());
+ //鍥炶皟
+ mqttClient1.setCallback(new PushCallback(mqttClient1, connOpts, topic, qos));
+ //璁㈤槄
+ mqttClient1.subscribe(topic, qos);
+ }
+
+ } catch (MqttException me) {
+ me.printStackTrace();
+ }
+ }
+ return clientId;
+ }
+
+ public Boolean clientStatus(String clientId) {
+ boolean status = true;
+ try {
+ System.out.println(mqttProperties.getDashboard01() + "/api/v4/clients/" + clientId);
+ URL url = new URL(mqttProperties.getDashboard01() + "/api/v4/clients/" + clientId);
+ Base64 b = new Base64();
+ String encoding = b.encodeAsString(new String("admin:sipai@64368180").getBytes());
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.setDoOutput(true);
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Authorization", "Basic " + encoding);
+ InputStream content = connection.getInputStream();
+ BufferedReader in = new BufferedReader(new InputStreamReader(content));
+
+ JSONObject jsonObject = JSONObject.parseObject(in.readLine());
+ String data = jsonObject.get("data").toString();
+ JSONArray jsonArrayStr = JSONArray.parseArray(data);
+ if (jsonArrayStr.size() > 0) {
+ JSONObject jsonObject1 = jsonArrayStr.getJSONObject(0);
+ if (jsonObject1.get("connected").equals(true)) {
+ status = true;
+ } else {
+ status = false;
+ }
+ } else {
+ status = false;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return status;
+ }
+
+ @Override
+ public JSONObject getEmqxHost4WS(String topic) {
+ JSONObject jsonObject = new JSONObject();
+ List list = mqttConfigTopicService.selectListByWhere("where topic = '" + topic + "'");
+ if (list != null && list.size() > 0) {
+ MqttConfig mqttConfig = new MqttConfig();
+ //澧炲姞涓惊鐜煡鍑烘墍鏈夌殑 鏈変簺鏃ф暟鎹 璺宠繃 鍙煡鏈変富琛ㄦ暟鎹殑
+ for (int i = 0; i < list.size(); i++) {
+ mqttConfig.setWhere("where id = '" + list.get(i).getPid() + "'");
+ List list2 = mqttConfigDao.selectListByWhere(mqttConfig);
+ if (list2 != null && list2.size() > 0) {
+ if (list2.get(0).getBrokerIp() != null && !list2.get(0).getBrokerIp().trim().equals("")) {
+ String ip = list2.get(0).getBrokerIp();
+ ip = ip.replace("tcp://", "");
+ String host = ip.substring(0, ip.indexOf(":"));
+ //涓鏈
+ if (host != null && host.equals("172.16.242.16")) {
+ jsonObject.put("host", "58.254.140.62");
+ jsonObject.put("port", "8088");
+ }
+ //浜屾湡
+ if (host != null && host.equals("172.16.242.24")) {
+ jsonObject.put("host", "58.254.140.121");
+ jsonObject.put("port", "8083");
+ }
+ }
+ break;
+ }
+ }
+ }
+ return jsonObject;
+ }
+
+ @Override
+ public String getEmqxHost4TCP(String topic) {
+ String host = mqttProperties.getBrokerAddress();//榛樿涓鏈
+ List list = mqttConfigTopicService.selectListByWhere("where topic = '" + topic + "'");
+ if (list != null && list.size() > 0) {
+ MqttConfig mqttConfig = new MqttConfig();
+ //澧炲姞涓惊鐜煡鍑烘墍鏈夌殑 鏈変簺鏃ф暟鎹 璺宠繃 鍙煡鏈変富琛ㄦ暟鎹殑
+ for (int i = 0; i < list.size(); i++) {
+
+ mqttConfig.setWhere("where id = '" + list.get(i).getPid() + "'");
+ List list2 = mqttConfigDao.selectListByWhere(mqttConfig);
+ if (list2 != null && list2.size() > 0) {
+ if (list2.get(0).getBrokerIp() != null && !list2.get(0).getBrokerIp().trim().equals("")) {
+ host = list2.get(0).getBrokerIp();
+ }
+ break;
+ }
+ }
+ }
+ return host;
+ }
+
+ @Async
+ public void doSendMqttVue(List list, String topic) {
+ JSONArray jsonVue = (JSONArray) JSONArray.toJSON(list);
+ MqttMessage message = new MqttMessage(jsonVue.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ String host = this.getEmqxHost4TCP(topic);
+ if (host != null && !host.equals("")) {
+ String[] str = host.split(";");
+ String ip = str[0];//杩炴帴鐨刬p
+ String topic_vue = topic;
+ if (topic != null && topic.contains("_IM")) {
+ topic_vue = topic.replace("_IM", "");
+ }
+// topic_vue = topic_vue + "_VIEW";
+ topic_vue = topic_vue.substring(0, 4) + "_01_UP_VIEW";
+ int size = list != null ? list.size() : 0;
+ StringBuilder ids = new StringBuilder();
+ if (list != null) {
+ int max = Math.min(5, list.size());
+ for (int i = 0; i < max; i++) {
+ ids.append(list.get(i).getId());
+ if (i < max - 1) ids.append(",");
+ }
+ }
+ System.out.println("鎺ㄩ乂IEW: topic=" + topic_vue + " size=" + size);
+ if (mqttClient1 != null && mqttClient1.isConnected()) {
+ //鍙戝竷
+// System.out.println("鎺ㄩ佸墠绔細" + topic_vue);
+ mqttClient1.publish(topic_vue, message);
+ } else {
+ System.out.println("===閲嶆柊杩炴帴===" + "鎺╒UE===涓婚锛" + topic_vue + "===" + ip + "===" + CommUtil.nowDate() + "===");
+ //杩炴帴
+ String clientId = "send_vue_" + str[2];
+ mqttClient1 = connect(ip, clientId, mqttProperties.getUsername(), mqttProperties.getPassword());
+ //鍙戝竷
+ mqttClient1.publish(topic_vue, message);
+ }
+ }
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Async
+ public void doSendView(JSONArray jsonArray, String topic) {
+ MqttMessage message = new MqttMessage(jsonArray.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ if (mqttClient1 != null && mqttClient1.isConnected()) {
+ //鍙戝竷
+ mqttClient1.publish(topic, message);
+ } else {
+ mqttClient1 = connect(mqttProperties.getBrokerAddress(), "send_vue_" + CommUtil.getUUID(), mqttProperties.getUsername(), mqttProperties.getPassword());
+ //鍙戝竷
+ mqttClient1.publish(topic, message);
+ }
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * @param jsonObject 闇瑕佹帹閫佺殑鏁版嵁
+ * @param topic 鎺ㄩ佺殑涓婚
+ */
+ @Async
+ public void doSendJson(JSONObject jsonObject, String topic) {
+ MqttMessage message = new MqttMessage(jsonObject.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ if (mqttClient1 != null && mqttClient1.isConnected()) {
+ //鍙戝竷
+ mqttClient1.publish(topic, message);
+ } else {
+ mqttClient1 = connect(mqttProperties.getBrokerAddress(), "send_vue_" + CommUtil.getUUID(), mqttProperties.getUsername(), mqttProperties.getPassword());
+ //鍙戝竷
+ mqttClient1.publish(topic, message);
+ }
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/src/main/java/com/sipai/service/mqtt/impl/MqttConfigTopicServiceImpl.java b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigTopicServiceImpl.java
new file mode 100644
index 0000000..4c3120e
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigTopicServiceImpl.java
@@ -0,0 +1,61 @@
+package com.sipai.service.mqtt.impl;
+
+import com.sipai.dao.mqtt.MqttConfigTopicDao;
+import com.sipai.entity.mqtt.MqttConfigTopic;
+import com.sipai.service.mqtt.MqttConfigTopicService;
+import com.sipai.tools.DataSourceEnum;
+import com.sipai.tools.DataSourceTypeAnno;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Created with IntelliJ IDEA.
+ *
+ * @Auther: sj
+ * @Date: 2021/03/24/17:56
+ * @Description:
+ */
+@Service
+public class MqttConfigTopicServiceImpl implements MqttConfigTopicService {
+ @Autowired
+ private MqttConfigTopicDao mqttConfigTopicDao;
+
+ @Override
+ public MqttConfigTopic selectByPrimaryKey(String id) {
+ return mqttConfigTopicDao.selectByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer deleteByPrimaryKey(String id) {
+ return mqttConfigTopicDao.deleteByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer insert(MqttConfigTopic entity) {
+ return mqttConfigTopicDao.insert(entity);
+ }
+
+ @Override
+ public Integer updateByPrimaryKeySelective(MqttConfigTopic entity) {
+ return mqttConfigTopicDao.updateByPrimaryKeySelective(entity);
+ }
+
+ @Override
+ public List selectListByWhere(String wherestr) {
+ MqttConfigTopic entity = new MqttConfigTopic();
+ entity.setWhere(wherestr);
+ return mqttConfigTopicDao.selectListByWhere(entity);
+ }
+
+ @Override
+ public Integer deleteByWhere(String wherestr) {
+ return mqttConfigTopicDao.deleteByWhere(wherestr);
+ }
+
+ @Override
+ public List selectListByPid(String pid) {
+ return mqttConfigTopicDao.selectListByPid(pid);
+ }
+}
diff --git a/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java b/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java
new file mode 100644
index 0000000..a1e4480
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java
@@ -0,0 +1,553 @@
+package com.sipai.service.mqtt.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.sipai.entity.mqtt.Mqtt;
+import com.sipai.entity.mqtt.MqttProperties;
+import com.sipai.entity.scada.MPoint;
+import com.sipai.entity.scada.MPointES;
+import com.sipai.entity.scada.MPointHistory;
+import com.sipai.service.Listener.ListenerPointService;
+import com.sipai.service.mqtt.MqttConfigService;
+import com.sipai.service.mqtt.MqttConfigTopicService;
+import com.sipai.service.mqtt.MqttService;
+import com.sipai.service.rabbitmq.MQService;
+import com.sipai.service.scada.MPointHistoryService;
+import com.sipai.service.scada.MPointService;
+import com.sipai.tools.*;
+import jodd.util.ClassUtil;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.redisson.api.RBatch;
+import org.redisson.api.RedissonClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+@Service("mqttService")
+public class MqttServiceImpl implements MqttService {
+ private final MqttProperties mqttProperties;
+ @Autowired
+ private MPointService mPointService;
+ @Autowired
+ private MqttConfigService mqttConfigService;
+ @Autowired
+ private MqttConfigTopicService mqttConfigTopicService;
+ @Autowired
+ private ListenerPointService listenerPointService;
+ @Autowired
+ private RedissonClient redissonClient;
+
+ private static MqttClient mqttClient;
+ private static String ipStr = "";
+ private static final Logger loggger = LoggerFactory.getLogger(ClassUtil.class);
+
+ //娴嬮噺鐐瑰璞$紦瀛
+ private final Map mPointCache = new ConcurrentHashMap<>();
+ private final Set notFoundKeys = ConcurrentHashMap.newKeySet();
+
+ @Override
+ @Scheduled(fixedRate = 3600000)//姣忓皬鏃舵竻鐞嗕竴娆
+ public void cleanCache() {
+ loggger.info("娓呯悊缂撳瓨(" + mPointCache.size() + " 鏉):" + CommUtil.nowDate());
+ mPointCache.clear();
+ notFoundKeys.clear();
+ }
+
+ public MqttServiceImpl(MqttProperties mqttProperties) {
+ this.mqttProperties = mqttProperties;
+ }
+
+ private static MqttClient connect(String brokeraddress, String clientId, String userName, String password) throws MqttException {
+
+ ipStr = brokeraddress;
+
+ MemoryPersistence persistence = new MemoryPersistence();
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setUserName(userName);
+ connOpts.setPassword(password.toCharArray());
+ connOpts.setConnectionTimeout(10);
+ connOpts.setKeepAliveInterval(20);
+// String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};
+// connOpts.setServerURIs(uris); //璧峰埌璐熻浇鍧囪 鍜岄珮鍙敤鐨勪綔鐢
+// MqttClient mqttClient = new MqttClient(brokeraddress, clientId, persistence);
+ mqttClient = new MqttClient(brokeraddress, clientId, persistence);
+ //mqttClient.setCallback(new PushCallback("test"));
+ mqttClient.connect(connOpts);
+ return mqttClient;
+ }
+
+ @Async("getAsyncMqttHandle")
+ @Override
+ public void doHandle(String bizId, String topic, String ip4, String port, JSONArray jsonArray) {
+// System.out.println("MQTT鎺ユ敹: biz=" + bizId + " topic=" + topic + " msgCount=" + jsonArray.size());
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JSONObject jsonObject = jsonArray.getJSONObject(i);
+ // 瑙f瀽鏃堕棿鎴
+ String timestamp = jsonObject.getString("timestamp");
+ String[] times = timestamp.split("\\.");
+ String date = CommUtil.timeStamp2Date(times[0], "yyyy-MM-dd HH:mm:ss");
+
+ // 瑙f瀽鏁版嵁瀵硅薄
+ JSONObject data = jsonObject.getJSONObject("Data");
+ if (data == null || data.isEmpty()) {
+ loggger.warn("Data涓虹┖锛岃烦杩: @{}", date);
+ continue;
+ }
+
+ List redisList = new ArrayList<>();
+ List vueList = new ArrayList<>();
+
+ // 閬嶅巻鏁版嵁鏉$洰
+ for (String key : data.keySet()) {
+ String valueStr = data.getString(key);
+ // 杩囨护鏃犳晥鍊
+ if (!isValidValue(valueStr)) {
+ loggger.warn("鍊兼棤鏁堬紝璺宠繃: key={} raw={} @{}", key, valueStr, date);
+ continue;
+ }
+ // 鑾峰彇鐐逛綅淇℃伅
+ MPoint mPoint = getMPointCacheOrES(bizId, key);
+ if (mPoint == null) {
+ boolean inNotFound = notFoundKeys.contains(key);
+ loggger.warn("鐐逛綅涓嶅瓨鍦紝璺宠繃: key={} notFoundCache={} @{}", key, inNotFound, date);
+ continue;
+ }
+ // 澶勭悊鏁板艰浆鎹€佸嶇巼璁$畻銆佸彇鍙嶆搷浣溿佹帹鎶ヨ淇℃伅銆佸彂閫並afka娑堟伅
+ BigDecimal value = handleValueConversion(bizId, valueStr, key, date, vueList);
+ if (value == null) {
+ loggger.warn("鏁板艰浆鎹㈠け璐ワ紝璺宠繃: key={} raw={} @{}", key, valueStr, date);
+ continue;
+ }
+ // 澶勭悊ES鏁版嵁
+ MPointES esPoint = buildEsPoint(mPoint, value, date);
+ redisList.add(esPoint);
+ }
+ // 鎺ㄩ佸墠绔暟鎹
+// loggger.info("鍑嗗鎺ㄩ佸墠绔: topic={} size={} @{}", topic, vueList.size(), date);
+ mqttConfigService.doSendMqttVue(vueList, topic);
+
+ // 鎵归噺鍐欏叆Redis
+ saveToRedisBatch(redisList);
+// loggger.info("鎵归噺鍐欏叆Redis瀹屾垚: size={} @{}", redisList.size(), date);
+ }
+ }
+
+ /**
+ * 鏍¢獙鍊兼槸鍚︽湁鏁堬紙闈炵┖涓旈暱搴﹀悎娉曪級
+ */
+ private boolean isValidValue(String value) {
+ if (value == null || value.isEmpty()) {
+ loggger.warn("鏃犳晥鍊: 绌哄瓧绗︿覆");
+ return false;
+ }
+ BigDecimal bd = new BigDecimal(value);
+ String plainValue = bd.toPlainString();
+ String[] parts = plainValue.split("\\.");
+ boolean ok = parts[0].length() <= 15;
+ if (!ok) {
+ loggger.warn("鏃犳晥鍊: 鏁存暟浣嶈繃闀 len={} raw={}", parts[0].length(), value);
+ }
+ return ok;
+ }
+
+ /**
+ * 澶勭悊鏁板艰浆鎹€佸嶇巼璁$畻銆佸彇鍙嶆搷浣溿佹帹鎶ヨ淇℃伅銆佸彂閫並afka娑堟伅
+ */
+ private BigDecimal handleValueConversion(String unitId, String valueStr, String key, String date, List vueList) {
+ try {
+ double valDou = Double.parseDouble(valueStr);
+ MPoint mPoint = getMPointCacheOrES(unitId, key);
+ if (mPoint == null) {
+ return null;
+ }
+
+ // 鍊嶇巼璁$畻
+ BigDecimal rate = mPoint.getRate() != null ? mPoint.getRate() : BigDecimal.ONE;
+ BigDecimal value = (valDou == 0 || rate.compareTo(BigDecimal.ZERO) == 0)
+ ? BigDecimal.ZERO
+ : rate.multiply(new BigDecimal(valueStr)).divide(BigDecimal.ONE, 10, RoundingMode.HALF_UP);
+
+ // 鍙栧弽鎿嶄綔
+ if ("1".equals(mPoint.getDirecttype())) {
+ value = "0".equals(valueStr) ? BigDecimal.ONE : ("1".equals(valueStr) ? BigDecimal.ZERO : value);
+ }
+
+ // 鎺ㄩ佹姤璀︿俊鎭
+ if (mPoint.getTriggeralarm() != null && !"0".equals(mPoint.getTriggeralarm().trim())) {
+// loggger.info("鎺ㄩ佹姤璀: {}={} @{}", key, value, date);
+// mPointService.sendKafka4MQTT_Alarm(key, value, date, 0, 1);
+ }
+
+ // 鍙戦並afka娑堟伅
+// mPointService.sendKafka4MQTT(unitId,key, value, date, 0, 1, mPoint.getSignaltype());
+
+ // 鏋勫缓鍓嶇鏁版嵁
+ buildVuePoint(mPoint, value, date, vueList);
+
+ return value;
+ } catch (NumberFormatException e) {
+ loggger.error("鏁板艰浆鎹㈠け璐: {}", valueStr, e);
+ return null;
+ }
+ }
+
+ /**
+ * 鏋勫缓鍓嶇灞曠ず鐢ㄧ偣浣嶅璞
+ */
+ private void buildVuePoint(MPoint mPoint, BigDecimal value, String date, List vueList) {
+ MPoint vuePoint = new MPoint();
+ vuePoint.setId(mPoint.getId());
+ vuePoint.setMeasuredt(date);
+ int numtail = doNumtail(mPoint.getNumtail());
+ vuePoint.setParmvalue(value.setScale(numtail, RoundingMode.HALF_UP));
+ vueList.add(vuePoint);
+ }
+
+ /**
+ * 鏋勫缓ES瀛樺偍鐢ㄧ偣浣嶅璞
+ */
+ private MPointES buildEsPoint(MPoint mPoint, BigDecimal value, String date) {
+ MPoint esPoint = new MPoint();
+ esPoint.setId(mPoint.getId());
+ esPoint.setMeasuredt(date);
+ esPoint.setParmvalue(value);
+ esPoint.setNumtail(mPoint.getNumtail());
+ return MPointES.format(esPoint);
+ }
+
+ /**
+ * 鎵归噺淇濆瓨鏁版嵁鍒癛edis
+ */
+ private void saveToRedisBatch(List esList) {
+ if (esList.isEmpty()) {
+ return;
+ }
+
+ int batchSize = 200;
+ int total = esList.size();
+ int batches = (total + batchSize - 1) / batchSize;
+
+ try {
+ for (int i = 0; i < batches; i++) {
+ int start = i * batchSize;
+ int end = Math.min((i + 1) * batchSize, total);
+ RBatch batch = redissonClient.createBatch();
+
+ for (MPoint mPoint : esList.subList(start, end)) {
+ int num = mPoint.getId().hashCode() % 25;
+ String dt = mPoint.getMeasuredt().replace("T", " ").replace("Z", "");
+ int numtail = doNumtail(mPoint.getNumtail());
+ String val = mPoint.getParmvalue().setScale(numtail, RoundingMode.HALF_UP)
+ + ";" + mPoint.getParmvalue() + ";" + dt;
+
+ batch.getMapCache(CommString.RedisMpointFlag + num)
+ .fastPutAsync(mPoint.getId(), val, 1, TimeUnit.DAYS);
+ }
+ batch.execute();
+ }
+ } catch (Exception e) {
+ loggger.error("Redis鎵归噺淇濆瓨澶辫触", e);
+ }
+ }
+
+ @Async("getAsyncMqttHandle")
+ @Override
+ public void doHandleHis(String bizId, String topic, String ip4, String port, JSONArray jsonArray) {
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JSONObject jsonObject = jsonArray.getJSONObject(i);
+ String timestamp = jsonObject.get("timestamp").toString();//鏃堕棿鎴
+ String[] times = timestamp.split("\\.");
+ String date = CommUtil.timeStamp2Date(times[0], "yyyy-MM-dd HH:mm:ss");
+ //鏁版嵁
+ JSONObject data = jsonObject.getJSONObject("Data");
+
+ MPointService mPointService = (MPointService) SpringContextUtil.getBean("mPointService");
+ MPointHistoryService mPointHistoryService = (MPointHistoryService) SpringContextUtil.getBean("mPointHistoryService");
+ MPointHistory mPointHistory = new MPointHistory();
+
+ for (String str : data.keySet()) {
+ String key = str;//娴嬮噺鐐筰d
+ String value = data.get(str) + "";
+
+ //杩囨护鎺夐暱搴﹁秴杩15浣嶇殑鏁板 money鏃犳硶瀛
+ if (value != null && !value.equals("")) {
+ BigDecimal bd = new BigDecimal(value);
+ value = bd.toPlainString();
+ if (value.contains(".")) {
+ String[] vals = value.split("\\.");
+ if (vals[0].length() > 15) {
+ //瓒呭嚭
+ continue;
+ } else {
+ //鏈秴鍑
+ }
+ } else {
+ if (value.length() > 15) {
+ //瓒呭嚭
+ continue;
+ } else {
+ //鏈秴鍑
+ }
+ }
+ } else {
+ //涓虹┖ 杩囨护鎺 涓嶅瓨鏃ュ織
+ continue;
+ }
+
+ BigDecimal value_BigDecimal = null;
+ double valDou = 0;//鍒ゆ柇鏄惁涓0 鎴栬0.00 鎴 0.000绛
+
+ //灏唈son涓负""鐨勮繃婊ゆ帀
+ if (value != null && !value.equals("")) {
+ valDou = Double.parseDouble(value);//鍒ゆ柇鏄惁涓0 鎴栬0.00 鎴 0.000绛
+// MPoint mPoint = mPointService.selectById(bizId, key);
+ MPoint mPoint = mPointService.selectById(bizId, key.toString());
+ if (mPoint != null) {
+ BigDecimal numDecimal = mPoint.getRate();
+ if (valDou == 0 || mPoint.getRate().equals(0)) {
+ value_BigDecimal = new BigDecimal(0);
+ } else {
+ value_BigDecimal = numDecimal.multiply(new BigDecimal(value));
+ //鎴彇闀垮害闃叉 杩囬暱 闄1鍐嶆埅鍙10浣
+ value_BigDecimal = value_BigDecimal.divide(new BigDecimal(1), 10, BigDecimal.ROUND_HALF_UP);
+ }
+
+ //鍙栧弽鎿嶄綔
+ if (mPoint.getDirecttype() != null && mPoint.getDirecttype().equals("1")) {
+ //鍙栧弽
+ if (value.equals("0")) {
+ value_BigDecimal = new BigDecimal(1);
+ } else if (value.equals("1")) {
+ value_BigDecimal = new BigDecimal(0);
+ } else {
+ //姝e父
+ }
+ } else {
+ //姝e父
+ }
+
+ try {
+ //DI鐐圭洿鎺ュ瓨
+ if (mPoint.getSignaltype() != null && mPoint.getSignaltype().equals("DI")) {
+ mPointHistory.setMeasuredt(date);
+ mPointHistory.setParmvalue(value_BigDecimal);
+ mPointHistory.setTbName("[tb_mp_" + key + "]");
+ mPointHistory.setUserid(ip4 + ":" + port);
+ mPointHistoryService.save(bizId, mPointHistory);
+ }
+ //AI鐐3鍒嗛挓瀛樹竴娆
+ if (mPoint.getSignaltype() != null && mPoint.getSignaltype().equals("DI")) {
+ String sqlStr = "where MeasureDT>dateadd(mi,-3,'" + date + "') order by MeasureDT desc";
+ /*List mPointHistoryList = mPointHistoryService.selectTopNumListByTableAWhere(mPoint.getBizid(),"[tb_mp_" + key + "]", sqlStr, "1");
+ if (mPointHistoryList != null && mPointHistoryList.size() > 0) {
+ //3鍒嗛挓鍐呭瓨鍦 鍒欎笉瀛
+ } else {
+ mPointHistory.setMeasuredt(date);
+ mPointHistory.setParmvalue(value_BigDecimal);
+ mPointHistory.setTbName("[tb_mp_" + key + "]");
+ mPointHistory.setUserid(ip4 + ":" + port);
+ mPointHistoryService.save(bizId, mPointHistory);
+ }*/
+ }
+ } catch (Exception e) {
+ System.out.println("鏂板瀛愯〃--鎵ц缁撴灉锛" + e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * 鍙戦 rabbitmq
+ *
+ * @param key
+ * @param value_BigDecimal
+ * @param date
+ */
+ @Override
+ public void sentRabbitmq(String exchange, String key, String value_BigDecimal, String date) {
+ JSONObject jsonObject1 = new JSONObject();
+ jsonObject1.put("id", key);
+ jsonObject1.put("value", value_BigDecimal);
+ jsonObject1.put("time", date);
+ MQService mqService = (MQService) SpringContextUtil.getBean("mQService");
+ mqService.sendMQ(exchange, jsonObject1.toString());
+ }
+
+ /**
+ * 鎸囦护涓嬪彂
+ *
+ * @param json
+ * @param topic
+ * @param userId
+ * @param bizId
+ * @return
+ */
+ @Override
+ public int doPublish(JSONObject json, String topic, String userId, String bizId) {
+ try {
+ MqttMessage message = new MqttMessage(json.toString().getBytes());
+ message.setQos(2);
+ message.setRetained(false);
+
+ //鏌ヨ涓婚灞炰簬鍝釜浠g悊
+ String topic_up = "";
+ if (topic != null && !topic.equals("")) {
+ topic_up = topic.replace("_DOWN", "_UP");
+ }
+ String host = mqttConfigService.getEmqxHost4TCP(topic_up);
+
+ System.out.println("鎸囦护涓嬪彂锛" + host);
+
+ //杩炴帴
+ mqttClient = connect(host, "gy_publish_8080", mqttProperties.getUsername(), mqttProperties.getPassword());
+ //鍙戝竷
+ mqttClient.publish(topic, message);
+ //鏂紑杩炴帴
+ mqttClient.disconnect();
+ } catch (MqttPersistenceException e) {
+ e.printStackTrace();
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+ /**
+ * 鍙洖缃戝叧鍏ㄩ儴鏁版嵁
+ *
+ * @param bizId
+ * @param jsonObject
+ */
+ @Override
+ public void doRecall(String bizId, JSONObject jsonObject) {
+ try {
+ MqttMessage message = new MqttMessage(jsonObject.toString().getBytes());
+ message.setQos(2);
+ message.setRetained(false);
+
+ //鏌ヨ涓婚灞炰簬鍝釜浠g悊
+ String topic_up = bizId + "_01_UP";
+
+ String host = mqttConfigService.getEmqxHost4TCP(topic_up);
+
+ System.out.println(CommUtil.nowDate() + "鏁版嵁鍙洖-鎵цRecall锛" + bizId + "_01_RECALL" + "------" + host);
+
+ //杩炴帴
+ mqttClient = connect(host, "gy_recall_8080", mqttProperties.getUsername(), mqttProperties.getPassword());
+ //鍙戝竷涓婚
+ mqttClient.publish(bizId + "_01_RECALL", message);
+ //鏂紑杩炴帴
+ mqttClient.disconnect();
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 灏嗗鐞嗗悗鐨刴qtt娑堟伅浼犲埌vue鍓嶇
+ */
+ @Async
+ public void doSendMqttVue(List list, String topic) {
+ JSONArray jsonVue = (JSONArray) JSONArray.toJSON(list);
+ MqttMessage message = new MqttMessage(jsonVue.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ if (mqttClient == null) {
+ System.out.println("鎺夌嚎_sendMqttVue");
+ mqttClient = connect(mqttProperties.getBrokerAddress(), "gy_sendMqttVue" + CommUtil.getUUID(), mqttProperties.getUsername(), mqttProperties.getPassword());
+ } else {
+// System.out.println("姝e父_sendMqttVue");
+ }
+ String topic_vue = topic;
+ if (topic != null && topic.contains("_IM")) {
+ topic_vue = topic.replace("_IM", "");
+ }
+ topic_vue = topic_vue + "_VUE";
+ mqttClient.publish(topic_vue, message);
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 杩斿洖绮惧害
+ *
+ * @param num
+ * @return
+ */
+ public int doNumtail(String num) {
+ int numtail = 0;
+ if (num != null && !num.trim().equals("")) {
+ //0.00杩欑鍚庨潰涓嶇敤 鏆傛椂鍐欐 浠ュ悗鏍囧噯鍏ㄩ儴鐢ㄦ暟瀛
+ if (num.contains(".")) {
+ if (num.equals("0.0")) {
+ numtail = 1;
+ }
+ if (num.equals("0.00")) {
+ numtail = 2;
+ }
+ if (num.equals("0.000")) {
+ numtail = 3;
+ }
+ if (num.equals("0.0000")) {
+ numtail = 4;
+ }
+ } else {
+ numtail = Integer.parseInt(num);
+ }
+ } else {
+ numtail = 0;
+ }
+ return numtail;
+ }
+
+ /**
+ * 浼樺厛浠庣紦瀛樻煡璇Point瀵硅薄锛屼笉瀛樺湪鍒欏幓es涓煡璇
+ *
+ * @param key
+ * @return
+ */
+ private MPoint getMPointCacheOrES(String unitId, String key) {
+ // 鏌ヨ缂撳瓨鏄惁瀛樺湪
+ if (mPointCache.containsKey(key)) {
+ MPoint mp = mPointCache.get(key);
+// loggger.debug("缂撳瓨鍛戒腑: key={} id={}", key, mp.getId());
+ return mp;
+ }
+ // 鐐逛綅涓嶅瓨鍦 锛堟柊鍔犵殑鐐逛綅闇瑕佸畾鏃舵竻鐞嗙紦瀛樼殑鏃跺欐墠浼氳繘鍏ョ紦瀛橈紝涓嶇劧棰戠箒鍘绘煡璇s锛
+ if (notFoundKeys.contains(key)) {
+// loggger.debug("宸叉爣璁版湭鎵惧埌锛岃烦杩嘐S鏌ヨ: key={}", key);
+ return null;
+ }
+ // 浠嶦S鏌ヨ
+ MPoint mPoint = mPointService.selectById(unitId, key);
+ if (mPoint != null) {
+ //娣诲姞缂撳瓨
+ mPointCache.put(key, mPoint);
+// loggger.debug("ES鏌ヨ鎴愬姛骞剁紦瀛: key={} id={}", key, mPoint.getId());
+ } else {
+ //娌℃湁璇ョ偣浣
+ notFoundKeys.add(key);
+// loggger.warn("ES鏈壘鍒扮偣浣嶏紝鍔犲叆鏈壘鍒伴泦鍚: key={}", key);
+ }
+ return mPoint;
+ }
+
+}
diff --git a/src/main/java/com/sipai/service/opc/InitOpcUaService.java b/src/main/java/com/sipai/service/opc/InitOpcUaService.java
new file mode 100644
index 0000000..9d24bf7
--- /dev/null
+++ b/src/main/java/com/sipai/service/opc/InitOpcUaService.java
@@ -0,0 +1,134 @@
+package com.sipai.service.opc;
+
+import com.sipai.entity.scada.MPoint;
+import com.sipai.service.opc.OpcUaService;
+import com.sipai.service.scada.MPointService;
+import com.sipai.tools.CommString;
+import com.sipai.tools.CommUtil;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.redisson.api.RBatch;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+/**
+ * 鍒濆鍖栬闃呮暟鎹
+ */
+@Service
+public class InitOpcUaService {
+ @Autowired(required = false)
+ private OpcUaService opcUaService;
+ @Autowired
+ private MPointService mPointService;
+
+ // 鏈湴缂撳瓨
+ private final Cache mPointCache = Caffeine.newBuilder()
+ .maximumSize(10000) // 浠呰缃渶澶х紦瀛樻暟閲
+// .expireAfterWrite(1, TimeUnit.HOURS) // 1灏忔椂鍚庤繃鏈
+ .build();
+
+ // 宸茶闃呯殑鐐逛綅闆嗗悎
+ private Set subscribedNodeIds = new HashSet<>();
+
+ @PostConstruct
+ public void init() {
+ if (opcUaService == null) {
+ System.out.println("OPC UA 鏈惎鍔紝璇锋鏌ml閰嶇疆鏂囦欢閽燂細opc: enabled: true");
+ return;
+ }
+ try {
+ //鏌ヨ鎵鏈夌殑opcua閲囬泦鐐逛綅
+ List nodeIdList = fetchNodeIdsFromEs();
+ for (String nodeId : nodeIdList) {
+ MPoint mPoint = mPointCache.get(nodeId, key -> mPointService.selectById("",nodeId));
+ System.out.println(mPoint);
+ Consumer callback = value -> {
+ if (mPoint != null) {
+ mPointService.sendKafka4OpcUa(mPoint.getId(), value, CommUtil.nowDate());
+ }
+ };
+ opcUaService.subscribeToNode(nodeId, callback);
+ subscribedNodeIds.add(nodeId); // 璁板綍宸茶闃呯殑鐐逛綅
+ }
+ System.out.println("Subscribed to " + nodeIdList.size() + " nodes");
+ } catch (Exception e) {
+ System.err.println("Failed to subscribe: " + e.getMessage());
+ }
+ }
+
+ /**
+ * 鍚屾涓娆pcua璁㈤槄鐐逛綅
+ */
+ public void manualSyncSubscriptions() {
+ try {
+ List currentNodeIds = fetchNodeIdsFromEs(); // 鑾峰彇褰撳墠 Elasticsearch 涓殑鐐逛綅淇℃伅
+ Set newSubscribedNodeIds = new HashSet<>();
+
+ // 澶勭悊鏂板鐐逛綅
+ for (String nodeId : currentNodeIds) {
+ if (!subscribedNodeIds.contains(nodeId)) { // 鏂板鐐逛綅
+ MPoint mPoint = mPointCache.get(nodeId, key -> mPointService.selectById("",nodeId));
+ System.out.println(mPoint);
+ Consumer callback = value -> {
+ if (mPoint != null) {
+ mPointService.sendKafka4OpcUa(mPoint.getId(), value, CommUtil.nowDate());
+ }
+ };
+ opcUaService.subscribeToNode(nodeId, callback);
+ newSubscribedNodeIds.add(nodeId);
+ } else {
+ newSubscribedNodeIds.add(nodeId); // 淇濈暀宸茶闃呯殑鐐逛綅
+ }
+ }
+
+ // 澶勭悊鍒犻櫎鐐逛綅
+ for (String nodeId : subscribedNodeIds) {
+ if (!currentNodeIds.contains(nodeId)) { // 鍒犻櫎鐐逛綅
+ opcUaService.unsubscribeFromNode(nodeId);
+ }
+ }
+
+ // 鏇存柊宸茶闃呯偣浣嶉泦鍚
+ subscribedNodeIds = newSubscribedNodeIds;
+
+ System.out.println("杩涜涓娆pcua璁㈤槄鍚屾");
+ } catch (Exception e) {
+ System.err.println("opcua璁㈤槄鍚屾澶辫触: " + e.getMessage());
+ }
+ }
+
+ /**
+ * 鏌ヨ鎵鏈夌殑opcua閲囬泦鐐逛綅
+ *
+ * @return
+ */
+ private List fetchNodeIdsFromEs() {
+ NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+ BoolQueryBuilder childBoolQueryBuilder = QueryBuilders.boolQuery();
+ childBoolQueryBuilder.must(QueryBuilders.matchPhraseQuery("biztype", "opcua"));
+ boolQueryBuilder.should(childBoolQueryBuilder);
+ nativeSearchQueryBuilder.withQuery(boolQueryBuilder);
+ nativeSearchQueryBuilder.withPageable(PageRequest.of(0, 10000));
+ List list_mps = this.mPointService.selectListByWhere4Es(nativeSearchQueryBuilder);
+ List nodeIdList = new ArrayList<>();
+ for (MPoint mPoint : list_mps) {
+ nodeIdList.add(mPoint.getMpointid());
+ }
+ return nodeIdList;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/sipai/service/opc/OpcUaService.java b/src/main/java/com/sipai/service/opc/OpcUaService.java
new file mode 100644
index 0000000..457b2f6
--- /dev/null
+++ b/src/main/java/com/sipai/service/opc/OpcUaService.java
@@ -0,0 +1,277 @@
+package com.sipai.service.opc;
+
+import com.sipai.config.OpcUaProperties;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+@Service
+public class OpcUaService {
+
+ @Autowired(required = false)
+ private final OpcUaClient opcUaClient;
+ private final OpcUaProperties opcUaProperties;
+
+ @Autowired
+ public OpcUaService(@Autowired(required = false) OpcUaClient opcUaClient, OpcUaProperties opcUaProperties) {
+ this.opcUaClient = opcUaClient;
+ this.opcUaProperties = opcUaProperties;
+ }
+
+ /*public void printConfig() {
+ System.out.println("OPC UA Server URL: " + opcUaProperties.getServerUrl());
+ System.out.println("Security Policy: " + opcUaProperties.getSecurityPolicy());
+ System.out.println("Security Mode: " + opcUaProperties.getSecurityMode());
+ }*/
+
+ /**
+ * 璇诲彇鍗曚釜鐐逛綅鐨勫
+ *
+ * @param nodeId
+ * @return
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ public Object readNodeValue(String nodeId) throws ExecutionException, InterruptedException {
+ if (opcUaClient == null) {
+ throw new IllegalStateException("OPC UA 鏈惎鍔紝璇锋鏌ml閰嶇疆鏂囦欢閽燂細opc: enabled: true");
+ }
+ NodeId node = NodeId.parse(nodeId);
+ DataValue dataValue = opcUaClient.readValue(0, TimestampsToReturn.Both, node).get();
+ Variant variant = dataValue.getValue();
+ return variant.getValue();
+ }
+
+ /**
+ * 鍐欏叆鍗曚釜鐐逛綅鐨勫
+ *
+ * @param nodeId 鑺傜偣ID
+ * @param value 瑕佸啓鍏ョ殑鍊
+ * @return 鍐欏叆鏄惁鎴愬姛
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ public boolean writeNodeValue(String nodeId, Object value) {
+ try {
+ NodeId node = NodeId.parse(nodeId);
+
+ // 璇诲彇鑺傜偣鐨勬暟鎹被鍨嬶紙浣跨敤 UaVariableNode 鏇夸唬 VariableNode锛
+ UaVariableNode variableNode = (UaVariableNode) opcUaClient.getAddressSpace().getVariableNode(node);
+ if (variableNode == null) {
+ throw new RuntimeException("鏃犳硶鎵惧埌鑺傜偣: " + nodeId);
+ }
+
+ NodeId dataTypeId = variableNode.getDataType();
+ System.out.println(dataTypeId);
+
+ if (dataTypeId == null) {
+ throw new RuntimeException("鏃犳硶鑾峰彇鑺傜偣鐨勬暟鎹被鍨");
+ }
+
+ // 鏍规嵁鏁版嵁绫诲瀷鍒涘缓瀵瑰簲鐨 Variant
+ Variant variant = createVariantForDataType(value, dataTypeId);
+
+ DataValue dataValue = DataValue.valueOnly(variant);
+ StatusCode statusCode = opcUaClient.writeValue(node, dataValue).get();
+ System.out.println(statusCode);
+ return statusCode.isGood();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ /**
+ * typeId璺熸爣鍑嗚鑼冧笉涓鑷达紝鐩墠鐢 KEPServerEX6鑷甫鐨凮PCUA鏈嶅姟 娴嬭瘯鐨勭粨鏋
+ *
+ * @param value
+ * @param dataTypeId
+ * @return
+ */
+ private Variant createVariantForDataType(Object value, NodeId dataTypeId) {
+ int typeId = ((Number) dataTypeId.getIdentifier()).intValue();
+ // 鏍囧噯绫诲瀷澶勭悊
+ switch (typeId) {
+ case 1:
+ return new Variant(convertToBoolean(value));
+ case 4: // Int16 (鏈夌鍙穝hort)
+ short shortValue = (value instanceof Number) ?
+ ((Number) value).shortValue() :
+ Short.parseShort(value.toString());
+ return new Variant(shortValue);
+ case 5: // UInt16/Word (鏃犵鍙穝hort)
+ int intValue = (value instanceof Number) ?
+ ((Number) value).intValue() :
+ Integer.parseInt(value.toString());
+ if (intValue < 0 || intValue > 65535) {
+ throw new IllegalArgumentException("UInt16 鍊煎繀椤诲湪 0~65535 涔嬮棿");
+ }
+ return new Variant(Unsigned.ushort(intValue));
+ case 6: // Int32
+ return new Variant(convertToInt(value));
+ case 10: //Float
+ return new Variant(convertToFloat(value));
+ case 11: // Double
+ return new Variant(convertToDouble(value));
+ default:
+ return new Variant(value.toString());
+ }
+ }
+
+ // 绫诲瀷杞崲杈呭姪鏂规硶锛堜繚鎸佷笉鍙橈級
+ private boolean convertToBoolean(Object value) {
+ if (value instanceof Boolean) return (Boolean) value;
+ String strVal = value.toString().trim().toLowerCase();
+ return strVal.equals("true") || strVal.equals("1");
+ }
+
+ private float convertToFloat(Object value) {
+ if (value instanceof Number) return ((Number) value).floatValue();
+ return Float.parseFloat(value.toString());
+ }
+
+ private double convertToDouble(Object value) {
+ if (value instanceof Number) return ((Number) value).doubleValue();
+ return Double.parseDouble(value.toString());
+ }
+
+ private int convertToInt(Object value) {
+ if (value instanceof Number) return ((Number) value).intValue();
+ return Integer.parseInt(value.toString());
+ }
+
+ private long convertToUInt(Object value) {
+ if (value instanceof Number) return ((Number) value).longValue();
+ return Long.parseLong(value.toString());
+ }
+
+ // 璁㈤槄鐐逛綅
+ public void subscribeToNode(String nodeId, Consumer callback) throws Exception {
+ if (opcUaClient == null) {
+ throw new IllegalStateException("OPC UA 鏈惎鍔紝璇锋鏌ml閰嶇疆鏂囦欢閽燂細opc: enabled: true");
+ }
+ try {
+ NodeId node = NodeId.parse(nodeId);
+
+ UaSubscription subscription = opcUaClient
+ .getSubscriptionManager()
+ .createSubscription(1000.0)
+ .get();
+
+ MonitoringParameters parameters = new MonitoringParameters(
+ Unsigned.uint(1),
+ 1000.0,
+ null,
+ Unsigned.uint(10),
+ true
+ );
+
+ MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
+ new ReadValueId(
+ node,
+ AttributeId.Value.uid(),
+ null,
+ null
+ ),
+ MonitoringMode.Reporting,
+ parameters
+ );
+
+ subscription.createMonitoredItems(
+ TimestampsToReturn.Both,
+ Collections.singletonList(request),
+ new UaSubscription.ItemCreationCallback() {
+ @Override
+ public void onItemCreated(UaMonitoredItem item, int i) {
+ // 璁剧疆鍊肩殑鍥炶皟澶勭悊鍣
+ item.setValueConsumer(value -> {
+ Variant variant = value.getValue();
+ Object rawValue = variant.getValue();
+
+ if (rawValue instanceof Number) {
+ callback.accept(((Number) rawValue).doubleValue()); // 杞负 Double
+ } else if (rawValue instanceof Boolean) {
+ callback.accept(((Boolean) rawValue) ? 1.0 : 0.0); // 甯冨皵杞0 1
+ } else {
+ throw new ClassCastException("Expected Number or Boolean but received: " +
+ rawValue.getClass().getSimpleName());
+ }
+ });
+ }
+ }
+ ).get();
+
+ } catch (Exception e) {
+ throw new Exception("璁㈤槄澶辫触: " + e.getMessage(), e);
+ }
+ }
+
+ // 鍙栨秷璁㈤槄鐐逛綅
+ public boolean unsubscribeFromNode(String nodeId) {
+ if (opcUaClient == null) {
+ throw new IllegalStateException("OPC UA 鏈惎鍔紝璇锋鏌ml閰嶇疆鏂囦欢閽燂細opc: enabled: true");
+ }
+ try {
+ NodeId node = NodeId.parse(nodeId);
+
+ // 鑾峰彇鎵鏈夎闃
+ List subscriptions = opcUaClient.getSubscriptionManager().getSubscriptions();
+
+ for (UaSubscription subscription : subscriptions) {
+ // 鑾峰彇鎵鏈夌洃鎺ч」
+ List monitoredItems = subscription.getMonitoredItems();
+
+ for (UaMonitoredItem monitoredItem : monitoredItems) {
+ if (monitoredItem.getReadValueId().getNodeId().equals(node)) {
+ // 鍒犻櫎鐩戞帶椤
+ CompletableFuture> deleteFuture = subscription.deleteMonitoredItems(
+ Collections.singletonList(monitoredItem)
+ );
+
+ // 绛夊緟鍒犻櫎鎿嶄綔瀹屾垚
+ List statusCodes = deleteFuture.get(5, TimeUnit.SECONDS);
+
+ // 妫鏌ュ垹闄ょ粨鏋
+ for (StatusCode statusCode : statusCodes) {
+ if (!statusCode.isGood()) {
+ System.err.println("鍒犻櫎鐩戞帶椤瑰け璐: " + statusCode);
+ return false;
+ }
+ }
+
+ System.out.println("鎴愬姛鍙栨秷璁㈤槄鑺傜偣: " + nodeId);
+ return true;
+ }
+ }
+ }
+
+ System.out.println("鏈壘鍒拌妭鐐: " + nodeId);
+ return false;
+ } catch (Exception e) {
+ System.err.println("鍙栨秷璁㈤槄澶辫触: " + e.getMessage());
+ return false;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/sipai/service/rabbitmq/MQService.java b/src/main/java/com/sipai/service/rabbitmq/MQService.java
new file mode 100644
index 0000000..650f742
--- /dev/null
+++ b/src/main/java/com/sipai/service/rabbitmq/MQService.java
@@ -0,0 +1,5 @@
+package com.sipai.service.rabbitmq;
+
+public interface MQService {
+ void sendMQ(String exchange, String msg);
+}
diff --git a/src/main/java/com/sipai/service/rabbitmq/impl/MQServiceImpl.java b/src/main/java/com/sipai/service/rabbitmq/impl/MQServiceImpl.java
new file mode 100644
index 0000000..22327f4
--- /dev/null
+++ b/src/main/java/com/sipai/service/rabbitmq/impl/MQServiceImpl.java
@@ -0,0 +1,23 @@
+package com.sipai.service.rabbitmq.impl;
+
+import com.sipai.service.rabbitmq.MQService;
+import com.sipai.tools.ConstantString;
+import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+@Service("mQService")
+public class MQServiceImpl implements MQService {
+ @Resource
+ @Lazy
+ private AmqpTemplate amqpTemplate;
+
+ @Override
+ public void sendMQ(String exchange,String msg) {
+// String exchange = ConstantString.MQ_ALARM_MQTT;
+ String key = ConstantString.KEY_ALARM_MSG;
+ this.amqpTemplate.convertAndSend(exchange,key,msg);
+ }
+}
diff --git a/src/main/java/com/sipai/service/scada/MPointBzwService.java b/src/main/java/com/sipai/service/scada/MPointBzwService.java
new file mode 100644
index 0000000..ad26385
--- /dev/null
+++ b/src/main/java/com/sipai/service/scada/MPointBzwService.java
@@ -0,0 +1,21 @@
+package com.sipai.service.scada;
+
+import com.sipai.entity.scada.MPointBzw;
+
+import java.util.List;
+
+public interface MPointBzwService {
+
+ public abstract MPointBzw selectByPrimaryKey(String id);
+
+ public abstract Integer deleteByPrimaryKey(String id);
+
+ public abstract Integer insert(MPointBzw entity);
+
+ public abstract Integer updateByPrimaryKeySelective(MPointBzw entity);
+
+ public abstract List selectListByWhere(String wherestr);
+
+ public abstract Integer deleteByWhere(String wherestr);
+
+}
diff --git a/src/main/java/com/sipai/service/scada/MPointHistoryService.java b/src/main/java/com/sipai/service/scada/MPointHistoryService.java
new file mode 100644
index 0000000..8cf94c1
--- /dev/null
+++ b/src/main/java/com/sipai/service/scada/MPointHistoryService.java
@@ -0,0 +1,17 @@
+package com.sipai.service.scada;
+
+import com.sipai.entity.scada.MPointHistory;
+
+import java.util.List;
+
+public interface MPointHistoryService {
+ int save(String bizId, MPointHistory mPointHistory);
+
+ List selectListByTableAWhere(String bizId, String table, String wherestr);
+
+ int deleteByTableAWhere(String bizId,String table ,String wherestr);
+
+ int saveByCreate(String bizId, MPointHistory mPointHistory);
+
+ List