diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 9dbe84a..3fc209e 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -337,7 +337,7 @@ public class MqttMessageHandler implements SmartLifecycle { } } - // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构+需要执行自动关任务时写入Redis + // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis // 有没有人订阅都得写,只要发送设备开的指令成功了就得写 if (!isAck) { // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) @@ -349,22 +349,16 @@ public class MqttMessageHandler implements SmartLifecycle { } } if (!isValidStatus) { - log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId,payload); - } - - // 2) 核心修改:仅当需要执行自动关任务时,才判断是否写入Redis - // 条件1:当前已标记需要执行新的自动关任务 - // 条件2:或已有未完成的自动关任务(避免任务执行中状态丢失) - // 最终写入条件:有自动关任务(必要条件) + 有人订阅(可选条件,可按需删除) - // 3) 满足条件则写入完整的8功能码JSON到Redis - if (needAutoOffTask || hasAutoOffTask(deviceId)) { + log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); + } else { + // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 stringRedisTemplate.opsForValue().set( "device:latest:" + deviceId, payload, // 完整的8功能码JSON latestTtlSeconds, TimeUnit.SECONDS ); - log.debug("【设备状态包】因需要执行自动关任务,写入Redis成功,deviceId={}", deviceId); + log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); } } // 非回执消息:正常转发给订阅前端 @@ -409,6 +403,13 @@ public class MqttMessageHandler implements SmartLifecycle { if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType) || delaySeconds <= 0) { return; } + + // ✅ 防御:避免极端情况下线程池尚未初始化导致NPE + if (autoOffExecutor == null) { + log.warn("【自动关任务】线程池未初始化,跳过创建任务:deviceId={}, funcType={}", deviceId, funcType); + return; + } + String taskKey = "autooff:" + deviceId + ":" + funcType; // 同设备同功能只保留最后一次任务(先取消旧任务)