agri-app/utils/mqtt.js

170 lines
5.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import mqtt from 'mqtt';
// 全局变量:封装更完整的状态
let client = {
instance: null, // MQTT客户端实例
connected: false, // 最终连接状态
isConnecting: false, // 连接中状态(防止重复连接)
isManualDisconnect: false // 是否手动断开(区分主动/被动断开)
};
/**
* 初始化MQTT连接无Promise纯回调
* @param {Object} config 连接配置 {host, port, username, password, clientId, isSSL, keepalive, clean}
* @param {Array} subs 订阅主题列表 [{topic: 'xxx', qos: 0}]
* @param {Function} onSuccess 成功回调 (client) => {}
* @param {Function} onError 失败回调 (err) => {}
*/
export function initMQTT(config, subs = [], onSuccess, onError) {
// 1. 已有有效连接:直接返回
if (client.instance && client.connected) {
onSuccess && onSuccess(client.instance);
return client;
}
// 2. 正在连接中:防止重复请求
if (client.isConnecting) {
console.info('MQTT已有连接请求在途等待结果...');
// 轮询检查连接状态,直到连接完成/失败
const checkTimer = setInterval(() => {
if (!client.isConnecting) {
clearInterval(checkTimer);
if (client.connected) {
onSuccess && onSuccess(client.instance);
} else {
onError && onError(new Error('MQTT连接请求失败'));
}
}
}, 100);
return;
}
// 3. 参数校验
if (!config.host || !config.port) {
const err = new Error('MQTT配置错误host/port不能为空');
onError && onError(err);
return;
}
// 4. 标记连接中
client.isConnecting = true;
client.isManualDisconnect = false;
// 5. 构建连接参数
const protocol = config.protocol || (config.isSSL ? 'wss' : 'ws');
const url = `${protocol}://${config.host}:${config.port}/mqtt`;
const options = {
clientId: config.clientId || ('uniapp_mqtt_' + Math.random().toString(16).substr(2, 8)),
username: config.username || '',
password: config.password || '',
clean: config.clean !== undefined ? config.clean : true,
connectTimeout: config.connectTimeout || 4000,
reconnectPeriod: config.reconnectPeriod || 1000, // 自动重连间隔(可配置)
keepalive: config.keepalive || 60
};
// 6. 创建客户端并绑定事件
try {
client.instance = mqtt.connect(url, options);
// 连接成功
client.instance.on('connect', () => {
console.info('已连接到MQTT服务器');
client.connected = true;
client.isConnecting = false;
// 订阅主题(带错误处理)
client.instance.subscribe(subs, { qos: 0 }, (err) => {
if (err) {
console.error('MQTT订阅失败', err);
onError && onError(new Error(`订阅失败:${err.message}`));
} else {
console.info(`MQTT订阅成功${subs.map(item => item.topic).join(',')}`);
onSuccess && onSuccess(client.instance);
}
});
});
// 连接错误(核心:处理所有连接失败场景)
client.instance.on('error', (err) => {
console.error('MQTT连接错误', err);
client.connected = false;
client.isConnecting = false;
onError && onError(err);
});
// 连接关闭(被动断开:服务器/网络原因)
client.instance.on('close', () => {
console.info('MQTT连接已关闭');
client.connected = false;
client.isConnecting = false;
// 非手动断开则保留实例(等待自动重连)
if (!client.isManualDisconnect) {
console.info('非手动断开,等待自动重连...');
} else {
client.instance = null; // 手动断开则清空实例
}
});
// 重连中(可选:感知重连状态)
client.instance.on('reconnect', () => {
console.info('MQTT正在重连...');
client.isConnecting = true;
client.connected = false;
});
} catch (err) {
console.error('MQTT客户端创建失败', err);
client.isConnecting = false;
client.connected = false;
client.instance = null;
onError && onError(err);
}
return client;
}
/**
* 断开MQTT连接无Promise纯回调
* @param {Function} onComplete 完成回调 () => {}
*/
export function disconnectMQTT(onComplete) {
// 标记为手动断开(避免重连)
client.isManualDisconnect = true;
if (client.instance && client.connected) {
client.instance.end(false, () => { // false不发送will消息
console.info('已断开MQTT连接');
client.connected = false;
client.isConnecting = false;
client.instance = null; // 清空实例
onComplete && onComplete();
});
} else {
// 无有效连接,直接回调
client.connected = false;
client.isConnecting = false;
client.instance = null;
onComplete && onComplete();
}
}
/**
* 获取MQTT客户端实例
* @returns {Client} MQTT客户端
*/
export function getMQTTClientInstance() {
return client.instance;
}
/**
* 获取MQTT连接状态更精准
* @returns {Object} 完整状态
*/
export function getMQTTStatus() {
return {
isConnected: client.connected, // 最终连接状态
isConnecting: client.isConnecting, // 连接中/重连中
isManualDisconnect: client.isManualDisconnect // 是否手动断开
};
}