From c609b0678134159ca08626df110c33686cf70baa Mon Sep 17 00:00:00 2001 From: xce Date: Sat, 24 Jan 2026 18:19:49 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interceptor/MqttMessageHandler.java | 65 ++++++++++--------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 3529bc7..5145e25 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -33,6 +33,7 @@ import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -178,27 +179,12 @@ public class MqttMessageHandler implements SmartLifecycle { throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); } - // 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过) - // 注意:这里只使用同一个client实例,避免sender与handler使用不同client - if (!mqttClient.isConnected()) { - try { - // 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置) - mqttClient.connect(mqttConnectOptions); - log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); - } catch (MqttException e) { - log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); - throw e; - } - } - // 解析配置的主题列表 - String[] topics = defaultTopic.split(","); - int[] qosArray = new int[topics.length]; - // 按主题类型设置QoS:控制指令/状态用QoS 1 - for (int i = 0; i < topics.length; i++) { - qosArray[i] = 0; - topics[i] = topics[i].trim(); - } + final String[] topicsFinal = Arrays.stream(defaultTopic.split(",")) + .map(String::trim).toArray(String[]::new); + final int[] qosFinal = new int[topicsFinal.length]; + Arrays.fill(qosFinal, 0); + // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 mqttClient.setCallback(new MqttCallbackExtended() { @@ -206,11 +192,14 @@ public class MqttMessageHandler implements SmartLifecycle { public void connectComplete(boolean reconnect, String serverURI) { log.info("【MQTT连接完成】reconnect={}, serverURI={}, clientId={}", reconnect, serverURI, safeClientId()); - try { - mqttClient.subscribe(topics, qosArray); - log.info("【MQTT订阅恢复】topics={}", String.join(",", topics)); - } catch (Exception e) { - log.error("【MQTT订阅恢复失败】", e); + // cleanSession=true:重连后必须补订阅 + if (reconnect) { + try { + mqttClient.subscribe(topicsFinal, qosFinal); + log.info("【MQTT订阅恢复】topicsFinal={}", String.join(",", topicsFinal)); + } catch (Exception e) { + log.error("【MQTT订阅恢复失败】", e); + } } } @@ -278,11 +267,29 @@ public class MqttMessageHandler implements SmartLifecycle { // 【方案A关键点】不再 unsubscribe 主题 // cleanSession=false + unsubscribe 会破坏Broker侧会话订阅;并且自动重连场景更不建议这么做 - + // 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过) + // 注意:这里只使用同一个client实例,避免sender与handler使用不同client + if (!mqttClient.isConnected()) { + try { + // 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置) + mqttClient.connect(mqttConnectOptions); + log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); + } catch (MqttException e) { + log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); + throw e; + } + } // 订阅主题 - // mqttClient.subscribe(topics, qosArray); - // 优化:打印clientId,方便排查 - log.info("【MQTT初始化】订阅主题完成,clientId:{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); + // connect 后首次订阅 + try { + mqttClient.subscribe(topicsFinal, qosFinal); + log.info("【MQTT初始化】订阅主题完成,clientId:{},topicsFinal={}", + mqttClient.getClientId(), String.join(",", topicsFinal)); + } catch (MqttException e) { + log.error("【MQTT初始化】订阅失败,clientId:{},topicsFinal={}", + safeClientId(), String.join(",", topicsFinal), e); + throw e; + } } private String safeClientId() {