diff --git a/utils/mqtt.js b/utils/mqtt.js index be45473..daf053b 100644 --- a/utils/mqtt.js +++ b/utils/mqtt.js @@ -1,257 +1,78 @@ -/** - * MQTT工具类 - 全局唯一连接、自动重连、支持自定义配置和订阅列表 - * 适配uniapp小程序/APP/H5(小程序仅支持ws/wss) - */ - -// 引入mqtt库(需先安装:npm install mqtt --save,或下载mqtt.min.js到本地引入) -import mqtt from 'mqtt' +import mqtt from 'mqtt'; // 全局变量 -let client = null; // MQTT客户端实例 -let subscribeList = []; // 订阅主题列表(重连后自动恢复订阅) -let reconnectTimer = null; // 重连定时器 -let maxReconnectTimes = 10; // 最大重连次数 -let currentReconnectTimes = 0; // 当前重连次数 -let isManualDisconnect = false; // 是否主动断开(用于区分主动/被动断开,避免主动断开后重连) +let client= { + instance : null, + connected: false +} ; /** - * 初始化MQTT连接 - * @param {Object} config - MQTT连接配置 - * @param {Array} subs - 初始订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }] - * // 初始化连接(移除事件监听,只返回客户端实例) - * @returns {Promise} 连接成功/失败的Promise + * 初始化MQTT连接(无Promise,纯回调) + * @param {Object} config 连接配置 {host, port, username, password, clientId, isSSL, keepalive, clean} + * @param {Array} subs 订阅主题列表 [{topic: 'xxx', qos: 0}] */ export function initMQTT(config, subs = []) { - return new Promise((resolve, reject) => { - // 1. 校验配置(必填项) - if (!config.host || !config.port) { - reject(new Error('MQTT配置错误:host和port为必填项')); - return; - } - - // 2. 保存订阅列表(用于重连恢复) - subscribeList = subs; - - // 3. 避免重复连接 - if (client && client.connected) { - resolve(client); - return; - } - - // 4. 标记为非主动断开(允许重连) - isManualDisconnect = false; - - // 5. 构建连接地址(小程序仅支持ws/wss,优先用wss更安全) - const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws'); - const connectUrl = `${protocol}://${config.host}:${config.port}/mqtt`; - - // 6. 构建MQTT连接选项 - const mqttOptions = { - clientId: config.clientId || `uni_mqtt_${Math.random().toString(16).substr(2, 8)}`, - username: config.username || '', - password: config.password || '', - keepalive: config.keepalive || 60, // 心跳间隔(秒) - clean: config.clean !== undefined ? config.clean : true, // 是否清除会话 - reconnectPeriod: 0, // 关闭内置重连(自定义重连逻辑) - connectTimeout: config.connectTimeout || 10000, // 连接超时(毫秒) - ...config.extraOptions // 额外扩展配置 - }; - - try { - // 7. 创建客户端并连接 - client = mqtt.connect(connectUrl, mqttOptions); - - // 仅保留连接成功的基础逻辑(订阅列表),移除其他事件监听 - client.on('connect', () => { - console.log('MQTT连接成功:', connectUrl); - currentReconnectTimes = 0; - // 9. 订阅初始主题 - subscribeTopics(subscribeList); - resolve(client); // 成功后返回客户端实例,让页面自定义监听 - }); - - // 移除工具层的error/close/message监听 → 交给页面处理 - // 保留重连的核心逻辑(通过单独的close监听,仅用于重连,不处理业务) - client.on('close', () => { - console.log('MQTT连接已断开(工具层)'); - const oldClient = client; // 暂存旧客户端,避免重连时被覆盖 - client = null; - if (!isManualDisconnect && currentReconnectTimes < maxReconnectTimes) { - reconnectMQTT(config).then(newClient => { - // 重连成功后,全局通知页面:客户端已更换,需重新绑定事件 - uni.$emit('mqtt_reconnected', newClient); - }).catch(err => { - uni.$emit('mqtt_reconnect_fail', err); - }); - } - // 通知页面:旧客户端已断开 - uni.$emit('mqtt_closed', oldClient); - }); - - } catch (err) { - console.error('MQTT初始化失败:', err); - reject(err); - } - }); -} - -// 其他方法(subscribeTopics/unsubscribeTopics/publishMQTT/disconnectMQTT/reconnectMQTT/getMQTTStatus/manualReconnect) -// 【完全不变】,直接复用之前的代码 -/** - * 订阅主题(支持单个/多个) - * @param {Array} topics - 订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }] - */ -export function subscribeTopics(topics = []) { - if (!client || !client.connected) { - console.warn('MQTT未连接,无法订阅主题'); - return; + // 1. 已有连接:直接回调成功 + if (client.instance && client.instance.connected) { + return client; } - // 过滤空主题 - const validTopics = topics.filter(item => item && item.topic); - if (validTopics.length === 0) return; - - // 转换为mqtt库要求的格式:{ topic1: { qos: 0 }, topic2: { qos: 1 } } - const subscribeMap = {}; - validTopics.forEach(item => { - subscribeMap[item.topic] = { qos: item.qos || 0 }; - }); - - client.subscribe(subscribeMap, (err) => { - if (err) { - console.error('MQTT订阅失败:', err); - } else { - console.log('MQTT订阅成功:', validTopics.map(item => item.topic)); - // 更新订阅列表(用于重连恢复) - subscribeList = [...new Set([...subscribeList, ...validTopics])]; // 去重 - } - }); -} - -/** - * 取消订阅主题 - * @param {Array} topics - 取消订阅的主题列表,格式:['topic1', 'topic2'] - */ -export function unsubscribeTopics(topics = []) { - if (!client || !client.connected) { - console.warn('MQTT未连接,无法取消订阅'); - return; + const options = { + clientId: 'uniapp_mqtt_' + Math.random().toString(16).substr(2, 8), + username: config.username, + password: config.password, + clean: true, + connectTimeout: 4000, + reconnectPeriod: 1000 } - client.unsubscribe(topics, (err) => { - if (err) { - console.error('MQTT取消订阅失败:', err); - } else { - console.log('MQTT取消订阅成功:', topics); - // 更新订阅列表 - subscribeList = subscribeList.filter(item => !topics.includes(item.topic)); - } - }); + // 5. 构建连接参数 + const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws'); + const url = `${protocol}://${config.host}:${config.port}/mqtt` + + client.instance = mqtt.connect(url, options) + + client.instance.on('connect', () => { + client.connected = true + client.instance.subscribe(subs, {qos: 0}) + console.info('已连接到MQTT服务器') + console.info(client.instance) + }) + + client.instance.on('close', () => { + console.info('连接已关闭') + client.connected = false + console.info(client.instance) + }) } -/** - * 发布消息 - * @param {String} topic - 发布的主题 - * @param {String/Buffer} payload - 发布的消息内容 - * @param {Object} options - 发布选项,如{ qos: 0, retain: false } - * @returns {Promise} 发布成功/失败的Promise - */ -export function publishMQTT(topic, payload, options = { qos: 0, retain: false }) { - return new Promise((resolve, reject) => { - if (!client || !client.connected) { - reject(new Error('MQTT未连接,无法发布消息')); - return; - } - - if (!topic) { - reject(new Error('发布失败:主题不能为空')); - return; - } - - client.publish(topic, payload, options, (err) => { - if (err) { - console.error(`MQTT发布${topic}失败:`, err); - reject(err); - } else { - console.log(`MQTT发布${topic}成功:`, payload); - resolve(true); - } - }); - }); -} /** - * 断开MQTT连接(主动断开,不会触发重连) + * 断开MQTT连接(无Promise,纯回调) + * @param {Function} onComplete 完成回调 () => {} */ export function disconnectMQTT() { - // 标记为主动断开 - isManualDisconnect = true; - // 清除重连定时器 - if (reconnectTimer) { - clearTimeout(reconnectTimer); - reconnectTimer = null; - } - // 断开连接 - if (client && client.connected) { - client.end(false, () => { // false:不发送遗嘱消息 - console.log('MQTT主动断开连接'); - client = null; - subscribeList = []; - }); + if (client.instance && client.connected) { + client.instance.end() + client.connected = false + console.info('已断开MQTT连接') } } /** - * 重连MQTT(内部调用,也可外部手动触发) - * @param {Object} config - 连接配置(同initMQTT的config) + * 获取MQTT客户端实例 + * @returns {Client} MQTT客户端 */ -export function reconnectMQTT(config) { - // 清除已有定时器,避免重复触发 - if (reconnectTimer) { - clearTimeout(reconnectTimer); - } - - // 重连间隔:1秒/次(可自定义) - reconnectTimer = setTimeout(async () => { - currentReconnectTimes++; - console.log(`MQTT重连中,第${currentReconnectTimes}/${maxReconnectTimes}次`); - try { - await initMQTT(config, subscribeList); - } catch (err) { - // 重连失败,继续尝试(直到达到最大次数) - if (currentReconnectTimes < maxReconnectTimes) { - reconnectMQTT(config); - } else { - console.error('MQTT重连次数已达上限,停止重连'); - uni.$emit('mqtt_reconnect_fail'); // 全局通知重连失败 - } - } - }, 1000); +export function getMQTTClientInstance() { + return client.instance; } /** - * 获取MQTT客户端状态 - * @returns {Object} 状态信息 + * 获取MQTT连接状态 + * @returns {Object} {isConnected: Boolean, isConnecting: Boolean} */ export function getMQTTStatus() { return { - isConnected: !!client && client.connected, // 是否连接 - client: client, // 客户端实例 - subscribeList: [...subscribeList], // 已订阅列表(浅拷贝,避免外部修改) - currentReconnectTimes, // 当前重连次数 - maxReconnectTimes // 最大重连次数 + isConnected: !!client && client.connected, }; } - -/** - * 手动触发重连(外部调用,比如页面主动刷新连接) - * @param {Object} config - 连接配置 - */ -export function manualReconnect(config) { - currentReconnectTimes = 0; // 重置重连次数 - reconnectMQTT(config); -} - -export function getMQTTClientInstance() { - return client; -} \ No newline at end of file