diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index 2ecd864..b17fa55 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -5,7 +5,7 @@ spring: username: admin # Mosquitto共用账号 password: Admin#12345678 # Mosquitto密码 client-id: springboot-backend # 截取UUID前8位(自动去横线) - default-topic: dtu/+/up,frontend/+/down/+ # 后端监听的主题 + default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题 qos: 1 # 消息可靠性 timeout: 60 # 连接超时 keep-alive: 60 # 心跳间隔 diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index b7d049c..ad4b803 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -41,6 +41,12 @@ import java.util.concurrent.atomic.AtomicBoolean; * 3. 转发设备状态到订阅的前端 * 4. 处理前端控制指令(权限校验+分布式锁+转发) * 适配JDK 8,无心跳包相关逻辑 + * + * + * + * + * 前端监听: "frontend/" + clientId + "/dtu/" + deviceId + "/listener" + * 前端发布主题:frontend/+/control/+ *
* 【方案A改造说明(最小改动)】 * 1) 不再自己new MqttClient 做重连,避免 mqttMessageSender 持有旧client导致“重连后发不出去” @@ -69,7 +75,7 @@ public class MqttMessageHandler implements SmartLifecycle { private StringRedisTemplate stringRedisTemplate; // 读取配置文件中的默认订阅主题(移除心跳主题) - @Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/down/+}") + @Value("${spring.mqtt.default-topic}") private String defaultTopic; // 优化:统一使用SLF4J日志(JDK 8兼容) @@ -196,8 +202,8 @@ public class MqttMessageHandler implements SmartLifecycle { if (topic.matches("dtu/\\w+/up")) { handleDeviceStatus(topic, payload); } - // 处理前端控制指令主题:frontend/{clientId}/down/{deviceId} - else if (topic.matches("frontend/\\w+/down/\\w+")) { + // 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} + else if (topic.matches("frontend/\\w+/control/\\w+")) { handleFrontendControl(topic, payload); } } catch (Exception e) { @@ -243,7 +249,7 @@ public class MqttMessageHandler implements SmartLifecycle { log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); // 广播回执结果给所有订阅该设备的前端 - // String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";; + // String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";; // JSONObject ackPayload = new JSONObject(); // ackPayload.put("deviceId", deviceId); // ackPayload.put("funcType", funcType); @@ -262,7 +268,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 推送给每个订阅的前端 for (String clientId : subscribedClients) { // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up - String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // 发布消息 mqttMessageSender.publish(frontendTopic, payload); // 优化:替换System.out为log.info @@ -297,12 +303,12 @@ public class MqttMessageHandler implements SmartLifecycle { }); } catch (Exception e) { log.error("【指令处理】功能码解析失败,payload={}", payload, e); - // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; + // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}"); return; } if (funcCodeMap == null || funcCodeMap.isEmpty()) { - // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; + // String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}"); log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId); return; @@ -312,7 +318,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 1. 权限校验(示例:admin开头有全权限) if (!checkPermission(clientId, deviceId)) { - String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; + String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); // 优化:替换System.err为log.warn log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId); @@ -325,7 +331,7 @@ public class MqttMessageHandler implements SmartLifecycle { lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景 ); if (lockSuccess == null || !lockSuccess) { - String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up"; + String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}"); // 优化:替换System.err为log.warn log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType); @@ -338,7 +344,8 @@ public class MqttMessageHandler implements SmartLifecycle { // 4. 转发指令到设备 String deviceTopic = "dtu/" + deviceId + "/down"; - mqttMessageSender.publish(deviceTopic, payload); + //todo + // mqttMessageSender.publish(deviceTopic, payload); // 优化:替换System.out为log.info log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); } @@ -379,7 +386,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 推送设备最新状态(可选) // String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); // if (latestStatus != null) { - // String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; + // String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // try { // mqttMessageSender.publish(frontendTopic, latestStatus); // } catch (MqttException e) { @@ -442,7 +449,7 @@ public class MqttMessageHandler implements SmartLifecycle { for (String deviceId : deviceSet) { String subKey = "sub:" + deviceId; stringRedisTemplate.opsForSet().remove(subKey, clientId); - String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; frontendTopics.add(frontendTopic); log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId); }