From 1c3dc095e4b2e1ae9d8602cf07636e89baf8d4e4 Mon Sep 17 00:00:00 2001 From: xce Date: Sat, 24 Jan 2026 19:53:48 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/agri/framework/config/MqttConfig.java | 6 +- .../interceptor/DeviceStatusHandler.java | 2 +- .../framework/manager/MqttClientManager.java | 117 +++++++++++++----- 3 files changed, 89 insertions(+), 36 deletions(-) diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java index 477f52b..82f04fe 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -122,10 +122,10 @@ public class MqttConfig { connectOptions.setKeepAliveInterval(keepAlive); // 关闭清除会话:false=重连后保留订阅关系(若不需要离线消息可设为true) // 优化:生产环境建议设为false,重连后保留订阅关系,避免丢失离线消息 - connectOptions.setCleanSession(false); - + connectOptions.setCleanSession(true); + connectOptions.setMaxInflight(200); // 开启自动重连:连接断开后自动尝试重连,提升稳定性(方案A核心) - connectOptions.setAutomaticReconnect(false); + connectOptions.setAutomaticReconnect(true); // 设置最大重连间隔(秒):避免频繁重连消耗资源 connectOptions.setMaxReconnectDelay(30); return connectOptions; diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java index aa22e10..0633bfe 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java @@ -221,7 +221,7 @@ public class DeviceStatusHandler { String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // 发布消息 mqttMessageSender.publish(frontendTopic, payload); - log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); + // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } // 删掉设备对应的客户端 if (stale != null && !stale.isEmpty()) { diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java index c567f44..ab41a3a 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java @@ -2,7 +2,7 @@ package com.agri.framework.manager; import com.agri.framework.web.dispatcher.MqttMessageDispatcher; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -15,6 +15,10 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -71,6 +75,20 @@ public class MqttClientManager implements SmartLifecycle { @Value("${spring.mqtt.auto-off-thread-pool-size:5}") private int autoOffThreadPoolSize; + private final ThreadPoolExecutor mqttBizPool = + new ThreadPoolExecutor( + 8, // core + 16, // max + 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(2000), // 有界、较小 + r -> { + Thread t = new Thread(r); + t.setName("mqtt-biz-" + t.getId()); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.DiscardPolicy() // 直接丢 + ); /** * 初始化:订阅主题+设置回调 * (移除@PostConstruct,改为由SmartLifecycle的start()触发) @@ -84,36 +102,41 @@ public class MqttClientManager implements SmartLifecycle { throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); } - // 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过) - // 注意:这里只使用同一个client实例,避免sender与handler使用不同client - if (!mqttClient.isConnected()) { - try { - // 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置) - mqttClient.connect(mqttConnectOptions); - log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); - } catch (MqttException e) { - log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); - throw e; - } - } - // 解析配置的主题列表 - String[] topics = defaultTopic.split(","); - int[] qosArray = new int[topics.length]; - // 按主题类型设置QoS:控制指令/状态用QoS 1 - for (int i = 0; i < topics.length; i++) { - qosArray[i] = 0; - topics[i] = topics[i].trim(); - } + final String[] topicsFinal = Arrays.stream(defaultTopic.split(",")) + .map(String::trim).toArray(String[]::new); + final int[] qosFinal = new int[topicsFinal.length]; + Arrays.fill(qosFinal, 0); + // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 - mqttClient.setCallback(new MqttCallback() { + mqttClient.setCallback(new MqttCallbackExtended() { + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("【MQTT连接完成】reconnect={}, serverURI={}, clientId={}", + reconnect, serverURI, safeClientId()); + // cleanSession=true:重连后必须补订阅 + if (reconnect) { + try { + mqttClient.subscribe(topicsFinal, qosFinal); + log.info("【MQTT订阅恢复】topicsFinal={}", String.join(",", topicsFinal)); + } catch (Exception e) { + log.error("【MQTT订阅恢复失败】", e); + } + } + } + /** * MQTT连接断开回调 * @param cause 断开原因 */ @Override public void connectionLost(Throwable cause) { + log.info("autoReconnect={}, cleanSession={}, keepAlive={}", + mqttConnectOptions.isAutomaticReconnect(), + mqttConnectOptions.isCleanSession(), + mqttConnectOptions.getKeepAliveInterval()); + log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause); @@ -125,15 +148,32 @@ public class MqttClientManager implements SmartLifecycle { } /** - * 收到MQTT消息回调:转发到消息分发器 + * 收到MQTT消息回调:核心处理入口 * @param topic 消息主题 * @param message 消息内容 * @throws Exception 消息处理异常 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - // 优化:显式指定UTF-8编码,避免乱码(JDK 8兼容) - mqttMessageDispatcher.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8)); + + final String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + if (message.isRetained()) { + log.info("ignore retained snapshot: {}", topic); + return; + } + + mqttBizPool.execute(() -> { + log.debug("mqttBizPool active={}, queue={}", + mqttBizPool.getActiveCount(), + mqttBizPool.getQueue().size()); + try { + // 优化:显式指定UTF-8编码,避免乱码(JDK 8兼容) + mqttMessageDispatcher.handleMessage(topic, payload); + } catch (Exception e) { + log.error("【MQTT消息处理异常】topic={}, payload={}", topic, payload, e); + } + }); + } /** @@ -147,14 +187,27 @@ public class MqttClientManager implements SmartLifecycle { } } }); - - // 【方案A关键点】不再 unsubscribe 主题 - // cleanSession=false + unsubscribe 会破坏Broker侧会话订阅;并且自动重连场景更不建议这么做 - + if (!mqttClient.isConnected()) { + try { + // 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置) + mqttClient.connect(mqttConnectOptions); + log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId()); + } catch (MqttException e) { + log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e); + throw e; + } + } // 订阅主题 - mqttClient.subscribe(topics, qosArray); - // 优化:打印clientId,方便排查 - log.info("【MQTT初始化】订阅主题完成,clientId:{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics)); + // connect 后首次订阅 + try { + mqttClient.subscribe(topicsFinal, qosFinal); + log.info("【MQTT初始化】订阅主题完成,clientId:{},topicsFinal={}", + mqttClient.getClientId(), String.join(",", topicsFinal)); + } catch (MqttException e) { + log.error("【MQTT初始化】订阅失败,clientId:{},topicsFinal={}", + safeClientId(), String.join(",", topicsFinal), e); + throw e; + } } private String safeClientId() {