单线程

feasure
xce 2026-01-17 02:46:12 +08:00
parent 814cad45cd
commit 36ab2096b4
2 changed files with 186 additions and 3 deletions

View File

@ -5,8 +5,10 @@ spring:
username: admin # Mosquitto共用账号 username: admin # Mosquitto共用账号
password: Admin#12345678 # Mosquitto密码 password: Admin#12345678 # Mosquitto密码
client-id: springboot-backend # 截取UUID前8位自动去横线 client-id: springboot-backend # 截取UUID前8位自动去横线
default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题 default-topic: dtu/+/up,frontend/+/control/+,frontend/+/online # 后端监听的主题
qos: 1 # 消息可靠性 qos: 1 # 消息可靠性
timeout: 60 # 连接超时 timeout: 60 # 连接超时
keep-alive: 60 # 心跳间隔 keep-alive: 60 # 心跳间隔
auto-off-seconds: 30 #自动关延迟秒数。
latest-ttl-seconds: 120 #设备最新状态缓存的过期时间(秒)。

View File

@ -1,6 +1,8 @@
package com.agri.framework.interceptor; package com.agri.framework.interceptor;
import com.agri.framework.config.MqttConfig; import com.agri.framework.config.MqttConfig;
import com.agri.system.domain.SysAgriLimit;
import com.agri.system.service.ISysAgriLimitService;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference; import com.alibaba.fastjson2.TypeReference;
@ -12,6 +14,7 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnection;
@ -26,12 +29,18 @@ import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/** /**
* MQTT * MQTT
@ -78,6 +87,14 @@ public class MqttMessageHandler implements SmartLifecycle {
@Value("${spring.mqtt.default-topic}") @Value("${spring.mqtt.default-topic}")
private String defaultTopic; private String defaultTopic;
// 新增:自动关延迟秒数(固定多少秒-n秒
@Value("${spring.mqtt.auto-off-seconds:30}")
private int autoOffSeconds;
// 新增最新状态缓存TTL设备每10秒上报一次缓存一小段时间即可
@Value("${spring.mqtt.latest-ttl-seconds:120}")
private int latestTtlSeconds;
// 优化统一使用SLF4J日志JDK 8兼容 // 优化统一使用SLF4J日志JDK 8兼容
private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class); private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
@ -90,6 +107,29 @@ public class MqttMessageHandler implements SmartLifecycle {
@Resource @Resource
private MqttConnectOptions mqttConnectOptions; private MqttConnectOptions mqttConnectOptions;
// 新增:自动关任务线程池(单线程,避免并发执行)
private final ScheduledExecutorService autoOffExecutor = Executors.newSingleThreadScheduledExecutor();
// 新增:同设备同功能只保留最后一次自动关任务
private final ConcurrentHashMap<String, ScheduledFuture<?>> autoOffFutureMap = new ConcurrentHashMap<>();
@Autowired
private ISysAgriLimitService agriLimitService;
// 初始化映射(建议放在类初始化块/构造方法中,只初始化一次)
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
static {
LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
}
/** /**
* + * +
* @PostConstructSmartLifecyclestart() * @PostConstructSmartLifecyclestart()
@ -234,12 +274,21 @@ public class MqttMessageHandler implements SmartLifecycle {
// 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}} // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}
String funcType = null; String funcType = null;
Integer funcValue = null;
boolean isAck = false;
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
isAck = true;
JSONObject propObj = payloadObj.getJSONObject("prop"); JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) { if (propObj != null && !propObj.isEmpty()) {
// 提取prop中的第一个功能码 // 提取prop中的第一个功能码
Map.Entry<String, Object> propEntry = propObj.entrySet().iterator().next(); Map.Entry<String, Object> propEntry = propObj.entrySet().iterator().next();
funcType = propEntry.getKey(); funcType = propEntry.getKey();
try {
funcValue = Integer.parseInt(String.valueOf(propEntry.getValue()));
} catch (Exception ignore) {
}
// 释放对应功能的分布式锁 // 释放对应功能的分布式锁
String lockKey = "lock:" + deviceId + ":" + funcType; String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean delete = stringRedisTemplate.delete(lockKey); Boolean delete = stringRedisTemplate.delete(lockKey);
@ -248,6 +297,23 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete); log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
// 新增:回执固定是{"suc":true,"prop":{"jm1k":1}}
boolean suc = payloadObj.getBooleanValue("suc");
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) {
SysAgriLimit agriLimit = agriLimitService.lambdaQuery()
.eq(SysAgriLimit::getImei, deviceId)
.one();
int autoOffSeconds = 0;
if (agriLimit != null) {
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
}
// 新增:回执成功且值=1表示运行/开启)时,起个任务,固定多少秒-n秒
scheduleAutoOff(deviceId, funcType, autoOffSeconds);
}
// 广播回执结果给所有订阅该设备的前端 // 广播回执结果给所有订阅该设备的前端
// String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";; // String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";;
// JSONObject ackPayload = new JSONObject(); // JSONObject ackPayload = new JSONObject();
@ -259,6 +325,26 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
} }
// 新增设备每10秒上报一次状态包写入latest供自动关任务读取只在需要时写减少消耗
if (!isAck) {
boolean needWriteLatest = false;
// 1) 有人订阅时才写latest
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
if (subscribedClients != null && !subscribedClients.isEmpty()) {
needWriteLatest = true;
}
// 2) 或者存在该设备的自动关任务时也写latest任务需要最新状态判断
if (!needWriteLatest && hasAutoOffTask(deviceId)) {
needWriteLatest = true;
}
if (needWriteLatest) {
stringRedisTemplate.opsForValue().set("device:latest:" + deviceId, payload, latestTtlSeconds, TimeUnit.SECONDS);
}
}
// 非回执消息:正常转发给订阅前端 // 非回执消息:正常转发给订阅前端
// if (!isDeviceAck) { // if (!isDeviceAck) {
// 查询Redis中订阅该设备的前端列表sub:{deviceId} // 查询Redis中订阅该设备的前端列表sub:{deviceId}
@ -281,6 +367,90 @@ public class MqttMessageHandler implements SmartLifecycle {
// } // }
} }
// 新增:是否存在该设备的自动关任务
private boolean hasAutoOffTask(String deviceId) {
if (!StringUtils.hasText(deviceId)) {
return false;
}
String prefix = "autooff:" + deviceId + ":";
for (String key : autoOffFutureMap.keySet()) {
if (key != null && key.startsWith(prefix)) {
ScheduledFuture<?> f = autoOffFutureMap.get(key);
if (f != null && !f.isCancelled() && !f.isDone()) {
return true;
}
}
}
return false;
}
// 新增:起个任务,固定多少秒-n秒【监听最新的设备状态如果还在运行】发送设备关的指令
private void scheduleAutoOff(String deviceId, String funcType, int delaySeconds) {
if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType) || delaySeconds <= 0) {
return;
}
String taskKey = "autooff:" + deviceId + ":" + funcType;
// 同设备同功能只保留最后一次
ScheduledFuture<?> old = autoOffFutureMap.remove(taskKey);
if (old != null) {
old.cancel(false);
}
ScheduledFuture<?> future = autoOffExecutor.schedule(() -> {
try {
runAutoOff(deviceId, funcType);
} catch (Exception e) {
log.error("【自动关任务】执行失败deviceId={}, funcType={}", deviceId, funcType, e);
} finally {
autoOffFutureMap.remove(taskKey);
}
}, delaySeconds, TimeUnit.SECONDS);
autoOffFutureMap.put(taskKey, future);
log.info("【自动关任务】已创建deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds);
}
// 新增读取最新状态device:latest:{deviceId}若仍为1则下发 {"funcType":0} 到 dtu/{id}/down
private void runAutoOff(String deviceId, String funcType) throws MqttException {
String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
if (!StringUtils.hasText(latest)) {
log.warn("【自动关任务】无最新状态跳过deviceId={}, funcType={}", deviceId, funcType);
return;
}
JSONObject latestObj;
try {
latestObj = JSON.parseObject(latest);
} catch (Exception e) {
log.warn("【自动关任务】最新状态JSON解析失败跳过deviceId={}, funcType={}", deviceId, funcType);
return;
}
if (latestObj == null || latestObj.isEmpty()) {
return;
}
// 设备每10秒上报的状态包{"jm1k":0/1,...} 顶层字段直接取
Integer current = null;
try {
if (latestObj.containsKey(funcType)) {
current = latestObj.getIntValue(funcType);
}
} catch (Exception ignore) {
}
if (current != null && current == 1) {
JSONObject down = new JSONObject();
down.put(funcType, 0);
String deviceTopic = "dtu/" + deviceId + "/down";
mqttMessageSender.publish(deviceTopic, down.toJSONString());
log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
} else {
log.info("【自动关任务】检测未运行或状态未知跳过关闭deviceId={}, funcType={}, current={}", deviceId, funcType, current);
}
}
/** /**
* ++ * ++
*/ */
@ -504,7 +674,7 @@ public class MqttMessageHandler implements SmartLifecycle {
/** /**
* MQTTclientclient * MQTTclientclient
*/ */
public synchronized String manualReconnect() { public synchronized String manualReconnect() {
isRunning.set(true); isRunning.set(true);
try { try {
// 强制断开旧连接(如果存在) // 强制断开旧连接(如果存在)
@ -560,6 +730,17 @@ public class MqttMessageHandler implements SmartLifecycle {
// 修复JDK 8正确的compareAndSet写法无命名参数 // 修复JDK 8正确的compareAndSet写法无命名参数
if (isRunning.compareAndSet(true, false)) { if (isRunning.compareAndSet(true, false)) {
try { try {
// 新增:关闭自动关任务线程池,避免线程泄漏
autoOffExecutor.shutdown();
try {
if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
autoOffExecutor.shutdownNow();
}
} catch (InterruptedException ignore) {
autoOffExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
if (mqttClient != null) { if (mqttClient != null) {
// 注意disconnect 只在已连接时调用close 尽量无条件释放资源 // 注意disconnect 只在已连接时调用close 尽量无条件释放资源
if (mqttClient.isConnected()) { if (mqttClient.isConnected()) {