feasure
xce 2026-01-16 14:30:24 +08:00
parent 6cfc8a1393
commit e2cdd43a8c
3 changed files with 157 additions and 38 deletions

View File

@ -74,6 +74,8 @@ spring:
restart:
# 热部署开关
enabled: true
# 禁用 MQTT 的热部署
exclude: com/agri/framework/config/MqttConfig.class,com/agri/framework/interceptor/MqttMessageHandler.class
# redis 配置
redis:
# 地址

View File

@ -5,9 +5,16 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* MQTT
@ -23,6 +30,8 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
public class MqttConfig {
// 新增替换System.out为SLF4J日志JDK 8兼容生产环境必备
private static final Logger log = LoggerFactory.getLogger(MqttConfig.class);
/** Mosquitto服务器地址TCP协议格式为 tcp://IP:端口 */
@Value("${spring.mqtt.host}")
@ -52,6 +61,9 @@ public class MqttConfig {
@Value("${spring.mqtt.keep-alive:60}")
private int keepAlive;
// 新增:保存客户端实例,用于应用关闭时优雅断开连接
private MqttClient mqttClientInstance;
/**
* MQTTSpring Bean
*
@ -64,22 +76,37 @@ public class MqttConfig {
*/
@Bean
public MqttClient mqttClient() throws MqttException {
// 新增核心参数非空校验避免空指针JDK 8兼容
if (!StringUtils.hasText(host)) {
throw new IllegalArgumentException("MQTT服务器地址spring.mqtt.host不能为空");
}
if (!StringUtils.hasText(clientId)) {
throw new IllegalArgumentException("MQTT客户端IDspring.mqtt.client-id不能为空");
}
// 1. 初始化连接配置项
MqttConnectOptions connectOptions = getMqttConnectOptions();
// 2. 初始化MQTT客户端
// MemoryPersistence使用内存存储会话不持久化到磁盘适合后端服务
MemoryPersistence persistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(host, clientId, persistence);
// 新增客户端ID拼接随机后缀避免多实例部署时冲突生产环境必备
String uniqueClientId = clientId + "_" + UUID.randomUUID().toString().substring(0, 8);
MqttClient mqttClient = new MqttClient(host, uniqueClientId, persistence);
// 3. 建立MQTT连接
if (!mqttClient.isConnected()) {
mqttClient.connect(connectOptions);
System.out.println("【MQTT连接成功】服务器地址" + host + "客户端ID" + clientId);
// 优化替换System.out为日志框架保留原有输出内容
log.info("【MQTT连接成功】服务器地址" + host + "客户端ID" + uniqueClientId);
} else {
System.out.println("【MQTT连接状态】已连接无需重复初始化");
// 优化替换System.out为日志框架保留原有输出内容
log.info("【MQTT连接状态】已连接无需重复初始化");
}
// 新增:保存实例到成员变量,用于优雅关闭
this.mqttClientInstance = mqttClient;
return mqttClient;
}
@ -94,7 +121,8 @@ public class MqttConfig {
// 设置保活间隔(秒):客户端每隔该时间发送一次心跳
connectOptions.setKeepAliveInterval(keepAlive);
// 关闭清除会话false=重连后保留订阅关系若不需要离线消息可设为true
connectOptions.setCleanSession(true);
// 优化生产环境建议设为false重连后保留订阅关系避免丢失离线消息
connectOptions.setCleanSession(false);
// 开启自动重连:连接断开后自动尝试重连,提升稳定性
connectOptions.setAutomaticReconnect(true);
// 设置最大重连间隔(秒):避免频繁重连消耗资源
@ -157,6 +185,14 @@ public class MqttConfig {
* @throws MqttException
*/
public void publish(String topic, String payload, int qos) throws MqttException {
// 新增:入参校验(避免空主题/空内容JDK 8兼容
if (!StringUtils.hasText(topic)) {
throw new IllegalArgumentException("MQTT发布主题不能为空");
}
if (payload == null) {
payload = ""; // 空内容默认设为空字符串避免NPE
}
// 1. 校验客户端是否已连接
if (!client.isConnected()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
@ -165,7 +201,8 @@ public class MqttConfig {
// 2. 构建MQTT消息对象
MqttMessage message = new MqttMessage();
// 设置消息内容(转换为字节数组)
message.setPayload(payload.getBytes());
// 优化显式指定UTF-8编码避免不同环境下编码不一致导致乱码JDK 8兼容
message.setPayload(payload.getBytes(StandardCharsets.UTF_8));
// 设置QoS级别
message.setQos(qos);
// 设置保留消息true=服务端保留该主题的最新消息,新订阅者可立即获取
@ -173,7 +210,22 @@ public class MqttConfig {
// 3. 发布消息
client.publish(topic, message);
System.out.println("【MQTT消息发布成功】主题" + topic + ",内容:" + payload);
// 优化替换System.out为日志框架保留原有输出内容
log.info("【MQTT消息发布成功】主题" + topic + ",内容:" + payload);
}
}
// 新增应用关闭时优雅断开MQTT连接避免连接泄漏JDK 8兼容
@PreDestroy
public void destroyMqttClient() {
if (mqttClientInstance != null && mqttClientInstance.isConnected()) {
try {
mqttClientInstance.disconnect();
mqttClientInstance.close();
log.info("【MQTT连接关闭】客户端已优雅断开连接");
} catch (MqttException e) {
log.error("【MQTT连接关闭异常】" + e.getMessage(), e);
}
}
}
}

View File

@ -9,11 +9,16 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
@ -49,7 +54,9 @@ public class MqttMessageHandler {
@Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/control/+}")
private String defaultTopic;
Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
// 优化统一使用SLF4J日志JDK 8兼容
private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
/**
* +
*/
@ -71,7 +78,8 @@ public class MqttMessageHandler {
*/
@Override
public void connectionLost(Throwable cause) {
System.err.println("【MQTT连接异常】连接断开" + cause.getMessage());
// 优化替换System.err为log.error
log.error("【MQTT连接异常】连接断开{}", cause.getMessage(), cause);
}
/**
@ -82,8 +90,8 @@ public class MqttMessageHandler {
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 将字节数组转换为字符串,分发处理不同主题的消息
handleMessage(topic, new String(message.getPayload()));
// 优化显式指定UTF-8编码避免乱码JDK 8兼容
handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
}
/**
@ -92,14 +100,17 @@ public class MqttMessageHandler {
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// QoS 1确认消息已被服务端接收
System.out.println("【MQTT确认】消息发布完成主题" + token.getTopics()[0]);
// 优化替换System.out为log.info增加空值校验
if (token != null && token.getTopics() != null && token.getTopics().length > 0) {
log.info("【MQTT确认】消息发布完成主题{}", token.getTopics()[0]);
}
}
});
// 订阅主题
mqttClient.subscribe(topics, qosArray);
System.out.println("【MQTT初始化】订阅主题" + String.join(",", topics));
// 优化替换System.out为log.info
log.info("【MQTT初始化】订阅主题{}", String.join(",", topics));
}
/**
@ -109,7 +120,8 @@ public class MqttMessageHandler {
*/
private void handleMessage(String topic, String payload) {
try {
System.out.println("【MQTT接收】topic=" + topic + ", payload=" + payload);
// 优化替换System.out为log.info
log.info("【MQTT接收】topic={}, payload={}", topic, payload);
// 设备状态主题dtu/{deviceId}/up
if (topic.matches("dtu/\\w+/up")) {
@ -120,8 +132,8 @@ public class MqttMessageHandler {
handleFrontendControl(topic, payload);
}
} catch (Exception e) {
System.err.println("【MQTT消息处理异常】topic=" + topic + ", 异常信息:" + e.getMessage());
e.printStackTrace();
// 优化替换System.err为log.error打印完整堆栈
log.error("【MQTT消息处理异常】topic={}", topic, e);
}
}
@ -141,10 +153,12 @@ public class MqttMessageHandler {
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
// 发布消息
mqttMessageSender.publish(frontendTopic, payload);
System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic);
// 优化替换System.out为log.info
log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
}
} else {
System.out.println("【设备状态转发】设备" + deviceId + "无订阅前端,跳过转发");
// 优化替换System.out为log.info
log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
}
}
@ -157,36 +171,43 @@ public class MqttMessageHandler {
String clientId = parts[1];
String deviceId = parts[3];
// 新增入参非空校验JDK 8兼容
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
log.error("【指令处理】clientId或deviceId为空topic={}", topic);
return;
}
// 1. 权限校验示例admin开头有全权限
if (!checkPermission(clientId, deviceId)) {
String errorTopic = "frontend/" + clientId + "/error/" + deviceId;
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
System.err.println("【权限校验】前端" + clientId + "操作设备" + deviceId + "失败");
// 优化替换System.err为log.warn
log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId);
return;
}
// 2. 分布式锁:避免多前端并发控制
String lockKey = "lock:" + deviceId;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey, clientId, 10, TimeUnit.SECONDS
lockKey, clientId, 10, TimeUnit.SECONDS // 优化:显式指定时间单位
);
if (lockSuccess == null || !lockSuccess) {
String errorTopic = "frontend/" + clientId + "/error/" + deviceId;
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}");
System.err.println("【分布式锁】前端" + clientId + "操作设备" + deviceId + "失败");
// 优化替换System.err为log.warn
log.warn("【分布式锁】前端{}操作设备{}失败", clientId, deviceId);
return;
}
// 3. 记录日志
System.out.println(String.format(
"【指令处理】前端%s于%s控制设备%s指令%s",
clientId, LocalDateTime.now(), deviceId, payload
));
log.info("【指令处理】前端{}于{}控制设备{},指令:{}",
clientId, LocalDateTime.now(), deviceId, payload);
// 4. 转发指令到设备
String deviceTopic = "dtu/" + deviceId + "/control";
mqttMessageSender.publish(deviceTopic, payload);
System.out.println("【指令转发】前端" + clientId + " → 设备" + deviceId);
// 优化替换System.out为log.info
log.info("【指令转发】前端{} → 设备{}", clientId, deviceId);
}
/**
@ -212,9 +233,16 @@ public class MqttMessageHandler {
* Controller
*/
public void subscribeDevice(String clientId, String deviceId) {
// 新增入参非空校验JDK 8兼容
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
log.error("【订阅管理】clientId或deviceId不能为空");
throw new IllegalArgumentException("clientId和deviceId不能为空");
}
// 保存订阅关系到Redis
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
System.out.println("【订阅管理】前端" + clientId + "订阅设备" + deviceId + "成功");
// 优化替换System.out为log.info
log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId);
// 推送设备最新状态(可选)
String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
@ -223,7 +251,8 @@ public class MqttMessageHandler {
try {
mqttMessageSender.publish(frontendTopic, latestStatus);
} catch (MqttException e) {
System.err.println("【订阅推送】设备" + deviceId + "状态推送失败:" + e.getMessage());
// 优化替换System.err为log.error
log.error("【订阅推送】设备{}状态推送失败", deviceId, e);
}
}
}
@ -236,12 +265,17 @@ public class MqttMessageHandler {
* @param deviceId ID
*/
public void unsubscribeDevice(String clientId, String deviceId) {
// 从Redis删除订阅关系
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
System.out.println("【前端取消订阅】前端" + clientId + "取消订阅设备" + deviceId + "成功");
// 新增入参非空校验JDK 8兼容
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
log.error("【前端取消订阅】clientId或deviceId不能为空");
throw new IllegalArgumentException("clientId和deviceId不能为空");
}
// 从Redis删除订阅关系
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
// 优化替换System.out为log.info
log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId);
}
/**
* clientId
@ -249,6 +283,11 @@ public class MqttMessageHandler {
* @return MQTT
*/
public List<String> unsubscribeAllDevice(String clientId) {
// 新增入参非空校验JDK 8兼容
if (!StringUtils.hasText(clientId)) {
log.error("【批量取消】clientId不能为空");
throw new IllegalArgumentException("clientId不能为空");
}
// 适配低版本的Redis连接可用性校验替换掉isRunning()
try {
@ -257,9 +296,9 @@ public class MqttMessageHandler {
log.warn("Redis连接不可用取消订阅操作跳过{}", e.getMessage());
return Collections.emptyList();
}
// 步骤1查询该前端订阅的所有设备IDRedis中所有sub:*集合中包含该clientId的key
// 注意生产环境建议用scan代替keys避免阻塞Redis
Set<String> subKeys = stringRedisTemplate.keys("sub:*");
// 步骤1查询该前端订阅的所有设备ID生产环境用Scan替代Keys避免阻塞Redis
Set<String> subKeys = scanRedisKeys("sub:*");
List<String> deviceIds = new ArrayList<>();
List<String> frontendTopics = new ArrayList<>();
@ -276,25 +315,51 @@ public class MqttMessageHandler {
frontendTopics.add(frontendTopic);
// 从该设备的订阅列表中移除clientId
stringRedisTemplate.opsForSet().remove(subKey, clientId);
System.out.println("【批量取消】前端" + clientId + "取消设备" + deviceId + "订阅");
log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId);
}
}
}
// 步骤2清理该前端的分布式锁可选防止死锁
Set<String> lockKeys = stringRedisTemplate.keys("lock:*");
Set<String> lockKeys = scanRedisKeys("lock:*");
if (lockKeys != null && !lockKeys.isEmpty()) {
for (String lockKey : lockKeys) {
String lockValue = stringRedisTemplate.opsForValue().get(lockKey);
if (clientId.equals(lockValue)) {
stringRedisTemplate.delete(lockKey);
System.out.println("【批量取消】清理前端" + clientId + "持有的锁:" + lockKey);
log.info("【批量取消】清理前端{}持有的锁:{}", clientId, lockKey);
}
}
}
System.out.println("【批量取消】前端" + clientId + "共取消" + deviceIds.size() + "个设备订阅");
log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceIds.size());
return frontendTopics;
}
// 新增生产环境用Scan替代Keys避免Redis阻塞JDK 8兼容
private Set<String> scanRedisKeys(String pattern) {
Set<String> keys = new java.util.HashSet<>();
try {
stringRedisTemplate.executeWithStickyConnection((RedisConnection connection) -> {
ScanOptions scanOptions = ScanOptions.scanOptions()
.match(pattern)
.count(100)
.build();
Cursor<byte[]> cursor = connection.scan(scanOptions);
while (cursor.hasNext()) {
byte[] keyBytes = cursor.next();
String key = stringRedisTemplate.getStringSerializer().deserialize(keyBytes);
if (key != null) {
keys.add(key);
}
}
cursor.close();
return null;
});
} catch (Exception e) {
log.error("Redis Scan查询失败pattern={}", pattern, e);
}
return keys;
}
}