修改多线程

feasure
xce 2026-01-18 17:28:58 +08:00
parent d45b68047a
commit d6cfe99710
2 changed files with 19 additions and 8 deletions

View File

@ -9,7 +9,6 @@ spring:
qos: 1 # 消息可靠性 qos: 1 # 消息可靠性
timeout: 60 # 连接超时 timeout: 60 # 连接超时
keep-alive: 60 # 心跳间隔 keep-alive: 60 # 心跳间隔
auto-off-seconds: 30 #自动关延迟秒数。
latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。 latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。
# 自动关闭任务线程池大小 # 自动关闭任务线程池大小
auto-off-thread-pool-size: 5 auto-off-thread-pool-size: 5

View File

@ -40,9 +40,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
@ -797,12 +798,23 @@ public class MqttMessageHandler implements SmartLifecycle {
if (isRunning.compareAndSet(false, true)) { if (isRunning.compareAndSet(false, true)) {
try { try {
// 初始化多线程池(固定线程数) // 初始化多线程池(固定线程数)
autoOffExecutor = Executors.newScheduledThreadPool(autoOffThreadPoolSize, r -> { autoOffExecutor = new ScheduledThreadPoolExecutor(
autoOffThreadPoolSize, // 核心线程数
r -> {
Thread thread = new Thread(r); Thread thread = new Thread(r);
thread.setName("auto-off-task-" + thread.getId()); thread.setName("auto-off-task-" + thread.getId());
thread.setDaemon(true); // 设置为守护线程不阻塞JVM退出 thread.setDaemon(true); // 设置为守护线程不阻塞JVM退出
return thread; return thread;
}); },
new ThreadPoolExecutor.CallerRunsPolicy() // 队列压力或关闭时兜底不丢任务
);
// 关键优化1取消任务后立即从队列移除避免队列堆积
((ScheduledThreadPoolExecutor) autoOffExecutor).setRemoveOnCancelPolicy(true);
// 关键优化2允许核心线程超时回收空闲时省资源
((ScheduledThreadPoolExecutor) autoOffExecutor).setKeepAliveTime(60, TimeUnit.SECONDS);
((ScheduledThreadPoolExecutor) autoOffExecutor).allowCoreThreadTimeOut(true);
// 核心修改:无论是否已连接,都执行订阅 // 核心修改:无论是否已连接,都执行订阅
subscribeTopics(); subscribeTopics();