mqtt延时任务

feasure
xce 2026-01-17 06:27:09 +08:00
parent ec17992c55
commit b55e29045d
4 changed files with 34 additions and 13 deletions

View File

@ -46,6 +46,7 @@ public class CacheController
caches.add(new SysCache(CacheConstants.PWD_ERR_CNT_KEY, "密码错误次数")); caches.add(new SysCache(CacheConstants.PWD_ERR_CNT_KEY, "密码错误次数"));
caches.add(new SysCache(CacheConstants.SUB_CLIENT_ID, "Redis订阅关系")); caches.add(new SysCache(CacheConstants.SUB_CLIENT_ID, "Redis订阅关系"));
caches.add(new SysCache(CacheConstants.SUB_IMEI, "Redis订阅关系【反向】")); caches.add(new SysCache(CacheConstants.SUB_IMEI, "Redis订阅关系【反向】"));
caches.add(new SysCache(CacheConstants.LOCK_IMEI_FUNC, "设备控制锁"));
} }
@PreAuthorize("@ss.hasPermi('monitor:cache:list')") @PreAuthorize("@ss.hasPermi('monitor:cache:list')")

View File

@ -43,4 +43,5 @@ public class CacheConstants
public static final String PWD_ERR_CNT_KEY = "pwd_err_cnt:"; public static final String PWD_ERR_CNT_KEY = "pwd_err_cnt:";
public static final String SUB_CLIENT_ID = "subc:"; public static final String SUB_CLIENT_ID = "subc:";
public static final String SUB_IMEI = "sub:"; public static final String SUB_IMEI = "sub:";
public static final String LOCK_IMEI_FUNC = "lock:";
} }

View File

@ -212,7 +212,7 @@ public class MqttConfig {
// 3. 发布消息 // 3. 发布消息
client.publish(topic, message); client.publish(topic, message);
// 优化替换System.out为日志框架保留原有输出内容 // 优化替换System.out为日志框架保留原有输出内容
log.info("【MQTT消息发布成功】主题" + topic + ",内容:" + payload); // log.info("【MQTT消息发布成功】主题" + topic + ",内容:" + payload);
} }
} }

View File

@ -220,7 +220,7 @@ public class MqttMessageHandler implements SmartLifecycle {
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(IMqttDeliveryToken token) {
if (token != null && token.getTopics() != null && token.getTopics().length > 0) { if (token != null && token.getTopics() != null && token.getTopics().length > 0) {
log.info("【MQTT确认】消息发布完成clientId{},主题:{}", safeClientId(), token.getTopics()[0]); // log.info("【MQTT确认】消息发布完成clientId{},主题:{}", safeClientId(), token.getTopics()[0]);
} }
} }
}); });
@ -291,6 +291,7 @@ public class MqttMessageHandler implements SmartLifecycle {
Integer funcValue = null; Integer funcValue = null;
boolean isAck = false; boolean isAck = false;
// 第二步设备回执处理逻辑完全移除Redis写入
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
isAck = true; isAck = true;
JSONObject propObj = payloadObj.getJSONObject("prop"); JSONObject propObj = payloadObj.getJSONObject("prop");
@ -311,44 +312,60 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
// 回执成功且值=1表示运行/开启)时,起个任务,固定多少秒-n秒 // 回执成功且值=1时启动自动关闭任务保留原有逻辑
// 新增:回执固定是{"suc":true,"prop":{"jm1k":1}}
boolean suc = payloadObj.getBooleanValue("suc"); boolean suc = payloadObj.getBooleanValue("suc");
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) {
SysAgriLimit agriLimit = agriLimitService.lambdaQuery() SysAgriLimit agriLimit = agriLimitService.lambdaQuery()
.eq(SysAgriLimit::getImei, deviceId) .eq(SysAgriLimit::getImei, deviceId)
.one(); .one();
int autoOffSeconds = 0; int autoOffSeconds = 0;
if (agriLimit != null) { if (agriLimit != null) {
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
} }
// 启动自动关闭任务(多线程执行)
// 新增:回执成功且值=1表示运行/开启)时,起个任务,固定多少秒-n秒
scheduleAutoOff(deviceId, funcType, autoOffSeconds); scheduleAutoOff(deviceId, funcType, autoOffSeconds);
} }
// ========== 关键设备回执完全不写入Redis ==========
// 移除所有回执相关的Redis写入逻辑
} }
} }
// 设备每10秒上报一次状态包写入latest供自动关任务读取 (只在需要时写,减少消耗) // 第三步仅处理非回执的设备状态包且仅当是8个功能码结构时写入Redis
if (!isAck) { if (!isAck) {
boolean needWriteLatest = false; boolean needWriteLatest = false;
// 1) 有人订阅时才写latest // 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入
boolean isValidStatus = true;
for (String validCode : VALID_FUNC_CODES) {
if (!payloadObj.containsKey(validCode)) {
isValidStatus = false;
break;
}
}
if (!isValidStatus) {
log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId,payload);
}
// 2) 有人订阅时才写latest
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
if (subscribedClients != null && !subscribedClients.isEmpty()) { if (subscribedClients != null && !subscribedClients.isEmpty()) {
needWriteLatest = true; needWriteLatest = true;
} }
// 2) 或者存在该设备的自动关任务时也写latest任务需要最新状态判断 // 3) 或者存在该设备的自动关任务时也写latest
if (!needWriteLatest && hasAutoOffTask(deviceId)) { if (!needWriteLatest && hasAutoOffTask(deviceId)) {
needWriteLatest = true; needWriteLatest = true;
} }
// 4) 满足条件则写入完整的8功能码JSON到Redis
if (needWriteLatest) { if (needWriteLatest) {
stringRedisTemplate.opsForValue().set("device:latest:" + deviceId, payload, latestTtlSeconds, TimeUnit.SECONDS); stringRedisTemplate.opsForValue().set(
"device:latest:" + deviceId,
payload, // 完整的8功能码JSON
latestTtlSeconds,
TimeUnit.SECONDS
);
// log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
} }
} }
@ -425,6 +442,7 @@ public class MqttMessageHandler implements SmartLifecycle {
private void runAutoOff(String deviceId, String funcType) throws MqttException { private void runAutoOff(String deviceId, String funcType) throws MqttException {
String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
if (!StringUtils.hasText(latest)) { if (!StringUtils.hasText(latest)) {
//todo
log.warn("【自动关任务】无最新状态跳过deviceId={}, funcType={}", deviceId, funcType); log.warn("【自动关任务】无最新状态跳过deviceId={}, funcType={}", deviceId, funcType);
return; return;
} }
@ -457,6 +475,7 @@ public class MqttMessageHandler implements SmartLifecycle {
//todo //todo
mqttMessageSender.publish(deviceTopic, down.toJSONString()); mqttMessageSender.publish(deviceTopic, down.toJSONString());
log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
Boolean deleteSuccess = stringRedisTemplate.delete("device:latest:" + deviceId);
} else { } else {
log.info("【自动关任务】检测未运行或状态未知跳过关闭deviceId={}, funcType={}, current={}", deviceId, funcType, current); log.info("【自动关任务】检测未运行或状态未知跳过关闭deviceId={}, funcType={}, current={}", deviceId, funcType, current);
} }