selectListByPid(String pid);
+}
diff --git a/src/main/java/com/sipai/service/mqtt/MqttService.java b/src/main/java/com/sipai/service/mqtt/MqttService.java
new file mode 100644
index 0000000..fc9c722
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/MqttService.java
@@ -0,0 +1,57 @@
+package com.sipai.service.mqtt;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.springframework.scheduling.annotation.Scheduled;
+
+public interface MqttService {
+ /**
+ * 用于提供云平台 给plc发布指令接口
+ * @param json
+ * @param topic
+ * @param userId
+ * @param bizId
+ * @return
+ */
+ public abstract int doPublish(JSONObject json, String topic, String userId, String bizId);
+
+ /**
+ * 处理mqtt订阅到的数据
+ * @param bizId
+ * @param topic
+ * @param ip4
+ * @param port
+ * @param jsonArray
+ */
+ public abstract void doHandle(String bizId, String topic, String ip4, String port, JSONArray jsonArray);
+
+ /**
+ * 处理mqtt订阅到的数据 -- (补发的数据 1.不报警 2.不更新主表 3.AI点3分钟存一次 DI点每次都存)
+ * @param bizId
+ * @param topic
+ * @param ip4
+ * @param port
+ * @param jsonArray
+ */
+ public abstract void doHandleHis(String bizId, String topic, String ip4, String port, JSONArray jsonArray);
+
+ /**
+ * 网关数据全部召回
+ * @param bizId
+ * @return
+ */
+ public abstract void doRecall(String bizId,JSONObject jsonObject);
+
+ /**
+ * 发送 rabbitmq
+ *
+ * @return
+ */
+ public abstract void sentRabbitmq(String exchange, String key, String value_BigDecimal, String date);
+
+ /**
+ * 清理测量点对象缓存
+ */
+ @Scheduled(fixedRate = 3600000) // 每小时清理一次
+ void cleanCache();
+}
diff --git a/src/main/java/com/sipai/service/mqtt/PushCallback.java b/src/main/java/com/sipai/service/mqtt/PushCallback.java
new file mode 100644
index 0000000..d5c07ab
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/PushCallback.java
@@ -0,0 +1,229 @@
+package com.sipai.service.mqtt;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.sipai.entity.user.Company;
+import com.sipai.service.rabbitmq.MQService;
+import com.sipai.service.user.CompanyService;
+import com.sipai.tools.CommUtil;
+import com.sipai.tools.SpringContextUtil;
+import org.eclipse.paho.client.mqttv3.*;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.Query;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.lang.management.ManagementFactory;
+import java.math.BigDecimal;
+import java.net.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * 发布消息的回调类
+ *
+ * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
+ * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
+ * 在回调中,将它用来标识已经启动了该回调的哪个实例。
+ * 必须在回调类中实现三个方法:
+ *
+ * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
+ *
+ * public void connectionLost(Throwable cause)在断开连接时调用。
+ *
+ * public void deliveryComplete(MqttDeliveryToken token))
+ * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
+ * 由 MqttClient.connect 激活此回调。
+ */
+
+public class PushCallback implements MqttCallbackExtended {
+
+ InetAddress ip4;//ip
+
+ {
+ try {
+ ip4 = Inet4Address.getLocalHost();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
+
+ MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+ Set objectNames;
+ String port = "";//端口
+
+ {
+ try {
+ objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"),
+ Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));
+// port = objectNames.iterator().next().getKeyProperty("port");
+ port = "8080";
+ } catch (MalformedObjectNameException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private MqttClient client;
+ private MqttConnectOptions options;
+ private String[] topic;
+ private int[] qos;
+
+ public PushCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
+ this.client = client;
+ this.options = options;
+ this.topic = topic;
+ this.qos = qos;
+ }
+
+ /**
+ * 连接断开回调方法
+ */
+ public void connectionLost(Throwable cause) {
+ System.out.println("===连接断开回调方法");
+ try {
+ if (null != client && !client.isConnected()) {
+ System.out.println("尝试重新连接");
+ client.connect(options);
+ } else {
+ System.out.println("尝试建立新连接");
+ client.connect(options);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 连接成功回调方法
+ */
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ System.out.println("===连接成功回调方法");
+ try {
+ if (null != topic && null != qos) {
+ if (client.isConnected()) {
+ client.subscribe(topic, qos);
+ System.out.println("mqtt连接成功");
+ } else {
+ System.out.println("mqtt连接失败");
+ }
+ }
+ } catch (Exception e) {
+ System.out.println("mqtt订阅主题异常:" + e);
+ }
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+// System.out.println("deliveryComplete---------" + token.isComplete());
+ }
+
+ public void messageArrived(String topic, MqttMessage message) {
+ try {
+ JSONArray jsonArray = JSONArray.parseArray(message.toString());
+// System.out.println(topic + "===开始===" + CommUtil.nowDate() + "===" + jsonArray.size());
+ String unitId = topic.substring(0, 4);//截取topic的前面部分作为厂id
+ MqttService mqttService = (MqttService) SpringContextUtil.getBean("mqttService");
+ //正常的主题 -- 数据处理
+ mqttService.doHandle(unitId, topic, ip4.toString(), port, jsonArray);
+ } catch (Exception e) {
+ System.out.println(topic + "执行失败" + e);
+ }
+ }
+
+
+ /**
+ * 日期格式字符串转换成时间戳
+ *
+ * @param date_str 字符串日期
+ * @param format 如:yyyy-MM-dd HH:mm:ss
+ * @return
+ */
+ public static String date2TimeStamp(String date_str, String format) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat(format);
+ return String.valueOf(sdf.parse(date_str).getTime() / 1000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return "";
+ }
+
+ /**
+ * 发送 rabbitmq
+ *
+ * @param key
+ * @param value_BigDecimal
+ * @param date
+ */
+ public static void sentRabbitmq(String exchange, String key, String value_BigDecimal, String date) {
+ JSONObject jsonObject1 = new JSONObject();
+ jsonObject1.put("id", key);
+ jsonObject1.put("value", value_BigDecimal);
+ jsonObject1.put("time", date);
+ MQService mqService = (MQService) SpringContextUtil.getBean("mQService");
+ mqService.sendMQ(exchange, jsonObject1.toString());
+ }
+
+ public static void sentRabbitMqMpoint(String exchange, String json) {
+ MQService mqService = (MQService) SpringContextUtil.getBean("mQService");
+ mqService.sendMQ(exchange, json);
+ }
+
+ public String doPost(String URL) {
+ OutputStreamWriter out = null;
+ BufferedReader in = null;
+ StringBuilder result = new StringBuilder();
+ HttpURLConnection conn = null;
+ try {
+ java.net.URL url = new URL(URL);
+ conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ //发送POST请求必须设置为true
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ //设置连接超时时间和读取超时时间
+ conn.setConnectTimeout(30000);
+ conn.setReadTimeout(10000);
+ conn.setRequestProperty("Content-Type", "application/json");
+ conn.setRequestProperty("Accept", "application/json");
+ //获取输出流
+ out = new OutputStreamWriter(conn.getOutputStream());
+ String jsonStr = "{\"qry_by\":\"name\", \"name\":\"Tim\"}";
+ out.write(jsonStr);
+ out.flush();
+ out.close();
+ //取得输入流,并使用Reader读取
+ if (200 == conn.getResponseCode()) {
+ in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
+ String line;
+ while ((line = in.readLine()) != null) {
+ result.append(line);
+ }
+ } else {
+ System.out.println("ResponseCode is an error code:" + conn.getResponseCode());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (out != null) {
+ out.close();
+ }
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+ return result.toString();
+ }
+
+}
diff --git a/src/main/java/com/sipai/service/mqtt/impl/MqttConfigServiceImpl.java b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigServiceImpl.java
new file mode 100644
index 0000000..57016ea
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigServiceImpl.java
@@ -0,0 +1,348 @@
+package com.sipai.service.mqtt.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.sipai.entity.mqtt.Mqtt;
+import com.sipai.entity.mqtt.MqttProperties;
+import com.sipai.entity.scada.MPoint;
+import com.sipai.service.mqtt.PushCallback;
+import com.sipai.dao.mqtt.MqttConfigDao;
+import com.sipai.entity.mqtt.MqttConfig;
+import com.sipai.entity.mqtt.MqttConfigTopic;
+import com.sipai.service.mqtt.MqttConfigService;
+import com.sipai.service.mqtt.MqttConfigTopicService;
+import com.sipai.tools.CommUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created with IntelliJ IDEA.
+ *
+ * @Auther: sj
+ * @Date: 2021/03/24/16:45
+ * @Description:
+ */
+@Service
+public class MqttConfigServiceImpl implements MqttConfigService {
+ private final MqttProperties mqttProperties;
+ @Autowired
+ private MqttConfigDao mqttConfigDao;
+ @Autowired
+ private MqttConfigTopicService mqttConfigTopicService;
+
+ private static MqttClient mqttClient1;
+
+ // 构造器注入
+ @Autowired
+ public MqttConfigServiceImpl(MqttProperties mqttProperties) {
+ this.mqttProperties = mqttProperties;
+ }
+
+ private static MqttClient connect(String brokeraddress, String clientId, String userName, String password) throws MqttException {
+ MemoryPersistence persistence = new MemoryPersistence();
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setUserName(userName);
+ connOpts.setPassword(password.toCharArray());
+ connOpts.setConnectionTimeout(1000);
+ connOpts.setKeepAliveInterval(1000);
+ mqttClient1 = new MqttClient(brokeraddress, clientId, persistence);
+ mqttClient1.connect(connOpts);
+ return mqttClient1;
+ }
+
+ @Override
+ public MqttConfig selectByPrimaryKey(String id) {
+ return mqttConfigDao.selectByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer deleteByPrimaryKey(String id) {
+ return mqttConfigDao.deleteByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer insert(MqttConfig entity) {
+ return mqttConfigDao.insert(entity);
+ }
+
+ @Override
+ public Integer updateByPrimaryKeySelective(MqttConfig entity) {
+ return mqttConfigDao.insert(entity);
+ }
+
+ @Override
+ public List selectListByWhere(String whereStr) {
+ MqttConfig entity = new MqttConfig();
+ entity.setWhere(whereStr);
+ return mqttConfigDao.selectListByWhere(entity);
+ }
+
+ @Override
+ public Integer deleteByWhere(String whereStr) {
+ return mqttConfigDao.deleteByWhere(whereStr);
+ }
+
+ @Override
+ public String connect(String id) {
+ String clientId = "";
+ MqttConfig mqttConfig = mqttConfigDao.selectByPrimaryKey(id);
+ if (mqttConfig != null) {
+ clientId = mqttConfig.getTomcatPort() + "-" + CommUtil.getUUID();
+ System.out.println(clientId);
+ String sql = "where id = '" + mqttConfig.getId() + "'";
+ mqttConfig.setWhere(sql);
+ List list_config = mqttConfigDao.selectListByWhere(mqttConfig);
+ List topic_str = new ArrayList<>();
+
+ //每个客户端根据配置的tomcat端口号进行分区连接
+ if (list_config != null && list_config.size() > 0) {
+ //循环配置主表
+ for (int i = 0; i < list_config.size(); i++) {
+ String sql_detail = "where pid = '" + list_config.get(i).getId() + "'";
+ List list_topic = mqttConfigTopicService.selectListByWhere(sql_detail);
+ if (list_topic != null && list_topic.size() > 0) {
+ //循环配置附表(获取所有topic)
+ for (int j = 0; j < list_topic.size(); j++) {
+ //配置的_UP主题
+// topic_str.add("$queue/" + list_topic.get(j).getTopic());
+ topic_str.add(list_topic.get(j).getTopic());
+ }
+ }
+ }
+ }
+
+ //配置的条数即为数组的长度
+ String[] topic = new String[topic_str.size()];
+ System.out.println("订阅主题:" + topic_str);
+ int[] qos = new int[topic_str.size()];
+ for (int j = 0; j < topic_str.size(); j++) {
+ topic[j] = topic_str.get(j);
+ qos[j] = 0;
+ }
+ try {
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setConnectionTimeout(1000);
+ //每隔1.5*10秒的时间向客户端发送个消息判断客户端是否在线
+ connOpts.setKeepAliveInterval(1000);
+ //自动重连
+ connOpts.setAutomaticReconnect(true);
+ connOpts.setUserName(mqttProperties.getUsername());
+ connOpts.setPassword(mqttProperties.getPassword().toCharArray());
+
+ if (mqttClient1 != null) {
+ System.out.println("已连接");
+ //回调
+ mqttClient1.setCallback(new PushCallback(mqttClient1, connOpts, topic, qos));
+ //订阅
+ mqttClient1.subscribe(topic, qos);
+ } else {
+ System.out.println("重新连接");
+ //连接
+ mqttClient1 = connect(mqttConfig.getBrokerIp(), clientId, mqttProperties.getUsername(), mqttProperties.getPassword());
+ //回调
+ mqttClient1.setCallback(new PushCallback(mqttClient1, connOpts, topic, qos));
+ //订阅
+ mqttClient1.subscribe(topic, qos);
+ }
+
+ } catch (MqttException me) {
+ me.printStackTrace();
+ }
+ }
+ return clientId;
+ }
+
+ public Boolean clientStatus(String clientId) {
+ boolean status = true;
+ try {
+ System.out.println(mqttProperties.getDashboard01() + "/api/v4/clients/" + clientId);
+ URL url = new URL(mqttProperties.getDashboard01() + "/api/v4/clients/" + clientId);
+ Base64 b = new Base64();
+ String encoding = b.encodeAsString(new String("admin:sipai@64368180").getBytes());
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.setDoOutput(true);
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Authorization", "Basic " + encoding);
+ InputStream content = connection.getInputStream();
+ BufferedReader in = new BufferedReader(new InputStreamReader(content));
+
+ JSONObject jsonObject = JSONObject.parseObject(in.readLine());
+ String data = jsonObject.get("data").toString();
+ JSONArray jsonArrayStr = JSONArray.parseArray(data);
+ if (jsonArrayStr.size() > 0) {
+ JSONObject jsonObject1 = jsonArrayStr.getJSONObject(0);
+ if (jsonObject1.get("connected").equals(true)) {
+ status = true;
+ } else {
+ status = false;
+ }
+ } else {
+ status = false;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return status;
+ }
+
+ @Override
+ public JSONObject getEmqxHost4WS(String topic) {
+ JSONObject jsonObject = new JSONObject();
+ List list = mqttConfigTopicService.selectListByWhere("where topic = '" + topic + "'");
+ if (list != null && list.size() > 0) {
+ MqttConfig mqttConfig = new MqttConfig();
+ //增加个循环查出所有的 有些旧数据 跳过 只查有主表数据的
+ for (int i = 0; i < list.size(); i++) {
+ mqttConfig.setWhere("where id = '" + list.get(i).getPid() + "'");
+ List list2 = mqttConfigDao.selectListByWhere(mqttConfig);
+ if (list2 != null && list2.size() > 0) {
+ if (list2.get(0).getBrokerIp() != null && !list2.get(0).getBrokerIp().trim().equals("")) {
+ String ip = list2.get(0).getBrokerIp();
+ ip = ip.replace("tcp://", "");
+ String host = ip.substring(0, ip.indexOf(":"));
+ //一期
+ if (host != null && host.equals("172.16.242.16")) {
+ jsonObject.put("host", "58.254.140.62");
+ jsonObject.put("port", "8088");
+ }
+ //二期
+ if (host != null && host.equals("172.16.242.24")) {
+ jsonObject.put("host", "58.254.140.121");
+ jsonObject.put("port", "8083");
+ }
+ }
+ break;
+ }
+ }
+ }
+ return jsonObject;
+ }
+
+ @Override
+ public String getEmqxHost4TCP(String topic) {
+ String host = mqttProperties.getBrokerAddress();//默认一期
+ List list = mqttConfigTopicService.selectListByWhere("where topic = '" + topic + "'");
+ if (list != null && list.size() > 0) {
+ MqttConfig mqttConfig = new MqttConfig();
+ //增加个循环查出所有的 有些旧数据 跳过 只查有主表数据的
+ for (int i = 0; i < list.size(); i++) {
+
+ mqttConfig.setWhere("where id = '" + list.get(i).getPid() + "'");
+ List list2 = mqttConfigDao.selectListByWhere(mqttConfig);
+ if (list2 != null && list2.size() > 0) {
+ if (list2.get(0).getBrokerIp() != null && !list2.get(0).getBrokerIp().trim().equals("")) {
+ host = list2.get(0).getBrokerIp();
+ }
+ break;
+ }
+ }
+ }
+ return host;
+ }
+
+ @Async
+ public void doSendMqttVue(List list, String topic) {
+ JSONArray jsonVue = (JSONArray) JSONArray.toJSON(list);
+ MqttMessage message = new MqttMessage(jsonVue.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ String host = this.getEmqxHost4TCP(topic);
+ if (host != null && !host.equals("")) {
+ String[] str = host.split(";");
+ String ip = str[0];//连接的ip
+ String topic_vue = topic;
+ if (topic != null && topic.contains("_IM")) {
+ topic_vue = topic.replace("_IM", "");
+ }
+// topic_vue = topic_vue + "_VIEW";
+ topic_vue = topic_vue.substring(0, 4) + "_01_UP_VIEW";
+ int size = list != null ? list.size() : 0;
+ StringBuilder ids = new StringBuilder();
+ if (list != null) {
+ int max = Math.min(5, list.size());
+ for (int i = 0; i < max; i++) {
+ ids.append(list.get(i).getId());
+ if (i < max - 1) ids.append(",");
+ }
+ }
+ System.out.println("推送VIEW: topic=" + topic_vue + " size=" + size);
+ if (mqttClient1 != null && mqttClient1.isConnected()) {
+ //发布
+// System.out.println("推送前端:" + topic_vue);
+ mqttClient1.publish(topic_vue, message);
+ } else {
+ System.out.println("===重新连接===" + "推VUE===主题:" + topic_vue + "===" + ip + "===" + CommUtil.nowDate() + "===");
+ //连接
+ String clientId = "send_vue_" + str[2];
+ mqttClient1 = connect(ip, clientId, mqttProperties.getUsername(), mqttProperties.getPassword());
+ //发布
+ mqttClient1.publish(topic_vue, message);
+ }
+ }
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Async
+ public void doSendView(JSONArray jsonArray, String topic) {
+ MqttMessage message = new MqttMessage(jsonArray.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ if (mqttClient1 != null && mqttClient1.isConnected()) {
+ //发布
+ mqttClient1.publish(topic, message);
+ } else {
+ mqttClient1 = connect(mqttProperties.getBrokerAddress(), "send_vue_" + CommUtil.getUUID(), mqttProperties.getUsername(), mqttProperties.getPassword());
+ //发布
+ mqttClient1.publish(topic, message);
+ }
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * @param jsonObject 需要推送的数据
+ * @param topic 推送的主题
+ */
+ @Async
+ public void doSendJson(JSONObject jsonObject, String topic) {
+ MqttMessage message = new MqttMessage(jsonObject.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ if (mqttClient1 != null && mqttClient1.isConnected()) {
+ //发布
+ mqttClient1.publish(topic, message);
+ } else {
+ mqttClient1 = connect(mqttProperties.getBrokerAddress(), "send_vue_" + CommUtil.getUUID(), mqttProperties.getUsername(), mqttProperties.getPassword());
+ //发布
+ mqttClient1.publish(topic, message);
+ }
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/src/main/java/com/sipai/service/mqtt/impl/MqttConfigTopicServiceImpl.java b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigTopicServiceImpl.java
new file mode 100644
index 0000000..4c3120e
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/impl/MqttConfigTopicServiceImpl.java
@@ -0,0 +1,61 @@
+package com.sipai.service.mqtt.impl;
+
+import com.sipai.dao.mqtt.MqttConfigTopicDao;
+import com.sipai.entity.mqtt.MqttConfigTopic;
+import com.sipai.service.mqtt.MqttConfigTopicService;
+import com.sipai.tools.DataSourceEnum;
+import com.sipai.tools.DataSourceTypeAnno;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Created with IntelliJ IDEA.
+ *
+ * @Auther: sj
+ * @Date: 2021/03/24/17:56
+ * @Description:
+ */
+@Service
+public class MqttConfigTopicServiceImpl implements MqttConfigTopicService {
+ @Autowired
+ private MqttConfigTopicDao mqttConfigTopicDao;
+
+ @Override
+ public MqttConfigTopic selectByPrimaryKey(String id) {
+ return mqttConfigTopicDao.selectByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer deleteByPrimaryKey(String id) {
+ return mqttConfigTopicDao.deleteByPrimaryKey(id);
+ }
+
+ @Override
+ public Integer insert(MqttConfigTopic entity) {
+ return mqttConfigTopicDao.insert(entity);
+ }
+
+ @Override
+ public Integer updateByPrimaryKeySelective(MqttConfigTopic entity) {
+ return mqttConfigTopicDao.updateByPrimaryKeySelective(entity);
+ }
+
+ @Override
+ public List selectListByWhere(String wherestr) {
+ MqttConfigTopic entity = new MqttConfigTopic();
+ entity.setWhere(wherestr);
+ return mqttConfigTopicDao.selectListByWhere(entity);
+ }
+
+ @Override
+ public Integer deleteByWhere(String wherestr) {
+ return mqttConfigTopicDao.deleteByWhere(wherestr);
+ }
+
+ @Override
+ public List selectListByPid(String pid) {
+ return mqttConfigTopicDao.selectListByPid(pid);
+ }
+}
diff --git a/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java b/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java
new file mode 100644
index 0000000..a1e4480
--- /dev/null
+++ b/src/main/java/com/sipai/service/mqtt/impl/MqttServiceImpl.java
@@ -0,0 +1,553 @@
+package com.sipai.service.mqtt.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.sipai.entity.mqtt.Mqtt;
+import com.sipai.entity.mqtt.MqttProperties;
+import com.sipai.entity.scada.MPoint;
+import com.sipai.entity.scada.MPointES;
+import com.sipai.entity.scada.MPointHistory;
+import com.sipai.service.Listener.ListenerPointService;
+import com.sipai.service.mqtt.MqttConfigService;
+import com.sipai.service.mqtt.MqttConfigTopicService;
+import com.sipai.service.mqtt.MqttService;
+import com.sipai.service.rabbitmq.MQService;
+import com.sipai.service.scada.MPointHistoryService;
+import com.sipai.service.scada.MPointService;
+import com.sipai.tools.*;
+import jodd.util.ClassUtil;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.redisson.api.RBatch;
+import org.redisson.api.RedissonClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+@Service("mqttService")
+public class MqttServiceImpl implements MqttService {
+ private final MqttProperties mqttProperties;
+ @Autowired
+ private MPointService mPointService;
+ @Autowired
+ private MqttConfigService mqttConfigService;
+ @Autowired
+ private MqttConfigTopicService mqttConfigTopicService;
+ @Autowired
+ private ListenerPointService listenerPointService;
+ @Autowired
+ private RedissonClient redissonClient;
+
+ private static MqttClient mqttClient;
+ private static String ipStr = "";
+ private static final Logger loggger = LoggerFactory.getLogger(ClassUtil.class);
+
+ //测量点对象缓存
+ private final Map mPointCache = new ConcurrentHashMap<>();
+ private final Set notFoundKeys = ConcurrentHashMap.newKeySet();
+
+ @Override
+ @Scheduled(fixedRate = 3600000)//每小时清理一次
+ public void cleanCache() {
+ loggger.info("清理缓存(" + mPointCache.size() + " 条):" + CommUtil.nowDate());
+ mPointCache.clear();
+ notFoundKeys.clear();
+ }
+
+ public MqttServiceImpl(MqttProperties mqttProperties) {
+ this.mqttProperties = mqttProperties;
+ }
+
+ private static MqttClient connect(String brokeraddress, String clientId, String userName, String password) throws MqttException {
+
+ ipStr = brokeraddress;
+
+ MemoryPersistence persistence = new MemoryPersistence();
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ connOpts.setUserName(userName);
+ connOpts.setPassword(password.toCharArray());
+ connOpts.setConnectionTimeout(10);
+ connOpts.setKeepAliveInterval(20);
+// String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};
+// connOpts.setServerURIs(uris); //起到负载均衡和高可用的作用
+// MqttClient mqttClient = new MqttClient(brokeraddress, clientId, persistence);
+ mqttClient = new MqttClient(brokeraddress, clientId, persistence);
+ //mqttClient.setCallback(new PushCallback("test"));
+ mqttClient.connect(connOpts);
+ return mqttClient;
+ }
+
+ @Async("getAsyncMqttHandle")
+ @Override
+ public void doHandle(String bizId, String topic, String ip4, String port, JSONArray jsonArray) {
+// System.out.println("MQTT接收: biz=" + bizId + " topic=" + topic + " msgCount=" + jsonArray.size());
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JSONObject jsonObject = jsonArray.getJSONObject(i);
+ // 解析时间戳
+ String timestamp = jsonObject.getString("timestamp");
+ String[] times = timestamp.split("\\.");
+ String date = CommUtil.timeStamp2Date(times[0], "yyyy-MM-dd HH:mm:ss");
+
+ // 解析数据对象
+ JSONObject data = jsonObject.getJSONObject("Data");
+ if (data == null || data.isEmpty()) {
+ loggger.warn("Data为空,跳过: @{}", date);
+ continue;
+ }
+
+ List redisList = new ArrayList<>();
+ List vueList = new ArrayList<>();
+
+ // 遍历数据条目
+ for (String key : data.keySet()) {
+ String valueStr = data.getString(key);
+ // 过滤无效值
+ if (!isValidValue(valueStr)) {
+ loggger.warn("值无效,跳过: key={} raw={} @{}", key, valueStr, date);
+ continue;
+ }
+ // 获取点位信息
+ MPoint mPoint = getMPointCacheOrES(bizId, key);
+ if (mPoint == null) {
+ boolean inNotFound = notFoundKeys.contains(key);
+ loggger.warn("点位不存在,跳过: key={} notFoundCache={} @{}", key, inNotFound, date);
+ continue;
+ }
+ // 处理数值转换、倍率计算、取反操作、推报警信息、发送Kafka消息
+ BigDecimal value = handleValueConversion(bizId, valueStr, key, date, vueList);
+ if (value == null) {
+ loggger.warn("数值转换失败,跳过: key={} raw={} @{}", key, valueStr, date);
+ continue;
+ }
+ // 处理ES数据
+ MPointES esPoint = buildEsPoint(mPoint, value, date);
+ redisList.add(esPoint);
+ }
+ // 推送前端数据
+// loggger.info("准备推送前端: topic={} size={} @{}", topic, vueList.size(), date);
+ mqttConfigService.doSendMqttVue(vueList, topic);
+
+ // 批量写入Redis
+ saveToRedisBatch(redisList);
+// loggger.info("批量写入Redis完成: size={} @{}", redisList.size(), date);
+ }
+ }
+
+ /**
+ * 校验值是否有效(非空且长度合法)
+ */
+ private boolean isValidValue(String value) {
+ if (value == null || value.isEmpty()) {
+ loggger.warn("无效值: 空字符串");
+ return false;
+ }
+ BigDecimal bd = new BigDecimal(value);
+ String plainValue = bd.toPlainString();
+ String[] parts = plainValue.split("\\.");
+ boolean ok = parts[0].length() <= 15;
+ if (!ok) {
+ loggger.warn("无效值: 整数位过长 len={} raw={}", parts[0].length(), value);
+ }
+ return ok;
+ }
+
+ /**
+ * 处理数值转换、倍率计算、取反操作、推报警信息、发送Kafka消息
+ */
+ private BigDecimal handleValueConversion(String unitId, String valueStr, String key, String date, List vueList) {
+ try {
+ double valDou = Double.parseDouble(valueStr);
+ MPoint mPoint = getMPointCacheOrES(unitId, key);
+ if (mPoint == null) {
+ return null;
+ }
+
+ // 倍率计算
+ BigDecimal rate = mPoint.getRate() != null ? mPoint.getRate() : BigDecimal.ONE;
+ BigDecimal value = (valDou == 0 || rate.compareTo(BigDecimal.ZERO) == 0)
+ ? BigDecimal.ZERO
+ : rate.multiply(new BigDecimal(valueStr)).divide(BigDecimal.ONE, 10, RoundingMode.HALF_UP);
+
+ // 取反操作
+ if ("1".equals(mPoint.getDirecttype())) {
+ value = "0".equals(valueStr) ? BigDecimal.ONE : ("1".equals(valueStr) ? BigDecimal.ZERO : value);
+ }
+
+ // 推送报警信息
+ if (mPoint.getTriggeralarm() != null && !"0".equals(mPoint.getTriggeralarm().trim())) {
+// loggger.info("推送报警: {}={} @{}", key, value, date);
+// mPointService.sendKafka4MQTT_Alarm(key, value, date, 0, 1);
+ }
+
+ // 发送Kafka消息
+// mPointService.sendKafka4MQTT(unitId,key, value, date, 0, 1, mPoint.getSignaltype());
+
+ // 构建前端数据
+ buildVuePoint(mPoint, value, date, vueList);
+
+ return value;
+ } catch (NumberFormatException e) {
+ loggger.error("数值转换失败: {}", valueStr, e);
+ return null;
+ }
+ }
+
+ /**
+ * 构建前端展示用点位对象
+ */
+ private void buildVuePoint(MPoint mPoint, BigDecimal value, String date, List vueList) {
+ MPoint vuePoint = new MPoint();
+ vuePoint.setId(mPoint.getId());
+ vuePoint.setMeasuredt(date);
+ int numtail = doNumtail(mPoint.getNumtail());
+ vuePoint.setParmvalue(value.setScale(numtail, RoundingMode.HALF_UP));
+ vueList.add(vuePoint);
+ }
+
+ /**
+ * 构建ES存储用点位对象
+ */
+ private MPointES buildEsPoint(MPoint mPoint, BigDecimal value, String date) {
+ MPoint esPoint = new MPoint();
+ esPoint.setId(mPoint.getId());
+ esPoint.setMeasuredt(date);
+ esPoint.setParmvalue(value);
+ esPoint.setNumtail(mPoint.getNumtail());
+ return MPointES.format(esPoint);
+ }
+
+ /**
+ * 批量保存数据到Redis
+ */
+ private void saveToRedisBatch(List esList) {
+ if (esList.isEmpty()) {
+ return;
+ }
+
+ int batchSize = 200;
+ int total = esList.size();
+ int batches = (total + batchSize - 1) / batchSize;
+
+ try {
+ for (int i = 0; i < batches; i++) {
+ int start = i * batchSize;
+ int end = Math.min((i + 1) * batchSize, total);
+ RBatch batch = redissonClient.createBatch();
+
+ for (MPoint mPoint : esList.subList(start, end)) {
+ int num = mPoint.getId().hashCode() % 25;
+ String dt = mPoint.getMeasuredt().replace("T", " ").replace("Z", "");
+ int numtail = doNumtail(mPoint.getNumtail());
+ String val = mPoint.getParmvalue().setScale(numtail, RoundingMode.HALF_UP)
+ + ";" + mPoint.getParmvalue() + ";" + dt;
+
+ batch.getMapCache(CommString.RedisMpointFlag + num)
+ .fastPutAsync(mPoint.getId(), val, 1, TimeUnit.DAYS);
+ }
+ batch.execute();
+ }
+ } catch (Exception e) {
+ loggger.error("Redis批量保存失败", e);
+ }
+ }
+
+ @Async("getAsyncMqttHandle")
+ @Override
+ public void doHandleHis(String bizId, String topic, String ip4, String port, JSONArray jsonArray) {
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JSONObject jsonObject = jsonArray.getJSONObject(i);
+ String timestamp = jsonObject.get("timestamp").toString();//时间戳
+ String[] times = timestamp.split("\\.");
+ String date = CommUtil.timeStamp2Date(times[0], "yyyy-MM-dd HH:mm:ss");
+ //数据
+ JSONObject data = jsonObject.getJSONObject("Data");
+
+ MPointService mPointService = (MPointService) SpringContextUtil.getBean("mPointService");
+ MPointHistoryService mPointHistoryService = (MPointHistoryService) SpringContextUtil.getBean("mPointHistoryService");
+ MPointHistory mPointHistory = new MPointHistory();
+
+ for (String str : data.keySet()) {
+ String key = str;//测量点id
+ String value = data.get(str) + "";
+
+ //过滤掉长度超过15位的数值 money无法存
+ if (value != null && !value.equals("")) {
+ BigDecimal bd = new BigDecimal(value);
+ value = bd.toPlainString();
+ if (value.contains(".")) {
+ String[] vals = value.split("\\.");
+ if (vals[0].length() > 15) {
+ //超出
+ continue;
+ } else {
+ //未超出
+ }
+ } else {
+ if (value.length() > 15) {
+ //超出
+ continue;
+ } else {
+ //未超出
+ }
+ }
+ } else {
+ //为空 过滤掉 不存日志
+ continue;
+ }
+
+ BigDecimal value_BigDecimal = null;
+ double valDou = 0;//判断是否为0 或者0.00 或 0.000等
+
+ //将json中为""的过滤掉
+ if (value != null && !value.equals("")) {
+ valDou = Double.parseDouble(value);//判断是否为0 或者0.00 或 0.000等
+// MPoint mPoint = mPointService.selectById(bizId, key);
+ MPoint mPoint = mPointService.selectById(bizId, key.toString());
+ if (mPoint != null) {
+ BigDecimal numDecimal = mPoint.getRate();
+ if (valDou == 0 || mPoint.getRate().equals(0)) {
+ value_BigDecimal = new BigDecimal(0);
+ } else {
+ value_BigDecimal = numDecimal.multiply(new BigDecimal(value));
+ //截取长度防止 过长 除1再截取10位
+ value_BigDecimal = value_BigDecimal.divide(new BigDecimal(1), 10, BigDecimal.ROUND_HALF_UP);
+ }
+
+ //取反操作
+ if (mPoint.getDirecttype() != null && mPoint.getDirecttype().equals("1")) {
+ //取反
+ if (value.equals("0")) {
+ value_BigDecimal = new BigDecimal(1);
+ } else if (value.equals("1")) {
+ value_BigDecimal = new BigDecimal(0);
+ } else {
+ //正常
+ }
+ } else {
+ //正常
+ }
+
+ try {
+ //DI点直接存
+ if (mPoint.getSignaltype() != null && mPoint.getSignaltype().equals("DI")) {
+ mPointHistory.setMeasuredt(date);
+ mPointHistory.setParmvalue(value_BigDecimal);
+ mPointHistory.setTbName("[tb_mp_" + key + "]");
+ mPointHistory.setUserid(ip4 + ":" + port);
+ mPointHistoryService.save(bizId, mPointHistory);
+ }
+ //AI点3分钟存一次
+ if (mPoint.getSignaltype() != null && mPoint.getSignaltype().equals("DI")) {
+ String sqlStr = "where MeasureDT>dateadd(mi,-3,'" + date + "') order by MeasureDT desc";
+ /*List mPointHistoryList = mPointHistoryService.selectTopNumListByTableAWhere(mPoint.getBizid(),"[tb_mp_" + key + "]", sqlStr, "1");
+ if (mPointHistoryList != null && mPointHistoryList.size() > 0) {
+ //3分钟内存在 则不存
+ } else {
+ mPointHistory.setMeasuredt(date);
+ mPointHistory.setParmvalue(value_BigDecimal);
+ mPointHistory.setTbName("[tb_mp_" + key + "]");
+ mPointHistory.setUserid(ip4 + ":" + port);
+ mPointHistoryService.save(bizId, mPointHistory);
+ }*/
+ }
+ } catch (Exception e) {
+ System.out.println("新增子表--执行结果:" + e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * 发送 rabbitmq
+ *
+ * @param key
+ * @param value_BigDecimal
+ * @param date
+ */
+ @Override
+ public void sentRabbitmq(String exchange, String key, String value_BigDecimal, String date) {
+ JSONObject jsonObject1 = new JSONObject();
+ jsonObject1.put("id", key);
+ jsonObject1.put("value", value_BigDecimal);
+ jsonObject1.put("time", date);
+ MQService mqService = (MQService) SpringContextUtil.getBean("mQService");
+ mqService.sendMQ(exchange, jsonObject1.toString());
+ }
+
+ /**
+ * 指令下发
+ *
+ * @param json
+ * @param topic
+ * @param userId
+ * @param bizId
+ * @return
+ */
+ @Override
+ public int doPublish(JSONObject json, String topic, String userId, String bizId) {
+ try {
+ MqttMessage message = new MqttMessage(json.toString().getBytes());
+ message.setQos(2);
+ message.setRetained(false);
+
+ //查询主题属于哪个代理
+ String topic_up = "";
+ if (topic != null && !topic.equals("")) {
+ topic_up = topic.replace("_DOWN", "_UP");
+ }
+ String host = mqttConfigService.getEmqxHost4TCP(topic_up);
+
+ System.out.println("指令下发:" + host);
+
+ //连接
+ mqttClient = connect(host, "gy_publish_8080", mqttProperties.getUsername(), mqttProperties.getPassword());
+ //发布
+ mqttClient.publish(topic, message);
+ //断开连接
+ mqttClient.disconnect();
+ } catch (MqttPersistenceException e) {
+ e.printStackTrace();
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+ /**
+ * 召回网关全部数据
+ *
+ * @param bizId
+ * @param jsonObject
+ */
+ @Override
+ public void doRecall(String bizId, JSONObject jsonObject) {
+ try {
+ MqttMessage message = new MqttMessage(jsonObject.toString().getBytes());
+ message.setQos(2);
+ message.setRetained(false);
+
+ //查询主题属于哪个代理
+ String topic_up = bizId + "_01_UP";
+
+ String host = mqttConfigService.getEmqxHost4TCP(topic_up);
+
+ System.out.println(CommUtil.nowDate() + "数据召回-执行Recall:" + bizId + "_01_RECALL" + "------" + host);
+
+ //连接
+ mqttClient = connect(host, "gy_recall_8080", mqttProperties.getUsername(), mqttProperties.getPassword());
+ //发布主题
+ mqttClient.publish(bizId + "_01_RECALL", message);
+ //断开连接
+ mqttClient.disconnect();
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 将处理后的mqtt消息传到vue前端
+ */
+ @Async
+ public void doSendMqttVue(List list, String topic) {
+ JSONArray jsonVue = (JSONArray) JSONArray.toJSON(list);
+ MqttMessage message = new MqttMessage(jsonVue.toString().getBytes());
+ message.setQos(0);
+ message.setRetained(false);
+ try {
+ if (mqttClient == null) {
+ System.out.println("掉线_sendMqttVue");
+ mqttClient = connect(mqttProperties.getBrokerAddress(), "gy_sendMqttVue" + CommUtil.getUUID(), mqttProperties.getUsername(), mqttProperties.getPassword());
+ } else {
+// System.out.println("正常_sendMqttVue");
+ }
+ String topic_vue = topic;
+ if (topic != null && topic.contains("_IM")) {
+ topic_vue = topic.replace("_IM", "");
+ }
+ topic_vue = topic_vue + "_VUE";
+ mqttClient.publish(topic_vue, message);
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 返回精度
+ *
+ * @param num
+ * @return
+ */
+ public int doNumtail(String num) {
+ int numtail = 0;
+ if (num != null && !num.trim().equals("")) {
+ //0.00这种后面不用 暂时写死 以后标准全部用数字
+ if (num.contains(".")) {
+ if (num.equals("0.0")) {
+ numtail = 1;
+ }
+ if (num.equals("0.00")) {
+ numtail = 2;
+ }
+ if (num.equals("0.000")) {
+ numtail = 3;
+ }
+ if (num.equals("0.0000")) {
+ numtail = 4;
+ }
+ } else {
+ numtail = Integer.parseInt(num);
+ }
+ } else {
+ numtail = 0;
+ }
+ return numtail;
+ }
+
+ /**
+ * 优先从缓存查询MPoint对象,不存在则去es中查询
+ *
+ * @param key
+ * @return
+ */
+ private MPoint getMPointCacheOrES(String unitId, String key) {
+ // 查询缓存是否存在
+ if (mPointCache.containsKey(key)) {
+ MPoint mp = mPointCache.get(key);
+// loggger.debug("缓存命中: key={} id={}", key, mp.getId());
+ return mp;
+ }
+ // 点位不存在 (新加的点位需要定时清理缓存的时候才会进入缓存,不然频繁去查询es)
+ if (notFoundKeys.contains(key)) {
+// loggger.debug("已标记未找到,跳过ES查询: key={}", key);
+ return null;
+ }
+ // 从ES查询
+ MPoint mPoint = mPointService.selectById(unitId, key);
+ if (mPoint != null) {
+ //添加缓存
+ mPointCache.put(key, mPoint);
+// loggger.debug("ES查询成功并缓存: key={} id={}", key, mPoint.getId());
+ } else {
+ //没有该点位
+ notFoundKeys.add(key);
+// loggger.warn("ES未找到点位,加入未找到集合: key={}", key);
+ }
+ return mPoint;
+ }
+
+}
diff --git a/src/main/java/com/sipai/service/opc/InitOpcUaService.java b/src/main/java/com/sipai/service/opc/InitOpcUaService.java
new file mode 100644
index 0000000..9d24bf7
--- /dev/null
+++ b/src/main/java/com/sipai/service/opc/InitOpcUaService.java
@@ -0,0 +1,134 @@
+package com.sipai.service.opc;
+
+import com.sipai.entity.scada.MPoint;
+import com.sipai.service.opc.OpcUaService;
+import com.sipai.service.scada.MPointService;
+import com.sipai.tools.CommString;
+import com.sipai.tools.CommUtil;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.redisson.api.RBatch;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+/**
+ * 初始化订阅数据
+ */
+@Service
+public class InitOpcUaService {
+ @Autowired(required = false)
+ private OpcUaService opcUaService;
+ @Autowired
+ private MPointService mPointService;
+
+ // 本地缓存
+ private final Cache mPointCache = Caffeine.newBuilder()
+ .maximumSize(10000) // 仅设置最大缓存数量
+// .expireAfterWrite(1, TimeUnit.HOURS) // 1小时后过期
+ .build();
+
+ // 已订阅的点位集合
+ private Set subscribedNodeIds = new HashSet<>();
+
+ @PostConstruct
+ public void init() {
+ if (opcUaService == null) {
+ System.out.println("OPC UA 未启动,请检查yml配置文件钟:opc: enabled: true");
+ return;
+ }
+ try {
+ //查询所有的opcua采集点位
+ List nodeIdList = fetchNodeIdsFromEs();
+ for (String nodeId : nodeIdList) {
+ MPoint mPoint = mPointCache.get(nodeId, key -> mPointService.selectById("",nodeId));
+ System.out.println(mPoint);
+ Consumer callback = value -> {
+ if (mPoint != null) {
+ mPointService.sendKafka4OpcUa(mPoint.getId(), value, CommUtil.nowDate());
+ }
+ };
+ opcUaService.subscribeToNode(nodeId, callback);
+ subscribedNodeIds.add(nodeId); // 记录已订阅的点位
+ }
+ System.out.println("Subscribed to " + nodeIdList.size() + " nodes");
+ } catch (Exception e) {
+ System.err.println("Failed to subscribe: " + e.getMessage());
+ }
+ }
+
+ /**
+ * 同步一次opcua订阅点位
+ */
+ public void manualSyncSubscriptions() {
+ try {
+ List currentNodeIds = fetchNodeIdsFromEs(); // 获取当前 Elasticsearch 中的点位信息
+ Set newSubscribedNodeIds = new HashSet<>();
+
+ // 处理新增点位
+ for (String nodeId : currentNodeIds) {
+ if (!subscribedNodeIds.contains(nodeId)) { // 新增点位
+ MPoint mPoint = mPointCache.get(nodeId, key -> mPointService.selectById("",nodeId));
+ System.out.println(mPoint);
+ Consumer callback = value -> {
+ if (mPoint != null) {
+ mPointService.sendKafka4OpcUa(mPoint.getId(), value, CommUtil.nowDate());
+ }
+ };
+ opcUaService.subscribeToNode(nodeId, callback);
+ newSubscribedNodeIds.add(nodeId);
+ } else {
+ newSubscribedNodeIds.add(nodeId); // 保留已订阅的点位
+ }
+ }
+
+ // 处理删除点位
+ for (String nodeId : subscribedNodeIds) {
+ if (!currentNodeIds.contains(nodeId)) { // 删除点位
+ opcUaService.unsubscribeFromNode(nodeId);
+ }
+ }
+
+ // 更新已订阅点位集合
+ subscribedNodeIds = newSubscribedNodeIds;
+
+ System.out.println("进行一次opcua订阅同步");
+ } catch (Exception e) {
+ System.err.println("opcua订阅同步失败: " + e.getMessage());
+ }
+ }
+
+ /**
+ * 查询所有的opcua采集点位
+ *
+ * @return
+ */
+ private List fetchNodeIdsFromEs() {
+ NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+ BoolQueryBuilder childBoolQueryBuilder = QueryBuilders.boolQuery();
+ childBoolQueryBuilder.must(QueryBuilders.matchPhraseQuery("biztype", "opcua"));
+ boolQueryBuilder.should(childBoolQueryBuilder);
+ nativeSearchQueryBuilder.withQuery(boolQueryBuilder);
+ nativeSearchQueryBuilder.withPageable(PageRequest.of(0, 10000));
+ List list_mps = this.mPointService.selectListByWhere4Es(nativeSearchQueryBuilder);
+ List nodeIdList = new ArrayList<>();
+ for (MPoint mPoint : list_mps) {
+ nodeIdList.add(mPoint.getMpointid());
+ }
+ return nodeIdList;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/sipai/service/opc/OpcUaService.java b/src/main/java/com/sipai/service/opc/OpcUaService.java
new file mode 100644
index 0000000..457b2f6
--- /dev/null
+++ b/src/main/java/com/sipai/service/opc/OpcUaService.java
@@ -0,0 +1,277 @@
+package com.sipai.service.opc;
+
+import com.sipai.config.OpcUaProperties;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+@Service
+public class OpcUaService {
+
+ @Autowired(required = false)
+ private final OpcUaClient opcUaClient;
+ private final OpcUaProperties opcUaProperties;
+
+ @Autowired
+ public OpcUaService(@Autowired(required = false) OpcUaClient opcUaClient, OpcUaProperties opcUaProperties) {
+ this.opcUaClient = opcUaClient;
+ this.opcUaProperties = opcUaProperties;
+ }
+
+ /*public void printConfig() {
+ System.out.println("OPC UA Server URL: " + opcUaProperties.getServerUrl());
+ System.out.println("Security Policy: " + opcUaProperties.getSecurityPolicy());
+ System.out.println("Security Mode: " + opcUaProperties.getSecurityMode());
+ }*/
+
+ /**
+ * 读取单个点位的值
+ *
+ * @param nodeId
+ * @return
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ public Object readNodeValue(String nodeId) throws ExecutionException, InterruptedException {
+ if (opcUaClient == null) {
+ throw new IllegalStateException("OPC UA 未启动,请检查yml配置文件钟:opc: enabled: true");
+ }
+ NodeId node = NodeId.parse(nodeId);
+ DataValue dataValue = opcUaClient.readValue(0, TimestampsToReturn.Both, node).get();
+ Variant variant = dataValue.getValue();
+ return variant.getValue();
+ }
+
+ /**
+ * 写入单个点位的值
+ *
+ * @param nodeId 节点ID
+ * @param value 要写入的值
+ * @return 写入是否成功
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ public boolean writeNodeValue(String nodeId, Object value) {
+ try {
+ NodeId node = NodeId.parse(nodeId);
+
+ // 读取节点的数据类型(使用 UaVariableNode 替代 VariableNode)
+ UaVariableNode variableNode = (UaVariableNode) opcUaClient.getAddressSpace().getVariableNode(node);
+ if (variableNode == null) {
+ throw new RuntimeException("无法找到节点: " + nodeId);
+ }
+
+ NodeId dataTypeId = variableNode.getDataType();
+ System.out.println(dataTypeId);
+
+ if (dataTypeId == null) {
+ throw new RuntimeException("无法获取节点的数据类型");
+ }
+
+ // 根据数据类型创建对应的 Variant
+ Variant variant = createVariantForDataType(value, dataTypeId);
+
+ DataValue dataValue = DataValue.valueOnly(variant);
+ StatusCode statusCode = opcUaClient.writeValue(node, dataValue).get();
+ System.out.println(statusCode);
+ return statusCode.isGood();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ /**
+ * typeId跟标准规范不一致,目前用 KEPServerEX6自带的OPCUA服务 测试的结果
+ *
+ * @param value
+ * @param dataTypeId
+ * @return
+ */
+ private Variant createVariantForDataType(Object value, NodeId dataTypeId) {
+ int typeId = ((Number) dataTypeId.getIdentifier()).intValue();
+ // 标准类型处理
+ switch (typeId) {
+ case 1:
+ return new Variant(convertToBoolean(value));
+ case 4: // Int16 (有符号short)
+ short shortValue = (value instanceof Number) ?
+ ((Number) value).shortValue() :
+ Short.parseShort(value.toString());
+ return new Variant(shortValue);
+ case 5: // UInt16/Word (无符号short)
+ int intValue = (value instanceof Number) ?
+ ((Number) value).intValue() :
+ Integer.parseInt(value.toString());
+ if (intValue < 0 || intValue > 65535) {
+ throw new IllegalArgumentException("UInt16 值必须在 0~65535 之间");
+ }
+ return new Variant(Unsigned.ushort(intValue));
+ case 6: // Int32
+ return new Variant(convertToInt(value));
+ case 10: //Float
+ return new Variant(convertToFloat(value));
+ case 11: // Double
+ return new Variant(convertToDouble(value));
+ default:
+ return new Variant(value.toString());
+ }
+ }
+
+ // 类型转换辅助方法(保持不变)
+ private boolean convertToBoolean(Object value) {
+ if (value instanceof Boolean) return (Boolean) value;
+ String strVal = value.toString().trim().toLowerCase();
+ return strVal.equals("true") || strVal.equals("1");
+ }
+
+ private float convertToFloat(Object value) {
+ if (value instanceof Number) return ((Number) value).floatValue();
+ return Float.parseFloat(value.toString());
+ }
+
+ private double convertToDouble(Object value) {
+ if (value instanceof Number) return ((Number) value).doubleValue();
+ return Double.parseDouble(value.toString());
+ }
+
+ private int convertToInt(Object value) {
+ if (value instanceof Number) return ((Number) value).intValue();
+ return Integer.parseInt(value.toString());
+ }
+
+ private long convertToUInt(Object value) {
+ if (value instanceof Number) return ((Number) value).longValue();
+ return Long.parseLong(value.toString());
+ }
+
+ // 订阅点位
+ public void subscribeToNode(String nodeId, Consumer callback) throws Exception {
+ if (opcUaClient == null) {
+ throw new IllegalStateException("OPC UA 未启动,请检查yml配置文件钟:opc: enabled: true");
+ }
+ try {
+ NodeId node = NodeId.parse(nodeId);
+
+ UaSubscription subscription = opcUaClient
+ .getSubscriptionManager()
+ .createSubscription(1000.0)
+ .get();
+
+ MonitoringParameters parameters = new MonitoringParameters(
+ Unsigned.uint(1),
+ 1000.0,
+ null,
+ Unsigned.uint(10),
+ true
+ );
+
+ MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
+ new ReadValueId(
+ node,
+ AttributeId.Value.uid(),
+ null,
+ null
+ ),
+ MonitoringMode.Reporting,
+ parameters
+ );
+
+ subscription.createMonitoredItems(
+ TimestampsToReturn.Both,
+ Collections.singletonList(request),
+ new UaSubscription.ItemCreationCallback() {
+ @Override
+ public void onItemCreated(UaMonitoredItem item, int i) {
+ // 设置值的回调处理器
+ item.setValueConsumer(value -> {
+ Variant variant = value.getValue();
+ Object rawValue = variant.getValue();
+
+ if (rawValue instanceof Number) {
+ callback.accept(((Number) rawValue).doubleValue()); // 转为 Double
+ } else if (rawValue instanceof Boolean) {
+ callback.accept(((Boolean) rawValue) ? 1.0 : 0.0); // 布尔转0 1
+ } else {
+ throw new ClassCastException("Expected Number or Boolean but received: " +
+ rawValue.getClass().getSimpleName());
+ }
+ });
+ }
+ }
+ ).get();
+
+ } catch (Exception e) {
+ throw new Exception("订阅失败: " + e.getMessage(), e);
+ }
+ }
+
+ // 取消订阅点位
+ public boolean unsubscribeFromNode(String nodeId) {
+ if (opcUaClient == null) {
+ throw new IllegalStateException("OPC UA 未启动,请检查yml配置文件钟:opc: enabled: true");
+ }
+ try {
+ NodeId node = NodeId.parse(nodeId);
+
+ // 获取所有订阅
+ List subscriptions = opcUaClient.getSubscriptionManager().getSubscriptions();
+
+ for (UaSubscription subscription : subscriptions) {
+ // 获取所有监控项
+ List monitoredItems = subscription.getMonitoredItems();
+
+ for (UaMonitoredItem monitoredItem : monitoredItems) {
+ if (monitoredItem.getReadValueId().getNodeId().equals(node)) {
+ // 删除监控项
+ CompletableFuture> deleteFuture = subscription.deleteMonitoredItems(
+ Collections.singletonList(monitoredItem)
+ );
+
+ // 等待删除操作完成
+ List statusCodes = deleteFuture.get(5, TimeUnit.SECONDS);
+
+ // 检查删除结果
+ for (StatusCode statusCode : statusCodes) {
+ if (!statusCode.isGood()) {
+ System.err.println("删除监控项失败: " + statusCode);
+ return false;
+ }
+ }
+
+ System.out.println("成功取消订阅节点: " + nodeId);
+ return true;
+ }
+ }
+ }
+
+ System.out.println("未找到节点: " + nodeId);
+ return false;
+ } catch (Exception e) {
+ System.err.println("取消订阅失败: " + e.getMessage());
+ return false;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/sipai/service/rabbitmq/MQService.java b/src/main/java/com/sipai/service/rabbitmq/MQService.java
new file mode 100644
index 0000000..650f742
--- /dev/null
+++ b/src/main/java/com/sipai/service/rabbitmq/MQService.java
@@ -0,0 +1,5 @@
+package com.sipai.service.rabbitmq;
+
+public interface MQService {
+ void sendMQ(String exchange, String msg);
+}
diff --git a/src/main/java/com/sipai/service/rabbitmq/impl/MQServiceImpl.java b/src/main/java/com/sipai/service/rabbitmq/impl/MQServiceImpl.java
new file mode 100644
index 0000000..22327f4
--- /dev/null
+++ b/src/main/java/com/sipai/service/rabbitmq/impl/MQServiceImpl.java
@@ -0,0 +1,23 @@
+package com.sipai.service.rabbitmq.impl;
+
+import com.sipai.service.rabbitmq.MQService;
+import com.sipai.tools.ConstantString;
+import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+@Service("mQService")
+public class MQServiceImpl implements MQService {
+ @Resource
+ @Lazy
+ private AmqpTemplate amqpTemplate;
+
+ @Override
+ public void sendMQ(String exchange,String msg) {
+// String exchange = ConstantString.MQ_ALARM_MQTT;
+ String key = ConstantString.KEY_ALARM_MSG;
+ this.amqpTemplate.convertAndSend(exchange,key,msg);
+ }
+}
diff --git a/src/main/java/com/sipai/service/scada/MPointBzwService.java b/src/main/java/com/sipai/service/scada/MPointBzwService.java
new file mode 100644
index 0000000..ad26385
--- /dev/null
+++ b/src/main/java/com/sipai/service/scada/MPointBzwService.java
@@ -0,0 +1,21 @@
+package com.sipai.service.scada;
+
+import com.sipai.entity.scada.MPointBzw;
+
+import java.util.List;
+
+public interface MPointBzwService {
+
+ public abstract MPointBzw selectByPrimaryKey(String id);
+
+ public abstract Integer deleteByPrimaryKey(String id);
+
+ public abstract Integer insert(MPointBzw entity);
+
+ public abstract Integer updateByPrimaryKeySelective(MPointBzw entity);
+
+ public abstract List selectListByWhere(String wherestr);
+
+ public abstract Integer deleteByWhere(String wherestr);
+
+}
diff --git a/src/main/java/com/sipai/service/scada/MPointHistoryService.java b/src/main/java/com/sipai/service/scada/MPointHistoryService.java
new file mode 100644
index 0000000..8cf94c1
--- /dev/null
+++ b/src/main/java/com/sipai/service/scada/MPointHistoryService.java
@@ -0,0 +1,17 @@
+package com.sipai.service.scada;
+
+import com.sipai.entity.scada.MPointHistory;
+
+import java.util.List;
+
+public interface MPointHistoryService {
+ int save(String bizId, MPointHistory mPointHistory);
+
+ List selectListByTableAWhere(String bizId, String table, String wherestr);
+
+ int deleteByTableAWhere(String bizId,String table ,String wherestr);
+
+ int saveByCreate(String bizId, MPointHistory mPointHistory);
+
+ List