mqtt架构

feasure
xce 2026-01-24 18:19:49 +08:00
parent c297d418da
commit c609b06781
1 changed files with 36 additions and 29 deletions

View File

@ -33,6 +33,7 @@ import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -178,27 +179,12 @@ public class MqttMessageHandler implements SmartLifecycle {
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
} }
// 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过)
// 注意这里只使用同一个client实例避免sender与handler使用不同client
if (!mqttClient.isConnected()) {
try {
// 使用注入的连接配置项连接Broker带用户名密码、自动重连等配置
mqttClient.connect(mqttConnectOptions);
log.info("【MQTT连接】客户端已成功连接到BrokerclientId{}", mqttClient.getClientId());
} catch (MqttException e) {
log.error("【MQTT连接】连接Broker失败clientId{}", mqttClient.getClientId(), e);
throw e;
}
}
// 解析配置的主题列表 // 解析配置的主题列表
String[] topics = defaultTopic.split(","); final String[] topicsFinal = Arrays.stream(defaultTopic.split(","))
int[] qosArray = new int[topics.length]; .map(String::trim).toArray(String[]::new);
// 按主题类型设置QoS控制指令/状态用QoS 1 final int[] qosFinal = new int[topicsFinal.length];
for (int i = 0; i < topics.length; i++) { Arrays.fill(qosFinal, 0);
qosArray[i] = 0;
topics[i] = topics[i].trim();
}
// 设置MQTT消息回调处理连接断开、消息接收、消息发布完成 // 设置MQTT消息回调处理连接断开、消息接收、消息发布完成
mqttClient.setCallback(new MqttCallbackExtended() { mqttClient.setCallback(new MqttCallbackExtended() {
@ -206,11 +192,14 @@ public class MqttMessageHandler implements SmartLifecycle {
public void connectComplete(boolean reconnect, String serverURI) { public void connectComplete(boolean reconnect, String serverURI) {
log.info("【MQTT连接完成】reconnect={}, serverURI={}, clientId={}", log.info("【MQTT连接完成】reconnect={}, serverURI={}, clientId={}",
reconnect, serverURI, safeClientId()); reconnect, serverURI, safeClientId());
try { // cleanSession=true重连后必须补订阅
mqttClient.subscribe(topics, qosArray); if (reconnect) {
log.info("【MQTT订阅恢复】topics={}", String.join(",", topics)); try {
} catch (Exception e) { mqttClient.subscribe(topicsFinal, qosFinal);
log.error("【MQTT订阅恢复失败】", e); log.info("【MQTT订阅恢复】topicsFinal={}", String.join(",", topicsFinal));
} catch (Exception e) {
log.error("【MQTT订阅恢复失败】", e);
}
} }
} }
@ -278,11 +267,29 @@ public class MqttMessageHandler implements SmartLifecycle {
// 【方案A关键点】不再 unsubscribe 主题 // 【方案A关键点】不再 unsubscribe 主题
// cleanSession=false + unsubscribe 会破坏Broker侧会话订阅并且自动重连场景更不建议这么做 // cleanSession=false + unsubscribe 会破坏Broker侧会话订阅并且自动重连场景更不建议这么做
// 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过)
// 注意这里只使用同一个client实例避免sender与handler使用不同client
if (!mqttClient.isConnected()) {
try {
// 使用注入的连接配置项连接Broker带用户名密码、自动重连等配置
mqttClient.connect(mqttConnectOptions);
log.info("【MQTT连接】客户端已成功连接到BrokerclientId{}", mqttClient.getClientId());
} catch (MqttException e) {
log.error("【MQTT连接】连接Broker失败clientId{}", mqttClient.getClientId(), e);
throw e;
}
}
// 订阅主题 // 订阅主题
// mqttClient.subscribe(topics, qosArray); // connect 后首次订阅
// 优化打印clientId方便排查 try {
log.info("【MQTT初始化】订阅主题完成clientId{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); mqttClient.subscribe(topicsFinal, qosFinal);
log.info("【MQTT初始化】订阅主题完成clientId{}topicsFinal={}",
mqttClient.getClientId(), String.join(",", topicsFinal));
} catch (MqttException e) {
log.error("【MQTT初始化】订阅失败clientId{}topicsFinal={}",
safeClientId(), String.join(",", topicsFinal), e);
throw e;
}
} }
private String safeClientId() { private String safeClientId() {