mqtt架构

master
xce 2026-01-24 19:53:48 +08:00
parent ca1bcec621
commit 1c3dc095e4
3 changed files with 89 additions and 36 deletions

View File

@ -122,10 +122,10 @@ public class MqttConfig {
connectOptions.setKeepAliveInterval(keepAlive); connectOptions.setKeepAliveInterval(keepAlive);
// 关闭清除会话false=重连后保留订阅关系若不需要离线消息可设为true // 关闭清除会话false=重连后保留订阅关系若不需要离线消息可设为true
// 优化生产环境建议设为false重连后保留订阅关系避免丢失离线消息 // 优化生产环境建议设为false重连后保留订阅关系避免丢失离线消息
connectOptions.setCleanSession(false); connectOptions.setCleanSession(true);
connectOptions.setMaxInflight(200);
// 开启自动重连连接断开后自动尝试重连提升稳定性方案A核心 // 开启自动重连连接断开后自动尝试重连提升稳定性方案A核心
connectOptions.setAutomaticReconnect(false); connectOptions.setAutomaticReconnect(true);
// 设置最大重连间隔(秒):避免频繁重连消耗资源 // 设置最大重连间隔(秒):避免频繁重连消耗资源
connectOptions.setMaxReconnectDelay(30); connectOptions.setMaxReconnectDelay(30);
return connectOptions; return connectOptions;

View File

@ -221,7 +221,7 @@ public class DeviceStatusHandler {
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
// 发布消息 // 发布消息
mqttMessageSender.publish(frontendTopic, payload); mqttMessageSender.publish(frontendTopic, payload);
log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
} }
// 删掉设备对应的客户端 // 删掉设备对应的客户端
if (stale != null && !stale.isEmpty()) { if (stale != null && !stale.isEmpty()) {

View File

@ -2,7 +2,7 @@ package com.agri.framework.manager;
import com.agri.framework.web.dispatcher.MqttMessageDispatcher; import com.agri.framework.web.dispatcher.MqttMessageDispatcher;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
@ -15,6 +15,10 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -71,6 +75,20 @@ public class MqttClientManager implements SmartLifecycle {
@Value("${spring.mqtt.auto-off-thread-pool-size:5}") @Value("${spring.mqtt.auto-off-thread-pool-size:5}")
private int autoOffThreadPoolSize; private int autoOffThreadPoolSize;
private final ThreadPoolExecutor mqttBizPool =
new ThreadPoolExecutor(
8, // core
16, // max
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000), // 有界、较小
r -> {
Thread t = new Thread(r);
t.setName("mqtt-biz-" + t.getId());
t.setDaemon(true);
return t;
},
new ThreadPoolExecutor.DiscardPolicy() // 直接丢
);
/** /**
* + * +
* @PostConstructSmartLifecyclestart() * @PostConstructSmartLifecyclestart()
@ -84,36 +102,41 @@ public class MqttClientManager implements SmartLifecycle {
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
} }
// 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过)
// 注意这里只使用同一个client实例避免sender与handler使用不同client
if (!mqttClient.isConnected()) {
try {
// 使用注入的连接配置项连接Broker带用户名密码、自动重连等配置
mqttClient.connect(mqttConnectOptions);
log.info("【MQTT连接】客户端已成功连接到BrokerclientId{}", mqttClient.getClientId());
} catch (MqttException e) {
log.error("【MQTT连接】连接Broker失败clientId{}", mqttClient.getClientId(), e);
throw e;
}
}
// 解析配置的主题列表 // 解析配置的主题列表
String[] topics = defaultTopic.split(","); final String[] topicsFinal = Arrays.stream(defaultTopic.split(","))
int[] qosArray = new int[topics.length]; .map(String::trim).toArray(String[]::new);
// 按主题类型设置QoS控制指令/状态用QoS 1 final int[] qosFinal = new int[topicsFinal.length];
for (int i = 0; i < topics.length; i++) { Arrays.fill(qosFinal, 0);
qosArray[i] = 0;
topics[i] = topics[i].trim();
}
// 设置MQTT消息回调处理连接断开、消息接收、消息发布完成 // 设置MQTT消息回调处理连接断开、消息接收、消息发布完成
mqttClient.setCallback(new MqttCallback() { mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("【MQTT连接完成】reconnect={}, serverURI={}, clientId={}",
reconnect, serverURI, safeClientId());
// cleanSession=true重连后必须补订阅
if (reconnect) {
try {
mqttClient.subscribe(topicsFinal, qosFinal);
log.info("【MQTT订阅恢复】topicsFinal={}", String.join(",", topicsFinal));
} catch (Exception e) {
log.error("【MQTT订阅恢复失败】", e);
}
}
}
/** /**
* MQTT * MQTT
* @param cause * @param cause
*/ */
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
log.info("autoReconnect={}, cleanSession={}, keepAlive={}",
mqttConnectOptions.isAutomaticReconnect(),
mqttConnectOptions.isCleanSession(),
mqttConnectOptions.getKeepAliveInterval());
log.error("【MQTT连接异常】连接断开clientId{},原因:{}", log.error("【MQTT连接异常】连接断开clientId{},原因:{}",
safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause); safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause);
@ -125,15 +148,32 @@ public class MqttClientManager implements SmartLifecycle {
} }
/** /**
* MQTT * MQTT
* @param topic * @param topic
* @param message * @param message
* @throws Exception * @throws Exception
*/ */
@Override @Override
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(String topic, MqttMessage message) throws Exception {
// 优化显式指定UTF-8编码避免乱码JDK 8兼容
mqttMessageDispatcher.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8)); final String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
if (message.isRetained()) {
log.info("ignore retained snapshot: {}", topic);
return;
}
mqttBizPool.execute(() -> {
log.debug("mqttBizPool active={}, queue={}",
mqttBizPool.getActiveCount(),
mqttBizPool.getQueue().size());
try {
// 优化显式指定UTF-8编码避免乱码JDK 8兼容
mqttMessageDispatcher.handleMessage(topic, payload);
} catch (Exception e) {
log.error("【MQTT消息处理异常】topic={}, payload={}", topic, payload, e);
}
});
} }
/** /**
@ -147,14 +187,27 @@ public class MqttClientManager implements SmartLifecycle {
} }
} }
}); });
if (!mqttClient.isConnected()) {
// 【方案A关键点】不再 unsubscribe 主题 try {
// cleanSession=false + unsubscribe 会破坏Broker侧会话订阅并且自动重连场景更不建议这么做 // 使用注入的连接配置项连接Broker带用户名密码、自动重连等配置
mqttClient.connect(mqttConnectOptions);
log.info("【MQTT连接】客户端已成功连接到BrokerclientId{}", mqttClient.getClientId());
} catch (MqttException e) {
log.error("【MQTT连接】连接Broker失败clientId{}", mqttClient.getClientId(), e);
throw e;
}
}
// 订阅主题 // 订阅主题
mqttClient.subscribe(topics, qosArray); // connect 后首次订阅
// 优化打印clientId方便排查 try {
log.info("【MQTT初始化】订阅主题完成clientId{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); mqttClient.subscribe(topicsFinal, qosFinal);
log.info("【MQTT初始化】订阅主题完成clientId{}topicsFinal={}",
mqttClient.getClientId(), String.join(",", topicsFinal));
} catch (MqttException e) {
log.error("【MQTT初始化】订阅失败clientId{}topicsFinal={}",
safeClientId(), String.join(",", topicsFinal), e);
throw e;
}
} }
private String safeClientId() { private String safeClientId() {