From c297d418dae02482479ad6fe6a2cae15ac90859a Mon Sep 17 00:00:00 2001 From: xce Date: Sat, 24 Jan 2026 17:44:46 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/controller/mqtt/MqttController.java | 19 +- .../com/agri/framework/config/MqttConfig.java | 6 +- .../interceptor/DeviceStatusHandler.java | 235 ---- .../interceptor/FrontendControlHandler.java | 127 -- .../interceptor/FrontendOnlineHandler.java | 61 - .../interceptor/MqttMessageHandler.java | 1115 +++++++++++++++++ .../framework/manager/MqttAutoOffManager.java | 265 ---- .../framework/manager/MqttClientManager.java | 278 ---- .../manager/MqttSubscriptionManager.java | 317 ----- .../web/dispatcher/MqttMessageDispatcher.java | 69 - 10 files changed, 1126 insertions(+), 1366 deletions(-) delete mode 100644 agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java delete mode 100644 agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java delete mode 100644 agri-framework/src/main/java/com/agri/framework/interceptor/FrontendOnlineHandler.java create mode 100644 agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java delete mode 100644 agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java delete mode 100644 agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java delete mode 100644 agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java delete mode 100644 agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java diff --git a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java index 66b521b..d6fe9cb 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java @@ -3,8 +3,7 @@ package com.agri.web.controller.mqtt; import com.agri.common.annotation.Log; import com.agri.common.core.domain.AjaxResult; import com.agri.common.enums.BusinessType; -import com.agri.framework.manager.MqttClientManager; -import com.agri.framework.manager.MqttSubscriptionManager; +import com.agri.framework.interceptor.MqttMessageHandler; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,9 +27,7 @@ public class MqttController { private static final Logger log = LoggerFactory.getLogger(MqttController.class); @Resource - private MqttSubscriptionManager mqttSubscriptionManager; - @Resource - private MqttClientManager mqttClientManager; + private MqttMessageHandler mqttMessageHandler; /** * 单个订阅 @@ -39,7 +36,7 @@ public class MqttController { @Log(title = "订阅主题", businessType = BusinessType.INSERT) public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) { try { - mqttSubscriptionManager.subscribeDevice(clientId, deviceId); + mqttMessageHandler.subscribeDevice(clientId, deviceId); return "订阅成功"; } catch (IllegalArgumentException e) { log.error("MQTT单个订阅失败:{}", e.getMessage()); @@ -56,7 +53,7 @@ public class MqttController { @DeleteMapping("/single") public String unsubscribe(@RequestParam String clientId, @RequestParam String deviceId) { try { - mqttSubscriptionManager.unsubscribeDevice(clientId, deviceId); + mqttMessageHandler.unsubscribeDevice(clientId, deviceId); return "取消订阅成功"; } catch (IllegalArgumentException e) { log.error("MQTT单个取消订阅失败:{}", e.getMessage()); @@ -75,7 +72,7 @@ public class MqttController { public AjaxResult subscribeAll(@RequestParam String clientId) { try { // 返回前端需要取消的MQTT主题列表 - return AjaxResult.success(mqttSubscriptionManager.subscribeAllDeviceByUserId(clientId)); + return AjaxResult.success(mqttMessageHandler.subscribeAllDeviceByUserId(clientId)); } catch (IllegalArgumentException e) { log.error("MQTT批量订阅失败:{}", e.getMessage()); // 异常时返回空列表,避免前端解析失败 @@ -94,7 +91,7 @@ public class MqttController { public List unsubscribeAll(@RequestParam String clientId) { try { // 返回前端需要取消的MQTT主题列表 - return mqttSubscriptionManager.unsubscribeAllDevice(clientId); + return mqttMessageHandler.unsubscribeAllDevice(clientId); } catch (IllegalArgumentException e) { log.error("MQTT批量取消订阅失败:{}", e.getMessage()); // 异常时返回空列表,避免前端解析失败 @@ -112,7 +109,7 @@ public class MqttController { @Log(title = "手动触发MQTT重连", businessType = BusinessType.OTHER) public String manualReconnect() { try { - return mqttClientManager.manualReconnect(); + return mqttMessageHandler.manualReconnect(); } catch (Exception e) { log.error("MQTT手动重连异常", e); return "手动重连失败:" + e.getMessage(); @@ -127,7 +124,7 @@ public class MqttController { @Log(title = "手动触发MQTT重连", businessType = BusinessType.SELECT) public String getMqttStatus() { try { - return mqttClientManager.getMqttStatus(); + return mqttMessageHandler.getMqttStatus(); } catch (Exception e) { log.error("查询MQTT连接状态异常", e); return "查询状态失败:" + e.getMessage(); diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java index 477f52b..82f04fe 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -122,10 +122,10 @@ public class MqttConfig { connectOptions.setKeepAliveInterval(keepAlive); // 关闭清除会话:false=重连后保留订阅关系(若不需要离线消息可设为true) // 优化:生产环境建议设为false,重连后保留订阅关系,避免丢失离线消息 - connectOptions.setCleanSession(false); - + connectOptions.setCleanSession(true); + connectOptions.setMaxInflight(200); // 开启自动重连:连接断开后自动尝试重连,提升稳定性(方案A核心) - connectOptions.setAutomaticReconnect(false); + connectOptions.setAutomaticReconnect(true); // 设置最大重连间隔(秒):避免频繁重连消耗资源 connectOptions.setMaxReconnectDelay(30); return connectOptions; diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java deleted file mode 100644 index aa22e10..0000000 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java +++ /dev/null @@ -1,235 +0,0 @@ -package com.agri.framework.interceptor; - -import com.agri.framework.config.MqttConfig; -import com.agri.framework.manager.MqttAutoOffManager; -import com.agri.framework.manager.MqttSubscriptionManager; -import com.agri.system.domain.SysAgriLimit; -import com.agri.system.service.ISysAgriLimitService; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -/** - * 设备状态消息处理器 - * 核心功能: - * 1. 处理设备状态上报、设备回执消息 - * 2. 触发自动关闭任务、取消自动关闭任务 - * 3. 转发设备状态到订阅的前端 - * 4. 维护设备最新状态缓存 - * 适配JDK 8,无心跳包相关逻辑 - */ -@Component -public class DeviceStatusHandler { - /** - * 优化:统一使用SLF4J日志(JDK 8兼容) - */ - private static final Logger log = LoggerFactory.getLogger(DeviceStatusHandler.class); - - /** - * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 - */ - @Resource - private StringRedisTemplate stringRedisTemplate; - - /** - * MQTT消息发送工具类(由MqttConfig配置类注入) - */ - @Resource - private MqttConfig.MqttMessageSender mqttMessageSender; - - /** - * MQTT订阅关系管理器,处理批量Redis操作 - */ - @Resource - private MqttSubscriptionManager mqttSubscriptionManager; - - /** - * 自动关任务管理器,调度/取消自动关任务 - */ - @Resource - private MqttAutoOffManager mqttAutoOffManager; - - /** - * 农业限制服务,查询设备自动关延迟配置 - */ - @Resource - private ISysAgriLimitService agriLimitService; - - // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) - @Value("${spring.mqtt.latest-ttl-seconds:120}") - private int latestTtlSeconds; - - // 初始化映射(建议放在类初始化块/构造方法中,只初始化一次) - private static final Map> LIMIT_MAP = new HashMap<>(); - private static final Set VALID_FUNC_CODES = new HashSet<>(); - static { - LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); - LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); - LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); - LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); - LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); - LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); - LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); - LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); - - VALID_FUNC_CODES.add("jm1g"); - VALID_FUNC_CODES.add("jm2g"); - VALID_FUNC_CODES.add("jbg"); - VALID_FUNC_CODES.add("jm3g"); - VALID_FUNC_CODES.add("jm2k"); - VALID_FUNC_CODES.add("jm3k"); - VALID_FUNC_CODES.add("jbk"); - VALID_FUNC_CODES.add("jm1k"); - } - - /** - * 处理设备状态:转发给订阅的前端、处理回执、触发自动关 - */ - public void handle(String topic, String payload) throws MqttException { - // 第一步:解析JSON,非有效JSON直接return - JSONObject payloadObj; - try { - payloadObj = JSON.parseObject(payload); - } catch (Exception e) { - log.error("【设备处理】JSON解析失败,payload={}", payload, e); - return; - } - if (payloadObj == null || payloadObj.isEmpty()) { - log.warn("【设备处理】JSON解析后为空,payload={}", payload); - return; - } - -// log.info("【设备处理】JSON解析:{}",payloadObj); - // 解析设备ID:主题格式为dtu/{deviceId}/up,分割后第2个元素是设备ID - String deviceId = topic.split("/")[1]; - - // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) - String funcType = null; - Integer funcValue = null; - boolean isAck = false; - // 新增:标记是否需要执行自动关任务(全局可用) - - // 第二步:设备回执处理逻辑(完全移除Redis写入) - if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { - isAck = true; - JSONObject propObj = payloadObj.getJSONObject("prop"); - if (propObj != null && !propObj.isEmpty()) { - // 提取prop中的第一个功能码 - Map.Entry propEntry = propObj.entrySet().iterator().next(); - funcType = propEntry.getKey(); - try { - funcValue = Integer.parseInt(String.valueOf(propEntry.getValue())); - } catch (Exception ignore) { - } - - // 释放对应功能的分布式锁 - String lockKey = "lock:" + deviceId + ":" + funcType; - Boolean delete = stringRedisTemplate.delete(lockKey); - if (propObj.size() > 1) { - log.warn("【设备回执】prop包含多个功能码,仅处理第一个:{}", propObj.keySet()); - } - log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); - - // 回执成功且值=1时启动自动关闭任务(保留原有逻辑) - boolean suc = payloadObj.getBooleanValue("suc"); - if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { - SysAgriLimit agriLimit = agriLimitService.lambdaQuery() - .eq(SysAgriLimit::getImei, deviceId) - .one(); - int autoOffSeconds = 0; - if (agriLimit != null) { - autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); - } - // 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务) - if (autoOffSeconds > 0) { - mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds); - log.debug("【自动关任务】标记需要执行,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds); - } - } - - if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) { - mqttAutoOffManager.cancelAutoOff(deviceId, funcType); - } - } - } - - // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis - // 有没有人订阅都得写,只要发送设备开的指令成功了就得写 - if (!isAck) { - // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) - boolean isValidStatus = true; - for (String validCode : VALID_FUNC_CODES) { - if (!payloadObj.containsKey(validCode)) { - isValidStatus = false; - // log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); - break; - } - } - if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) { - // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 - stringRedisTemplate.opsForValue().set( - "device:latest:" + deviceId, - payload, // 完整的8功能码JSON - latestTtlSeconds, - TimeUnit.SECONDS - ); - log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); - } - } - // 非回执消息:正常转发给订阅前端 - // 查询Redis中订阅该设备的前端列表:sub:{deviceId} - Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); - - if (subscribedClients != null && !subscribedClients.isEmpty()) { - // 推送给每个订阅的前端 - // 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底) - List clients = new ArrayList<>(subscribedClients); - // 判断subc是否还存在 一次性查全部 获取失效的clientId - List stillSubs = mqttSubscriptionManager.pipeIsMemberSubc(clients, deviceId); - - // 关系不存在:清理sub:{deviceId}残留,避免一直给前端发 - List stale = null; - - for (int i = 0; i < clients.size(); i++) { - String clientId = clients.get(i); - boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i)); - if (!stillSub) { - if (stale == null) { - stale = new ArrayList<>(); - } - // false不存在添加队列 - stale.add(clientId); - continue; - } - // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/listener - String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - // 发布消息 - mqttMessageSender.publish(frontendTopic, payload); - log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); - } - // 删掉设备对应的客户端 - if (stale != null && !stale.isEmpty()) { - mqttSubscriptionManager.pipeSRemSub(deviceId, stale); - } - } else { - // 优化:替换System.out为log.info - // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); - } - } -} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java deleted file mode 100644 index f852352..0000000 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.agri.framework.interceptor; - -import com.agri.framework.config.MqttConfig; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.TypeReference; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * 前端控制指令处理器 - * 核心功能: - * 1. 解析前端控制指令,校验入参合法性 - * 2. 前端操作设备权限校验 - * 3. 分布式锁控制,避免同设备同功能并发指令 - * 4. 转发前端指令到设备端 - * 适配JDK 8,无心跳包相关逻辑 - */ -@Component -public class FrontendControlHandler { - /** - * 优化:统一使用SLF4J日志(JDK 8兼容) - */ - private static final Logger log = LoggerFactory.getLogger(FrontendControlHandler.class); - - /** - * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 - */ - @Resource - private StringRedisTemplate stringRedisTemplate; - - /** - * MQTT消息发送工具类(由MqttConfig配置类注入) - */ - @Resource - private MqttConfig.MqttMessageSender mqttMessageSender; - - /** - * 处理前端控制指令:权限校验+分布式锁+转发给设备 - */ - public void handle(String topic, String payload) throws MqttException { - // 解析前端clientId、设备ID - String[] parts = topic.split("/"); - String clientId = parts[1]; - String deviceId = parts[3]; - - // 新增:入参非空校验(JDK 8兼容) - if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { - log.error("【指令处理】clientId或deviceId为空,topic={}", topic); - return; - } - - // 解析功能码({"功能码":状态码}格式) - Map funcCodeMap = null; - try { - funcCodeMap = JSON.parseObject(payload, new TypeReference>() { - }); - } catch (Exception e) { - log.error("【指令处理】功能码解析失败,payload={}", payload, e); - // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - // mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}"); - return; - } - if (funcCodeMap == null || funcCodeMap.isEmpty()) { - // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - // mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}"); - log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId); - return; - } - // 提取第一个功能码作为锁标识 - String funcType = funcCodeMap.keySet().iterator().next(); - - // 1. 权限校验(示例:admin开头有全权限) - if (!checkPermission(clientId, deviceId)) { - String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); - log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId); - return; - } - - // 2. 分布式锁:设备ID+功能类型(避免同设备同功能并发控制) - String lockKey = "lock:" + deviceId + ":" + funcType; - Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景 - ); - if (lockSuccess == null || !lockSuccess) { - String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}"); - log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType); - return; - } - - // 3. 记录日志 - log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}", - clientId, LocalDateTime.now(), deviceId, funcType, payload); - - // 4. 转发指令到设备 - String deviceTopic = "dtu/" + deviceId + "/down"; - //todo - mqttMessageSender.publish(deviceTopic, payload); - log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); - } - - /** - * 权限校验逻辑(示例) - * 可根据业务需求扩展: - * 1. 管理员前端(clientId以admin_开头)拥有所有权限 - * 2. 普通前端仅能操作Redis中绑定的设备(user_device:{clientId} → deviceId集合) - * - * @param clientId 前端唯一标识 - * @param deviceId 设备ID - * @return true=有权限,false=无权限 - */ - private boolean checkPermission(String clientId, String deviceId) { - // 管理员权限:clientId以admin_开头 - // 普通用户权限:校验Redis中是否绑定该设备 - return Boolean.TRUE; - } -} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendOnlineHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendOnlineHandler.java deleted file mode 100644 index 1d926b0..0000000 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendOnlineHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.agri.framework.interceptor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import javax.annotation.Resource; -import java.util.concurrent.TimeUnit; - -/** - * 前端在线心跳处理器 - * 核心功能: - * 1. 处理前端在线心跳消息 - * 2. 续期前端订阅关系TTL,兜底异常退出场景 - * 适配JDK 8,无心跳包相关逻辑 - */ -@Component -public class FrontendOnlineHandler { - /** - * 优化:统一使用SLF4J日志(JDK 8兼容) - */ - private static final Logger log = LoggerFactory.getLogger(FrontendOnlineHandler.class); - - /** - * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 - */ - @Resource - private StringRedisTemplate stringRedisTemplate; - - // 新增:前端订阅关系TTL(兜底“取消订阅失败/异常退出”)——只维护subc:{clientId}的TTL - @Value("${spring.mqtt.subc-ttl-seconds:3600}") - private int subcTtlSeconds; - - /** - * 新增:处理前端在线心跳:写入Redis在线标记(带TTL) - * 主题格式:frontend/{clientId}/online - */ - public void handle(String topic, String payload) { - try { - String[] parts = topic.split("/"); - if (parts.length < 3) { - return; - } - String clientId = parts[1]; - if (!StringUtils.hasText(clientId)) { - return; - } - - // 续期subc:{clientId} - stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS); - - // todo 生产环境不建议打印每次心跳 -// log.debug("【在线心跳】clientId={} 续期subcTTL={}s payload={}", clientId, subcTtlSeconds, payload); - } catch (Exception e) { - log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage()); - } - } -} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java new file mode 100644 index 0000000..3529bc7 --- /dev/null +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -0,0 +1,1115 @@ +package com.agri.framework.interceptor; + +import com.agri.common.utils.SecurityUtils; +import com.agri.framework.config.MqttConfig; +import com.agri.system.domain.SysAgriInfo; +import com.agri.system.domain.SysAgriLimit; +import com.agri.system.service.ISysAgriInfoService; +import com.agri.system.service.ISysAgriLimitService; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.TypeReference; +import org.apache.commons.collections4.CollectionUtils; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.SmartLifecycle; +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * + * * 前端监听: "frontend/" + clientId + "/dtu/" + deviceId + "/listener" + * * 前端发布主题:frontend/+/control/+ + * MQTT消息处理器(无心跳包版本) + * 核心功能: + * 1. 订阅设备状态、前端控制指令主题 + * 2. 管理前端-设备订阅关系(Redis) + * 3. 转发设备状态到订阅的前端 + * 4. 处理前端控制指令(权限校验+分布式锁+转发) + * 适配JDK 8,无心跳包相关逻辑 + * + * 改造点:自动关闭任务改为多线程并行执行 + */ +@Component +public class MqttMessageHandler implements SmartLifecycle { + + /** + * MQTT客户端(由MqttConfig配置类注入) + */ + @Resource + private MqttClient mqttClient; + + /** + * MQTT消息发送工具类(由MqttConfig配置类注入) + */ + @Resource + private MqttConfig.MqttMessageSender mqttMessageSender; + + /** + * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 + */ + @Resource + private StringRedisTemplate stringRedisTemplate; + + // 读取配置文件中的默认订阅主题(移除心跳主题) + @Value("${spring.mqtt.default-topic}") + private String defaultTopic; + + // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) + @Value("${spring.mqtt.latest-ttl-seconds:120}") + private int latestTtlSeconds; + + // 新增:自动关闭任务线程池核心线程数(可配置) + @Value("${spring.mqtt.auto-off-thread-pool-size:5}") + private int autoOffThreadPoolSize; + + // 新增:前端订阅关系TTL(兜底“取消订阅失败/异常退出”)——只维护subc:{clientId}的TTL + @Value("${spring.mqtt.subc-ttl-seconds:3600}") + private int subcTtlSeconds; + + // 优化:统一使用SLF4J日志(JDK 8兼容) + private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); + + // 新增:生命周期管理标识,控制MQTT客户端启动/关闭 + private final AtomicBoolean isRunning = new AtomicBoolean(false); + + private final ThreadPoolExecutor mqttBizPool = + new ThreadPoolExecutor( + 8, // core + 16, // max + 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(2000), // 有界、较小 + r -> { + Thread t = new Thread(r); + t.setName("mqtt-biz-" + t.getId()); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.DiscardPolicy() // 直接丢 + ); + /** + * MQTT连接配置项(从MqttConfig注入) + */ + @Resource + private MqttConnectOptions mqttConnectOptions; + + // 改造:将单线程池改为固定线程池,支持多任务并行执行 + // 替代原有的 Executors.newSingleThreadScheduledExecutor() + private ScheduledExecutorService autoOffExecutor; + + // 新增:同设备同功能只保留最后一次自动关任务 + private final ConcurrentHashMap> autoOffFutureMap = new ConcurrentHashMap<>(); + + // 新增:按设备维度统计“未完成的自动关任务”数量,hasAutoOffTask从扫描O(N)降为O(1) + private final ConcurrentHashMap autoOffDeviceCnt = new ConcurrentHashMap<>(); + + @Autowired + private ISysAgriLimitService agriLimitService; + + @Autowired + private ISysAgriInfoService agriInfoService; + // 初始化映射(建议放在类初始化块/构造方法中,只初始化一次) + private static final Map> LIMIT_MAP = new HashMap<>(); + private static final Set VALID_FUNC_CODES = new HashSet<>(); + static { + LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); + LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); + LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); + LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); + LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); + LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); + LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); + LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); + + VALID_FUNC_CODES.add("jm1g"); + VALID_FUNC_CODES.add("jm2g"); + VALID_FUNC_CODES.add("jbg"); + VALID_FUNC_CODES.add("jm3g"); + VALID_FUNC_CODES.add("jm2k"); + VALID_FUNC_CODES.add("jm3k"); + VALID_FUNC_CODES.add("jbk"); + VALID_FUNC_CODES.add("jm1k"); + } + + /** + * 初始化:订阅主题+设置回调 + * (移除@PostConstruct,改为由SmartLifecycle的start()触发) + *

+ * 【方案A】不做自写重连;Paho会在连接断开后自动重连(前提:connectOptions.setAutomaticReconnect(true)) + */ + public void subscribeTopics() throws MqttException { + // 关键补充1:判空 + if (mqttClient == null) { + log.error("【MQTT初始化】客户端实例为空,无法订阅主题"); + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); + } + + // 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过) + // 注意:这里只使用同一个client实例,避免sender与handler使用不同client + if (!mqttClient.isConnected()) { + try { + // 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置) + mqttClient.connect(mqttConnectOptions); + log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); + } catch (MqttException e) { + log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); + throw e; + } + } + + // 解析配置的主题列表 + String[] topics = defaultTopic.split(","); + int[] qosArray = new int[topics.length]; + // 按主题类型设置QoS:控制指令/状态用QoS 1 + for (int i = 0; i < topics.length; i++) { + qosArray[i] = 0; + topics[i] = topics[i].trim(); + } + + // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 + mqttClient.setCallback(new MqttCallbackExtended() { + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("【MQTT连接完成】reconnect={}, serverURI={}, clientId={}", + reconnect, serverURI, safeClientId()); + try { + mqttClient.subscribe(topics, qosArray); + log.info("【MQTT订阅恢复】topics={}", String.join(",", topics)); + } catch (Exception e) { + log.error("【MQTT订阅恢复失败】", e); + } + } + + /** + * MQTT连接断开回调 + * @param cause 断开原因 + */ + @Override + public void connectionLost(Throwable cause) { + log.info("autoReconnect={}, cleanSession={}, keepAlive={}", + mqttConnectOptions.isAutomaticReconnect(), + mqttConnectOptions.isCleanSession(), + mqttConnectOptions.getKeepAliveInterval()); + + log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", + safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause); + + // 【方案A】不再触发自写重连;Paho自动重连会接管重连过程 + // 这里只记录日志即可 + if (isRunning.get()) { + log.warn("【MQTT自动重连】已开启automaticReconnect,等待Paho自动重连..."); + } + } + + /** + * 收到MQTT消息回调:核心处理入口 + * @param topic 消息主题 + * @param message 消息内容 + * @throws Exception 消息处理异常 + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + + final String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + if (message.isRetained()) { + log.info("ignore retained snapshot: {}", topic); + return; + } + + mqttBizPool.execute(() -> { + log.debug("mqttBizPool active={}, queue={}", + mqttBizPool.getActiveCount(), + mqttBizPool.getQueue().size()); + try { + // 优化:显式指定UTF-8编码,避免乱码(JDK 8兼容) + handleMessage(topic, payload); + } catch (Exception e) { + log.error("【MQTT消息处理异常】topic={}, payload={}", topic, payload, e); + } + }); + + } + + /** + * 消息发布完成回调(仅日志记录,无业务逻辑) + * @param token 发布令牌 + */ + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + if (token != null && token.getTopics() != null && token.getTopics().length > 0) { + // log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", safeClientId(), token.getTopics()[0]); + } + } + }); + + // 【方案A关键点】不再 unsubscribe 主题 + // cleanSession=false + unsubscribe 会破坏Broker侧会话订阅;并且自动重连场景更不建议这么做 + + // 订阅主题 + // mqttClient.subscribe(topics, qosArray); + // 优化:打印clientId,方便排查 + log.info("【MQTT初始化】订阅主题完成,clientId:{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); + } + + private String safeClientId() { + try { + return (mqttClient == null ? "null" : mqttClient.getClientId()); + } catch (Exception e) { + return "unknown"; + } + } + + /** + * 消息分发处理:根据主题类型路由到不同处理方法\仅处理设备状态、前端控制指令 + * 可以监听到设备传过来的业务数据 以及前端传过来的控制设备指令 + * + * @param topic 消息主题 + * @param payload 消息内容(JSON字符串) + */ + private void handleMessage(String topic, String payload) { + try { + // log.info("【MQTT接收】topic={}, payload={}", topic, payload); + + // 设备状态主题:dtu/{deviceId}/up + if (topic.matches("dtu/\\w+/up")) { + handleDeviceStatus(topic, payload); + } + // 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} + else if (topic.matches("frontend/\\w+/control/\\w+")) { + handleFrontendControl(topic, payload); + } + // 新增:前端在线心跳主题:frontend/{clientId}/online + else if (topic.matches("frontend/\\w+/online")) { + handleFrontendOnline(topic, payload); + } + } catch (Exception e) { + log.error("【MQTT消息处理异常】topic={}", topic, e); + } + } + + /** + * 处理设备状态:转发给订阅的前端 + */ + private void handleDeviceStatus(String topic, String payload) throws MqttException { + // 第一步:解析JSON,非有效JSON直接return + JSONObject payloadObj; + try { + payloadObj = JSON.parseObject(payload); + } catch (Exception e) { + log.error("【设备处理】JSON解析失败,payload={}", payload, e); + return; + } + if (payloadObj == null || payloadObj.isEmpty()) { + log.warn("【设备处理】JSON解析后为空,payload={}", payload); + return; + } + +// log.info("【设备处理】JSON解析:{}",payloadObj); + // 解析设备ID:主题格式为dtu/{deviceId}/up,分割后第2个元素是设备ID + String deviceId = topic.split("/")[1]; + + // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) + String funcType = null; + Integer funcValue = null; + boolean isAck = false; + // 新增:标记是否需要执行自动关任务(全局可用) + + // 第二步:设备回执处理逻辑(完全移除Redis写入) + if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { + isAck = true; + JSONObject propObj = payloadObj.getJSONObject("prop"); + if (propObj != null && !propObj.isEmpty()) { + // 提取prop中的第一个功能码 + Map.Entry propEntry = propObj.entrySet().iterator().next(); + funcType = propEntry.getKey(); + try { + funcValue = Integer.parseInt(String.valueOf(propEntry.getValue())); + } catch (Exception ignore) { + } + + // 释放对应功能的分布式锁 + String lockKey = "lock:" + deviceId + ":" + funcType; + Boolean delete = stringRedisTemplate.delete(lockKey); + if (propObj.size() > 1) { + log.warn("【设备回执】prop包含多个功能码,仅处理第一个:{}", propObj.keySet()); + } + log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); + + // 回执成功且值=1时启动自动关闭任务(保留原有逻辑) + boolean suc = payloadObj.getBooleanValue("suc"); + if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { + SysAgriLimit agriLimit = agriLimitService.lambdaQuery() + .eq(SysAgriLimit::getImei, deviceId) + .one(); + int autoOffSeconds = 0; + if (agriLimit != null) { + autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); + } + // 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务) + if (autoOffSeconds > 0) { + scheduleAutoOff(deviceId, funcType, autoOffSeconds); + log.debug("【自动关任务】标记需要执行,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds); + } + } + + if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) { + cancelAutoOff(deviceId, funcType); + } + } + } + + // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis + // 有没有人订阅都得写,只要发送设备开的指令成功了就得写 + if (!isAck) { + // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) + boolean isValidStatus = true; + for (String validCode : VALID_FUNC_CODES) { + if (!payloadObj.containsKey(validCode)) { + isValidStatus = false; + // log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); + break; + } + } + if (hasAutoOffTask(deviceId) && isValidStatus) { + // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 + stringRedisTemplate.opsForValue().set( + "device:latest:" + deviceId, + payload, // 完整的8功能码JSON + latestTtlSeconds, + TimeUnit.SECONDS + ); + log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); + } + } + // 非回执消息:正常转发给订阅前端 + // 查询Redis中订阅该设备的前端列表:sub:{deviceId} + Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); + + if (subscribedClients != null && !subscribedClients.isEmpty()) { + // 推送给每个订阅的前端 + // 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底) + List clients = new ArrayList<>(subscribedClients); + // 判断subc是否还存在 一次性查全部 获取失效的clientId + List stillSubs = pipeIsMemberSubc(clients, deviceId); + + // 关系不存在:清理sub:{deviceId}残留,避免一直给前端发 + List stale = null; + + for (int i = 0; i < clients.size(); i++) { + String clientId = clients.get(i); + boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i)); + if (!stillSub) { + if (stale == null) { + stale = new ArrayList<>(); + } + // false不存在添加队列 + stale.add(clientId); + continue; + } + // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; + // 发布消息 + mqttMessageSender.publish(frontendTopic, payload); + + // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); + } + // 删掉设备对应的客户端 + if (stale != null && !stale.isEmpty()) { + pipeSRemSub(deviceId, stale); + } + } else { + // 优化:替换System.out为log.info + // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); + } + } + + // 新增:pipeline 批量 SISMEMBER subc:{clientId} deviceId(N次->1次往返) 拿取失效的client + private List pipeIsMemberSubc(List clientIds, String deviceId) { + if (clientIds == null || clientIds.isEmpty() || !StringUtils.hasText(deviceId)) { + return Collections.emptyList(); + } + + // ✅ 关键:不发“占位命令”;只对有效clientId发SISMEMBER,同时保证返回结果与入参严格对齐 + int n = clientIds.size(); + // 创建长度n全部false的队列 + List out = new ArrayList<>(Collections.nCopies(n, Boolean.FALSE)); + List idx = new ArrayList<>(n); + + for (int i = 0; i < n; i++) { + if (StringUtils.hasText(clientIds.get(i))) { + // 符合条件存进clientIds对应的索引 + idx.add(i); + } + } + if (idx.isEmpty()) { + return out; + } + + // 处理每个每个clientId的值是否还在 值存在rs中 执行 stringRedisTemplate.executePipelined + List rs = stringRedisTemplate.executePipelined((RedisCallback) connection -> { + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + byte[] member = serializer.serialize(deviceId); + for (int i : idx) { + String clientId = clientIds.get(i); + // 存放命令 + connection.sIsMember(serializer.serialize("subc:" + clientId), member); + } + return null; + }); + + for (int j = 0; j < idx.size() && j < (rs == null ? 0 : rs.size()); j++) { + out.set(idx.get(j), Boolean.TRUE.equals(rs.get(j))); + } + return out; + } + + // 对应的subc不存在 删除对应的sub pipeline 批量 SREM sub:{deviceId} clientId(清理残留 N次->1次往返) + private void pipeSRemSub(String deviceId, List staleClientIds) { + if (!StringUtils.hasText(deviceId) || staleClientIds == null || staleClientIds.isEmpty()) { + return; + } + stringRedisTemplate.executePipelined((RedisCallback) connection -> { + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + byte[] subKey = serializer.serialize("sub:" + deviceId); + for (String clientId : staleClientIds) { + if (StringUtils.hasText(clientId)) { + connection.sRem(subKey, serializer.serialize(clientId)); + } + } + return null; + }); + } + + // 新增:是否存在该设备的自动关任务 + private boolean hasAutoOffTask(String deviceId) { + if (!StringUtils.hasText(deviceId)) { + return false; + } + // ✅ O(1):不再扫描autoOffFutureMap;由scheduleAutoOff/runAutoOff维护计数 + Integer cnt = autoOffDeviceCnt.get(deviceId); + return cnt != null && cnt > 0; + } + + // 新增:自动关任务计数 +1(只维护deviceId维度,确保hasAutoOffTask为O(1)) + private void incAutoOffCnt(String deviceId) { + if (!StringUtils.hasText(deviceId)) { + return; + } + autoOffDeviceCnt.merge(deviceId, 1, (a, b) -> a + b); + } + + // 新增:自动关任务计数 -1(避免负数;归零则清理key,省内存) + private void decAutoOffCnt(String deviceId) { + if (!StringUtils.hasText(deviceId)) { + return; + } + autoOffDeviceCnt.compute(deviceId, (k, v) -> { + if (v == null || v <= 1) { + return null; + } + return v - 1; + }); + } + + // 改造:多线程执行自动关闭任务 + // 起个任务,固定多少秒-n秒,【监听最新的设备状态,如果还在运行】,发送设备关的指令 + private void scheduleAutoOff(String deviceId, String funcType, int delaySeconds) { + + // ✅ 防御:避免极端情况下线程池尚未初始化导致NPE + if (autoOffExecutor == null) { + log.warn("【自动关任务】线程池未初始化,跳过创建任务:deviceId={}, funcType={}", deviceId, funcType); + return; + } + + String taskKey = "autooff:" + deviceId + ":" + funcType; + + cancelAutoOff(deviceId,funcType); + + // 使用多线程池提交任务 + ScheduledFuture newFuture = autoOffExecutor.schedule(() -> { + try { + runAutoOff(deviceId, funcType); + } catch (Exception e) { + log.error("【自动关任务】执行失败,deviceId={}, funcType={}", deviceId, funcType, e); + } finally { + // 任务执行完成后移除映射 + autoOffFutureMap.remove(taskKey); + // ✅ 任务结束(成功/失败都算结束):减少该设备的“未完成任务数”,保证hasAutoOffTask准确 + decAutoOffCnt(deviceId); + } + }, delaySeconds, TimeUnit.SECONDS); + + // 保存新任务的引用 + autoOffFutureMap.put(taskKey, newFuture); + // ✅ 新任务创建成功:增加该设备的“未完成任务数” + incAutoOffCnt(deviceId); + + log.info("【自动关任务】已创建(多线程):deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds); + } + + // 自动关闭任务的核心逻辑(无改动) + // 新增:读取最新状态(device:latest:{deviceId}),若仍为1则下发 {"funcType":0} 到 dtu/{id}/down + private void runAutoOff(String deviceId, String funcType) throws MqttException { + String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); + if (!StringUtils.hasText(latest)) { + //todo + log.warn("【自动关任务】无最新状态,跳过:deviceId={}, funcType={}", deviceId, funcType); + return; + } + + JSONObject latestObj; + try { + latestObj = JSON.parseObject(latest); + } catch (Exception e) { + log.warn("【自动关任务】最新状态JSON解析失败,跳过:deviceId={}, funcType={}", deviceId, funcType); + return; + } + if (latestObj == null || latestObj.isEmpty()) { + return; + } + + // 设备每10秒上报的状态包:{"jm1k":0/1,...} 顶层字段直接取 + Integer current = null; + try { + if (latestObj.containsKey(funcType)) { + current = latestObj.getIntValue(funcType); + } + } catch (Exception ignore) { + } + + if (current != null && current == 1) { + // 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖) + String lockKey = "lock:" + deviceId + ":" + funcType; + Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( + lockKey, "autooff", 15, TimeUnit.SECONDS + ); + if (lockSuccess == null || !lockSuccess) { + log.info("【自动关任务】{}功能忙(锁占用),跳过自动关闭:deviceId={}, funcType={}", funcType, deviceId, funcType); + return; + } + JSONObject down = new JSONObject(); + down.put(funcType, 0); + + String deviceTopic = "dtu/" + deviceId + "/down"; + //todo + mqttMessageSender.publish(deviceTopic, down.toJSONString()); + log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); + } else { + log.info("【自动关任务】检测未运行或状态未知,跳过关闭:deviceId={}, funcType={}, current={}", deviceId, funcType, current); + } + } + + /** + * 处理前端控制指令:权限校验+分布式锁+转发给设备 + */ + private void handleFrontendControl(String topic, String payload) throws MqttException { + // 解析前端clientId、设备ID + String[] parts = topic.split("/"); + String clientId = parts[1]; + String deviceId = parts[3]; + + // 新增:入参非空校验(JDK 8兼容) + if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { + log.error("【指令处理】clientId或deviceId为空,topic={}", topic); + return; + } + + // 解析功能码({"功能码":状态码}格式) + Map funcCodeMap = null; + try { + funcCodeMap = JSON.parseObject(payload, new TypeReference>() { + }); + } catch (Exception e) { + log.error("【指令处理】功能码解析失败,payload={}", payload, e); + // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; + // mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}"); + return; + } + if (funcCodeMap == null || funcCodeMap.isEmpty()) { + // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; + // mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}"); + log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId); + return; + } + // 提取第一个功能码作为锁标识 + String funcType = funcCodeMap.keySet().iterator().next(); + + // 1. 权限校验(示例:admin开头有全权限) + if (!checkPermission(clientId, deviceId)) { + String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; + mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); + log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId); + return; + } + + // 2. 分布式锁:设备ID+功能类型(避免同设备同功能并发控制) + String lockKey = "lock:" + deviceId + ":" + funcType; + Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( + lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景 + ); + if (lockSuccess == null || !lockSuccess) { + String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; + mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}"); + log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType); + return; + } + + // 3. 记录日志 + log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}", + clientId, LocalDateTime.now(), deviceId, funcType, payload); + + // 4. 转发指令到设备 + String deviceTopic = "dtu/" + deviceId + "/down"; + //todo + mqttMessageSender.publish(deviceTopic, payload); + log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); + } + + /** + * 新增:处理前端在线心跳:写入Redis在线标记(带TTL) + * 主题格式:frontend/{clientId}/online + */ + private void handleFrontendOnline(String topic, String payload) { + try { + String[] parts = topic.split("/"); + if (parts.length < 3) { + return; + } + String clientId = parts[1]; + if (!StringUtils.hasText(clientId)) { + return; + } + + // 续期subc:{clientId} + stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS); + + // todo 生产环境不建议打印每次心跳 +// log.debug("【在线心跳】clientId={} 续期subcTTL={}s payload={}", clientId, subcTtlSeconds, payload); + } catch (Exception e) { + log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage()); + } + } + + /** + * 权限校验逻辑(示例) + * 可根据业务需求扩展: + * 1. 管理员前端(clientId以admin_开头)拥有所有权限 + * 2. 普通前端仅能操作Redis中绑定的设备(user_device:{clientId} → deviceId集合) + * + * @param clientId 前端唯一标识 + * @param deviceId 设备ID + * @return true=有权限,false=无权限 + */ + private boolean checkPermission(String clientId, String deviceId) { + // 管理员权限:clientId以admin_开头 + // 普通用户权限:校验Redis中是否绑定该设备 + return Boolean.TRUE; + } + + /** + * 前端订阅设备(Controller调用) + */ + public void subscribeDevice(String clientId, String deviceId) { + if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { + log.error("【订阅管理】clientId或deviceId不能为空"); + throw new IllegalArgumentException("clientId和deviceId不能为空"); + } + + // 保存订阅关系到Redis + stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); + stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); + + // 新增:订阅成功后给subc设置TTL(兜底“取消订阅失败/异常退出”) + stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS); + + log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId); + } + + /** + * 前端取消订阅设备状态接口(供Controller层调用) + * 逻辑:从设备的订阅列表移除前端clientId + * + * @param clientId 前端唯一标识 + * @param deviceId 设备ID + */ + public void unsubscribeDevice(String clientId, String deviceId) { + if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { + log.error("【前端取消订阅】clientId或deviceId不能为空"); + throw new IllegalArgumentException("clientId和deviceId不能为空"); + } + + // 从Redis删除订阅关系 + stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); + stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); + log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId); + } + + /** + * 全量订阅:前端订阅指定用户名下的所有设备(Controller调用) + * @param clientId 前端唯一标识(如web_001、app_002) + * @return 订阅成功的设备数量 + */ + public int subscribeAllDeviceByUserId(String clientId) { + // 1. 入参校验 + if (!StringUtils.hasText(clientId)) { + log.error("【全量订阅】clientId不能为空"); + throw new IllegalArgumentException("clientId不能为空"); + } + + + // 2. Redis连接可用性校验 + try { + stringRedisTemplate.hasKey("test:connection"); + } catch (Exception e) { + log.warn("【全量订阅】Redis连接不可用,订阅操作跳过:{}", e.getMessage()); + return 0; + } + Long userId = SecurityUtils.getLoginUser().getUserId(); + // 3. 查询该用户名下的所有设备ID(替换为你的实际设备查询逻辑) + List deviceIds = new ArrayList<>(queryImeiByUserId(userId)); + if (userId == 1) { + deviceIds.add("862538065276061"); + } + if (deviceIds == null || deviceIds.isEmpty()) { + log.warn("【全量订阅】用户{}名下无可用设备", userId); + return 0; + } + // 过滤空设备ID,避免无效操作 + List validDeviceIds = deviceIds.stream() + .filter(StringUtils::hasText) + .distinct() + .collect(Collectors.toList()); + if (validDeviceIds.isEmpty()) { + log.warn("【全量订阅】用户{}名下无有效设备ID", userId); + return 0; + } + + // 4. 批量写入Redis订阅关系(兼容JDK 8的RedisCallback写法) + try { + stringRedisTemplate.execute(new RedisCallback() { + @Override + public Void doInRedis(RedisConnection connection) throws DataAccessException { + // 获取String序列化器(和stringRedisTemplate保持一致) + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + + // 开启Redis事务 + connection.multi(); + byte[] clientIdBytes = serializer.serialize(clientId); + // 4.1 设备→前端:给每个设备的订阅集合添加clientId + for (String deviceId : validDeviceIds) { + byte[] subKey = serializer.serialize("sub:" + deviceId); + connection.sAdd(subKey, clientIdBytes); + } + + // 4.2 前端→设备:给前端的订阅集合批量添加所有设备ID + byte[] subcKey = serializer.serialize("subc:" + clientId); + byte[][] deviceIdBytesArray = new byte[validDeviceIds.size()][]; + for (int i = 0; i < validDeviceIds.size(); i++) { + deviceIdBytesArray[i] = serializer.serialize(validDeviceIds.get(i)); + } + connection.sAdd(subcKey, deviceIdBytesArray); + + // 新增:给subc设置TTL(兜底“取消订阅失败/异常退出”) + connection.expire(subcKey, subcTtlSeconds); + + // 执行事务 + connection.exec(); + return null; + } + }); + + log.info("【全量订阅】前端{}成功订阅用户{}名下的{}个设备,设备列表:{}", + clientId, userId, validDeviceIds.size(), validDeviceIds); + return validDeviceIds.size(); + } catch (Exception e) { + log.error("【全量------订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e); + throw new RuntimeException("全量订阅失败:" + e.getMessage()); + } + } + + /** + * 全量取消:前端取消订阅的所有设备(即用户名下所有设备) + * @param clientId 前端唯一标识 + * @return 需要前端取消监听的MQTT主题列表 + */ + public List unsubscribeAllDevice(String clientId) { + // 1. 入参校验 + if (!StringUtils.hasText(clientId)) { + log.error("【全量取消】clientId不能为空"); + throw new IllegalArgumentException("clientId不能为空"); + } + + // 2. Redis连接可用性校验 + try { + stringRedisTemplate.hasKey("test:connection"); + } catch (Exception e) { + log.warn("【全量取消】Redis连接不可用,取消操作跳过:{}", e.getMessage()); + return Collections.emptyList(); + } + + // 3. 查询该前端订阅的所有设备ID(即用户名下所有设备) + Set deviceSet = stringRedisTemplate.opsForSet().members("subc:" + clientId); + if (deviceSet == null || deviceSet.isEmpty()) { + log.warn("【全量取消】前端{}无订阅的设备", clientId); + return Collections.emptyList(); + } + + // 4. 构建需要取消的MQTT主题列表 + List frontendTopics = new ArrayList<>(); + for (String deviceId : deviceSet) { + frontendTopics.add("frontend/" + clientId + "/dtu/" + deviceId + "/listener"); + } + + // 5. 批量删除Redis订阅关系(兼容JDK 8的RedisCallback写法) + try { + stringRedisTemplate.execute(new RedisCallback() { + @Override + public Void doInRedis(RedisConnection connection) throws DataAccessException { + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + + // 开启事务 + connection.multi(); + byte[] clientIdBytes = serializer.serialize(clientId); + // 5.1 批量删除设备→前端的订阅关系 + for (String deviceId : deviceSet) { + byte[] subKey = serializer.serialize("sub:" + deviceId); + connection.sRem(subKey, clientIdBytes); + } + + // 5.2 删除前端→设备的反向索引(核心:清空该前端的所有订阅设备) + byte[] subcKey = serializer.serialize("subc:" + clientId); + connection.del(subcKey); + + // 执行事务 + connection.exec(); + return null; + } + }); + } catch (Exception e) { + log.error("【全量取消】Redis批量删除失败", e); + throw new RuntimeException("全量取消订阅失败:" + e.getMessage()); + } + log.info("【全量取消】前端{}成功取消{}个设备的订阅", clientId, deviceSet.size()); + return frontendTopics; + } + + /** + * 实际业务中:查询指定用户名下的所有设备ID(需替换为你的DAO/Service逻辑) + * @return 设备ID列表 + */ + private List queryImeiByUserId(Long userId) { + // 示例:替换为你项目中查询用户设备的实际代码 + // 比如:return deviceService.listDeviceIdsByUserId(userId); + List agriInfos = agriInfoService.lambdaQuery() + .eq(SysAgriInfo::getUserId, userId) + .list(); + if (CollectionUtils.isEmpty(agriInfos)) { + return Collections.emptyList(); + } + return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList()); + } + // ========== 手动重连接口(供Controller调用) ========== + + /** + * 手动触发MQTT重连(最小改动:不替换client实例,只用同一个client重连) + */ + public synchronized String manualReconnect() { + isRunning.set(true); + try { + // 强制断开旧连接(如果存在) + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.disconnect(); + } + // 重新初始化订阅(内部会connect + subscribe) + subscribeTopics(); + log.info("【手动重连】MQTT客户端重连成功"); + return "MQTT手动重连成功"; + } catch (MqttException e) { + log.error("【手动重连】MQTT客户端重连失败", e); + return "MQTT手动重连失败:" + e.getMessage(); + } + } + + /** + * 获取当前MQTT连接状态 + */ + public String getMqttStatus() { + boolean connected = (mqttClient != null && mqttClient.isConnected()); + String status = connected ? "已连接" : "已断开"; + return String.format("MQTT连接状态:%s;clientId:%s", status, safeClientId()); + } + + // ======================== SmartLifecycle 生命周期管理(核心修复) ======================== + + /** + * 启动MQTT客户端(Spring上下文初始化/重启时触发) + * 核心:替代@PostConstruct,保证上下文重启时重新初始化MQTT连接 + */ + @Override + public void start() { + log.info("开始监听"); + if (isRunning.compareAndSet(false, true)) { + try { + // 初始化多线程池(固定线程数) + autoOffExecutor = new ScheduledThreadPoolExecutor( + autoOffThreadPoolSize, // 核心线程数 + r -> { + Thread thread = new Thread(r); + thread.setName("auto-off-task-" + thread.getId()); + thread.setDaemon(true); // 设置为守护线程,不阻塞JVM退出 + return thread; + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 队列压力或关闭时兜底不丢任务 + ); + + // 关键优化1:取消任务后立即从队列移除,避免队列堆积 + ((ScheduledThreadPoolExecutor) autoOffExecutor).setRemoveOnCancelPolicy(true); + + // 关键优化2:允许核心线程超时回收,空闲时省资源 + ((ScheduledThreadPoolExecutor) autoOffExecutor).setKeepAliveTime(60, TimeUnit.SECONDS); + ((ScheduledThreadPoolExecutor) autoOffExecutor).allowCoreThreadTimeOut(true); + + // 核心修改:无论是否已连接,都执行订阅 + subscribeTopics(); + log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题),自动关闭任务线程池大小:{}", autoOffThreadPoolSize); + } catch (MqttException e) { + log.error("【MQTT生命周期】客户端启动失败", e); + isRunning.set(false); + } + } + } + + /** + * 停止MQTT客户端 + * 改造点:优化多线程池的优雅关闭 + */ + @Override + public void stop() { + if (isRunning.compareAndSet(true, false)) { + try { + // 1. 取消所有未执行的自动关闭任务 + for (Map.Entry> entry : autoOffFutureMap.entrySet()) { + entry.getValue().cancel(false); + log.debug("【自动关任务】取消任务:{}", entry.getKey()); + } + autoOffFutureMap.clear(); + // ✅ 停止时直接清空计数,避免残留 + autoOffDeviceCnt.clear(); + + // 2. 优雅关闭线程池 + if (autoOffExecutor != null) { + autoOffExecutor.shutdown(); + try { + // 等待3秒让任务完成 + if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) { + // 强制关闭 + autoOffExecutor.shutdownNow(); + log.warn("【自动关任务】线程池强制关闭"); + } + } catch (InterruptedException e) { + autoOffExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + log.info("【自动关任务】线程池已关闭"); + } + + // 3. 关闭MQTT客户端 + if (mqttClient != null) { + if (mqttClient.isConnected()) { + mqttClient.disconnect(); + } + mqttClient.close(); + log.info("【MQTT生命周期】客户端已优雅关闭"); + } + } catch (Exception e) { + log.error("【MQTT生命周期】客户端关闭失败", e); + } + } + } + + // 新增:收到“关”指令时,尝试取消对应自动关任务(优化:减少无意义任务执行;正确性仍以到点状态判断为准) + private void cancelAutoOff(String deviceId, String funcType) { + if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType)) { + return; + } + String taskKey = "autooff:" + deviceId + ":" + funcType; + // 同设备同功能只保留最后一次任务:只有旧任务还没开始时才替换 + ScheduledFuture oldFuture = autoOffFutureMap.get(taskKey); + if (oldFuture != null) { + // cancel=false 说明任务已开始/已完成,避免双执行:不再创建新任务 + if (!oldFuture.cancel(false)) { + return; + } + // cancel成功:旧任务不会跑了,这时再remove并减计数 + autoOffFutureMap.remove(taskKey, oldFuture); + decAutoOffCnt(deviceId); + } + } + + /** + * 异步停止 + */ + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + /** + * 判断MQTT客户端是否运行中 + */ + @Override + public boolean isRunning() { + return isRunning.get(); + } + + /** + * 启动优先级(保证MQTT在Redis之后启动) + */ + @Override + public int getPhase() { + return 10; + } + + /** + * 是否自动启动(默认true,Spring上下文初始化时自动调用start()) + */ + @Override + public boolean isAutoStartup() { + return true; + } +} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java deleted file mode 100644 index 48ff9c8..0000000 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java +++ /dev/null @@ -1,265 +0,0 @@ -package com.agri.framework.manager; - -import com.agri.framework.config.MqttConfig; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import javax.annotation.Resource; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * 设备自动关任务管理器 - * 核心功能: - * 1. 初始化/优雅关闭自动关任务线程池 - * 2. 调度/取消设备功能自动关闭任务 - * 3. 执行自动关任务,校验设备最新状态 - * 4. 维护设备未完成自动关任务计数 - * 改造点:自动关闭任务改为多线程并行执行 - * 适配JDK 8,无心跳包相关逻辑 - */ -@Component -public class MqttAutoOffManager { - /** - * 优化:统一使用SLF4J日志(JDK 8兼容) - */ - private static final Logger log = LoggerFactory.getLogger(MqttAutoOffManager.class); - - /** - * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 - */ - @Resource - private StringRedisTemplate stringRedisTemplate; - - /** - * MQTT消息发送工具类(由MqttConfig配置类注入) - */ - @Resource - private MqttConfig.MqttMessageSender mqttMessageSender; - - // 改造:将单线程池改为固定线程池,支持多任务并行执行 - // 替代原有的 Executors.newSingleThreadScheduledExecutor() - private ScheduledExecutorService autoOffExecutor; - - // 新增:同设备同功能只保留最后一次自动关任务 - private final ConcurrentHashMap> autoOffFutureMap = new ConcurrentHashMap<>(); - - // 新增:按设备维度统计“未完成的自动关任务”数量,hasAutoOffTask从扫描O(N)降为O(1) - private final ConcurrentHashMap autoOffDeviceCnt = new ConcurrentHashMap<>(); - - // 新增:自动关闭任务线程池核心线程数(可配置) - @Value("${spring.mqtt.auto-off-thread-pool-size:5}") - private int autoOffThreadPoolSize; - /** - * 初始化自动关任务线程池 - * @param corePoolSize 核心线程数 - */ - public void initExecutor(int corePoolSize) { - // 初始化多线程池(固定线程数) - autoOffExecutor = new ScheduledThreadPoolExecutor( - autoOffThreadPoolSize, // 核心线程数 - r -> { - Thread thread = new Thread(r); - thread.setName("auto-off-task-" + thread.getId()); - thread.setDaemon(true); // 设置为守护线程,不阻塞JVM退出 - return thread; - }, - new ThreadPoolExecutor.CallerRunsPolicy() // 队列压力或关闭时兜底不丢任务 - ); - - // 关键优化1:取消任务后立即从队列移除,避免队列堆积 - ((ScheduledThreadPoolExecutor) autoOffExecutor).setRemoveOnCancelPolicy(true); - - // 关键优化2:允许核心线程超时回收,空闲时省资源 - ((ScheduledThreadPoolExecutor) autoOffExecutor).setKeepAliveTime(60, TimeUnit.SECONDS); - ((ScheduledThreadPoolExecutor) autoOffExecutor).allowCoreThreadTimeOut(true); - - log.info("自动关任务线程池初始化完成,核心线程数={}", corePoolSize); - } - - /** - * 优雅关闭自动关任务线程池 - */ - public void shutdownExecutor() { - try { - // 1. 取消所有未执行的自动关闭任务 - for (Map.Entry> entry : autoOffFutureMap.entrySet()) { - entry.getValue().cancel(false); - log.debug("【自动关任务】取消任务:{}", entry.getKey()); - } - autoOffFutureMap.clear(); - // ✅ 停止时直接清空计数,避免残留 - autoOffDeviceCnt.clear(); - - // 2. 优雅关闭线程池 - if (autoOffExecutor != null) { - autoOffExecutor.shutdown(); - try { - // 等待3秒让任务完成 - if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) { - // 强制关闭 - autoOffExecutor.shutdownNow(); - log.warn("【自动关任务】线程池强制关闭"); - } - } catch (InterruptedException e) { - autoOffExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } - log.info("【自动关任务】线程池已关闭"); - } - } catch (Exception e) { - log.error("【自动关任务】线程池关闭失败", e); - } - } - - // 新增:是否存在该设备的自动关任务 - public boolean hasAutoOffTask(String deviceId) { - if (!StringUtils.hasText(deviceId)) { - return false; - } - // ✅ O(1)查询,无需扫描整个任务表 - Integer cnt = autoOffDeviceCnt.get(deviceId); - return cnt != null && cnt > 0; - } - - // 新增:自动关任务计数 +1(只维护deviceId维度,确保hasAutoOffTask为O(1)) - private void incAutoOffCnt(String deviceId) { - if (!StringUtils.hasText(deviceId)) { - return; - } - autoOffDeviceCnt.merge(deviceId, 1, (a, b) -> a + b); - } - - // 新增:自动关任务计数 -1(避免负数;归零则清理key,省内存) - private void decAutoOffCnt(String deviceId) { - if (!StringUtils.hasText(deviceId)) { - return; - } - autoOffDeviceCnt.compute(deviceId, (k, v) -> { - if (v == null || v <= 1) { - return null; - } - return v - 1; - }); - } - - // 改造:多线程执行自动关闭任务 - // 起个任务,固定多少秒-n秒,【监听最新的设备状态,如果还在运行】,发送设备关的指令 - public void scheduleAutoOff(String deviceId, String funcType, int delaySeconds) { - - // ✅ 防御:避免极端情况下线程池尚未初始化导致NPE - if (autoOffExecutor == null) { - log.warn("【自动关任务】线程池未初始化,跳过创建任务:deviceId={}, funcType={}", deviceId, funcType); - return; - } - - String taskKey = "autooff:" + deviceId + ":" + funcType; - - cancelAutoOff(deviceId,funcType); - - // 使用多线程池提交任务 - ScheduledFuture newFuture = autoOffExecutor.schedule(() -> { - try { - runAutoOff(deviceId, funcType); - } catch (Exception e) { - log.error("【自动关任务】执行失败,deviceId={}, funcType={}", deviceId, funcType, e); - } finally { - // 任务执行完成后移除映射 - autoOffFutureMap.remove(taskKey); - // ✅ 任务结束(成功/失败都算结束):减少该设备的“未完成任务数”,保证hasAutoOffTask准确 - decAutoOffCnt(deviceId); - } - }, delaySeconds, TimeUnit.SECONDS); - - // 保存新任务的引用 - autoOffFutureMap.put(taskKey, newFuture); - // ✅ 新任务创建成功:增加该设备的“未完成任务数” - incAutoOffCnt(deviceId); - - log.info("【自动关任务】已创建(多线程):deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds); - } - - // 自动关闭任务的核心逻辑(无改动) - // 新增:读取最新状态(device:latest:{deviceId}),若仍为1则下发 {"funcType":0} 到 dtu/{id}/down - private void runAutoOff(String deviceId, String funcType) throws MqttException { - String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); - if (!StringUtils.hasText(latest)) { - //todo - log.warn("【自动关任务】无最新状态,跳过:deviceId={}, funcType={}", deviceId, funcType); - return; - } - - JSONObject latestObj; - try { - latestObj = JSON.parseObject(latest); - } catch (Exception e) { - log.warn("【自动关任务】最新状态JSON解析失败,跳过:deviceId={}, funcType={}", deviceId, funcType); - return; - } - if (latestObj == null || latestObj.isEmpty()) { - return; - } - - // 设备每10秒上报的状态包:{"jm1k":0/1,...} 顶层字段直接取 - Integer current = null; - try { - if (latestObj.containsKey(funcType)) { - current = latestObj.getIntValue(funcType); - } - } catch (Exception ignore) { - } - - if (current != null && current == 1) { - // 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖) - String lockKey = "lock:" + deviceId + ":" + funcType; - Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, "autooff", 15, TimeUnit.SECONDS - ); - if (lockSuccess == null || !lockSuccess) { - log.info("【自动关任务】{}功能忙(锁占用),跳过自动关闭:deviceId={}, funcType={}", funcType, deviceId, funcType); - return; - } - JSONObject down = new JSONObject(); - down.put(funcType, 0); - - String deviceTopic = "dtu/" + deviceId + "/down"; - //todo - mqttMessageSender.publish(deviceTopic, down.toJSONString()); - log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); - } else { - log.info("【自动关任务】检测未运行或状态未知,跳过关闭:deviceId={}, funcType={}, current={}", deviceId, funcType, current); - } - } - - // 新增:收到“关”指令时,尝试取消对应自动关任务(优化:减少无意义任务执行;正确性仍以到点状态判断为准) - public void cancelAutoOff(String deviceId, String funcType) { - if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType)) { - return; - } - String taskKey = "autooff:" + deviceId + ":" + funcType; - // 同设备同功能只保留最后一次任务:只有旧任务还没开始时才替换 - ScheduledFuture oldFuture = autoOffFutureMap.get(taskKey); - if (oldFuture != null) { - // cancel=false 说明任务已开始/已完成,避免双执行:不再创建新任务 - if (!oldFuture.cancel(false)) { - return; - } - // cancel成功:旧任务不会跑了,这时再remove并减计数 - autoOffFutureMap.remove(taskKey, oldFuture); - decAutoOffCnt(deviceId); - } - } - -} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java deleted file mode 100644 index c567f44..0000000 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java +++ /dev/null @@ -1,278 +0,0 @@ -package com.agri.framework.manager; - -import com.agri.framework.web.dispatcher.MqttMessageDispatcher; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.SmartLifecycle; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * MQTT客户端生命周期管理器 - * 核心功能: - * 1. 订阅设备状态、前端控制指令主题 - * 2. MQTT客户端启动/停止/重连 - * 3. 管理自动关闭任务线程池 - * 4. 监听MQTT连接状态,设置消息回调 - * 适配JDK 8,无心跳包相关逻辑 - */ -@Component -public class MqttClientManager implements SmartLifecycle { - - /** - * 优化:统一使用SLF4J日志(JDK 8兼容) - */ - private static final Logger log = LoggerFactory.getLogger(MqttClientManager.class); - - /** - * 新增:生命周期管理标识,控制MQTT客户端启动/关闭 - */ - private final AtomicBoolean isRunning = new AtomicBoolean(false); - - /** - * MQTT客户端(由MqttConfig配置类注入) - */ - @Resource - private MqttClient mqttClient; - - /** - * MQTT连接配置项(从MqttConfig注入) - */ - @Resource - private MqttConnectOptions mqttConnectOptions; - - /** - * MQTT消息分发器,转发消息到对应处理器 - */ - @Resource - private MqttMessageDispatcher mqttMessageDispatcher; - - /** - * 自动关任务管理器,初始化/关闭线程池 - */ - @Resource - private MqttAutoOffManager mqttAutoOffManager; - - // 读取配置文件中的默认订阅主题(移除心跳主题) - @Value("${spring.mqtt.default-topic}") - private String defaultTopic; - - // 新增:自动关闭任务线程池核心线程数(可配置) - @Value("${spring.mqtt.auto-off-thread-pool-size:5}") - private int autoOffThreadPoolSize; - - /** - * 初始化:订阅主题+设置回调 - * (移除@PostConstruct,改为由SmartLifecycle的start()触发) - *

- * 【方案A】不做自写重连;Paho会在连接断开后自动重连(前提:connectOptions.setAutomaticReconnect(true)) - */ - public void subscribeTopics() throws MqttException { - // 关键补充1:判空 - if (mqttClient == null) { - log.error("【MQTT初始化】客户端实例为空,无法订阅主题"); - throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); - } - - // 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过) - // 注意:这里只使用同一个client实例,避免sender与handler使用不同client - if (!mqttClient.isConnected()) { - try { - // 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置) - mqttClient.connect(mqttConnectOptions); - log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); - } catch (MqttException e) { - log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); - throw e; - } - } - - // 解析配置的主题列表 - String[] topics = defaultTopic.split(","); - int[] qosArray = new int[topics.length]; - // 按主题类型设置QoS:控制指令/状态用QoS 1 - for (int i = 0; i < topics.length; i++) { - qosArray[i] = 0; - topics[i] = topics[i].trim(); - } - - // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 - mqttClient.setCallback(new MqttCallback() { - /** - * MQTT连接断开回调 - * @param cause 断开原因 - */ - @Override - public void connectionLost(Throwable cause) { - log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", - safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause); - - // 【方案A】不再触发自写重连;Paho自动重连会接管重连过程 - // 这里只记录日志即可 - if (isRunning.get()) { - log.warn("【MQTT自动重连】已开启automaticReconnect,等待Paho自动重连..."); - } - } - - /** - * 收到MQTT消息回调:转发到消息分发器 - * @param topic 消息主题 - * @param message 消息内容 - * @throws Exception 消息处理异常 - */ - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - // 优化:显式指定UTF-8编码,避免乱码(JDK 8兼容) - mqttMessageDispatcher.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8)); - } - - /** - * 消息发布完成回调(仅日志记录,无业务逻辑) - * @param token 发布令牌 - */ - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - if (token != null && token.getTopics() != null && token.getTopics().length > 0) { - // log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", safeClientId(), token.getTopics()[0]); - } - } - }); - - // 【方案A关键点】不再 unsubscribe 主题 - // cleanSession=false + unsubscribe 会破坏Broker侧会话订阅;并且自动重连场景更不建议这么做 - - // 订阅主题 - mqttClient.subscribe(topics, qosArray); - // 优化:打印clientId,方便排查 - log.info("【MQTT初始化】订阅主题完成,clientId:{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); - } - - private String safeClientId() { - try { - return (mqttClient == null ? "null" : mqttClient.getClientId()); - } catch (Exception e) { - return "unknown"; - } - } - - // ========== 手动重连接口(供Controller调用) ========== - - /** - * 手动触发MQTT重连(最小改动:不替换client实例,只用同一个client重连) - */ - public synchronized String manualReconnect() { - isRunning.set(true); - try { - // 强制断开旧连接(如果存在) - if (mqttClient != null && mqttClient.isConnected()) { - mqttClient.disconnect(); - } - // 重新初始化订阅(内部会connect + subscribe) - subscribeTopics(); - log.info("【手动重连】MQTT客户端重连成功"); - return "MQTT手动重连成功"; - } catch (MqttException e) { - log.error("【手动重连】MQTT客户端重连失败", e); - return "MQTT手动重连失败:" + e.getMessage(); - } - } - - /** - * 获取当前MQTT连接状态 - */ - public String getMqttStatus() { - boolean connected = (mqttClient != null && mqttClient.isConnected()); - String status = connected ? "已连接" : "已断开"; - return String.format("MQTT连接状态:%s;clientId:%s", status, safeClientId()); - } - - // ======================== SmartLifecycle 生命周期管理(核心修复) ======================== - - /** - * 启动MQTT客户端(Spring上下文初始化/重启时触发) - * 核心:替代@PostConstruct,保证上下文重启时重新初始化MQTT连接 - */ - @Override - public void start() { - log.info("开始监听"); - if (isRunning.compareAndSet(false, true)) { - try { - // 初始化自动关任务线程池 - mqttAutoOffManager.initExecutor(autoOffThreadPoolSize); - // 核心修改:无论是否已连接,都执行订阅 - subscribeTopics(); - log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题),自动关闭任务线程池大小:{}", autoOffThreadPoolSize); - } catch (MqttException e) { - log.error("【MQTT生命周期】客户端启动失败", e); - isRunning.set(false); - } - } - } - - /** - * 停止MQTT客户端 - * 改造点:优化多线程池的优雅关闭 - */ - @Override - public void stop() { - if (isRunning.compareAndSet(true, false)) { - try { - // 关闭自动关任务线程池 - mqttAutoOffManager.shutdownExecutor(); - // 关闭MQTT客户端 - if (mqttClient != null) { - if (mqttClient.isConnected()) { - mqttClient.disconnect(); - } - mqttClient.close(); - log.info("【MQTT生命周期】客户端已优雅关闭"); - } - } catch (Exception e) { - log.error("【MQTT生命周期】客户端关闭失败", e); - } - } - } - - /** - * 异步停止 - */ - @Override - public void stop(Runnable callback) { - stop(); - callback.run(); - } - - /** - * 判断MQTT客户端是否运行中 - */ - @Override - public boolean isRunning() { - return isRunning.get(); - } - - /** - * 启动优先级(保证MQTT在Redis之后启动) - */ - @Override - public int getPhase() { - return 10; - } - - /** - * 是否自动启动(默认true,Spring上下文初始化时自动调用start()) - */ - @Override - public boolean isAutoStartup() { - return true; - } -} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java deleted file mode 100644 index 0c8cd6e..0000000 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java +++ /dev/null @@ -1,317 +0,0 @@ -package com.agri.framework.manager; - -import com.agri.common.utils.SecurityUtils; -import com.agri.system.domain.SysAgriInfo; -import com.agri.system.service.ISysAgriInfoService; -import org.apache.commons.collections4.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.dao.DataAccessException; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.core.RedisCallback; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.serializer.RedisSerializer; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -/** - * MQTT订阅关系管理器 - * 核心功能: - * 1. 维护前端-设备双向订阅关系(Redis) - * 2. 前端单设备/全量设备订阅/取消订阅 - * 3. Redis批量操作(pipeline),提升性能 - * 4. 清理无效订阅关系,避免脏数据 - * 适配JDK 8,无心跳包相关逻辑 - */ -@Component -public class MqttSubscriptionManager { - /** - * 优化:统一使用SLF4J日志(JDK 8兼容) - */ - private static final Logger log = LoggerFactory.getLogger(MqttSubscriptionManager.class); - - /** - * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 - */ - @Resource - private StringRedisTemplate stringRedisTemplate; - - /** - * 农业信息服务,查询用户名下设备 - */ - @Resource - private ISysAgriInfoService agriInfoService; - - // 新增:前端订阅关系TTL(兜底“取消订阅失败/异常退出”)——只维护subc:{clientId}的TTL - @Value("${spring.mqtt.subc-ttl-seconds:3600}") - private int subcTtlSeconds; - - // 新增:pipeline 批量 SISMEMBER subc:{clientId} deviceId(N次->1次往返) 拿取失效的client - public List pipeIsMemberSubc(List clientIds, String deviceId) { - if (clientIds == null || clientIds.isEmpty() || !StringUtils.hasText(deviceId)) { - return Collections.emptyList(); - } - - // ✅ 关键:不发“占位命令”;只对有效clientId发SISMEMBER,同时保证返回结果与入参严格对齐 - int n = clientIds.size(); - // 创建长度n全部false的队列 - List out = new ArrayList<>(Collections.nCopies(n, Boolean.FALSE)); - List idx = new ArrayList<>(n); - - for (int i = 0; i < n; i++) { - if (StringUtils.hasText(clientIds.get(i))) { - // 符合条件存进clientIds对应的索引 - idx.add(i); - } - } - if (idx.isEmpty()) { - return out; - } - - // 处理每个每个clientId的值是否还在 值存在rs中 执行 stringRedisTemplate.executePipelined - List rs = stringRedisTemplate.executePipelined((RedisCallback) connection -> { - RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); - byte[] member = serializer.serialize(deviceId); - for (int i : idx) { - String clientId = clientIds.get(i); - // 存放命令 - connection.sIsMember(serializer.serialize("subc:" + clientId), member); - } - return null; - }); - - for (int j = 0; j < idx.size() && j < (rs == null ? 0 : rs.size()); j++) { - out.set(idx.get(j), Boolean.TRUE.equals(rs.get(j))); - } - return out; - } - - // 对应的subc不存在 删除对应的sub pipeline 批量 SREM sub:{deviceId} clientId(清理残留 N次->1次往返) - public void pipeSRemSub(String deviceId, List staleClientIds) { - if (!StringUtils.hasText(deviceId) || staleClientIds == null || staleClientIds.isEmpty()) { - return; - } - stringRedisTemplate.executePipelined((RedisCallback) connection -> { - RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); - byte[] subKey = serializer.serialize("sub:" + deviceId); - for (String clientId : staleClientIds) { - if (StringUtils.hasText(clientId)) { - connection.sRem(subKey, serializer.serialize(clientId)); - } - } - return null; - }); - } - - /** - * 前端订阅设备(Controller调用) - */ - public void subscribeDevice(String clientId, String deviceId) { - if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { - log.error("【订阅管理】clientId或deviceId不能为空"); - throw new IllegalArgumentException("clientId和deviceId不能为空"); - } - - // 保存订阅关系到Redis - stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); - stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); - - // 新增:订阅成功后给subc设置TTL(兜底“取消订阅失败/异常退出”) - stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS); - - log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId); - } - - /** - * 前端取消订阅设备状态接口(供Controller层调用) - * 逻辑:从设备的订阅列表移除前端clientId - * - * @param clientId 前端唯一标识 - * @param deviceId 设备ID - */ - public void unsubscribeDevice(String clientId, String deviceId) { - if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { - log.error("【前端取消订阅】clientId或deviceId不能为空"); - throw new IllegalArgumentException("clientId和deviceId不能为空"); - } - - // 从Redis删除订阅关系 - stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); - stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); - log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId); - } - - /** - * 全量订阅:前端订阅指定用户名下的所有设备(Controller调用) - * @param clientId 前端唯一标识(如web_001、app_002) - * @return 订阅成功的设备数量 - */ - public int subscribeAllDeviceByUserId(String clientId) { - // 1. 入参校验 - if (!StringUtils.hasText(clientId)) { - log.error("【全量订阅】clientId不能为空"); - throw new IllegalArgumentException("clientId不能为空"); - } - - - // 2. Redis连接可用性校验 - try { - stringRedisTemplate.hasKey("test:connection"); - } catch (Exception e) { - log.warn("【全量订阅】Redis连接不可用,订阅操作跳过:{}", e.getMessage()); - return 0; - } - Long userId = SecurityUtils.getLoginUser().getUserId(); - // 3. 查询该用户名下的所有设备ID(替换为你的实际设备查询逻辑) - List deviceIds = new ArrayList<>(queryImeiByUserId(userId)); - if (userId == 1) { - deviceIds.add("862538065276061"); - } - if (deviceIds == null || deviceIds.isEmpty()) { - log.warn("【全量订阅】用户{}名下无可用设备", userId); - return 0; - } - // 过滤空设备ID,避免无效操作 - List validDeviceIds = deviceIds.stream() - .filter(StringUtils::hasText) - .distinct() - .collect(Collectors.toList()); - if (validDeviceIds.isEmpty()) { - log.warn("【全量订阅】用户{}名下无有效设备ID", userId); - return 0; - } - - // 4. 批量写入Redis订阅关系(兼容JDK 8的RedisCallback写法) - try { - stringRedisTemplate.execute(new RedisCallback() { - @Override - public Void doInRedis(RedisConnection connection) throws DataAccessException { - // 获取String序列化器(和stringRedisTemplate保持一致) - RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); - - // 开启Redis事务 - connection.multi(); - byte[] clientIdBytes = serializer.serialize(clientId); - // 4.1 设备→前端:给每个设备的订阅集合添加clientId - for (String deviceId : validDeviceIds) { - byte[] subKey = serializer.serialize("sub:" + deviceId); - connection.sAdd(subKey, clientIdBytes); - } - - // 4.2 前端→设备:给前端的订阅集合批量添加所有设备ID - byte[] subcKey = serializer.serialize("subc:" + clientId); - byte[][] deviceIdBytesArray = new byte[validDeviceIds.size()][]; - for (int i = 0; i < validDeviceIds.size(); i++) { - deviceIdBytesArray[i] = serializer.serialize(validDeviceIds.get(i)); - } - connection.sAdd(subcKey, deviceIdBytesArray); - - // 新增:给subc设置TTL(兜底“取消订阅失败/异常退出”) - connection.expire(subcKey, subcTtlSeconds); - - // 执行事务 - connection.exec(); - return null; - } - }); - - log.info("【全量订阅】前端{}成功订阅用户{}名下的{}个设备,设备列表:{}", - clientId, userId, validDeviceIds.size(), validDeviceIds); - return validDeviceIds.size(); - } catch (Exception e) { - log.error("【全量------订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e); - throw new RuntimeException("全量订阅失败:" + e.getMessage()); - } - } - - /** - * 全量取消:前端取消订阅的所有设备(即用户名下所有设备) - * @param clientId 前端唯一标识 - * @return 需要前端取消监听的MQTT主题列表 - */ - public List unsubscribeAllDevice(String clientId) { - // 1. 入参校验 - if (!StringUtils.hasText(clientId)) { - log.error("【全量取消】clientId不能为空"); - throw new IllegalArgumentException("clientId不能为空"); - } - - // 2. Redis连接可用性校验 - try { - stringRedisTemplate.hasKey("test:connection"); - } catch (Exception e) { - log.warn("【全量取消】Redis连接不可用,取消操作跳过:{}", e.getMessage()); - return Collections.emptyList(); - } - - // 3. 查询该前端订阅的所有设备ID(即用户名下所有设备) - Set deviceSet = stringRedisTemplate.opsForSet().members("subc:" + clientId); - if (deviceSet == null || deviceSet.isEmpty()) { - log.warn("【全量取消】前端{}无订阅的设备", clientId); - return Collections.emptyList(); - } - - // 4. 构建需要取消的MQTT主题列表 - List frontendTopics = new ArrayList<>(); - for (String deviceId : deviceSet) { - frontendTopics.add("frontend/" + clientId + "/dtu/" + deviceId + "/listener"); - } - - // 5. 批量删除Redis订阅关系(兼容JDK 8的RedisCallback写法) - try { - stringRedisTemplate.execute(new RedisCallback() { - @Override - public Void doInRedis(RedisConnection connection) throws DataAccessException { - RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); - - // 开启事务 - connection.multi(); - byte[] clientIdBytes = serializer.serialize(clientId); - // 5.1 批量删除设备→前端的订阅关系 - for (String deviceId : deviceSet) { - byte[] subKey = serializer.serialize("sub:" + deviceId); - connection.sRem(subKey, clientIdBytes); - } - - // 5.2 删除前端→设备的反向索引(核心:清空该前端的所有订阅设备) - byte[] subcKey = serializer.serialize("subc:" + clientId); - connection.del(subcKey); - - // 执行事务 - connection.exec(); - return null; - } - }); - } catch (Exception e) { - log.error("【全量取消】Redis批量删除失败", e); - throw new RuntimeException("全量取消订阅失败:" + e.getMessage()); - } - log.info("【全量取消】前端{}成功取消{}个设备的订阅", clientId, deviceSet.size()); - return frontendTopics; - } - - /** - * 实际业务中:查询指定用户名下的所有设备ID(需替换为你的DAO/Service逻辑) - * @return 设备ID列表 - */ - private List queryImeiByUserId(Long userId) { - // 示例:替换为你项目中查询用户设备的实际代码 - // 比如:return deviceService.listDeviceIdsByUserId(userId); - List agriInfos = agriInfoService.lambdaQuery() - .eq(SysAgriInfo::getUserId, userId) - .list(); - if (CollectionUtils.isEmpty(agriInfos)) { - return Collections.emptyList(); - } - return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList()); - } -} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java b/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java deleted file mode 100644 index 8a23191..0000000 --- a/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.agri.framework.web.dispatcher; - -import com.agri.framework.interceptor.DeviceStatusHandler; -import com.agri.framework.interceptor.FrontendControlHandler; -import com.agri.framework.interceptor.FrontendOnlineHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; - -/** - * MQTT消息分发器 - * 核心功能:根据Topic类型路由消息到对应处理器 - * 适配JDK 8,无心跳包相关逻辑 - */ -@Component -public class MqttMessageDispatcher { - /** - * 优化:统一使用SLF4J日志(JDK 8兼容) - */ - private static final Logger log = LoggerFactory.getLogger(MqttMessageDispatcher.class); - - /** - * 设备状态处理器 - */ - @Resource - private DeviceStatusHandler deviceStatusHandler; - - /** - * 前端控制指令处理器 - */ - @Resource - private FrontendControlHandler frontendControlHandler; - - /** - * 前端在线心跳处理器 - */ - @Resource - private FrontendOnlineHandler frontendOnlineHandler; - - /** - * 消息分发处理:根据主题类型路由到不同处理方法 - * 可以监听到设备传过来的业务数据 以及前端传过来的控制设备指令 - * - * @param topic 消息主题 - * @param payload 消息内容(JSON字符串) - */ - public void handleMessage(String topic, String payload) { - try { - // log.info("【MQTT接收】topic={}, payload={}", topic, payload); - - // 设备状态主题:dtu/{deviceId}/up - if (topic.matches("dtu/\\w+/up")) { - deviceStatusHandler.handle(topic, payload); - } - // 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} - else if (topic.matches("frontend/\\w+/control/\\w+")) { - frontendControlHandler.handle(topic, payload); - } - // 新增:前端在线心跳主题:frontend/{clientId}/online - else if (topic.matches("frontend/\\w+/online")) { - frontendOnlineHandler.handle(topic, payload); - } - } catch (Exception e) { - log.error("【MQTT消息处理异常】topic={}", topic, e); - } - } -} \ No newline at end of file