职责拆分
parent
17f8af0901
commit
ca1bcec621
|
|
@ -3,7 +3,8 @@ package com.agri.web.controller.mqtt;
|
|||
import com.agri.common.annotation.Log;
|
||||
import com.agri.common.core.domain.AjaxResult;
|
||||
import com.agri.common.enums.BusinessType;
|
||||
import com.agri.framework.interceptor.MqttMessageHandler;
|
||||
import com.agri.framework.manager.MqttClientManager;
|
||||
import com.agri.framework.manager.MqttSubscriptionManager;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -27,7 +28,9 @@ public class MqttController {
|
|||
private static final Logger log = LoggerFactory.getLogger(MqttController.class);
|
||||
|
||||
@Resource
|
||||
private MqttMessageHandler mqttMessageHandler;
|
||||
private MqttSubscriptionManager mqttSubscriptionManager;
|
||||
@Resource
|
||||
private MqttClientManager mqttClientManager;
|
||||
|
||||
/**
|
||||
* 单个订阅
|
||||
|
|
@ -36,7 +39,7 @@ public class MqttController {
|
|||
@Log(title = "订阅主题", businessType = BusinessType.INSERT)
|
||||
public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) {
|
||||
try {
|
||||
mqttMessageHandler.subscribeDevice(clientId, deviceId);
|
||||
mqttSubscriptionManager.subscribeDevice(clientId, deviceId);
|
||||
return "订阅成功";
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("MQTT单个订阅失败:{}", e.getMessage());
|
||||
|
|
@ -53,7 +56,7 @@ public class MqttController {
|
|||
@DeleteMapping("/single")
|
||||
public String unsubscribe(@RequestParam String clientId, @RequestParam String deviceId) {
|
||||
try {
|
||||
mqttMessageHandler.unsubscribeDevice(clientId, deviceId);
|
||||
mqttSubscriptionManager.unsubscribeDevice(clientId, deviceId);
|
||||
return "取消订阅成功";
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("MQTT单个取消订阅失败:{}", e.getMessage());
|
||||
|
|
@ -72,7 +75,7 @@ public class MqttController {
|
|||
public AjaxResult subscribeAll(@RequestParam String clientId) {
|
||||
try {
|
||||
// 返回前端需要取消的MQTT主题列表
|
||||
return AjaxResult.success(mqttMessageHandler.subscribeAllDeviceByUserId(clientId));
|
||||
return AjaxResult.success(mqttSubscriptionManager.subscribeAllDeviceByUserId(clientId));
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("MQTT批量订阅失败:{}", e.getMessage());
|
||||
// 异常时返回空列表,避免前端解析失败
|
||||
|
|
@ -91,7 +94,7 @@ public class MqttController {
|
|||
public List<String> unsubscribeAll(@RequestParam String clientId) {
|
||||
try {
|
||||
// 返回前端需要取消的MQTT主题列表
|
||||
return mqttMessageHandler.unsubscribeAllDevice(clientId);
|
||||
return mqttSubscriptionManager.unsubscribeAllDevice(clientId);
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("MQTT批量取消订阅失败:{}", e.getMessage());
|
||||
// 异常时返回空列表,避免前端解析失败
|
||||
|
|
@ -109,7 +112,7 @@ public class MqttController {
|
|||
@Log(title = "手动触发MQTT重连", businessType = BusinessType.OTHER)
|
||||
public String manualReconnect() {
|
||||
try {
|
||||
return mqttMessageHandler.manualReconnect();
|
||||
return mqttClientManager.manualReconnect();
|
||||
} catch (Exception e) {
|
||||
log.error("MQTT手动重连异常", e);
|
||||
return "手动重连失败:" + e.getMessage();
|
||||
|
|
@ -124,7 +127,7 @@ public class MqttController {
|
|||
@Log(title = "手动触发MQTT重连", businessType = BusinessType.SELECT)
|
||||
public String getMqttStatus() {
|
||||
try {
|
||||
return mqttMessageHandler.getMqttStatus();
|
||||
return mqttClientManager.getMqttStatus();
|
||||
} catch (Exception e) {
|
||||
log.error("查询MQTT连接状态异常", e);
|
||||
return "查询状态失败:" + e.getMessage();
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ spring:
|
|||
# 热部署开关
|
||||
enabled: true
|
||||
# 禁用 MQTT 的热部署
|
||||
exclude: com/agri/framework/config/MqttConfig.class,com/agri/framework/interceptor/MqttMessageHandler.class
|
||||
# exclude: com/agri/framework/config/MqttConfig.class,com/agri/framework/interceptor/MqttMessageHandler.class
|
||||
# redis 配置
|
||||
redis:
|
||||
# 地址
|
||||
|
|
|
|||
|
|
@ -0,0 +1,235 @@
|
|||
package com.agri.framework.interceptor;
|
||||
|
||||
import com.agri.framework.config.MqttConfig;
|
||||
import com.agri.framework.manager.MqttAutoOffManager;
|
||||
import com.agri.framework.manager.MqttSubscriptionManager;
|
||||
import com.agri.system.domain.SysAgriLimit;
|
||||
import com.agri.system.service.ISysAgriLimitService;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 设备状态消息处理器
|
||||
* 核心功能:
|
||||
* 1. 处理设备状态上报、设备回执消息
|
||||
* 2. 触发自动关闭任务、取消自动关闭任务
|
||||
* 3. 转发设备状态到订阅的前端
|
||||
* 4. 维护设备最新状态缓存
|
||||
* 适配JDK 8,无心跳包相关逻辑
|
||||
*/
|
||||
@Component
|
||||
public class DeviceStatusHandler {
|
||||
/**
|
||||
* 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||
*/
|
||||
private static final Logger log = LoggerFactory.getLogger(DeviceStatusHandler.class);
|
||||
|
||||
/**
|
||||
* Redis模板,用于存储订阅关系、设备在线状态、分布式锁
|
||||
*/
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* MQTT消息发送工具类(由MqttConfig配置类注入)
|
||||
*/
|
||||
@Resource
|
||||
private MqttConfig.MqttMessageSender mqttMessageSender;
|
||||
|
||||
/**
|
||||
* MQTT订阅关系管理器,处理批量Redis操作
|
||||
*/
|
||||
@Resource
|
||||
private MqttSubscriptionManager mqttSubscriptionManager;
|
||||
|
||||
/**
|
||||
* 自动关任务管理器,调度/取消自动关任务
|
||||
*/
|
||||
@Resource
|
||||
private MqttAutoOffManager mqttAutoOffManager;
|
||||
|
||||
/**
|
||||
* 农业限制服务,查询设备自动关延迟配置
|
||||
*/
|
||||
@Resource
|
||||
private ISysAgriLimitService agriLimitService;
|
||||
|
||||
// 新增:最新状态缓存TTL(设备每10秒上报一次,缓存一小段时间即可)
|
||||
@Value("${spring.mqtt.latest-ttl-seconds:120}")
|
||||
private int latestTtlSeconds;
|
||||
|
||||
// 初始化映射(建议放在类初始化块/构造方法中,只初始化一次)
|
||||
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
|
||||
private static final Set<String> VALID_FUNC_CODES = new HashSet<>();
|
||||
static {
|
||||
LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
|
||||
LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
|
||||
LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
|
||||
LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
|
||||
LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
|
||||
LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
|
||||
LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
|
||||
LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
|
||||
|
||||
VALID_FUNC_CODES.add("jm1g");
|
||||
VALID_FUNC_CODES.add("jm2g");
|
||||
VALID_FUNC_CODES.add("jbg");
|
||||
VALID_FUNC_CODES.add("jm3g");
|
||||
VALID_FUNC_CODES.add("jm2k");
|
||||
VALID_FUNC_CODES.add("jm3k");
|
||||
VALID_FUNC_CODES.add("jbk");
|
||||
VALID_FUNC_CODES.add("jm1k");
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备状态:转发给订阅的前端、处理回执、触发自动关
|
||||
*/
|
||||
public void handle(String topic, String payload) throws MqttException {
|
||||
// 第一步:解析JSON,非有效JSON直接return
|
||||
JSONObject payloadObj;
|
||||
try {
|
||||
payloadObj = JSON.parseObject(payload);
|
||||
} catch (Exception e) {
|
||||
log.error("【设备处理】JSON解析失败,payload={}", payload, e);
|
||||
return;
|
||||
}
|
||||
if (payloadObj == null || payloadObj.isEmpty()) {
|
||||
log.warn("【设备处理】JSON解析后为空,payload={}", payload);
|
||||
return;
|
||||
}
|
||||
|
||||
// log.info("【设备处理】JSON解析:{}",payloadObj);
|
||||
// 解析设备ID:主题格式为dtu/{deviceId}/up,分割后第2个元素是设备ID
|
||||
String deviceId = topic.split("/")[1];
|
||||
|
||||
// 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}})
|
||||
String funcType = null;
|
||||
Integer funcValue = null;
|
||||
boolean isAck = false;
|
||||
// 新增:标记是否需要执行自动关任务(全局可用)
|
||||
|
||||
// 第二步:设备回执处理逻辑(完全移除Redis写入)
|
||||
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
|
||||
isAck = true;
|
||||
JSONObject propObj = payloadObj.getJSONObject("prop");
|
||||
if (propObj != null && !propObj.isEmpty()) {
|
||||
// 提取prop中的第一个功能码
|
||||
Map.Entry<String, Object> propEntry = propObj.entrySet().iterator().next();
|
||||
funcType = propEntry.getKey();
|
||||
try {
|
||||
funcValue = Integer.parseInt(String.valueOf(propEntry.getValue()));
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
|
||||
// 释放对应功能的分布式锁
|
||||
String lockKey = "lock:" + deviceId + ":" + funcType;
|
||||
Boolean delete = stringRedisTemplate.delete(lockKey);
|
||||
if (propObj.size() > 1) {
|
||||
log.warn("【设备回执】prop包含多个功能码,仅处理第一个:{}", propObj.keySet());
|
||||
}
|
||||
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
|
||||
|
||||
// 回执成功且值=1时启动自动关闭任务(保留原有逻辑)
|
||||
boolean suc = payloadObj.getBooleanValue("suc");
|
||||
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) {
|
||||
SysAgriLimit agriLimit = agriLimitService.lambdaQuery()
|
||||
.eq(SysAgriLimit::getImei, deviceId)
|
||||
.one();
|
||||
int autoOffSeconds = 0;
|
||||
if (agriLimit != null) {
|
||||
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
|
||||
}
|
||||
// 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务)
|
||||
if (autoOffSeconds > 0) {
|
||||
mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds);
|
||||
log.debug("【自动关任务】标记需要执行,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds);
|
||||
}
|
||||
}
|
||||
|
||||
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) {
|
||||
mqttAutoOffManager.cancelAutoOff(deviceId, funcType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 第三步:仅处理非回执的设备状态包,且仅当是8个功能码结构就写入Redis
|
||||
// 有没有人订阅都得写,只要发送设备开的指令成功了就得写
|
||||
if (!isAck) {
|
||||
// 1) 先校验状态包是否包含8个固定功能码(核心:只有这种结构才写入)
|
||||
boolean isValidStatus = true;
|
||||
for (String validCode : VALID_FUNC_CODES) {
|
||||
if (!payloadObj.containsKey(validCode)) {
|
||||
isValidStatus = false;
|
||||
// log.debug("【设备状态包】结构不合法(非8个功能码),跳过Redis写入,deviceId={};payload={}", deviceId, payload);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) {
|
||||
// ✅ 8个功能码状态包:无条件写device:latest:{deviceId},避免自动关读不到最新状态
|
||||
stringRedisTemplate.opsForValue().set(
|
||||
"device:latest:" + deviceId,
|
||||
payload, // 完整的8功能码JSON
|
||||
latestTtlSeconds,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId);
|
||||
}
|
||||
}
|
||||
// 非回执消息:正常转发给订阅前端
|
||||
// 查询Redis中订阅该设备的前端列表:sub:{deviceId}
|
||||
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
|
||||
|
||||
if (subscribedClients != null && !subscribedClients.isEmpty()) {
|
||||
// 推送给每个订阅的前端
|
||||
// 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底)
|
||||
List<String> clients = new ArrayList<>(subscribedClients);
|
||||
// 判断subc是否还存在 一次性查全部 获取失效的clientId
|
||||
List<Boolean> stillSubs = mqttSubscriptionManager.pipeIsMemberSubc(clients, deviceId);
|
||||
|
||||
// 关系不存在:清理sub:{deviceId}残留,避免一直给前端发
|
||||
List<String> stale = null;
|
||||
|
||||
for (int i = 0; i < clients.size(); i++) {
|
||||
String clientId = clients.get(i);
|
||||
boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i));
|
||||
if (!stillSub) {
|
||||
if (stale == null) {
|
||||
stale = new ArrayList<>();
|
||||
}
|
||||
// false不存在添加队列
|
||||
stale.add(clientId);
|
||||
continue;
|
||||
}
|
||||
// 前端专属主题:frontend/{clientId}/dtu/{deviceId}/listener
|
||||
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||
// 发布消息
|
||||
mqttMessageSender.publish(frontendTopic, payload);
|
||||
log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
|
||||
}
|
||||
// 删掉设备对应的客户端
|
||||
if (stale != null && !stale.isEmpty()) {
|
||||
mqttSubscriptionManager.pipeSRemSub(deviceId, stale);
|
||||
}
|
||||
} else {
|
||||
// 优化:替换System.out为log.info
|
||||
// log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,127 @@
|
|||
package com.agri.framework.interceptor;
|
||||
|
||||
import com.agri.framework.config.MqttConfig;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.TypeReference;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 前端控制指令处理器
|
||||
* 核心功能:
|
||||
* 1. 解析前端控制指令,校验入参合法性
|
||||
* 2. 前端操作设备权限校验
|
||||
* 3. 分布式锁控制,避免同设备同功能并发指令
|
||||
* 4. 转发前端指令到设备端
|
||||
* 适配JDK 8,无心跳包相关逻辑
|
||||
*/
|
||||
@Component
|
||||
public class FrontendControlHandler {
|
||||
/**
|
||||
* 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||
*/
|
||||
private static final Logger log = LoggerFactory.getLogger(FrontendControlHandler.class);
|
||||
|
||||
/**
|
||||
* Redis模板,用于存储订阅关系、设备在线状态、分布式锁
|
||||
*/
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* MQTT消息发送工具类(由MqttConfig配置类注入)
|
||||
*/
|
||||
@Resource
|
||||
private MqttConfig.MqttMessageSender mqttMessageSender;
|
||||
|
||||
/**
|
||||
* 处理前端控制指令:权限校验+分布式锁+转发给设备
|
||||
*/
|
||||
public void handle(String topic, String payload) throws MqttException {
|
||||
// 解析前端clientId、设备ID
|
||||
String[] parts = topic.split("/");
|
||||
String clientId = parts[1];
|
||||
String deviceId = parts[3];
|
||||
|
||||
// 新增:入参非空校验(JDK 8兼容)
|
||||
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
|
||||
log.error("【指令处理】clientId或deviceId为空,topic={}", topic);
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析功能码({"功能码":状态码}格式)
|
||||
Map<String, Integer> funcCodeMap = null;
|
||||
try {
|
||||
funcCodeMap = JSON.parseObject(payload, new TypeReference<Map<String, Integer>>() {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("【指令处理】功能码解析失败,payload={}", payload, e);
|
||||
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}");
|
||||
return;
|
||||
}
|
||||
if (funcCodeMap == null || funcCodeMap.isEmpty()) {
|
||||
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}");
|
||||
log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId);
|
||||
return;
|
||||
}
|
||||
// 提取第一个功能码作为锁标识
|
||||
String funcType = funcCodeMap.keySet().iterator().next();
|
||||
|
||||
// 1. 权限校验(示例:admin开头有全权限)
|
||||
if (!checkPermission(clientId, deviceId)) {
|
||||
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
|
||||
log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 分布式锁:设备ID+功能类型(避免同设备同功能并发控制)
|
||||
String lockKey = "lock:" + deviceId + ":" + funcType;
|
||||
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
|
||||
lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景
|
||||
);
|
||||
if (lockSuccess == null || !lockSuccess) {
|
||||
String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
|
||||
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}");
|
||||
log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 记录日志
|
||||
log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}",
|
||||
clientId, LocalDateTime.now(), deviceId, funcType, payload);
|
||||
|
||||
// 4. 转发指令到设备
|
||||
String deviceTopic = "dtu/" + deviceId + "/down";
|
||||
//todo
|
||||
mqttMessageSender.publish(deviceTopic, payload);
|
||||
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 权限校验逻辑(示例)
|
||||
* 可根据业务需求扩展:
|
||||
* 1. 管理员前端(clientId以admin_开头)拥有所有权限
|
||||
* 2. 普通前端仅能操作Redis中绑定的设备(user_device:{clientId} → deviceId集合)
|
||||
*
|
||||
* @param clientId 前端唯一标识
|
||||
* @param deviceId 设备ID
|
||||
* @return true=有权限,false=无权限
|
||||
*/
|
||||
private boolean checkPermission(String clientId, String deviceId) {
|
||||
// 管理员权限:clientId以admin_开头
|
||||
// 普通用户权限:校验Redis中是否绑定该设备
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
package com.agri.framework.interceptor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 前端在线心跳处理器
|
||||
* 核心功能:
|
||||
* 1. 处理前端在线心跳消息
|
||||
* 2. 续期前端订阅关系TTL,兜底异常退出场景
|
||||
* 适配JDK 8,无心跳包相关逻辑
|
||||
*/
|
||||
@Component
|
||||
public class FrontendOnlineHandler {
|
||||
/**
|
||||
* 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||
*/
|
||||
private static final Logger log = LoggerFactory.getLogger(FrontendOnlineHandler.class);
|
||||
|
||||
/**
|
||||
* Redis模板,用于存储订阅关系、设备在线状态、分布式锁
|
||||
*/
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
// 新增:前端订阅关系TTL(兜底“取消订阅失败/异常退出”)——只维护subc:{clientId}的TTL
|
||||
@Value("${spring.mqtt.subc-ttl-seconds:3600}")
|
||||
private int subcTtlSeconds;
|
||||
|
||||
/**
|
||||
* 新增:处理前端在线心跳:写入Redis在线标记(带TTL)
|
||||
* 主题格式:frontend/{clientId}/online
|
||||
*/
|
||||
public void handle(String topic, String payload) {
|
||||
try {
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length < 3) {
|
||||
return;
|
||||
}
|
||||
String clientId = parts[1];
|
||||
if (!StringUtils.hasText(clientId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 续期subc:{clientId}
|
||||
stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS);
|
||||
|
||||
// todo 生产环境不建议打印每次心跳
|
||||
// log.debug("【在线心跳】clientId={} 续期subcTTL={}s payload={}", clientId, subcTtlSeconds, payload);
|
||||
} catch (Exception e) {
|
||||
log.warn("【在线心跳】处理失败 topic={} msg={}", topic, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,265 @@
|
|||
package com.agri.framework.manager;
|
||||
|
||||
import com.agri.framework.config.MqttConfig;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 设备自动关任务管理器
|
||||
* 核心功能:
|
||||
* 1. 初始化/优雅关闭自动关任务线程池
|
||||
* 2. 调度/取消设备功能自动关闭任务
|
||||
* 3. 执行自动关任务,校验设备最新状态
|
||||
* 4. 维护设备未完成自动关任务计数
|
||||
* 改造点:自动关闭任务改为多线程并行执行
|
||||
* 适配JDK 8,无心跳包相关逻辑
|
||||
*/
|
||||
@Component
|
||||
public class MqttAutoOffManager {
|
||||
/**
|
||||
* 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||
*/
|
||||
private static final Logger log = LoggerFactory.getLogger(MqttAutoOffManager.class);
|
||||
|
||||
/**
|
||||
* Redis模板,用于存储订阅关系、设备在线状态、分布式锁
|
||||
*/
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* MQTT消息发送工具类(由MqttConfig配置类注入)
|
||||
*/
|
||||
@Resource
|
||||
private MqttConfig.MqttMessageSender mqttMessageSender;
|
||||
|
||||
// 改造:将单线程池改为固定线程池,支持多任务并行执行
|
||||
// 替代原有的 Executors.newSingleThreadScheduledExecutor()
|
||||
private ScheduledExecutorService autoOffExecutor;
|
||||
|
||||
// 新增:同设备同功能只保留最后一次自动关任务
|
||||
private final ConcurrentHashMap<String, ScheduledFuture<?>> autoOffFutureMap = new ConcurrentHashMap<>();
|
||||
|
||||
// 新增:按设备维度统计“未完成的自动关任务”数量,hasAutoOffTask从扫描O(N)降为O(1)
|
||||
private final ConcurrentHashMap<String, Integer> autoOffDeviceCnt = new ConcurrentHashMap<>();
|
||||
|
||||
// 新增:自动关闭任务线程池核心线程数(可配置)
|
||||
@Value("${spring.mqtt.auto-off-thread-pool-size:5}")
|
||||
private int autoOffThreadPoolSize;
|
||||
/**
|
||||
* 初始化自动关任务线程池
|
||||
* @param corePoolSize 核心线程数
|
||||
*/
|
||||
public void initExecutor(int corePoolSize) {
|
||||
// 初始化多线程池(固定线程数)
|
||||
autoOffExecutor = new ScheduledThreadPoolExecutor(
|
||||
autoOffThreadPoolSize, // 核心线程数
|
||||
r -> {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setName("auto-off-task-" + thread.getId());
|
||||
thread.setDaemon(true); // 设置为守护线程,不阻塞JVM退出
|
||||
return thread;
|
||||
},
|
||||
new ThreadPoolExecutor.CallerRunsPolicy() // 队列压力或关闭时兜底不丢任务
|
||||
);
|
||||
|
||||
// 关键优化1:取消任务后立即从队列移除,避免队列堆积
|
||||
((ScheduledThreadPoolExecutor) autoOffExecutor).setRemoveOnCancelPolicy(true);
|
||||
|
||||
// 关键优化2:允许核心线程超时回收,空闲时省资源
|
||||
((ScheduledThreadPoolExecutor) autoOffExecutor).setKeepAliveTime(60, TimeUnit.SECONDS);
|
||||
((ScheduledThreadPoolExecutor) autoOffExecutor).allowCoreThreadTimeOut(true);
|
||||
|
||||
log.info("自动关任务线程池初始化完成,核心线程数={}", corePoolSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* 优雅关闭自动关任务线程池
|
||||
*/
|
||||
public void shutdownExecutor() {
|
||||
try {
|
||||
// 1. 取消所有未执行的自动关闭任务
|
||||
for (Map.Entry<String, ScheduledFuture<?>> entry : autoOffFutureMap.entrySet()) {
|
||||
entry.getValue().cancel(false);
|
||||
log.debug("【自动关任务】取消任务:{}", entry.getKey());
|
||||
}
|
||||
autoOffFutureMap.clear();
|
||||
// ✅ 停止时直接清空计数,避免残留
|
||||
autoOffDeviceCnt.clear();
|
||||
|
||||
// 2. 优雅关闭线程池
|
||||
if (autoOffExecutor != null) {
|
||||
autoOffExecutor.shutdown();
|
||||
try {
|
||||
// 等待3秒让任务完成
|
||||
if (!autoOffExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
|
||||
// 强制关闭
|
||||
autoOffExecutor.shutdownNow();
|
||||
log.warn("【自动关任务】线程池强制关闭");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
autoOffExecutor.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
log.info("【自动关任务】线程池已关闭");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("【自动关任务】线程池关闭失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
// 新增:是否存在该设备的自动关任务
|
||||
public boolean hasAutoOffTask(String deviceId) {
|
||||
if (!StringUtils.hasText(deviceId)) {
|
||||
return false;
|
||||
}
|
||||
// ✅ O(1)查询,无需扫描整个任务表
|
||||
Integer cnt = autoOffDeviceCnt.get(deviceId);
|
||||
return cnt != null && cnt > 0;
|
||||
}
|
||||
|
||||
// 新增:自动关任务计数 +1(只维护deviceId维度,确保hasAutoOffTask为O(1))
|
||||
private void incAutoOffCnt(String deviceId) {
|
||||
if (!StringUtils.hasText(deviceId)) {
|
||||
return;
|
||||
}
|
||||
autoOffDeviceCnt.merge(deviceId, 1, (a, b) -> a + b);
|
||||
}
|
||||
|
||||
// 新增:自动关任务计数 -1(避免负数;归零则清理key,省内存)
|
||||
private void decAutoOffCnt(String deviceId) {
|
||||
if (!StringUtils.hasText(deviceId)) {
|
||||
return;
|
||||
}
|
||||
autoOffDeviceCnt.compute(deviceId, (k, v) -> {
|
||||
if (v == null || v <= 1) {
|
||||
return null;
|
||||
}
|
||||
return v - 1;
|
||||
});
|
||||
}
|
||||
|
||||
// 改造:多线程执行自动关闭任务
|
||||
// 起个任务,固定多少秒-n秒,【监听最新的设备状态,如果还在运行】,发送设备关的指令
|
||||
public void scheduleAutoOff(String deviceId, String funcType, int delaySeconds) {
|
||||
|
||||
// ✅ 防御:避免极端情况下线程池尚未初始化导致NPE
|
||||
if (autoOffExecutor == null) {
|
||||
log.warn("【自动关任务】线程池未初始化,跳过创建任务:deviceId={}, funcType={}", deviceId, funcType);
|
||||
return;
|
||||
}
|
||||
|
||||
String taskKey = "autooff:" + deviceId + ":" + funcType;
|
||||
|
||||
cancelAutoOff(deviceId,funcType);
|
||||
|
||||
// 使用多线程池提交任务
|
||||
ScheduledFuture<?> newFuture = autoOffExecutor.schedule(() -> {
|
||||
try {
|
||||
runAutoOff(deviceId, funcType);
|
||||
} catch (Exception e) {
|
||||
log.error("【自动关任务】执行失败,deviceId={}, funcType={}", deviceId, funcType, e);
|
||||
} finally {
|
||||
// 任务执行完成后移除映射
|
||||
autoOffFutureMap.remove(taskKey);
|
||||
// ✅ 任务结束(成功/失败都算结束):减少该设备的“未完成任务数”,保证hasAutoOffTask准确
|
||||
decAutoOffCnt(deviceId);
|
||||
}
|
||||
}, delaySeconds, TimeUnit.SECONDS);
|
||||
|
||||
// 保存新任务的引用
|
||||
autoOffFutureMap.put(taskKey, newFuture);
|
||||
// ✅ 新任务创建成功:增加该设备的“未完成任务数”
|
||||
incAutoOffCnt(deviceId);
|
||||
|
||||
log.info("【自动关任务】已创建(多线程):deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds);
|
||||
}
|
||||
|
||||
// 自动关闭任务的核心逻辑(无改动)
|
||||
// 新增:读取最新状态(device:latest:{deviceId}),若仍为1则下发 {"funcType":0} 到 dtu/{id}/down
|
||||
private void runAutoOff(String deviceId, String funcType) throws MqttException {
|
||||
String latest = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
|
||||
if (!StringUtils.hasText(latest)) {
|
||||
//todo
|
||||
log.warn("【自动关任务】无最新状态,跳过:deviceId={}, funcType={}", deviceId, funcType);
|
||||
return;
|
||||
}
|
||||
|
||||
JSONObject latestObj;
|
||||
try {
|
||||
latestObj = JSON.parseObject(latest);
|
||||
} catch (Exception e) {
|
||||
log.warn("【自动关任务】最新状态JSON解析失败,跳过:deviceId={}, funcType={}", deviceId, funcType);
|
||||
return;
|
||||
}
|
||||
if (latestObj == null || latestObj.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 设备每10秒上报的状态包:{"jm1k":0/1,...} 顶层字段直接取
|
||||
Integer current = null;
|
||||
try {
|
||||
if (latestObj.containsKey(funcType)) {
|
||||
current = latestObj.getIntValue(funcType);
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
|
||||
if (current != null && current == 1) {
|
||||
// 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖)
|
||||
String lockKey = "lock:" + deviceId + ":" + funcType;
|
||||
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
|
||||
lockKey, "autooff", 15, TimeUnit.SECONDS
|
||||
);
|
||||
if (lockSuccess == null || !lockSuccess) {
|
||||
log.info("【自动关任务】{}功能忙(锁占用),跳过自动关闭:deviceId={}, funcType={}", funcType, deviceId, funcType);
|
||||
return;
|
||||
}
|
||||
JSONObject down = new JSONObject();
|
||||
down.put(funcType, 0);
|
||||
|
||||
String deviceTopic = "dtu/" + deviceId + "/down";
|
||||
//todo
|
||||
mqttMessageSender.publish(deviceTopic, down.toJSONString());
|
||||
log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
|
||||
} else {
|
||||
log.info("【自动关任务】检测未运行或状态未知,跳过关闭:deviceId={}, funcType={}, current={}", deviceId, funcType, current);
|
||||
}
|
||||
}
|
||||
|
||||
// 新增:收到“关”指令时,尝试取消对应自动关任务(优化:减少无意义任务执行;正确性仍以到点状态判断为准)
|
||||
public void cancelAutoOff(String deviceId, String funcType) {
|
||||
if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(funcType)) {
|
||||
return;
|
||||
}
|
||||
String taskKey = "autooff:" + deviceId + ":" + funcType;
|
||||
// 同设备同功能只保留最后一次任务:只有旧任务还没开始时才替换
|
||||
ScheduledFuture<?> oldFuture = autoOffFutureMap.get(taskKey);
|
||||
if (oldFuture != null) {
|
||||
// cancel=false 说明任务已开始/已完成,避免双执行:不再创建新任务
|
||||
if (!oldFuture.cancel(false)) {
|
||||
return;
|
||||
}
|
||||
// cancel成功:旧任务不会跑了,这时再remove并减计数
|
||||
autoOffFutureMap.remove(taskKey, oldFuture);
|
||||
decAutoOffCnt(deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,278 @@
|
|||
package com.agri.framework.manager;
|
||||
|
||||
import com.agri.framework.web.dispatcher.MqttMessageDispatcher;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* MQTT客户端生命周期管理器
|
||||
* 核心功能:
|
||||
* 1. 订阅设备状态、前端控制指令主题
|
||||
* 2. MQTT客户端启动/停止/重连
|
||||
* 3. 管理自动关闭任务线程池
|
||||
* 4. 监听MQTT连接状态,设置消息回调
|
||||
* 适配JDK 8,无心跳包相关逻辑
|
||||
*/
|
||||
@Component
|
||||
public class MqttClientManager implements SmartLifecycle {
|
||||
|
||||
/**
|
||||
* 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||
*/
|
||||
private static final Logger log = LoggerFactory.getLogger(MqttClientManager.class);
|
||||
|
||||
/**
|
||||
* 新增:生命周期管理标识,控制MQTT客户端启动/关闭
|
||||
*/
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* MQTT客户端(由MqttConfig配置类注入)
|
||||
*/
|
||||
@Resource
|
||||
private MqttClient mqttClient;
|
||||
|
||||
/**
|
||||
* MQTT连接配置项(从MqttConfig注入)
|
||||
*/
|
||||
@Resource
|
||||
private MqttConnectOptions mqttConnectOptions;
|
||||
|
||||
/**
|
||||
* MQTT消息分发器,转发消息到对应处理器
|
||||
*/
|
||||
@Resource
|
||||
private MqttMessageDispatcher mqttMessageDispatcher;
|
||||
|
||||
/**
|
||||
* 自动关任务管理器,初始化/关闭线程池
|
||||
*/
|
||||
@Resource
|
||||
private MqttAutoOffManager mqttAutoOffManager;
|
||||
|
||||
// 读取配置文件中的默认订阅主题(移除心跳主题)
|
||||
@Value("${spring.mqtt.default-topic}")
|
||||
private String defaultTopic;
|
||||
|
||||
// 新增:自动关闭任务线程池核心线程数(可配置)
|
||||
@Value("${spring.mqtt.auto-off-thread-pool-size:5}")
|
||||
private int autoOffThreadPoolSize;
|
||||
|
||||
/**
|
||||
* 初始化:订阅主题+设置回调
|
||||
* (移除@PostConstruct,改为由SmartLifecycle的start()触发)
|
||||
* <p>
|
||||
* 【方案A】不做自写重连;Paho会在连接断开后自动重连(前提:connectOptions.setAutomaticReconnect(true))
|
||||
*/
|
||||
public void subscribeTopics() throws MqttException {
|
||||
// 关键补充1:判空
|
||||
if (mqttClient == null) {
|
||||
log.error("【MQTT初始化】客户端实例为空,无法订阅主题");
|
||||
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
|
||||
}
|
||||
|
||||
// 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过)
|
||||
// 注意:这里只使用同一个client实例,避免sender与handler使用不同client
|
||||
if (!mqttClient.isConnected()) {
|
||||
try {
|
||||
// 使用注入的连接配置项连接Broker(带用户名密码、自动重连等配置)
|
||||
mqttClient.connect(mqttConnectOptions);
|
||||
log.info("【MQTT连接】客户端已成功连接到Broker,clientId:{}", mqttClient.getClientId());
|
||||
} catch (MqttException e) {
|
||||
log.error("【MQTT连接】连接Broker失败,clientId:{}", mqttClient.getClientId(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
// 解析配置的主题列表
|
||||
String[] topics = defaultTopic.split(",");
|
||||
int[] qosArray = new int[topics.length];
|
||||
// 按主题类型设置QoS:控制指令/状态用QoS 1
|
||||
for (int i = 0; i < topics.length; i++) {
|
||||
qosArray[i] = 0;
|
||||
topics[i] = topics[i].trim();
|
||||
}
|
||||
|
||||
// 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成
|
||||
mqttClient.setCallback(new MqttCallback() {
|
||||
/**
|
||||
* MQTT连接断开回调
|
||||
* @param cause 断开原因
|
||||
*/
|
||||
@Override
|
||||
public void connectionLost(Throwable cause) {
|
||||
log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}",
|
||||
safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause);
|
||||
|
||||
// 【方案A】不再触发自写重连;Paho自动重连会接管重连过程
|
||||
// 这里只记录日志即可
|
||||
if (isRunning.get()) {
|
||||
log.warn("【MQTT自动重连】已开启automaticReconnect,等待Paho自动重连...");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 收到MQTT消息回调:转发到消息分发器
|
||||
* @param topic 消息主题
|
||||
* @param message 消息内容
|
||||
* @throws Exception 消息处理异常
|
||||
*/
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||
// 优化:显式指定UTF-8编码,避免乱码(JDK 8兼容)
|
||||
mqttMessageDispatcher.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息发布完成回调(仅日志记录,无业务逻辑)
|
||||
* @param token 发布令牌
|
||||
*/
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||
if (token != null && token.getTopics() != null && token.getTopics().length > 0) {
|
||||
// log.info("【MQTT确认】消息发布完成,clientId:{},主题:{}", safeClientId(), token.getTopics()[0]);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 【方案A关键点】不再 unsubscribe 主题
|
||||
// cleanSession=false + unsubscribe 会破坏Broker侧会话订阅;并且自动重连场景更不建议这么做
|
||||
|
||||
// 订阅主题
|
||||
mqttClient.subscribe(topics, qosArray);
|
||||
// 优化:打印clientId,方便排查
|
||||
log.info("【MQTT初始化】订阅主题完成,clientId:{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics));
|
||||
}
|
||||
|
||||
private String safeClientId() {
|
||||
try {
|
||||
return (mqttClient == null ? "null" : mqttClient.getClientId());
|
||||
} catch (Exception e) {
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 手动重连接口(供Controller调用) ==========
|
||||
|
||||
/**
|
||||
* 手动触发MQTT重连(最小改动:不替换client实例,只用同一个client重连)
|
||||
*/
|
||||
public synchronized String manualReconnect() {
|
||||
isRunning.set(true);
|
||||
try {
|
||||
// 强制断开旧连接(如果存在)
|
||||
if (mqttClient != null && mqttClient.isConnected()) {
|
||||
mqttClient.disconnect();
|
||||
}
|
||||
// 重新初始化订阅(内部会connect + subscribe)
|
||||
subscribeTopics();
|
||||
log.info("【手动重连】MQTT客户端重连成功");
|
||||
return "MQTT手动重连成功";
|
||||
} catch (MqttException e) {
|
||||
log.error("【手动重连】MQTT客户端重连失败", e);
|
||||
return "MQTT手动重连失败:" + e.getMessage();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前MQTT连接状态
|
||||
*/
|
||||
public String getMqttStatus() {
|
||||
boolean connected = (mqttClient != null && mqttClient.isConnected());
|
||||
String status = connected ? "已连接" : "已断开";
|
||||
return String.format("MQTT连接状态:%s;clientId:%s", status, safeClientId());
|
||||
}
|
||||
|
||||
// ======================== SmartLifecycle 生命周期管理(核心修复) ========================
|
||||
|
||||
/**
|
||||
* 启动MQTT客户端(Spring上下文初始化/重启时触发)
|
||||
* 核心:替代@PostConstruct,保证上下文重启时重新初始化MQTT连接
|
||||
*/
|
||||
@Override
|
||||
public void start() {
|
||||
log.info("开始监听");
|
||||
if (isRunning.compareAndSet(false, true)) {
|
||||
try {
|
||||
// 初始化自动关任务线程池
|
||||
mqttAutoOffManager.initExecutor(autoOffThreadPoolSize);
|
||||
// 核心修改:无论是否已连接,都执行订阅
|
||||
subscribeTopics();
|
||||
log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题),自动关闭任务线程池大小:{}", autoOffThreadPoolSize);
|
||||
} catch (MqttException e) {
|
||||
log.error("【MQTT生命周期】客户端启动失败", e);
|
||||
isRunning.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止MQTT客户端
|
||||
* 改造点:优化多线程池的优雅关闭
|
||||
*/
|
||||
@Override
|
||||
public void stop() {
|
||||
if (isRunning.compareAndSet(true, false)) {
|
||||
try {
|
||||
// 关闭自动关任务线程池
|
||||
mqttAutoOffManager.shutdownExecutor();
|
||||
// 关闭MQTT客户端
|
||||
if (mqttClient != null) {
|
||||
if (mqttClient.isConnected()) {
|
||||
mqttClient.disconnect();
|
||||
}
|
||||
mqttClient.close();
|
||||
log.info("【MQTT生命周期】客户端已优雅关闭");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("【MQTT生命周期】客户端关闭失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步停止
|
||||
*/
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断MQTT客户端是否运行中
|
||||
*/
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return isRunning.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动优先级(保证MQTT在Redis之后启动)
|
||||
*/
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return 10;
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否自动启动(默认true,Spring上下文初始化时自动调用start())
|
||||
*/
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,317 @@
|
|||
package com.agri.framework.manager;
|
||||
|
||||
import com.agri.common.utils.SecurityUtils;
|
||||
import com.agri.system.domain.SysAgriInfo;
|
||||
import com.agri.system.service.ISysAgriInfoService;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.serializer.RedisSerializer;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* MQTT订阅关系管理器
|
||||
* 核心功能:
|
||||
* 1. 维护前端-设备双向订阅关系(Redis)
|
||||
* 2. 前端单设备/全量设备订阅/取消订阅
|
||||
* 3. Redis批量操作(pipeline),提升性能
|
||||
* 4. 清理无效订阅关系,避免脏数据
|
||||
* 适配JDK 8,无心跳包相关逻辑
|
||||
*/
|
||||
@Component
|
||||
public class MqttSubscriptionManager {
|
||||
/**
|
||||
* 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||
*/
|
||||
private static final Logger log = LoggerFactory.getLogger(MqttSubscriptionManager.class);
|
||||
|
||||
/**
|
||||
* Redis模板,用于存储订阅关系、设备在线状态、分布式锁
|
||||
*/
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 农业信息服务,查询用户名下设备
|
||||
*/
|
||||
@Resource
|
||||
private ISysAgriInfoService agriInfoService;
|
||||
|
||||
// 新增:前端订阅关系TTL(兜底“取消订阅失败/异常退出”)——只维护subc:{clientId}的TTL
|
||||
@Value("${spring.mqtt.subc-ttl-seconds:3600}")
|
||||
private int subcTtlSeconds;
|
||||
|
||||
// 新增:pipeline 批量 SISMEMBER subc:{clientId} deviceId(N次->1次往返) 拿取失效的client
|
||||
public List<Boolean> pipeIsMemberSubc(List<String> clientIds, String deviceId) {
|
||||
if (clientIds == null || clientIds.isEmpty() || !StringUtils.hasText(deviceId)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// ✅ 关键:不发“占位命令”;只对有效clientId发SISMEMBER,同时保证返回结果与入参严格对齐
|
||||
int n = clientIds.size();
|
||||
// 创建长度n全部false的队列
|
||||
List<Boolean> out = new ArrayList<>(Collections.nCopies(n, Boolean.FALSE));
|
||||
List<Integer> idx = new ArrayList<>(n);
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (StringUtils.hasText(clientIds.get(i))) {
|
||||
// 符合条件存进clientIds对应的索引
|
||||
idx.add(i);
|
||||
}
|
||||
}
|
||||
if (idx.isEmpty()) {
|
||||
return out;
|
||||
}
|
||||
|
||||
// 处理每个每个clientId的值是否还在 值存在rs中 执行 stringRedisTemplate.executePipelined
|
||||
List<Object> rs = stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
|
||||
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
|
||||
byte[] member = serializer.serialize(deviceId);
|
||||
for (int i : idx) {
|
||||
String clientId = clientIds.get(i);
|
||||
// 存放命令
|
||||
connection.sIsMember(serializer.serialize("subc:" + clientId), member);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
for (int j = 0; j < idx.size() && j < (rs == null ? 0 : rs.size()); j++) {
|
||||
out.set(idx.get(j), Boolean.TRUE.equals(rs.get(j)));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// 对应的subc不存在 删除对应的sub pipeline 批量 SREM sub:{deviceId} clientId(清理残留 N次->1次往返)
|
||||
public void pipeSRemSub(String deviceId, List<String> staleClientIds) {
|
||||
if (!StringUtils.hasText(deviceId) || staleClientIds == null || staleClientIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
|
||||
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
|
||||
byte[] subKey = serializer.serialize("sub:" + deviceId);
|
||||
for (String clientId : staleClientIds) {
|
||||
if (StringUtils.hasText(clientId)) {
|
||||
connection.sRem(subKey, serializer.serialize(clientId));
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 前端订阅设备(Controller调用)
|
||||
*/
|
||||
public void subscribeDevice(String clientId, String deviceId) {
|
||||
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
|
||||
log.error("【订阅管理】clientId或deviceId不能为空");
|
||||
throw new IllegalArgumentException("clientId和deviceId不能为空");
|
||||
}
|
||||
|
||||
// 保存订阅关系到Redis
|
||||
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
|
||||
stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId);
|
||||
|
||||
// 新增:订阅成功后给subc设置TTL(兜底“取消订阅失败/异常退出”)
|
||||
stringRedisTemplate.expire("subc:" + clientId, subcTtlSeconds, TimeUnit.SECONDS);
|
||||
|
||||
log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 前端取消订阅设备状态接口(供Controller层调用)
|
||||
* 逻辑:从设备的订阅列表移除前端clientId
|
||||
*
|
||||
* @param clientId 前端唯一标识
|
||||
* @param deviceId 设备ID
|
||||
*/
|
||||
public void unsubscribeDevice(String clientId, String deviceId) {
|
||||
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(deviceId)) {
|
||||
log.error("【前端取消订阅】clientId或deviceId不能为空");
|
||||
throw new IllegalArgumentException("clientId和deviceId不能为空");
|
||||
}
|
||||
|
||||
// 从Redis删除订阅关系
|
||||
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
|
||||
stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId);
|
||||
log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 全量订阅:前端订阅指定用户名下的所有设备(Controller调用)
|
||||
* @param clientId 前端唯一标识(如web_001、app_002)
|
||||
* @return 订阅成功的设备数量
|
||||
*/
|
||||
public int subscribeAllDeviceByUserId(String clientId) {
|
||||
// 1. 入参校验
|
||||
if (!StringUtils.hasText(clientId)) {
|
||||
log.error("【全量订阅】clientId不能为空");
|
||||
throw new IllegalArgumentException("clientId不能为空");
|
||||
}
|
||||
|
||||
|
||||
// 2. Redis连接可用性校验
|
||||
try {
|
||||
stringRedisTemplate.hasKey("test:connection");
|
||||
} catch (Exception e) {
|
||||
log.warn("【全量订阅】Redis连接不可用,订阅操作跳过:{}", e.getMessage());
|
||||
return 0;
|
||||
}
|
||||
Long userId = SecurityUtils.getLoginUser().getUserId();
|
||||
// 3. 查询该用户名下的所有设备ID(替换为你的实际设备查询逻辑)
|
||||
List<String> deviceIds = new ArrayList<>(queryImeiByUserId(userId));
|
||||
if (userId == 1) {
|
||||
deviceIds.add("862538065276061");
|
||||
}
|
||||
if (deviceIds == null || deviceIds.isEmpty()) {
|
||||
log.warn("【全量订阅】用户{}名下无可用设备", userId);
|
||||
return 0;
|
||||
}
|
||||
// 过滤空设备ID,避免无效操作
|
||||
List<String> validDeviceIds = deviceIds.stream()
|
||||
.filter(StringUtils::hasText)
|
||||
.distinct()
|
||||
.collect(Collectors.toList());
|
||||
if (validDeviceIds.isEmpty()) {
|
||||
log.warn("【全量订阅】用户{}名下无有效设备ID", userId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 4. 批量写入Redis订阅关系(兼容JDK 8的RedisCallback写法)
|
||||
try {
|
||||
stringRedisTemplate.execute(new RedisCallback<Void>() {
|
||||
@Override
|
||||
public Void doInRedis(RedisConnection connection) throws DataAccessException {
|
||||
// 获取String序列化器(和stringRedisTemplate保持一致)
|
||||
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
|
||||
|
||||
// 开启Redis事务
|
||||
connection.multi();
|
||||
byte[] clientIdBytes = serializer.serialize(clientId);
|
||||
// 4.1 设备→前端:给每个设备的订阅集合添加clientId
|
||||
for (String deviceId : validDeviceIds) {
|
||||
byte[] subKey = serializer.serialize("sub:" + deviceId);
|
||||
connection.sAdd(subKey, clientIdBytes);
|
||||
}
|
||||
|
||||
// 4.2 前端→设备:给前端的订阅集合批量添加所有设备ID
|
||||
byte[] subcKey = serializer.serialize("subc:" + clientId);
|
||||
byte[][] deviceIdBytesArray = new byte[validDeviceIds.size()][];
|
||||
for (int i = 0; i < validDeviceIds.size(); i++) {
|
||||
deviceIdBytesArray[i] = serializer.serialize(validDeviceIds.get(i));
|
||||
}
|
||||
connection.sAdd(subcKey, deviceIdBytesArray);
|
||||
|
||||
// 新增:给subc设置TTL(兜底“取消订阅失败/异常退出”)
|
||||
connection.expire(subcKey, subcTtlSeconds);
|
||||
|
||||
// 执行事务
|
||||
connection.exec();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
log.info("【全量订阅】前端{}成功订阅用户{}名下的{}个设备,设备列表:{}",
|
||||
clientId, userId, validDeviceIds.size(), validDeviceIds);
|
||||
return validDeviceIds.size();
|
||||
} catch (Exception e) {
|
||||
log.error("【全量------订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e);
|
||||
throw new RuntimeException("全量订阅失败:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 全量取消:前端取消订阅的所有设备(即用户名下所有设备)
|
||||
* @param clientId 前端唯一标识
|
||||
* @return 需要前端取消监听的MQTT主题列表
|
||||
*/
|
||||
public List<String> unsubscribeAllDevice(String clientId) {
|
||||
// 1. 入参校验
|
||||
if (!StringUtils.hasText(clientId)) {
|
||||
log.error("【全量取消】clientId不能为空");
|
||||
throw new IllegalArgumentException("clientId不能为空");
|
||||
}
|
||||
|
||||
// 2. Redis连接可用性校验
|
||||
try {
|
||||
stringRedisTemplate.hasKey("test:connection");
|
||||
} catch (Exception e) {
|
||||
log.warn("【全量取消】Redis连接不可用,取消操作跳过:{}", e.getMessage());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// 3. 查询该前端订阅的所有设备ID(即用户名下所有设备)
|
||||
Set<String> deviceSet = stringRedisTemplate.opsForSet().members("subc:" + clientId);
|
||||
if (deviceSet == null || deviceSet.isEmpty()) {
|
||||
log.warn("【全量取消】前端{}无订阅的设备", clientId);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// 4. 构建需要取消的MQTT主题列表
|
||||
List<String> frontendTopics = new ArrayList<>();
|
||||
for (String deviceId : deviceSet) {
|
||||
frontendTopics.add("frontend/" + clientId + "/dtu/" + deviceId + "/listener");
|
||||
}
|
||||
|
||||
// 5. 批量删除Redis订阅关系(兼容JDK 8的RedisCallback写法)
|
||||
try {
|
||||
stringRedisTemplate.execute(new RedisCallback<Void>() {
|
||||
@Override
|
||||
public Void doInRedis(RedisConnection connection) throws DataAccessException {
|
||||
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
|
||||
|
||||
// 开启事务
|
||||
connection.multi();
|
||||
byte[] clientIdBytes = serializer.serialize(clientId);
|
||||
// 5.1 批量删除设备→前端的订阅关系
|
||||
for (String deviceId : deviceSet) {
|
||||
byte[] subKey = serializer.serialize("sub:" + deviceId);
|
||||
connection.sRem(subKey, clientIdBytes);
|
||||
}
|
||||
|
||||
// 5.2 删除前端→设备的反向索引(核心:清空该前端的所有订阅设备)
|
||||
byte[] subcKey = serializer.serialize("subc:" + clientId);
|
||||
connection.del(subcKey);
|
||||
|
||||
// 执行事务
|
||||
connection.exec();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("【全量取消】Redis批量删除失败", e);
|
||||
throw new RuntimeException("全量取消订阅失败:" + e.getMessage());
|
||||
}
|
||||
log.info("【全量取消】前端{}成功取消{}个设备的订阅", clientId, deviceSet.size());
|
||||
return frontendTopics;
|
||||
}
|
||||
|
||||
/**
|
||||
* 实际业务中:查询指定用户名下的所有设备ID(需替换为你的DAO/Service逻辑)
|
||||
* @return 设备ID列表
|
||||
*/
|
||||
private List<String> queryImeiByUserId(Long userId) {
|
||||
// 示例:替换为你项目中查询用户设备的实际代码
|
||||
// 比如:return deviceService.listDeviceIdsByUserId(userId);
|
||||
List<SysAgriInfo> agriInfos = agriInfoService.lambdaQuery()
|
||||
.eq(SysAgriInfo::getUserId, userId)
|
||||
.list();
|
||||
if (CollectionUtils.isEmpty(agriInfos)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
package com.agri.framework.web.dispatcher;
|
||||
|
||||
import com.agri.framework.interceptor.DeviceStatusHandler;
|
||||
import com.agri.framework.interceptor.FrontendControlHandler;
|
||||
import com.agri.framework.interceptor.FrontendOnlineHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* MQTT消息分发器
|
||||
* 核心功能:根据Topic类型路由消息到对应处理器
|
||||
* 适配JDK 8,无心跳包相关逻辑
|
||||
*/
|
||||
@Component
|
||||
public class MqttMessageDispatcher {
|
||||
/**
|
||||
* 优化:统一使用SLF4J日志(JDK 8兼容)
|
||||
*/
|
||||
private static final Logger log = LoggerFactory.getLogger(MqttMessageDispatcher.class);
|
||||
|
||||
/**
|
||||
* 设备状态处理器
|
||||
*/
|
||||
@Resource
|
||||
private DeviceStatusHandler deviceStatusHandler;
|
||||
|
||||
/**
|
||||
* 前端控制指令处理器
|
||||
*/
|
||||
@Resource
|
||||
private FrontendControlHandler frontendControlHandler;
|
||||
|
||||
/**
|
||||
* 前端在线心跳处理器
|
||||
*/
|
||||
@Resource
|
||||
private FrontendOnlineHandler frontendOnlineHandler;
|
||||
|
||||
/**
|
||||
* 消息分发处理:根据主题类型路由到不同处理方法
|
||||
* 可以监听到设备传过来的业务数据 以及前端传过来的控制设备指令
|
||||
*
|
||||
* @param topic 消息主题
|
||||
* @param payload 消息内容(JSON字符串)
|
||||
*/
|
||||
public void handleMessage(String topic, String payload) {
|
||||
try {
|
||||
// log.info("【MQTT接收】topic={}, payload={}", topic, payload);
|
||||
|
||||
// 设备状态主题:dtu/{deviceId}/up
|
||||
if (topic.matches("dtu/\\w+/up")) {
|
||||
deviceStatusHandler.handle(topic, payload);
|
||||
}
|
||||
// 处理前端控制指令主题:frontend/{clientId}/control/{deviceId}
|
||||
else if (topic.matches("frontend/\\w+/control/\\w+")) {
|
||||
frontendControlHandler.handle(topic, payload);
|
||||
}
|
||||
// 新增:前端在线心跳主题:frontend/{clientId}/online
|
||||
else if (topic.matches("frontend/\\w+/online")) {
|
||||
frontendOnlineHandler.handle(topic, payload);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("【MQTT消息处理异常】topic={}", topic, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue