多线程

feasure
xce 2026-01-17 03:01:11 +08:00
parent 36ab2096b4
commit 45961455eb
2 changed files with 66 additions and 79 deletions

View File

@ -11,4 +11,5 @@ spring:
keep-alive: 60 # 心跳间隔
auto-off-seconds: 30 #自动关延迟秒数。
latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。
# 自动关闭任务线程池大小
auto-off-thread-pool-size: 5

View File

@ -51,16 +51,7 @@ import java.util.function.Function;
* 4. ++
* JDK 8
*
*
*
*
* "frontend/" + clientId + "/dtu/" + deviceId + "/listener"
* frontend/+/control/+
* <p>
* A
* 1) new MqttClient mqttMessageSender client
* 2) MqttConnectOptions#setAutomaticReconnect(true) Paho
* 3) unsubscribe cleanSession=false
* 线
*/
@Component
public class MqttMessageHandler implements SmartLifecycle {
@ -95,6 +86,10 @@ public class MqttMessageHandler implements SmartLifecycle {
@Value("${spring.mqtt.latest-ttl-seconds:120}")
private int latestTtlSeconds;
// 新增:自动关闭任务线程池核心线程数(可配置)
@Value("${spring.mqtt.auto-off-thread-pool-size:5}")
private int autoOffThreadPoolSize;
// 优化统一使用SLF4J日志JDK 8兼容
private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
@ -107,8 +102,9 @@ public class MqttMessageHandler implements SmartLifecycle {
@Resource
private MqttConnectOptions mqttConnectOptions;
// 新增:自动关任务线程池(单线程,避免并发执行)
private final ScheduledExecutorService autoOffExecutor = Executors.newSingleThreadScheduledExecutor();
// 改造:将单线程池改为固定线程池,支持多任务并行执行
// 替代原有的 Executors.newSingleThreadScheduledExecutor()
private ScheduledExecutorService autoOffExecutor;
// 新增:同设备同功能只保留最后一次自动关任务
private final ConcurrentHashMap<String, ScheduledFuture<?>> autoOffFutureMap = new ConcurrentHashMap<>();
@ -116,7 +112,6 @@ public class MqttMessageHandler implements SmartLifecycle {
@Autowired
private ISysAgriLimitService agriLimitService;
// 初始化映射(建议放在类初始化块/构造方法中,只初始化一次)
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
static {
@ -173,7 +168,6 @@ public class MqttMessageHandler implements SmartLifecycle {
*/
@Override
public void connectionLost(Throwable cause) {
// 优化替换System.err为log.error
log.error("【MQTT连接异常】连接断开clientId{},原因:{}",
safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause);
@ -202,7 +196,6 @@ public class MqttMessageHandler implements SmartLifecycle {
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 优化替换System.out为log.info增加空值校验
if (token != null && token.getTopics() != null && token.getTopics().length > 0) {
log.info("【MQTT确认】消息发布完成clientId{},主题:{}", safeClientId(), token.getTopics()[0]);
}
@ -235,7 +228,6 @@ public class MqttMessageHandler implements SmartLifecycle {
*/
private void handleMessage(String topic, String payload) {
try {
// 优化替换System.out为log.info
// log.info("【MQTT接收】topic={}, payload={}", topic, payload);
// 设备状态主题dtu/{deviceId}/up
@ -247,7 +239,6 @@ public class MqttMessageHandler implements SmartLifecycle {
handleFrontendControl(topic, payload);
}
} catch (Exception e) {
// 优化替换System.err为log.error打印完整堆栈
log.error("【MQTT消息处理异常】topic={}", topic, e);
}
}
@ -297,6 +288,7 @@ public class MqttMessageHandler implements SmartLifecycle {
}
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
// 回执成功且值=1表示运行/开启)时,起个任务,固定多少秒-n秒
// 新增:回执固定是{"suc":true,"prop":{"jm1k":1}}
boolean suc = payloadObj.getBooleanValue("suc");
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) {
@ -310,22 +302,14 @@ public class MqttMessageHandler implements SmartLifecycle {
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
}
// 启动自动关闭任务(多线程执行)
// 新增:回执成功且值=1表示运行/开启)时,起个任务,固定多少秒-n秒
scheduleAutoOff(deviceId, funcType, autoOffSeconds);
}
// 广播回执结果给所有订阅该设备的前端
// String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";;
// JSONObject ackPayload = new JSONObject();
// ackPayload.put("deviceId", deviceId);
// ackPayload.put("funcType", funcType);
// ackPayload.put("suc", payloadObj.getBooleanValue("suc"));
// ackPayload.put("code", propEntry.getValue());
// mqttMessageSender.publish(broadcastTopic, ackPayload.toJSONString());
}
}
// 新增:设备每10秒上报一次状态包写入latest供自动关任务读取只在需要时写减少消耗
// 设备每10秒上报一次状态包写入latest供自动关任务读取 (只在需要时写,减少消耗)
if (!isAck) {
boolean needWriteLatest = false;
@ -346,7 +330,6 @@ public class MqttMessageHandler implements SmartLifecycle {
}
// 非回执消息:正常转发给订阅前端
// if (!isDeviceAck) {
// 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
@ -357,14 +340,12 @@ public class MqttMessageHandler implements SmartLifecycle {
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
// 发布消息
mqttMessageSender.publish(frontendTopic, payload);
// 优化替换System.out为log.info
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
}
} else {
// 优化替换System.out为log.info
// log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
}
// }
}
// 新增:是否存在该设备的自动关任务
@ -384,33 +365,39 @@ public class MqttMessageHandler implements SmartLifecycle {
return false;
}
// 新增:起个任务,固定多少秒-n秒【监听最新的设备状态如果还在运行】发送设备关的指令
// 改造:多线程执行自动关闭任务
// 起个任务,固定多少秒-n秒【监听最新的设备状态如果还在运行】发送设备关的指令
private void scheduleAutoOff(String deviceId, String funcType, int delaySeconds) {
if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType) || delaySeconds <= 0) {
return;
}
String taskKey = "autooff:" + deviceId + ":" + funcType;
// 同设备同功能只保留最后一次
ScheduledFuture<?> old = autoOffFutureMap.remove(taskKey);
if (old != null) {
old.cancel(false);
// 同设备同功能只保留最后一次任务(先取消旧任务)
ScheduledFuture<?> oldFuture = autoOffFutureMap.remove(taskKey);
if (oldFuture != null) {
oldFuture.cancel(false);
log.debug("【自动关任务】取消旧任务:{}", taskKey);
}
ScheduledFuture<?> future = autoOffExecutor.schedule(() -> {
// 使用多线程池提交任务
ScheduledFuture<?> newFuture = autoOffExecutor.schedule(() -> {
try {
runAutoOff(deviceId, funcType);
} catch (Exception e) {
log.error("【自动关任务】执行失败deviceId={}, funcType={}", deviceId, funcType, e);
} finally {
// 任务执行完成后移除映射
autoOffFutureMap.remove(taskKey);
}
}, delaySeconds, TimeUnit.SECONDS);
autoOffFutureMap.put(taskKey, future);
log.info("【自动关任务】已创建deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds);
// 保存新任务的引用
autoOffFutureMap.put(taskKey, newFuture);
log.info("【自动关任务】已创建多线程deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds);
}
// 自动关闭任务的核心逻辑(无改动)
// 新增读取最新状态device:latest:{deviceId}若仍为1则下发 {"funcType":0} 到 dtu/{id}/down
private void runAutoOff(String deviceId, String funcType) throws MqttException {
String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
@ -490,7 +477,6 @@ public class MqttMessageHandler implements SmartLifecycle {
if (!checkPermission(clientId, deviceId)) {
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
// 优化替换System.err为log.warn
log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId);
return;
}
@ -503,7 +489,6 @@ public class MqttMessageHandler implements SmartLifecycle {
if (lockSuccess == null || !lockSuccess) {
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}");
// 优化替换System.err为log.warn
log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType);
return;
}
@ -516,7 +501,6 @@ public class MqttMessageHandler implements SmartLifecycle {
String deviceTopic = "dtu/" + deviceId + "/down";
//todo
// mqttMessageSender.publish(deviceTopic, payload);
// 优化替换System.out为log.info
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
}
@ -532,8 +516,7 @@ public class MqttMessageHandler implements SmartLifecycle {
*/
private boolean checkPermission(String clientId, String deviceId) {
// 管理员权限clientId以admin_开头
// 普通用户权限校验Redis中是否绑定该设备校验Redis中user_device:{clientId}是否包含该设备ID
// 普通用户权限校验Redis中是否绑定该设备
return Boolean.TRUE;
}
@ -541,7 +524,6 @@ public class MqttMessageHandler implements SmartLifecycle {
* Controller
*/
public void subscribeDevice(String clientId, String deviceId) {
// 新增入参非空校验JDK 8兼容
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
log.error("【订阅管理】clientId或deviceId不能为空");
throw new IllegalArgumentException("clientId和deviceId不能为空");
@ -549,21 +531,8 @@ public class MqttMessageHandler implements SmartLifecycle {
// 保存订阅关系到Redis
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); // 新增:反向索引
// 优化替换System.out为log.info
stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId);
log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId);
// 推送设备最新状态(可选)
// String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
// if (latestStatus != null) {
// String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
// try {
// mqttMessageSender.publish(frontendTopic, latestStatus);
// } catch (MqttException e) {
// // 优化替换System.err为log.error
// log.error("【订阅推送】设备{}状态推送失败", deviceId, e);
// }
// }
}
/**
@ -574,7 +543,6 @@ public class MqttMessageHandler implements SmartLifecycle {
* @param deviceId ID
*/
public void unsubscribeDevice(String clientId, String deviceId) {
// 新增入参非空校验JDK 8兼容
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
log.error("【前端取消订阅】clientId或deviceId不能为空");
throw new IllegalArgumentException("clientId和deviceId不能为空");
@ -582,8 +550,7 @@ public class MqttMessageHandler implements SmartLifecycle {
// 从Redis删除订阅关系
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); // 新增:反向索引
// 优化替换System.out为log.info
stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId);
log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId);
}
@ -594,7 +561,6 @@ public class MqttMessageHandler implements SmartLifecycle {
* @return MQTT
*/
public List<String> unsubscribeAllDevice(String clientId) {
// 新增入参非空校验JDK 8兼容
if (!StringUtils.hasText(clientId)) {
log.error("【批量取消】clientId不能为空");
throw new IllegalArgumentException("clientId不能为空");
@ -643,7 +609,7 @@ public class MqttMessageHandler implements SmartLifecycle {
return frontendTopics;
}
// 新增:生产环境用Scan替代Keys避免Redis阻塞JDK 8兼容
// 生产环境用Scan替代Keys避免Redis阻塞
private Set<String> scanRedisKeys(String pattern) {
Set<String> keys = new HashSet<>();
try {
@ -711,9 +677,17 @@ public class MqttMessageHandler implements SmartLifecycle {
log.info("开始监听");
if (isRunning.compareAndSet(false, true)) {
try {
// 核心修改:无论是否已连接,都执行订阅(设置回调+订阅主题)
// 初始化多线程池(固定线程数)
autoOffExecutor = Executors.newScheduledThreadPool(autoOffThreadPoolSize, r -> {
Thread thread = new Thread(r);
thread.setName("auto-off-task-" + thread.getId());
thread.setDaemon(true); // 设置为守护线程不阻塞JVM退出
return thread;
});
// 核心修改:无论是否已连接,都执行订阅
subscribeTopics();
log.info("【MQTT生命周期】客户端启动成功已设置回调+订阅主题)");
log.info("【MQTT生命周期】客户端启动成功已设置回调+订阅主题),自动关闭任务线程池大小:{}", autoOffThreadPoolSize);
} catch (MqttException e) {
log.error("【MQTT生命周期】客户端启动失败", e);
isRunning.set(false);
@ -722,27 +696,39 @@ public class MqttMessageHandler implements SmartLifecycle {
}
/**
* MQTTSpring/
* /
* MQTT
* 线
*/
@Override
public void stop() {
// 修复JDK 8正确的compareAndSet写法无命名参数
if (isRunning.compareAndSet(true, false)) {
try {
// 新增:关闭自动关任务线程池,避免线程泄漏
autoOffExecutor.shutdown();
try {
if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
// 1. 取消所有未执行的自动关闭任务
for (Map.Entry<String, ScheduledFuture<?>> entry : autoOffFutureMap.entrySet()) {
entry.getValue().cancel(false);
log.debug("【自动关任务】取消任务:{}", entry.getKey());
}
autoOffFutureMap.clear();
// 2. 优雅关闭线程池
if (autoOffExecutor != null) {
autoOffExecutor.shutdown();
try {
// 等待3秒让任务完成
if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
// 强制关闭
autoOffExecutor.shutdownNow();
log.warn("【自动关任务】线程池强制关闭");
}
} catch (InterruptedException e) {
autoOffExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
} catch (InterruptedException ignore) {
autoOffExecutor.shutdownNow();
Thread.currentThread().interrupt();
log.info("【自动关任务】线程池已关闭");
}
// 3. 关闭MQTT客户端
if (mqttClient != null) {
// 注意disconnect 只在已连接时调用close 尽量无条件释放资源
if (mqttClient.isConnected()) {
mqttClient.disconnect();
}
@ -756,7 +742,7 @@ public class MqttMessageHandler implements SmartLifecycle {
}
/**
* JDK 8
*
*/
@Override
public void stop(Runnable callback) {