diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 7901f01..29971b8 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -9,6 +9,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.SmartLifecycle; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.ScanOptions; @@ -16,15 +17,16 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; -import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * MQTT消息处理器(无心跳包版本) @@ -36,7 +38,7 @@ import java.util.concurrent.TimeUnit; * 适配JDK 8,无心跳包相关逻辑 */ @Component -public class MqttMessageHandler { +public class MqttMessageHandler implements SmartLifecycle { /** MQTT客户端(由MqttConfig配置类注入) */ @Resource @@ -57,10 +59,13 @@ public class MqttMessageHandler { // 优化:统一使用SLF4J日志(JDK 8兼容) private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); + // 新增:生命周期管理标识,控制MQTT客户端启动/关闭 + private final AtomicBoolean isRunning = new AtomicBoolean(false); + /** * 初始化:订阅主题+设置回调 + * (移除@PostConstruct,改为由SmartLifecycle的start()触发) */ - @PostConstruct public void subscribeTopics() throws MqttException { // 解析配置的主题列表 String[] topics = defaultTopic.split(","); @@ -338,7 +343,7 @@ public class MqttMessageHandler { // 新增:生产环境用Scan替代Keys,避免Redis阻塞(JDK 8兼容) private Set scanRedisKeys(String pattern) { - Set keys = new java.util.HashSet<>(); + Set keys = new HashSet<>(); try { stringRedisTemplate.executeWithStickyConnection((RedisConnection connection) -> { ScanOptions scanOptions = ScanOptions.scanOptions() @@ -362,4 +367,80 @@ public class MqttMessageHandler { return keys; } + // ======================== SmartLifecycle 生命周期管理(核心修复) ======================== + /** + * 启动MQTT客户端(Spring上下文初始化/重启时触发) + * 核心:替代@PostConstruct,保证上下文重启时重新初始化MQTT连接 + */ + @Override + public void start() { + if (isRunning.compareAndSet(false, true)) { + try { + // 重新初始化MQTT订阅和回调 + subscribeTopics(); + log.info("【MQTT生命周期】客户端启动成功"); + } catch (MqttException e) { + log.error("【MQTT生命周期】客户端启动失败", e); + isRunning.set(false); + } + } + } + + /** + * 停止MQTT客户端(Spring上下文销毁/重启时触发) + * 核心:移除所有不存在的方法,仅保留基础的断开/关闭逻辑 + */ + @Override + public void stop() { + // 修复:JDK 8正确的compareAndSet写法(无命名参数) + if (isRunning.compareAndSet(true, false)) { + try { + if (mqttClient != null && mqttClient.isConnected()) { + // 移除:取消订阅相关逻辑(避免依赖不存在的方法) + // 直接断开连接(基础Paho支持的核心方法) + mqttClient.disconnect(); + // 关闭客户端释放资源(基础Paho支持的核心方法) + mqttClient.close(); + log.info("【MQTT生命周期】客户端已优雅关闭"); + } + // 移除:resetConnection()(避免Redis版本差异) + // 替代方案:无需主动重置,Spring上下文重启会重新创建Redis连接 + } catch (MqttException e) { + log.error("【MQTT生命周期】客户端关闭失败", e); + } + } + } + + /** + * 异步停止(JDK 8兼容,默认实现) + */ + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + /** + * 判断MQTT客户端是否运行中 + */ + @Override + public boolean isRunning() { + return isRunning.get(); + } + + /** + * 启动优先级(保证MQTT在Redis之后启动) + */ + @Override + public int getPhase() { + return 10; + } + + /** + * 是否自动启动(默认true,Spring上下文初始化时自动调用start()) + */ + @Override + public boolean isAutoStartup() { + return true; + } } \ No newline at end of file