diff --git a/App.vue b/App.vue index 52628e7..5c4d32f 100644 --- a/App.vue +++ b/App.vue @@ -1,10 +1,51 @@ diff --git a/config.js b/config.js index 4bd12cd..8961a79 100644 --- a/config.js +++ b/config.js @@ -1,6 +1,6 @@ // 应用全局配置 module.exports = { - baseUrl: "production" ? "/api" : "http://localhost:8088", + baseUrl: process.env.NODE_ENV === "production" ? "/api" : "http://localhost:8088", // 应用信息 appInfo: { // 应用名称 diff --git a/pages/control/index.vue b/pages/control/index.vue index 7cdbc81..a14c69b 100644 --- a/pages/control/index.vue +++ b/pages/control/index.vue @@ -58,7 +58,9 @@ - + @@ -169,12 +171,12 @@ export default { mqttConfig: { host: '1.94.254.176', port: 9001, - clientId: 'uniapp_mqtt_' + Math.random().toString(16).substr(2, 8), username: 'admin', password: 'Admin#12345678', subscribeTopic:'/up', }, value: 1, + control: '正在加载中...', range: [{ "value": '864865085016294', "text": "十方北棚" @@ -273,7 +275,7 @@ export default { } findDtuDataByInfo(queryParams).then(response => { this.liveData = { - temp1: response.data.temp1 || '已离线...', + temp1: response.data.temp1 || '已离线..', temp2: response.data.temp2 || '已离线..', temp3: response.data.temp3 || '已离线..', temp4: response.data.temp4 || '已离线..', @@ -317,6 +319,7 @@ export default { jm3k: 0, jm3g: 0 }; + this.control = '正在加载中...'; this.message = {}; this.temp = ''; this.liveData = { @@ -393,7 +396,7 @@ export default { }, connectMqtt() { const options = { - clientId: this.mqttConfig.clientId, + clientId: 'uniapp_mqtt_' + Math.random().toString(16).substr(2, 8), username: this.mqttConfig.username, password: this.mqttConfig.password, clean: true, @@ -409,6 +412,7 @@ export default { this.connected = true this.client.subscribe('dtu/+/up', {qos: 0}) this.addMessage('已连接到MQTT服务器') + console.info(this.client) }) this.client.on("message", this.ackMessage); @@ -425,6 +429,7 @@ export default { this.client.on('close', () => { this.addMessage('连接已关闭') this.connected = false + console.info(this.client) }) }, @@ -527,6 +532,7 @@ export default { // 判断值:0→暂停,1→运行,其他值可补充默认值(可选) this.show[key] = value === 0 ? '暂停' : '运行'; }); + this.control = '最后更新时间:' + this.getCurrentTime(); // console.info("imei: "+this.publishTopic+"copy: ",this.status) } const allKeysNumeric2 = Object.keys(msgData).every(key => /^\d+$/.test(key)); @@ -546,7 +552,7 @@ export default { humi4: div10(msgData["104"]) || "已离线...", } // 调用函数获取并输出格式化后的当前时间 - this.temp = "最新更新时间:" + this.getCurrentTime(); + this.temp = "最后更新时间:" + this.getCurrentTime(); this.fontStyle = 'font-size:16px;' } diff --git a/utils/mqtt.js b/utils/mqtt.js index a5729e7..dd38b19 100644 --- a/utils/mqtt.js +++ b/utils/mqtt.js @@ -1,214 +1,260 @@ +/** + * MQTT工具类 - 全局唯一连接、自动重连、支持自定义配置和订阅列表 + * 适配uniapp小程序/APP/H5(小程序仅支持ws/wss) + */ + +// 引入mqtt库(需先安装:npm install mqtt --save,或下载mqtt.min.js到本地引入) import mqtt from 'mqtt' -// 全局MQTT实例(单例,避免多页面重复创建) -let mqttInstance = null; + +// 全局变量 +let client = null; // MQTT客户端实例 +let subscribeList = []; // 订阅主题列表(重连后自动恢复订阅) +let reconnectTimer = null; // 重连定时器 +let maxReconnectTimes = 10; // 最大重连次数 +let currentReconnectTimes = 0; // 当前重连次数 +let isManualDisconnect = false; // 是否主动断开(用于区分主动/被动断开,避免主动断开后重连) /** - * 初始化MQTT客户端(单例模式) - * @param {Object} mqttConfig 配置项 - * @param {Array|String} subTopic 初始订阅主题 - * @returns {Object} MQTT实例 + * 初始化MQTT连接 + * @param {Object} config - MQTT连接配置 + * @param {Array} subs - 初始订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }] + * @returns {Promise} 连接成功/失败的Promise */ -function createMqttClient(mqttConfig, subTopic) { - // 已存在实例则直接返回 - if (mqttInstance && mqttInstance.connected) { - // 若传入新主题,补充订阅 - if (subTopic) { - mqttInstance.subscribe(subTopic); +export function initMQTT(config, subs = []) { + return new Promise((resolve, reject) => { + // 1. 校验配置(必填项) + if (!config.host || !config.port) { + reject(new Error('MQTT配置错误:host和port为必填项')); + return; } - return mqttInstance; - } - // 标准化配置(默认值 + 防错) - const config = { - host: mqttConfig.host || '1.94.254.176', - port: mqttConfig.port || 9001, - clientId: mqttConfig.clientId || `uniapp_mqtt_${Date.now()}${Math.random().toString(16).substr(2, 4)}`, // 时间戳+随机数,减少重复 - username: mqttConfig.username || 'admin', - password: mqttConfig.password || 'Admin#12345678', - clean: false, // 关键:改为false,保持会话缓存消息 - reconnectPeriod: mqttConfig.reconnectPeriod || 3000, - connectTimeout: mqttConfig.connectTimeout || 5000, - qos: mqttConfig.qos || 1 // 默认QoS1,确保消息不丢 - }; + // 2. 保存订阅列表(用于重连恢复) + subscribeList = subs; - // 连接选项 - const options = { - clientId: config.clientId, - username: config.username, - password: config.password, - clean: config.clean, - connectTimeout: config.connectTimeout, - reconnectPeriod: config.reconnectPeriod, - keepalive: 60 // 新增心跳,避免连接被断开 - }; + // 3. 避免重复连接 + if (client && client.connected) { + resolve(client); + return; + } - // 拼接WS地址(兼容配置错误) - const url = `ws://${config.host}:${config.port}/mqtt`; + // 4. 标记为非主动断开(允许重连) + isManualDisconnect = false; - // 创建客户端 - const client = mqtt.connect(url, options); + // 5. 构建连接地址(小程序仅支持ws/wss,优先用wss更安全) + const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws'); + const connectUrl = `${protocol}://${config.host}:${config.port}/mqtt`; - // 实例状态管理 - const instance = { - client: client, - connected: false, - subscribedTopics: new Set(), // 记录已订阅主题 - config: config, - messageCallback: null, // 消息接收回调 - statusCallback: null, // 状态变更回调 + // 6. 构建MQTT连接选项 + const mqttOptions = { + clientId: config.clientId || `uni_mqtt_${Math.random().toString(16).substr(2, 8)}`, + username: config.username || '', + password: config.password || '', + keepalive: config.keepalive || 60, // 心跳间隔(秒) + clean: config.clean !== undefined ? config.clean : true, // 是否清除会话 + reconnectPeriod: 0, // 关闭内置重连(自定义重连逻辑) + connectTimeout: config.connectTimeout || 10000, // 连接超时(毫秒) + ...config.extraOptions // 额外扩展配置 + }; - // 订阅主题(支持单个/多个,带重试) - subscribe: function (topics, qos = config.qos) { - if (!this.connected) { - console.warn('MQTT未连接,延迟订阅:', topics); - // 连接成功后自动订阅 - client.once('connect', () => this.subscribe(topics, qos)); - return; - } + try { + // 7. 创建客户端并连接 + client = mqtt.connect(connectUrl, mqttOptions); - const topicList = Array.isArray(topics) ? topics : [topics]; - client.subscribe(topicList, { qos }, (err) => { - if (err) { - console.error('订阅失败:', err, '主题:', topicList); - // 订阅失败重试(仅一次) - setTimeout(() => this.subscribe(topics, qos), 1000); - } else { - topicList.forEach(t => this.subscribedTopics.add(t)); - console.log(`订阅成功${topicList.length > 1 ? '(批量)' : '(单个)'}:`, topicList); - this.statusCallback && this.statusCallback('subscribe_success', topicList); + // 8. 监听连接成功 + client.on('connect', () => { + console.log('MQTT连接成功:', connectUrl); + currentReconnectTimes = 0; // 重置重连次数 + // 9. 订阅初始主题 + subscribeTopics(subscribeList); + resolve(client); + }); + + // 10. 监听连接错误 + client.on('error', (err) => { + console.error('MQTT连接错误:', err); + client.end(); + reject(err); + }); + + // 11. 监听连接断开(被动断开则触发重连) + client.on('close', () => { + console.log('MQTT连接已断开'); + client = null; + // 非主动断开 + 未达最大重连次数 → 触发重连 + if (!isManualDisconnect && currentReconnectTimes < maxReconnectTimes) { + reconnectMQTT(config); } }); - }, - // 取消订阅 - unsubscribe: function (topics) { - if (!this.connected) return; - const topicList = Array.isArray(topics) ? topics : [topics]; - client.unsubscribe(topicList, (err) => { - if (err) { - console.error('取消订阅失败:', err); - } else { - topicList.forEach(t => this.subscribedTopics.delete(t)); - console.log('取消订阅成功:', topicList); - this.statusCallback && this.statusCallback('unsubscribe_success', topicList); - } + // 12. 监听消息(全局消息转发,页面层通过uni.$on监听) + client.on('message', (topic, message) => { + const msg = { + topic, + payload: message.toString(), // 转字符串(原始是Buffer) + timestamp: Date.now() + }; + // 全局广播消息,页面层按需监听 + uni.$emit('mqtt_message', msg); }); - }, - // 发布消息(带参数,失败重试) - publish: function (topic, message, qos = config.qos, retain = false) { - return new Promise((resolve, reject) => { - if (!this.connected) { - reject(new Error('MQTT未连接,无法发布消息')); - // 自动重连后发布 - this.reconnectAndPublish(topic, message, qos, retain); - return; - } - - // 标准化消息格式(对象转JSON) - const payload = typeof message === 'object' ? JSON.stringify(message) : String(message); - - client.publish(topic, payload, { qos, retain }, (err) => { - if (err) { - console.error('发布失败:', err, '主题:', topic); - reject(err); - // 发布失败重试 - setTimeout(() => this.publish(topic, message, qos, retain), 1000); - } else { - console.log('发布成功:', topic, '内容:', payload); - resolve({ topic, payload }); - } - }); - }); - }, - - // 重连后补发消息 - reconnectAndPublish: function (topic, message, qos, retain) { - client.once('connect', () => { - this.publish(topic, message, qos, retain); - }); - }, - - // 断开连接 - disconnect: function () { - if (this.connected && this.client) { - this.client.end(false, () => { // false:等待剩余消息发送完成 - this.connected = false; - this.subscribedTopics.clear(); - console.log('MQTT连接已断开(保留会话)'); - this.statusCallback && this.statusCallback('disconnect'); - }); - } + } catch (err) { + console.error('MQTT初始化失败:', err); + reject(err); } - }; - - // 绑定客户端事件 - client.on('connect', () => { - instance.connected = true; - console.log('MQTT连接成功,ClientId:', config.clientId); - instance.statusCallback && instance.statusCallback('connect_success'); - // 初始订阅主题 - if (subTopic) { - instance.subscribe(subTopic); - } - }) - .on('reconnect', (error) => { - instance.connected = false; - console.log('MQTT正在重连...', error); - instance.statusCallback && instance.statusCallback('reconnect', error); - }) - .on('error', (error) => { - instance.connected = false; - console.error('MQTT连接错误:', error); - instance.statusCallback && instance.statusCallback('error', error); - }) - .on('close', () => { - instance.connected = false; - console.log('MQTT连接关闭'); - instance.statusCallback && instance.statusCallback('close'); - }) - .on('offline', () => { - instance.connected = false; - console.log('MQTT客户端下线'); - instance.statusCallback && instance.statusCallback('offline'); - }) - .on('message', (topic, payload) => { - const msg = payload.toString(); - console.log('收到MQTT消息:', topic, msg); - // 消息回调,交给业务层处理 - instance.messageCallback && instance.messageCallback(topic, msg); - }); - - // 赋值单例 - mqttInstance = instance; - return instance; + }); } /** - * 对外暴露的核心方法 + * 订阅主题(支持单个/多个) + * @param {Array} topics - 订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }] */ -export const mqttTool = { - // 初始化连接 - connect: function (mqttConfig, subTopic) { - return createMqttClient(mqttConfig, subTopic); - }, - - // 获取全局实例 - getInstance: function () { - return mqttInstance; - }, - - // 页面切换时的订阅管理(核心解决多页面订阅问题) - switchPageTopic: function (newTopics, oldTopics) { - const instance = mqttInstance; - if (!instance) return; - - // 先订阅新主题,再取消旧主题(避免漏消息) - if (newTopics) { - instance.subscribe(newTopics); - } - if (oldTopics) { - instance.unsubscribe(oldTopics); - } +export function subscribeTopics(topics = []) { + if (!client || !client.connected) { + console.warn('MQTT未连接,无法订阅主题'); + return; } -}; \ No newline at end of file + + // 过滤空主题 + const validTopics = topics.filter(item => item && item.topic); + if (validTopics.length === 0) return; + + // 转换为mqtt库要求的格式:{ topic1: { qos: 0 }, topic2: { qos: 1 } } + const subscribeMap = {}; + validTopics.forEach(item => { + subscribeMap[item.topic] = { qos: item.qos || 0 }; + }); + + client.subscribe(subscribeMap, (err) => { + if (err) { + console.error('MQTT订阅失败:', err); + } else { + console.log('MQTT订阅成功:', validTopics.map(item => item.topic)); + // 更新订阅列表(用于重连恢复) + subscribeList = [...new Set([...subscribeList, ...validTopics])]; // 去重 + } + }); +} + +/** + * 取消订阅主题 + * @param {Array} topics - 取消订阅的主题列表,格式:['topic1', 'topic2'] + */ +export function unsubscribeTopics(topics = []) { + if (!client || !client.connected) { + console.warn('MQTT未连接,无法取消订阅'); + return; + } + + client.unsubscribe(topics, (err) => { + if (err) { + console.error('MQTT取消订阅失败:', err); + } else { + console.log('MQTT取消订阅成功:', topics); + // 更新订阅列表 + subscribeList = subscribeList.filter(item => !topics.includes(item.topic)); + } + }); +} + +/** + * 发布消息 + * @param {String} topic - 发布的主题 + * @param {String/Buffer} payload - 发布的消息内容 + * @param {Object} options - 发布选项,如{ qos: 0, retain: false } + * @returns {Promise} 发布成功/失败的Promise + */ +export function publishMQTT(topic, payload, options = { qos: 0, retain: false }) { + return new Promise((resolve, reject) => { + if (!client || !client.connected) { + reject(new Error('MQTT未连接,无法发布消息')); + return; + } + + if (!topic) { + reject(new Error('发布失败:主题不能为空')); + return; + } + + client.publish(topic, payload, options, (err) => { + if (err) { + console.error(`MQTT发布${topic}失败:`, err); + reject(err); + } else { + console.log(`MQTT发布${topic}成功:`, payload); + resolve(true); + } + }); + }); +} + +/** + * 断开MQTT连接(主动断开,不会触发重连) + */ +export function disconnectMQTT() { + // 标记为主动断开 + isManualDisconnect = true; + // 清除重连定时器 + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + // 断开连接 + if (client && client.connected) { + client.end(false, () => { // false:不发送遗嘱消息 + console.log('MQTT主动断开连接'); + client = null; + subscribeList = []; + }); + } +} + +/** + * 重连MQTT(内部调用,也可外部手动触发) + * @param {Object} config - 连接配置(同initMQTT的config) + */ +export function reconnectMQTT(config) { + // 清除已有定时器,避免重复触发 + if (reconnectTimer) { + clearTimeout(reconnectTimer); + } + + // 重连间隔:1秒/次(可自定义) + reconnectTimer = setTimeout(async () => { + currentReconnectTimes++; + console.log(`MQTT重连中,第${currentReconnectTimes}/${maxReconnectTimes}次`); + try { + await initMQTT(config, subscribeList); + } catch (err) { + // 重连失败,继续尝试(直到达到最大次数) + if (currentReconnectTimes < maxReconnectTimes) { + reconnectMQTT(config); + } else { + console.error('MQTT重连次数已达上限,停止重连'); + uni.$emit('mqtt_reconnect_fail'); // 全局通知重连失败 + } + } + }, 1000); +} + +/** + * 获取MQTT客户端状态 + * @returns {Object} 状态信息 + */ +export function getMQTTStatus() { + return { + isConnected: !!client && client.connected, // 是否连接 + client: client, // 客户端实例 + subscribeList: [...subscribeList], // 已订阅列表(浅拷贝,避免外部修改) + currentReconnectTimes, // 当前重连次数 + maxReconnectTimes // 最大重连次数 + }; +} + +/** + * 手动触发重连(外部调用,比如页面主动刷新连接) + * @param {Object} config - 连接配置 + */ +export function manualReconnect(config) { + currentReconnectTimes = 0; // 重置重连次数 + reconnectMQTT(config); +} \ No newline at end of file