From 913365927f88203dd1c7f0f870978e218f31e530 Mon Sep 17 00:00:00 2001 From: xce Date: Fri, 16 Jan 2026 01:55:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E6=8F=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/controller/mqtt/MqttController.java | 44 +-- .../src/main/resources/application-mqtt.yml | 8 +- .../interceptor/MqttMessageHandler.java | 369 +++++------------- 3 files changed, 133 insertions(+), 288 deletions(-) diff --git a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java index f572ba7..3afdbdd 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java @@ -1,21 +1,14 @@ package com.agri.web.controller.mqtt; import com.agri.framework.interceptor.MqttMessageHandler; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; -import java.util.Map; +import java.util.List; /** - * @Auther: jone - * @Date: 2026/1/15 - 01 - 15 - 23:45 - * @Description: com.agri.web.controller.mqtt - * @version: 1.0 + * MQTT订阅管理接口 */ - @RestController @RequestMapping("/api/mqtt") public class MqttController { @@ -24,30 +17,29 @@ public class MqttController { private MqttMessageHandler mqttMessageHandler; /** - * 前端订阅设备状态 + * 单个订阅 */ - @PostMapping("/subscribe") - public String subscribe(@RequestBody Map params) { - String clientId = params.get("clientId"); - String deviceId = params.get("deviceId"); - if (clientId == null || deviceId == null) { - return "参数错误"; - } + @PostMapping("/single") + public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) { mqttMessageHandler.subscribeDevice(clientId, deviceId); return "订阅成功"; } /** - * 前端取消订阅设备状态 + * 单个取消 */ - @PostMapping("/unsubscribe") - public String unsubscribe(@RequestBody Map params) { - String clientId = params.get("clientId"); - String deviceId = params.get("deviceId"); - if (clientId == null || deviceId == null) { - return "参数错误"; - } + @DeleteMapping("/single") + public String unsubscribe(@RequestParam String clientId, @RequestParam String deviceId) { mqttMessageHandler.unsubscribeDevice(clientId, deviceId); return "取消订阅成功"; } + + /** + * 批量取消当前用户的所有设备订阅 + */ + @DeleteMapping("/batch") + public List unsubscribeAll(@RequestParam String clientId) { + // 返回前端需要取消的MQTT主题列表 + return mqttMessageHandler.unsubscribeAllDevice(clientId); + } } \ No newline at end of file diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index 0e1918e..cd625e1 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -5,6 +5,12 @@ spring: port: 6379 password: lld123 database: 1 + lettuce: + pool: + max-active: 8 + max-idle: 8 + min-idle: 2 + max-wait: 10000ms # MQTT配置 mqtt: host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址 @@ -12,7 +18,7 @@ spring: username: admin # Mosquitto共用账号 password: Admin#12345678 # Mosquitto密码 client-id: springboot-backend-${random.uuid} # 后端客户端ID(唯一) - default-topic: device/+/status,device/+/heartbeat,frontend/+/control/+ # 后端监听的主题 + default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题 qos: 1 # 消息可靠性 timeout: 60 # 连接超时 keep-alive: 60 # 心跳间隔 \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 6d7900a..03c1f94 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -9,27 +9,23 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import org.springframework.web.client.RestTemplate; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.time.LocalDateTime; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; /** - * MQTT消息处理器 + * MQTT消息处理器(无心跳包版本) * 核心功能: - * 1. 订阅设备状态/心跳、前端控制指令主题 - * 2. 转发设备状态到订阅的前端 - * 3. 处理设备心跳,维护在线状态并上报4G平台 - * 4. 处理前端控制指令,权限校验+分布式锁+转发给设备 - * 5. 定时检查离线设备,更新状态并上报 - * - * @Auther: lld - * @Date: 2026/1/15 - 01 - 15 - 23:43 - * @version: 1.0 + * 1. 订阅设备状态、前端控制指令主题 + * 2. 管理前端-设备订阅关系(Redis) + * 3. 转发设备状态到订阅的前端 + * 4. 处理前端控制指令(权限校验+分布式锁+转发) + * 适配JDK 8,无心跳包相关逻辑 */ @Component public class MqttMessageHandler { @@ -46,34 +42,22 @@ public class MqttMessageHandler { @Resource private StringRedisTemplate stringRedisTemplate; - /** 4G平台API地址:用于上报设备在线/离线状态 */ - private static final String FOUR_G_API = "http://你的4G平台IP/api/device/status"; - - /** 心跳超时时间(秒):设备超过该时间未发心跳则判定为离线 */ - private static final long HEARTBEAT_TIMEOUT = 60; - - @Value("${spring.mqtt.default-topic}") + // 读取配置文件中的默认订阅主题(移除心跳主题) + @Value("${spring.mqtt.default-topic:device/+/status,frontend/+/control/+}") private String defaultTopic; + /** - * 初始化方法:项目启动时执行 - * 1. 设置MQTT回调函数 - * 2. 订阅核心主题 - * 3. 启动离线设备检查线程 - * - * @throws MqttException MQTT订阅失败异常 + * 初始化:订阅主题+设置回调 */ @PostConstruct public void subscribeTopics() throws MqttException { - // 定义需要监听的MQTT主题数组 - // device/+/status:所有设备的业务状态(温湿度、开关等) - // device/+/heartbeat:所有设备的心跳包(用于判定在线状态) - // frontend/+/control/+:所有前端发送的设备控制指令 - // 解析配置文件中的主题列表(逗号分隔) + // 解析配置的主题列表 String[] topics = defaultTopic.split(","); - // 对应主题的QoS级别(所有主题使用相同QoS,也可自定义多QoS配置) - int[] qos = new int[topics.length]; - // 所有主题QoS=1 - Arrays.fill(qos, 1); + int[] qosArray = new int[topics.length]; + // 按主题类型设置QoS:控制指令/状态用QoS 1 + for (int i = 0; i < topics.length; i++) { + qosArray[i] = 1; + } // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 mqttClient.setCallback(new MqttCallback() { @@ -104,44 +88,30 @@ public class MqttMessageHandler { */ @Override public void deliveryComplete(IMqttDeliveryToken token) { - // 可添加消息发布成功的日志 - // System.out.println("消息发布完成:" + token.getTopics()[0]); + // QoS 1确认:消息已被服务端接收 + System.out.println("【MQTT确认】消息发布完成,主题:" + token.getTopics()[0]); } }); - // 订阅主题:阻塞式操作,订阅成功后继续执行 - mqttClient.subscribe(topics, qos); - System.out.println("【MQTT初始化】核心主题订阅成功,订阅列表:" + String.join(",", topics)); - - // 启动离线设备检查线程:独立线程,避免阻塞主线程 - // 线程名:offline-check-thread,便于日志排查 - new Thread(new Runnable() { - @Override - public void run() { - checkOfflineDevice(); - } - }, "offline-check-thread").start(); - System.out.println("【MQTT初始化】离线设备检查线程已启动"); + // 订阅主题 + mqttClient.subscribe(topics, qosArray); + System.out.println("【MQTT初始化】订阅主题:" + String.join(",", topics)); } /** - * 消息分发处理:根据主题类型路由到不同处理方法 + * 消息分发处理:根据主题类型路由到不同处理方法\仅处理设备状态、前端控制指令 * @param topic 消息主题 * @param payload 消息内容(JSON字符串) */ private void handleMessage(String topic, String payload) { try { - System.out.println("【MQTT消息接收】topic=" + topic + ", payload=" + payload); + System.out.println("【MQTT接收】topic=" + topic + ", payload=" + payload); - // 1. 处理设备业务状态主题:device/{deviceId}/status + // 设备状态主题:device/{deviceId}/status if (topic.matches("device/\\w+/status")) { handleDeviceStatus(topic, payload); } - // 2. 处理设备心跳主题:device/{deviceId}/heartbeat - else if (topic.matches("device/\\w+/heartbeat")) { - handleDeviceHeartbeat(topic, payload); - } - // 3. 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} + // 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} else if (topic.matches("frontend/\\w+/control/\\w+")) { handleFrontendControl(topic, payload); } @@ -152,38 +122,21 @@ public class MqttMessageHandler { } /** - * 处理设备业务状态消息 - * 逻辑: - * 1. 解析设备ID - * 2. 补充设备在线状态到消息体 - * 3. 查询订阅该设备的前端列表 - * 4. 转发消息到每个前端的专属主题 - * - * @param topic 消息主题(device/{deviceId}/status) - * @param payload 设备状态JSON字符串 - * @throws MqttException 消息发布异常 + * 处理设备状态:转发给订阅的前端 */ private void handleDeviceStatus(String topic, String payload) throws MqttException { // 解析设备ID:主题格式为device/{deviceId}/status,分割后第2个元素是设备ID String deviceId = topic.split("/")[1]; - - // 补充设备在线状态到payload(兼容JSON格式) - // 从Redis获取设备在线状态:device:online:{deviceId} → true/false - String onlineStatus = stringRedisTemplate.opsForValue().get("device:online:" + deviceId); - // 若Redis中无记录,默认离线 - String finalOnlineStatus = (onlineStatus == null) ? "false" : onlineStatus; - // 拼接在线状态到JSON末尾(兼容无空格的JSON格式) - String newPayload = payload.replace("}", ",\"online\":\"" + finalOnlineStatus + "\"}"); - - // 查询Redis中订阅该设备的前端clientId列表:sub:{deviceId} → Set + // 查询Redis中订阅该设备的前端列表:sub:{deviceId} Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); + if (subscribedClients != null && !subscribedClients.isEmpty()) { - // 遍历所有订阅的前端,推送消息到前端专属主题 + // 推送给每个订阅的前端 for (String clientId : subscribedClients) { // 前端专属主题:frontend/{clientId}/device/{deviceId}/status String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; - // 发布消息(保留最新消息,前端订阅后可立即获取) - mqttMessageSender.publish(frontendTopic, newPayload); + // 发布消息 + mqttMessageSender.publish(frontendTopic, payload); System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic); } } else { @@ -192,109 +145,44 @@ public class MqttMessageHandler { } /** - * 处理设备心跳消息 - * 逻辑: - * 1. 解析设备ID - * 2. 更新Redis中设备最后心跳时间、在线状态 - * 3. 异步上报在线状态到4G平台(避免阻塞MQTT消息处理) - * - * @param topic 消息主题(device/{deviceId}/heartbeat) - * @param payload 心跳包JSON字符串(可包含timestamp等字段) - */ - private void handleDeviceHeartbeat(String topic, String payload) { - // 解析设备ID:主题格式为device/{deviceId}/heartbeat,分割后第2个元素是设备ID - String deviceId = topic.split("/")[1]; - - // 获取当前时间戳(秒) - long currentTime = System.currentTimeMillis() / 1000; - - // 更新Redis:存储最后心跳时间 → device:last_heartbeat:{deviceId} - stringRedisTemplate.opsForValue().set("device:last_heartbeat:" + deviceId, String.valueOf(currentTime)); - // 更新Redis:存储在线状态,设置过期时间(心跳超时+10秒),避免Redis数据堆积 - stringRedisTemplate.opsForValue().set( - "device:online:" + deviceId, - "true", - HEARTBEAT_TIMEOUT + 10, - TimeUnit.SECONDS - ); - - // 异步上报4G平台:使用独立线程,避免阻塞MQTT消息处理线程 - new Thread(new Runnable() { - @Override - public void run() { - try { - // 构造上报4G平台的JSON数据 - String statusJson = String.format( - "{\"device_id\":\"%s\",\"online\":true,\"timestamp\":%d}", - deviceId, currentTime - ); - - // 调用4G平台API(POST请求) - RestTemplate restTemplate = new RestTemplate(); - String response = restTemplate.postForObject(FOUR_G_API, statusJson, String.class); - - System.out.println("【4G平台上报】设备" + deviceId + "在线状态上报成功,响应:" + response); - } catch (Exception e) { - System.err.println("【4G平台上报】设备" + deviceId + "在线状态上报失败,异常:" + e.getMessage()); - } - } - }).start(); - } - - /** - * 处理前端控制指令 - * 逻辑: - * 1. 解析前端clientId、设备ID - * 2. 权限校验:验证前端是否有权操作该设备 - * 3. 分布式锁:避免多前端同时控制同一设备 - * 4. 记录操作日志 - * 5. 转发指令到设备专属主题 - * - * @param topic 消息主题(frontend/{clientId}/control/{deviceId}) - * @param payload 控制指令JSON字符串 - * @throws MqttException 消息发布异常 + * 处理前端控制指令:权限校验+分布式锁+转发给设备 */ private void handleFrontendControl(String topic, String payload) throws MqttException { - // 解析主题:frontend/{clientId}/control/{deviceId} + // 解析前端clientId、设备ID String[] parts = topic.split("/"); - String clientId = parts[1]; // 前端唯一标识 - String deviceId = parts[3]; // 目标设备ID + String clientId = parts[1]; + String deviceId = parts[3]; - // 1. 权限校验:失败则推送错误消息给前端 + // 1. 权限校验(示例:admin开头有全权限) if (!checkPermission(clientId, deviceId)) { String errorTopic = "frontend/" + clientId + "/error/" + deviceId; mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); - System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "权限校验失败"); + System.err.println("【权限校验】前端" + clientId + "操作设备" + deviceId + "失败"); return; } - // 2. 分布式锁:lock:{deviceId},过期时间10秒(避免死锁) + // 2. 分布式锁:避免多前端并发控制 String lockKey = "lock:" + deviceId; Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, - clientId, - 10, - TimeUnit.SECONDS + lockKey, clientId, 10, TimeUnit.SECONDS ); - - // 锁获取失败:设备忙,推送错误消息给前端 if (lockSuccess == null || !lockSuccess) { String errorTopic = "frontend/" + clientId + "/error/" + deviceId; mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}"); - System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "获取锁失败(设备忙)"); + System.err.println("【分布式锁】前端" + clientId + "操作设备" + deviceId + "失败"); return; } - // 3. 记录操作日志(示例:可替换为数据库存储) + // 3. 记录日志 System.out.println(String.format( - "【前端指令处理】前端%s于%s控制设备%s,指令:%s", + "【指令处理】前端%s于%s控制设备%s,指令:%s", clientId, LocalDateTime.now(), deviceId, payload )); - // 4. 转发指令到设备专属主题:device/{deviceId}/control + // 4. 转发指令到设备 String deviceTopic = "device/" + deviceId + "/control"; mqttMessageSender.publish(deviceTopic, payload); - System.out.println("【前端指令转发】前端" + clientId + " → 设备" + deviceId + ",主题:" + deviceTopic); + System.out.println("【指令转发】前端" + clientId + " → 设备" + deviceId); } /** @@ -312,117 +200,27 @@ public class MqttMessageHandler { if (clientId.startsWith("admin_")) { return true; } - - // 普通用户权限:校验Redis中是否绑定该设备 - return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId)); + // 普通用户权限:校验Redis中是否绑定该设备:校验Redis中user_device:{clientId}是否包含该设备ID + return stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId); } /** - * 定时检查离线设备(无限循环,每10秒执行一次) - * 逻辑: - * 1. 获取所有设备的最后心跳记录 - * 2. 判定是否离线(当前时间 - 最后心跳时间 > 心跳超时) - * 3. 更新Redis在线状态为离线 - * 4. 上报离线状态到4G平台 - */ - private void checkOfflineDevice() { - while (true) { - try { - // 获取Redis中所有设备的最后心跳记录:device:last_heartbeat:* - Set heartbeatKeys = stringRedisTemplate.keys("device:last_heartbeat:*"); - - // 无设备心跳记录,休眠10秒后继续 - if (heartbeatKeys == null || heartbeatKeys.isEmpty()) { - Thread.sleep(10000); - continue; - } - - // 当前时间戳(秒) - long currentTime = System.currentTimeMillis() / 1000; - - // 遍历所有设备心跳记录 - for (String key : heartbeatKeys) { - // 解析设备ID:key格式为device:last_heartbeat:{deviceId} - String deviceId = key.split(":")[2]; - // 获取最后心跳时间 - String lastHeartbeatStr = stringRedisTemplate.opsForValue().get(key); - - // 无心跳记录,跳过 - if (lastHeartbeatStr == null) { - continue; - } - - // 转换为长整型 - long lastHeartbeat = Long.parseLong(lastHeartbeatStr); - - // 判定离线:超过心跳超时时间未发心跳 - if (currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT) { - // 更新Redis在线状态为离线 - stringRedisTemplate.opsForValue().set("device:online:" + deviceId, "false"); - - // 构造上报4G平台的JSON数据 - String statusJson = String.format( - "{\"device_id\":\"%s\",\"online\":false,\"timestamp\":%d}", - deviceId, currentTime - ); - - // 调用4G平台API上报离线状态 - RestTemplate restTemplate = new RestTemplate(); - restTemplate.postForObject(FOUR_G_API, statusJson, String.class); - - System.out.println("【离线设备检查】设备" + deviceId + "判定为离线,已上报4G平台"); - } - } - - // 每10秒检查一次 - Thread.sleep(10000); - } catch (InterruptedException e) { - // 线程中断,退出循环 - System.err.println("【离线设备检查】线程被中断,停止检查:" + e.getMessage()); - Thread.currentThread().interrupt(); - break; - } catch (Exception e) { - System.err.println("【离线设备检查】异常:" + e.getMessage()); - // 异常时休眠10秒,避免无限循环报错 - try { - Thread.sleep(10000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - } - } - - /** - * 前端订阅设备状态接口(供Controller层调用) - * 逻辑: - * 1. 将前端clientId加入设备的订阅列表 - * 2. 推送设备最新状态给前端(立即) - * - * @param clientId 前端唯一标识 - * @param deviceId 设备ID + * 前端订阅设备(Controller调用) */ public void subscribeDevice(String clientId, String deviceId) { - // 将前端clientId添加到设备的订阅列表:sub:{deviceId} + // 保存订阅关系到Redis stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); - System.out.println("【前端订阅】前端" + clientId + "订阅设备" + deviceId + "成功"); + System.out.println("【订阅管理】前端" + clientId + "订阅设备" + deviceId + "成功"); - // 推送设备最新状态给前端(立即) - try { - // 从Redis获取设备最新状态:device:latest:{deviceId} - String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); - if (latestStatus != null) { - // 前端专属主题 - String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + // 推送设备最新状态(可选) + String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); + if (latestStatus != null) { + String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + try { mqttMessageSender.publish(frontendTopic, latestStatus); - System.out.println("【前端订阅】推送设备" + deviceId + "最新状态给前端" + clientId); - } else { - System.out.println("【前端订阅】设备" + deviceId + "无最新状态,跳过推送"); + } catch (MqttException e) { + System.err.println("【订阅推送】设备" + deviceId + "状态推送失败:" + e.getMessage()); } - } catch (MqttException e) { - System.err.println("【前端订阅】推送设备" + deviceId + "状态给前端" + clientId + "失败:" + e.getMessage()); - e.printStackTrace(); } } @@ -434,8 +232,57 @@ public class MqttMessageHandler { * @param deviceId 设备ID */ public void unsubscribeDevice(String clientId, String deviceId) { - // 从设备订阅列表移除前端clientId + // 从Redis删除订阅关系 stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); System.out.println("【前端取消订阅】前端" + clientId + "取消订阅设备" + deviceId + "成功"); } + + + + /** + * 批量取消前端的所有设备订阅(核心:根据clientId清理所有订阅关系) + * @param clientId 前端唯一标识(如wx_123) + * @return 前端需要取消的MQTT主题列表(供前端批量取消) + */ + public List unsubscribeAllDevice(String clientId) { + // 步骤1:查询该前端订阅的所有设备ID(Redis中所有sub:*集合中包含该clientId的key) + // 注意:生产环境建议用scan代替keys,避免阻塞Redis + Set subKeys = stringRedisTemplate.keys("sub:*"); + List deviceIds = new ArrayList<>(); + List frontendTopics = new ArrayList<>(); + + if (subKeys != null && !subKeys.isEmpty()) { + for (String subKey : subKeys) { + // 检查该sub:{deviceId}集合中是否包含当前clientId + Boolean isMember = stringRedisTemplate.opsForSet().isMember(subKey, clientId); + if (Boolean.TRUE.equals(isMember)) { + // 解析设备ID:sub:1001 → 1001 + String deviceId = subKey.split(":")[1]; + deviceIds.add(deviceId); + // 构建前端需要取消的MQTT主题 + String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + frontendTopics.add(frontendTopic); + // 从该设备的订阅列表中移除clientId + stringRedisTemplate.opsForSet().remove(subKey, clientId); + System.out.println("【批量取消】前端" + clientId + "取消设备" + deviceId + "订阅"); + } + } + } + + // 步骤2:清理该前端的分布式锁(可选,防止死锁) + Set lockKeys = stringRedisTemplate.keys("lock:*"); + if (lockKeys != null && !lockKeys.isEmpty()) { + for (String lockKey : lockKeys) { + String lockValue = stringRedisTemplate.opsForValue().get(lockKey); + if (clientId.equals(lockValue)) { + stringRedisTemplate.delete(lockKey); + System.out.println("【批量取消】清理前端" + clientId + "持有的锁:" + lockKey); + } + } + } + + System.out.println("【批量取消】前端" + clientId + "共取消" + deviceIds.size() + "个设备订阅"); + return frontendTopics; + } + } \ No newline at end of file