master
lld 2025-12-30 22:39:25 +08:00
parent d4c8f4538a
commit 95572f8ffc
1 changed files with 49 additions and 228 deletions

View File

@ -1,257 +1,78 @@
/**
* MQTT工具类 - 全局唯一连接自动重连支持自定义配置和订阅列表
* 适配uniapp小程序/APP/H5小程序仅支持ws/wss
*/
// 引入mqtt库需先安装npm install mqtt --save或下载mqtt.min.js到本地引入
import mqtt from 'mqtt'
import mqtt from 'mqtt';
// 全局变量
let client = null; // MQTT客户端实例
let subscribeList = []; // 订阅主题列表(重连后自动恢复订阅)
let reconnectTimer = null; // 重连定时器
let maxReconnectTimes = 10; // 最大重连次数
let currentReconnectTimes = 0; // 当前重连次数
let isManualDisconnect = false; // 是否主动断开(用于区分主动/被动断开,避免主动断开后重连)
let client= {
instance : null,
connected: false
} ;
/**
* 初始化MQTT连接
* @param {Object} config - MQTT连接配置
* @param {Array} subs - 初始订阅列表格式[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }]
* // 初始化连接(移除事件监听,只返回客户端实例)
* @returns {Promise} 连接成功/失败的Promise
* 初始化MQTT连接无Promise纯回调
* @param {Object} config 连接配置 {host, port, username, password, clientId, isSSL, keepalive, clean}
* @param {Array} subs 订阅主题列表 [{topic: 'xxx', qos: 0}]
*/
export function initMQTT(config, subs = []) {
return new Promise((resolve, reject) => {
// 1. 校验配置(必填项)
if (!config.host || !config.port) {
reject(new Error('MQTT配置错误host和port为必填项'));
return;
}
// 2. 保存订阅列表(用于重连恢复)
subscribeList = subs;
// 3. 避免重复连接
if (client && client.connected) {
resolve(client);
return;
}
// 4. 标记为非主动断开(允许重连)
isManualDisconnect = false;
// 5. 构建连接地址小程序仅支持ws/wss优先用wss更安全
const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws');
const connectUrl = `${protocol}://${config.host}:${config.port}/mqtt`;
// 6. 构建MQTT连接选项
const mqttOptions = {
clientId: config.clientId || `uni_mqtt_${Math.random().toString(16).substr(2, 8)}`,
username: config.username || '',
password: config.password || '',
keepalive: config.keepalive || 60, // 心跳间隔(秒)
clean: config.clean !== undefined ? config.clean : true, // 是否清除会话
reconnectPeriod: 0, // 关闭内置重连(自定义重连逻辑)
connectTimeout: config.connectTimeout || 10000, // 连接超时(毫秒)
...config.extraOptions // 额外扩展配置
};
try {
// 7. 创建客户端并连接
client = mqtt.connect(connectUrl, mqttOptions);
// 仅保留连接成功的基础逻辑(订阅列表),移除其他事件监听
client.on('connect', () => {
console.log('MQTT连接成功', connectUrl);
currentReconnectTimes = 0;
// 9. 订阅初始主题
subscribeTopics(subscribeList);
resolve(client); // 成功后返回客户端实例,让页面自定义监听
});
// 移除工具层的error/close/message监听 → 交给页面处理
// 保留重连的核心逻辑通过单独的close监听仅用于重连不处理业务
client.on('close', () => {
console.log('MQTT连接已断开工具层');
const oldClient = client; // 暂存旧客户端,避免重连时被覆盖
client = null;
if (!isManualDisconnect && currentReconnectTimes < maxReconnectTimes) {
reconnectMQTT(config).then(newClient => {
// 重连成功后,全局通知页面:客户端已更换,需重新绑定事件
uni.$emit('mqtt_reconnected', newClient);
}).catch(err => {
uni.$emit('mqtt_reconnect_fail', err);
});
}
// 通知页面:旧客户端已断开
uni.$emit('mqtt_closed', oldClient);
});
} catch (err) {
console.error('MQTT初始化失败', err);
reject(err);
}
});
}
// 其他方法subscribeTopics/unsubscribeTopics/publishMQTT/disconnectMQTT/reconnectMQTT/getMQTTStatus/manualReconnect
// 【完全不变】,直接复用之前的代码
/**
* 订阅主题支持单个/多个
* @param {Array} topics - 订阅列表格式[{ topic: 'topic1', qos: 0 }, { topic: 'topic2', qos: 1 }]
*/
export function subscribeTopics(topics = []) {
if (!client || !client.connected) {
console.warn('MQTT未连接无法订阅主题');
return;
// 1. 已有连接:直接回调成功
if (client.instance && client.instance.connected) {
return client;
}
// 过滤空主题
const validTopics = topics.filter(item => item && item.topic);
if (validTopics.length === 0) return;
// 转换为mqtt库要求的格式{ topic1: { qos: 0 }, topic2: { qos: 1 } }
const subscribeMap = {};
validTopics.forEach(item => {
subscribeMap[item.topic] = { qos: item.qos || 0 };
});
client.subscribe(subscribeMap, (err) => {
if (err) {
console.error('MQTT订阅失败', err);
} else {
console.log('MQTT订阅成功', validTopics.map(item => item.topic));
// 更新订阅列表(用于重连恢复)
subscribeList = [...new Set([...subscribeList, ...validTopics])]; // 去重
}
});
}
/**
* 取消订阅主题
* @param {Array} topics - 取消订阅的主题列表格式['topic1', 'topic2']
*/
export function unsubscribeTopics(topics = []) {
if (!client || !client.connected) {
console.warn('MQTT未连接无法取消订阅');
return;
const options = {
clientId: 'uniapp_mqtt_' + Math.random().toString(16).substr(2, 8),
username: config.username,
password: config.password,
clean: true,
connectTimeout: 4000,
reconnectPeriod: 1000
}
client.unsubscribe(topics, (err) => {
if (err) {
console.error('MQTT取消订阅失败', err);
} else {
console.log('MQTT取消订阅成功', topics);
// 更新订阅列表
subscribeList = subscribeList.filter(item => !topics.includes(item.topic));
}
});
// 5. 构建连接参数
const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws');
const url = `${protocol}://${config.host}:${config.port}/mqtt`
client.instance = mqtt.connect(url, options)
client.instance.on('connect', () => {
client.connected = true
client.instance.subscribe(subs, {qos: 0})
console.info('已连接到MQTT服务器')
console.info(client.instance)
})
client.instance.on('close', () => {
console.info('连接已关闭')
client.connected = false
console.info(client.instance)
})
}
/**
* 发布消息
* @param {String} topic - 发布的主题
* @param {String/Buffer} payload - 发布的消息内容
* @param {Object} options - 发布选项{ qos: 0, retain: false }
* @returns {Promise} 发布成功/失败的Promise
*/
export function publishMQTT(topic, payload, options = { qos: 0, retain: false }) {
return new Promise((resolve, reject) => {
if (!client || !client.connected) {
reject(new Error('MQTT未连接无法发布消息'));
return;
}
if (!topic) {
reject(new Error('发布失败:主题不能为空'));
return;
}
client.publish(topic, payload, options, (err) => {
if (err) {
console.error(`MQTT发布${topic}失败:`, err);
reject(err);
} else {
console.log(`MQTT发布${topic}成功:`, payload);
resolve(true);
}
});
});
}
/**
* 断开MQTT连接主动断开不会触发重连
* 断开MQTT连接无Promise纯回调
* @param {Function} onComplete 完成回调 () => {}
*/
export function disconnectMQTT() {
// 标记为主动断开
isManualDisconnect = true;
// 清除重连定时器
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
// 断开连接
if (client && client.connected) {
client.end(false, () => { // false不发送遗嘱消息
console.log('MQTT主动断开连接');
client = null;
subscribeList = [];
});
if (client.instance && client.connected) {
client.instance.end()
client.connected = false
console.info('已断开MQTT连接')
}
}
/**
* 重连MQTT内部调用也可外部手动触发
* @param {Object} config - 连接配置同initMQTT的config
* 获取MQTT客户端实例
* @returns {Client} MQTT客户端
*/
export function reconnectMQTT(config) {
// 清除已有定时器,避免重复触发
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
// 重连间隔1秒/次(可自定义)
reconnectTimer = setTimeout(async () => {
currentReconnectTimes++;
console.log(`MQTT重连中${currentReconnectTimes}/${maxReconnectTimes}`);
try {
await initMQTT(config, subscribeList);
} catch (err) {
// 重连失败,继续尝试(直到达到最大次数)
if (currentReconnectTimes < maxReconnectTimes) {
reconnectMQTT(config);
} else {
console.error('MQTT重连次数已达上限停止重连');
uni.$emit('mqtt_reconnect_fail'); // 全局通知重连失败
}
}
}, 1000);
export function getMQTTClientInstance() {
return client.instance;
}
/**
* 获取MQTT客户端状态
* @returns {Object} 状态信息
* 获取MQTT连接状态
* @returns {Object} {isConnected: Boolean, isConnecting: Boolean}
*/
export function getMQTTStatus() {
return {
isConnected: !!client && client.connected, // 是否连接
client: client, // 客户端实例
subscribeList: [...subscribeList], // 已订阅列表(浅拷贝,避免外部修改)
currentReconnectTimes, // 当前重连次数
maxReconnectTimes // 最大重连次数
isConnected: !!client && client.connected,
};
}
/**
* 手动触发重连外部调用比如页面主动刷新连接
* @param {Object} config - 连接配置
*/
export function manualReconnect(config) {
currentReconnectTimes = 0; // 重置重连次数
reconnectMQTT(config);
}
export function getMQTTClientInstance() {
return client;
}