423 lines
14 KiB
JavaScript
423 lines
14 KiB
JavaScript
/**
|
||
* 智能农业小程序 - MQTT全局工具类(uniapp+小程序适配版)
|
||
* 核心设计:
|
||
* 1. 配置暂时写死全局,后续可替换为登录后从数据字典获取
|
||
* 2. clientId仅随机生成(无需关联userId),保证唯一性即可
|
||
* 3. 同步方法为主,全局统一管理连接/订阅/发布,多页面复用
|
||
*/
|
||
|
||
// 引入小程序版MQTT核心库(确保mqtt.min.js在utils目录)
|
||
|
||
// 微信小程序端:引入适配小程序的 mqtt 版本
|
||
import mqtt from 'mqtt/dist/mqtt'
|
||
import {batchSubscribe, batchUnsubscribe} from "../api/system/mqtt";
|
||
import {getToken} from "./auth";
|
||
import {listAgri} from "../api/system/assets/agri";
|
||
import store from "../store";
|
||
|
||
// ===================== MQTT配置(暂时写死,TODO:后续从数据字典获取)=====================
|
||
const MQTT_CONFIG = {
|
||
server: 'wxs://mq.xiaoces.com:443/mqtt', // 替换为你的MQTT服务器地址
|
||
username: 'admin', // 替换为通用账号
|
||
password: 'Admin#12345678', // 替换为通用密码
|
||
clean: true,
|
||
host: 'mq.xiaoces.com',
|
||
port: 443,
|
||
reconnectPeriod: 5000, // 重连间隔
|
||
connectTimeout: 10000, // 连接超时
|
||
keepalive: 60, // 心跳时间
|
||
}
|
||
|
||
// ===================== MQTT全局状态管理 =====================
|
||
const mqttState = {
|
||
client: null, // MQTT客户端实例
|
||
isConnected: false, // 连接状态(同步标记)
|
||
subscribeList: [], // 全局订阅列表(登录后赋值)
|
||
options: { // MQTT连接选项
|
||
clientId: '', // 仅随机生成,无需关联userId
|
||
...MQTT_CONFIG // 合并基础配置
|
||
},
|
||
onMessageCallback: null // 全局消息回调(各页面自定义)
|
||
}
|
||
|
||
/**
|
||
* 初始化MQTT配置(生成随机clientId,无需传入userId)
|
||
* @returns {Boolean} - 是否配置成功
|
||
*/
|
||
export function initMqttConfig() {
|
||
if (mqttState.client) {
|
||
console.info("重连前强制断开", mqttState.options.clientId)
|
||
// 加try-catch,避免断开时客户端已异常导致报错
|
||
try {
|
||
mqttState.client.end(true)
|
||
} catch (err) {
|
||
console.warn('旧连接断开失败:', err)
|
||
}
|
||
}
|
||
try {
|
||
// 仅随机生成clientId(保证唯一性,避免连接冲突)
|
||
mqttState.options.clientId = `wx_mqtt_${Math.random().toString(16).substr(2, 10)}`
|
||
// 重置旧状态,避免干扰
|
||
mqttState.isConnected = false
|
||
mqttState.client = null
|
||
console.log('MQTT配置初始化成功,clientId:', mqttState.options.clientId)
|
||
return true
|
||
} catch (err) {
|
||
uni.showToast({title: '设备连接异常-设备初始化失败', icon: 'none', duration: 2000})
|
||
console.error('MQTT配置初始化失败:', err)
|
||
return false
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 建立MQTT连接(初始化后调用,同步返回触发状态)
|
||
* @returns {Boolean} - 是否成功触发连接
|
||
*/
|
||
export function connectMqtt() {
|
||
// 前置校验:是否已初始化配置
|
||
if (!mqttState.options.clientId) {
|
||
uni.showToast({title: '设备连接失败', icon: 'none'})
|
||
console.error('MQTT连接失败:请先调用initMqttConfig初始化配置')
|
||
return false
|
||
}
|
||
// 避免重复连接
|
||
if (mqttState.client != null) {
|
||
console.log('MQTT已连接,无需重复操作')
|
||
return true
|
||
}
|
||
|
||
try {
|
||
// 创建客户端实例(同步操作)
|
||
// #ifndef MP-WEIXIN
|
||
MQTT_CONFIG.server = 'wxs://mq.xiaoces.com:443/mqtt'
|
||
// #endif
|
||
console.info("mqttState.connect", mqttState)
|
||
mqttState.client = mqtt.connect(MQTT_CONFIG.server, mqttState.options)
|
||
console.info("重连中。。")
|
||
// 监听核心事件(异步,同步更新状态)
|
||
mqttState.client.on('connect', () => {
|
||
console.log('MQTT连接成功', mqttState.options.clientId)
|
||
mqttState.isConnected = true
|
||
// 连接成功后自动订阅全局列表
|
||
subscribeAllTopics()
|
||
})
|
||
|
||
mqttState.client.on('close', () => {
|
||
if (getToken()) {
|
||
batchUnsubscribe({clientId: mqttState.options.clientId}).then(response => {
|
||
if (response.code === 200) {
|
||
console.info("取消订阅成功!")
|
||
}
|
||
})
|
||
}
|
||
console.log('MQTT连接断开')
|
||
mqttState.isConnected = false
|
||
})
|
||
|
||
mqttState.client.on('error', (err) => {
|
||
uni.showToast({title: '设备连接异常', icon: 'none'})
|
||
console.error('MQTT连接错误:', err)
|
||
mqttState.isConnected = false
|
||
})
|
||
|
||
// 全局接收消息,转发给页面自定义回调
|
||
mqttState.client.on('message', (topic, payload) => {
|
||
const message = payload.toString()
|
||
// console.log(`收到MQTT消息:topic=${topic},message=${message}`)
|
||
if (typeof mqttState.onMessageCallback === 'function') {
|
||
mqttState.onMessageCallback(topic, message)
|
||
ackMsg(topic, payload)
|
||
}
|
||
})
|
||
|
||
return true
|
||
} catch (err) {
|
||
uni.showToast({title: '设备连接异常', icon: 'none'})
|
||
console.error('MQTT连接创建失败:', err)
|
||
mqttState.isConnected = false
|
||
return false
|
||
}
|
||
}
|
||
|
||
export function ackMsg(topic, payload) {
|
||
var words = topic.split("/");
|
||
if (words.length!==3) {
|
||
return;
|
||
}
|
||
const agriList = uni.getStorageSync('agri_list')
|
||
if (agriList.length === 0 || !agriList.includes(words[1])) {
|
||
return;
|
||
}
|
||
const tag = topic.match(/[^/]+$/)?.[0]
|
||
var regex = {
|
||
alarm:1,
|
||
notice:2,
|
||
event:3
|
||
}
|
||
if (!regex[tag]) return;
|
||
uni.showTabBarRedDot({index: 2})
|
||
// uni.setTabBarBadge({
|
||
// index: 0,
|
||
// text: '666'
|
||
// })
|
||
}
|
||
/**
|
||
* 设置页面专属消息回调(各页面独立处理消息)
|
||
* @param {Function} callback - (topic, message) => {}
|
||
*/
|
||
export function setOnMessageCallback(callback) {
|
||
if (typeof callback === 'function') {
|
||
mqttState.onMessageCallback = callback
|
||
console.log('MQTT消息回调已注册')
|
||
} else {
|
||
console.error('MQTT回调必须是函数类型')
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 移除消息回调(页面卸载时调用,避免内存泄漏)
|
||
*/
|
||
export function removeOnMessageCallback() {
|
||
mqttState.onMessageCallback = null
|
||
console.log('MQTT消息回调已移除')
|
||
}
|
||
|
||
/**
|
||
* 更新全局订阅列表(登录后调用,自动订阅)
|
||
* @param {Array} list - 订阅主题列表 ['topic1', 'topic2']
|
||
* @returns {Boolean} - 是否更新成功
|
||
*/
|
||
export function updateSubscribeList(list) {
|
||
if (!Array.isArray(list)) {
|
||
uni.showToast({title: '设备订阅更新异常', icon: 'none'})
|
||
console.error('订阅列表必须是数组')
|
||
return false
|
||
}
|
||
|
||
mqttState.subscribeList = [...list]
|
||
console.log('MQTT订阅列表已更新:', list)
|
||
|
||
// 已连接则立即订阅
|
||
if (mqttState.isConnected) {
|
||
subscribeAllTopics()
|
||
}
|
||
return true
|
||
}
|
||
|
||
/**
|
||
* 内部方法:订阅所有全局主题
|
||
*/
|
||
function subscribeAllTopics() {
|
||
const {isConnected, client, subscribeList} = mqttState
|
||
if (!isConnected || !client || subscribeList.length === 0) {
|
||
console.warn('MQTT订阅跳过:未连接或列表为空')
|
||
return
|
||
}
|
||
|
||
client.subscribe(subscribeList, {qos: 0}, (err) => {
|
||
if (err) {
|
||
uni.showToast({title: '设备订阅异常', icon: 'none'})
|
||
console.error('MQTT订阅失败:', err)
|
||
} else {
|
||
console.log(`MQTT成功订阅:${subscribeList.join(', ')}`)
|
||
}
|
||
})
|
||
}
|
||
|
||
/**
|
||
* 发布MQTT消息(同步调用)
|
||
* @param {String} topic - 发布主题
|
||
* @param {String/Object} message - 发布内容
|
||
* @returns {Boolean} - 是否触发发布成功
|
||
*/
|
||
export function publishMqtt(topic, message) {
|
||
// if (process.env.NODE_ENV === "production") {
|
||
const {isConnected, client} = mqttState
|
||
if (!isConnected || !client) {
|
||
uni.showToast({title: '控制异常', icon: 'none'})
|
||
console.error('MQTT发布失败:未连接')
|
||
return false
|
||
}
|
||
if (!topic) {
|
||
uni.showToast({title: '控制异常', icon: 'none'})
|
||
console.error('MQTT发布失败:主题为空')
|
||
return false
|
||
}
|
||
|
||
try {
|
||
const msg = typeof message === 'object' ? JSON.stringify(message) : String(message)
|
||
client.publish(topic, msg, (err) => {
|
||
if (err) {
|
||
uni.showToast({title: '控制异常', icon: 'none'})
|
||
console.error(`MQTT发布失败:topic=${topic},err=${err}`)
|
||
} else {
|
||
console.log(`MQTT发布成功:topic=${topic},message=${msg}`)
|
||
}
|
||
})
|
||
return true
|
||
} catch (err) {
|
||
uni.showToast({title: '控制异常', icon: 'none'})
|
||
console.error('MQTT发布异常:', err)
|
||
return false
|
||
}
|
||
// }
|
||
}
|
||
|
||
/**
|
||
* 断开MQTT连接(登出/小程序切后台时调用)
|
||
* @returns {Boolean} - 是否断开成功
|
||
*/
|
||
export function disconnectMqtt() {
|
||
if (!mqttState.client) {
|
||
console.log('MQTT无客户端实例,无需断开')
|
||
resetMqttState()
|
||
return true
|
||
}
|
||
if (getToken()) {
|
||
batchUnsubscribe({clientId: mqttState.options.clientId}).then(response => {
|
||
if (response.code === 200) {
|
||
console.info("取消订阅成功!")
|
||
}
|
||
})
|
||
}
|
||
try {
|
||
mqttState.client.end(true) // 强制断开
|
||
resetMqttState()
|
||
console.log('MQTT连接已断开')
|
||
return true
|
||
} catch (err) {
|
||
uni.showToast({title: '设备通信异常', icon: 'none'})
|
||
console.error('MQTT断开失败:', err)
|
||
return false
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 重置MQTT状态(内部方法)
|
||
*/
|
||
function resetMqttState() {
|
||
mqttState.isConnected = false
|
||
mqttState.client = null
|
||
mqttState.subscribeList = []
|
||
mqttState.onMessageCallback = null
|
||
// 保留clientId,直到下一次初始化覆盖
|
||
}
|
||
|
||
/**
|
||
* 获取当前MQTT状态(同步)
|
||
* @returns {Object} - 状态副本
|
||
*/
|
||
export function getMqttState() {
|
||
return {
|
||
isConnected: mqttState.isConnected,
|
||
subscribeList: [...mqttState.subscribeList],
|
||
clientId: mqttState.options.clientId,
|
||
client: mqttState.client
|
||
}
|
||
}
|
||
|
||
|
||
// utils/mqttOnline.js
|
||
let timer = null;
|
||
|
||
export function startMqttOnlinePing(intervalMs = 20000) {
|
||
if (!mqttState.client || !mqttState.options.clientId) return;
|
||
if (timer) return;
|
||
|
||
const topic = `frontend/${mqttState.options.clientId}/online`;
|
||
|
||
const ping = () => {
|
||
try {
|
||
if (!mqttState.client.connected) return;
|
||
const payload = JSON.stringify({ts: Date.now()});
|
||
// qos=0 足够;retain 不要
|
||
mqttState.client.publish(topic, payload, {qos: 0, retain: false});
|
||
// console.info(`主题:${topic}:${payload}`)
|
||
} catch (e) {
|
||
}
|
||
};
|
||
|
||
ping();
|
||
// 每 20000ms(20秒)执行一次 ping
|
||
timer = setInterval(ping, intervalMs);
|
||
}
|
||
|
||
export function stopMqttOnlinePing() {
|
||
if (timer) {
|
||
clearInterval(timer);
|
||
timer = null;
|
||
}
|
||
}
|
||
|
||
|
||
/**
|
||
* 更新MQTT订阅主题(无返回值、不用async、纯回调实现)
|
||
*/
|
||
export function updateSubscribeTopic() {
|
||
const subscribeList = [];
|
||
// 初始化clientId,空值保护
|
||
let clientId = mqttState.options?.clientId || '';
|
||
if (!clientId) {
|
||
throw new Error('MQTT clientId 为空,无法生成订阅主题');
|
||
}
|
||
const agriList = [];
|
||
// 1. 获取设备列表(纯.then/.catch回调)
|
||
listAgri().then(response => {
|
||
if (response.code === 200) {
|
||
|
||
// 生成设备订阅主题(校验imei非空)
|
||
response.rows.forEach(item => {
|
||
if (item?.imei) {
|
||
agriList.push(item.imei);
|
||
subscribeList.push(`frontend/${clientId}/dtu/${item.imei}/+`);
|
||
subscribeList.push(`device/${item.imei}/+`);
|
||
}
|
||
});
|
||
uni.setStorageSync('agri_list', agriList);
|
||
// 管理员额外订阅指定设备
|
||
if (store.getters && store.getters.name === 'admin') {
|
||
subscribeList.push(`frontend/${clientId}/dtu/862538065276061/+`);
|
||
}
|
||
|
||
// 有订阅主题时执行批量订阅
|
||
if (subscribeList.length > 0) {
|
||
// 2. 批量订阅(链式.then保证执行顺序)
|
||
batchSubscribe({clientId})
|
||
.then(result => {
|
||
if (result.code === 200) {
|
||
console.info(`设备列表订阅成功:${subscribeList}`);
|
||
// 更新订阅列表+同步本地存储
|
||
updateSubscribeList(subscribeList);
|
||
uni.setStorageSync('mqtt_subscribe_list', subscribeList);
|
||
console.log('恢复MQTT订阅列表:', subscribeList);
|
||
} else {
|
||
console.error(`批量订阅失败,返回码:${result.code}`);
|
||
}
|
||
})
|
||
.catch(batchErr => {
|
||
console.error('批量订阅失败:', batchErr.message);
|
||
});
|
||
} else {
|
||
console.warn('无可用的订阅主题,跳过订阅操作');
|
||
}
|
||
} else {
|
||
console.error(`获取设备列表失败,返回码:${response.code}`);
|
||
}
|
||
}).catch(error => {
|
||
console.error('更新MQTT订阅主题失败:', error.message);
|
||
});
|
||
}
|
||
|
||
// 导出所有方法(全局调用)
|
||
export default {
|
||
initMqttConfig,
|
||
connectMqtt,
|
||
setOnMessageCallback,
|
||
removeOnMessageCallback,
|
||
updateSubscribeList,
|
||
updateSubscribeTopic,
|
||
publishMqtt,
|
||
disconnectMqtt,
|
||
getMqttState
|
||
}
|