From 6fd96b188ccb92066554f8eb9e8c3ef207ddd9ee Mon Sep 17 00:00:00 2001 From: xce Date: Fri, 16 Jan 2026 18:13:24 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E5=9F=BA=E7=A1=80=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/controller/mqtt/MqttController.java | 77 +++++- .../src/main/resources/application-mqtt.yml | 7 +- .../java/com/agri/common/utils/uuid/UUID.java | 20 +- .../com/agri/framework/config/MqttConfig.java | 13 +- .../agri/framework/config/SecurityConfig.java | 2 +- .../interceptor/MqttMessageHandler.java | 230 +++++++++++++++++- 6 files changed, 329 insertions(+), 20 deletions(-) diff --git a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java index 3afdbdd..0a85317 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java @@ -1,7 +1,15 @@ package com.agri.web.controller.mqtt; import com.agri.framework.interceptor.MqttMessageHandler; -import org.springframework.web.bind.annotation.*; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.List; @@ -13,6 +21,8 @@ import java.util.List; @RequestMapping("/api/mqtt") public class MqttController { + private static final Logger log = LoggerFactory.getLogger(MqttController.class); + @Resource private MqttMessageHandler mqttMessageHandler; @@ -21,8 +31,16 @@ public class MqttController { */ @PostMapping("/single") public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) { - mqttMessageHandler.subscribeDevice(clientId, deviceId); - return "订阅成功"; + try { + mqttMessageHandler.subscribeDevice(clientId, deviceId); + return "订阅成功"; + } catch (IllegalArgumentException e) { + log.error("MQTT单个订阅失败:{}", e.getMessage()); + return "订阅失败:" + e.getMessage(); + } catch (Exception e) { + log.error("MQTT单个订阅异常", e); + return "订阅失败:系统异常"; + } } /** @@ -30,8 +48,16 @@ public class MqttController { */ @DeleteMapping("/single") public String unsubscribe(@RequestParam String clientId, @RequestParam String deviceId) { - mqttMessageHandler.unsubscribeDevice(clientId, deviceId); - return "取消订阅成功"; + try { + mqttMessageHandler.unsubscribeDevice(clientId, deviceId); + return "取消订阅成功"; + } catch (IllegalArgumentException e) { + log.error("MQTT单个取消订阅失败:{}", e.getMessage()); + return "取消订阅失败:" + e.getMessage(); + } catch (Exception e) { + log.error("MQTT单个取消订阅异常", e); + return "取消订阅失败:系统异常"; + } } /** @@ -39,7 +65,44 @@ public class MqttController { */ @DeleteMapping("/batch") public List unsubscribeAll(@RequestParam String clientId) { - // 返回前端需要取消的MQTT主题列表 - return mqttMessageHandler.unsubscribeAllDevice(clientId); + try { + // 返回前端需要取消的MQTT主题列表 + return mqttMessageHandler.unsubscribeAllDevice(clientId); + } catch (IllegalArgumentException e) { + log.error("MQTT批量取消订阅失败:{}", e.getMessage()); + // 异常时返回空列表,避免前端解析失败 + return Lists.newArrayList(); + } catch (Exception e) { + log.error("MQTT批量取消订阅异常", e); + return Lists.newArrayList(); + } + } + + /** + * 手动触发MQTT重连 + * 场景:配置修正后,手动恢复连接(无需重启服务) + */ + @GetMapping("/reconnect") + public String manualReconnect() { + try { + return mqttMessageHandler.manualReconnect(); + } catch (Exception e) { + log.error("MQTT手动重连异常", e); + return "手动重连失败:" + e.getMessage(); + } + } + + /** + * 查询当前MQTT连接状态 + * 便于排查连接问题 + */ + @GetMapping("/status") + public String getMqttStatus() { + try { + return mqttMessageHandler.getMqttStatus(); + } catch (Exception e) { + log.error("查询MQTT连接状态异常", e); + return "查询状态失败:" + e.getMessage(); + } } } \ No newline at end of file diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index 03b98c8..6d515cf 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -5,8 +5,11 @@ spring: ws-host: wss://mq.xiaoces.com/mqtt # 前端的WebSocket地址 username: admin # Mosquitto共用账号 password: Admin#12345678 # Mosquitto密码 - client-id: springboot-backend-${random.uuid} # 后端客户端ID(唯一) + client-id: springboot-backend # 截取UUID前8位(自动去横线) default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题 qos: 1 # 消息可靠性 timeout: 60 # 连接超时 - keep-alive: 60 # 心跳间隔 \ No newline at end of file + keep-alive: 60 # 心跳间隔 + # 新增重连配置 + reconnect-interval: 5 # 重连间隔(秒) + max-reconnect-times: -1 # 最大重连次数(-1=无限重连) \ No newline at end of file diff --git a/agri-common/src/main/java/com/agri/common/utils/uuid/UUID.java b/agri-common/src/main/java/com/agri/common/utils/uuid/UUID.java index 4815cbe..e3e76a0 100644 --- a/agri-common/src/main/java/com/agri/common/utils/uuid/UUID.java +++ b/agri-common/src/main/java/com/agri/common/utils/uuid/UUID.java @@ -1,11 +1,12 @@ package com.agri.common.utils.uuid; +import com.agri.common.exception.UtilException; + import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; -import com.agri.common.exception.UtilException; /** * 提供通用唯一识别码(universally unique identifier)(UUID)实现 @@ -481,4 +482,21 @@ public final class UUID implements java.io.Serializable, Comparable { return ThreadLocalRandom.current(); } + + + /** + * 生成指定长度的字母数字随机串(对齐Spring ${random.alphanumeric:n}语法) + * 字符集:0-9 + a-z + A-Z,和Spring配置的随机串规则一致 + */ + public static String generateAlphanumericRandom(int length) { + // 定义字母数字混合的字符集(和Spring ${random.alphanumeric}的字符集一致) + String chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + StringBuilder sb = new StringBuilder(); + // 基于JVM的随机数生成,保证随机性 + for (int i = 0; i < length; i++) { + int randomIndex = (int) (Math.random() * chars.length()); + sb.append(chars.charAt(randomIndex)); + } + return sb.toString(); + } } diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java index b6e3941..c444402 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -1,5 +1,6 @@ package com.agri.framework.config; +import com.agri.common.utils.uuid.UUID; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -14,7 +15,6 @@ import org.springframework.util.StringUtils; import javax.annotation.PreDestroy; import java.nio.charset.StandardCharsets; -import java.util.UUID; /** * MQTT核心配置类 @@ -92,7 +92,7 @@ public class MqttConfig { MemoryPersistence persistence = new MemoryPersistence(); // 新增:客户端ID拼接随机后缀,避免多实例部署时冲突(生产环境必备) - String uniqueClientId = clientId + "_" + UUID.randomUUID().toString().substring(0, 8); + String uniqueClientId = clientId + "_" + UUID.generateAlphanumericRandom(8); MqttClient mqttClient = new MqttClient(host, uniqueClientId, persistence); // 3. 建立MQTT连接 @@ -228,4 +228,13 @@ public class MqttConfig { } } } + + + /** + * 暴露MQTT连接配置项为Bean,供其他类注入使用 + */ + @Bean + public MqttConnectOptions mqttConnectOptions() { + return getMqttConnectOptions(); + } } \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/config/SecurityConfig.java b/agri-framework/src/main/java/com/agri/framework/config/SecurityConfig.java index def8335..6d9cacf 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/SecurityConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/SecurityConfig.java @@ -111,7 +111,7 @@ public class SecurityConfig .authorizeHttpRequests((requests) -> { permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll()); // 对于登录login 注册register 验证码captchaImage 允许匿名访问 - requests.antMatchers("/login", "/register", "/captchaImage").permitAll() + requests.antMatchers("/login", "/register", "/captchaImage","/api/mqtt/status").permitAll() // 静态资源,可匿名访问 .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll() .antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**").permitAll() diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 29971b8..e4ef9d1 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -1,11 +1,14 @@ package com.agri.framework.interceptor; +import com.agri.common.utils.uuid.UUID; import com.agri.framework.config.MqttConfig; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -25,6 +28,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,11 +67,48 @@ public class MqttMessageHandler implements SmartLifecycle { // 新增:生命周期管理标识,控制MQTT客户端启动/关闭 private final AtomicBoolean isRunning = new AtomicBoolean(false); + + /** MQTT连接配置项(从MqttConfig注入) */ + @Resource + private MqttConnectOptions mqttConnectOptions; + + // ========== 新增:重连相关配置 ========== + // 重连间隔(秒),可配置化 + @Value("${spring.mqtt.reconnect-interval:5}") + private int reconnectInterval; + // 最大重连次数(-1表示无限重连) + @Value("${spring.mqtt.max-reconnect-times:-1}") + private int maxReconnectTimes; + // 重连线程池(单线程,避免并发重连) + private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); + // 当前重连次数计数 + private int currentReconnectCount = 0; + // 配置错误标记(避免配置错时无限重连) + private volatile boolean isConfigError = false; + /** * 初始化:订阅主题+设置回调 * (移除@PostConstruct,改为由SmartLifecycle的start()触发) */ public void subscribeTopics() throws MqttException { + // 关键补充1:判空(重连后替换的新实例可能为空) + if (mqttClient == null) { + log.error("【MQTT初始化】客户端实例为空,无法订阅主题"); + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); + } + + // 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过) + if (!mqttClient.isConnected()) { + try { + // 使用注入的连接配置项连接Broker(带用户名密码、重连等配置) + mqttClient.connect(mqttConnectOptions); + log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); + } catch (MqttException e) { + log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); + throw e; // 抛出异常,让start()处理重连 + } + } + // 解析配置的主题列表 String[] topics = defaultTopic.split(","); int[] qosArray = new int[topics.length]; @@ -75,6 +117,14 @@ public class MqttMessageHandler implements SmartLifecycle { qosArray[i] = 1; } + // 关键补充2:先取消原有订阅(避免重复订阅导致的消息重复接收) + try { + mqttClient.unsubscribe(topics); + log.info("【MQTT初始化】已取消原有主题订阅,准备重新订阅"); + } catch (Exception e) { + log.warn("【MQTT初始化】取消原有订阅失败(首次订阅可忽略):{}", e.getMessage()); + } + // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 mqttClient.setCallback(new MqttCallback() { /** @@ -84,7 +134,11 @@ public class MqttMessageHandler implements SmartLifecycle { @Override public void connectionLost(Throwable cause) { // 优化:替换System.err为log.error - log.error("【MQTT连接异常】连接断开:{}", cause.getMessage(), cause); + log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", mqttClient.getClientId(), cause.getMessage(), cause); + // 新增:触发自动重连(仅当客户端处于运行状态且非配置错误时) + if (isRunning.get() && !isConfigError) { + startReconnect(); + } } /** @@ -107,15 +161,18 @@ public class MqttMessageHandler implements SmartLifecycle { public void deliveryComplete(IMqttDeliveryToken token) { // 优化:替换System.out为log.info,增加空值校验 if (token != null && token.getTopics() != null && token.getTopics().length > 0) { - log.info("【MQTT确认】消息发布完成,主题:{}", token.getTopics()[0]); + log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", mqttClient.getClientId(), token.getTopics()[0]); } } }); // 订阅主题 mqttClient.subscribe(topics, qosArray); - // 优化:替换System.out为log.info - log.info("【MQTT初始化】订阅主题:{}", String.join(",", topics)); + // 重置重连计数和配置错误标记(连接成功后清零) + currentReconnectCount = 0; + isConfigError = false; + // 优化:打印clientId,方便排查重连后的实例是否替换成功 + log.info("【MQTT初始化】订阅主题完成,clientId:{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); } /** @@ -367,6 +424,148 @@ public class MqttMessageHandler implements SmartLifecycle { return keys; } + // ========== 新增:自动重连核心方法 ========== + /** + * 启动自动重连逻辑 + */ + private void startReconnect() { + // 终止条件:配置错误 或 达到最大重连次数 + if (isConfigError || (maxReconnectTimes > 0 && currentReconnectCount >= maxReconnectTimes)) { + String reason = isConfigError ? "配置错误" : String.format("达到最大重连次数(%d)", maxReconnectTimes); + log.error("【MQTT重连】{},停止重连", reason); + // isRunning.set(false); + return; + } + // 极端场景兜底:客户端实例为空则终止重连 + if (mqttClient == null) { + log.error("【MQTT重连】客户端实例为空,终止重连"); + return; + } + + // 提交重连任务到单线程池 + reconnectExecutor.schedule(() -> { + try { + currentReconnectCount++; + log.info("【MQTT重连】第{}次尝试重连(间隔{}秒)", currentReconnectCount, reconnectInterval); + + // 兼容极端场景:先强制断开(无论当前状态),再重新连接 + try { + if (mqttClient.isConnected()) { + mqttClient.disconnect(); + log.info("【MQTT重连】已断开旧连接"); + } + } catch (Exception e) { + log.warn("【MQTT重连】断开旧连接失败(忽略):{}", e.getMessage()); + } + + // 2. 生成新的clientId(和MqttConfig中一致的规则:原clientId + 随机后缀) + // 新代码(字母数字混合,和配置文件对齐) + String oldClientId = mqttClient.getClientId(); + String originalClientId = oldClientId.contains("_") ? oldClientId.split("_")[0] : oldClientId; + // 调用工具方法生成8位字母数字随机串 + String shortRandom = UUID.generateAlphanumericRandom(8); + String newClientId = originalClientId + "_" + shortRandom; + + // 3. 创建新的MqttClient实例 + MqttClient newMqttClient = new MqttClient( + mqttClient.getServerURI(), + newClientId, + new MemoryPersistence() + ); + + // 4. 用新客户端连接 + newMqttClient.connect(mqttConnectOptions); + log.info("【MQTT重连】使用新clientId连接成功:{}", newClientId); + + // 5. 替换旧客户端实例,重新订阅 + this.mqttClient = newMqttClient; + // 重新连接MQTT Broker + 重新订阅主题 + subscribeTopics(); + + log.info("【MQTT重连】第{}次重连成功,新clientId:{}", currentReconnectCount, newClientId); + + } catch (MqttException e) { + log.error("【MQTT重连】第{}次重连失败:{}", currentReconnectCount, e.getMessage(), e); + // 判断是否为配置类错误(永久错误) + judgeConfigError(e); + // 非配置错误则继续重连 + if (!isConfigError) { + startReconnect(); + } + } + }, reconnectInterval, TimeUnit.SECONDS); + } + + /** + * 判断MQTT错误类型(配置错误/网络错误) + * 用纯数字错误码,兼容所有Paho版本 + */ + private void judgeConfigError(MqttException e) { + int errorCode = e.getReasonCode(); + switch (errorCode) { + // 21 = 认证失败(用户名/密码错误)→ 配置错误 + case 21: + isConfigError = true; + log.error("【MQTT配置错误】认证失败!请检查用户名/密码,错误码:{}", errorCode); + break; + // 32 = 客户端ID非法 → 配置错误 + case 32: + isConfigError = true; + log.error("【MQTT配置错误】客户端ID非法!请检查clientId配置,错误码:{}", errorCode); + break; + // 3 = 连接被拒绝(地址/端口错误)→ 配置错误 + case 3: + isConfigError = true; + log.error("【MQTT配置错误】连接被拒绝!请检查Broker地址/端口,错误码:{}", errorCode); + break; + // 31 = Broker不可达(网络波动)→ 继续重连 + case 31: + log.warn("【MQTT错误】Broker不可达(网络波动),错误码:{},继续重连", errorCode); + break; + // 其他错误 → 默认视为临时错误 + default: + log.warn("【MQTT未知错误】错误码:{},继续重连", errorCode); + break; + } + } + + // ========== 新增:手动重连接口(供Controller调用) ========== + /** + * 手动触发MQTT重连(重置配置错误标记,强制重连) + */ + public String manualReconnect() { + // 重置配置错误标记(允许重连) + isConfigError = false; + currentReconnectCount = 0; + isRunning.set(true); + + try { + // 强制断开旧连接(如果存在) + if (mqttClient.isConnected()) { + mqttClient.disconnect(); + } + // 重新初始化订阅 + subscribeTopics(); + log.info("【手动重连】MQTT客户端重连成功"); + return "MQTT手动重连成功"; + } catch (MqttException e) { + log.error("【手动重连】MQTT客户端重连失败", e); + judgeConfigError(e); + return "MQTT手动重连失败:" + e.getMessage(); + } + } + + /** + * 获取当前MQTT连接状态 + */ + public String getMqttStatus() { + boolean connected = mqttClient.isConnected(); + String status = connected ? "已连接" : "已断开"; + String configErrorStatus = isConfigError ? "(配置错误,已终止自动重连)" : ""; + String reconnectStatus = String.format("当前重连次数:%d,最大重连次数:%d", currentReconnectCount, maxReconnectTimes); + return String.format("MQTT连接状态:%s%s;%s", status, configErrorStatus, reconnectStatus); + } + // ======================== SmartLifecycle 生命周期管理(核心修复) ======================== /** * 启动MQTT客户端(Spring上下文初始化/重启时触发) @@ -374,14 +573,22 @@ public class MqttMessageHandler implements SmartLifecycle { */ @Override public void start() { + log.info("开始监听"); if (isRunning.compareAndSet(false, true)) { + log.info("开始监听111"); try { - // 重新初始化MQTT订阅和回调 + // 核心修改:无论是否已连接,都执行订阅(设置回调+订阅主题) + // 移除原有的if (!mqttClient.isConnected()) 判断 + log.info("开始监听222"); subscribeTopics(); - log.info("【MQTT生命周期】客户端启动成功"); + log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题)"); } catch (MqttException e) { log.error("【MQTT生命周期】客户端启动失败", e); isRunning.set(false); + judgeConfigError(e); // 启动失败时判断配置错误 + if (!isConfigError) { + startReconnect(); + } } } } @@ -395,6 +602,12 @@ public class MqttMessageHandler implements SmartLifecycle { // 修复:JDK 8正确的compareAndSet写法(无命名参数) if (isRunning.compareAndSet(true, false)) { try { + // 新增:关闭重连线程池,避免内存泄漏 + reconnectExecutor.shutdown(); + if (!reconnectExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + reconnectExecutor.shutdownNow(); + } + if (mqttClient != null && mqttClient.isConnected()) { // 移除:取消订阅相关逻辑(避免依赖不存在的方法) // 直接断开连接(基础Paho支持的核心方法) @@ -405,7 +618,10 @@ public class MqttMessageHandler implements SmartLifecycle { } // 移除:resetConnection()(避免Redis版本差异) // 替代方案:无需主动重置,Spring上下文重启会重新创建Redis连接 - } catch (MqttException e) { + // 新增:重置重连相关状态 + currentReconnectCount = 0; + isConfigError = false; + } catch (MqttException | InterruptedException e) { log.error("【MQTT生命周期】客户端关闭失败", e); } }