/** * MQTT工具类 - 全局唯一连接、自动重连、支持自定义配置和订阅列表 * 适配uniapp小程序/APP/H5(小程序仅支持ws/wss) */ // 引入mqtt库(需先安装:npm install mqtt --save,或下载mqtt.min.js到本地引入) import mqtt from 'mqtt' // 全局变量 let client = null; // MQTT客户端实例 let subscribeList = []; // 订阅主题列表(重连后自动恢复订阅) let reconnectTimer = null; // 重连定时器 let maxReconnectTimes = 10; // 最大重连次数 let currentReconnectTimes = 0; // 当前重连次数 let isManualDisconnect = false; // 是否主动断开(用于区分主动/被动断开,避免主动断开后重连) /** * 初始化MQTT连接 * @param {Object} config - MQTT连接配置 * @param {Array} subs - 初始订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }] * // 初始化连接(移除事件监听,只返回客户端实例) * @returns {Promise} 连接成功/失败的Promise */ export function initMQTT(config, subs = []) { return new Promise((resolve, reject) => { // 1. 校验配置(必填项) if (!config.host || !config.port) { reject(new Error('MQTT配置错误:host和port为必填项')); return; } // 2. 保存订阅列表(用于重连恢复) subscribeList = subs; // 3. 避免重复连接 if (client && client.connected) { resolve(client); return; } // 4. 标记为非主动断开(允许重连) isManualDisconnect = false; // 5. 构建连接地址(小程序仅支持ws/wss,优先用wss更安全) const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws'); const connectUrl = `${protocol}://${config.host}:${config.port}/mqtt`; // 6. 构建MQTT连接选项 const mqttOptions = { clientId: config.clientId || `uni_mqtt_${Math.random().toString(16).substr(2, 8)}`, username: config.username || '', password: config.password || '', keepalive: config.keepalive || 60, // 心跳间隔(秒) clean: config.clean !== undefined ? config.clean : true, // 是否清除会话 reconnectPeriod: 0, // 关闭内置重连(自定义重连逻辑) connectTimeout: config.connectTimeout || 10000, // 连接超时(毫秒) ...config.extraOptions // 额外扩展配置 }; try { // 7. 创建客户端并连接 client = mqtt.connect(connectUrl, mqttOptions); // 仅保留连接成功的基础逻辑(订阅列表),移除其他事件监听 client.on('connect', () => { console.log('MQTT连接成功:', connectUrl); currentReconnectTimes = 0; // 9. 订阅初始主题 subscribeTopics(subscribeList); resolve(client); // 成功后返回客户端实例,让页面自定义监听 }); // 移除工具层的error/close/message监听 → 交给页面处理 // 保留重连的核心逻辑(通过单独的close监听,仅用于重连,不处理业务) client.on('close', () => { console.log('MQTT连接已断开(工具层)'); const oldClient = client; // 暂存旧客户端,避免重连时被覆盖 client = null; if (!isManualDisconnect && currentReconnectTimes < maxReconnectTimes) { reconnectMQTT(config).then(newClient => { // 重连成功后,全局通知页面:客户端已更换,需重新绑定事件 uni.$emit('mqtt_reconnected', newClient); }).catch(err => { uni.$emit('mqtt_reconnect_fail', err); }); } // 通知页面:旧客户端已断开 uni.$emit('mqtt_closed', oldClient); }); } catch (err) { console.error('MQTT初始化失败:', err); reject(err); } }); } // 其他方法(subscribeTopics/unsubscribeTopics/publishMQTT/disconnectMQTT/reconnectMQTT/getMQTTStatus/manualReconnect) // 【完全不变】,直接复用之前的代码 /** * 订阅主题(支持单个/多个) * @param {Array} topics - 订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }] */ export function subscribeTopics(topics = []) { if (!client || !client.connected) { console.warn('MQTT未连接,无法订阅主题'); return; } // 过滤空主题 const validTopics = topics.filter(item => item && item.topic); if (validTopics.length === 0) return; // 转换为mqtt库要求的格式:{ topic1: { qos: 0 }, topic2: { qos: 1 } } const subscribeMap = {}; validTopics.forEach(item => { subscribeMap[item.topic] = { qos: item.qos || 0 }; }); client.subscribe(subscribeMap, (err) => { if (err) { console.error('MQTT订阅失败:', err); } else { console.log('MQTT订阅成功:', validTopics.map(item => item.topic)); // 更新订阅列表(用于重连恢复) subscribeList = [...new Set([...subscribeList, ...validTopics])]; // 去重 } }); } /** * 取消订阅主题 * @param {Array} topics - 取消订阅的主题列表,格式:['topic1', 'topic2'] */ export function unsubscribeTopics(topics = []) { if (!client || !client.connected) { console.warn('MQTT未连接,无法取消订阅'); return; } client.unsubscribe(topics, (err) => { if (err) { console.error('MQTT取消订阅失败:', err); } else { console.log('MQTT取消订阅成功:', topics); // 更新订阅列表 subscribeList = subscribeList.filter(item => !topics.includes(item.topic)); } }); } /** * 发布消息 * @param {String} topic - 发布的主题 * @param {String/Buffer} payload - 发布的消息内容 * @param {Object} options - 发布选项,如{ qos: 0, retain: false } * @returns {Promise} 发布成功/失败的Promise */ export function publishMQTT(topic, payload, options = { qos: 0, retain: false }) { return new Promise((resolve, reject) => { if (!client || !client.connected) { reject(new Error('MQTT未连接,无法发布消息')); return; } if (!topic) { reject(new Error('发布失败:主题不能为空')); return; } client.publish(topic, payload, options, (err) => { if (err) { console.error(`MQTT发布${topic}失败:`, err); reject(err); } else { console.log(`MQTT发布${topic}成功:`, payload); resolve(true); } }); }); } /** * 断开MQTT连接(主动断开,不会触发重连) */ export function disconnectMQTT() { // 标记为主动断开 isManualDisconnect = true; // 清除重连定时器 if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } // 断开连接 if (client && client.connected) { client.end(false, () => { // false:不发送遗嘱消息 console.log('MQTT主动断开连接'); client = null; subscribeList = []; }); } } /** * 重连MQTT(内部调用,也可外部手动触发) * @param {Object} config - 连接配置(同initMQTT的config) */ export function reconnectMQTT(config) { // 清除已有定时器,避免重复触发 if (reconnectTimer) { clearTimeout(reconnectTimer); } // 重连间隔:1秒/次(可自定义) reconnectTimer = setTimeout(async () => { currentReconnectTimes++; console.log(`MQTT重连中,第${currentReconnectTimes}/${maxReconnectTimes}次`); try { await initMQTT(config, subscribeList); } catch (err) { // 重连失败,继续尝试(直到达到最大次数) if (currentReconnectTimes < maxReconnectTimes) { reconnectMQTT(config); } else { console.error('MQTT重连次数已达上限,停止重连'); uni.$emit('mqtt_reconnect_fail'); // 全局通知重连失败 } } }, 1000); } /** * 获取MQTT客户端状态 * @returns {Object} 状态信息 */ export function getMQTTStatus() { return { isConnected: !!client && client.connected, // 是否连接 client: client, // 客户端实例 subscribeList: [...subscribeList], // 已订阅列表(浅拷贝,避免外部修改) currentReconnectTimes, // 当前重连次数 maxReconnectTimes // 最大重连次数 }; } /** * 手动触发重连(外部调用,比如页面主动刷新连接) * @param {Object} config - 连接配置 */ export function manualReconnect(config) { currentReconnectTimes = 0; // 重置重连次数 reconnectMQTT(config); } export function getMQTTClientInstance() { return client; }