暂提+mqtt

feasure
xce 2026-01-16 14:48:55 +08:00
parent e2cdd43a8c
commit 38e73ba0d2
1 changed files with 85 additions and 4 deletions

View File

@ -9,6 +9,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
@ -16,15 +17,16 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* MQTT
@ -36,7 +38,7 @@ import java.util.concurrent.TimeUnit;
* JDK 8
*/
@Component
public class MqttMessageHandler {
public class MqttMessageHandler implements SmartLifecycle {
/** MQTT客户端由MqttConfig配置类注入 */
@Resource
@ -57,10 +59,13 @@ public class MqttMessageHandler {
// 优化统一使用SLF4J日志JDK 8兼容
private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
// 新增生命周期管理标识控制MQTT客户端启动/关闭
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/**
* +
* @PostConstructSmartLifecyclestart()
*/
@PostConstruct
public void subscribeTopics() throws MqttException {
// 解析配置的主题列表
String[] topics = defaultTopic.split(",");
@ -338,7 +343,7 @@ public class MqttMessageHandler {
// 新增生产环境用Scan替代Keys避免Redis阻塞JDK 8兼容
private Set<String> scanRedisKeys(String pattern) {
Set<String> keys = new java.util.HashSet<>();
Set<String> keys = new HashSet<>();
try {
stringRedisTemplate.executeWithStickyConnection((RedisConnection connection) -> {
ScanOptions scanOptions = ScanOptions.scanOptions()
@ -362,4 +367,80 @@ public class MqttMessageHandler {
return keys;
}
// ======================== SmartLifecycle 生命周期管理(核心修复) ========================
/**
* MQTTSpring/
* @PostConstructMQTT
*/
@Override
public void start() {
if (isRunning.compareAndSet(false, true)) {
try {
// 重新初始化MQTT订阅和回调
subscribeTopics();
log.info("【MQTT生命周期】客户端启动成功");
} catch (MqttException e) {
log.error("【MQTT生命周期】客户端启动失败", e);
isRunning.set(false);
}
}
}
/**
* MQTTSpring/
* /
*/
@Override
public void stop() {
// 修复JDK 8正确的compareAndSet写法无命名参数
if (isRunning.compareAndSet(true, false)) {
try {
if (mqttClient != null && mqttClient.isConnected()) {
// 移除:取消订阅相关逻辑(避免依赖不存在的方法)
// 直接断开连接基础Paho支持的核心方法
mqttClient.disconnect();
// 关闭客户端释放资源基础Paho支持的核心方法
mqttClient.close();
log.info("【MQTT生命周期】客户端已优雅关闭");
}
// 移除resetConnection()避免Redis版本差异
// 替代方案无需主动重置Spring上下文重启会重新创建Redis连接
} catch (MqttException e) {
log.error("【MQTT生命周期】客户端关闭失败", e);
}
}
}
/**
* JDK 8
*/
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
/**
* MQTT
*/
@Override
public boolean isRunning() {
return isRunning.get();
}
/**
* MQTTRedis
*/
@Override
public int getPhase() {
return 10;
}
/**
* trueSpringstart()
*/
@Override
public boolean isAutoStartup() {
return true;
}
}