agri-app/utils/mqtt.js

214 lines
7.9 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import mqtt from 'mqtt'
// 全局MQTT实例单例避免多页面重复创建
let mqttInstance = null;
/**
* 初始化MQTT客户端单例模式
* @param {Object} mqttConfig 配置项
* @param {Array|String} subTopic 初始订阅主题
* @returns {Object} MQTT实例
*/
function createMqttClient(mqttConfig, subTopic) {
// 已存在实例则直接返回
if (mqttInstance && mqttInstance.connected) {
// 若传入新主题,补充订阅
if (subTopic) {
mqttInstance.subscribe(subTopic);
}
return mqttInstance;
}
// 标准化配置(默认值 + 防错)
const config = {
host: mqttConfig.host || '1.94.254.176',
port: mqttConfig.port || 9001,
clientId: mqttConfig.clientId || `uniapp_mqtt_${Date.now()}${Math.random().toString(16).substr(2, 4)}`, // 时间戳+随机数,减少重复
username: mqttConfig.username || 'admin',
password: mqttConfig.password || 'Admin#12345678',
clean: false, // 关键改为false保持会话缓存消息
reconnectPeriod: mqttConfig.reconnectPeriod || 3000,
connectTimeout: mqttConfig.connectTimeout || 5000,
qos: mqttConfig.qos || 1 // 默认QoS1确保消息不丢
};
// 连接选项
const options = {
clientId: config.clientId,
username: config.username,
password: config.password,
clean: config.clean,
connectTimeout: config.connectTimeout,
reconnectPeriod: config.reconnectPeriod,
keepalive: 60 // 新增心跳,避免连接被断开
};
// 拼接WS地址兼容配置错误
const url = `ws://${config.host}:${config.port}/mqtt`;
// 创建客户端
const client = mqtt.connect(url, options);
// 实例状态管理
const instance = {
client: client,
connected: false,
subscribedTopics: new Set(), // 记录已订阅主题
config: config,
messageCallback: null, // 消息接收回调
statusCallback: null, // 状态变更回调
// 订阅主题(支持单个/多个,带重试)
subscribe: function (topics, qos = config.qos) {
if (!this.connected) {
console.warn('MQTT未连接延迟订阅:', topics);
// 连接成功后自动订阅
client.once('connect', () => this.subscribe(topics, qos));
return;
}
const topicList = Array.isArray(topics) ? topics : [topics];
client.subscribe(topicList, { qos }, (err) => {
if (err) {
console.error('订阅失败:', err, '主题:', topicList);
// 订阅失败重试(仅一次)
setTimeout(() => this.subscribe(topics, qos), 1000);
} else {
topicList.forEach(t => this.subscribedTopics.add(t));
console.log(`订阅成功${topicList.length > 1 ? '(批量)' : '(单个)'}:`, topicList);
this.statusCallback && this.statusCallback('subscribe_success', topicList);
}
});
},
// 取消订阅
unsubscribe: function (topics) {
if (!this.connected) return;
const topicList = Array.isArray(topics) ? topics : [topics];
client.unsubscribe(topicList, (err) => {
if (err) {
console.error('取消订阅失败:', err);
} else {
topicList.forEach(t => this.subscribedTopics.delete(t));
console.log('取消订阅成功:', topicList);
this.statusCallback && this.statusCallback('unsubscribe_success', topicList);
}
});
},
// 发布消息(带参数,失败重试)
publish: function (topic, message, qos = config.qos, retain = false) {
return new Promise((resolve, reject) => {
if (!this.connected) {
reject(new Error('MQTT未连接无法发布消息'));
// 自动重连后发布
this.reconnectAndPublish(topic, message, qos, retain);
return;
}
// 标准化消息格式对象转JSON
const payload = typeof message === 'object' ? JSON.stringify(message) : String(message);
client.publish(topic, payload, { qos, retain }, (err) => {
if (err) {
console.error('发布失败:', err, '主题:', topic);
reject(err);
// 发布失败重试
setTimeout(() => this.publish(topic, message, qos, retain), 1000);
} else {
console.log('发布成功:', topic, '内容:', payload);
resolve({ topic, payload });
}
});
});
},
// 重连后补发消息
reconnectAndPublish: function (topic, message, qos, retain) {
client.once('connect', () => {
this.publish(topic, message, qos, retain);
});
},
// 断开连接
disconnect: function () {
if (this.connected && this.client) {
this.client.end(false, () => { // false等待剩余消息发送完成
this.connected = false;
this.subscribedTopics.clear();
console.log('MQTT连接已断开保留会话');
this.statusCallback && this.statusCallback('disconnect');
});
}
}
};
// 绑定客户端事件
client.on('connect', () => {
instance.connected = true;
console.log('MQTT连接成功ClientId:', config.clientId);
instance.statusCallback && instance.statusCallback('connect_success');
// 初始订阅主题
if (subTopic) {
instance.subscribe(subTopic);
}
})
.on('reconnect', (error) => {
instance.connected = false;
console.log('MQTT正在重连...', error);
instance.statusCallback && instance.statusCallback('reconnect', error);
})
.on('error', (error) => {
instance.connected = false;
console.error('MQTT连接错误:', error);
instance.statusCallback && instance.statusCallback('error', error);
})
.on('close', () => {
instance.connected = false;
console.log('MQTT连接关闭');
instance.statusCallback && instance.statusCallback('close');
})
.on('offline', () => {
instance.connected = false;
console.log('MQTT客户端下线');
instance.statusCallback && instance.statusCallback('offline');
})
.on('message', (topic, payload) => {
const msg = payload.toString();
console.log('收到MQTT消息:', topic, msg);
// 消息回调,交给业务层处理
instance.messageCallback && instance.messageCallback(topic, msg);
});
// 赋值单例
mqttInstance = instance;
return instance;
}
/**
* 对外暴露的核心方法
*/
export const mqttTool = {
// 初始化连接
connect: function (mqttConfig, subTopic) {
return createMqttClient(mqttConfig, subTopic);
},
// 获取全局实例
getInstance: function () {
return mqttInstance;
},
// 页面切换时的订阅管理(核心解决多页面订阅问题)
switchPageTopic: function (newTopics, oldTopics) {
const instance = mqttInstance;
if (!instance) return;
// 先订阅新主题,再取消旧主题(避免漏消息)
if (newTopics) {
instance.subscribe(newTopics);
}
if (oldTopics) {
instance.unsubscribe(oldTopics);
}
}
};