diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index 2530d68..e654987 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -11,4 +11,5 @@ spring: keep-alive: 60 # 心跳间隔 auto-off-seconds: 30 #自动关延迟秒数。 latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。 - \ No newline at end of file + # 自动关闭任务线程池大小 + auto-off-thread-pool-size: 5 \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index a6c60af..e07cf2a 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -51,16 +51,7 @@ import java.util.function.Function; * 4. 处理前端控制指令(权限校验+分布式锁+转发) * 适配JDK 8,无心跳包相关逻辑 * - * - * - * - * 前端监听: "frontend/" + clientId + "/dtu/" + deviceId + "/listener" - * 前端发布主题:frontend/+/control/+ - *

- * 【方案A改造说明(最小改动)】 - * 1) 不再自己new MqttClient 做重连,避免 mqttMessageSender 持有旧client导致“重连后发不出去” - * 2) 只依赖 MqttConnectOptions#setAutomaticReconnect(true) 的Paho自动重连 - * 3) 不再 unsubscribe 主题(cleanSession=false场景下会破坏会话订阅) + * 改造点:自动关闭任务改为多线程并行执行 */ @Component public class MqttMessageHandler implements SmartLifecycle { @@ -95,6 +86,10 @@ public class MqttMessageHandler implements SmartLifecycle { @Value("${spring.mqtt.latest-ttl-seconds:120}") private int latestTtlSeconds; + // 新增:自动关闭任务线程池核心线程数(可配置) + @Value("${spring.mqtt.auto-off-thread-pool-size:5}") + private int autoOffThreadPoolSize; + // 优化:统一使用SLF4J日志(JDK 8兼容) private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); @@ -107,8 +102,9 @@ public class MqttMessageHandler implements SmartLifecycle { @Resource private MqttConnectOptions mqttConnectOptions; - // 新增:自动关任务线程池(单线程,避免并发执行) - private final ScheduledExecutorService autoOffExecutor = Executors.newSingleThreadScheduledExecutor(); + // 改造:将单线程池改为固定线程池,支持多任务并行执行 + // 替代原有的 Executors.newSingleThreadScheduledExecutor() + private ScheduledExecutorService autoOffExecutor; // 新增:同设备同功能只保留最后一次自动关任务 private final ConcurrentHashMap> autoOffFutureMap = new ConcurrentHashMap<>(); @@ -116,7 +112,6 @@ public class MqttMessageHandler implements SmartLifecycle { @Autowired private ISysAgriLimitService agriLimitService; - // 初始化映射(建议放在类初始化块/构造方法中,只初始化一次) private static final Map> LIMIT_MAP = new HashMap<>(); static { @@ -173,7 +168,6 @@ public class MqttMessageHandler implements SmartLifecycle { */ @Override public void connectionLost(Throwable cause) { - // 优化:替换System.err为log.error log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause); @@ -202,7 +196,6 @@ public class MqttMessageHandler implements SmartLifecycle { */ @Override public void deliveryComplete(IMqttDeliveryToken token) { - // 优化:替换System.out为log.info,增加空值校验 if (token != null && token.getTopics() != null && token.getTopics().length > 0) { log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", safeClientId(), token.getTopics()[0]); } @@ -235,7 +228,6 @@ public class MqttMessageHandler implements SmartLifecycle { */ private void handleMessage(String topic, String payload) { try { - // 优化:替换System.out为log.info // log.info("【MQTT接收】topic={}, payload={}", topic, payload); // 设备状态主题:dtu/{deviceId}/up @@ -247,7 +239,6 @@ public class MqttMessageHandler implements SmartLifecycle { handleFrontendControl(topic, payload); } } catch (Exception e) { - // 优化:替换System.err为log.error,打印完整堆栈 log.error("【MQTT消息处理异常】topic={}", topic, e); } } @@ -297,6 +288,7 @@ public class MqttMessageHandler implements SmartLifecycle { } log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); + // 回执成功且值=1(表示运行/开启)时,起个任务,固定多少秒-n秒 // 新增:回执固定是{"suc":true,"prop":{"jm1k":1}} boolean suc = payloadObj.getBooleanValue("suc"); if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { @@ -310,22 +302,14 @@ public class MqttMessageHandler implements SmartLifecycle { autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); } + // 启动自动关闭任务(多线程执行) // 新增:回执成功且值=1(表示运行/开启)时,起个任务,固定多少秒-n秒 scheduleAutoOff(deviceId, funcType, autoOffSeconds); } - - // 广播回执结果给所有订阅该设备的前端 - // String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";; - // JSONObject ackPayload = new JSONObject(); - // ackPayload.put("deviceId", deviceId); - // ackPayload.put("funcType", funcType); - // ackPayload.put("suc", payloadObj.getBooleanValue("suc")); - // ackPayload.put("code", propEntry.getValue()); - // mqttMessageSender.publish(broadcastTopic, ackPayload.toJSONString()); } } - // 新增:设备每10秒上报一次状态包,写入latest供自动关任务读取(只在需要时写,减少消耗) + // 设备每10秒上报一次状态包,写入latest供自动关任务读取 (只在需要时写,减少消耗) if (!isAck) { boolean needWriteLatest = false; @@ -346,7 +330,6 @@ public class MqttMessageHandler implements SmartLifecycle { } // 非回执消息:正常转发给订阅前端 - // if (!isDeviceAck) { // 查询Redis中订阅该设备的前端列表:sub:{deviceId} Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); @@ -357,14 +340,12 @@ public class MqttMessageHandler implements SmartLifecycle { String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // 发布消息 mqttMessageSender.publish(frontendTopic, payload); - // 优化:替换System.out为log.info // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } } else { // 优化:替换System.out为log.info // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); } - // } } // 新增:是否存在该设备的自动关任务 @@ -384,33 +365,39 @@ public class MqttMessageHandler implements SmartLifecycle { return false; } - // 新增:起个任务,固定多少秒-n秒,【监听最新的设备状态,如果还在运行】,发送设备关的指令 + // 改造:多线程执行自动关闭任务 + // 起个任务,固定多少秒-n秒,【监听最新的设备状态,如果还在运行】,发送设备关的指令 private void scheduleAutoOff(String deviceId, String funcType, int delaySeconds) { if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType) || delaySeconds <= 0) { return; } String taskKey = "autooff:" + deviceId + ":" + funcType; - // 同设备同功能只保留最后一次 - ScheduledFuture old = autoOffFutureMap.remove(taskKey); - if (old != null) { - old.cancel(false); + // 同设备同功能只保留最后一次任务(先取消旧任务) + ScheduledFuture oldFuture = autoOffFutureMap.remove(taskKey); + if (oldFuture != null) { + oldFuture.cancel(false); + log.debug("【自动关任务】取消旧任务:{}", taskKey); } - ScheduledFuture future = autoOffExecutor.schedule(() -> { + // 使用多线程池提交任务 + ScheduledFuture newFuture = autoOffExecutor.schedule(() -> { try { runAutoOff(deviceId, funcType); } catch (Exception e) { log.error("【自动关任务】执行失败,deviceId={}, funcType={}", deviceId, funcType, e); } finally { + // 任务执行完成后移除映射 autoOffFutureMap.remove(taskKey); } }, delaySeconds, TimeUnit.SECONDS); - autoOffFutureMap.put(taskKey, future); - log.info("【自动关任务】已创建:deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds); + // 保存新任务的引用 + autoOffFutureMap.put(taskKey, newFuture); + log.info("【自动关任务】已创建(多线程):deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds); } + // 自动关闭任务的核心逻辑(无改动) // 新增:读取最新状态(device:latest:{deviceId}),若仍为1则下发 {"funcType":0} 到 dtu/{id}/down private void runAutoOff(String deviceId, String funcType) throws MqttException { String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); @@ -490,7 +477,6 @@ public class MqttMessageHandler implements SmartLifecycle { if (!checkPermission(clientId, deviceId)) { String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); - // 优化:替换System.err为log.warn log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId); return; } @@ -503,7 +489,6 @@ public class MqttMessageHandler implements SmartLifecycle { if (lockSuccess == null || !lockSuccess) { String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}"); - // 优化:替换System.err为log.warn log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType); return; } @@ -516,7 +501,6 @@ public class MqttMessageHandler implements SmartLifecycle { String deviceTopic = "dtu/" + deviceId + "/down"; //todo // mqttMessageSender.publish(deviceTopic, payload); - // 优化:替换System.out为log.info log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); } @@ -532,8 +516,7 @@ public class MqttMessageHandler implements SmartLifecycle { */ private boolean checkPermission(String clientId, String deviceId) { // 管理员权限:clientId以admin_开头 - - // 普通用户权限:校验Redis中是否绑定该设备:校验Redis中user_device:{clientId}是否包含该设备ID + // 普通用户权限:校验Redis中是否绑定该设备 return Boolean.TRUE; } @@ -541,7 +524,6 @@ public class MqttMessageHandler implements SmartLifecycle { * 前端订阅设备(Controller调用) */ public void subscribeDevice(String clientId, String deviceId) { - // 新增:入参非空校验(JDK 8兼容) if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { log.error("【订阅管理】clientId或deviceId不能为空"); throw new IllegalArgumentException("clientId和deviceId不能为空"); @@ -549,21 +531,8 @@ public class MqttMessageHandler implements SmartLifecycle { // 保存订阅关系到Redis stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); - stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); // 新增:反向索引 - // 优化:替换System.out为log.info + stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId); - - // 推送设备最新状态(可选) - // String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); - // if (latestStatus != null) { - // String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - // try { - // mqttMessageSender.publish(frontendTopic, latestStatus); - // } catch (MqttException e) { - // // 优化:替换System.err为log.error - // log.error("【订阅推送】设备{}状态推送失败", deviceId, e); - // } - // } } /** @@ -574,7 +543,6 @@ public class MqttMessageHandler implements SmartLifecycle { * @param deviceId 设备ID */ public void unsubscribeDevice(String clientId, String deviceId) { - // 新增:入参非空校验(JDK 8兼容) if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { log.error("【前端取消订阅】clientId或deviceId不能为空"); throw new IllegalArgumentException("clientId和deviceId不能为空"); @@ -582,8 +550,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 从Redis删除订阅关系 stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); - stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); // 新增:反向索引 - // 优化:替换System.out为log.info + stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId); } @@ -594,7 +561,6 @@ public class MqttMessageHandler implements SmartLifecycle { * @return 前端需要取消的MQTT主题列表(供前端批量取消) */ public List unsubscribeAllDevice(String clientId) { - // 新增:入参非空校验(JDK 8兼容) if (!StringUtils.hasText(clientId)) { log.error("【批量取消】clientId不能为空"); throw new IllegalArgumentException("clientId不能为空"); @@ -643,7 +609,7 @@ public class MqttMessageHandler implements SmartLifecycle { return frontendTopics; } - // 新增:生产环境用Scan替代Keys,避免Redis阻塞(JDK 8兼容) + // 生产环境用Scan替代Keys,避免Redis阻塞 private Set scanRedisKeys(String pattern) { Set keys = new HashSet<>(); try { @@ -711,9 +677,17 @@ public class MqttMessageHandler implements SmartLifecycle { log.info("开始监听"); if (isRunning.compareAndSet(false, true)) { try { - // 核心修改:无论是否已连接,都执行订阅(设置回调+订阅主题) + // 初始化多线程池(固定线程数) + autoOffExecutor = Executors.newScheduledThreadPool(autoOffThreadPoolSize, r -> { + Thread thread = new Thread(r); + thread.setName("auto-off-task-" + thread.getId()); + thread.setDaemon(true); // 设置为守护线程,不阻塞JVM退出 + return thread; + }); + + // 核心修改:无论是否已连接,都执行订阅 subscribeTopics(); - log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题)"); + log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题),自动关闭任务线程池大小:{}", autoOffThreadPoolSize); } catch (MqttException e) { log.error("【MQTT生命周期】客户端启动失败", e); isRunning.set(false); @@ -722,27 +696,39 @@ public class MqttMessageHandler implements SmartLifecycle { } /** - * 停止MQTT客户端(Spring上下文销毁/重启时触发) - * 核心:移除所有不存在的方法,仅保留基础的断开/关闭逻辑 + * 停止MQTT客户端 + * 改造点:优化多线程池的优雅关闭 */ @Override public void stop() { - // 修复:JDK 8正确的compareAndSet写法(无命名参数) if (isRunning.compareAndSet(true, false)) { try { - // 新增:关闭自动关任务线程池,避免线程泄漏 - autoOffExecutor.shutdown(); - try { - if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) { + // 1. 取消所有未执行的自动关闭任务 + for (Map.Entry> entry : autoOffFutureMap.entrySet()) { + entry.getValue().cancel(false); + log.debug("【自动关任务】取消任务:{}", entry.getKey()); + } + autoOffFutureMap.clear(); + + // 2. 优雅关闭线程池 + if (autoOffExecutor != null) { + autoOffExecutor.shutdown(); + try { + // 等待3秒让任务完成 + if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) { + // 强制关闭 + autoOffExecutor.shutdownNow(); + log.warn("【自动关任务】线程池强制关闭"); + } + } catch (InterruptedException e) { autoOffExecutor.shutdownNow(); + Thread.currentThread().interrupt(); } - } catch (InterruptedException ignore) { - autoOffExecutor.shutdownNow(); - Thread.currentThread().interrupt(); + log.info("【自动关任务】线程池已关闭"); } + // 3. 关闭MQTT客户端 if (mqttClient != null) { - // 注意:disconnect 只在已连接时调用;close 尽量无条件释放资源 if (mqttClient.isConnected()) { mqttClient.disconnect(); } @@ -756,7 +742,7 @@ public class MqttMessageHandler implements SmartLifecycle { } /** - * 异步停止(JDK 8兼容,默认实现) + * 异步停止 */ @Override public void stop(Runnable callback) { @@ -787,4 +773,4 @@ public class MqttMessageHandler implements SmartLifecycle { public boolean isAutoStartup() { return true; } -} +} \ No newline at end of file