Files
movecheck/src/utils/uniMqttClient.js

744 lines
24 KiB
JavaScript
Raw Normal View History

2025-09-27 14:59:48 +08:00
// uni-app兼容的MQTT客户端
import { MQTT_CONFIG, DataParser } from '@/config/mqtt'
// 兼容性处理为App环境提供TextEncoder和TextDecoder
if (typeof TextEncoder === 'undefined') {
global.TextEncoder = class {
encode(str) {
const utf8 = unescape(encodeURIComponent(str));
const result = new Uint8Array(utf8.length);
for (let i = 0; i < utf8.length; i++) {
result[i] = utf8.charCodeAt(i);
}
return result;
}
};
}
if (typeof TextDecoder === 'undefined') {
global.TextDecoder = class {
constructor(encoding = 'utf-8') {
this.encoding = encoding;
}
decode(bytes) {
if (this.encoding === 'utf-8') {
let result = '';
for (let i = 0; i < bytes.length; i++) {
result += String.fromCharCode(bytes[i]);
}
try {
return decodeURIComponent(escape(result));
} catch (e) {
return result;
}
} else if (this.encoding === 'ascii') {
let result = '';
for (let i = 0; i < bytes.length; i++) {
result += String.fromCharCode(bytes[i]);
}
return result;
}
return '';
}
};
}
class UniMqttClient {
constructor() {
this.socketTask = null
this.isConnected = false
this.subscriptions = new Map()
this.messageHandlers = new Map()
this.reconnectAttempts = 0
this.maxReconnectAttempts = 5
this.reconnectTimer = null
this.heartbeatTimer = null
this.messageId = 1
}
// 连接MQTT服务器
async connect() {
try {
console.log('🔧 ===== MQTT连接开始 =====')
console.log('🔧 正在连接MQTT服务器:', MQTT_CONFIG.broker)
console.log('🔧 环境检测:', {
isApp: typeof plus !== 'undefined' || typeof window === 'undefined',
plus: typeof plus,
window: typeof window
})
// 发送日志到页面
this.sendLogToPage('===== MQTT连接开始 =====', 'info')
this.sendLogToPage(`正在连接MQTT服务器: ${MQTT_CONFIG.broker}`, 'info')
this.sendLogToPage(`环境检测: isApp=${typeof plus !== 'undefined' || typeof window === 'undefined'}, plus=${typeof plus}, window=${typeof window}`, 'info')
// 添加连接超时保护
const connectTimeout = setTimeout(() => {
console.error('🔧 MQTT连接超时')
this.sendLogToPage('MQTT连接超时', 'error')
this.handleConnectionError(new Error('Connection timeout'))
}, 10000) // 10秒超时
// 解析WebSocket URL
const wsUrl = MQTT_CONFIG.broker.replace('ws://', '').replace('wss://', '')
const [host, path] = wsUrl.split('/')
const [hostname, port] = host.split(':')
const protocol = MQTT_CONFIG.broker.startsWith('wss://') ? 'wss' : 'ws'
const fullUrl = `${protocol}://${hostname}:${port || (protocol === 'wss' ? 443 : 80)}/${path || ''}`
console.log('🔧 WebSocket连接地址:', fullUrl)
console.log('🔧 解析结果:', { hostname, port, path, protocol })
this.sendLogToPage(`WebSocket连接地址: ${fullUrl}`, 'info')
this.sendLogToPage(`解析结果: hostname=${hostname}, port=${port}, path=${path}, protocol=${protocol}`, 'info')
this.socketTask = uni.connectSocket({
url: fullUrl,
protocols: ['mqtt'],
success: () => {
console.log('🔧 WebSocket连接请求发送成功')
this.sendLogToPage('WebSocket连接请求发送成功', 'info')
},
fail: (error) => {
console.error('🔧 WebSocket连接失败:', error)
console.error('🔧 连接参数:', { url: fullUrl, protocols: ['mqtt'] })
this.sendLogToPage(`WebSocket连接失败: ${JSON.stringify(error)}`, 'error')
this.handleConnectionError(error)
}
})
return new Promise((resolve, reject) => {
this.socketTask.onOpen(() => {
console.log('🔧 WebSocket连接成功')
console.log('🔧 开始发送MQTT CONNECT消息...')
this.sendLogToPage('WebSocket连接成功', 'success')
this.sendLogToPage('开始发送MQTT CONNECT消息...', 'info')
// 发送MQTT CONNECT消息
this.sendConnectMessage()
.then(() => {
console.log('🔧 MQTT CONNECT成功连接建立')
this.sendLogToPage('MQTT CONNECT成功连接建立', 'success')
this.isConnected = true
this.reconnectAttempts = 0
this.startHeartbeat()
clearTimeout(connectTimeout) // 清除超时定时器
resolve()
})
.catch((error) => {
console.error('🔧 MQTT CONNECT失败:', error)
this.sendLogToPage(`MQTT CONNECT失败: ${error.message}`, 'error')
clearTimeout(connectTimeout) // 清除超时定时器
reject(error)
})
})
this.socketTask.onMessage((res) => {
this.handleMessage(res.data)
})
this.socketTask.onError((error) => {
console.error('🔧 WebSocket错误:', error)
console.error('🔧 错误详情:', JSON.stringify(error))
this.sendLogToPage(`WebSocket错误: ${JSON.stringify(error)}`, 'error')
clearTimeout(connectTimeout) // 清除超时定时器
this.handleConnectionError(error)
reject(error)
})
this.socketTask.onClose((closeInfo) => {
console.log('🔧 WebSocket连接关闭')
console.log('🔧 关闭信息:', closeInfo)
this.sendLogToPage(`WebSocket连接关闭: ${JSON.stringify(closeInfo)}`, 'warning')
this.isConnected = false
this.stopHeartbeat()
this.attemptReconnect()
})
})
} catch (error) {
console.error('MQTT连接异常:', error)
throw error
}
}
// 处理连接错误
handleConnectionError(error) {
this.isConnected = false
this.stopHeartbeat()
console.error('MQTT连接错误:', error)
}
// 尝试重连
attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++
console.log(`MQTT重连中... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
this.sendLogToPage(`MQTT重连中... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`, 'warning')
this.reconnectTimer = setTimeout(() => {
this.connect().catch(error => {
console.error('重连失败:', error)
this.sendLogToPage(`重连失败: ${error.message}`, 'error')
})
}, 5000)
} else {
console.error('MQTT重连次数超限停止重连')
this.sendLogToPage('MQTT重连次数超限停止重连', 'error')
}
}
// 发送日志到页面
sendLogToPage(message, type = 'info') {
try {
// 通过全局事件发送日志
if (typeof window !== 'undefined' && window.dispatchEvent) {
window.dispatchEvent(new CustomEvent('mqtt-log', {
detail: { message, type }
}))
}
} catch (error) {
// 忽略错误避免影响MQTT连接
}
}
// 开始心跳
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.isConnected && this.socketTask) {
// 发送PING消息
this.sendPing()
}
}, 30000) // 30秒心跳
}
// 停止心跳
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = null
}
}
// 发送MQTT CONNECT消息
sendConnectMessage() {
return new Promise((resolve, reject) => {
// 保存Promise的resolve和reject函数
this.connectResolve = resolve
this.connectReject = reject
try {
const options = MQTT_CONFIG.options
const clientId = options.clientId || 'uni-mqtt-client-' + Math.random().toString(16).substr(2, 8)
// 构建CONNECT消息
const protocolName = 'MQTT'
const protocolLevel = 4 // MQTT 3.1.1
// 构建CONNECT消息的各个部分
const protocolNameBytes = new TextEncoder().encode(protocolName)
const clientIdBytes = new TextEncoder().encode(clientId)
// 可变头部
let connectFlags = 0x02 // Clean Session
if (options.username) {
connectFlags |= 0x80 // Username flag
}
if (options.password) {
connectFlags |= 0x40 // Password flag
}
const variableHeaderParts = [
new Uint8Array([0x00, protocolNameBytes.length]), // 协议名长度
protocolNameBytes, // 协议名
new Uint8Array([protocolLevel]), // 协议级别
new Uint8Array([connectFlags]), // 连接标志
new Uint8Array([0x00, options.keepalive || 60]), // 保持连接时间
]
// 载荷
let payloadParts = [
new Uint8Array([0x00, clientIdBytes.length]), // 客户端ID长度
clientIdBytes, // 客户端ID
]
// 如果有用户名和密码,添加到载荷
if (options.username) {
const usernameBytes = new TextEncoder().encode(options.username)
payloadParts.push(
new Uint8Array([0x00, usernameBytes.length]), // 用户名长度
usernameBytes // 用户名
)
}
if (options.password) {
const passwordBytes = new TextEncoder().encode(options.password)
payloadParts.push(
new Uint8Array([0x00, passwordBytes.length]), // 密码长度
passwordBytes // 密码
)
}
// 合并所有部分
const allParts = [
new Uint8Array([0x10]), // CONNECT消息类型
this.encodeRemainingLength(variableHeaderParts.reduce((sum, part) => sum + part.length, 0) +
payloadParts.reduce((sum, part) => sum + part.length, 0)),
...variableHeaderParts,
...payloadParts
]
const connectMessage = this.concatUint8Arrays(allParts)
console.log('🔧 发送MQTT CONNECT消息')
console.log('🔧 CONNECT消息长度:', connectMessage.length)
console.log('🔧 CONNECT消息内容 (Hex):', Array.from(connectMessage).map(b => b.toString(16).padStart(2, '0')).join(' '))
this.socketTask.send({
data: connectMessage.buffer,
success: () => {
console.log('🔧 CONNECT消息发送成功等待CONNACK响应...')
// 设置超时等待CONNACK
this.connackTimeout = setTimeout(() => {
console.error('🔧 等待CONNACK超时 (5秒)')
if (this.connectReject) {
this.connectReject(new Error('CONNACK timeout'))
this.connectReject = null
}
}, 5000)
},
fail: (error) => {
console.error('🔧 CONNECT消息发送失败:', error)
console.error('🔧 发送失败详情:', JSON.stringify(error))
if (this.connectReject) {
this.connectReject(error)
this.connectReject = null
}
}
})
} catch (error) {
console.error('构建CONNECT消息失败:', error)
if (this.connectReject) {
this.connectReject(error)
this.connectReject = null
}
}
})
}
// 发送PING消息
sendPing() {
try {
// MQTT PINGREQ消息 (0xC0, 0x00)
const pingMessage = new Uint8Array([0xC0, 0x00])
this.socketTask.send({
data: pingMessage.buffer
})
} catch (error) {
console.error('发送心跳失败:', error)
}
}
// 订阅主题
subscribe(topic, handler) {
if (!this.isConnected) {
console.warn('MQTT未连接无法订阅主题:', topic)
return false
}
try {
// MQTT SUBSCRIBE消息
const messageId = this.messageId++
const topicBytes = new TextEncoder().encode(topic)
const topicLength = topicBytes.length
const payloadParts = [
new Uint8Array([0x00, topicLength]), // 主题长度
topicBytes, // 主题内容
new Uint8Array([0x00]) // QoS 0
]
const payload = this.concatUint8Arrays(payloadParts)
const remainingLength = 2 + payload.length // messageId + payload
const subscribeMessage = this.concatUint8Arrays([
new Uint8Array([0x82]), // SUBSCRIBE消息类型
this.encodeRemainingLength(remainingLength),
new Uint8Array([(messageId >> 8) & 0xFF, messageId & 0xFF]), // messageId
payload
])
console.log('📤 发送MQTT SUBSCRIBE消息')
console.log('订阅主题:', topic)
console.log('消息ID:', messageId)
this.socketTask.send({
data: subscribeMessage.buffer
})
this.subscriptions.set(topic, true)
this.messageHandlers.set(topic, handler)
console.log('订阅主题成功:', topic)
return true
} catch (error) {
console.error('订阅主题失败:', topic, error)
return false
}
}
// 取消订阅
unsubscribe(topic) {
if (this.subscriptions.has(topic)) {
try {
// MQTT UNSUBSCRIBE消息
const messageId = this.messageId++
const topicLength = Buffer.byteLength(topic, 'utf8')
const payload = Buffer.concat([
Buffer.from([0x00, topicLength]), // 主题长度
Buffer.from(topic, 'utf8') // 主题内容
])
const remainingLength = 2 + payload.length // messageId + payload
const unsubscribeMessage = Buffer.concat([
Buffer.from([0xA2]), // UNSUBSCRIBE消息类型
this.encodeRemainingLength(remainingLength),
Buffer.from([(messageId >> 8) & 0xFF, messageId & 0xFF]), // messageId
payload
])
this.socketTask.send({
data: unsubscribeMessage.buffer
})
this.subscriptions.delete(topic)
this.messageHandlers.delete(topic)
console.log('取消订阅主题:', topic)
} catch (error) {
console.error('取消订阅失败:', topic, error)
}
}
}
// 发布消息
publish(topic, message) {
if (!this.isConnected) {
console.warn('MQTT未连接无法发布消息')
return false
}
try {
const payload = typeof message === 'object' ? JSON.stringify(message) : message
const topicLength = Buffer.byteLength(topic, 'utf8')
const payloadBuffer = Buffer.from(payload, 'utf8')
const remainingLength = 2 + topicLength + payloadBuffer.length
const publishMessage = Buffer.concat([
Buffer.from([0x30]), // PUBLISH消息类型
this.encodeRemainingLength(remainingLength),
Buffer.from([(topicLength >> 8) & 0xFF, topicLength & 0xFF]), // 主题长度
Buffer.from(topic, 'utf8'), // 主题内容
payloadBuffer // 消息内容
])
this.socketTask.send({
data: publishMessage.buffer
})
console.log('发布消息成功:', topic, payload)
return true
} catch (error) {
console.error('发布消息失败:', topic, error)
return false
}
}
// 处理接收到的消息
handleMessage(data) {
try {
const buffer = new Uint8Array(data)
const messageType = (buffer[0] >> 4) & 0x0F
const flags = buffer[0] & 0x0F
// 打印协议内容
console.log('📦 收到MQTT协议消息:')
console.log('消息类型:', messageType, this.getMessageTypeName(messageType))
if (messageType === 2) { // CONNACK消息
this.handleConnackMessage(buffer)
} else if (messageType === 3) { // PUBLISH消息
this.handlePublishMessage(buffer)
} else if (messageType === 9) { // SUBACK消息
console.log('订阅确认收到')
} else if (messageType === 13) { // PINGRESP消息
console.log('心跳响应收到')
} else {
console.log('收到未知消息类型:', messageType)
}
} catch (error) {
console.error('处理消息失败:', error)
}
}
// 获取消息类型名称
getMessageTypeName(type) {
const typeNames = {
0: 'RESERVED',
1: 'CONNECT',
2: 'CONNACK',
3: 'PUBLISH',
4: 'PUBACK',
5: 'PUBREC',
6: 'PUBREL',
7: 'PUBCOMP',
8: 'SUBSCRIBE',
9: 'SUBACK',
10: 'UNSUBSCRIBE',
11: 'UNSUBACK',
12: 'PINGREQ',
13: 'PINGRESP',
14: 'DISCONNECT',
15: 'RESERVED'
}
return typeNames[type] || 'UNKNOWN'
}
// 处理CONNACK消息
handleConnackMessage(buffer) {
try {
console.log('🔧 收到CONNACK消息')
if (buffer.length >= 4) {
const returnCode = buffer[3]
console.log('🔧 CONNACK返回码:', returnCode)
if (returnCode === 0) {
console.log('🔧 ✅ MQTT连接成功')
this.isConnected = true
this.reconnectAttempts = 0
// 清除超时定时器
if (this.connackTimeout) {
clearTimeout(this.connackTimeout)
this.connackTimeout = null
}
// 触发连接成功的Promise resolve
if (this.connectResolve) {
this.connectResolve()
this.connectResolve = null
}
} else {
console.error('🔧 ❌ MQTT连接被拒绝返回码:', returnCode)
const errorMessages = {
1: '连接被拒绝,不支持的协议版本',
2: '连接被拒绝,不合格的客户端标识符',
3: '连接被拒绝,服务端不可用',
4: '连接被拒绝,无效的用户名或密码',
5: '连接被拒绝,未授权'
}
console.error('🔧 错误原因:', errorMessages[returnCode] || '未知错误')
this.isConnected = false
// 清除超时定时器
if (this.connackTimeout) {
clearTimeout(this.connackTimeout)
this.connackTimeout = null
}
// 触发连接失败的Promise reject
if (this.connectReject) {
this.connectReject(new Error(`MQTT连接被拒绝: ${errorMessages[returnCode] || '未知错误'}`))
this.connectReject = null
}
}
} else {
console.error('🔧 CONNACK消息长度不足:', buffer.length)
}
} catch (error) {
console.error('🔧 解析CONNACK消息失败:', error)
}
}
// 处理PUBLISH消息
handlePublishMessage(buffer) {
try {
let offset = 1
// 解析剩余长度
const { length, bytesRead } = this.decodeRemainingLength(buffer, offset)
offset += bytesRead
console.log('🔍 PUBLISH消息解析:')
console.log('剩余长度:', length)
// 解析主题长度
const topicLength = (buffer[offset] << 8) | buffer[offset + 1]
offset += 2
console.log('主题长度:', topicLength)
// 解析主题
const topic = new TextDecoder().decode(buffer.slice(offset, offset + topicLength))
offset += topicLength
console.log('主题名称:', topic)
// 解析消息内容
const messageBytes = buffer.slice(offset)
console.log('🔍 消息字节数据:')
console.log('字节长度:', messageBytes.length)
console.log('字节数据 (Hex):', Array.from(messageBytes).map(b => b.toString(16).padStart(2, '0')).join(' '))
console.log('字节数据 (Dec):', Array.from(messageBytes).join(' '))
// 尝试不同的编码方式
let messageData
try {
// 首先尝试UTF-8解码
messageData = new TextDecoder('utf-8').decode(messageBytes)
console.log('✅ UTF-8解码成功')
} catch (error) {
console.log('❌ UTF-8解码失败尝试其他编码')
try {
// 尝试ASCII解码
messageData = new TextDecoder('ascii').decode(messageBytes)
console.log('✅ ASCII解码成功')
} catch (error2) {
console.log('❌ ASCII解码失败使用原始字节')
messageData = Array.from(messageBytes).map(b => String.fromCharCode(b)).join('')
}
}
console.log('📨 收到MQTT消息:')
console.log('主题:', topic)
console.log('解码后消息:', messageData)
console.log('消息内容 (Hex):', Array.from(new TextEncoder().encode(messageData)).map(b => b.toString(16).padStart(2, '0')).join(' '))
// 数据类型判断和JSON转换
console.log('🔍 数据类型分析:')
console.log('数据类型:', typeof messageData)
// 尝试解析JSON
try {
console.log('🔄 尝试解析JSON...')
const jsonData = JSON.parse(messageData)
console.log('✅ JSON解析成功!')
console.log('解析后的数据类型:', typeof jsonData)
// 如果是数组,显示数组信息
if (Array.isArray(jsonData)) {
console.log('📋 数组信息:')
console.log('数组长度:', jsonData.length)
jsonData.forEach((item, index) => {
console.log(`数组[${index}]:`, typeof item, item)
})
}
// 如果是对象,显示对象信息
if (jsonData && typeof jsonData === 'object' && !Array.isArray(jsonData)) {
console.log('📋 对象信息:')
console.log('对象键:', Object.keys(jsonData))
Object.entries(jsonData).forEach(([key, value]) => {
console.log(`${key}:`, typeof value, value)
})
}
} catch (jsonError) {
console.log('❌ JSON解析失败:', jsonError.message)
console.log('原始数据可能不是有效的JSON格式')
}
// 解析设备数据并调用订阅的回调函数
console.log('🔍 开始解析设备数据...')
const parsedData = DataParser.parseDeviceData(messageData)
if (parsedData) {
console.log('✅ 设备数据解析成功')
// 调用订阅的回调函数
const handler = this.messageHandlers.get(topic)
if (handler) {
try {
handler(parsedData)
} catch (error) {
console.error('❌ 消息处理器执行失败:', error)
}
} else {
console.log('⚠️ 未找到主题处理器:', topic)
}
} else {
console.log('❌ 设备数据解析失败或数据为空')
}
} catch (error) {
console.error('解析PUBLISH消息失败:', error)
}
}
// 合并Uint8Array数组
concatUint8Arrays(arrays) {
const totalLength = arrays.reduce((sum, arr) => sum + arr.length, 0)
const result = new Uint8Array(totalLength)
let offset = 0
for (const arr of arrays) {
result.set(arr, offset)
offset += arr.length
}
return result
}
// 编码剩余长度
encodeRemainingLength(length) {
const bytes = []
do {
let byte = length % 128
length = Math.floor(length / 128)
if (length > 0) {
byte |= 0x80
}
bytes.push(byte)
} while (length > 0)
return new Uint8Array(bytes)
}
// 解码剩余长度
decodeRemainingLength(buffer, offset) {
let multiplier = 1
let value = 0
let bytesRead = 0
let byte
do {
if (offset + bytesRead >= buffer.length) {
throw new Error('Invalid remaining length')
}
byte = buffer[offset + bytesRead]
value += (byte & 0x7F) * multiplier
multiplier *= 128
bytesRead++
} while ((byte & 0x80) !== 0)
return { length: value, bytesRead }
}
// 获取连接状态
getConnectionStatus() {
return {
isConnected: this.isConnected,
reconnectAttempts: this.reconnectAttempts,
subscriptions: Array.from(this.subscriptions.keys())
}
}
// 断开连接
disconnect() {
if (this.socketTask) {
this.socketTask.close()
this.isConnected = false
this.subscriptions.clear()
this.messageHandlers.clear()
this.stopHeartbeat()
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
console.log('MQTT连接已断开')
}
}
}
export default new UniMqttClient()