diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml index e654987..6de3ecd 100644 --- a/agri-admin/src/main/resources/application-mqtt.yml +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -9,7 +9,6 @@ spring: qos: 1 # 消息可靠性 timeout: 60 # 连接超时 keep-alive: 60 # 心跳间隔 - auto-off-seconds: 30 #自动关延迟秒数。 latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。 # 自动关闭任务线程池大小 auto-off-thread-pool-size: 5 \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index 81ac89b..cfde2de 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -40,9 +40,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -797,12 +798,23 @@ public class MqttMessageHandler implements SmartLifecycle { if (isRunning.compareAndSet(false, true)) { try { // 初始化多线程池(固定线程数) - autoOffExecutor = Executors.newScheduledThreadPool(autoOffThreadPoolSize, r -> { - Thread thread = new Thread(r); - thread.setName("auto-off-task-" + thread.getId()); - thread.setDaemon(true); // 设置为守护线程,不阻塞JVM退出 - return thread; - }); + autoOffExecutor = new ScheduledThreadPoolExecutor( + autoOffThreadPoolSize, // 核心线程数 + r -> { + Thread thread = new Thread(r); + thread.setName("auto-off-task-" + thread.getId()); + thread.setDaemon(true); // 设置为守护线程,不阻塞JVM退出 + return thread; + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 队列压力或关闭时兜底不丢任务 + ); + + // 关键优化1:取消任务后立即从队列移除,避免队列堆积 + ((ScheduledThreadPoolExecutor) autoOffExecutor).setRemoveOnCancelPolicy(true); + + // 关键优化2:允许核心线程超时回收,空闲时省资源 + ((ScheduledThreadPoolExecutor) autoOffExecutor).setKeepAliveTime(60, TimeUnit.SECONDS); + ((ScheduledThreadPoolExecutor) autoOffExecutor).allowCoreThreadTimeOut(true); // 核心修改:无论是否已连接,都执行订阅 subscribeTopics();