import mqtt from 'mqtt'; // 全局变量:封装更完整的状态 let client = { instance: null, // MQTT客户端实例 connected: false, // 最终连接状态 isConnecting: false, // 连接中状态(防止重复连接) isManualDisconnect: false // 是否手动断开(区分主动/被动断开) }; /** * 初始化MQTT连接(无Promise,纯回调) * @param {Object} config 连接配置 {host, port, username, password, clientId, isSSL, keepalive, clean} * @param {Array} subs 订阅主题列表 [{topic: 'xxx', qos: 0}] * @param {Function} onSuccess 成功回调 (client) => {} * @param {Function} onError 失败回调 (err) => {} */ export function initMQTT(config, subs = [], onSuccess, onError) { // 1. 已有有效连接:直接返回 if (client.instance && client.connected) { onSuccess && onSuccess(client.instance); return client; } // 2. 正在连接中:防止重复请求 if (client.isConnecting) { console.info('MQTT已有连接请求在途,等待结果...'); // 轮询检查连接状态,直到连接完成/失败 const checkTimer = setInterval(() => { if (!client.isConnecting) { clearInterval(checkTimer); if (client.connected) { onSuccess && onSuccess(client.instance); } else { onError && onError(new Error('MQTT连接请求失败')); } } }, 100); return; } // 3. 参数校验 if (!config.host || !config.port) { const err = new Error('MQTT配置错误:host/port不能为空'); onError && onError(err); return; } // 4. 标记连接中 client.isConnecting = true; client.isManualDisconnect = false; // 5. 构建连接参数 const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws'); const url = `${protocol}://${config.host}:${config.port}/mqtt`; const options = { clientId: config.clientId || ('uniapp_mqtt_' + Math.random().toString(16).substr(2, 8)), username: config.username || '', password: config.password || '', clean: config.clean !== undefined ? config.clean : true, connectTimeout: config.connectTimeout || 4000, reconnectPeriod: config.reconnectPeriod || 1000, // 自动重连间隔(可配置) keepalive: config.keepalive || 60 }; // 6. 创建客户端并绑定事件 try { client.instance = mqtt.connect(url, options); // 连接成功 client.instance.on('connect', () => { console.info('已连接到MQTT服务器'); client.connected = true; client.isConnecting = false; // 订阅主题(带错误处理) client.instance.subscribe(subs, { qos: 0 }, (err) => { if (err) { console.error('MQTT订阅失败:', err); onError && onError(new Error(`订阅失败:${err.message}`)); } else { console.info(`MQTT订阅成功:${subs.map(item => item.topic).join(',')}`); onSuccess && onSuccess(client.instance); } }); }); // 连接错误(核心:处理所有连接失败场景) client.instance.on('error', (err) => { console.error('MQTT连接错误:', err); client.connected = false; client.isConnecting = false; onError && onError(err); }); // 连接关闭(被动断开:服务器/网络原因) client.instance.on('close', () => { console.info('MQTT连接已关闭'); client.connected = false; client.isConnecting = false; // 非手动断开则保留实例(等待自动重连) if (!client.isManualDisconnect) { console.info('非手动断开,等待自动重连...'); } else { client.instance = null; // 手动断开则清空实例 } }); // 重连中(可选:感知重连状态) client.instance.on('reconnect', () => { console.info('MQTT正在重连...'); client.isConnecting = true; client.connected = false; }); } catch (err) { console.error('MQTT客户端创建失败:', err); client.isConnecting = false; client.connected = false; client.instance = null; onError && onError(err); } return client; } /** * 断开MQTT连接(无Promise,纯回调) * @param {Function} onComplete 完成回调 () => {} */ export function disconnectMQTT(onComplete) { // 标记为手动断开(避免重连) client.isManualDisconnect = true; if (client.instance && client.connected) { client.instance.end(false, () => { // false:不发送will消息 console.info('已断开MQTT连接'); client.connected = false; client.isConnecting = false; client.instance = null; // 清空实例 onComplete && onComplete(); }); } else { // 无有效连接,直接回调 client.connected = false; client.isConnecting = false; client.instance = null; onComplete && onComplete(); } } /** * 获取MQTT客户端实例 * @returns {Client} MQTT客户端 */ export function getMQTTClient() { return client; } /** * 获取MQTT连接状态(更精准) * @returns {Object} 完整状态 */ export function getMQTTStatus() { return { isConnected: client.connected, // 最终连接状态 isConnecting: client.isConnecting, // 连接中/重连中 isManualDisconnect: client.isManualDisconnect // 是否手动断开 }; }