From 28c357ddf5f9f74d402475cf13449d21fba7b366 Mon Sep 17 00:00:00 2001 From: lld <15027638633@163.com> Date: Sun, 1 Feb 2026 17:59:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Eack=E4=B8=BB=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-mqtt.yml | 2 +- .../interceptor/DeviceAckHandler.java | 171 ++++++++++++++++++ .../interceptor/DeviceStatusHandler.java | 145 ++------------- .../framework/manager/MqttAutoOffManager.java | 14 ++ .../web/dispatcher/MqttMessageDispatcher.java | 11 +- 5 files changed, 208 insertions(+), 135 deletions(-) create mode 100644 agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index 4b14853..59f3ace 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -13,4 +13,4 @@ spring: # 自动关闭任务线程池大小 auto-off-thread-pool-size: 5 subc-ttl-seconds: 3600 # 在线新跳ttl - dtu-ctl-lock-ttl: 60 \ No newline at end of file + dtu-ctl-lock-ttl: 20 \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java new file mode 100644 index 0000000..11fbdba --- /dev/null +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceAckHandler.java @@ -0,0 +1,171 @@ +package com.agri.framework.interceptor; + +import com.agri.framework.manager.MqttAutoOffManager; +import com.agri.system.domain.SysAgriLimit; +import com.agri.system.domain.SysDevOperLog; +import com.agri.system.service.ISysAgriLimitService; +import com.agri.system.service.ISysDevOperLogService; +import com.alibaba.fastjson2.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +@Component +public class DeviceAckHandler { + /** + * 优化:统一使用SLF4J日志(JDK 8兼容) + */ + private static final Logger log = LoggerFactory.getLogger(DeviceAckHandler.class); + + /** + * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 + */ + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** + * 自动关任务管理器,调度/取消自动关任务 + */ + @Resource + private MqttAutoOffManager mqttAutoOffManager; + + /** + * 农业限制服务,查询设备自动关延迟配置 + */ + @Resource + private ISysAgriLimitService agriLimitService; + + // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) + @Value("${spring.mqtt.latest-ttl-seconds:120}") + private int latestTtlSeconds; + + @Autowired + private ISysDevOperLogService sysDevOperLogService; + + + // 初始化映射(建议放在类初始化块/构造方法中,只初始化一次) + private static final Map> LIMIT_MAP = new HashMap<>(); + private static final Set VALID_FUNC_CODES = new HashSet<>(); + static { + LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); + LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); + LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); + LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); + LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); + LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); + LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); + LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); + + VALID_FUNC_CODES.add("jm1g"); + VALID_FUNC_CODES.add("jm2g"); + VALID_FUNC_CODES.add("jbg"); + VALID_FUNC_CODES.add("jm3g"); + VALID_FUNC_CODES.add("jm2k"); + VALID_FUNC_CODES.add("jm3k"); + VALID_FUNC_CODES.add("jbk"); + VALID_FUNC_CODES.add("jm1k"); + } + + public void isStartAutoOffTask(JSONObject payloadObj, String deviceId, String payload) { + // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) + String funcType = null; + Integer funcValue = null; + boolean isAck = false; + // 新增:标记是否需要执行自动关任务(全局可用) + // 第二步:设备回执处理逻辑(完全移除Redis写入) + if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { + isAck = true; + JSONObject propObj = payloadObj.getJSONObject("prop"); + if (propObj != null && !propObj.isEmpty()) { + // 提取prop中的第一个功能码 + Map.Entry propEntry = propObj.entrySet().iterator().next(); + funcType = propEntry.getKey(); + try { + funcValue = Integer.parseInt(String.valueOf(propEntry.getValue())); + } catch (Exception ignore) { + } + + // 释放对应功能的分布式锁 + String lockKey = "lock:" + deviceId + ":" + funcType; + Boolean delete = stringRedisTemplate.delete(lockKey); + if (propObj.size() > 1) { + log.warn("【设备回执】prop包含多个功能码,仅处理第一个:{}", propObj.keySet()); + } + log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); + int runTime = 0; + // 回执成功且值=1时启动自动关闭任务(保留原有逻辑) + boolean suc = payloadObj.getBooleanValue("suc"); + if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { + SysAgriLimit agriLimit = agriLimitService.lambdaQuery() + .eq(SysAgriLimit::getImei, deviceId) + .one(); + int autoOffSeconds = 0; + if (agriLimit != null) { + autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); + } + runTime = autoOffSeconds; + // 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务) + if (autoOffSeconds > 0) { + mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds); + log.debug("【自动关任务】标记需要执行,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds); + } + } + if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) { + mqttAutoOffManager.cancelAutoOff(deviceId, funcType); + } + sysDevOperLogService.lambdaUpdate() + .eq(SysDevOperLog::getImei, deviceId) + .eq(SysDevOperLog::getFuncCode, funcType) + .eq(SysDevOperLog::getOpType, funcValue) + .eq(SysDevOperLog::getLockAcquired,1) + .orderByDesc(SysDevOperLog::getCreateTime) + .last("LIMIT 1") + .set(SysDevOperLog::getAckReceived,1) + .set(SysDevOperLog::getAckSuc, suc?1:0) + .set(SysDevOperLog::getIsLockSuc,delete?1:0) + .set(SysDevOperLog::getIsTask,runTime > 0?1:0) + .set(SysDevOperLog::getRunTime, runTime) + .set(SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】") + .set(SysDevOperLog::getUpdateBy,"设备回执") + .set(SysDevOperLog::getAck, payload) + .update(); + } + } + + // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis + // 有没有人订阅都得写,只要发送设备开的指令成功了就得写 + if (!isAck) { + // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) + boolean isValidStatus = true; + for (String validCode : VALID_FUNC_CODES) { + if (!payloadObj.containsKey(validCode)) { + isValidStatus = false; + // log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); + break; + } + } + if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) { + // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 + stringRedisTemplate.opsForValue().set( + "device:latest:" + deviceId, + payload, // 完整的8功能码JSON + latestTtlSeconds, + TimeUnit.SECONDS + ); + // log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); + } + } + } +} diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java index e517e2f..abcc735 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java @@ -11,6 +11,7 @@ import com.agri.system.service.ISysDevOperLogService; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import org.apache.commons.lang3.ObjectUtils; +import org.checkerframework.checker.units.qual.A; import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,47 +65,10 @@ public class DeviceStatusHandler { @Resource private MqttSubscriptionManager mqttSubscriptionManager; - /** - * 自动关任务管理器,调度/取消自动关任务 - */ - @Resource - private MqttAutoOffManager mqttAutoOffManager; - /** - * 农业限制服务,查询设备自动关延迟配置 - */ - @Resource - private ISysAgriLimitService agriLimitService; - - // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) - @Value("${spring.mqtt.latest-ttl-seconds:120}") - private int latestTtlSeconds; @Autowired - private ISysDevOperLogService sysDevOperLogService; - - // 初始化映射(建议放在类初始化块/构造方法中,只初始化一次) - private static final Map> LIMIT_MAP = new HashMap<>(); - private static final Set VALID_FUNC_CODES = new HashSet<>(); - static { - LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); - LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); - LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); - LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); - LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); - LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); - LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); - LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); - - VALID_FUNC_CODES.add("jm1g"); - VALID_FUNC_CODES.add("jm2g"); - VALID_FUNC_CODES.add("jbg"); - VALID_FUNC_CODES.add("jm3g"); - VALID_FUNC_CODES.add("jm2k"); - VALID_FUNC_CODES.add("jm3k"); - VALID_FUNC_CODES.add("jbk"); - VALID_FUNC_CODES.add("jm1k"); - } + private DeviceAckHandler deviceAckHandler; /** * 处理设备状态:转发给订阅的前端、处理回执、触发自动关 @@ -129,105 +93,19 @@ public class DeviceStatusHandler { // 解析设备ID:主题格式为dtu/{deviceId}/up,分割后第2个元素是设备ID String deviceId = extractDeviceId(topic); if (deviceId == null) return; + + String[] segments = topic.split("/"); + String action = segments[2]; // 转发消息 - forwardPayload(deviceId, payload,payloadObj); + forwardPayload(deviceId, payload,payloadObj,action); - isStartAutoOffTask(payloadObj,deviceId,payload); - - } - - private void isStartAutoOffTask(JSONObject payloadObj,String deviceId,String payload) { - // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) - String funcType = null; - Integer funcValue = null; - boolean isAck = false; - // 新增:标记是否需要执行自动关任务(全局可用) - // 第二步:设备回执处理逻辑(完全移除Redis写入) - if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { - isAck = true; - JSONObject propObj = payloadObj.getJSONObject("prop"); - if (propObj != null && !propObj.isEmpty()) { - // 提取prop中的第一个功能码 - Map.Entry propEntry = propObj.entrySet().iterator().next(); - funcType = propEntry.getKey(); - try { - funcValue = Integer.parseInt(String.valueOf(propEntry.getValue())); - } catch (Exception ignore) { - } - - // 释放对应功能的分布式锁 - String lockKey = "lock:" + deviceId + ":" + funcType; - Boolean delete = stringRedisTemplate.delete(lockKey); - if (propObj.size() > 1) { - log.warn("【设备回执】prop包含多个功能码,仅处理第一个:{}", propObj.keySet()); - } - log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); - int runTime = 0; - // 回执成功且值=1时启动自动关闭任务(保留原有逻辑) - boolean suc = payloadObj.getBooleanValue("suc"); - if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { - SysAgriLimit agriLimit = agriLimitService.lambdaQuery() - .eq(SysAgriLimit::getImei, deviceId) - .one(); - int autoOffSeconds = 0; - if (agriLimit != null) { - autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); - } - runTime = autoOffSeconds; - // 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务) - if (autoOffSeconds > 0) { - mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds); - log.debug("【自动关任务】标记需要执行,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds); - } - } - if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) { - mqttAutoOffManager.cancelAutoOff(deviceId, funcType); - } - sysDevOperLogService.lambdaUpdate() - .eq(SysDevOperLog::getImei, deviceId) - .eq(SysDevOperLog::getFuncCode, funcType) - .eq(SysDevOperLog::getOpType, funcValue) - .eq(SysDevOperLog::getLockAcquired,1) - .orderByDesc(SysDevOperLog::getCreateTime) - .last("LIMIT 1") - .set(SysDevOperLog::getAckReceived,1) - .set(SysDevOperLog::getAckSuc, suc?1:0) - .set(SysDevOperLog::getIsLockSuc,delete?1:0) - .set(SysDevOperLog::getIsTask,runTime > 0?1:0) - .set(SysDevOperLog::getRunTime, runTime) - .set(SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】") - .set(SysDevOperLog::getUpdateBy,"设备回执") - .set(SysDevOperLog::getAck, payload) - .update(); - } - } - - // 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis - // 有没有人订阅都得写,只要发送设备开的指令成功了就得写 - if (!isAck) { - // 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入) - boolean isValidStatus = true; - for (String validCode : VALID_FUNC_CODES) { - if (!payloadObj.containsKey(validCode)) { - isValidStatus = false; - // log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload); - break; - } - } - if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) { - // ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态 - stringRedisTemplate.opsForValue().set( - "device:latest:" + deviceId, - payload, // 完整的8功能码JSON - latestTtlSeconds, - TimeUnit.SECONDS - ); - // log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); - } + // 获取第二个动态段,如"up"或"ack" + if ("ack".equals(action)) { + deviceAckHandler.isStartAutoOffTask(payloadObj,deviceId,payload); } } - private void forwardPayload(String deviceId,String payload,JSONObject payloadObj) { + private void forwardPayload(String deviceId,String payload,JSONObject payloadObj,String action) { try { boolean isAck = payloadObj.containsKey("suc") && payloadObj.containsKey("prop"); JSONObject sendObj = payloadObj; // 默认直接用原对象 @@ -274,6 +152,9 @@ public class DeviceStatusHandler { } // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/listener String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; + if ("ack".equals(action)) { + frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/ack"; + } // 发布消息 mqttMessageSender.publish(frontendTopic, sendObj.toJSONString()); // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java index 6aed93d..0f51941 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java @@ -316,6 +316,20 @@ public class MqttAutoOffManager { if (!oldFuture.cancel(false)) { return; } + sysDevOperLogService.lambdaUpdate() + .eq(SysDevOperLog::getImei, deviceId) + .eq(SysDevOperLog::getFuncCode, funcType) + .eq(SysDevOperLog::getOpType, 1) + .eq(SysDevOperLog::getOpSource, 1) + .eq(SysDevOperLog::getAckSuc, 1) + .eq(SysDevOperLog::getIsTask, 1) + .orderByDesc(SysDevOperLog::getCreateTime) + .last("LIMIT 2") + .set(SysDevOperLog::getExecResult,2) + .set(SysDevOperLog::getLatestState, "{\"msg\":\"任务已退出\"}") + .set(SysDevOperLog::getUpdateBy,"自动关") + .set(SysDevOperLog::getSkipReason, "旧任务已退出") + .update(); // cancel成功:旧任务不会跑了,这时再remove并减计数 autoOffFutureMap.remove(taskKey, oldFuture); decAutoOffCnt(deviceId); diff --git a/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java b/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java index 734e164..1f83c80 100644 --- a/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java +++ b/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java @@ -1,6 +1,7 @@ package com.agri.framework.web.dispatcher; import com.agri.common.utils.wechat.WxUtil; +import com.agri.framework.interceptor.DeviceAckHandler; import com.agri.framework.interceptor.DeviceStatusHandler; import com.agri.framework.interceptor.FrontendControlHandler; import com.agri.framework.interceptor.FrontendOnlineHandler; @@ -40,6 +41,13 @@ public class MqttMessageDispatcher { @Resource private FrontendOnlineHandler frontendOnlineHandler; + + /** + * 前端在线心跳处理器 + */ + @Resource + private DeviceAckHandler deviceAckHandler; + /** * 消息分发处理:根据主题类型路由到不同处理方法 * 可以监听到设备传过来的业务数据 以及前端传过来的控制设备指令 @@ -52,7 +60,7 @@ public class MqttMessageDispatcher { // log.info("【MQTT接收】topic={}, payload={}", topic, payload); // 设备状态主题:dtu/{deviceId}/up - if (topic.matches("dtu/\\w+/up")) { + if (topic.matches("dtu/\\w+/\\w")) { deviceStatusHandler.handle(topic, payload); } // 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} @@ -63,7 +71,6 @@ public class MqttMessageDispatcher { else if (topic.matches("frontend/\\w+/online")) { frontendOnlineHandler.handle(topic, payload); } - // todo 是否加回复主题?? } catch (Exception e) { WxUtil.pushText("【MQTT消息处理异常】\n topic: "+ topic+"\n cause: "+e); log.error("【MQTT消息处理异常】topic={}", topic, e);