import mqtt from 'mqtt' // 全局MQTT实例(单例,避免多页面重复创建) let mqttInstance = null; /** * 初始化MQTT客户端(单例模式) * @param {Object} mqttConfig 配置项 * @param {Array|String} subTopic 初始订阅主题 * @returns {Object} MQTT实例 */ function createMqttClient(mqttConfig, subTopic) { // 已存在实例则直接返回 if (mqttInstance && mqttInstance.connected) { // 若传入新主题,补充订阅 if (subTopic) { mqttInstance.subscribe(subTopic); } return mqttInstance; } // 标准化配置(默认值 + 防错) const config = { host: mqttConfig.host || '1.94.254.176', port: mqttConfig.port || 9001, clientId: mqttConfig.clientId || `uniapp_mqtt_${Date.now()}${Math.random().toString(16).substr(2, 4)}`, // 时间戳+随机数,减少重复 username: mqttConfig.username || 'admin', password: mqttConfig.password || 'Admin#12345678', clean: false, // 关键:改为false,保持会话缓存消息 reconnectPeriod: mqttConfig.reconnectPeriod || 3000, connectTimeout: mqttConfig.connectTimeout || 5000, qos: mqttConfig.qos || 1 // 默认QoS1,确保消息不丢 }; // 连接选项 const options = { clientId: config.clientId, username: config.username, password: config.password, clean: config.clean, connectTimeout: config.connectTimeout, reconnectPeriod: config.reconnectPeriod, keepalive: 60 // 新增心跳,避免连接被断开 }; // 拼接WS地址(兼容配置错误) const url = `ws://${config.host}:${config.port}/mqtt`; // 创建客户端 const client = mqtt.connect(url, options); // 实例状态管理 const instance = { client: client, connected: false, subscribedTopics: new Set(), // 记录已订阅主题 config: config, messageCallback: null, // 消息接收回调 statusCallback: null, // 状态变更回调 // 订阅主题(支持单个/多个,带重试) subscribe: function (topics, qos = config.qos) { if (!this.connected) { console.warn('MQTT未连接,延迟订阅:', topics); // 连接成功后自动订阅 client.once('connect', () => this.subscribe(topics, qos)); return; } const topicList = Array.isArray(topics) ? topics : [topics]; client.subscribe(topicList, { qos }, (err) => { if (err) { console.error('订阅失败:', err, '主题:', topicList); // 订阅失败重试(仅一次) setTimeout(() => this.subscribe(topics, qos), 1000); } else { topicList.forEach(t => this.subscribedTopics.add(t)); console.log(`订阅成功${topicList.length > 1 ? '(批量)' : '(单个)'}:`, topicList); this.statusCallback && this.statusCallback('subscribe_success', topicList); } }); }, // 取消订阅 unsubscribe: function (topics) { if (!this.connected) return; const topicList = Array.isArray(topics) ? topics : [topics]; client.unsubscribe(topicList, (err) => { if (err) { console.error('取消订阅失败:', err); } else { topicList.forEach(t => this.subscribedTopics.delete(t)); console.log('取消订阅成功:', topicList); this.statusCallback && this.statusCallback('unsubscribe_success', topicList); } }); }, // 发布消息(带参数,失败重试) publish: function (topic, message, qos = config.qos, retain = false) { return new Promise((resolve, reject) => { if (!this.connected) { reject(new Error('MQTT未连接,无法发布消息')); // 自动重连后发布 this.reconnectAndPublish(topic, message, qos, retain); return; } // 标准化消息格式(对象转JSON) const payload = typeof message === 'object' ? JSON.stringify(message) : String(message); client.publish(topic, payload, { qos, retain }, (err) => { if (err) { console.error('发布失败:', err, '主题:', topic); reject(err); // 发布失败重试 setTimeout(() => this.publish(topic, message, qos, retain), 1000); } else { console.log('发布成功:', topic, '内容:', payload); resolve({ topic, payload }); } }); }); }, // 重连后补发消息 reconnectAndPublish: function (topic, message, qos, retain) { client.once('connect', () => { this.publish(topic, message, qos, retain); }); }, // 断开连接 disconnect: function () { if (this.connected && this.client) { this.client.end(false, () => { // false:等待剩余消息发送完成 this.connected = false; this.subscribedTopics.clear(); console.log('MQTT连接已断开(保留会话)'); this.statusCallback && this.statusCallback('disconnect'); }); } } }; // 绑定客户端事件 client.on('connect', () => { instance.connected = true; console.log('MQTT连接成功,ClientId:', config.clientId); instance.statusCallback && instance.statusCallback('connect_success'); // 初始订阅主题 if (subTopic) { instance.subscribe(subTopic); } }) .on('reconnect', (error) => { instance.connected = false; console.log('MQTT正在重连...', error); instance.statusCallback && instance.statusCallback('reconnect', error); }) .on('error', (error) => { instance.connected = false; console.error('MQTT连接错误:', error); instance.statusCallback && instance.statusCallback('error', error); }) .on('close', () => { instance.connected = false; console.log('MQTT连接关闭'); instance.statusCallback && instance.statusCallback('close'); }) .on('offline', () => { instance.connected = false; console.log('MQTT客户端下线'); instance.statusCallback && instance.statusCallback('offline'); }) .on('message', (topic, payload) => { const msg = payload.toString(); console.log('收到MQTT消息:', topic, msg); // 消息回调,交给业务层处理 instance.messageCallback && instance.messageCallback(topic, msg); }); // 赋值单例 mqttInstance = instance; return instance; } /** * 对外暴露的核心方法 */ export const mqttTool = { // 初始化连接 connect: function (mqttConfig, subTopic) { return createMqttClient(mqttConfig, subTopic); }, // 获取全局实例 getInstance: function () { return mqttInstance; }, // 页面切换时的订阅管理(核心解决多页面订阅问题) switchPageTopic: function (newTopics, oldTopics) { const instance = mqttInstance; if (!instance) return; // 先订阅新主题,再取消旧主题(避免漏消息) if (newTopics) { instance.subscribe(newTopics); } if (oldTopics) { instance.unsubscribe(oldTopics); } } };