feasure
xce 2026-01-16 02:12:28 +08:00
parent 913365927f
commit 0c520823aa
3 changed files with 11 additions and 23 deletions

View File

@ -1,16 +1,4 @@
spring: spring:
# Redis配置分布式锁/订阅关系)
redis:
host: 122.51.109.52
port: 6379
password: lld123
database: 1
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 2
max-wait: 10000ms
# MQTT配置 # MQTT配置
mqtt: mqtt:
host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址 host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址

View File

@ -169,7 +169,7 @@ public class MqttConfig {
// 设置QoS级别 // 设置QoS级别
message.setQos(qos); message.setQos(qos);
// 设置保留消息true=服务端保留该主题的最新消息,新订阅者可立即获取 // 设置保留消息true=服务端保留该主题的最新消息,新订阅者可立即获取
message.setRetained(true); message.setRetained(false);
// 3. 发布消息 // 3. 发布消息
client.publish(topic, message); client.publish(topic, message);

View File

@ -43,7 +43,7 @@ public class MqttMessageHandler {
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
// 读取配置文件中的默认订阅主题(移除心跳主题) // 读取配置文件中的默认订阅主题(移除心跳主题)
@Value("${spring.mqtt.default-topic:device/+/status,frontend/+/control/+}") @Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/control/+}")
private String defaultTopic; private String defaultTopic;
/** /**
@ -107,8 +107,8 @@ public class MqttMessageHandler {
try { try {
System.out.println("【MQTT接收】topic=" + topic + ", payload=" + payload); System.out.println("【MQTT接收】topic=" + topic + ", payload=" + payload);
// 设备状态主题device/{deviceId}/status // 设备状态主题dtu/{deviceId}/up
if (topic.matches("device/\\w+/status")) { if (topic.matches("dtu/\\w+/up")) {
handleDeviceStatus(topic, payload); handleDeviceStatus(topic, payload);
} }
// 处理前端控制指令主题frontend/{clientId}/control/{deviceId} // 处理前端控制指令主题frontend/{clientId}/control/{deviceId}
@ -125,7 +125,7 @@ public class MqttMessageHandler {
* *
*/ */
private void handleDeviceStatus(String topic, String payload) throws MqttException { private void handleDeviceStatus(String topic, String payload) throws MqttException {
// 解析设备ID主题格式为device/{deviceId}/status分割后第2个元素是设备ID // 解析设备ID主题格式为dtu/{deviceId}/up分割后第2个元素是设备ID
String deviceId = topic.split("/")[1]; String deviceId = topic.split("/")[1];
// 查询Redis中订阅该设备的前端列表sub:{deviceId} // 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
@ -133,8 +133,8 @@ public class MqttMessageHandler {
if (subscribedClients != null && !subscribedClients.isEmpty()) { if (subscribedClients != null && !subscribedClients.isEmpty()) {
// 推送给每个订阅的前端 // 推送给每个订阅的前端
for (String clientId : subscribedClients) { for (String clientId : subscribedClients) {
// 前端专属主题frontend/{clientId}/device/{deviceId}/status // 前端专属主题frontend/{clientId}/dtu/{deviceId}/up
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
// 发布消息 // 发布消息
mqttMessageSender.publish(frontendTopic, payload); mqttMessageSender.publish(frontendTopic, payload);
System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic); System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic);
@ -180,7 +180,7 @@ public class MqttMessageHandler {
)); ));
// 4. 转发指令到设备 // 4. 转发指令到设备
String deviceTopic = "device/" + deviceId + "/control"; String deviceTopic = "dtu/" + deviceId + "/control";
mqttMessageSender.publish(deviceTopic, payload); mqttMessageSender.publish(deviceTopic, payload);
System.out.println("【指令转发】前端" + clientId + " → 设备" + deviceId); System.out.println("【指令转发】前端" + clientId + " → 设备" + deviceId);
} }
@ -201,7 +201,7 @@ public class MqttMessageHandler {
return true; return true;
} }
// 普通用户权限校验Redis中是否绑定该设备校验Redis中user_device:{clientId}是否包含该设备ID // 普通用户权限校验Redis中是否绑定该设备校验Redis中user_device:{clientId}是否包含该设备ID
return stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId); return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId));
} }
/** /**
@ -215,7 +215,7 @@ public class MqttMessageHandler {
// 推送设备最新状态(可选) // 推送设备最新状态(可选)
String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
if (latestStatus != null) { if (latestStatus != null) {
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
try { try {
mqttMessageSender.publish(frontendTopic, latestStatus); mqttMessageSender.publish(frontendTopic, latestStatus);
} catch (MqttException e) { } catch (MqttException e) {
@ -260,7 +260,7 @@ public class MqttMessageHandler {
String deviceId = subKey.split(":")[1]; String deviceId = subKey.split(":")[1];
deviceIds.add(deviceId); deviceIds.add(deviceId);
// 构建前端需要取消的MQTT主题 // 构建前端需要取消的MQTT主题
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
frontendTopics.add(frontendTopic); frontendTopics.add(frontendTopic);
// 从该设备的订阅列表中移除clientId // 从该设备的订阅列表中移除clientId
stringRedisTemplate.opsForSet().remove(subKey, clientId); stringRedisTemplate.opsForSet().remove(subKey, clientId);