mqtt延时任务

feasure
xce 2026-01-17 06:54:44 +08:00
parent b55e29045d
commit 77fb218e5f
1 changed files with 17 additions and 19 deletions

View File

@ -290,6 +290,8 @@ public class MqttMessageHandler implements SmartLifecycle {
String funcType = null;
Integer funcValue = null;
boolean isAck = false;
// 新增:标记是否需要执行自动关任务(全局可用)
boolean needAutoOffTask = false;
// 第二步设备回执处理逻辑完全移除Redis写入
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
@ -322,7 +324,12 @@ public class MqttMessageHandler implements SmartLifecycle {
if (agriLimit != null) {
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
}
scheduleAutoOff(deviceId, funcType, autoOffSeconds);
// 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务
needAutoOffTask = autoOffSeconds > 0;
if (needAutoOffTask) {
scheduleAutoOff(deviceId, funcType, autoOffSeconds);
log.debug("【自动关任务】标记需要执行deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds);
}
}
// ========== 关键设备回执完全不写入Redis ==========
@ -330,10 +337,9 @@ public class MqttMessageHandler implements SmartLifecycle {
}
}
// 第三步仅处理非回执的设备状态包且仅当是8个功能码结构时写入Redis
// 第三步仅处理非回执的设备状态包且仅当是8个功能码结构+需要执行自动关任务时写入Redis
// 有没有人订阅都得写,只要发送设备开的指令成功了就得写
if (!isAck) {
boolean needWriteLatest = false;
// 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入
boolean isValidStatus = true;
for (String validCode : VALID_FUNC_CODES) {
@ -346,29 +352,21 @@ public class MqttMessageHandler implements SmartLifecycle {
log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId,payload);
}
// 2) 有人订阅时才写latest
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
if (subscribedClients != null && !subscribedClients.isEmpty()) {
needWriteLatest = true;
}
// 3) 或者存在该设备的自动关任务时也写latest
if (!needWriteLatest && hasAutoOffTask(deviceId)) {
needWriteLatest = true;
}
// 4) 满足条件则写入完整的8功能码JSON到Redis
if (needWriteLatest) {
// 2) 核心修改仅当需要执行自动关任务时才判断是否写入Redis
// 条件1当前已标记需要执行新的自动关任务
// 条件2或已有未完成的自动关任务避免任务执行中状态丢失
// 最终写入条件:有自动关任务(必要条件) + 有人订阅(可选条件,可按需删除)
// 3) 满足条件则写入完整的8功能码JSON到Redis
if (needAutoOffTask || hasAutoOffTask(deviceId)) {
stringRedisTemplate.opsForValue().set(
"device:latest:" + deviceId,
payload, // 完整的8功能码JSON
latestTtlSeconds,
TimeUnit.SECONDS
);
// log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
log.debug("【设备状态包】因需要执行自动关任务写入Redis成功deviceId={}", deviceId);
}
}
// 非回执消息:正常转发给订阅前端
// 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);