From bd24cec8cca063b41ff5301feae2b2d676bb078f Mon Sep 17 00:00:00 2001 From: xce Date: Fri, 16 Jan 2026 23:30:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B3=A8=E9=87=8A=E6=B2=A1=E7=94=A8=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/agri/framework/config/MqttConfig.java | 6 +- .../interceptor/MqttMessageHandler.java | 226 ++++-------------- 2 files changed, 47 insertions(+), 185 deletions(-) diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java index 6baff0c..2dea7d4 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -123,7 +123,8 @@ public class MqttConfig { // 关闭清除会话:false=重连后保留订阅关系(若不需要离线消息可设为true) // 优化:生产环境建议设为false,重连后保留订阅关系,避免丢失离线消息 connectOptions.setCleanSession(false); - // 开启自动重连:连接断开后自动尝试重连,提升稳定性 + + // 开启自动重连:连接断开后自动尝试重连,提升稳定性(方案A核心) connectOptions.setAutomaticReconnect(true); // 设置最大重连间隔(秒):避免频繁重连消耗资源 connectOptions.setMaxReconnectDelay(30); @@ -235,7 +236,6 @@ public class MqttConfig { } } - /** * 暴露MQTT连接配置项为Bean,供其他类注入使用 */ @@ -243,4 +243,4 @@ public class MqttConfig { public MqttConnectOptions mqttConnectOptions() { return getMqttConnectOptions(); } -} \ No newline at end of file +} diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 46295d1..93e45d3 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -1,6 +1,5 @@ package com.agri.framework.interceptor; -import com.agri.common.utils.uuid.UUID; import com.agri.framework.config.MqttConfig; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -8,7 +7,6 @@ import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -28,8 +26,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,6 +37,11 @@ import java.util.concurrent.atomic.AtomicBoolean; * 3. 转发设备状态到订阅的前端 * 4. 处理前端控制指令(权限校验+分布式锁+转发) * 适配JDK 8,无心跳包相关逻辑 + * + * 【方案A改造说明(最小改动)】 + * 1) 不再自己new MqttClient 做重连,避免 mqttMessageSender 持有旧client导致“重连后发不出去” + * 2) 只依赖 MqttConnectOptions#setAutomaticReconnect(true) 的Paho自动重连 + * 3) 不再 unsubscribe 主题(cleanSession=false场景下会破坏会话订阅) */ @Component public class MqttMessageHandler implements SmartLifecycle { @@ -67,45 +68,33 @@ public class MqttMessageHandler implements SmartLifecycle { // 新增:生命周期管理标识,控制MQTT客户端启动/关闭 private final AtomicBoolean isRunning = new AtomicBoolean(false); - /** MQTT连接配置项(从MqttConfig注入) */ @Resource private MqttConnectOptions mqttConnectOptions; - // ========== 新增:重连相关配置 ========== - // 重连间隔(秒),可配置化 - @Value("${spring.mqtt.reconnect-interval:5}") - private int reconnectInterval; - // 最大重连次数(-1表示无限重连) - @Value("${spring.mqtt.max-reconnect-times:-1}") - private int maxReconnectTimes; - // 重连线程池(单线程,避免并发重连) - private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); - // 当前重连次数计数 - private int currentReconnectCount = 0; - // 配置错误标记(避免配置错时无限重连) - private volatile boolean isConfigError = false; - /** * 初始化:订阅主题+设置回调 * (移除@PostConstruct,改为由SmartLifecycle的start()触发) + * + * 【方案A】不做自写重连;Paho会在连接断开后自动重连(前提:connectOptions.setAutomaticReconnect(true)) */ public void subscribeTopics() throws MqttException { - // 关键补充1:判空(重连后替换的新实例可能为空) + // 关键补充1:判空 if (mqttClient == null) { log.error("【MQTT初始化】客户端实例为空,无法订阅主题"); throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); } // 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过) + // 注意:这里只使用同一个client实例,避免sender与handler使用不同client if (!mqttClient.isConnected()) { try { - // 使用注入的连接配置项连接Broker(带用户名密码、重连等配置) + // 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置) mqttClient.connect(mqttConnectOptions); log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); } catch (MqttException e) { log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); - throw e; // 抛出异常,让start()处理重连 + throw e; } } @@ -117,14 +106,6 @@ public class MqttMessageHandler implements SmartLifecycle { qosArray[i] = 1; } - // 关键补充2:先取消原有订阅(避免重复订阅导致的消息重复接收) - try { - mqttClient.unsubscribe(topics); - log.info("【MQTT初始化】已取消原有主题订阅,准备重新订阅"); - } catch (Exception e) { - log.warn("【MQTT初始化】取消原有订阅失败(首次订阅可忽略):{}", e.getMessage()); - } - // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 mqttClient.setCallback(new MqttCallback() { /** @@ -134,10 +115,13 @@ public class MqttMessageHandler implements SmartLifecycle { @Override public void connectionLost(Throwable cause) { // 优化:替换System.err为log.error - log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", mqttClient.getClientId(), cause.getMessage(), cause); - // 新增:触发自动重连(仅当客户端处于运行状态且非配置错误时) - if (isRunning.get() && !isConfigError) { - startReconnect(); + log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", + safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause); + + // 【方案A】不再触发自写重连;Paho自动重连会接管重连过程 + // 这里只记录日志即可 + if (isRunning.get()) { + log.warn("【MQTT自动重连】已开启automaticReconnect,等待Paho自动重连..."); } } @@ -161,20 +145,28 @@ public class MqttMessageHandler implements SmartLifecycle { public void deliveryComplete(IMqttDeliveryToken token) { // 优化:替换System.out为log.info,增加空值校验 if (token != null && token.getTopics() != null && token.getTopics().length > 0) { - log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", mqttClient.getClientId(), token.getTopics()[0]); + log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", safeClientId(), token.getTopics()[0]); } } }); + // 【方案A关键点】不再 unsubscribe 主题 + // cleanSession=false + unsubscribe 会破坏Broker侧会话订阅;并且自动重连场景更不建议这么做 + // 订阅主题 mqttClient.subscribe(topics, qosArray); - // 重置重连计数和配置错误标记(连接成功后清零) - currentReconnectCount = 0; - isConfigError = false; - // 优化:打印clientId,方便排查重连后的实例是否替换成功 + // 优化:打印clientId,方便排查 log.info("【MQTT初始化】订阅主题完成,clientId:{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); } + private String safeClientId() { + try { + return (mqttClient == null ? "null" : mqttClient.getClientId()); + } catch (Exception e) { + return "unknown"; + } + } + /** * 消息分发处理:根据主题类型路由到不同处理方法\仅处理设备状态、前端控制指令 * 可以监听到设备传过来的业务数据 以及前端传过来的控制设备指令 @@ -425,135 +417,23 @@ public class MqttMessageHandler implements SmartLifecycle { return keys; } - // ========== 新增:自动重连核心方法 ========== + // ========== 手动重连接口(供Controller调用) ========== /** - * 启动自动重连逻辑 - */ - private void startReconnect() { - // 终止条件:配置错误 或 达到最大重连次数 - if (isConfigError || (maxReconnectTimes > 0 && currentReconnectCount >= maxReconnectTimes)) { - String reason = isConfigError ? "配置错误" : String.format("达到最大重连次数(%d)", maxReconnectTimes); - log.error("【MQTT重连】{},停止重连", reason); - // isRunning.set(false); - return; - } - // 极端场景兜底:客户端实例为空则终止重连 - if (mqttClient == null) { - log.error("【MQTT重连】客户端实例为空,终止重连"); - return; - } - - // 提交重连任务到单线程池 - reconnectExecutor.schedule(() -> { - try { - currentReconnectCount++; - log.info("【MQTT重连】第{}次尝试重连(间隔{}秒)", currentReconnectCount, reconnectInterval); - - // 最小改动:补充close+置空,彻底释放旧实例 - try { - if (mqttClient.isConnected()) { - mqttClient.disconnect(); - log.info("【MQTT重连】已断开旧连接"); - } - // 新增:关闭旧实例释放资源 - mqttClient.close(); - } catch (Exception e) { - log.warn("【MQTT重连】断开/关闭旧连接失败(忽略):{}", e.getMessage()); - } - - // 2. 生成新的clientId(和MqttConfig中一致的规则:原clientId + 随机后缀) - // 新代码(字母数字混合,和配置文件对齐) - String oldClientId = mqttClient.getClientId(); - String originalClientId = oldClientId.contains("_") ? oldClientId.split("_")[0] : oldClientId; - // 调用工具方法生成8位字母数字随机串 - String shortRandom = UUID.generateAlphanumericRandom(8); - String newClientId = originalClientId + "_" + shortRandom; - - // 3. 创建新的MqttClient实例 - MqttClient newMqttClient = new MqttClient( - mqttClient.getServerURI(), - newClientId, - new MemoryPersistence() - ); - - // 4. 用新客户端连接 - newMqttClient.connect(mqttConnectOptions); - log.info("【MQTT重连】使用新clientId连接成功:{}", newClientId); - - // 5. 替换旧客户端实例,重新订阅 - this.mqttClient = newMqttClient; - // 重新连接MQTT Broker + 重新订阅主题 - subscribeTopics(); - - log.info("【MQTT重连】第{}次重连成功,新clientId:{}", currentReconnectCount, newClientId); - - } catch (MqttException e) { - log.error("【MQTT重连】第{}次重连失败:{}", currentReconnectCount, e.getMessage(), e); - // 判断是否为配置类错误(永久错误) - judgeConfigError(e); - // 非配置错误则继续重连 - if (!isConfigError) { - startReconnect(); - } - } - }, reconnectInterval, TimeUnit.SECONDS); - } - - /** - * 判断MQTT错误类型(配置错误/网络错误) - * 用纯数字错误码,兼容所有Paho版本 - */ - private void judgeConfigError(MqttException e) { - int errorCode = e.getReasonCode(); - switch (errorCode) { - // 21 = 认证失败(用户名/密码错误)→ 配置错误 - case 21: - isConfigError = true; - log.error("【MQTT配置错误】认证失败!请检查用户名/密码,错误码:{}", errorCode); - break; - // 32 = 客户端ID非法 → 配置错误 - case 32: - isConfigError = true; - log.error("【MQTT配置错误】客户端ID非法!请检查clientId配置,错误码:{}", errorCode); - break; - // 3 = 连接被拒绝(地址/端口错误)→ 配置错误 - case 3: - isConfigError = true; - log.error("【MQTT配置错误】连接被拒绝!请检查Broker地址/端口,错误码:{}", errorCode); - break; - // 31 = Broker不可达(网络波动)→ 继续重连 - case 31: - log.warn("【MQTT错误】Broker不可达(网络波动),错误码:{},继续重连", errorCode); - break; - // 其他错误 → 默认视为临时错误 - default: - log.warn("【MQTT未知错误】错误码:{},继续重连", errorCode); - break; - } - } - - // ========== 新增:手动重连接口(供Controller调用) ========== - /** - * 手动触发MQTT重连(重置配置错误标记,强制重连) + * 手动触发MQTT重连(最小改动:不替换client实例,只用同一个client重连) */ public String manualReconnect() { - // 重置配置错误标记(允许重连) - isConfigError = false; - currentReconnectCount = 0; isRunning.set(true); - try { // 强制断开旧连接(如果存在) - if (mqttClient.isConnected()) { + if (mqttClient != null && mqttClient.isConnected()) { mqttClient.disconnect(); } - // 重新初始化订阅 + // 重新初始化订阅(内部会connect + subscribe) subscribeTopics(); log.info("【手动重连】MQTT客户端重连成功"); return "MQTT手动重连成功"; } catch (MqttException e) { log.error("【手动重连】MQTT客户端重连失败", e); - judgeConfigError(e); return "MQTT手动重连失败:" + e.getMessage(); } } @@ -562,11 +442,9 @@ public class MqttMessageHandler implements SmartLifecycle { * 获取当前MQTT连接状态 */ public String getMqttStatus() { - boolean connected = mqttClient.isConnected(); + boolean connected = (mqttClient != null && mqttClient.isConnected()); String status = connected ? "已连接" : "已断开"; - String configErrorStatus = isConfigError ? "(配置错误,已终止自动重连)" : ""; - String reconnectStatus = String.format("当前重连次数:%d,最大重连次数:%d", currentReconnectCount, maxReconnectTimes); - return String.format("MQTT连接状态:%s%s;%s", status, configErrorStatus, reconnectStatus); + return String.format("MQTT连接状态:%s;clientId:%s", status, safeClientId()); } // ======================== SmartLifecycle 生命周期管理(核心修复) ======================== @@ -580,16 +458,11 @@ public class MqttMessageHandler implements SmartLifecycle { if (isRunning.compareAndSet(false, true)) { try { // 核心修改:无论是否已连接,都执行订阅(设置回调+订阅主题) - // 移除原有的if (!mqttClient.isConnected()) 判断 subscribeTopics(); log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题)"); } catch (MqttException e) { log.error("【MQTT生命周期】客户端启动失败", e); isRunning.set(false); - judgeConfigError(e); // 启动失败时判断配置错误 - if (!isConfigError) { - startReconnect(); - } } } } @@ -603,26 +476,15 @@ public class MqttMessageHandler implements SmartLifecycle { // 修复:JDK 8正确的compareAndSet写法(无命名参数) if (isRunning.compareAndSet(true, false)) { try { - // 新增:关闭重连线程池,避免内存泄漏 - reconnectExecutor.shutdown(); - if (!reconnectExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - reconnectExecutor.shutdownNow(); - } - - if (mqttClient != null && mqttClient.isConnected()) { - // 移除:取消订阅相关逻辑(避免依赖不存在的方法) - // 直接断开连接(基础Paho支持的核心方法) - mqttClient.disconnect(); - // 关闭客户端释放资源(基础Paho支持的核心方法) + if (mqttClient != null) { + // 注意:disconnect 只在已连接时调用;close 尽量无条件释放资源 + if (mqttClient.isConnected()) { + mqttClient.disconnect(); + } mqttClient.close(); log.info("【MQTT生命周期】客户端已优雅关闭"); } - // 移除:resetConnection()(避免Redis版本差异) - // 替代方案:无需主动重置,Spring上下文重启会重新创建Redis连接 - // 新增:重置重连相关状态 - currentReconnectCount = 0; - isConfigError = false; - } catch (MqttException | InterruptedException e) { + } catch (Exception e) { log.error("【MQTT生命周期】客户端关闭失败", e); } } @@ -660,4 +522,4 @@ public class MqttMessageHandler implements SmartLifecycle { public boolean isAutoStartup() { return true; } -} \ No newline at end of file +}