From b55e29045dad9c0d38bcda4d25ee01798f084339 Mon Sep 17 00:00:00 2001 From: xce Date: Sat, 17 Jan 2026 06:27:09 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E5=BB=B6=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/monitor/CacheController.java | 1 + .../agri/common/constant/CacheConstants.java | 1 + .../com/agri/framework/config/MqttConfig.java | 2 +- .../interceptor/MqttMessageHandler.java | 43 +++++++++++++------ 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java b/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java index 4c6fef0..0025280 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java @@ -46,6 +46,7 @@ public class CacheController caches.add(new SysCache(CacheConstants.PWD_ERR_CNT_KEY, "密码错误次数")); caches.add(new SysCache(CacheConstants.SUB_CLIENT_ID, "Redis订阅关系")); caches.add(new SysCache(CacheConstants.SUB_IMEI, "Redis订阅关系【反向】")); + caches.add(new SysCache(CacheConstants.LOCK_IMEI_FUNC, "设备控制锁")); } @PreAuthorize("@ss.hasPermi('monitor:cache:list')") diff --git a/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java b/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java index 5013b6e..9107bfe 100644 --- a/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java +++ b/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java @@ -43,4 +43,5 @@ public class CacheConstants public static final String PWD_ERR_CNT_KEY = "pwd_err_cnt:"; public static final String SUB_CLIENT_ID = "subc:"; public static final String SUB_IMEI = "sub:"; + public static final String LOCK_IMEI_FUNC = "lock:"; } diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java index 2dea7d4..bede747 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -212,7 +212,7 @@ public class MqttConfig { // 3. 发布消息 client.publish(topic, message); // 优化:替换System.out为日志框架,保留原有输出内容 - log.info("【MQTT消息发布成功】主题:" + topic + ",内容:" + payload); + // log.info("【MQTT消息发布成功】主题:" + topic + ",内容:" + payload); } } diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index d54ffc0..b3b3d70 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -220,7 +220,7 @@ public class MqttMessageHandler implements SmartLifecycle { @Override public void deliveryComplete(IMqttDeliveryToken token) { if (token != null && token.getTopics() != null && token.getTopics().length > 0) { - log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", safeClientId(), token.getTopics()[0]); + // log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", safeClientId(), token.getTopics()[0]); } } }); @@ -291,6 +291,7 @@ public class MqttMessageHandler implements SmartLifecycle { Integer funcValue = null; boolean isAck = false; + // 第二步:设备回执处理逻辑(完全移除Redis写入) if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { isAck = true; JSONObject propObj = payloadObj.getJSONObject("prop"); @@ -311,44 +312,60 @@ public class MqttMessageHandler implements SmartLifecycle { } log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); - // 回执成功且值=1(表示运行/开启)时,起个任务,固定多少秒-n秒 - // 新增:回执固定是{"suc":true,"prop":{"jm1k":1}} + // 回执成功且值=1时启动自动关闭任务(保留原有逻辑) boolean suc = payloadObj.getBooleanValue("suc"); if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { - SysAgriLimit agriLimit = agriLimitService.lambdaQuery() .eq(SysAgriLimit::getImei, deviceId) .one(); int autoOffSeconds = 0; - if (agriLimit != null) { autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); } - - // 启动自动关闭任务(多线程执行) - // 新增:回执成功且值=1(表示运行/开启)时,起个任务,固定多少秒-n秒 scheduleAutoOff(deviceId, funcType, autoOffSeconds); } + + // ========== 关键:设备回执完全不写入Redis ========== + // 移除所有回执相关的Redis写入逻辑 } } - // 设备每10秒上报一次状态包,写入latest供自动关任务读取 (只在需要时写,减少消耗) + // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构时写入Redis if (!isAck) { boolean needWriteLatest = false; - // 1) 有人订阅时才写latest + // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) + boolean isValidStatus = true; + for (String validCode : VALID_FUNC_CODES) { + if (!payloadObj.containsKey(validCode)) { + isValidStatus = false; + break; + } + } + if (!isValidStatus) { + log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId,payload); + } + + // 2) 有人订阅时才写latest Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); if (subscribedClients != null && !subscribedClients.isEmpty()) { needWriteLatest = true; } - // 2) 或者存在该设备的自动关任务时也写latest(任务需要最新状态判断) + // 3) 或者存在该设备的自动关任务时也写latest if (!needWriteLatest && hasAutoOffTask(deviceId)) { needWriteLatest = true; } + // 4) 满足条件则写入完整的8功能码JSON到Redis if (needWriteLatest) { - stringRedisTemplate.opsForValue().set("device:latest:" + deviceId, payload, latestTtlSeconds, TimeUnit.SECONDS); + stringRedisTemplate.opsForValue().set( + "device:latest:" + deviceId, + payload, // 完整的8功能码JSON + latestTtlSeconds, + TimeUnit.SECONDS + ); + // log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); } } @@ -425,6 +442,7 @@ public class MqttMessageHandler implements SmartLifecycle { private void runAutoOff(String deviceId, String funcType) throws MqttException { String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); if (!StringUtils.hasText(latest)) { + //todo log.warn("【自动关任务】无最新状态,跳过:deviceId={}, funcType={}", deviceId, funcType); return; } @@ -457,6 +475,7 @@ public class MqttMessageHandler implements SmartLifecycle { //todo mqttMessageSender.publish(deviceTopic, down.toJSONString()); log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); + Boolean deleteSuccess = stringRedisTemplate.delete("device:latest:" + deviceId); } else { log.info("【自动关任务】检测未运行或状态未知,跳过关闭:deviceId={}, funcType={}, current={}", deviceId, funcType, current); }