agri-app/utils/mqtt.js

257 lines
8.9 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/**
* MQTT工具类 - 全局唯一连接、自动重连、支持自定义配置和订阅列表
* 适配uniapp小程序/APP/H5小程序仅支持ws/wss
*/
// 引入mqtt库需先安装npm install mqtt --save或下载mqtt.min.js到本地引入
import mqtt from 'mqtt'
// 全局变量
let client = null; // MQTT客户端实例
let subscribeList = []; // 订阅主题列表(重连后自动恢复订阅)
let reconnectTimer = null; // 重连定时器
let maxReconnectTimes = 10; // 最大重连次数
let currentReconnectTimes = 0; // 当前重连次数
let isManualDisconnect = false; // 是否主动断开(用于区分主动/被动断开,避免主动断开后重连)
/**
* 初始化MQTT连接
* @param {Object} config - MQTT连接配置
* @param {Array} subs - 初始订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }]
* // 初始化连接(移除事件监听,只返回客户端实例)
* @returns {Promise} 连接成功/失败的Promise
*/
export function initMQTT(config, subs = []) {
return new Promise((resolve, reject) => {
// 1. 校验配置(必填项)
if (!config.host || !config.port) {
reject(new Error('MQTT配置错误host和port为必填项'));
return;
}
// 2. 保存订阅列表(用于重连恢复)
subscribeList = subs;
// 3. 避免重复连接
if (client && client.connected) {
resolve(client);
return;
}
// 4. 标记为非主动断开(允许重连)
isManualDisconnect = false;
// 5. 构建连接地址小程序仅支持ws/wss优先用wss更安全
const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws');
const connectUrl = `${protocol}://${config.host}:${config.port}/mqtt`;
// 6. 构建MQTT连接选项
const mqttOptions = {
clientId: config.clientId || `uni_mqtt_${Math.random().toString(16).substr(2, 8)}`,
username: config.username || '',
password: config.password || '',
keepalive: config.keepalive || 60, // 心跳间隔(秒)
clean: config.clean !== undefined ? config.clean : true, // 是否清除会话
reconnectPeriod: 0, // 关闭内置重连(自定义重连逻辑)
connectTimeout: config.connectTimeout || 10000, // 连接超时(毫秒)
...config.extraOptions // 额外扩展配置
};
try {
// 7. 创建客户端并连接
client = mqtt.connect(connectUrl, mqttOptions);
// 仅保留连接成功的基础逻辑(订阅列表),移除其他事件监听
client.on('connect', () => {
console.log('MQTT连接成功', connectUrl);
currentReconnectTimes = 0;
// 9. 订阅初始主题
subscribeTopics(subscribeList);
resolve(client); // 成功后返回客户端实例,让页面自定义监听
});
// 移除工具层的error/close/message监听 → 交给页面处理
// 保留重连的核心逻辑通过单独的close监听仅用于重连不处理业务
client.on('close', () => {
console.log('MQTT连接已断开工具层');
const oldClient = client; // 暂存旧客户端,避免重连时被覆盖
client = null;
if (!isManualDisconnect && currentReconnectTimes < maxReconnectTimes) {
reconnectMQTT(config).then(newClient => {
// 重连成功后,全局通知页面:客户端已更换,需重新绑定事件
uni.$emit('mqtt_reconnected', newClient);
}).catch(err => {
uni.$emit('mqtt_reconnect_fail', err);
});
}
// 通知页面:旧客户端已断开
uni.$emit('mqtt_closed', oldClient);
});
} catch (err) {
console.error('MQTT初始化失败', err);
reject(err);
}
});
}
// 其他方法subscribeTopics/unsubscribeTopics/publishMQTT/disconnectMQTT/reconnectMQTT/getMQTTStatus/manualReconnect
// 【完全不变】,直接复用之前的代码
/**
* 订阅主题(支持单个/多个)
* @param {Array} topics - 订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }]
*/
export function subscribeTopics(topics = []) {
if (!client || !client.connected) {
console.warn('MQTT未连接无法订阅主题');
return;
}
// 过滤空主题
const validTopics = topics.filter(item => item && item.topic);
if (validTopics.length === 0) return;
// 转换为mqtt库要求的格式{ topic1: { qos: 0 }, topic2: { qos: 1 } }
const subscribeMap = {};
validTopics.forEach(item => {
subscribeMap[item.topic] = { qos: item.qos || 0 };
});
client.subscribe(subscribeMap, (err) => {
if (err) {
console.error('MQTT订阅失败', err);
} else {
console.log('MQTT订阅成功', validTopics.map(item => item.topic));
// 更新订阅列表(用于重连恢复)
subscribeList = [...new Set([...subscribeList, ...validTopics])]; // 去重
}
});
}
/**
* 取消订阅主题
* @param {Array} topics - 取消订阅的主题列表,格式:['topic1', 'topic2']
*/
export function unsubscribeTopics(topics = []) {
if (!client || !client.connected) {
console.warn('MQTT未连接无法取消订阅');
return;
}
client.unsubscribe(topics, (err) => {
if (err) {
console.error('MQTT取消订阅失败', err);
} else {
console.log('MQTT取消订阅成功', topics);
// 更新订阅列表
subscribeList = subscribeList.filter(item => !topics.includes(item.topic));
}
});
}
/**
* 发布消息
* @param {String} topic - 发布的主题
* @param {String/Buffer} payload - 发布的消息内容
* @param {Object} options - 发布选项,如{ qos: 0, retain: false }
* @returns {Promise} 发布成功/失败的Promise
*/
export function publishMQTT(topic, payload, options = { qos: 0, retain: false }) {
return new Promise((resolve, reject) => {
if (!client || !client.connected) {
reject(new Error('MQTT未连接无法发布消息'));
return;
}
if (!topic) {
reject(new Error('发布失败:主题不能为空'));
return;
}
client.publish(topic, payload, options, (err) => {
if (err) {
console.error(`MQTT发布${topic}失败:`, err);
reject(err);
} else {
console.log(`MQTT发布${topic}成功:`, payload);
resolve(true);
}
});
});
}
/**
* 断开MQTT连接主动断开不会触发重连
*/
export function disconnectMQTT() {
// 标记为主动断开
isManualDisconnect = true;
// 清除重连定时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
// 断开连接
if (client && client.connected) {
client.end(false, () => { // false不发送遗嘱消息
console.log('MQTT主动断开连接');
client = null;
subscribeList = [];
});
}
}
/**
* 重连MQTT内部调用也可外部手动触发
* @param {Object} config - 连接配置同initMQTT的config
*/
export function reconnectMQTT(config) {
// 清除已有定时器,避免重复触发
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
// 重连间隔1秒/次(可自定义)
reconnectTimer = setTimeout(async () => {
currentReconnectTimes++;
console.log(`MQTT重连中${currentReconnectTimes}/${maxReconnectTimes}`);
try {
await initMQTT(config, subscribeList);
} catch (err) {
// 重连失败,继续尝试(直到达到最大次数)
if (currentReconnectTimes < maxReconnectTimes) {
reconnectMQTT(config);
} else {
console.error('MQTT重连次数已达上限停止重连');
uni.$emit('mqtt_reconnect_fail'); // 全局通知重连失败
}
}
}, 1000);
}
/**
* 获取MQTT客户端状态
* @returns {Object} 状态信息
*/
export function getMQTTStatus() {
return {
isConnected: !!client && client.connected, // 是否连接
client: client, // 客户端实例
subscribeList: [...subscribeList], // 已订阅列表(浅拷贝,避免外部修改)
currentReconnectTimes, // 当前重连次数
maxReconnectTimes // 最大重连次数
};
}
/**
* 手动触发重连(外部调用,比如页面主动刷新连接)
* @param {Object} config - 连接配置
*/
export function manualReconnect(config) {
currentReconnectTimes = 0; // 重置重连次数
reconnectMQTT(config);
}
export function getMQTTClientInstance() {
return client;
}