后端转发前端时,判断前端设备是否在线,目前前端每20秒推次心跳,每次有效期2分钟
parent
9fc3b1d372
commit
89ba26c77c
|
|
@ -11,4 +11,5 @@ spring:
|
||||||
keep-alive: 60 # 心跳间隔
|
keep-alive: 60 # 心跳间隔
|
||||||
latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。
|
latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。
|
||||||
# 自动关闭任务线程池大小
|
# 自动关闭任务线程池大小
|
||||||
auto-off-thread-pool-size: 5
|
auto-off-thread-pool-size: 5
|
||||||
|
online-ttl-seconds: 60 # 在线新跳ttl
|
||||||
|
|
@ -88,9 +88,9 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
@Value("${spring.mqtt.default-topic}")
|
@Value("${spring.mqtt.default-topic}")
|
||||||
private String defaultTopic;
|
private String defaultTopic;
|
||||||
|
|
||||||
// 新增:自动关延迟秒数(固定多少秒-n秒)
|
// 新增:前端在线TTL(配合前端心跳,每20秒续一次,TTL建议60秒)
|
||||||
@Value("${spring.mqtt.auto-off-seconds:30}")
|
@Value("${spring.mqtt.online-ttl-seconds:60}")
|
||||||
private int autoOffSeconds;
|
private int onlineTtlSeconds;
|
||||||
|
|
||||||
// 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可)
|
// 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可)
|
||||||
@Value("${spring.mqtt.latest-ttl-seconds:120}")
|
@Value("${spring.mqtt.latest-ttl-seconds:120}")
|
||||||
|
|
@ -260,6 +260,10 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
else if (topic.matches("frontend/\\w+/control/\\w+")) {
|
else if (topic.matches("frontend/\\w+/control/\\w+")) {
|
||||||
handleFrontendControl(topic, payload);
|
handleFrontendControl(topic, payload);
|
||||||
}
|
}
|
||||||
|
// 新增:前端在线心跳主题:frontend/{clientId}/online
|
||||||
|
else if (topic.matches("frontend/\\w+/online")) {
|
||||||
|
handleFrontendOnline(topic, payload);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("【MQTT消息处理异常】topic={}", topic, e);
|
log.error("【MQTT消息处理异常】topic={}", topic, e);
|
||||||
}
|
}
|
||||||
|
|
@ -342,7 +346,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
for (String validCode : VALID_FUNC_CODES) {
|
for (String validCode : VALID_FUNC_CODES) {
|
||||||
if (!payloadObj.containsKey(validCode)) {
|
if (!payloadObj.containsKey(validCode)) {
|
||||||
isValidStatus = false;
|
isValidStatus = false;
|
||||||
log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload);
|
// log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -364,11 +368,21 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
if (subscribedClients != null && !subscribedClients.isEmpty()) {
|
if (subscribedClients != null && !subscribedClients.isEmpty()) {
|
||||||
// 推送给每个订阅的前端
|
// 推送给每个订阅的前端
|
||||||
for (String clientId : subscribedClients) {
|
for (String clientId : subscribedClients) {
|
||||||
|
|
||||||
|
// 新增:只有在线才转发;不在线就清理残留订阅,解决“取消订阅失败兜底”
|
||||||
|
String onlineKey = "online:" + clientId;
|
||||||
|
Boolean online = stringRedisTemplate.hasKey(onlineKey);
|
||||||
|
if (online == null || !online) {
|
||||||
|
// 不在线:从两边索引移除(避免一直给离线前端发)
|
||||||
|
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
|
||||||
|
stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
// 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up
|
// 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up
|
||||||
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
// 发布消息
|
// 发布消息
|
||||||
mqttMessageSender.publish(frontendTopic, payload);
|
mqttMessageSender.publish(frontendTopic, payload);
|
||||||
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
|
log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 优化:替换System.out为log.info
|
// 优化:替换System.out为log.info
|
||||||
|
|
@ -541,6 +555,32 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
|
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 新增:处理前端在线心跳:写入Redis在线标记(带TTL)
|
||||||
|
* 主题格式:frontend/{clientId}/online
|
||||||
|
*/
|
||||||
|
private void handleFrontendOnline(String topic, String payload) {
|
||||||
|
try {
|
||||||
|
String[] parts = topic.split("/");
|
||||||
|
if (parts.length < 3) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String clientId = parts[1];
|
||||||
|
if (!StringUtils.hasText(clientId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 在线标记:online:{clientId} => 1(带TTL)
|
||||||
|
String onlineKey = "online:" + clientId;
|
||||||
|
stringRedisTemplate.opsForValue().set(onlineKey, "1", onlineTtlSeconds, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// todo 生产环境不建议打印每次心跳
|
||||||
|
log.debug("【在线心跳】clientId={} 在线TTL={}s payload={}", clientId, onlineTtlSeconds, payload);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 权限校验逻辑(示例)
|
* 权限校验逻辑(示例)
|
||||||
* 可根据业务需求扩展:
|
* 可根据业务需求扩展:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue