/** * 智能农业小程序 - MQTT全局工具类(uniapp+小程序适配版) * 核心设计: * 1. 配置暂时写死全局,后续可替换为登录后从数据字典获取 * 2. clientId仅随机生成(无需关联userId),保证唯一性即可 * 3. 同步方法为主,全局统一管理连接/订阅/发布,多页面复用 */ // 引入小程序版MQTT核心库(确保mqtt.min.js在utils目录) // 微信小程序端:引入适配小程序的 mqtt 版本 import mqtt from 'mqtt/dist/mqtt' import {batchSubscribe, batchUnsubscribe} from "../api/system/mqtt"; import {getToken} from "./auth"; import {listAgri} from "../api/system/assets/agri"; import store from "../store"; // ===================== MQTT配置(暂时写死,TODO:后续从数据字典获取)===================== const MQTT_CONFIG = { server: 'wxs://mq.xiaoces.com:443/mqtt', // 替换为你的MQTT服务器地址 username: 'admin', // 替换为通用账号 password: 'Admin#12345678', // 替换为通用密码 clean: true, host: 'mq.xiaoces.com', port: 443, reconnectPeriod: 5000, // 重连间隔 connectTimeout: 10000, // 连接超时 keepalive: 60, // 心跳时间 } // ===================== MQTT全局状态管理 ===================== const mqttState = { client: null, // MQTT客户端实例 isConnected: false, // 连接状态(同步标记) subscribeList: [], // 全局订阅列表(登录后赋值) options: { // MQTT连接选项 clientId: '', // 仅随机生成,无需关联userId ...MQTT_CONFIG // 合并基础配置 }, onMessageCallback: null // 全局消息回调(各页面自定义) } /** * 初始化MQTT配置(生成随机clientId,无需传入userId) * @returns {Boolean} - 是否配置成功 */ export function initMqttConfig() { if (mqttState.client) { console.info("重连前强制断开", mqttState.options.clientId) // 加try-catch,避免断开时客户端已异常导致报错 try { mqttState.client.end(true) } catch (err) { console.warn('旧连接断开失败:', err) } } try { // 仅随机生成clientId(保证唯一性,避免连接冲突) mqttState.options.clientId = `wx_mqtt_${Math.random().toString(16).substr(2, 10)}` // 重置旧状态,避免干扰 mqttState.isConnected = false mqttState.client = null console.log('MQTT配置初始化成功,clientId:', mqttState.options.clientId) return true } catch (err) { uni.showToast({title: '设备连接异常-设备初始化失败', icon: 'none', duration: 2000}) console.error('MQTT配置初始化失败:', err) return false } } /** * 建立MQTT连接(初始化后调用,同步返回触发状态) * @returns {Boolean} - 是否成功触发连接 */ export function connectMqtt() { // 前置校验:是否已初始化配置 if (!mqttState.options.clientId) { uni.showToast({title: '设备连接失败', icon: 'none'}) console.error('MQTT连接失败:请先调用initMqttConfig初始化配置') return false } // 避免重复连接 if (mqttState.client != null) { console.log('MQTT已连接,无需重复操作') return true } try { // 创建客户端实例(同步操作) // #ifndef MP-WEIXIN MQTT_CONFIG.server = 'wxs://mq.xiaoces.com:443/mqtt' // #endif console.info("mqttState.connect", mqttState) mqttState.client = mqtt.connect(MQTT_CONFIG.server, mqttState.options) console.info("重连中。。") // 监听核心事件(异步,同步更新状态) mqttState.client.on('connect', () => { console.log('MQTT连接成功', mqttState.options.clientId) mqttState.isConnected = true // 连接成功后自动订阅全局列表 subscribeAllTopics() }) mqttState.client.on('close', () => { if (getToken()) { batchUnsubscribe({clientId: mqttState.options.clientId}).then(response => { if (response.code === 200) { console.info("取消订阅成功!") } }) } console.log('MQTT连接断开') mqttState.isConnected = false }) mqttState.client.on('error', (err) => { uni.showToast({title: '设备连接异常', icon: 'none'}) console.error('MQTT连接错误:', err) mqttState.isConnected = false }) // 全局接收消息,转发给页面自定义回调 mqttState.client.on('message', (topic, payload) => { const message = payload.toString() // console.log(`收到MQTT消息:topic=${topic},message=${message}`) if (typeof mqttState.onMessageCallback === 'function') { mqttState.onMessageCallback(topic, message) ackMsg(topic, payload) } }) return true } catch (err) { uni.showToast({title: '设备连接异常', icon: 'none'}) console.error('MQTT连接创建失败:', err) mqttState.isConnected = false return false } } export function ackMsg(topic, payload) { var words = topic.split("/"); if (words.length!==3) { return; } const agriList = uni.getStorageSync('agri_list') if (agriList.length === 0 || !agriList.includes(words[1])) { return; } const tag = topic.match(/[^/]+$/)?.[0] var regex = { alarm:1, notice:2, event:3 } if (!regex[tag]) return; uni.showTabBarRedDot({index: 2}) // uni.setTabBarBadge({ // index: 0, // text: '666' // }) } /** * 设置页面专属消息回调(各页面独立处理消息) * @param {Function} callback - (topic, message) => {} */ export function setOnMessageCallback(callback) { if (typeof callback === 'function') { mqttState.onMessageCallback = callback console.log('MQTT消息回调已注册') } else { console.error('MQTT回调必须是函数类型') } } /** * 移除消息回调(页面卸载时调用,避免内存泄漏) */ export function removeOnMessageCallback() { mqttState.onMessageCallback = null console.log('MQTT消息回调已移除') } /** * 更新全局订阅列表(登录后调用,自动订阅) * @param {Array} list - 订阅主题列表 ['topic1', 'topic2'] * @returns {Boolean} - 是否更新成功 */ export function updateSubscribeList(list) { if (!Array.isArray(list)) { uni.showToast({title: '设备订阅更新异常', icon: 'none'}) console.error('订阅列表必须是数组') return false } mqttState.subscribeList = [...list] console.log('MQTT订阅列表已更新:', list) // 已连接则立即订阅 if (mqttState.isConnected) { subscribeAllTopics() } return true } /** * 内部方法:订阅所有全局主题 */ function subscribeAllTopics() { const {isConnected, client, subscribeList} = mqttState if (!isConnected || !client || subscribeList.length === 0) { console.warn('MQTT订阅跳过:未连接或列表为空') return } client.subscribe(subscribeList, {qos: 0}, (err) => { if (err) { uni.showToast({title: '设备订阅异常', icon: 'none'}) console.error('MQTT订阅失败:', err) } else { console.log(`MQTT成功订阅:${subscribeList.join(', ')}`) } }) } /** * 发布MQTT消息(同步调用) * @param {String} topic - 发布主题 * @param {String/Object} message - 发布内容 * @returns {Boolean} - 是否触发发布成功 */ export function publishMqtt(topic, message) { // if (process.env.NODE_ENV === "production") { const {isConnected, client} = mqttState if (!isConnected || !client) { uni.showToast({title: '控制异常', icon: 'none'}) console.error('MQTT发布失败:未连接') return false } if (!topic) { uni.showToast({title: '控制异常', icon: 'none'}) console.error('MQTT发布失败:主题为空') return false } try { const msg = typeof message === 'object' ? JSON.stringify(message) : String(message) client.publish(topic, msg, (err) => { if (err) { uni.showToast({title: '控制异常', icon: 'none'}) console.error(`MQTT发布失败:topic=${topic},err=${err}`) } else { console.log(`MQTT发布成功:topic=${topic},message=${msg}`) } }) return true } catch (err) { uni.showToast({title: '控制异常', icon: 'none'}) console.error('MQTT发布异常:', err) return false } // } } /** * 断开MQTT连接(登出/小程序切后台时调用) * @returns {Boolean} - 是否断开成功 */ export function disconnectMqtt() { if (!mqttState.client) { console.log('MQTT无客户端实例,无需断开') resetMqttState() return true } if (getToken()) { batchUnsubscribe({clientId: mqttState.options.clientId}).then(response => { if (response.code === 200) { console.info("取消订阅成功!") } }) } try { mqttState.client.end(true) // 强制断开 resetMqttState() console.log('MQTT连接已断开') return true } catch (err) { uni.showToast({title: '设备通信异常', icon: 'none'}) console.error('MQTT断开失败:', err) return false } } /** * 重置MQTT状态(内部方法) */ function resetMqttState() { mqttState.isConnected = false mqttState.client = null mqttState.subscribeList = [] mqttState.onMessageCallback = null // 保留clientId,直到下一次初始化覆盖 } /** * 获取当前MQTT状态(同步) * @returns {Object} - 状态副本 */ export function getMqttState() { return { isConnected: mqttState.isConnected, subscribeList: [...mqttState.subscribeList], clientId: mqttState.options.clientId, client: mqttState.client } } // utils/mqttOnline.js let timer = null; export function startMqttOnlinePing(intervalMs = 20000) { if (!mqttState.client || !mqttState.options.clientId) return; if (timer) return; const topic = `frontend/${mqttState.options.clientId}/online`; const ping = () => { try { if (!mqttState.client.connected) return; const payload = JSON.stringify({ts: Date.now()}); // qos=0 足够;retain 不要 mqttState.client.publish(topic, payload, {qos: 0, retain: false}); // console.info(`主题:${topic}:${payload}`) } catch (e) { } }; ping(); // 每 20000ms(20秒)执行一次 ping timer = setInterval(ping, intervalMs); } export function stopMqttOnlinePing() { if (timer) { clearInterval(timer); timer = null; } } /** * 更新MQTT订阅主题(无返回值、不用async、纯回调实现) */ export function updateSubscribeTopic() { const subscribeList = []; // 初始化clientId,空值保护 let clientId = mqttState.options?.clientId || ''; if (!clientId) { throw new Error('MQTT clientId 为空,无法生成订阅主题'); } const agriList = []; // 1. 获取设备列表(纯.then/.catch回调) // 获取已启用的设备列表 listAgri({status: 1}).then(response => { if (response.code === 200) { // 生成设备订阅主题(校验imei非空) response.rows.forEach(item => { if (item?.imei) { agriList.push(item.imei); subscribeList.push(`frontend/${clientId}/dtu/${item.imei}/+`); subscribeList.push(`device/${item.imei}/+`); } }); uni.setStorageSync('agri_list', agriList); // 管理员额外订阅指定设备 if (store.getters && store.getters.name === 'admin') { subscribeList.push(`frontend/${clientId}/dtu/862538065276061/+`); } // 有订阅主题时执行批量订阅 if (subscribeList.length > 0) { // 2. 批量订阅(链式.then保证执行顺序) batchSubscribe({clientId}) .then(result => { if (result.code === 200) { console.info(`设备列表订阅成功:${subscribeList}`); // 更新订阅列表+同步本地存储 updateSubscribeList(subscribeList); uni.setStorageSync('mqtt_subscribe_list', subscribeList); console.log('恢复MQTT订阅列表:', subscribeList); } else { console.error(`批量订阅失败,返回码:${result.code}`); } }) .catch(batchErr => { console.error('批量订阅失败:', batchErr.message); }); } else { console.warn('无可用的订阅主题,跳过订阅操作'); } } else { console.error(`获取设备列表失败,返回码:${response.code}`); } }).catch(error => { console.error('更新MQTT订阅主题失败:', error.message); }); } // 导出所有方法(全局调用) export default { initMqttConfig, connectMqtt, setOnMessageCallback, removeOnMessageCallback, updateSubscribeList, updateSubscribeTopic, publishMqtt, disconnectMqtt, getMqttState }