170 lines
5.8 KiB
JavaScript
170 lines
5.8 KiB
JavaScript
import mqtt from 'mqtt';
|
||
|
||
// 全局变量:封装更完整的状态
|
||
let client = {
|
||
instance: null, // MQTT客户端实例
|
||
connected: false, // 最终连接状态
|
||
isConnecting: false, // 连接中状态(防止重复连接)
|
||
isManualDisconnect: false // 是否手动断开(区分主动/被动断开)
|
||
};
|
||
|
||
/**
|
||
* 初始化MQTT连接(无Promise,纯回调)
|
||
* @param {Object} config 连接配置 {host, port, username, password, clientId, isSSL, keepalive, clean}
|
||
* @param {Array} subs 订阅主题列表 [{topic: 'xxx', qos: 0}]
|
||
* @param {Function} onSuccess 成功回调 (client) => {}
|
||
* @param {Function} onError 失败回调 (err) => {}
|
||
*/
|
||
export function initMQTT(config, subs = [], onSuccess, onError) {
|
||
// 1. 已有有效连接:直接返回
|
||
if (client.instance && client.connected) {
|
||
onSuccess && onSuccess(client.instance);
|
||
return client;
|
||
}
|
||
|
||
// 2. 正在连接中:防止重复请求
|
||
if (client.isConnecting) {
|
||
console.info('MQTT已有连接请求在途,等待结果...');
|
||
// 轮询检查连接状态,直到连接完成/失败
|
||
const checkTimer = setInterval(() => {
|
||
if (!client.isConnecting) {
|
||
clearInterval(checkTimer);
|
||
if (client.connected) {
|
||
onSuccess && onSuccess(client.instance);
|
||
} else {
|
||
onError && onError(new Error('MQTT连接请求失败'));
|
||
}
|
||
}
|
||
}, 100);
|
||
return;
|
||
}
|
||
|
||
// 3. 参数校验
|
||
if (!config.host || !config.port) {
|
||
const err = new Error('MQTT配置错误:host/port不能为空');
|
||
onError && onError(err);
|
||
return;
|
||
}
|
||
|
||
// 4. 标记连接中
|
||
client.isConnecting = true;
|
||
client.isManualDisconnect = false;
|
||
|
||
// 5. 构建连接参数
|
||
const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws');
|
||
const url = `${protocol}://${config.host}:${config.port}/mqtt`;
|
||
|
||
const options = {
|
||
clientId: config.clientId || ('uniapp_mqtt_' + Math.random().toString(16).substr(2, 8)),
|
||
username: config.username || '',
|
||
password: config.password || '',
|
||
clean: config.clean !== undefined ? config.clean : true,
|
||
connectTimeout: config.connectTimeout || 4000,
|
||
reconnectPeriod: config.reconnectPeriod || 1000, // 自动重连间隔(可配置)
|
||
keepalive: config.keepalive || 60
|
||
};
|
||
|
||
// 6. 创建客户端并绑定事件
|
||
try {
|
||
client.instance = mqtt.connect(url, options);
|
||
|
||
// 连接成功
|
||
client.instance.on('connect', () => {
|
||
console.info('已连接到MQTT服务器');
|
||
client.connected = true;
|
||
client.isConnecting = false;
|
||
// 订阅主题(带错误处理)
|
||
client.instance.subscribe(subs, { qos: 0 }, (err) => {
|
||
if (err) {
|
||
console.error('MQTT订阅失败:', err);
|
||
onError && onError(new Error(`订阅失败:${err.message}`));
|
||
} else {
|
||
console.info(`MQTT订阅成功:${subs.map(item => item.topic).join(',')}`);
|
||
onSuccess && onSuccess(client.instance);
|
||
}
|
||
});
|
||
});
|
||
|
||
// 连接错误(核心:处理所有连接失败场景)
|
||
client.instance.on('error', (err) => {
|
||
console.error('MQTT连接错误:', err);
|
||
client.connected = false;
|
||
client.isConnecting = false;
|
||
onError && onError(err);
|
||
});
|
||
|
||
// 连接关闭(被动断开:服务器/网络原因)
|
||
client.instance.on('close', () => {
|
||
console.info('MQTT连接已关闭');
|
||
client.connected = false;
|
||
client.isConnecting = false;
|
||
// 非手动断开则保留实例(等待自动重连)
|
||
if (!client.isManualDisconnect) {
|
||
console.info('非手动断开,等待自动重连...');
|
||
} else {
|
||
client.instance = null; // 手动断开则清空实例
|
||
}
|
||
});
|
||
|
||
// 重连中(可选:感知重连状态)
|
||
client.instance.on('reconnect', () => {
|
||
console.info('MQTT正在重连...');
|
||
client.isConnecting = true;
|
||
client.connected = false;
|
||
});
|
||
|
||
} catch (err) {
|
||
console.error('MQTT客户端创建失败:', err);
|
||
client.isConnecting = false;
|
||
client.connected = false;
|
||
client.instance = null;
|
||
onError && onError(err);
|
||
}
|
||
|
||
return client;
|
||
}
|
||
|
||
/**
|
||
* 断开MQTT连接(无Promise,纯回调)
|
||
* @param {Function} onComplete 完成回调 () => {}
|
||
*/
|
||
export function disconnectMQTT(onComplete) {
|
||
// 标记为手动断开(避免重连)
|
||
client.isManualDisconnect = true;
|
||
|
||
if (client.instance && client.connected) {
|
||
client.instance.end(false, () => { // false:不发送will消息
|
||
console.info('已断开MQTT连接');
|
||
client.connected = false;
|
||
client.isConnecting = false;
|
||
client.instance = null; // 清空实例
|
||
onComplete && onComplete();
|
||
});
|
||
} else {
|
||
// 无有效连接,直接回调
|
||
client.connected = false;
|
||
client.isConnecting = false;
|
||
client.instance = null;
|
||
onComplete && onComplete();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取MQTT客户端实例
|
||
* @returns {{instance: null, connected: boolean, isConnecting: boolean, isManualDisconnect: boolean}} MQTT客户端
|
||
*/
|
||
export function getMQTTClient() {
|
||
return client;
|
||
}
|
||
|
||
/**
|
||
* 获取MQTT连接状态(更精准)
|
||
* @returns {Object} 完整状态
|
||
*/
|
||
export function getMQTTStatus() {
|
||
return {
|
||
isConnected: client.connected, // 最终连接状态
|
||
isConnecting: client.isConnecting, // 连接中/重连中
|
||
isManualDisconnect: client.isManualDisconnect // 是否手动断开
|
||
};
|
||
} |