前端批量取消设备失败兜底,订阅关系默认一小时,根据心跳来续期订阅关系
parent
1c9549b105
commit
f798ac3682
|
|
@ -12,4 +12,4 @@ spring:
|
||||||
latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。
|
latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。
|
||||||
# 自动关闭任务线程池大小
|
# 自动关闭任务线程池大小
|
||||||
auto-off-thread-pool-size: 5
|
auto-off-thread-pool-size: 5
|
||||||
online-ttl-seconds: 60 # 在线新跳ttl
|
sub-ttl-seconds: 3600 # 在线新跳ttl
|
||||||
|
|
@ -88,10 +88,6 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
@Value("${spring.mqtt.default-topic}")
|
@Value("${spring.mqtt.default-topic}")
|
||||||
private String defaultTopic;
|
private String defaultTopic;
|
||||||
|
|
||||||
// 新增:前端在线TTL(配合前端心跳,每20秒续一次,TTL建议60秒)
|
|
||||||
@Value("${spring.mqtt.online-ttl-seconds:60}")
|
|
||||||
private int onlineTtlSeconds;
|
|
||||||
|
|
||||||
// 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可)
|
// 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可)
|
||||||
@Value("${spring.mqtt.latest-ttl-seconds:120}")
|
@Value("${spring.mqtt.latest-ttl-seconds:120}")
|
||||||
private int latestTtlSeconds;
|
private int latestTtlSeconds;
|
||||||
|
|
@ -100,6 +96,10 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
@Value("${spring.mqtt.auto-off-thread-pool-size:5}")
|
@Value("${spring.mqtt.auto-off-thread-pool-size:5}")
|
||||||
private int autoOffThreadPoolSize;
|
private int autoOffThreadPoolSize;
|
||||||
|
|
||||||
|
// 新增:前端订阅关系TTL(兜底“取消订阅失败/异常退出”)——只维护subc:{clientId}的TTL
|
||||||
|
@Value("${spring.mqtt.subc-ttl-seconds:3600}")
|
||||||
|
private int subcTtlSeconds;
|
||||||
|
|
||||||
// 优化:统一使用SLF4J日志(JDK 8兼容)
|
// 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||||
private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
|
private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
|
||||||
|
|
||||||
|
|
@ -350,7 +350,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(hasAutoOffTask(deviceId) && isValidStatus){
|
if (hasAutoOffTask(deviceId) && isValidStatus) {
|
||||||
// ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态
|
// ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态
|
||||||
stringRedisTemplate.opsForValue().set(
|
stringRedisTemplate.opsForValue().set(
|
||||||
"device:latest:" + deviceId,
|
"device:latest:" + deviceId,
|
||||||
|
|
@ -369,20 +369,18 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
// 推送给每个订阅的前端
|
// 推送给每个订阅的前端
|
||||||
for (String clientId : subscribedClients) {
|
for (String clientId : subscribedClients) {
|
||||||
|
|
||||||
// 新增:只有在线才转发;不在线就清理残留订阅,解决“取消订阅失败兜底”
|
// 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底)
|
||||||
String onlineKey = "online:" + clientId;
|
Boolean stillSub = stringRedisTemplate.opsForSet().isMember("subc:" + clientId, deviceId);
|
||||||
Boolean online = stringRedisTemplate.hasKey(onlineKey);
|
if (!stillSub) {
|
||||||
if (!online) {
|
// 关系不存在:清理sub:{deviceId}残留,避免一直给前端发
|
||||||
// 不在线:从两边索引移除(避免一直给离线前端发)
|
|
||||||
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
|
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
|
||||||
stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up
|
// 前端专属主题:frontend/{clientId}/dtu/{deviceId}/up
|
||||||
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||||
// 发布消息
|
// 发布消息
|
||||||
mqttMessageSender.publish(frontendTopic, payload);
|
mqttMessageSender.publish(frontendTopic, payload);
|
||||||
log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
|
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 优化:替换System.out为log.info
|
// 优化:替换System.out为log.info
|
||||||
|
|
@ -570,12 +568,11 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 在线标记:online:{clientId} => 1(带TTL)
|
// 续期subc:{clientId}
|
||||||
String onlineKey = "online:" + clientId;
|
stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS);
|
||||||
stringRedisTemplate.opsForValue().set(onlineKey, "1", onlineTtlSeconds, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
// todo 生产环境不建议打印每次心跳
|
// todo 生产环境不建议打印每次心跳
|
||||||
log.debug("【在线心跳】clientId={} 在线TTL={}s payload={}", clientId, onlineTtlSeconds, payload);
|
log.debug("【在线心跳】clientId={} 续期subcTTL={}s payload={}", clientId, subcTtlSeconds, payload);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage());
|
log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
@ -609,6 +606,10 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
// 保存订阅关系到Redis
|
// 保存订阅关系到Redis
|
||||||
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
|
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
|
||||||
stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId);
|
stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId);
|
||||||
|
|
||||||
|
// 新增:订阅成功后给subc设置TTL(兜底“取消订阅失败/异常退出”)
|
||||||
|
stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS);
|
||||||
|
|
||||||
log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId);
|
log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -697,6 +698,9 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
}
|
}
|
||||||
connection.sAdd(subcKey, deviceIdBytesArray);
|
connection.sAdd(subcKey, deviceIdBytesArray);
|
||||||
|
|
||||||
|
// 新增:给subc设置TTL(兜底“取消订阅失败/异常退出”)
|
||||||
|
connection.expire(subcKey, subcTtlSeconds);
|
||||||
|
|
||||||
// 执行事务
|
// 执行事务
|
||||||
connection.exec();
|
connection.exec();
|
||||||
return null;
|
return null;
|
||||||
|
|
@ -707,7 +711,7 @@ public class MqttMessageHandler implements SmartLifecycle {
|
||||||
clientId, userId, validDeviceIds.size(), validDeviceIds);
|
clientId, userId, validDeviceIds.size(), validDeviceIds);
|
||||||
return validDeviceIds.size();
|
return validDeviceIds.size();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("【全量订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e);
|
log.error("【全量------订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e);
|
||||||
throw new RuntimeException("全量订阅失败:" + e.getMessage());
|
throw new RuntimeException("全量订阅失败:" + e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue