From e2cdd43a8cd23079c107ec0b3e1bc7731b42b735 Mon Sep 17 00:00:00 2001 From: xce Date: Fri, 16 Jan 2026 14:30:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E6=8F=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agri-admin/src/main/resources/application.yml | 2 + .../com/agri/framework/config/MqttConfig.java | 64 ++++++++- .../interceptor/MqttMessageHandler.java | 129 +++++++++++++----- 3 files changed, 157 insertions(+), 38 deletions(-) diff --git a/agri-admin/src/main/resources/application.yml b/agri-admin/src/main/resources/application.yml index b1ddf2c..bb20778 100644 --- a/agri-admin/src/main/resources/application.yml +++ b/agri-admin/src/main/resources/application.yml @@ -74,6 +74,8 @@ spring: restart: # 热部署开关 enabled: true + # 禁用 MQTT 的热部署 + exclude: com/agri/framework/config/MqttConfig.class,com/agri/framework/interceptor/MqttMessageHandler.class # redis 配置 redis: # 地址 diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java index 85ed54b..b6e3941 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -5,9 +5,16 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.util.StringUtils; + +import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; +import java.util.UUID; /** * MQTT核心配置类 @@ -23,6 +30,8 @@ import org.springframework.context.annotation.Configuration; */ @Configuration public class MqttConfig { + // 新增:替换System.out为SLF4J日志(JDK 8兼容,生产环境必备) + private static final Logger log = LoggerFactory.getLogger(MqttConfig.class); /** Mosquitto服务器地址(TCP协议):格式为 tcp://IP:端口 */ @Value("${spring.mqtt.host}") @@ -52,6 +61,9 @@ public class MqttConfig { @Value("${spring.mqtt.keep-alive:60}") private int keepAlive; + // 新增:保存客户端实例,用于应用关闭时优雅断开连接 + private MqttClient mqttClientInstance; + /** * 创建MQTT客户端实例(Spring Bean) * 核心逻辑: @@ -64,22 +76,37 @@ public class MqttConfig { */ @Bean public MqttClient mqttClient() throws MqttException { + // 新增:核心参数非空校验(避免空指针,JDK 8兼容) + if (!StringUtils.hasText(host)) { + throw new IllegalArgumentException("MQTT服务器地址(spring.mqtt.host)不能为空"); + } + if (!StringUtils.hasText(clientId)) { + throw new IllegalArgumentException("MQTT客户端ID(spring.mqtt.client-id)不能为空"); + } + // 1. 初始化连接配置项 MqttConnectOptions connectOptions = getMqttConnectOptions(); // 2. 初始化MQTT客户端 // MemoryPersistence:使用内存存储会话,不持久化到磁盘(适合后端服务) MemoryPersistence persistence = new MemoryPersistence(); - MqttClient mqttClient = new MqttClient(host, clientId, persistence); + + // 新增:客户端ID拼接随机后缀,避免多实例部署时冲突(生产环境必备) + String uniqueClientId = clientId + "_" + UUID.randomUUID().toString().substring(0, 8); + MqttClient mqttClient = new MqttClient(host, uniqueClientId, persistence); // 3. 建立MQTT连接 if (!mqttClient.isConnected()) { mqttClient.connect(connectOptions); - System.out.println("【MQTT连接成功】服务器地址:" + host + ",客户端ID:" + clientId); + // 优化:替换System.out为日志框架,保留原有输出内容 + log.info("【MQTT连接成功】服务器地址:" + host + ",客户端ID:" + uniqueClientId); } else { - System.out.println("【MQTT连接状态】已连接,无需重复初始化"); + // 优化:替换System.out为日志框架,保留原有输出内容 + log.info("【MQTT连接状态】已连接,无需重复初始化"); } + // 新增:保存实例到成员变量,用于优雅关闭 + this.mqttClientInstance = mqttClient; return mqttClient; } @@ -94,7 +121,8 @@ public class MqttConfig { // 设置保活间隔(秒):客户端每隔该时间发送一次心跳 connectOptions.setKeepAliveInterval(keepAlive); // 关闭清除会话:false=重连后保留订阅关系(若不需要离线消息可设为true) - connectOptions.setCleanSession(true); + // 优化:生产环境建议设为false,重连后保留订阅关系,避免丢失离线消息 + connectOptions.setCleanSession(false); // 开启自动重连:连接断开后自动尝试重连,提升稳定性 connectOptions.setAutomaticReconnect(true); // 设置最大重连间隔(秒):避免频繁重连消耗资源 @@ -157,6 +185,14 @@ public class MqttConfig { * @throws MqttException 消息发布异常(连接断开、主题无效等) */ public void publish(String topic, String payload, int qos) throws MqttException { + // 新增:入参校验(避免空主题/空内容,JDK 8兼容) + if (!StringUtils.hasText(topic)) { + throw new IllegalArgumentException("MQTT发布主题不能为空"); + } + if (payload == null) { + payload = ""; // 空内容默认设为空字符串,避免NPE + } + // 1. 校验客户端是否已连接 if (!client.isConnected()) { throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); @@ -165,7 +201,8 @@ public class MqttConfig { // 2. 构建MQTT消息对象 MqttMessage message = new MqttMessage(); // 设置消息内容(转换为字节数组) - message.setPayload(payload.getBytes()); + // 优化:显式指定UTF-8编码,避免不同环境下编码不一致导致乱码(JDK 8兼容) + message.setPayload(payload.getBytes(StandardCharsets.UTF_8)); // 设置QoS级别 message.setQos(qos); // 设置保留消息:true=服务端保留该主题的最新消息,新订阅者可立即获取 @@ -173,7 +210,22 @@ public class MqttConfig { // 3. 发布消息 client.publish(topic, message); - System.out.println("【MQTT消息发布成功】主题:" + topic + ",内容:" + payload); + // 优化:替换System.out为日志框架,保留原有输出内容 + log.info("【MQTT消息发布成功】主题:" + topic + ",内容:" + payload); + } + } + + // 新增:应用关闭时优雅断开MQTT连接,避免连接泄漏(JDK 8兼容) + @PreDestroy + public void destroyMqttClient() { + if (mqttClientInstance != null && mqttClientInstance.isConnected()) { + try { + mqttClientInstance.disconnect(); + mqttClientInstance.close(); + log.info("【MQTT连接关闭】客户端已优雅断开连接"); + } catch (MqttException e) { + log.error("【MQTT连接关闭异常】:" + e.getMessage(), e); + } } } } \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index d96658f..7901f01 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -9,11 +9,16 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; @@ -49,7 +54,9 @@ public class MqttMessageHandler { @Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/control/+}") private String defaultTopic; - Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); + // 优化:统一使用SLF4J日志(JDK 8兼容) + private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); + /** * 初始化:订阅主题+设置回调 */ @@ -71,7 +78,8 @@ public class MqttMessageHandler { */ @Override public void connectionLost(Throwable cause) { - System.err.println("【MQTT连接异常】连接断开:" + cause.getMessage()); + // 优化:替换System.err为log.error + log.error("【MQTT连接异常】连接断开:{}", cause.getMessage(), cause); } /** @@ -82,8 +90,8 @@ public class MqttMessageHandler { */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - // 将字节数组转换为字符串,分发处理不同主题的消息 - handleMessage(topic, new String(message.getPayload())); + // 优化:显式指定UTF-8编码,避免乱码(JDK 8兼容) + handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8)); } /** @@ -92,14 +100,17 @@ public class MqttMessageHandler { */ @Override public void deliveryComplete(IMqttDeliveryToken token) { - // QoS 1确认:消息已被服务端接收 - System.out.println("【MQTT确认】消息发布完成,主题:" + token.getTopics()[0]); + // 优化:替换System.out为log.info,增加空值校验 + if (token != null && token.getTopics() != null && token.getTopics().length > 0) { + log.info("【MQTT确认】消息发布完成,主题:{}", token.getTopics()[0]); + } } }); // 订阅主题 mqttClient.subscribe(topics, qosArray); - System.out.println("【MQTT初始化】订阅主题:" + String.join(",", topics)); + // 优化:替换System.out为log.info + log.info("【MQTT初始化】订阅主题:{}", String.join(",", topics)); } /** @@ -109,7 +120,8 @@ public class MqttMessageHandler { */ private void handleMessage(String topic, String payload) { try { - System.out.println("【MQTT接收】topic=" + topic + ", payload=" + payload); + // 优化:替换System.out为log.info + log.info("【MQTT接收】topic={}, payload={}", topic, payload); // 设备状态主题:dtu/{deviceId}/up if (topic.matches("dtu/\\w+/up")) { @@ -120,8 +132,8 @@ public class MqttMessageHandler { handleFrontendControl(topic, payload); } } catch (Exception e) { - System.err.println("【MQTT消息处理异常】topic=" + topic + ", 异常信息:" + e.getMessage()); - e.printStackTrace(); + // 优化:替换System.err为log.error,打印完整堆栈 + log.error("【MQTT消息处理异常】topic={}", topic, e); } } @@ -141,10 +153,12 @@ public class MqttMessageHandler { String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; // 发布消息 mqttMessageSender.publish(frontendTopic, payload); - System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic); + // 优化:替换System.out为log.info + log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } } else { - System.out.println("【设备状态转发】设备" + deviceId + "无订阅前端,跳过转发"); + // 优化:替换System.out为log.info + log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); } } @@ -157,36 +171,43 @@ public class MqttMessageHandler { String clientId = parts[1]; String deviceId = parts[3]; + // 新增:入参非空校验(JDK 8兼容) + if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { + log.error("【指令处理】clientId或deviceId为空,topic={}", topic); + return; + } + // 1. 权限校验(示例:admin开头有全权限) if (!checkPermission(clientId, deviceId)) { String errorTopic = "frontend/" + clientId + "/error/" + deviceId; mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); - System.err.println("【权限校验】前端" + clientId + "操作设备" + deviceId + "失败"); + // 优化:替换System.err为log.warn + log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId); return; } // 2. 分布式锁:避免多前端并发控制 String lockKey = "lock:" + deviceId; Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, clientId, 10, TimeUnit.SECONDS + lockKey, clientId, 10, TimeUnit.SECONDS // 优化:显式指定时间单位 ); if (lockSuccess == null || !lockSuccess) { String errorTopic = "frontend/" + clientId + "/error/" + deviceId; mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}"); - System.err.println("【分布式锁】前端" + clientId + "操作设备" + deviceId + "失败"); + // 优化:替换System.err为log.warn + log.warn("【分布式锁】前端{}操作设备{}失败", clientId, deviceId); return; } // 3. 记录日志 - System.out.println(String.format( - "【指令处理】前端%s于%s控制设备%s,指令:%s", - clientId, LocalDateTime.now(), deviceId, payload - )); + log.info("【指令处理】前端{}于{}控制设备{},指令:{}", + clientId, LocalDateTime.now(), deviceId, payload); // 4. 转发指令到设备 String deviceTopic = "dtu/" + deviceId + "/control"; mqttMessageSender.publish(deviceTopic, payload); - System.out.println("【指令转发】前端" + clientId + " → 设备" + deviceId); + // 优化:替换System.out为log.info + log.info("【指令转发】前端{} → 设备{}", clientId, deviceId); } /** @@ -212,9 +233,16 @@ public class MqttMessageHandler { * 前端订阅设备(Controller调用) */ public void subscribeDevice(String clientId, String deviceId) { + // 新增:入参非空校验(JDK 8兼容) + if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { + log.error("【订阅管理】clientId或deviceId不能为空"); + throw new IllegalArgumentException("clientId和deviceId不能为空"); + } + // 保存订阅关系到Redis stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); - System.out.println("【订阅管理】前端" + clientId + "订阅设备" + deviceId + "成功"); + // 优化:替换System.out为log.info + log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId); // 推送设备最新状态(可选) String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); @@ -223,7 +251,8 @@ public class MqttMessageHandler { try { mqttMessageSender.publish(frontendTopic, latestStatus); } catch (MqttException e) { - System.err.println("【订阅推送】设备" + deviceId + "状态推送失败:" + e.getMessage()); + // 优化:替换System.err为log.error + log.error("【订阅推送】设备{}状态推送失败", deviceId, e); } } } @@ -236,19 +265,29 @@ public class MqttMessageHandler { * @param deviceId 设备ID */ public void unsubscribeDevice(String clientId, String deviceId) { + // 新增:入参非空校验(JDK 8兼容) + if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) { + log.error("【前端取消订阅】clientId或deviceId不能为空"); + throw new IllegalArgumentException("clientId和deviceId不能为空"); + } + // 从Redis删除订阅关系 stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); - System.out.println("【前端取消订阅】前端" + clientId + "取消订阅设备" + deviceId + "成功"); + // 优化:替换System.out为log.info + log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId); } - - /** * 批量取消前端的所有设备订阅(核心:根据clientId清理所有订阅关系) * @param clientId 前端唯一标识(如wx_123) * @return 前端需要取消的MQTT主题列表(供前端批量取消) */ public List unsubscribeAllDevice(String clientId) { + // 新增:入参非空校验(JDK 8兼容) + if (!StringUtils.hasText(clientId)) { + log.error("【批量取消】clientId不能为空"); + throw new IllegalArgumentException("clientId不能为空"); + } // 适配低版本的Redis连接可用性校验(替换掉isRunning()) try { @@ -257,9 +296,9 @@ public class MqttMessageHandler { log.warn("Redis连接不可用,取消订阅操作跳过:{}", e.getMessage()); return Collections.emptyList(); } - // 步骤1:查询该前端订阅的所有设备ID(Redis中所有sub:*集合中包含该clientId的key) - // 注意:生产环境建议用scan代替keys,避免阻塞Redis - Set subKeys = stringRedisTemplate.keys("sub:*"); + + // 步骤1:查询该前端订阅的所有设备ID(生产环境用Scan替代Keys,避免阻塞Redis) + Set subKeys = scanRedisKeys("sub:*"); List deviceIds = new ArrayList<>(); List frontendTopics = new ArrayList<>(); @@ -276,25 +315,51 @@ public class MqttMessageHandler { frontendTopics.add(frontendTopic); // 从该设备的订阅列表中移除clientId stringRedisTemplate.opsForSet().remove(subKey, clientId); - System.out.println("【批量取消】前端" + clientId + "取消设备" + deviceId + "订阅"); + log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId); } } } // 步骤2:清理该前端的分布式锁(可选,防止死锁) - Set lockKeys = stringRedisTemplate.keys("lock:*"); + Set lockKeys = scanRedisKeys("lock:*"); if (lockKeys != null && !lockKeys.isEmpty()) { for (String lockKey : lockKeys) { String lockValue = stringRedisTemplate.opsForValue().get(lockKey); if (clientId.equals(lockValue)) { stringRedisTemplate.delete(lockKey); - System.out.println("【批量取消】清理前端" + clientId + "持有的锁:" + lockKey); + log.info("【批量取消】清理前端{}持有的锁:{}", clientId, lockKey); } } } - System.out.println("【批量取消】前端" + clientId + "共取消" + deviceIds.size() + "个设备订阅"); + log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceIds.size()); return frontendTopics; } + // 新增:生产环境用Scan替代Keys,避免Redis阻塞(JDK 8兼容) + private Set scanRedisKeys(String pattern) { + Set keys = new java.util.HashSet<>(); + try { + stringRedisTemplate.executeWithStickyConnection((RedisConnection connection) -> { + ScanOptions scanOptions = ScanOptions.scanOptions() + .match(pattern) + .count(100) + .build(); + Cursor cursor = connection.scan(scanOptions); + while (cursor.hasNext()) { + byte[] keyBytes = cursor.next(); + String key = stringRedisTemplate.getStringSerializer().deserialize(keyBytes); + if (key != null) { + keys.add(key); + } + } + cursor.close(); + return null; + }); + } catch (Exception e) { + log.error("Redis Scan查询失败,pattern={}", pattern, e); + } + return keys; + } + } \ No newline at end of file