From 27243422687d0b72ad5af48da3c04dfb6f735979 Mon Sep 17 00:00:00 2001 From: lld <15027638633@163.com> Date: Sun, 8 Feb 2026 16:47:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=A7=E6=A3=9A=E5=85=B3=E8=81=94=E7=94=A8?= =?UTF-8?q?=E6=88=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interceptor/DeviceAckHandler.java | 42 ++-------------- .../interceptor/DeviceStatusHandler.java | 49 ++++++++++++++++++- .../framework/manager/MqttAutoOffManager.java | 5 +- 3 files changed, 54 insertions(+), 42 deletions(-) diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java index 11fbdba..60f3f7c 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java @@ -15,10 +15,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -47,10 +44,6 @@ public class DeviceAckHandler { @Resource private ISysAgriLimitService agriLimitService; - // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) - @Value("${spring.mqtt.latest-ttl-seconds:120}") - private int latestTtlSeconds; - @Autowired private ISysDevOperLogService sysDevOperLogService; @@ -82,11 +75,9 @@ public class DeviceAckHandler { // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) String funcType = null; Integer funcValue = null; - boolean isAck = false; // 新增:标记是否需要执行自动关任务(全局可用) // 第二步:设备回执处理逻辑(完全移除Redis写入) if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { - isAck = true; JSONObject propObj = payloadObj.getJSONObject("prop"); if (propObj != null && !propObj.isEmpty()) { // 提取prop中的第一个功能码 @@ -125,6 +116,7 @@ public class DeviceAckHandler { if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) { mqttAutoOffManager.cancelAutoOff(deviceId, funcType); } + boolean isTask = (Objects.equals(funcValue, 1)) && (runTime > 0); sysDevOperLogService.lambdaUpdate() .eq(SysDevOperLog::getImei, deviceId) .eq(SysDevOperLog::getFuncCode, funcType) @@ -135,37 +127,13 @@ public class DeviceAckHandler { .set(SysDevOperLog::getAckReceived,1) .set(SysDevOperLog::getAckSuc, suc?1:0) .set(SysDevOperLog::getIsLockSuc,delete?1:0) - .set(SysDevOperLog::getIsTask,runTime > 0?1:0) - .set(SysDevOperLog::getRunTime, runTime) - .set(SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】") + .set(SysDevOperLog::getIsTask,isTask?1:0) + .set(isTask,SysDevOperLog::getRunTime, runTime) + .set(isTask,SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】") .set(SysDevOperLog::getUpdateBy,"设备回执") .set(SysDevOperLog::getAck, payload) .update(); } } - - // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis - // 有没有人订阅都得写,只要发送设备开的指令成功了就得写 - if (!isAck) { - // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) - boolean isValidStatus = true; - for (String validCode : VALID_FUNC_CODES) { - if (!payloadObj.containsKey(validCode)) { - isValidStatus = false; - // log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); - break; - } - } - if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) { - // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 - stringRedisTemplate.opsForValue().set( - "device:latest:" + deviceId, - payload, // 完整的8功能码JSON - latestTtlSeconds, - TimeUnit.SECONDS - ); - // log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); - } - } } } diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java index abcc735..895853d 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java @@ -65,11 +65,32 @@ public class DeviceStatusHandler { @Resource private MqttSubscriptionManager mqttSubscriptionManager; - - @Autowired private DeviceAckHandler deviceAckHandler; + + // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) + @Value("${spring.mqtt.latest-ttl-seconds:120}") + private int latestTtlSeconds; + + /** + * 自动关任务管理器,调度/取消自动关任务 + */ + @Resource + private MqttAutoOffManager mqttAutoOffManager; + + private static final Set VALID_FUNC_CODES = new HashSet<>(); + static { + VALID_FUNC_CODES.add("jm1g"); + VALID_FUNC_CODES.add("jm2g"); + VALID_FUNC_CODES.add("jbg"); + VALID_FUNC_CODES.add("jm3g"); + VALID_FUNC_CODES.add("jm2k"); + VALID_FUNC_CODES.add("jm3k"); + VALID_FUNC_CODES.add("jbk"); + VALID_FUNC_CODES.add("jm1k"); + } + /** * 处理设备状态:转发给订阅的前端、处理回执、触发自动关 */ @@ -167,6 +188,30 @@ public class DeviceStatusHandler { // 优化:替换System.out为log.info // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); } + + // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis + // 有没有人订阅都得写,只要发送设备开的指令成功了就得写 + if (!isAck) { + // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) + boolean isValidStatus = true; + for (String validCode : VALID_FUNC_CODES) { + if (!payloadObj.containsKey(validCode)) { + isValidStatus = false; + // log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); + break; + } + } + if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) { + // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 + stringRedisTemplate.opsForValue().set( + "device:latest:" + deviceId, + payload, // 完整的8功能码JSON + latestTtlSeconds, + TimeUnit.SECONDS + ); + // log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); + } + } } catch (MqttException e) { WxUtil.pushText( "【消息转发失败】\n deviceId: "+deviceId+"\n payload: "+payload+"\n cause: "+e); diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java index 0f51941..01bdf5a 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java @@ -216,7 +216,6 @@ public class MqttAutoOffManager { String skipReason = ""; if (!StringUtils.hasText(latest)) { //todo - skipReason = "【自动关任务】无最新状态"; log.warn("【自动关任务】无最新状态,跳过:deviceId={}, funcType={}", deviceId, funcType); return; } @@ -225,12 +224,12 @@ public class MqttAutoOffManager { try { latestObj = JSON.parseObject(latest); } catch (Exception e) { - skipReason = "【自动关任务】执行报错-解析异常"; WxUtil.pushText("自动关任务执行报错-解析异常:\n deviceId: " + deviceId + "\n funcType:" + funcType+"\n Cause: "+e); log.warn("【自动关任务】最新状态JSON解析失败,跳过:deviceId={}, funcType={}", deviceId, funcType); return; } if (latestObj == null || latestObj.isEmpty()) { + log.info("【自动关任务】最新状态为空"); skipReason = "【自动关任务】最新状态为空"; return; } @@ -260,7 +259,7 @@ public class MqttAutoOffManager { .set(SysDevOperLog::getExecResult,1) .set(SysDevOperLog::getLatestState, latest) .set(SysDevOperLog::getUpdateBy,"自动关") - .set(!skipReason.isEmpty(), SysDevOperLog::getSkipReason, skipReason) + .set(SysDevOperLog::getSkipReason, skipReason.isEmpty()?"解析报错":skipReason) .update(); if (current != null && current == 1) {