diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index c99d2b3..b9964bf 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -12,4 +12,4 @@ spring: latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。 # 自动关闭任务线程池大小 auto-off-thread-pool-size: 5 - online-ttl-seconds: 60 # 在线新跳ttl \ No newline at end of file + sub-ttl-seconds: 3600 # 在线新跳ttl \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 0703736..8dc491e 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -88,10 +88,6 @@ public class MqttMessageHandler implements SmartLifecycle { @Value("${spring.mqtt.default-topic}") private String defaultTopic; - // 新增:前端在线TTL(配合前端心跳,每20秒续一次,TTL建议60秒) - @Value("${spring.mqtt.online-ttl-seconds:60}") - private int onlineTtlSeconds; - // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) @Value("${spring.mqtt.latest-ttl-seconds:120}") private int latestTtlSeconds; @@ -100,6 +96,10 @@ public class MqttMessageHandler implements SmartLifecycle { @Value("${spring.mqtt.auto-off-thread-pool-size:5}") private int autoOffThreadPoolSize; + // 新增:前端订阅关系TTL(兜底“取消订阅失败/异常退出”)——只维护subc:{clientId}的TTL + @Value("${spring.mqtt.subc-ttl-seconds:3600}") + private int subcTtlSeconds; + // 优化:统一使用SLF4J日志(JDK 8兼容) private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); @@ -350,7 +350,7 @@ public class MqttMessageHandler implements SmartLifecycle { break; } } - if(hasAutoOffTask(deviceId) && isValidStatus){ + if (hasAutoOffTask(deviceId) && isValidStatus) { // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 stringRedisTemplate.opsForValue().set( "device:latest:" + deviceId, @@ -369,20 +369,18 @@ public class MqttMessageHandler implements SmartLifecycle { // 推送给每个订阅的前端 for (String clientId : subscribedClients) { - // 新增:只有在线才转发;不在线就清理残留订阅,解决“取消订阅失败兜底” - String onlineKey = "online:" + clientId; - Boolean online = stringRedisTemplate.hasKey(onlineKey); - if (!online) { - // 不在线:从两边索引移除(避免一直给离线前端发) + // 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底) + Boolean stillSub = stringRedisTemplate.opsForSet().isMember("subc:" + clientId, deviceId); + if (!stillSub) { + // 关系不存在:清理sub:{deviceId}残留,避免一直给前端发 stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); - stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); continue; } // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // 发布消息 mqttMessageSender.publish(frontendTopic, payload); - log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); + // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } } else { // 优化:替换System.out为log.info @@ -570,12 +568,11 @@ public class MqttMessageHandler implements SmartLifecycle { return; } - // 在线标记:online:{clientId} => 1(带TTL) - String onlineKey = "online:" + clientId; - stringRedisTemplate.opsForValue().set(onlineKey, "1", onlineTtlSeconds, TimeUnit.SECONDS); + // 续期subc:{clientId} + stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS); // todo 生产环境不建议打印每次心跳 - log.debug("【在线心跳】clientId={} 在线TTL={}s payload={}", clientId, onlineTtlSeconds, payload); + log.debug("【在线心跳】clientId={} 续期subcTTL={}s payload={}", clientId, subcTtlSeconds, payload); } catch (Exception e) { log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage()); } @@ -609,6 +606,10 @@ public class MqttMessageHandler implements SmartLifecycle { // 保存订阅关系到Redis stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); + + // 新增:订阅成功后给subc设置TTL(兜底“取消订阅失败/异常退出”) + stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS); + log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId); } @@ -697,6 +698,9 @@ public class MqttMessageHandler implements SmartLifecycle { } connection.sAdd(subcKey, deviceIdBytesArray); + // 新增:给subc设置TTL(兜底“取消订阅失败/异常退出”) + connection.expire(subcKey, subcTtlSeconds); + // 执行事务 connection.exec(); return null; @@ -707,7 +711,7 @@ public class MqttMessageHandler implements SmartLifecycle { clientId, userId, validDeviceIds.size(), validDeviceIds); return validDeviceIds.size(); } catch (Exception e) { - log.error("【全量订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e); + log.error("【全量------订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e); throw new RuntimeException("全量订阅失败:" + e.getMessage()); } }