对接MQTT、APP样式
This commit is contained in:
280
src/utils/mqttDataManager.js
Normal file
280
src/utils/mqttDataManager.js
Normal file
@ -0,0 +1,280 @@
|
||||
// MQTT数据管理器
|
||||
import { createMqtt, closeMqtt, getConnectionStatus } from './sendMqtt.js'
|
||||
|
||||
class MqttDataManager {
|
||||
constructor() {
|
||||
this.listeners = new Map()
|
||||
this.isConnected = false
|
||||
this.lastData = {
|
||||
temperature: null,
|
||||
humidity: null,
|
||||
pm25: null,
|
||||
timestamp: null
|
||||
}
|
||||
this.init()
|
||||
}
|
||||
|
||||
// 初始化MQTT连接
|
||||
init() {
|
||||
try {
|
||||
console.log('🚀 MQTT数据管理器开始初始化...')
|
||||
|
||||
// 监听MQTT数据
|
||||
uni.$on('mqttData', this.handleMqttData.bind(this))
|
||||
console.log('✅ MQTT数据监听器已注册')
|
||||
|
||||
// 立即创建MQTT连接,不使用延迟
|
||||
console.log('🔧 立即创建MQTT连接...')
|
||||
createMqtt()
|
||||
|
||||
// 定期检查连接状态
|
||||
this.statusCheckInterval = setInterval(() => {
|
||||
const wasConnected = this.isConnected
|
||||
this.isConnected = getConnectionStatus()
|
||||
|
||||
// 如果连接状态发生变化,通知监听器
|
||||
if (wasConnected !== this.isConnected) {
|
||||
console.log('🔄 MQTT连接状态变化:', {
|
||||
wasConnected,
|
||||
isConnected: this.isConnected
|
||||
})
|
||||
this.notifyListeners('connectionStatus', {
|
||||
isConnected: this.isConnected,
|
||||
lastUpdate: this.lastData.timestamp ? new Date(this.lastData.timestamp * 1000).toLocaleString('zh-CN') : null
|
||||
})
|
||||
}
|
||||
}, 3000) // 改为3秒检查一次
|
||||
|
||||
console.log('✅ MQTT数据管理器初始化完成')
|
||||
} catch (error) {
|
||||
console.error('❌ MQTT数据管理器初始化失败:', error)
|
||||
}
|
||||
}
|
||||
|
||||
// 处理MQTT数据
|
||||
handleMqttData(data) {
|
||||
try {
|
||||
console.log('📨 收到MQTT数据:', data)
|
||||
|
||||
// 更新连接状态
|
||||
this.isConnected = true
|
||||
|
||||
// 检查数据是否为数组
|
||||
if (Array.isArray(data)) {
|
||||
console.log('📋 收到数组数据,长度:', data.length)
|
||||
|
||||
// 遍历数组中的每个设备数据
|
||||
data.forEach((deviceData, index) => {
|
||||
console.log(`📦 处理设备数据[${index}]:`, deviceData)
|
||||
this.processDeviceData(deviceData)
|
||||
})
|
||||
} else {
|
||||
// 单个设备数据
|
||||
console.log('📦 处理单个设备数据:', data)
|
||||
this.processDeviceData(data)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('❌ 处理MQTT数据失败:', error)
|
||||
}
|
||||
}
|
||||
|
||||
// 处理单个设备数据
|
||||
processDeviceData(deviceData) {
|
||||
try {
|
||||
// 检查数据结构
|
||||
if (!deviceData || !deviceData.Device || !deviceData.Data) {
|
||||
console.warn('⚠️ 设备数据格式不符合预期:', deviceData)
|
||||
return
|
||||
}
|
||||
|
||||
const deviceType = deviceData.Device
|
||||
const deviceDataContent = deviceData.Data
|
||||
const timestamp = deviceData.timestamp || Math.floor(Date.now() / 1000)
|
||||
|
||||
console.log(`🔍 处理设备类型: ${deviceType}`)
|
||||
console.log('设备数据:', deviceDataContent)
|
||||
console.log('时间戳:', timestamp)
|
||||
|
||||
// 根据设备类型处理数据
|
||||
if (deviceType === 'WSD') {
|
||||
console.log('✅ 处理WSD设备数据 - 更新环境参数')
|
||||
this.processWSDData(deviceDataContent, timestamp)
|
||||
} else {
|
||||
console.log(`⚠️ 设备类型 ${deviceType} 暂不处理,仅打印到控制台`)
|
||||
console.log('设备详情:', {
|
||||
deviceType,
|
||||
data: deviceDataContent,
|
||||
timestamp: new Date(timestamp * 1000).toLocaleString('zh-CN')
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('❌ 处理设备数据失败:', error)
|
||||
}
|
||||
}
|
||||
|
||||
// 处理WSD设备数据
|
||||
processWSDData(data, timestamp) {
|
||||
try {
|
||||
// 解析WSD数据 - 根据您提供的数据结构,WD是温度,SD是湿度
|
||||
const temperature = parseFloat(data.WD) || 0
|
||||
const humidity = parseFloat(data.SD) || 0
|
||||
|
||||
console.log('🌡️ WSD数据解析:')
|
||||
console.log('温度(WD):', temperature)
|
||||
console.log('湿度(SD):', humidity)
|
||||
|
||||
// 构建解析后的数据
|
||||
const parsedData = {
|
||||
deviceType: 'WSD',
|
||||
timestamp,
|
||||
time: new Date(timestamp * 1000).toLocaleString('zh-CN'),
|
||||
temperature,
|
||||
humidity
|
||||
}
|
||||
|
||||
// 更新最新数据
|
||||
this.updateLastData(parsedData)
|
||||
|
||||
// 通知所有监听器
|
||||
this.notifyListeners('dataUpdate', parsedData)
|
||||
|
||||
console.log('✅ WSD数据处理完成:', parsedData)
|
||||
} catch (error) {
|
||||
console.error('❌ 处理WSD数据失败:', error)
|
||||
}
|
||||
}
|
||||
|
||||
// 解析设备数据
|
||||
parseDeviceData(rawData) {
|
||||
try {
|
||||
// 如果是数组,取第一个元素
|
||||
if (Array.isArray(rawData) && rawData.length > 0) {
|
||||
rawData = rawData[0]
|
||||
}
|
||||
|
||||
// 检查数据结构
|
||||
if (!rawData || !rawData.Device || !rawData.Data) {
|
||||
console.warn('⚠️ 数据格式不符合预期:', rawData)
|
||||
return null
|
||||
}
|
||||
|
||||
const deviceType = rawData.Device
|
||||
const deviceData = rawData.Data
|
||||
const timestamp = rawData.timestamp || Math.floor(Date.now() / 1000)
|
||||
|
||||
// 根据设备类型解析数据
|
||||
let parsedData = {
|
||||
deviceType,
|
||||
timestamp,
|
||||
time: new Date(timestamp * 1000).toLocaleString('zh-CN')
|
||||
}
|
||||
|
||||
switch (deviceType) {
|
||||
case 'WSD': // 温湿度传感器
|
||||
parsedData.temperature = parseFloat(deviceData.Temperature) || 0
|
||||
parsedData.humidity = parseFloat(deviceData.Humidity) || 0
|
||||
break
|
||||
|
||||
case 'AC': // 空调设备
|
||||
parsedData.temperature = parseFloat(deviceData.Temperature) || 0
|
||||
parsedData.humidity = parseFloat(deviceData.Humidity) || 0
|
||||
break
|
||||
|
||||
case 'PM': // PM2.5传感器
|
||||
parsedData.pm25 = parseFloat(deviceData.PM25) || 0
|
||||
break
|
||||
|
||||
default:
|
||||
console.warn('⚠️ 未知设备类型:', deviceType)
|
||||
return null
|
||||
}
|
||||
|
||||
return parsedData
|
||||
} catch (error) {
|
||||
console.error('❌ 解析设备数据失败:', error)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
// 更新最新数据
|
||||
updateLastData(parsedData) {
|
||||
if (parsedData.temperature !== undefined) {
|
||||
this.lastData.temperature = parsedData.temperature
|
||||
}
|
||||
if (parsedData.humidity !== undefined) {
|
||||
this.lastData.humidity = parsedData.humidity
|
||||
}
|
||||
if (parsedData.pm25 !== undefined) {
|
||||
this.lastData.pm25 = parsedData.pm25
|
||||
}
|
||||
this.lastData.timestamp = parsedData.timestamp
|
||||
}
|
||||
|
||||
// 添加数据监听器
|
||||
addListener(key, callback) {
|
||||
if (!this.listeners.has(key)) {
|
||||
this.listeners.set(key, [])
|
||||
}
|
||||
this.listeners.get(key).push(callback)
|
||||
|
||||
// 立即发送当前数据
|
||||
if (key === 'dataUpdate' && this.lastData.timestamp) {
|
||||
callback(this.lastData)
|
||||
}
|
||||
}
|
||||
|
||||
// 移除数据监听器
|
||||
removeListener(key, callback) {
|
||||
if (this.listeners.has(key)) {
|
||||
const callbacks = this.listeners.get(key)
|
||||
const index = callbacks.indexOf(callback)
|
||||
if (index > -1) {
|
||||
callbacks.splice(index, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 通知监听器
|
||||
notifyListeners(key, data) {
|
||||
if (this.listeners.has(key)) {
|
||||
this.listeners.get(key).forEach(callback => {
|
||||
try {
|
||||
callback(data)
|
||||
} catch (error) {
|
||||
console.error('❌ 监听器回调执行失败:', error)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 获取最新数据
|
||||
getLastData() {
|
||||
return { ...this.lastData }
|
||||
}
|
||||
|
||||
// 获取连接状态
|
||||
getConnectionStatus() {
|
||||
return {
|
||||
isConnected: this.isConnected,
|
||||
lastUpdate: this.lastData.timestamp ? new Date(this.lastData.timestamp * 1000).toLocaleString('zh-CN') : null
|
||||
}
|
||||
}
|
||||
|
||||
// 销毁管理器
|
||||
destroy() {
|
||||
uni.$off('mqttData', this.handleMqttData.bind(this))
|
||||
closeMqtt()
|
||||
|
||||
if (this.statusCheckInterval) {
|
||||
clearInterval(this.statusCheckInterval)
|
||||
}
|
||||
|
||||
this.listeners.clear()
|
||||
console.log('🔌 MQTT数据管理器已销毁')
|
||||
}
|
||||
}
|
||||
|
||||
// 创建全局实例
|
||||
const mqttDataManager = new MqttDataManager()
|
||||
|
||||
export default mqttDataManager
|
||||
101
src/utils/mqttTest.js
Normal file
101
src/utils/mqttTest.js
Normal file
@ -0,0 +1,101 @@
|
||||
// MQTT连接测试工具
|
||||
import { createMqtt, getConnectionStatus, closeMqtt } from './sendMqtt.js'
|
||||
|
||||
class MqttTest {
|
||||
constructor() {
|
||||
this.testResults = []
|
||||
}
|
||||
|
||||
// 运行MQTT连接测试
|
||||
async runTest() {
|
||||
console.log('🧪 开始MQTT连接测试...')
|
||||
|
||||
try {
|
||||
// 测试1: 创建连接
|
||||
console.log('📋 测试1: 创建MQTT连接')
|
||||
createMqtt()
|
||||
|
||||
// 等待连接建立
|
||||
await this.waitForConnection(10000) // 等待10秒
|
||||
|
||||
// 测试2: 检查连接状态
|
||||
console.log('📋 测试2: 检查连接状态')
|
||||
const isConnected = getConnectionStatus()
|
||||
console.log('连接状态:', isConnected)
|
||||
|
||||
// 测试3: 监听数据
|
||||
console.log('📋 测试3: 设置数据监听')
|
||||
uni.$on('mqttData', (data) => {
|
||||
console.log('✅ 收到测试数据:', data)
|
||||
this.testResults.push({
|
||||
type: 'data_received',
|
||||
data: data,
|
||||
timestamp: new Date().toISOString()
|
||||
})
|
||||
})
|
||||
|
||||
// 等待数据
|
||||
console.log('⏳ 等待MQTT数据...')
|
||||
await this.waitForData(30000) // 等待30秒
|
||||
|
||||
// 输出测试结果
|
||||
this.printTestResults()
|
||||
|
||||
} catch (error) {
|
||||
console.error('❌ MQTT测试失败:', error)
|
||||
}
|
||||
}
|
||||
|
||||
// 等待连接建立
|
||||
waitForConnection(timeout = 10000) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const startTime = Date.now()
|
||||
const checkInterval = setInterval(() => {
|
||||
const isConnected = getConnectionStatus()
|
||||
if (isConnected) {
|
||||
clearInterval(checkInterval)
|
||||
console.log('✅ MQTT连接建立成功')
|
||||
resolve(true)
|
||||
} else if (Date.now() - startTime > timeout) {
|
||||
clearInterval(checkInterval)
|
||||
console.log('⏰ MQTT连接超时')
|
||||
reject(new Error('连接超时'))
|
||||
}
|
||||
}, 1000)
|
||||
})
|
||||
}
|
||||
|
||||
// 等待数据接收
|
||||
waitForData(timeout = 30000) {
|
||||
return new Promise((resolve) => {
|
||||
const startTime = Date.now()
|
||||
const checkInterval = setInterval(() => {
|
||||
if (this.testResults.length > 0 || Date.now() - startTime > timeout) {
|
||||
clearInterval(checkInterval)
|
||||
resolve()
|
||||
}
|
||||
}, 1000)
|
||||
})
|
||||
}
|
||||
|
||||
// 打印测试结果
|
||||
printTestResults() {
|
||||
console.log('📊 MQTT测试结果:')
|
||||
console.log('连接状态:', getConnectionStatus())
|
||||
console.log('收到数据次数:', this.testResults.length)
|
||||
|
||||
if (this.testResults.length > 0) {
|
||||
console.log('最新数据:', this.testResults[this.testResults.length - 1])
|
||||
}
|
||||
}
|
||||
|
||||
// 清理测试
|
||||
cleanup() {
|
||||
uni.$off('mqttData')
|
||||
closeMqtt()
|
||||
console.log('🧹 MQTT测试清理完成')
|
||||
}
|
||||
}
|
||||
|
||||
// 导出测试实例
|
||||
export default new MqttTest()
|
||||
@ -8,7 +8,12 @@ mqtturl = "ws://122.51.194.184:8083/mqtt";
|
||||
import * as mqtt from "mqtt/dist/mqtt.min.js";
|
||||
// #endif
|
||||
|
||||
// #ifdef APP-PLUS || MP-WEIXIN
|
||||
// #ifdef APP-PLUS
|
||||
mqtturl = "wx://122.51.194.184:8083/mqtt";
|
||||
import * as mqtt from "mqtt/dist/mqtt.min.js";
|
||||
//#endif
|
||||
|
||||
// #ifdef MP-WEIXIN
|
||||
mqtturl = "wx://122.51.194.184:8083/mqtt";
|
||||
import * as mqtt from "mqtt/dist/mqtt.min.js";
|
||||
//#endif
|
||||
@ -35,9 +40,10 @@ const createMqtt = () => {
|
||||
password: "qwer1234",
|
||||
protocolVersion: 4,
|
||||
clean: true,
|
||||
reconnectPeriod: 1000, // reconnectPeriod为1000毫秒,这意味着在连接丢失之后,客户端将在1秒后尝试重新连接。
|
||||
connectTimeout: 5000, // 5s超时时间 意味着mqtt-reconnect函数5秒钟触发一次
|
||||
topic: "HDYDCJ_01_DOWN",
|
||||
reconnectPeriod: 1000, // 恢复自动重连,1秒重连一次
|
||||
connectTimeout: 5000, // 5s超时时间
|
||||
// topic: "HDYDCJ_01_DOWN",
|
||||
topic: "HDYDCJ_01_UP",
|
||||
rejectUnauthorized: false,
|
||||
// #ifdef MP-ALIPAY
|
||||
my: my,//注意这里的my
|
||||
@ -49,12 +55,44 @@ const createMqtt = () => {
|
||||
console.log('🔧 开始创建MQTT连接...');
|
||||
console.log('🔧 MQTT URL:', mqtturl);
|
||||
console.log('🔧 连接选项:', options);
|
||||
console.log('🔧 当前平台:',
|
||||
// #ifdef H5
|
||||
'H5'
|
||||
// #endif
|
||||
// #ifdef APP-PLUS
|
||||
'APP-PLUS'
|
||||
// #endif
|
||||
// #ifdef MP-WEIXIN
|
||||
'MP-WEIXIN'
|
||||
// #endif
|
||||
// #ifdef MP-ALIPAY
|
||||
'MP-ALIPAY'
|
||||
// #endif
|
||||
)
|
||||
|
||||
// 显示连接loading
|
||||
uni.showLoading({
|
||||
title: 'MQTT连接中...',
|
||||
mask: true
|
||||
});
|
||||
|
||||
client = mqtt.connect(mqtturl, options);
|
||||
initEventHandleMqtt(options.topic);
|
||||
} else {
|
||||
console.log('🔧 MQTT客户端已存在,跳过创建');
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('❌ MQTT连接创建失败:', e);
|
||||
console.error('❌ 错误详情:', e.message);
|
||||
console.error('❌ 错误堆栈:', e.stack);
|
||||
|
||||
// 连接失败时隐藏loading
|
||||
uni.hideLoading();
|
||||
uni.showToast({
|
||||
title: 'MQTT连接失败' + mqtturl,
|
||||
icon: 'error',
|
||||
duration: 3000
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@ -64,10 +102,23 @@ const initEventHandleMqtt = (topicUrl) => {
|
||||
client.on("connect", function() {
|
||||
uni.hideLoading();
|
||||
console.log("✅ MQTT连接成功");
|
||||
|
||||
// 显示连接成功提示
|
||||
uni.showToast({
|
||||
title: 'MQTT连接成功' + mqtturl,
|
||||
icon: 'success',
|
||||
duration: 2000
|
||||
});
|
||||
|
||||
//订阅主题
|
||||
client.subscribe(topicUrl, function(err) {
|
||||
if (err) {
|
||||
console.error("❌ MQTT订阅主题失败:", err);
|
||||
uni.showToast({
|
||||
title: '订阅主题失败',
|
||||
icon: 'error',
|
||||
duration: 3000
|
||||
});
|
||||
} else {
|
||||
console.log("✅ MQTT订阅主题成功:", topicUrl);
|
||||
}
|
||||
@ -84,6 +135,18 @@ const initEventHandleMqtt = (topicUrl) => {
|
||||
// 获取信息
|
||||
const mqttData = JSON.parse(message.toString());
|
||||
console.log('📋 解析后的数据:', mqttData);
|
||||
console.log('数据类型:', Array.isArray(mqttData) ? '数组' : '对象');
|
||||
|
||||
// 如果是数组,打印数组信息
|
||||
if (Array.isArray(mqttData)) {
|
||||
console.log('📋 数组长度:', mqttData.length);
|
||||
mqttData.forEach((item, index) => {
|
||||
console.log(`📦 数组[${index}]:`, item);
|
||||
if (item.Device) {
|
||||
console.log(`🔍 设备类型[${index}]: ${item.Device}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// 传递信息
|
||||
uni.$emit("mqttData", mqttData);
|
||||
@ -97,13 +160,14 @@ const initEventHandleMqtt = (topicUrl) => {
|
||||
client.on('reconnect', function() {
|
||||
console.log('🔄 MQTT重新连接中...');
|
||||
uni.showLoading({
|
||||
title: "重新连接"
|
||||
title: "重新连接中..."
|
||||
});
|
||||
});
|
||||
|
||||
// 当客户端无法成功连接时或发生解析错误时触发,参数 error 为错误信息
|
||||
client.on("error", function(err) {
|
||||
console.error('❌ MQTT连接错误:', err);
|
||||
uni.hideLoading();
|
||||
uni.showToast({
|
||||
title: 'MQTT连接错误',
|
||||
icon: 'error',
|
||||
@ -114,16 +178,19 @@ const initEventHandleMqtt = (topicUrl) => {
|
||||
// 在收到 Broker 发送过来的断开连接的报文时触发
|
||||
client.on('disconnect', function() {
|
||||
console.log('⚠️ MQTT连接断开');
|
||||
uni.hideLoading();
|
||||
});
|
||||
|
||||
// 在断开连接以后触发
|
||||
client.on("close", function() {
|
||||
console.log('🔌 MQTT连接关闭');
|
||||
uni.hideLoading();
|
||||
});
|
||||
|
||||
// 当客户端下线时触发
|
||||
client.on("offline", function() {
|
||||
console.log('📴 MQTT客户端离线');
|
||||
uni.hideLoading();
|
||||
});
|
||||
};
|
||||
|
||||
@ -131,6 +198,7 @@ const initEventHandleMqtt = (topicUrl) => {
|
||||
const closeMqtt = () => {
|
||||
if (client) {
|
||||
console.log('🔌 强制断开MQTT连接');
|
||||
uni.hideLoading();
|
||||
client.end();
|
||||
client = null;
|
||||
}
|
||||
@ -150,7 +218,28 @@ const judgeBeat = () => {
|
||||
|
||||
// 获取连接状态
|
||||
const getConnectionStatus = () => {
|
||||
return client && client.connected;
|
||||
if (!client) {
|
||||
console.log('🔍 连接状态检查: 客户端不存在');
|
||||
return false;
|
||||
}
|
||||
|
||||
const isConnected = client.connected;
|
||||
console.log('🔍 连接状态检查:', {
|
||||
clientExists: !!client,
|
||||
connected: isConnected,
|
||||
readyState: client.stream ? client.stream.readyState : 'unknown'
|
||||
});
|
||||
|
||||
return isConnected;
|
||||
};
|
||||
|
||||
// 手动重连函数
|
||||
const manualReconnect = () => {
|
||||
console.log('🔄 手动触发重连');
|
||||
closeMqtt();
|
||||
setTimeout(() => {
|
||||
createMqtt();
|
||||
}, 1000);
|
||||
};
|
||||
|
||||
export {
|
||||
@ -158,5 +247,6 @@ export {
|
||||
closeMqtt,
|
||||
judgeBeat,
|
||||
getConnectionStatus,
|
||||
manualReconnect,
|
||||
client,
|
||||
}
|
||||
Reference in New Issue
Block a user