注释没用逻辑

feasure
xce 2026-01-16 23:30:08 +08:00
parent bc21635090
commit bd24cec8cc
2 changed files with 47 additions and 185 deletions

View File

@ -123,7 +123,8 @@ public class MqttConfig {
// 关闭清除会话false=重连后保留订阅关系若不需要离线消息可设为true
// 优化生产环境建议设为false重连后保留订阅关系避免丢失离线消息
connectOptions.setCleanSession(false);
// 开启自动重连:连接断开后自动尝试重连,提升稳定性
// 开启自动重连连接断开后自动尝试重连提升稳定性方案A核心
connectOptions.setAutomaticReconnect(true);
// 设置最大重连间隔(秒):避免频繁重连消耗资源
connectOptions.setMaxReconnectDelay(30);
@ -235,7 +236,6 @@ public class MqttConfig {
}
}
/**
* MQTTBean使
*/

View File

@ -1,6 +1,5 @@
package com.agri.framework.interceptor;
import com.agri.common.utils.uuid.UUID;
import com.agri.framework.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@ -8,7 +7,6 @@ import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@ -28,8 +26,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -41,6 +37,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
* 3.
* 4. ++
* JDK 8
*
* A
* 1) new MqttClient mqttMessageSender client
* 2) MqttConnectOptions#setAutomaticReconnect(true) Paho
* 3) unsubscribe cleanSession=false
*/
@Component
public class MqttMessageHandler implements SmartLifecycle {
@ -67,45 +68,33 @@ public class MqttMessageHandler implements SmartLifecycle {
// 新增生命周期管理标识控制MQTT客户端启动/关闭
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** MQTT连接配置项从MqttConfig注入 */
@Resource
private MqttConnectOptions mqttConnectOptions;
// ========== 新增:重连相关配置 ==========
// 重连间隔(秒),可配置化
@Value("${spring.mqtt.reconnect-interval:5}")
private int reconnectInterval;
// 最大重连次数(-1表示无限重连
@Value("${spring.mqtt.max-reconnect-times:-1}")
private int maxReconnectTimes;
// 重连线程池(单线程,避免并发重连)
private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
// 当前重连次数计数
private int currentReconnectCount = 0;
// 配置错误标记(避免配置错时无限重连)
private volatile boolean isConfigError = false;
/**
* +
* @PostConstructSmartLifecyclestart()
*
* APahoconnectOptions.setAutomaticReconnect(true)
*/
public void subscribeTopics() throws MqttException {
// 关键补充1判空(重连后替换的新实例可能为空)
// 关键补充1判空
if (mqttClient == null) {
log.error("【MQTT初始化】客户端实例为空无法订阅主题");
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
// 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过)
// 注意这里只使用同一个client实例避免sender与handler使用不同client
if (!mqttClient.isConnected()) {
try {
// 使用注入的连接配置项连接Broker带用户名密码、重连等配置)
// 使用注入的连接配置项连接Broker带用户名密码、自动重连等配置)
mqttClient.connect(mqttConnectOptions);
log.info("【MQTT连接】客户端已成功连接到BrokerclientId{}", mqttClient.getClientId());
} catch (MqttException e) {
log.error("【MQTT连接】连接Broker失败clientId{}", mqttClient.getClientId(), e);
throw e; // 抛出异常让start()处理重连
throw e;
}
}
@ -117,14 +106,6 @@ public class MqttMessageHandler implements SmartLifecycle {
qosArray[i] = 1;
}
// 关键补充2先取消原有订阅避免重复订阅导致的消息重复接收
try {
mqttClient.unsubscribe(topics);
log.info("【MQTT初始化】已取消原有主题订阅准备重新订阅");
} catch (Exception e) {
log.warn("【MQTT初始化】取消原有订阅失败首次订阅可忽略{}", e.getMessage());
}
// 设置MQTT消息回调处理连接断开、消息接收、消息发布完成
mqttClient.setCallback(new MqttCallback() {
/**
@ -134,10 +115,13 @@ public class MqttMessageHandler implements SmartLifecycle {
@Override
public void connectionLost(Throwable cause) {
// 优化替换System.err为log.error
log.error("【MQTT连接异常】连接断开clientId{},原因:{}", mqttClient.getClientId(), cause.getMessage(), cause);
// 新增:触发自动重连(仅当客户端处于运行状态且非配置错误时)
if (isRunning.get() && !isConfigError) {
startReconnect();
log.error("【MQTT连接异常】连接断开clientId{},原因:{}",
safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause);
// 【方案A】不再触发自写重连Paho自动重连会接管重连过程
// 这里只记录日志即可
if (isRunning.get()) {
log.warn("【MQTT自动重连】已开启automaticReconnect等待Paho自动重连...");
}
}
@ -161,20 +145,28 @@ public class MqttMessageHandler implements SmartLifecycle {
public void deliveryComplete(IMqttDeliveryToken token) {
// 优化替换System.out为log.info增加空值校验
if (token != null && token.getTopics() != null && token.getTopics().length > 0) {
log.info("【MQTT确认】消息发布完成clientId{},主题:{}", mqttClient.getClientId(), token.getTopics()[0]);
log.info("【MQTT确认】消息发布完成clientId{},主题:{}", safeClientId(), token.getTopics()[0]);
}
}
});
// 【方案A关键点】不再 unsubscribe 主题
// cleanSession=false + unsubscribe 会破坏Broker侧会话订阅并且自动重连场景更不建议这么做
// 订阅主题
mqttClient.subscribe(topics, qosArray);
// 重置重连计数和配置错误标记(连接成功后清零)
currentReconnectCount = 0;
isConfigError = false;
// 优化打印clientId方便排查重连后的实例是否替换成功
// 优化打印clientId方便排查
log.info("【MQTT初始化】订阅主题完成clientId{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics));
}
private String safeClientId() {
try {
return (mqttClient == null ? "null" : mqttClient.getClientId());
} catch (Exception e) {
return "unknown";
}
}
/**
* \
*
@ -425,135 +417,23 @@ public class MqttMessageHandler implements SmartLifecycle {
return keys;
}
// ========== 新增:自动重连核心方法 ==========
// ========== 手动重连接口供Controller调用 ==========
/**
*
*/
private void startReconnect() {
// 终止条件:配置错误 或 达到最大重连次数
if (isConfigError || (maxReconnectTimes > 0 && currentReconnectCount >= maxReconnectTimes)) {
String reason = isConfigError ? "配置错误" : String.format("达到最大重连次数(%d)", maxReconnectTimes);
log.error("【MQTT重连】{},停止重连", reason);
// isRunning.set(false);
return;
}
// 极端场景兜底:客户端实例为空则终止重连
if (mqttClient == null) {
log.error("【MQTT重连】客户端实例为空终止重连");
return;
}
// 提交重连任务到单线程池
reconnectExecutor.schedule(() -> {
try {
currentReconnectCount++;
log.info("【MQTT重连】第{}次尝试重连(间隔{}秒)", currentReconnectCount, reconnectInterval);
// 最小改动补充close+置空,彻底释放旧实例
try {
if (mqttClient.isConnected()) {
mqttClient.disconnect();
log.info("【MQTT重连】已断开旧连接");
}
// 新增:关闭旧实例释放资源
mqttClient.close();
} catch (Exception e) {
log.warn("【MQTT重连】断开/关闭旧连接失败(忽略):{}", e.getMessage());
}
// 2. 生成新的clientId和MqttConfig中一致的规则原clientId + 随机后缀)
// 新代码(字母数字混合,和配置文件对齐)
String oldClientId = mqttClient.getClientId();
String originalClientId = oldClientId.contains("_") ? oldClientId.split("_")[0] : oldClientId;
// 调用工具方法生成8位字母数字随机串
String shortRandom = UUID.generateAlphanumericRandom(8);
String newClientId = originalClientId + "_" + shortRandom;
// 3. 创建新的MqttClient实例
MqttClient newMqttClient = new MqttClient(
mqttClient.getServerURI(),
newClientId,
new MemoryPersistence()
);
// 4. 用新客户端连接
newMqttClient.connect(mqttConnectOptions);
log.info("【MQTT重连】使用新clientId连接成功{}", newClientId);
// 5. 替换旧客户端实例,重新订阅
this.mqttClient = newMqttClient;
// 重新连接MQTT Broker + 重新订阅主题
subscribeTopics();
log.info("【MQTT重连】第{}次重连成功新clientId{}", currentReconnectCount, newClientId);
} catch (MqttException e) {
log.error("【MQTT重连】第{}次重连失败:{}", currentReconnectCount, e.getMessage(), e);
// 判断是否为配置类错误(永久错误)
judgeConfigError(e);
// 非配置错误则继续重连
if (!isConfigError) {
startReconnect();
}
}
}, reconnectInterval, TimeUnit.SECONDS);
}
/**
* MQTT/
* Paho
*/
private void judgeConfigError(MqttException e) {
int errorCode = e.getReasonCode();
switch (errorCode) {
// 21 = 认证失败(用户名/密码错误)→ 配置错误
case 21:
isConfigError = true;
log.error("【MQTT配置错误】认证失败请检查用户名/密码,错误码:{}", errorCode);
break;
// 32 = 客户端ID非法 → 配置错误
case 32:
isConfigError = true;
log.error("【MQTT配置错误】客户端ID非法请检查clientId配置错误码{}", errorCode);
break;
// 3 = 连接被拒绝(地址/端口错误)→ 配置错误
case 3:
isConfigError = true;
log.error("【MQTT配置错误】连接被拒绝请检查Broker地址/端口,错误码:{}", errorCode);
break;
// 31 = Broker不可达网络波动→ 继续重连
case 31:
log.warn("【MQTT错误】Broker不可达网络波动错误码{},继续重连", errorCode);
break;
// 其他错误 → 默认视为临时错误
default:
log.warn("【MQTT未知错误】错误码{},继续重连", errorCode);
break;
}
}
// ========== 新增手动重连接口供Controller调用 ==========
/**
* MQTT
* MQTTclientclient
*/
public String manualReconnect() {
// 重置配置错误标记(允许重连)
isConfigError = false;
currentReconnectCount = 0;
isRunning.set(true);
try {
// 强制断开旧连接(如果存在)
if (mqttClient.isConnected()) {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
}
// 重新初始化订阅
// 重新初始化订阅内部会connect + subscribe
subscribeTopics();
log.info("【手动重连】MQTT客户端重连成功");
return "MQTT手动重连成功";
} catch (MqttException e) {
log.error("【手动重连】MQTT客户端重连失败", e);
judgeConfigError(e);
return "MQTT手动重连失败" + e.getMessage();
}
}
@ -562,11 +442,9 @@ public class MqttMessageHandler implements SmartLifecycle {
* MQTT
*/
public String getMqttStatus() {
boolean connected = mqttClient.isConnected();
boolean connected = (mqttClient != null && mqttClient.isConnected());
String status = connected ? "已连接" : "已断开";
String configErrorStatus = isConfigError ? "(配置错误,已终止自动重连)" : "";
String reconnectStatus = String.format("当前重连次数:%d最大重连次数%d", currentReconnectCount, maxReconnectTimes);
return String.format("MQTT连接状态%s%s%s", status, configErrorStatus, reconnectStatus);
return String.format("MQTT连接状态%sclientId%s", status, safeClientId());
}
// ======================== SmartLifecycle 生命周期管理(核心修复) ========================
@ -580,16 +458,11 @@ public class MqttMessageHandler implements SmartLifecycle {
if (isRunning.compareAndSet(false, true)) {
try {
// 核心修改:无论是否已连接,都执行订阅(设置回调+订阅主题)
// 移除原有的if (!mqttClient.isConnected()) 判断
subscribeTopics();
log.info("【MQTT生命周期】客户端启动成功已设置回调+订阅主题)");
} catch (MqttException e) {
log.error("【MQTT生命周期】客户端启动失败", e);
isRunning.set(false);
judgeConfigError(e); // 启动失败时判断配置错误
if (!isConfigError) {
startReconnect();
}
}
}
}
@ -603,26 +476,15 @@ public class MqttMessageHandler implements SmartLifecycle {
// 修复JDK 8正确的compareAndSet写法无命名参数
if (isRunning.compareAndSet(true, false)) {
try {
// 新增:关闭重连线程池,避免内存泄漏
reconnectExecutor.shutdown();
if (!reconnectExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
reconnectExecutor.shutdownNow();
}
if (mqttClient != null && mqttClient.isConnected()) {
// 移除:取消订阅相关逻辑(避免依赖不存在的方法)
// 直接断开连接基础Paho支持的核心方法
if (mqttClient != null) {
// 注意disconnect 只在已连接时调用close 尽量无条件释放资源
if (mqttClient.isConnected()) {
mqttClient.disconnect();
// 关闭客户端释放资源基础Paho支持的核心方法
}
mqttClient.close();
log.info("【MQTT生命周期】客户端已优雅关闭");
}
// 移除resetConnection()避免Redis版本差异
// 替代方案无需主动重置Spring上下文重启会重新创建Redis连接
// 新增:重置重连相关状态
currentReconnectCount = 0;
isConfigError = false;
} catch (MqttException | InterruptedException e) {
} catch (Exception e) {
log.error("【MQTT生命周期】客户端关闭失败", e);
}
}