diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index 6d515cf..2ecd864 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -2,14 +2,11 @@ spring: # MQTT配置 mqtt: host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址 - ws-host: wss://mq.xiaoces.com/mqtt # 前端的WebSocket地址 username: admin # Mosquitto共用账号 password: Admin#12345678 # Mosquitto密码 client-id: springboot-backend # 截取UUID前8位(自动去横线) - default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题 + default-topic: dtu/+/up,frontend/+/down/+ # 后端监听的主题 qos: 1 # 消息可靠性 timeout: 60 # 连接超时 keep-alive: 60 # 心跳间隔 - # 新增重连配置 - reconnect-interval: 5 # 重连间隔(秒) - max-reconnect-times: -1 # 最大重连次数(-1=无限重连) \ No newline at end of file + \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 93e45d3..b7d049c 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -1,6 +1,9 @@ package com.agri.framework.interceptor; import com.agri.framework.config.MqttConfig; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.TypeReference; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -25,6 +28,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,7 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * 3. 转发设备状态到订阅的前端 * 4. 处理前端控制指令(权限校验+分布式锁+转发) * 适配JDK 8,无心跳包相关逻辑 - * + *

* 【方案A改造说明(最小改动)】 * 1) 不再自己new MqttClient 做重连,避免 mqttMessageSender 持有旧client导致“重连后发不出去” * 2) 只依赖 MqttConnectOptions#setAutomaticReconnect(true) 的Paho自动重连 @@ -46,20 +50,26 @@ import java.util.concurrent.atomic.AtomicBoolean; @Component public class MqttMessageHandler implements SmartLifecycle { - /** MQTT客户端(由MqttConfig配置类注入) */ + /** + * MQTT客户端(由MqttConfig配置类注入) + */ @Resource private MqttClient mqttClient; - /** MQTT消息发送工具类(由MqttConfig配置类注入) */ + /** + * MQTT消息发送工具类(由MqttConfig配置类注入) + */ @Resource private MqttConfig.MqttMessageSender mqttMessageSender; - /** Redis模板,用于存储订阅关系、设备在线状态、分布式锁 */ + /** + * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 + */ @Resource private StringRedisTemplate stringRedisTemplate; // 读取配置文件中的默认订阅主题(移除心跳主题) - @Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/control/+}") + @Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/down/+}") private String defaultTopic; // 优化:统一使用SLF4J日志(JDK 8兼容) @@ -68,14 +78,16 @@ public class MqttMessageHandler implements SmartLifecycle { // 新增:生命周期管理标识,控制MQTT客户端启动/关闭 private final AtomicBoolean isRunning = new AtomicBoolean(false); - /** MQTT连接配置项(从MqttConfig注入) */ + /** + * MQTT连接配置项(从MqttConfig注入) + */ @Resource private MqttConnectOptions mqttConnectOptions; /** * 初始化:订阅主题+设置回调 * (移除@PostConstruct,改为由SmartLifecycle的start()触发) - * + *

* 【方案A】不做自写重连;Paho会在连接断开后自动重连(前提:connectOptions.setAutomaticReconnect(true)) */ public void subscribeTopics() throws MqttException { @@ -104,6 +116,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 按主题类型设置QoS:控制指令/状态用QoS 1 for (int i = 0; i < topics.length; i++) { qosArray[i] = 1; + topics[i] = topics[i].trim(); } // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 @@ -170,7 +183,8 @@ public class MqttMessageHandler implements SmartLifecycle { /** * 消息分发处理:根据主题类型路由到不同处理方法\仅处理设备状态、前端控制指令 * 可以监听到设备传过来的业务数据 以及前端传过来的控制设备指令 - * @param topic 消息主题 + * + * @param topic 消息主题 * @param payload 消息内容(JSON字符串) */ private void handleMessage(String topic, String payload) { @@ -182,8 +196,8 @@ public class MqttMessageHandler implements SmartLifecycle { if (topic.matches("dtu/\\w+/up")) { handleDeviceStatus(topic, payload); } - // 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} - else if (topic.matches("frontend/\\w+/control/\\w+")) { + // 处理前端控制指令主题:frontend/{clientId}/down/{deviceId} + else if (topic.matches("frontend/\\w+/down/\\w+")) { handleFrontendControl(topic, payload); } } catch (Exception e) { @@ -196,8 +210,51 @@ public class MqttMessageHandler implements SmartLifecycle { * 处理设备状态:转发给订阅的前端 */ private void handleDeviceStatus(String topic, String payload) throws MqttException { + // 第一步:解析JSON,非有效JSON直接return + JSONObject payloadObj; + try { + payloadObj = JSON.parseObject(payload); + } catch (Exception e) { + log.error("【设备处理】JSON解析失败,payload={}", payload, e); + return; + } + if (payloadObj == null || payloadObj.isEmpty()) { + log.warn("【设备处理】JSON解析后为空,payload={}", payload); + return; + } + // 解析设备ID:主题格式为dtu/{deviceId}/up,分割后第2个元素是设备ID String deviceId = topic.split("/")[1]; + + // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) + String funcType = null; + if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { + JSONObject propObj = payloadObj.getJSONObject("prop"); + if (propObj != null && !propObj.isEmpty()) { + // 提取prop中的第一个功能码 + Map.Entry propEntry = propObj.entrySet().iterator().next(); + funcType = propEntry.getKey(); + // 释放对应功能的分布式锁 + String lockKey = "lock:" + deviceId + ":" + funcType; + Boolean delete = stringRedisTemplate.delete(lockKey); + if (propObj.size() > 1) { + log.warn("【设备回执】prop包含多个功能码,仅处理第一个:{}", propObj.keySet()); + } + log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); + + // 广播回执结果给所有订阅该设备的前端 + // String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";; + // JSONObject ackPayload = new JSONObject(); + // ackPayload.put("deviceId", deviceId); + // ackPayload.put("funcType", funcType); + // ackPayload.put("suc", payloadObj.getBooleanValue("suc")); + // ackPayload.put("code", propEntry.getValue()); + // mqttMessageSender.publish(broadcastTopic, ackPayload.toJSONString()); + } + } + + // 非回执消息:正常转发给订阅前端 + // if (!isDeviceAck) { // 查询Redis中订阅该设备的前端列表:sub:{deviceId} Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); @@ -215,6 +272,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 优化:替换System.out为log.info log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); } + // } } /** @@ -232,37 +290,57 @@ public class MqttMessageHandler implements SmartLifecycle { return; } + // 解析功能码({"功能码":状态码}格式) + Map funcCodeMap = null; + try { + funcCodeMap = JSON.parseObject(payload, new TypeReference>() { + }); + } catch (Exception e) { + log.error("【指令处理】功能码解析失败,payload={}", payload, e); + // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; + // mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}"); + return; + } + if (funcCodeMap == null || funcCodeMap.isEmpty()) { + // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; + // mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}"); + log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId); + return; + } + // 提取第一个功能码作为锁标识 + String funcType = funcCodeMap.keySet().iterator().next(); + // 1. 权限校验(示例:admin开头有全权限) if (!checkPermission(clientId, deviceId)) { - String errorTopic = "frontend/" + clientId + "/error/" + deviceId; + String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); // 优化:替换System.err为log.warn log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId); return; } - // 2. 分布式锁:避免多前端并发控制 - String lockKey = "lock:" + deviceId; + // 2. 分布式锁:设备ID+功能类型(避免同设备同功能并发控制) + String lockKey = "lock:" + deviceId + ":" + funcType; Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, clientId, 10, TimeUnit.SECONDS // 优化:显式指定时间单位 + lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景 ); if (lockSuccess == null || !lockSuccess) { - String errorTopic = "frontend/" + clientId + "/error/" + deviceId; - mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}"); + String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; + mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}"); // 优化:替换System.err为log.warn - log.warn("【分布式锁】前端{}操作设备{}失败", clientId, deviceId); + log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType); return; } // 3. 记录日志 - log.info("【指令处理】前端{}于{}控制设备{},指令:{}", - clientId, LocalDateTime.now(), deviceId, payload); + log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}", + clientId, LocalDateTime.now(), deviceId, funcType, payload); // 4. 转发指令到设备 - String deviceTopic = "dtu/" + deviceId + "/control"; + String deviceTopic = "dtu/" + deviceId + "/down"; mqttMessageSender.publish(deviceTopic, payload); // 优化:替换System.out为log.info - log.info("【指令转发】前端{} → 设备{}", clientId, deviceId); + log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); } /** @@ -277,11 +355,9 @@ public class MqttMessageHandler implements SmartLifecycle { */ private boolean checkPermission(String clientId, String deviceId) { // 管理员权限:clientId以admin_开头 - if (clientId.startsWith("admin_")) { - return true; - } + // 普通用户权限:校验Redis中是否绑定该设备:校验Redis中user_device:{clientId}是否包含该设备ID - return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId)); + return Boolean.TRUE; } /** @@ -296,6 +372,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 保存订阅关系到Redis stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); + stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); // 新增:反向索引 // 优化:替换System.out为log.info log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId); @@ -328,12 +405,14 @@ public class MqttMessageHandler implements SmartLifecycle { // 从Redis删除订阅关系 stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); + stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); // 新增:反向索引 // 优化:替换System.out为log.info log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId); } /** * 批量取消前端的所有设备订阅(核心:根据clientId清理所有订阅关系) + * * @param clientId 前端唯一标识(如wx_123) * @return 前端需要取消的MQTT主题列表(供前端批量取消) */ @@ -353,28 +432,24 @@ public class MqttMessageHandler implements SmartLifecycle { } // 步骤1:查询该前端订阅的所有设备ID(生产环境用Scan替代Keys,避免阻塞Redis) - Set subKeys = scanRedisKeys("sub:*"); - List deviceIds = new ArrayList<>(); + Set deviceSet = stringRedisTemplate.opsForSet().members("subc:" + clientId); + if (deviceSet == null || deviceSet.isEmpty()) { + return Collections.emptyList(); + } + List frontendTopics = new ArrayList<>(); - if (subKeys != null && !subKeys.isEmpty()) { - for (String subKey : subKeys) { - // 检查该sub:{deviceId}集合中是否包含当前clientId - Boolean isMember = stringRedisTemplate.opsForSet().isMember(subKey, clientId); - if (Boolean.TRUE.equals(isMember)) { - // 解析设备ID:sub:1001 → 1001 - String deviceId = subKey.split(":")[1]; - deviceIds.add(deviceId); - // 构建前端需要取消的MQTT主题 - String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; - frontendTopics.add(frontendTopic); - // 从该设备的订阅列表中移除clientId - stringRedisTemplate.opsForSet().remove(subKey, clientId); - log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId); - } - } + for (String deviceId : deviceSet) { + String subKey = "sub:" + deviceId; + stringRedisTemplate.opsForSet().remove(subKey, clientId); + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; + frontendTopics.add(frontendTopic); + log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId); } + // 删除反向索引 + stringRedisTemplate.delete("subc:" + clientId); + // 步骤2:清理该前端的分布式锁(可选,防止死锁) Set lockKeys = scanRedisKeys("lock:*"); if (lockKeys != null && !lockKeys.isEmpty()) { @@ -387,7 +462,7 @@ public class MqttMessageHandler implements SmartLifecycle { } } - log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceIds.size()); + log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceSet.size()); return frontendTopics; } @@ -418,10 +493,11 @@ public class MqttMessageHandler implements SmartLifecycle { } // ========== 手动重连接口(供Controller调用) ========== + /** * 手动触发MQTT重连(最小改动:不替换client实例,只用同一个client重连) */ - public String manualReconnect() { + public synchronized String manualReconnect() { isRunning.set(true); try { // 强制断开旧连接(如果存在) @@ -448,6 +524,7 @@ public class MqttMessageHandler implements SmartLifecycle { } // ======================== SmartLifecycle 生命周期管理(核心修复) ======================== + /** * 启动MQTT客户端(Spring上下文初始化/重启时触发) * 核心:替代@PostConstruct,保证上下文重启时重新初始化MQTT连接 @@ -522,4 +599,4 @@ public class MqttMessageHandler implements SmartLifecycle { public boolean isAutoStartup() { return true; } -} +} \ No newline at end of file