修改redis 查key 批量查批量删除

feasure
xce 2026-01-18 23:46:00 +08:00
parent f798ac3682
commit ee21847741
1 changed files with 111 additions and 14 deletions

View File

@ -119,6 +119,9 @@ public class MqttMessageHandler implements SmartLifecycle {
// 新增:同设备同功能只保留最后一次自动关任务
private final ConcurrentHashMap<String, ScheduledFuture<?>> autoOffFutureMap = new ConcurrentHashMap<>();
// 新增按设备维度统计“未完成的自动关任务”数量hasAutoOffTask从扫描O(N)降为O(1)
private final ConcurrentHashMap<String, Integer> autoOffDeviceCnt = new ConcurrentHashMap<>();
@Autowired
private ISysAgriLimitService agriLimitService;
@ -367,13 +370,23 @@ public class MqttMessageHandler implements SmartLifecycle {
if (subscribedClients != null && !subscribedClients.isEmpty()) {
// 推送给每个订阅的前端
for (String clientId : subscribedClients) {
// 方案B不再依赖online:改为校验subc:{clientId}是否仍包含deviceId取消订阅失败/异常退出兜底)
Boolean stillSub = stringRedisTemplate.opsForSet().isMember("subc:" + clientId, deviceId);
if (!stillSub) {
List<String> clients = new ArrayList<>(subscribedClients);
// 判断subc是否还存在 一次性查全部 获取失效的clientId
List<Boolean> stillSubs = pipeIsMemberSubc(clients, deviceId);
// 关系不存在清理sub:{deviceId}残留,避免一直给前端发
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
List<String> stale = null;
for (int i = 0; i < clients.size(); i++) {
String clientId = clients.get(i);
boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i));
if (!stillSub) {
if (stale == null) {
stale = new ArrayList<>();
}
// false不存在添加队列
stale.add(clientId);
continue;
}
// 前端专属主题frontend/{clientId}/dtu/{deviceId}/up
@ -382,27 +395,102 @@ public class MqttMessageHandler implements SmartLifecycle {
mqttMessageSender.publish(frontendTopic, payload);
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
}
// 删掉设备对应的客户端
if (stale != null && !stale.isEmpty()) {
pipeSRemSub(deviceId, stale);
}
} else {
// 优化替换System.out为log.info
// log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
}
}
// 新增pipeline 批量 SISMEMBER subc:{clientId} deviceIdN次->1次往返 拿取失效的client
private List<Boolean> pipeIsMemberSubc(List<String> clientIds, String deviceId) {
if (clientIds == null || clientIds.isEmpty() || !StringUtils.hasText(deviceId)) {
return Collections.emptyList();
}
// ✅ 关键不发“占位命令”只对有效clientId发SISMEMBER同时保证返回结果与入参严格对齐
int n = clientIds.size();
// 创建长度n全部false的队列
List<Boolean> out = new ArrayList<>(Collections.nCopies(n, Boolean.FALSE));
List<Integer> idx = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
if (StringUtils.hasText(clientIds.get(i))) {
// 符合条件存进clientIds对应的索引
idx.add(i);
}
}
if (idx.isEmpty()) {
return out;
}
// 处理每个每个clientId的值是否还在 值存在rs中 执行 stringRedisTemplate.executePipelined
List<Object> rs = stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
byte[] member = serializer.serialize(deviceId);
for (int i : idx) {
String clientId = clientIds.get(i);
// 存放命令
connection.sIsMember(serializer.serialize("subc:" + clientId), member);
}
return null;
});
for (int j = 0; j < idx.size() && j < (rs == null ? 0 : rs.size()); j++) {
out.set(idx.get(j), Boolean.TRUE.equals(rs.get(j)));
}
return out;
}
// 对应的subc不存在 删除对应的sub pipeline 批量 SREM sub:{deviceId} clientId清理残留 N次->1次往返
private void pipeSRemSub(String deviceId, List<String> staleClientIds) {
if (!StringUtils.hasText(deviceId) || staleClientIds == null || staleClientIds.isEmpty()) {
return;
}
stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
byte[] subKey = serializer.serialize("sub:" + deviceId);
for (String clientId : staleClientIds) {
if (StringUtils.hasText(clientId)) {
connection.sRem(subKey, serializer.serialize(clientId));
}
}
return null;
});
}
// 新增:是否存在该设备的自动关任务
private boolean hasAutoOffTask(String deviceId) {
if (!StringUtils.hasText(deviceId)) {
return false;
}
String prefix = "autooff:" + deviceId + ":";
for (String key : autoOffFutureMap.keySet()) {
if (key != null && key.startsWith(prefix)) {
ScheduledFuture<?> f = autoOffFutureMap.get(key);
if (f != null && !f.isCancelled() && !f.isDone()) {
return true;
// ✅ O(1)不再扫描autoOffFutureMap由scheduleAutoOff/runAutoOff维护计数
Integer cnt = autoOffDeviceCnt.get(deviceId);
return cnt != null && cnt > 0;
}
// 新增:自动关任务计数 +1只维护deviceId维度确保hasAutoOffTask为O(1)
private void incAutoOffCnt(String deviceId) {
if (!StringUtils.hasText(deviceId)) {
return;
}
autoOffDeviceCnt.merge(deviceId, 1, (a, b) -> a + b);
}
return false;
// 新增:自动关任务计数 -1避免负数归零则清理key省内存
private void decAutoOffCnt(String deviceId) {
if (!StringUtils.hasText(deviceId)) {
return;
}
autoOffDeviceCnt.compute(deviceId, (k, v) -> {
if (v == null || v <= 1) {
return null;
}
return v - 1;
});
}
// 改造:多线程执行自动关闭任务
@ -425,6 +513,8 @@ public class MqttMessageHandler implements SmartLifecycle {
if (oldFuture != null) {
oldFuture.cancel(false);
log.debug("【自动关任务】取消旧任务:{}", taskKey);
// ✅ 旧任务被替换需要同步减少该设备的“未完成任务数”避免计数虚高导致hasAutoOffTask误判
decAutoOffCnt(deviceId);
}
// 使用多线程池提交任务
@ -436,11 +526,16 @@ public class MqttMessageHandler implements SmartLifecycle {
} finally {
// 任务执行完成后移除映射
autoOffFutureMap.remove(taskKey);
// ✅ 任务结束(成功/失败都算结束减少该设备的“未完成任务数”保证hasAutoOffTask准确
decAutoOffCnt(deviceId);
}
}, delaySeconds, TimeUnit.SECONDS);
// 保存新任务的引用
autoOffFutureMap.put(taskKey, newFuture);
// ✅ 新任务创建成功:增加该设备的“未完成任务数”
incAutoOffCnt(deviceId);
log.info("【自动关任务】已创建多线程deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds);
}
@ -883,6 +978,8 @@ public class MqttMessageHandler implements SmartLifecycle {
log.debug("【自动关任务】取消任务:{}", entry.getKey());
}
autoOffFutureMap.clear();
// ✅ 停止时直接清空计数,避免残留
autoOffDeviceCnt.clear();
// 2. 优雅关闭线程池
if (autoOffExecutor != null) {