diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index c791b71..4b14853 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -12,4 +12,5 @@ spring: latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。 # 自动关闭任务线程池大小 auto-off-thread-pool-size: 5 - subc-ttl-seconds: 3600 # 在线新跳ttl \ No newline at end of file + subc-ttl-seconds: 3600 # 在线新跳ttl + dtu-ctl-lock-ttl: 60 \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java index 7bdf95d..7d4c870 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java @@ -80,14 +80,14 @@ public class DeviceStatusHandler { private static final Map> LIMIT_MAP = new HashMap<>(); private static final Set VALID_FUNC_CODES = new HashSet<>(); static { - LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); - LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); - LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); - LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); - LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); - LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); - LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); - LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); + LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); + LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); + LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); + LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); + LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); + LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); + LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); + LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); VALID_FUNC_CODES.add("jm1g"); VALID_FUNC_CODES.add("jm2g"); @@ -123,7 +123,7 @@ public class DeviceStatusHandler { String deviceId = extractDeviceId(topic); if (deviceId == null) return; // 转发消息 - forwardPayload(deviceId, payload); + forwardPayload(deviceId, payload,payloadObj); isStartAutoOffTask(payloadObj,deviceId,payload); @@ -204,8 +204,26 @@ public class DeviceStatusHandler { } } - private void forwardPayload(String deviceId,String payload) { + private void forwardPayload(String deviceId,String payload,JSONObject payloadObj) { try { + boolean isAck = payloadObj.containsKey("suc") && payloadObj.containsKey("prop"); + JSONObject sendObj = payloadObj; // 默认直接用原对象 + // 如果是回执,先拿 funcType + + if (isAck) { + JSONObject propObj = payloadObj.getJSONObject("prop"); + if (propObj != null && !propObj.isEmpty()) { + String funcType = propObj.entrySet().iterator().next().getKey(); + String lockKey = "lock:" + deviceId + ":" + funcType; + + // 读取锁的 value(比如 autooff / user:1001) + String lockHolder = stringRedisTemplate.opsForValue().get(lockKey); + if (lockHolder != null) { + sendObj = new JSONObject(payloadObj); // 只在需要时复制 + sendObj.put("clientId", lockHolder); + } + } + } // 非回执消息:正常转发给订阅前端 // 查询Redis中订阅该设备的前端列表:sub:{deviceId} Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); @@ -234,7 +252,7 @@ public class DeviceStatusHandler { // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/listener String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; // 发布消息 - mqttMessageSender.publish(frontendTopic, payload); + mqttMessageSender.publish(frontendTopic, sendObj.toJSONString()); // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } // 删掉设备对应的客户端 diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java index 59a94c9..7d566c0 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java @@ -10,6 +10,7 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -55,16 +56,18 @@ public class FrontendControlHandler { @Autowired private ISysAgriLimitService agriLimitService; + @Value("${dtu-ctl-lock-ttl}") + private int dtuCtlLockTTL; private static final Map> LIMIT_MAP = new HashMap<>(); static { - LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); - LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); - LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); - LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); - LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); - LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); - LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); - LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); + LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); + LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); + LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); + LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); + LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); + LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); + LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); + LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); } /** * 处理前端控制指令:权限校验+分布式锁+转发给设备 @@ -112,7 +115,7 @@ public class FrontendControlHandler { // 2. 分布式锁:设备ID+功能类型(避免同设备同功能并发控制) String lockKey = "lock:" + deviceId + ":" + funcType; Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, clientId, 60, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景 + lockKey, clientId, dtuCtlLockTTL, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景 ); if (lockSuccess == null || !lockSuccess) { String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java index 9aadbd3..daca774 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java @@ -63,6 +63,10 @@ public class MqttAutoOffManager { // 新增:自动关闭任务线程池核心线程数(可配置) @Value("${spring.mqtt.auto-off-thread-pool-size:5}") private int autoOffThreadPoolSize; + + @Value("${dtu-ctl-lock-ttl}") + private int dtuCtlLockTTL; + /** * 初始化自动关任务线程池 * @param corePoolSize 核心线程数 @@ -228,7 +232,7 @@ public class MqttAutoOffManager { // 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖) String lockKey = "lock:" + deviceId + ":" + funcType; Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, "autooff", 60, TimeUnit.SECONDS + lockKey, "autooff", dtuCtlLockTTL, TimeUnit.SECONDS ); if (lockSuccess == null || !lockSuccess) { log.info("【自动关任务】{}功能忙(锁占用),跳过自动关闭:deviceId={}, funcType={}", funcType, deviceId, funcType);