From ee21847741c1fa007e74a54e247ce3ee7477fac3 Mon Sep 17 00:00:00 2001 From: xce Date: Sun, 18 Jan 2026 23:46:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9redis=20=E6=9F=A5key=20?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E6=9F=A5=E6=89=B9=E9=87=8F=E5=88=A0=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interceptor/MqttMessageHandler.java | 125 ++++++++++++++++-- 1 file changed, 111 insertions(+), 14 deletions(-) diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 8dc491e..e3c7222 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -119,6 +119,9 @@ public class MqttMessageHandler implements SmartLifecycle { // 新增:同设备同功能只保留最后一次自动关任务 private final ConcurrentHashMap> autoOffFutureMap = new ConcurrentHashMap<>(); + // 新增:按设备维度统计“未完成的自动关任务”数量,hasAutoOffTask从扫描O(N)降为O(1) + private final ConcurrentHashMap autoOffDeviceCnt = new ConcurrentHashMap<>(); + @Autowired private ISysAgriLimitService agriLimitService; @@ -367,13 +370,23 @@ public class MqttMessageHandler implements SmartLifecycle { if (subscribedClients != null && !subscribedClients.isEmpty()) { // 推送给每个订阅的前端 - for (String clientId : subscribedClients) { + // 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底) + List clients = new ArrayList<>(subscribedClients); + // 判断subc是否还存在 一次性查全部 获取失效的clientId + List stillSubs = pipeIsMemberSubc(clients, deviceId); - // 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底) - Boolean stillSub = stringRedisTemplate.opsForSet().isMember("subc:" + clientId, deviceId); + // 关系不存在:清理sub:{deviceId}残留,避免一直给前端发 + List stale = null; + + for (int i = 0; i < clients.size(); i++) { + String clientId = clients.get(i); + boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i)); if (!stillSub) { - // 关系不存在:清理sub:{deviceId}残留,避免一直给前端发 - stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); + if (stale == null) { + stale = new ArrayList<>(); + } + // false不存在添加队列 + stale.add(clientId); continue; } // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up @@ -382,27 +395,102 @@ public class MqttMessageHandler implements SmartLifecycle { mqttMessageSender.publish(frontendTopic, payload); // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } + // 删掉设备对应的客户端 + if (stale != null && !stale.isEmpty()) { + pipeSRemSub(deviceId, stale); + } } else { // 优化:替换System.out为log.info // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); } } + // 新增:pipeline 批量 SISMEMBER subc:{clientId} deviceId(N次->1次往返) 拿取失效的client + private List pipeIsMemberSubc(List clientIds, String deviceId) { + if (clientIds == null || clientIds.isEmpty() || !StringUtils.hasText(deviceId)) { + return Collections.emptyList(); + } + + // ✅ 关键:不发“占位命令”;只对有效clientId发SISMEMBER,同时保证返回结果与入参严格对齐 + int n = clientIds.size(); + // 创建长度n全部false的队列 + List out = new ArrayList<>(Collections.nCopies(n, Boolean.FALSE)); + List idx = new ArrayList<>(n); + + for (int i = 0; i < n; i++) { + if (StringUtils.hasText(clientIds.get(i))) { + // 符合条件存进clientIds对应的索引 + idx.add(i); + } + } + if (idx.isEmpty()) { + return out; + } + + // 处理每个每个clientId的值是否还在 值存在rs中 执行 stringRedisTemplate.executePipelined + List rs = stringRedisTemplate.executePipelined((RedisCallback) connection -> { + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + byte[] member = serializer.serialize(deviceId); + for (int i : idx) { + String clientId = clientIds.get(i); + // 存放命令 + connection.sIsMember(serializer.serialize("subc:" + clientId), member); + } + return null; + }); + + for (int j = 0; j < idx.size() && j < (rs == null ? 0 : rs.size()); j++) { + out.set(idx.get(j), Boolean.TRUE.equals(rs.get(j))); + } + return out; + } + + // 对应的subc不存在 删除对应的sub pipeline 批量 SREM sub:{deviceId} clientId(清理残留 N次->1次往返) + private void pipeSRemSub(String deviceId, List staleClientIds) { + if (!StringUtils.hasText(deviceId) || staleClientIds == null || staleClientIds.isEmpty()) { + return; + } + stringRedisTemplate.executePipelined((RedisCallback) connection -> { + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + byte[] subKey = serializer.serialize("sub:" + deviceId); + for (String clientId : staleClientIds) { + if (StringUtils.hasText(clientId)) { + connection.sRem(subKey, serializer.serialize(clientId)); + } + } + return null; + }); + } + // 新增:是否存在该设备的自动关任务 private boolean hasAutoOffTask(String deviceId) { if (!StringUtils.hasText(deviceId)) { return false; } - String prefix = "autooff:" + deviceId + ":"; - for (String key : autoOffFutureMap.keySet()) { - if (key != null && key.startsWith(prefix)) { - ScheduledFuture f = autoOffFutureMap.get(key); - if (f != null && !f.isCancelled() && !f.isDone()) { - return true; - } - } + // ✅ O(1):不再扫描autoOffFutureMap;由scheduleAutoOff/runAutoOff维护计数 + Integer cnt = autoOffDeviceCnt.get(deviceId); + return cnt != null && cnt > 0; + } + + // 新增:自动关任务计数 +1(只维护deviceId维度,确保hasAutoOffTask为O(1)) + private void incAutoOffCnt(String deviceId) { + if (!StringUtils.hasText(deviceId)) { + return; } - return false; + autoOffDeviceCnt.merge(deviceId, 1, (a, b) -> a + b); + } + + // 新增:自动关任务计数 -1(避免负数;归零则清理key,省内存) + private void decAutoOffCnt(String deviceId) { + if (!StringUtils.hasText(deviceId)) { + return; + } + autoOffDeviceCnt.compute(deviceId, (k, v) -> { + if (v == null || v <= 1) { + return null; + } + return v - 1; + }); } // 改造:多线程执行自动关闭任务 @@ -425,6 +513,8 @@ public class MqttMessageHandler implements SmartLifecycle { if (oldFuture != null) { oldFuture.cancel(false); log.debug("【自动关任务】取消旧任务:{}", taskKey); + // ✅ 旧任务被替换:需要同步减少该设备的“未完成任务数”,避免计数虚高导致hasAutoOffTask误判 + decAutoOffCnt(deviceId); } // 使用多线程池提交任务 @@ -436,11 +526,16 @@ public class MqttMessageHandler implements SmartLifecycle { } finally { // 任务执行完成后移除映射 autoOffFutureMap.remove(taskKey); + // ✅ 任务结束(成功/失败都算结束):减少该设备的“未完成任务数”,保证hasAutoOffTask准确 + decAutoOffCnt(deviceId); } }, delaySeconds, TimeUnit.SECONDS); // 保存新任务的引用 autoOffFutureMap.put(taskKey, newFuture); + // ✅ 新任务创建成功:增加该设备的“未完成任务数” + incAutoOffCnt(deviceId); + log.info("【自动关任务】已创建(多线程):deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds); } @@ -883,6 +978,8 @@ public class MqttMessageHandler implements SmartLifecycle { log.debug("【自动关任务】取消任务:{}", entry.getKey()); } autoOffFutureMap.clear(); + // ✅ 停止时直接清空计数,避免残留 + autoOffDeviceCnt.clear(); // 2. 优雅关闭线程池 if (autoOffExecutor != null) {