控制指令修改,设备控制锁过期时间调整

master
lld 2026-01-29 20:01:26 +08:00
parent 2d8edaccfc
commit a3019355e0
4 changed files with 48 additions and 22 deletions

View File

@ -13,3 +13,4 @@ spring:
# 自动关闭任务线程池大小 # 自动关闭任务线程池大小
auto-off-thread-pool-size: 5 auto-off-thread-pool-size: 5
subc-ttl-seconds: 3600 # 在线新跳ttl subc-ttl-seconds: 3600 # 在线新跳ttl
dtu-ctl-lock-ttl: 60

View File

@ -80,14 +80,14 @@ public class DeviceStatusHandler {
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>(); private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
private static final Set<String> VALID_FUNC_CODES = new HashSet<>(); private static final Set<String> VALID_FUNC_CODES = new HashSet<>();
static { static {
LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
VALID_FUNC_CODES.add("jm1g"); VALID_FUNC_CODES.add("jm1g");
VALID_FUNC_CODES.add("jm2g"); VALID_FUNC_CODES.add("jm2g");
@ -123,7 +123,7 @@ public class DeviceStatusHandler {
String deviceId = extractDeviceId(topic); String deviceId = extractDeviceId(topic);
if (deviceId == null) return; if (deviceId == null) return;
// 转发消息 // 转发消息
forwardPayload(deviceId, payload); forwardPayload(deviceId, payload,payloadObj);
isStartAutoOffTask(payloadObj,deviceId,payload); isStartAutoOffTask(payloadObj,deviceId,payload);
@ -204,8 +204,26 @@ public class DeviceStatusHandler {
} }
} }
private void forwardPayload(String deviceId,String payload) { private void forwardPayload(String deviceId,String payload,JSONObject payloadObj) {
try { try {
boolean isAck = payloadObj.containsKey("suc") && payloadObj.containsKey("prop");
JSONObject sendObj = payloadObj; // 默认直接用原对象
// 如果是回执,先拿 funcType
if (isAck) {
JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) {
String funcType = propObj.entrySet().iterator().next().getKey();
String lockKey = "lock:" + deviceId + ":" + funcType;
// 读取锁的 value比如 autooff / user:1001
String lockHolder = stringRedisTemplate.opsForValue().get(lockKey);
if (lockHolder != null) {
sendObj = new JSONObject(payloadObj); // 只在需要时复制
sendObj.put("clientId", lockHolder);
}
}
}
// 非回执消息:正常转发给订阅前端 // 非回执消息:正常转发给订阅前端
// 查询Redis中订阅该设备的前端列表sub:{deviceId} // 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
@ -234,7 +252,7 @@ public class DeviceStatusHandler {
// 前端专属主题frontend/{clientId}/dtu/{deviceId}/listener // 前端专属主题frontend/{clientId}/dtu/{deviceId}/listener
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
// 发布消息 // 发布消息
mqttMessageSender.publish(frontendTopic, payload); mqttMessageSender.publish(frontendTopic, sendObj.toJSONString());
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
} }
// 删掉设备对应的客户端 // 删掉设备对应的客户端

View File

@ -10,6 +10,7 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -55,16 +56,18 @@ public class FrontendControlHandler {
@Autowired @Autowired
private ISysAgriLimitService agriLimitService; private ISysAgriLimitService agriLimitService;
@Value("${dtu-ctl-lock-ttl}")
private int dtuCtlLockTTL;
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>(); private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
static { static {
LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
} }
/** /**
* ++ * ++
@ -112,7 +115,7 @@ public class FrontendControlHandler {
// 2. 分布式锁设备ID+功能类型(避免同设备同功能并发控制) // 2. 分布式锁设备ID+功能类型(避免同设备同功能并发控制)
String lockKey = "lock:" + deviceId + ":" + funcType; String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey, clientId, 60, TimeUnit.SECONDS // 延长至15秒适配设备回执场景 lockKey, clientId, dtuCtlLockTTL, TimeUnit.SECONDS // 延长至15秒适配设备回执场景
); );
if (lockSuccess == null || !lockSuccess) { if (lockSuccess == null || !lockSuccess) {
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";

View File

@ -63,6 +63,10 @@ public class MqttAutoOffManager {
// 新增:自动关闭任务线程池核心线程数(可配置) // 新增:自动关闭任务线程池核心线程数(可配置)
@Value("${spring.mqtt.auto-off-thread-pool-size:5}") @Value("${spring.mqtt.auto-off-thread-pool-size:5}")
private int autoOffThreadPoolSize; private int autoOffThreadPoolSize;
@Value("${dtu-ctl-lock-ttl}")
private int dtuCtlLockTTL;
/** /**
* 线 * 线
* @param corePoolSize 线 * @param corePoolSize 线
@ -228,7 +232,7 @@ public class MqttAutoOffManager {
// 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖) // 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖)
String lockKey = "lock:" + deviceId + ":" + funcType; String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey, "autooff", 60, TimeUnit.SECONDS lockKey, "autooff", dtuCtlLockTTL, TimeUnit.SECONDS
); );
if (lockSuccess == null || !lockSuccess) { if (lockSuccess == null || !lockSuccess) {
log.info("【自动关任务】{}功能忙锁占用跳过自动关闭deviceId={}, funcType={}", funcType, deviceId, funcType); log.info("【自动关任务】{}功能忙锁占用跳过自动关闭deviceId={}, funcType={}", funcType, deviceId, funcType);