添加大棚后更新订阅主题

feasure
lld 2026-03-03 03:48:51 +08:00
parent 81d08dbd56
commit ecad105f36
3 changed files with 113 additions and 87 deletions

41
App.vue
View File

@ -3,10 +3,6 @@ import config from './config'
import { getToken } from '@/utils/auth' import { getToken } from '@/utils/auth'
import mqttUtil from '@/utils/mqtt' import mqttUtil from '@/utils/mqtt'
import {startMqttOnlinePing, stopMqttOnlinePing} from "./utils/mqtt"; import {startMqttOnlinePing, stopMqttOnlinePing} from "./utils/mqtt";
import {batchSubscribe} from "./api/system/mqtt";
import store from "store";
import {listAgri} from "./api/system/assets/agri";
export default { export default {
subscribeList:[], subscribeList:[],
globalData: { globalData: {
@ -123,27 +119,7 @@ export default {
console.error('MQTT连接失败') console.error('MQTT连接失败')
return return
} }
var clientId = mqttUtil.getMqttState().clientId; mqttUtil.updateSubscribeTopic();
this.getSubscribeImei(clientId).then(res=>{
console.info("subscribeList",res)
const subscribeList = res;
this.globalData.mqtt.subscribeList = subscribeList || []
batchSubscribe({clientId: clientId}).then((result) => {
if (result.code === 200) {
console.info(`设备列表订阅成功:${subscribeList}`)
}
})
// ========== localStorage ==========
uni.setStorageSync('mqtt_subscribe_list', subscribeList || [])
if (subscribeList.length > 0) {
mqttUtil.updateSubscribeList(subscribeList)
console.log('恢复MQTT订阅列表', subscribeList)
// ========== localStorage ==========
uni.setStorageSync('mqtt_subscribe_list', subscribeList)
}
})
}, },
loginSuccess(token) { loginSuccess(token) {
this.globalData.mqtt.hasLogin = true this.globalData.mqtt.hasLogin = true
@ -163,21 +139,6 @@ export default {
uni.removeStorageSync('mqtt_subscribe_list') uni.removeStorageSync('mqtt_subscribe_list')
console.log('登出成功MQTT已断开') console.log('登出成功MQTT已断开')
}, },
getSubscribeImei(clientId) {
return listAgri().then(response => {
const subscribeList = [];
if (response.code === 200) {
response.rows.forEach(item =>
subscribeList.push(`frontend/${clientId}/dtu/${item.imei}/+`)
);
if (store.getters && store.getters.name === 'admin') {
subscribeList.push(`frontend/${clientId}/dtu/862538065276061/+`)
}
}
return subscribeList;
})
},
// token // token
handleTokenExpired() { handleTokenExpired() {

View File

@ -35,6 +35,9 @@
<script> <script>
import UniPopup from "../../uni_modules/uni-popup/components/uni-popup/uni-popup.vue"; import UniPopup from "../../uni_modules/uni-popup/components/uni-popup/uni-popup.vue";
import {addAgriMobile} from "../../api/system/assets/agri"; import {addAgriMobile} from "../../api/system/assets/agri";
import {updateSubscribeTopic} from "../../utils/mqtt";
import {batchUnsubscribe} from "../../api/system/mqtt";
import * as mqttUtil from "../../utils/mqtt";
export default { export default {
name: "addAgri", name: "addAgri",
@ -98,6 +101,11 @@ export default {
// 4. // 4.
if (res.confirm) { if (res.confirm) {
this.$emit("reload"); // this.$emit("reload"); //
batchUnsubscribe({clientId: mqttUtil.getMqttState().clientId}).then(response => {
if (response.code === 200) {
updateSubscribeTopic();
}
})
this.close(); // this.close(); //
} }
} }

View File

@ -10,8 +10,10 @@
// 微信小程序端:引入适配小程序的 mqtt 版本 // 微信小程序端:引入适配小程序的 mqtt 版本
import mqtt from 'mqtt/dist/mqtt' import mqtt from 'mqtt/dist/mqtt'
import {batchUnsubscribe} from "../api/system/mqtt"; import {batchSubscribe, batchUnsubscribe} from "../api/system/mqtt";
import {getToken} from "./auth"; import {getToken} from "./auth";
import {listAgri} from "../api/system/assets/agri";
import store from "../store";
// ===================== MQTT配置暂时写死TODO后续从数据字典获取===================== // ===================== MQTT配置暂时写死TODO后续从数据字典获取=====================
const MQTT_CONFIG = { const MQTT_CONFIG = {
@ -44,7 +46,7 @@ const mqttState = {
*/ */
export function initMqttConfig() { export function initMqttConfig() {
if (mqttState.client) { if (mqttState.client) {
console.info("重连前强制断开",mqttState.options.clientId) console.info("重连前强制断开", mqttState.options.clientId)
// 加try-catch避免断开时客户端已异常导致报错 // 加try-catch避免断开时客户端已异常导致报错
try { try {
mqttState.client.end(true) mqttState.client.end(true)
@ -61,7 +63,7 @@ export function initMqttConfig() {
console.log('MQTT配置初始化成功clientId', mqttState.options.clientId) console.log('MQTT配置初始化成功clientId', mqttState.options.clientId)
return true return true
} catch (err) { } catch (err) {
uni.showToast({ title: '设备连接异常-设备初始化失败', icon: 'none', duration: 2000 }) uni.showToast({title: '设备连接异常-设备初始化失败', icon: 'none', duration: 2000})
console.error('MQTT配置初始化失败', err) console.error('MQTT配置初始化失败', err)
return false return false
} }
@ -74,12 +76,12 @@ export function initMqttConfig() {
export function connectMqtt() { export function connectMqtt() {
// 前置校验:是否已初始化配置 // 前置校验:是否已初始化配置
if (!mqttState.options.clientId) { if (!mqttState.options.clientId) {
uni.showToast({ title: '设备连接失败', icon: 'none' }) uni.showToast({title: '设备连接失败', icon: 'none'})
console.error('MQTT连接失败请先调用initMqttConfig初始化配置') console.error('MQTT连接失败请先调用initMqttConfig初始化配置')
return false return false
} }
// 避免重复连接 // 避免重复连接
if (mqttState.client!=null) { if (mqttState.client != null) {
console.log('MQTT已连接无需重复操作') console.log('MQTT已连接无需重复操作')
return true return true
} }
@ -89,12 +91,12 @@ export function connectMqtt() {
// #ifndef MP-WEIXIN // #ifndef MP-WEIXIN
MQTT_CONFIG.server = 'wxs://mq.xiaoces.com:443/mqtt' MQTT_CONFIG.server = 'wxs://mq.xiaoces.com:443/mqtt'
// #endif // #endif
console.info("mqttState.connect",mqttState) console.info("mqttState.connect", mqttState)
mqttState.client = mqtt.connect(MQTT_CONFIG.server, mqttState.options) mqttState.client = mqtt.connect(MQTT_CONFIG.server, mqttState.options)
console.info("重连中。。") console.info("重连中。。")
// 监听核心事件(异步,同步更新状态) // 监听核心事件(异步,同步更新状态)
mqttState.client.on('connect', () => { mqttState.client.on('connect', () => {
console.log('MQTT连接成功',mqttState.options.clientId) console.log('MQTT连接成功', mqttState.options.clientId)
mqttState.isConnected = true mqttState.isConnected = true
// 连接成功后自动订阅全局列表 // 连接成功后自动订阅全局列表
subscribeAllTopics() subscribeAllTopics()
@ -102,7 +104,7 @@ export function connectMqtt() {
mqttState.client.on('close', () => { mqttState.client.on('close', () => {
if (getToken()) { if (getToken()) {
batchUnsubscribe({clientId:mqttState.options.clientId}).then(response => { batchUnsubscribe({clientId: mqttState.options.clientId}).then(response => {
if (response.code === 200) { if (response.code === 200) {
console.info("取消订阅成功!") console.info("取消订阅成功!")
} }
@ -113,7 +115,7 @@ export function connectMqtt() {
}) })
mqttState.client.on('error', (err) => { mqttState.client.on('error', (err) => {
uni.showToast({ title: '设备连接异常', icon: 'none' }) uni.showToast({title: '设备连接异常', icon: 'none'})
console.error('MQTT连接错误', err) console.error('MQTT连接错误', err)
mqttState.isConnected = false mqttState.isConnected = false
}) })
@ -129,7 +131,7 @@ export function connectMqtt() {
return true return true
} catch (err) { } catch (err) {
uni.showToast({ title: '设备连接异常', icon: 'none' }) uni.showToast({title: '设备连接异常', icon: 'none'})
console.error('MQTT连接创建失败', err) console.error('MQTT连接创建失败', err)
mqttState.isConnected = false mqttState.isConnected = false
return false return false
@ -164,7 +166,7 @@ export function removeOnMessageCallback() {
*/ */
export function updateSubscribeList(list) { export function updateSubscribeList(list) {
if (!Array.isArray(list)) { if (!Array.isArray(list)) {
uni.showToast({ title: '设备订阅更新异常', icon: 'none' }) uni.showToast({title: '设备订阅更新异常', icon: 'none'})
console.error('订阅列表必须是数组') console.error('订阅列表必须是数组')
return false return false
} }
@ -183,15 +185,15 @@ export function updateSubscribeList(list) {
* 内部方法订阅所有全局主题 * 内部方法订阅所有全局主题
*/ */
function subscribeAllTopics() { function subscribeAllTopics() {
const { isConnected, client, subscribeList } = mqttState const {isConnected, client, subscribeList} = mqttState
if (!isConnected || !client || subscribeList.length === 0) { if (!isConnected || !client || subscribeList.length === 0) {
console.warn('MQTT订阅跳过未连接或列表为空') console.warn('MQTT订阅跳过未连接或列表为空')
return return
} }
client.subscribe(subscribeList, { qos: 0 }, (err) => { client.subscribe(subscribeList, {qos: 0}, (err) => {
if (err) { if (err) {
uni.showToast({ title: '设备订阅异常', icon: 'none' }) uni.showToast({title: '设备订阅异常', icon: 'none'})
console.error('MQTT订阅失败', err) console.error('MQTT订阅失败', err)
} else { } else {
console.log(`MQTT成功订阅${subscribeList.join(', ')}`) console.log(`MQTT成功订阅${subscribeList.join(', ')}`)
@ -207,34 +209,34 @@ function subscribeAllTopics() {
*/ */
export function publishMqtt(topic, message) { export function publishMqtt(topic, message) {
// if (process.env.NODE_ENV === "production") { // if (process.env.NODE_ENV === "production") {
const { isConnected, client } = mqttState const {isConnected, client} = mqttState
if (!isConnected || !client) { if (!isConnected || !client) {
uni.showToast({ title: '控制异常', icon: 'none' }) uni.showToast({title: '控制异常', icon: 'none'})
console.error('MQTT发布失败未连接') console.error('MQTT发布失败未连接')
return false return false
} }
if (!topic) { if (!topic) {
uni.showToast({ title: '控制异常', icon: 'none' }) uni.showToast({title: '控制异常', icon: 'none'})
console.error('MQTT发布失败主题为空') console.error('MQTT发布失败主题为空')
return false return false
} }
try { try {
const msg = typeof message === 'object' ? JSON.stringify(message) : String(message) const msg = typeof message === 'object' ? JSON.stringify(message) : String(message)
client.publish(topic, msg, (err) => { client.publish(topic, msg, (err) => {
if (err) { if (err) {
uni.showToast({ title: '控制异常', icon: 'none' }) uni.showToast({title: '控制异常', icon: 'none'})
console.error(`MQTT发布失败topic=${topic}err=${err}`) console.error(`MQTT发布失败topic=${topic}err=${err}`)
} else { } else {
console.log(`MQTT发布成功topic=${topic}message=${msg}`) console.log(`MQTT发布成功topic=${topic}message=${msg}`)
} }
}) })
return true return true
} catch (err) { } catch (err) {
uni.showToast({ title: '控制异常', icon: 'none' }) uni.showToast({title: '控制异常', icon: 'none'})
console.error('MQTT发布异常', err) console.error('MQTT发布异常', err)
return false return false
} }
// } // }
} }
@ -249,7 +251,7 @@ export function disconnectMqtt() {
return true return true
} }
if (getToken()) { if (getToken()) {
batchUnsubscribe({clientId:mqttState.options.clientId}).then(response => { batchUnsubscribe({clientId: mqttState.options.clientId}).then(response => {
if (response.code === 200) { if (response.code === 200) {
console.info("取消订阅成功!") console.info("取消订阅成功!")
} }
@ -261,7 +263,7 @@ export function disconnectMqtt() {
console.log('MQTT连接已断开') console.log('MQTT连接已断开')
return true return true
} catch (err) { } catch (err) {
uni.showToast({ title: '设备通信异常', icon: 'none' }) uni.showToast({title: '设备通信异常', icon: 'none'})
console.error('MQTT断开失败', err) console.error('MQTT断开失败', err)
return false return false
} }
@ -292,7 +294,6 @@ export function getMqttState() {
} }
// utils/mqttOnline.js // utils/mqttOnline.js
let timer = null; let timer = null;
@ -305,11 +306,12 @@ export function startMqttOnlinePing(intervalMs = 20000) {
const ping = () => { const ping = () => {
try { try {
if (!mqttState.client.connected) return; if (!mqttState.client.connected) return;
const payload = JSON.stringify({ ts: Date.now() }); const payload = JSON.stringify({ts: Date.now()});
// qos=0 足够retain 不要 // qos=0 足够retain 不要
mqttState.client.publish(topic, payload, { qos: 0, retain: false }); mqttState.client.publish(topic, payload, {qos: 0, retain: false});
// console.info(`主题:${topic}${payload}`) // console.info(`主题:${topic}${payload}`)
} catch (e) {} } catch (e) {
}
}; };
ping(); ping();
@ -325,6 +327,60 @@ export function stopMqttOnlinePing() {
} }
/**
* 更新MQTT订阅主题无返回值不用async纯回调实现
*/
export function updateSubscribeTopic() {
const subscribeList = [];
// 初始化clientId空值保护
let clientId = mqttState.options?.clientId || '';
if (!clientId) {
throw new Error('MQTT clientId 为空,无法生成订阅主题');
}
// 1. 获取设备列表(纯.then/.catch回调
listAgri().then(response => {
if (response.code === 200) {
// 生成设备订阅主题校验imei非空
response.rows.forEach(item => {
if (item?.imei) {
subscribeList.push(`frontend/${clientId}/dtu/${item.imei}/+`);
}
});
// 管理员额外订阅指定设备
if (store.getters && store.getters.name === 'admin') {
subscribeList.push(`frontend/${clientId}/dtu/862538065276061/+`);
}
// 有订阅主题时执行批量订阅
if (subscribeList.length > 0) {
// 2. 批量订阅(链式.then保证执行顺序
batchSubscribe({clientId})
.then(result => {
if (result.code === 200) {
console.info(`设备列表订阅成功:${subscribeList}`);
// 更新订阅列表+同步本地存储
updateSubscribeList(subscribeList);
uni.setStorageSync('mqtt_subscribe_list', subscribeList);
console.log('恢复MQTT订阅列表', subscribeList);
} else {
console.error(`批量订阅失败,返回码:${result.code}`);
}
})
.catch(batchErr => {
console.error('批量订阅失败:', batchErr.message);
});
} else {
console.warn('无可用的订阅主题,跳过订阅操作');
}
} else {
console.error(`获取设备列表失败,返回码:${response.code}`);
}
}).catch(error => {
console.error('更新MQTT订阅主题失败', error.message);
});
}
// 导出所有方法(全局调用) // 导出所有方法(全局调用)
export default { export default {
initMqttConfig, initMqttConfig,
@ -332,6 +388,7 @@ export default {
setOnMessageCallback, setOnMessageCallback,
removeOnMessageCallback, removeOnMessageCallback,
updateSubscribeList, updateSubscribeList,
updateSubscribeTopic,
publishMqtt, publishMqtt,
disconnectMqtt, disconnectMqtt,
getMqttState getMqttState