mqtt基础功能
parent
e7b2f43cf0
commit
8c2eebbf55
|
|
@ -5,7 +5,7 @@ spring:
|
||||||
username: admin # Mosquitto共用账号
|
username: admin # Mosquitto共用账号
|
||||||
password: Admin#12345678 # Mosquitto密码
|
password: Admin#12345678 # Mosquitto密码
|
||||||
client-id: springboot-backend # 截取UUID前8位(自动去横线)
|
client-id: springboot-backend # 截取UUID前8位(自动去横线)
|
||||||
default-topic: dtu/+/up,frontend/+/down/+ # 后端监听的主题
|
default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题
|
||||||
qos: 1 # 消息可靠性
|
qos: 1 # 消息可靠性
|
||||||
timeout: 60 # 连接超时
|
timeout: 60 # 连接超时
|
||||||
keep-alive: 60 # 心跳间隔
|
keep-alive: 60 # 心跳间隔
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
* 3. 转发设备状态到订阅的前端
|
* 3. 转发设备状态到订阅的前端
|
||||||
* 4. 处理前端控制指令(权限校验+分布式锁+转发)
|
* 4. 处理前端控制指令(权限校验+分布式锁+转发)
|
||||||
* 适配JDK 8,无心跳包相关逻辑
|
* 适配JDK 8,无心跳包相关逻辑
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* 前端监听: "frontend/" + clientId + "/dtu/" + deviceId + "/listener"
|
||||||
|
* 前端发布主题:frontend/+/control/+
|
||||||
* <p>
|
* <p>
|
||||||
* 【方案A改造说明(最小改动)】
|
* 【方案A改造说明(最小改动)】
|
||||||
* 1) 不再自己new MqttClient 做重连,避免 mqttMessageSender 持有旧client导致“重连后发不出去”
|
* 1) 不再自己new MqttClient 做重连,避免 mqttMessageSender 持有旧client导致“重连后发不出去”
|
||||||
|
|
@ -69,7 +75,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
private StringRedisTemplate stringRedisTemplate;
|
private StringRedisTemplate stringRedisTemplate;
|
||||||
|
|
||||||
// 读取配置文件中的默认订阅主题(移除心跳主题)
|
// 读取配置文件中的默认订阅主题(移除心跳主题)
|
||||||
@Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/down/+}")
|
@Value("${spring.mqtt.default-topic}")
|
||||||
private String defaultTopic;
|
private String defaultTopic;
|
||||||
|
|
||||||
// 优化:统一使用SLF4J日志(JDK 8兼容)
|
// 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||||
|
|
@ -196,8 +202,8 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
if (topic.matches("dtu/\\w+/up")) {
|
if (topic.matches("dtu/\\w+/up")) {
|
||||||
handleDeviceStatus(topic, payload);
|
handleDeviceStatus(topic, payload);
|
||||||
}
|
}
|
||||||
// 处理前端控制指令主题:frontend/{clientId}/down/{deviceId}
|
// 处理前端控制指令主题:frontend/{clientId}/control/{deviceId}
|
||||||
else if (topic.matches("frontend/\\w+/down/\\w+")) {
|
else if (topic.matches("frontend/\\w+/control/\\w+")) {
|
||||||
handleFrontendControl(topic, payload);
|
handleFrontendControl(topic, payload);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
@ -243,7 +249,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
|
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
|
||||||
|
|
||||||
// 广播回执结果给所有订阅该设备的前端
|
// 广播回执结果给所有订阅该设备的前端
|
||||||
// String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";;
|
// String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";;
|
||||||
// JSONObject ackPayload = new JSONObject();
|
// JSONObject ackPayload = new JSONObject();
|
||||||
// ackPayload.put("deviceId", deviceId);
|
// ackPayload.put("deviceId", deviceId);
|
||||||
// ackPayload.put("funcType", funcType);
|
// ackPayload.put("funcType", funcType);
|
||||||
|
|
@ -262,7 +268,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
// 推送给每个订阅的前端
|
// 推送给每个订阅的前端
|
||||||
for (String clientId : subscribedClients) {
|
for (String clientId : subscribedClients) {
|
||||||
// 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up
|
// 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up
|
||||||
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
|
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
// 发布消息
|
// 发布消息
|
||||||
mqttMessageSender.publish(frontendTopic, payload);
|
mqttMessageSender.publish(frontendTopic, payload);
|
||||||
// 优化:替换System.out为log.info
|
// 优化:替换System.out为log.info
|
||||||
|
|
@ -297,12 +303,12 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("【指令处理】功能码解析失败,payload={}", payload, e);
|
log.error("【指令处理】功能码解析失败,payload={}", payload, e);
|
||||||
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
|
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}");
|
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (funcCodeMap == null || funcCodeMap.isEmpty()) {
|
if (funcCodeMap == null || funcCodeMap.isEmpty()) {
|
||||||
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
|
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}");
|
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}");
|
||||||
log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId);
|
log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId);
|
||||||
return;
|
return;
|
||||||
|
|
@ -312,7 +318,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
|
|
||||||
// 1. 权限校验(示例:admin开头有全权限)
|
// 1. 权限校验(示例:admin开头有全权限)
|
||||||
if (!checkPermission(clientId, deviceId)) {
|
if (!checkPermission(clientId, deviceId)) {
|
||||||
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
|
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
|
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
|
||||||
// 优化:替换System.err为log.warn
|
// 优化:替换System.err为log.warn
|
||||||
log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId);
|
log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId);
|
||||||
|
|
@ -325,7 +331,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景
|
lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景
|
||||||
);
|
);
|
||||||
if (lockSuccess == null || !lockSuccess) {
|
if (lockSuccess == null || !lockSuccess) {
|
||||||
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
|
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}");
|
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}");
|
||||||
// 优化:替换System.err为log.warn
|
// 优化:替换System.err为log.warn
|
||||||
log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType);
|
log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType);
|
||||||
|
|
@ -338,7 +344,8 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
|
|
||||||
// 4. 转发指令到设备
|
// 4. 转发指令到设备
|
||||||
String deviceTopic = "dtu/" + deviceId + "/down";
|
String deviceTopic = "dtu/" + deviceId + "/down";
|
||||||
mqttMessageSender.publish(deviceTopic, payload);
|
//todo
|
||||||
|
// mqttMessageSender.publish(deviceTopic, payload);
|
||||||
// 优化:替换System.out为log.info
|
// 优化:替换System.out为log.info
|
||||||
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
|
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
|
||||||
}
|
}
|
||||||
|
|
@ -379,7 +386,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
// 推送设备最新状态(可选)
|
// 推送设备最新状态(可选)
|
||||||
// String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
|
// String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
|
||||||
// if (latestStatus != null) {
|
// if (latestStatus != null) {
|
||||||
// String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
|
// String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
// try {
|
// try {
|
||||||
// mqttMessageSender.publish(frontendTopic, latestStatus);
|
// mqttMessageSender.publish(frontendTopic, latestStatus);
|
||||||
// } catch (MqttException e) {
|
// } catch (MqttException e) {
|
||||||
|
|
@ -442,7 +449,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
for (String deviceId : deviceSet) {
|
for (String deviceId : deviceSet) {
|
||||||
String subKey = "sub:" + deviceId;
|
String subKey = "sub:" + deviceId;
|
||||||
stringRedisTemplate.opsForSet().remove(subKey, clientId);
|
stringRedisTemplate.opsForSet().remove(subKey, clientId);
|
||||||
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
|
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
frontendTopics.add(frontendTopic);
|
frontendTopics.add(frontendTopic);
|
||||||
log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId);
|
log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue