新增ack主题

master
lld 2026-02-01 17:59:44 +08:00
parent 7dd7339953
commit 28c357ddf5
5 changed files with 208 additions and 135 deletions

View File

@ -13,4 +13,4 @@ spring:
# 自动关闭任务线程池大小
auto-off-thread-pool-size: 5
subc-ttl-seconds: 3600 # 在线新跳ttl
dtu-ctl-lock-ttl: 60
dtu-ctl-lock-ttl: 20

View File

@ -0,0 +1,171 @@
package com.agri.framework.interceptor;
import com.agri.framework.manager.MqttAutoOffManager;
import com.agri.system.domain.SysAgriLimit;
import com.agri.system.domain.SysDevOperLog;
import com.agri.system.service.ISysAgriLimitService;
import com.agri.system.service.ISysDevOperLogService;
import com.alibaba.fastjson2.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@Component
public class DeviceAckHandler {
/**
* 使SLF4JJDK 8
*/
private static final Logger log = LoggerFactory.getLogger(DeviceAckHandler.class);
/**
* Redis线
*/
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* /
*/
@Resource
private MqttAutoOffManager mqttAutoOffManager;
/**
*
*/
@Resource
private ISysAgriLimitService agriLimitService;
// 新增最新状态缓存TTL设备每10秒上报一次缓存一小段时间即可
@Value("${spring.mqtt.latest-ttl-seconds:120}")
private int latestTtlSeconds;
@Autowired
private ISysDevOperLogService sysDevOperLogService;
// 初始化映射(建议放在类初始化块/构造方法中,只初始化一次)
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
private static final Set<String> VALID_FUNC_CODES = new HashSet<>();
static {
LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
VALID_FUNC_CODES.add("jm1g");
VALID_FUNC_CODES.add("jm2g");
VALID_FUNC_CODES.add("jbg");
VALID_FUNC_CODES.add("jm3g");
VALID_FUNC_CODES.add("jm2k");
VALID_FUNC_CODES.add("jm3k");
VALID_FUNC_CODES.add("jbk");
VALID_FUNC_CODES.add("jm1k");
}
public void isStartAutoOffTask(JSONObject payloadObj, String deviceId, String payload) {
// 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}
String funcType = null;
Integer funcValue = null;
boolean isAck = false;
// 新增:标记是否需要执行自动关任务(全局可用)
// 第二步设备回执处理逻辑完全移除Redis写入
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
isAck = true;
JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) {
// 提取prop中的第一个功能码
Map.Entry<String, Object> propEntry = propObj.entrySet().iterator().next();
funcType = propEntry.getKey();
try {
funcValue = Integer.parseInt(String.valueOf(propEntry.getValue()));
} catch (Exception ignore) {
}
// 释放对应功能的分布式锁
String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean delete = stringRedisTemplate.delete(lockKey);
if (propObj.size() > 1) {
log.warn("【设备回执】prop包含多个功能码仅处理第一个{}", propObj.keySet());
}
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
int runTime = 0;
// 回执成功且值=1时启动自动关闭任务保留原有逻辑
boolean suc = payloadObj.getBooleanValue("suc");
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) {
SysAgriLimit agriLimit = agriLimitService.lambdaQuery()
.eq(SysAgriLimit::getImei, deviceId)
.one();
int autoOffSeconds = 0;
if (agriLimit != null) {
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
}
runTime = autoOffSeconds;
// 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务
if (autoOffSeconds > 0) {
mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds);
log.debug("【自动关任务】标记需要执行deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds);
}
}
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) {
mqttAutoOffManager.cancelAutoOff(deviceId, funcType);
}
sysDevOperLogService.lambdaUpdate()
.eq(SysDevOperLog::getImei, deviceId)
.eq(SysDevOperLog::getFuncCode, funcType)
.eq(SysDevOperLog::getOpType, funcValue)
.eq(SysDevOperLog::getLockAcquired,1)
.orderByDesc(SysDevOperLog::getCreateTime)
.last("LIMIT 1")
.set(SysDevOperLog::getAckReceived,1)
.set(SysDevOperLog::getAckSuc, suc?1:0)
.set(SysDevOperLog::getIsLockSuc,delete?1:0)
.set(SysDevOperLog::getIsTask,runTime > 0?1:0)
.set(SysDevOperLog::getRunTime, runTime)
.set(SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】")
.set(SysDevOperLog::getUpdateBy,"设备回执")
.set(SysDevOperLog::getAck, payload)
.update();
}
}
// 第三步仅处理非回执的设备状态包且仅当是8个功能码结构就写入Redis
// 有没有人订阅都得写,只要发送设备开的指令成功了就得写
if (!isAck) {
// 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入
boolean isValidStatus = true;
for (String validCode : VALID_FUNC_CODES) {
if (!payloadObj.containsKey(validCode)) {
isValidStatus = false;
// log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId, payload);
break;
}
}
if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) {
// ✅ 8个功能码状态包无条件写device:latest:{deviceId},避免自动关读不到最新状态
stringRedisTemplate.opsForValue().set(
"device:latest:" + deviceId,
payload, // 完整的8功能码JSON
latestTtlSeconds,
TimeUnit.SECONDS
);
// log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
}
}
}
}

View File

@ -11,6 +11,7 @@ import com.agri.system.service.ISysDevOperLogService;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.ObjectUtils;
import org.checkerframework.checker.units.qual.A;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,47 +65,10 @@ public class DeviceStatusHandler {
@Resource
private MqttSubscriptionManager mqttSubscriptionManager;
/**
* /
*/
@Resource
private MqttAutoOffManager mqttAutoOffManager;
/**
*
*/
@Resource
private ISysAgriLimitService agriLimitService;
// 新增最新状态缓存TTL设备每10秒上报一次缓存一小段时间即可
@Value("${spring.mqtt.latest-ttl-seconds:120}")
private int latestTtlSeconds;
@Autowired
private ISysDevOperLogService sysDevOperLogService;
// 初始化映射(建议放在类初始化块/构造方法中,只初始化一次)
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
private static final Set<String> VALID_FUNC_CODES = new HashSet<>();
static {
LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
VALID_FUNC_CODES.add("jm1g");
VALID_FUNC_CODES.add("jm2g");
VALID_FUNC_CODES.add("jbg");
VALID_FUNC_CODES.add("jm3g");
VALID_FUNC_CODES.add("jm2k");
VALID_FUNC_CODES.add("jm3k");
VALID_FUNC_CODES.add("jbk");
VALID_FUNC_CODES.add("jm1k");
}
private DeviceAckHandler deviceAckHandler;
/**
*
@ -129,105 +93,19 @@ public class DeviceStatusHandler {
// 解析设备ID主题格式为dtu/{deviceId}/up分割后第2个元素是设备ID
String deviceId = extractDeviceId(topic);
if (deviceId == null) return;
String[] segments = topic.split("/");
String action = segments[2];
// 转发消息
forwardPayload(deviceId, payload,payloadObj);
forwardPayload(deviceId, payload,payloadObj,action);
isStartAutoOffTask(payloadObj,deviceId,payload);
}
private void isStartAutoOffTask(JSONObject payloadObj,String deviceId,String payload) {
// 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}
String funcType = null;
Integer funcValue = null;
boolean isAck = false;
// 新增:标记是否需要执行自动关任务(全局可用)
// 第二步设备回执处理逻辑完全移除Redis写入
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
isAck = true;
JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) {
// 提取prop中的第一个功能码
Map.Entry<String, Object> propEntry = propObj.entrySet().iterator().next();
funcType = propEntry.getKey();
try {
funcValue = Integer.parseInt(String.valueOf(propEntry.getValue()));
} catch (Exception ignore) {
}
// 释放对应功能的分布式锁
String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean delete = stringRedisTemplate.delete(lockKey);
if (propObj.size() > 1) {
log.warn("【设备回执】prop包含多个功能码仅处理第一个{}", propObj.keySet());
}
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
int runTime = 0;
// 回执成功且值=1时启动自动关闭任务保留原有逻辑
boolean suc = payloadObj.getBooleanValue("suc");
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) {
SysAgriLimit agriLimit = agriLimitService.lambdaQuery()
.eq(SysAgriLimit::getImei, deviceId)
.one();
int autoOffSeconds = 0;
if (agriLimit != null) {
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
}
runTime = autoOffSeconds;
// 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务
if (autoOffSeconds > 0) {
mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds);
log.debug("【自动关任务】标记需要执行deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds);
}
}
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) {
mqttAutoOffManager.cancelAutoOff(deviceId, funcType);
}
sysDevOperLogService.lambdaUpdate()
.eq(SysDevOperLog::getImei, deviceId)
.eq(SysDevOperLog::getFuncCode, funcType)
.eq(SysDevOperLog::getOpType, funcValue)
.eq(SysDevOperLog::getLockAcquired,1)
.orderByDesc(SysDevOperLog::getCreateTime)
.last("LIMIT 1")
.set(SysDevOperLog::getAckReceived,1)
.set(SysDevOperLog::getAckSuc, suc?1:0)
.set(SysDevOperLog::getIsLockSuc,delete?1:0)
.set(SysDevOperLog::getIsTask,runTime > 0?1:0)
.set(SysDevOperLog::getRunTime, runTime)
.set(SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】")
.set(SysDevOperLog::getUpdateBy,"设备回执")
.set(SysDevOperLog::getAck, payload)
.update();
// 获取第二个动态段,如"up"或"ack"
if ("ack".equals(action)) {
deviceAckHandler.isStartAutoOffTask(payloadObj,deviceId,payload);
}
}
// 第三步仅处理非回执的设备状态包且仅当是8个功能码结构就写入Redis
// 有没有人订阅都得写,只要发送设备开的指令成功了就得写
if (!isAck) {
// 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入
boolean isValidStatus = true;
for (String validCode : VALID_FUNC_CODES) {
if (!payloadObj.containsKey(validCode)) {
isValidStatus = false;
// log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId, payload);
break;
}
}
if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) {
// ✅ 8个功能码状态包无条件写device:latest:{deviceId},避免自动关读不到最新状态
stringRedisTemplate.opsForValue().set(
"device:latest:" + deviceId,
payload, // 完整的8功能码JSON
latestTtlSeconds,
TimeUnit.SECONDS
);
// log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
}
}
}
private void forwardPayload(String deviceId,String payload,JSONObject payloadObj) {
private void forwardPayload(String deviceId,String payload,JSONObject payloadObj,String action) {
try {
boolean isAck = payloadObj.containsKey("suc") && payloadObj.containsKey("prop");
JSONObject sendObj = payloadObj; // 默认直接用原对象
@ -274,6 +152,9 @@ public class DeviceStatusHandler {
}
// 前端专属主题frontend/{clientId}/dtu/{deviceId}/listener
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
if ("ack".equals(action)) {
frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/ack";
}
// 发布消息
mqttMessageSender.publish(frontendTopic, sendObj.toJSONString());
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);

View File

@ -316,6 +316,20 @@ public class MqttAutoOffManager {
if (!oldFuture.cancel(false)) {
return;
}
sysDevOperLogService.lambdaUpdate()
.eq(SysDevOperLog::getImei, deviceId)
.eq(SysDevOperLog::getFuncCode, funcType)
.eq(SysDevOperLog::getOpType, 1)
.eq(SysDevOperLog::getOpSource, 1)
.eq(SysDevOperLog::getAckSuc, 1)
.eq(SysDevOperLog::getIsTask, 1)
.orderByDesc(SysDevOperLog::getCreateTime)
.last("LIMIT 2")
.set(SysDevOperLog::getExecResult,2)
.set(SysDevOperLog::getLatestState, "{\"msg\":\"任务已退出\"}")
.set(SysDevOperLog::getUpdateBy,"自动关")
.set(SysDevOperLog::getSkipReason, "旧任务已退出")
.update();
// cancel成功旧任务不会跑了这时再remove并减计数
autoOffFutureMap.remove(taskKey, oldFuture);
decAutoOffCnt(deviceId);

View File

@ -1,6 +1,7 @@
package com.agri.framework.web.dispatcher;
import com.agri.common.utils.wechat.WxUtil;
import com.agri.framework.interceptor.DeviceAckHandler;
import com.agri.framework.interceptor.DeviceStatusHandler;
import com.agri.framework.interceptor.FrontendControlHandler;
import com.agri.framework.interceptor.FrontendOnlineHandler;
@ -40,6 +41,13 @@ public class MqttMessageDispatcher {
@Resource
private FrontendOnlineHandler frontendOnlineHandler;
/**
* 线
*/
@Resource
private DeviceAckHandler deviceAckHandler;
/**
*
*
@ -52,7 +60,7 @@ public class MqttMessageDispatcher {
// log.info("【MQTT接收】topic={}, payload={}", topic, payload);
// 设备状态主题dtu/{deviceId}/up
if (topic.matches("dtu/\\w+/up")) {
if (topic.matches("dtu/\\w+/\\w")) {
deviceStatusHandler.handle(topic, payload);
}
// 处理前端控制指令主题frontend/{clientId}/control/{deviceId}
@ -63,7 +71,6 @@ public class MqttMessageDispatcher {
else if (topic.matches("frontend/\\w+/online")) {
frontendOnlineHandler.handle(topic, payload);
}
// todo 是否加回复主题??
} catch (Exception e) {
WxUtil.pushText("【MQTT消息处理异常】\n topic: "+ topic+"\n cause: "+e);
log.error("【MQTT消息处理异常】topic={}", topic, e);