diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index cd625e1..03b98c8 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -1,16 +1,4 @@ spring: - # Redis配置(分布式锁/订阅关系) - redis: - host: 122.51.109.52 - port: 6379 - password: lld123 - database: 1 - lettuce: - pool: - max-active: 8 - max-idle: 8 - min-idle: 2 - max-wait: 10000ms # MQTT配置 mqtt: host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址 diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java index 9d4ed23..85ed54b 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -169,7 +169,7 @@ public class MqttConfig { // 设置QoS级别 message.setQos(qos); // 设置保留消息:true=服务端保留该主题的最新消息,新订阅者可立即获取 - message.setRetained(true); + message.setRetained(false); // 3. 发布消息 client.publish(topic, message); diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 03c1f94..6d8e4f3 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -43,7 +43,7 @@ public class MqttMessageHandler { private StringRedisTemplate stringRedisTemplate; // 读取配置文件中的默认订阅主题(移除心跳主题) - @Value("${spring.mqtt.default-topic:device/+/status,frontend/+/control/+}") + @Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/control/+}") private String defaultTopic; /** @@ -107,8 +107,8 @@ public class MqttMessageHandler { try { System.out.println("【MQTT接收】topic=" + topic + ", payload=" + payload); - // 设备状态主题:device/{deviceId}/status - if (topic.matches("device/\\w+/status")) { + // 设备状态主题:dtu/{deviceId}/up + if (topic.matches("dtu/\\w+/up")) { handleDeviceStatus(topic, payload); } // 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} @@ -125,7 +125,7 @@ public class MqttMessageHandler { * 处理设备状态:转发给订阅的前端 */ private void handleDeviceStatus(String topic, String payload) throws MqttException { - // 解析设备ID:主题格式为device/{deviceId}/status,分割后第2个元素是设备ID + // 解析设备ID:主题格式为dtu/{deviceId}/up,分割后第2个元素是设备ID String deviceId = topic.split("/")[1]; // 查询Redis中订阅该设备的前端列表:sub:{deviceId} Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); @@ -133,8 +133,8 @@ public class MqttMessageHandler { if (subscribedClients != null && !subscribedClients.isEmpty()) { // 推送给每个订阅的前端 for (String clientId : subscribedClients) { - // 前端专属主题:frontend/{clientId}/device/{deviceId}/status - String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; // 发布消息 mqttMessageSender.publish(frontendTopic, payload); System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic); @@ -180,7 +180,7 @@ public class MqttMessageHandler { )); // 4. 转发指令到设备 - String deviceTopic = "device/" + deviceId + "/control"; + String deviceTopic = "dtu/" + deviceId + "/control"; mqttMessageSender.publish(deviceTopic, payload); System.out.println("【指令转发】前端" + clientId + " → 设备" + deviceId); } @@ -201,7 +201,7 @@ public class MqttMessageHandler { return true; } // 普通用户权限:校验Redis中是否绑定该设备:校验Redis中user_device:{clientId}是否包含该设备ID - return stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId); + return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId)); } /** @@ -215,7 +215,7 @@ public class MqttMessageHandler { // 推送设备最新状态(可选) String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); if (latestStatus != null) { - String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; try { mqttMessageSender.publish(frontendTopic, latestStatus); } catch (MqttException e) { @@ -260,7 +260,7 @@ public class MqttMessageHandler { String deviceId = subKey.split(":")[1]; deviceIds.add(deviceId); // 构建前端需要取消的MQTT主题 - String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up"; frontendTopics.add(frontendTopic); // 从该设备的订阅列表中移除clientId stringRedisTemplate.opsForSet().remove(subKey, clientId);