diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index 6de3ecd..c99d2b3 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -11,4 +11,5 @@ spring: keep-alive: 60 # 心跳间隔 latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。 # 自动关闭任务线程池大小 - auto-off-thread-pool-size: 5 \ No newline at end of file + auto-off-thread-pool-size: 5 + online-ttl-seconds: 60 # 在线新跳ttl \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 9b6e8c3..bc2252c 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -88,9 +88,9 @@ public class MqttMessageHandler implements SmartLifecycle { @Value("${spring.mqtt.default-topic}") private String defaultTopic; - // 新增:自动关延迟秒数(固定多少秒-n秒) - @Value("${spring.mqtt.auto-off-seconds:30}") - private int autoOffSeconds; + // 新增:前端在线TTL(配合前端心跳,每20秒续一次,TTL建议60秒) + @Value("${spring.mqtt.online-ttl-seconds:60}") + private int onlineTtlSeconds; // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) @Value("${spring.mqtt.latest-ttl-seconds:120}") @@ -260,6 +260,10 @@ public class MqttMessageHandler implements SmartLifecycle { else if (topic.matches("frontend/\\w+/control/\\w+")) { handleFrontendControl(topic, payload); } + // 新增:前端在线心跳主题:frontend/{clientId}/online + else if (topic.matches("frontend/\\w+/online")) { + handleFrontendOnline(topic, payload); + } } catch (Exception e) { log.error("【MQTT消息处理异常】topic={}", topic, e); } @@ -342,7 +346,7 @@ public class MqttMessageHandler implements SmartLifecycle { for (String validCode : VALID_FUNC_CODES) { if (!payloadObj.containsKey(validCode)) { isValidStatus = false; - log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); + // log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); break; } } @@ -364,11 +368,21 @@ public class MqttMessageHandler implements SmartLifecycle { if (subscribedClients != null && !subscribedClients.isEmpty()) { // 推送给每个订阅的前端 for (String clientId : subscribedClients) { + + // 新增:只有在线才转发;不在线就清理残留订阅,解决“取消订阅失败兜底” + String onlineKey = "online:" + clientId; + Boolean online = stringRedisTemplate.hasKey(onlineKey); + if (online == null || !online) { + // 不在线:从两边索引移除(避免一直给离线前端发) + stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); + stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); + continue; + } // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // 发布消息 mqttMessageSender.publish(frontendTopic, payload); - // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); + log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } } else { // 优化:替换System.out为log.info @@ -541,6 +555,32 @@ public class MqttMessageHandler implements SmartLifecycle { log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); } + /** + * 新增:处理前端在线心跳:写入Redis在线标记(带TTL) + * 主题格式:frontend/{clientId}/online + */ + private void handleFrontendOnline(String topic, String payload) { + try { + String[] parts = topic.split("/"); + if (parts.length < 3) { + return; + } + String clientId = parts[1]; + if (!StringUtils.hasText(clientId)) { + return; + } + + // 在线标记:online:{clientId} => 1(带TTL) + String onlineKey = "online:" + clientId; + stringRedisTemplate.opsForValue().set(onlineKey, "1", onlineTtlSeconds, TimeUnit.SECONDS); + + // todo 生产环境不建议打印每次心跳 + log.debug("【在线心跳】clientId={} 在线TTL={}s payload={}", clientId, onlineTtlSeconds, payload); + } catch (Exception e) { + log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage()); + } + } + /** * 权限校验逻辑(示例) * 可根据业务需求扩展: