diff --git a/utils/mqtt.js b/utils/mqtt.js index daf053b..7b2e95b 100644 --- a/utils/mqtt.js +++ b/utils/mqtt.js @@ -1,61 +1,151 @@ import mqtt from 'mqtt'; -// 全局变量 -let client= { - instance : null, - connected: false -} ; +// 全局变量:封装更完整的状态 +let client = { + instance: null, // MQTT客户端实例 + connected: false, // 最终连接状态 + isConnecting: false, // 连接中状态(防止重复连接) + isManualDisconnect: false // 是否手动断开(区分主动/被动断开) +}; /** * 初始化MQTT连接(无Promise,纯回调) * @param {Object} config 连接配置 {host, port, username, password, clientId, isSSL, keepalive, clean} * @param {Array} subs 订阅主题列表 [{topic: 'xxx', qos: 0}] + * @param {Function} onSuccess 成功回调 (client) => {} + * @param {Function} onError 失败回调 (err) => {} */ -export function initMQTT(config, subs = []) { - // 1. 已有连接:直接回调成功 - if (client.instance && client.instance.connected) { +export function initMQTT(config, subs = [], onSuccess, onError) { + // 1. 已有有效连接:直接返回 + if (client.instance && client.connected) { + onSuccess && onSuccess(client.instance); return client; } - const options = { - clientId: 'uniapp_mqtt_' + Math.random().toString(16).substr(2, 8), - username: config.username, - password: config.password, - clean: true, - connectTimeout: 4000, - reconnectPeriod: 1000 + // 2. 正在连接中:防止重复请求 + if (client.isConnecting) { + console.info('MQTT已有连接请求在途,等待结果...'); + // 轮询检查连接状态,直到连接完成/失败 + const checkTimer = setInterval(() => { + if (!client.isConnecting) { + clearInterval(checkTimer); + if (client.connected) { + onSuccess && onSuccess(client.instance); + } else { + onError && onError(new Error('MQTT连接请求失败')); + } + } + }, 100); + return; } + // 3. 参数校验 + if (!config.host || !config.port) { + const err = new Error('MQTT配置错误:host/port不能为空'); + onError && onError(err); + return; + } + + // 4. 标记连接中 + client.isConnecting = true; + client.isManualDisconnect = false; + // 5. 构建连接参数 const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws'); - const url = `${protocol}://${config.host}:${config.port}/mqtt` + const url = `${protocol}://${config.host}:${config.port}/mqtt`; - client.instance = mqtt.connect(url, options) + const options = { + clientId: config.clientId || ('uniapp_mqtt_' + Math.random().toString(16).substr(2, 8)), + username: config.username || '', + password: config.password || '', + clean: config.clean !== undefined ? config.clean : true, + connectTimeout: config.connectTimeout || 4000, + reconnectPeriod: config.reconnectPeriod || 1000, // 自动重连间隔(可配置) + keepalive: config.keepalive || 60 + }; - client.instance.on('connect', () => { - client.connected = true - client.instance.subscribe(subs, {qos: 0}) - console.info('已连接到MQTT服务器') - console.info(client.instance) - }) + // 6. 创建客户端并绑定事件 + try { + client.instance = mqtt.connect(url, options); - client.instance.on('close', () => { - console.info('连接已关闭') - client.connected = false - console.info(client.instance) - }) + // 连接成功 + client.instance.on('connect', () => { + console.info('已连接到MQTT服务器'); + client.connected = true; + client.isConnecting = false; + // 订阅主题(带错误处理) + client.instance.subscribe(subs, { qos: 0 }, (err) => { + if (err) { + console.error('MQTT订阅失败:', err); + onError && onError(new Error(`订阅失败:${err.message}`)); + } else { + console.info(`MQTT订阅成功:${subs.map(item => item.topic).join(',')}`); + onSuccess && onSuccess(client.instance); + } + }); + }); + + // 连接错误(核心:处理所有连接失败场景) + client.instance.on('error', (err) => { + console.error('MQTT连接错误:', err); + client.connected = false; + client.isConnecting = false; + onError && onError(err); + }); + + // 连接关闭(被动断开:服务器/网络原因) + client.instance.on('close', () => { + console.info('MQTT连接已关闭'); + client.connected = false; + client.isConnecting = false; + // 非手动断开则保留实例(等待自动重连) + if (!client.isManualDisconnect) { + console.info('非手动断开,等待自动重连...'); + } else { + client.instance = null; // 手动断开则清空实例 + } + }); + + // 重连中(可选:感知重连状态) + client.instance.on('reconnect', () => { + console.info('MQTT正在重连...'); + client.isConnecting = true; + client.connected = false; + }); + + } catch (err) { + console.error('MQTT客户端创建失败:', err); + client.isConnecting = false; + client.connected = false; + client.instance = null; + onError && onError(err); + } + + return client; } - /** * 断开MQTT连接(无Promise,纯回调) * @param {Function} onComplete 完成回调 () => {} */ -export function disconnectMQTT() { +export function disconnectMQTT(onComplete) { + // 标记为手动断开(避免重连) + client.isManualDisconnect = true; + if (client.instance && client.connected) { - client.instance.end() - client.connected = false - console.info('已断开MQTT连接') + client.instance.end(false, () => { // false:不发送will消息 + console.info('已断开MQTT连接'); + client.connected = false; + client.isConnecting = false; + client.instance = null; // 清空实例 + onComplete && onComplete(); + }); + } else { + // 无有效连接,直接回调 + client.connected = false; + client.isConnecting = false; + client.instance = null; + onComplete && onComplete(); } } @@ -68,11 +158,13 @@ export function getMQTTClientInstance() { } /** - * 获取MQTT连接状态 - * @returns {Object} {isConnected: Boolean, isConnecting: Boolean} + * 获取MQTT连接状态(更精准) + * @returns {Object} 完整状态 */ export function getMQTTStatus() { return { - isConnected: !!client && client.connected, + isConnected: client.connected, // 最终连接状态 + isConnecting: client.isConnecting, // 连接中/重连中 + isManualDisconnect: client.isManualDisconnect // 是否手动断开 }; -} +} \ No newline at end of file