From 36ab2096b43506a5e0655dfdaff04d06c6ac7bf1 Mon Sep 17 00:00:00 2001 From: xce Date: Sat, 17 Jan 2026 02:46:12 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=95=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-mqtt.yml | 4 +- .../interceptor/MqttMessageHandler.java | 185 +++++++++++++++++- 2 files changed, 186 insertions(+), 3 deletions(-) diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index b17fa55..2530d68 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -5,8 +5,10 @@ spring: username: admin # Mosquitto共用账号 password: Admin#12345678 # Mosquitto密码 client-id: springboot-backend # 截取UUID前8位(自动去横线) - default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题 + default-topic: dtu/+/up,frontend/+/control/+,frontend/+/online # 后端监听的主题 qos: 1 # 消息可靠性 timeout: 60 # 连接超时 keep-alive: 60 # 心跳间隔 + auto-off-seconds: 30 #自动关延迟秒数。 + latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。 \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index c700554..a6c60af 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -1,6 +1,8 @@ package com.agri.framework.interceptor; import com.agri.framework.config.MqttConfig; +import com.agri.system.domain.SysAgriLimit; +import com.agri.system.service.ISysAgriLimitService; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; @@ -12,6 +14,7 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.SmartLifecycle; import org.springframework.data.redis.connection.RedisConnection; @@ -26,12 +29,18 @@ import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; /** * MQTT消息处理器(无心跳包版本) @@ -78,6 +87,14 @@ public class MqttMessageHandler implements SmartLifecycle { @Value("${spring.mqtt.default-topic}") private String defaultTopic; + // 新增:自动关延迟秒数(固定多少秒-n秒) + @Value("${spring.mqtt.auto-off-seconds:30}") + private int autoOffSeconds; + + // 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可) + @Value("${spring.mqtt.latest-ttl-seconds:120}") + private int latestTtlSeconds; + // 优化:统一使用SLF4J日志(JDK 8兼容) private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); @@ -90,6 +107,29 @@ public class MqttMessageHandler implements SmartLifecycle { @Resource private MqttConnectOptions mqttConnectOptions; + // 新增:自动关任务线程池(单线程,避免并发执行) + private final ScheduledExecutorService autoOffExecutor = Executors.newSingleThreadScheduledExecutor(); + + // 新增:同设备同功能只保留最后一次自动关任务 + private final ConcurrentHashMap> autoOffFutureMap = new ConcurrentHashMap<>(); + + @Autowired + private ISysAgriLimitService agriLimitService; + + + // 初始化映射(建议放在类初始化块/构造方法中,只初始化一次) + private static final Map> LIMIT_MAP = new HashMap<>(); + static { + LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); + LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); + LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); + LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); + LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); + LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); + LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); + LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); + } + /** * 初始化:订阅主题+设置回调 * (移除@PostConstruct,改为由SmartLifecycle的start()触发) @@ -234,12 +274,21 @@ public class MqttMessageHandler implements SmartLifecycle { // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) String funcType = null; + Integer funcValue = null; + boolean isAck = false; + if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { + isAck = true; JSONObject propObj = payloadObj.getJSONObject("prop"); if (propObj != null && !propObj.isEmpty()) { // 提取prop中的第一个功能码 Map.Entry propEntry = propObj.entrySet().iterator().next(); funcType = propEntry.getKey(); + try { + funcValue = Integer.parseInt(String.valueOf(propEntry.getValue())); + } catch (Exception ignore) { + } + // 释放对应功能的分布式锁 String lockKey = "lock:" + deviceId + ":" + funcType; Boolean delete = stringRedisTemplate.delete(lockKey); @@ -248,6 +297,23 @@ public class MqttMessageHandler implements SmartLifecycle { } log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); + // 新增:回执固定是{"suc":true,"prop":{"jm1k":1}} + boolean suc = payloadObj.getBooleanValue("suc"); + if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { + + SysAgriLimit agriLimit = agriLimitService.lambdaQuery() + .eq(SysAgriLimit::getImei, deviceId) + .one(); + int autoOffSeconds = 0; + + if (agriLimit != null) { + autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); + } + + // 新增:回执成功且值=1(表示运行/开启)时,起个任务,固定多少秒-n秒 + scheduleAutoOff(deviceId, funcType, autoOffSeconds); + } + // 广播回执结果给所有订阅该设备的前端 // String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";; // JSONObject ackPayload = new JSONObject(); @@ -259,6 +325,26 @@ public class MqttMessageHandler implements SmartLifecycle { } } + // 新增:设备每10秒上报一次状态包,写入latest供自动关任务读取(只在需要时写,减少消耗) + if (!isAck) { + boolean needWriteLatest = false; + + // 1) 有人订阅时才写latest + Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); + if (subscribedClients != null && !subscribedClients.isEmpty()) { + needWriteLatest = true; + } + + // 2) 或者存在该设备的自动关任务时也写latest(任务需要最新状态判断) + if (!needWriteLatest && hasAutoOffTask(deviceId)) { + needWriteLatest = true; + } + + if (needWriteLatest) { + stringRedisTemplate.opsForValue().set("device:latest:" + deviceId, payload, latestTtlSeconds, TimeUnit.SECONDS); + } + } + // 非回执消息:正常转发给订阅前端 // if (!isDeviceAck) { // 查询Redis中订阅该设备的前端列表:sub:{deviceId} @@ -281,6 +367,90 @@ public class MqttMessageHandler implements SmartLifecycle { // } } + // 新增:是否存在该设备的自动关任务 + private boolean hasAutoOffTask(String deviceId) { + if (!StringUtils.hasText(deviceId)) { + return false; + } + String prefix = "autooff:" + deviceId + ":"; + for (String key : autoOffFutureMap.keySet()) { + if (key != null && key.startsWith(prefix)) { + ScheduledFuture f = autoOffFutureMap.get(key); + if (f != null && !f.isCancelled() && !f.isDone()) { + return true; + } + } + } + return false; + } + + // 新增:起个任务,固定多少秒-n秒,【监听最新的设备状态,如果还在运行】,发送设备关的指令 + private void scheduleAutoOff(String deviceId, String funcType, int delaySeconds) { + if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType) || delaySeconds <= 0) { + return; + } + String taskKey = "autooff:" + deviceId + ":" + funcType; + + // 同设备同功能只保留最后一次 + ScheduledFuture old = autoOffFutureMap.remove(taskKey); + if (old != null) { + old.cancel(false); + } + + ScheduledFuture future = autoOffExecutor.schedule(() -> { + try { + runAutoOff(deviceId, funcType); + } catch (Exception e) { + log.error("【自动关任务】执行失败,deviceId={}, funcType={}", deviceId, funcType, e); + } finally { + autoOffFutureMap.remove(taskKey); + } + }, delaySeconds, TimeUnit.SECONDS); + + autoOffFutureMap.put(taskKey, future); + log.info("【自动关任务】已创建:deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds); + } + + // 新增:读取最新状态(device:latest:{deviceId}),若仍为1则下发 {"funcType":0} 到 dtu/{id}/down + private void runAutoOff(String deviceId, String funcType) throws MqttException { + String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); + if (!StringUtils.hasText(latest)) { + log.warn("【自动关任务】无最新状态,跳过:deviceId={}, funcType={}", deviceId, funcType); + return; + } + + JSONObject latestObj; + try { + latestObj = JSON.parseObject(latest); + } catch (Exception e) { + log.warn("【自动关任务】最新状态JSON解析失败,跳过:deviceId={}, funcType={}", deviceId, funcType); + return; + } + if (latestObj == null || latestObj.isEmpty()) { + return; + } + + // 设备每10秒上报的状态包:{"jm1k":0/1,...} 顶层字段直接取 + Integer current = null; + try { + if (latestObj.containsKey(funcType)) { + current = latestObj.getIntValue(funcType); + } + } catch (Exception ignore) { + } + + if (current != null && current == 1) { + JSONObject down = new JSONObject(); + down.put(funcType, 0); + + String deviceTopic = "dtu/" + deviceId + "/down"; + mqttMessageSender.publish(deviceTopic, down.toJSONString()); + log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); + } else { + log.info("【自动关任务】检测未运行或状态未知,跳过关闭:deviceId={}, funcType={}, current={}", deviceId, funcType, current); + } + } + /** * 处理前端控制指令:权限校验+分布式锁+转发给设备 */ @@ -504,7 +674,7 @@ public class MqttMessageHandler implements SmartLifecycle { /** * 手动触发MQTT重连(最小改动:不替换client实例,只用同一个client重连) */ - public synchronized String manualReconnect() { + public synchronized String manualReconnect() { isRunning.set(true); try { // 强制断开旧连接(如果存在) @@ -560,6 +730,17 @@ public class MqttMessageHandler implements SmartLifecycle { // 修复:JDK 8正确的compareAndSet写法(无命名参数) if (isRunning.compareAndSet(true, false)) { try { + // 新增:关闭自动关任务线程池,避免线程泄漏 + autoOffExecutor.shutdown(); + try { + if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) { + autoOffExecutor.shutdownNow(); + } + } catch (InterruptedException ignore) { + autoOffExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + if (mqttClient != null) { // 注意:disconnect 只在已连接时调用;close 尽量无条件释放资源 if (mqttClient.isConnected()) { @@ -606,4 +787,4 @@ public class MqttMessageHandler implements SmartLifecycle { public boolean isAutoStartup() { return true; } -} \ No newline at end of file +}