mqtt延时任务

feasure
xce 2026-01-18 16:12:09 +08:00
parent 77fb218e5f
commit 2b6895b518
1 changed files with 12 additions and 11 deletions

View File

@ -337,7 +337,7 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
} }
// 第三步仅处理非回执的设备状态包且仅当是8个功能码结构+需要执行自动关任务时写入Redis // 第三步仅处理非回执的设备状态包且仅当是8个功能码结构写入Redis
// 有没有人订阅都得写,只要发送设备开的指令成功了就得写 // 有没有人订阅都得写,只要发送设备开的指令成功了就得写
if (!isAck) { if (!isAck) {
// 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入 // 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入
@ -350,21 +350,15 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
if (!isValidStatus) { if (!isValidStatus) {
log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId, payload); log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId, payload);
} } else {
// ✅ 8个功能码状态包无条件写device:latest:{deviceId},避免自动关读不到最新状态
// 2) 核心修改仅当需要执行自动关任务时才判断是否写入Redis
// 条件1当前已标记需要执行新的自动关任务
// 条件2或已有未完成的自动关任务避免任务执行中状态丢失
// 最终写入条件:有自动关任务(必要条件) + 有人订阅(可选条件,可按需删除)
// 3) 满足条件则写入完整的8功能码JSON到Redis
if (needAutoOffTask || hasAutoOffTask(deviceId)) {
stringRedisTemplate.opsForValue().set( stringRedisTemplate.opsForValue().set(
"device:latest:" + deviceId, "device:latest:" + deviceId,
payload, // 完整的8功能码JSON payload, // 完整的8功能码JSON
latestTtlSeconds, latestTtlSeconds,
TimeUnit.SECONDS TimeUnit.SECONDS
); );
log.debug("【设备状态包】因需要执行自动关任务,写入Redis成功deviceId={}", deviceId); log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
} }
} }
// 非回执消息:正常转发给订阅前端 // 非回执消息:正常转发给订阅前端
@ -409,6 +403,13 @@ public class MqttMessageHandler implements SmartLifecycle {
if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType) || delaySeconds <= 0) { if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType) || delaySeconds <= 0) {
return; return;
} }
// ✅ 防御避免极端情况下线程池尚未初始化导致NPE
if (autoOffExecutor == null) {
log.warn("【自动关任务】线程池未初始化跳过创建任务deviceId={}, funcType={}", deviceId, funcType);
return;
}
String taskKey = "autooff:" + deviceId + ":" + funcType; String taskKey = "autooff:" + deviceId + ":" + funcType;
// 同设备同功能只保留最后一次任务(先取消旧任务) // 同设备同功能只保留最后一次任务(先取消旧任务)