agri-app/utils/mqtt.js

260 lines
8.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/**
* MQTT工具类 - 全局唯一连接、自动重连、支持自定义配置和订阅列表
* 适配uniapp小程序/APP/H5小程序仅支持ws/wss
*/
// 引入mqtt库需先安装npm install mqtt --save或下载mqtt.min.js到本地引入
import mqtt from 'mqtt'
// 全局变量
let client = null; // MQTT客户端实例
let subscribeList = []; // 订阅主题列表(重连后自动恢复订阅)
let reconnectTimer = null; // 重连定时器
let maxReconnectTimes = 10; // 最大重连次数
let currentReconnectTimes = 0; // 当前重连次数
let isManualDisconnect = false; // 是否主动断开(用于区分主动/被动断开,避免主动断开后重连)
/**
* 初始化MQTT连接
* @param {Object} config - MQTT连接配置
* @param {Array} subs - 初始订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }]
* @returns {Promise} 连接成功/失败的Promise
*/
export function initMQTT(config, subs = []) {
return new Promise((resolve, reject) => {
// 1. 校验配置(必填项)
if (!config.host || !config.port) {
reject(new Error('MQTT配置错误host和port为必填项'));
return;
}
// 2. 保存订阅列表(用于重连恢复)
subscribeList = subs;
// 3. 避免重复连接
if (client && client.connected) {
resolve(client);
return;
}
// 4. 标记为非主动断开(允许重连)
isManualDisconnect = false;
// 5. 构建连接地址小程序仅支持ws/wss优先用wss更安全
const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws');
const connectUrl = `${protocol}://${config.host}:${config.port}/mqtt`;
// 6. 构建MQTT连接选项
const mqttOptions = {
clientId: config.clientId || `uni_mqtt_${Math.random().toString(16).substr(2, 8)}`,
username: config.username || '',
password: config.password || '',
keepalive: config.keepalive || 60, // 心跳间隔(秒)
clean: config.clean !== undefined ? config.clean : true, // 是否清除会话
reconnectPeriod: 0, // 关闭内置重连(自定义重连逻辑)
connectTimeout: config.connectTimeout || 10000, // 连接超时(毫秒)
...config.extraOptions // 额外扩展配置
};
try {
// 7. 创建客户端并连接
client = mqtt.connect(connectUrl, mqttOptions);
// 8. 监听连接成功
client.on('connect', () => {
console.log('MQTT连接成功', connectUrl);
currentReconnectTimes = 0; // 重置重连次数
// 9. 订阅初始主题
subscribeTopics(subscribeList);
resolve(client);
});
// 10. 监听连接错误
client.on('error', (err) => {
console.error('MQTT连接错误', err);
client.end();
reject(err);
});
// 11. 监听连接断开(被动断开则触发重连)
client.on('close', () => {
console.log('MQTT连接已断开');
client = null;
// 非主动断开 + 未达最大重连次数 → 触发重连
if (!isManualDisconnect && currentReconnectTimes < maxReconnectTimes) {
reconnectMQTT(config);
}
});
// 12. 监听消息全局消息转发页面层通过uni.$on监听
client.on('message', (topic, message) => {
const msg = {
topic,
payload: message.toString(), // 转字符串原始是Buffer
timestamp: Date.now()
};
// 全局广播消息,页面层按需监听
uni.$emit('mqtt_message', msg);
});
} catch (err) {
console.error('MQTT初始化失败', err);
reject(err);
}
});
}
/**
* 订阅主题(支持单个/多个)
* @param {Array} topics - 订阅列表,格式:[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }]
*/
export function subscribeTopics(topics = []) {
if (!client || !client.connected) {
console.warn('MQTT未连接无法订阅主题');
return;
}
// 过滤空主题
const validTopics = topics.filter(item => item && item.topic);
if (validTopics.length === 0) return;
// 转换为mqtt库要求的格式{ topic1: { qos: 0 }, topic2: { qos: 1 } }
const subscribeMap = {};
validTopics.forEach(item => {
subscribeMap[item.topic] = { qos: item.qos || 0 };
});
client.subscribe(subscribeMap, (err) => {
if (err) {
console.error('MQTT订阅失败', err);
} else {
console.log('MQTT订阅成功', validTopics.map(item => item.topic));
// 更新订阅列表(用于重连恢复)
subscribeList = [...new Set([...subscribeList, ...validTopics])]; // 去重
}
});
}
/**
* 取消订阅主题
* @param {Array} topics - 取消订阅的主题列表,格式:['topic1', 'topic2']
*/
export function unsubscribeTopics(topics = []) {
if (!client || !client.connected) {
console.warn('MQTT未连接无法取消订阅');
return;
}
client.unsubscribe(topics, (err) => {
if (err) {
console.error('MQTT取消订阅失败', err);
} else {
console.log('MQTT取消订阅成功', topics);
// 更新订阅列表
subscribeList = subscribeList.filter(item => !topics.includes(item.topic));
}
});
}
/**
* 发布消息
* @param {String} topic - 发布的主题
* @param {String/Buffer} payload - 发布的消息内容
* @param {Object} options - 发布选项,如{ qos: 0, retain: false }
* @returns {Promise} 发布成功/失败的Promise
*/
export function publishMQTT(topic, payload, options = { qos: 0, retain: false }) {
return new Promise((resolve, reject) => {
if (!client || !client.connected) {
reject(new Error('MQTT未连接无法发布消息'));
return;
}
if (!topic) {
reject(new Error('发布失败:主题不能为空'));
return;
}
client.publish(topic, payload, options, (err) => {
if (err) {
console.error(`MQTT发布${topic}失败:`, err);
reject(err);
} else {
console.log(`MQTT发布${topic}成功:`, payload);
resolve(true);
}
});
});
}
/**
* 断开MQTT连接主动断开不会触发重连
*/
export function disconnectMQTT() {
// 标记为主动断开
isManualDisconnect = true;
// 清除重连定时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
// 断开连接
if (client && client.connected) {
client.end(false, () => { // false不发送遗嘱消息
console.log('MQTT主动断开连接');
client = null;
subscribeList = [];
});
}
}
/**
* 重连MQTT内部调用也可外部手动触发
* @param {Object} config - 连接配置同initMQTT的config
*/
export function reconnectMQTT(config) {
// 清除已有定时器,避免重复触发
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
// 重连间隔1秒/次(可自定义)
reconnectTimer = setTimeout(async () => {
currentReconnectTimes++;
console.log(`MQTT重连中${currentReconnectTimes}/${maxReconnectTimes}`);
try {
await initMQTT(config, subscribeList);
} catch (err) {
// 重连失败,继续尝试(直到达到最大次数)
if (currentReconnectTimes < maxReconnectTimes) {
reconnectMQTT(config);
} else {
console.error('MQTT重连次数已达上限停止重连');
uni.$emit('mqtt_reconnect_fail'); // 全局通知重连失败
}
}
}, 1000);
}
/**
* 获取MQTT客户端状态
* @returns {Object} 状态信息
*/
export function getMQTTStatus() {
return {
isConnected: !!client && client.connected, // 是否连接
client: client, // 客户端实例
subscribeList: [...subscribeList], // 已订阅列表(浅拷贝,避免外部修改)
currentReconnectTimes, // 当前重连次数
maxReconnectTimes // 最大重连次数
};
}
/**
* 手动触发重连(外部调用,比如页面主动刷新连接)
* @param {Object} config - 连接配置
*/
export function manualReconnect(config) {
currentReconnectTimes = 0; // 重置重连次数
reconnectMQTT(config);
}