mqtt延时任务

feasure
xce 2026-01-17 05:58:44 +08:00
parent 45961455eb
commit ec17992c55
4 changed files with 230 additions and 56 deletions

View File

@ -1,13 +1,9 @@
package com.agri.web.controller.monitor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import com.agri.common.constant.CacheConstants;
import com.agri.common.core.domain.AjaxResult;
import com.agri.common.utils.StringUtils;
import com.agri.system.domain.SysCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
@ -17,10 +13,15 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.agri.common.constant.CacheConstants;
import com.agri.common.core.domain.AjaxResult;
import com.agri.common.utils.StringUtils;
import com.agri.system.domain.SysCache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
/**
*
@ -43,6 +44,8 @@ public class CacheController
caches.add(new SysCache(CacheConstants.REPEAT_SUBMIT_KEY, "防重提交"));
caches.add(new SysCache(CacheConstants.RATE_LIMIT_KEY, "限流处理"));
caches.add(new SysCache(CacheConstants.PWD_ERR_CNT_KEY, "密码错误次数"));
caches.add(new SysCache(CacheConstants.SUB_CLIENT_ID, "Redis订阅关系"));
caches.add(new SysCache(CacheConstants.SUB_IMEI, "Redis订阅关系【反向】"));
}
@PreAuthorize("@ss.hasPermi('monitor:cache:list')")

View File

@ -1,5 +1,8 @@
package com.agri.web.controller.mqtt;
import com.agri.common.annotation.Log;
import com.agri.common.core.domain.AjaxResult;
import com.agri.common.enums.BusinessType;
import com.agri.framework.interceptor.MqttMessageHandler;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
@ -30,6 +33,7 @@ public class MqttController {
*
*/
@PostMapping("/single")
@Log(title = "订阅主题", businessType = BusinessType.INSERT)
public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) {
try {
mqttMessageHandler.subscribeDevice(clientId, deviceId);
@ -63,7 +67,27 @@ public class MqttController {
/**
*
*/
@DeleteMapping("/batch")
@PostMapping("/batchSubscribe")
@Log(title = "批量订阅主题", businessType = BusinessType.INSERT)
public AjaxResult subscribeAll(@RequestParam String clientId) {
try {
// 返回前端需要取消的MQTT主题列表
return AjaxResult.success(mqttMessageHandler.subscribeAllDeviceByUserId(clientId));
} catch (IllegalArgumentException e) {
log.error("MQTT批量订阅失败{}", e.getMessage());
// 异常时返回空列表,避免前端解析失败
return AjaxResult.error("MQTT批量订阅失败",0);
} catch (Exception e) {
log.error("MQTT批量订阅异常", e);
return AjaxResult.error("MQTT批量订阅异常",0);
}
}
/**
*
*/
@DeleteMapping("/batchUnsubscribe")
@Log(title = "批量取消订阅主题", businessType = BusinessType.DELETE)
public List<String> unsubscribeAll(@RequestParam String clientId) {
try {
// 返回前端需要取消的MQTT主题列表
@ -77,12 +101,12 @@ public class MqttController {
return Lists.newArrayList();
}
}
/**
* MQTT
*
*/
@GetMapping("/reconnect")
@Log(title = "手动触发MQTT重连", businessType = BusinessType.OTHER)
public String manualReconnect() {
try {
return mqttMessageHandler.manualReconnect();
@ -97,6 +121,7 @@ public class MqttController {
* 便
*/
@GetMapping("/status")
@Log(title = "手动触发MQTT重连", businessType = BusinessType.SELECT)
public String getMqttStatus() {
try {
return mqttMessageHandler.getMqttStatus();

View File

@ -41,4 +41,6 @@ public class CacheConstants
* redis key
*/
public static final String PWD_ERR_CNT_KEY = "pwd_err_cnt:";
public static final String SUB_CLIENT_ID = "subc:";
public static final String SUB_IMEI = "sub:";
}

View File

@ -1,11 +1,15 @@
package com.agri.framework.interceptor;
import com.agri.common.utils.SecurityUtils;
import com.agri.framework.config.MqttConfig;
import com.agri.system.domain.SysAgriInfo;
import com.agri.system.domain.SysAgriLimit;
import com.agri.system.service.ISysAgriInfoService;
import com.agri.system.service.ISysAgriLimitService;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import org.apache.commons.collections4.CollectionUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
@ -17,10 +21,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@ -41,8 +48,12 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
*
* * "frontend/" + clientId + "/dtu/" + deviceId + "/listener"
* * frontend/+/control/+
* MQTT
*
* 1.
@ -112,8 +123,11 @@ public class MqttMessageHandler implements SmartLifecycle {
@Autowired
private ISysAgriLimitService agriLimitService;
@Autowired
private ISysAgriInfoService agriInfoService;
// 初始化映射(建议放在类初始化块/构造方法中,只初始化一次)
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
private static final Set<String> VALID_FUNC_CODES = new HashSet<>();
static {
LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
@ -123,6 +137,15 @@ public class MqttMessageHandler implements SmartLifecycle {
LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
VALID_FUNC_CODES.add("jm1g");
VALID_FUNC_CODES.add("jm2g");
VALID_FUNC_CODES.add("jbg");
VALID_FUNC_CODES.add("jm3g");
VALID_FUNC_CODES.add("jm2k");
VALID_FUNC_CODES.add("jm3k");
VALID_FUNC_CODES.add("jbk");
VALID_FUNC_CODES.add("jm1k");
}
/**
@ -431,6 +454,7 @@ public class MqttMessageHandler implements SmartLifecycle {
down.put(funcType, 0);
String deviceTopic = "dtu/" + deviceId + "/down";
//todo
mqttMessageSender.publish(deviceTopic, down.toJSONString());
log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
} else {
@ -500,7 +524,7 @@ public class MqttMessageHandler implements SmartLifecycle {
// 4. 转发指令到设备
String deviceTopic = "dtu/" + deviceId + "/down";
//todo
// mqttMessageSender.publish(deviceTopic, payload);
mqttMessageSender.publish(deviceTopic, payload);
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
}
@ -555,79 +579,184 @@ public class MqttMessageHandler implements SmartLifecycle {
}
/**
* clientId
*
* @param clientId wx_123
* @return MQTT
* Controller
* @param clientId web_001app_002
* @return
*/
public List<String> unsubscribeAllDevice(String clientId) {
public int subscribeAllDeviceByUserId(String clientId) {
// 1. 入参校验
if (!StringUtils.hasText(clientId)) {
log.error("【批量取消】clientId不能为空");
log.error("【全量订阅】clientId不能为空");
throw new IllegalArgumentException("clientId不能为空");
}
// 适配低版本的Redis连接可用性校验替换掉isRunning()
// 2. Redis连接可用性校验
try {
stringRedisTemplate.hasKey("test:connection");
} catch (Exception e) {
log.warn("Redis连接不可用取消订阅操作跳过{}", e.getMessage());
log.warn("【全量订阅】Redis连接不可用订阅操作跳过{}", e.getMessage());
return 0;
}
Long userId = SecurityUtils.getLoginUser().getUserId();
// 3. 查询该用户名下的所有设备ID替换为你的实际设备查询逻辑
List<String> deviceIds = queryImeiByUserId(userId);
if (userId == 1) {
deviceIds.add("862538065276061");
}
if (deviceIds == null || deviceIds.isEmpty()) {
log.warn("【全量订阅】用户{}名下无可用设备", userId);
return 0;
}
// 过滤空设备ID避免无效操作
List<String> validDeviceIds = deviceIds.stream()
.filter(StringUtils::hasText)
.distinct()
.collect(Collectors.toList());
if (validDeviceIds.isEmpty()) {
log.warn("【全量订阅】用户{}名下无有效设备ID", userId);
return 0;
}
// 4. 批量写入Redis订阅关系兼容JDK 8的RedisCallback写法
try {
stringRedisTemplate.execute(new RedisCallback<Void>() {
@Override
public Void doInRedis(RedisConnection connection) throws DataAccessException {
// 获取String序列化器和stringRedisTemplate保持一致
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
// 开启Redis事务
connection.multi();
// 4.1 设备→前端给每个设备的订阅集合添加clientId
for (String deviceId : validDeviceIds) {
byte[] subKey = serializer.serialize("sub:" + deviceId);
byte[] clientIdBytes = serializer.serialize(clientId);
connection.sAdd(subKey, clientIdBytes);
}
// 4.2 前端→设备给前端的订阅集合批量添加所有设备ID
byte[] subcKey = serializer.serialize("subc:" + clientId);
byte[][] deviceIdBytesArray = new byte[validDeviceIds.size()][];
for (int i = 0; i < validDeviceIds.size(); i++) {
deviceIdBytesArray[i] = serializer.serialize(validDeviceIds.get(i));
}
connection.sAdd(subcKey, deviceIdBytesArray);
// 执行事务
connection.exec();
return null;
}
});
log.info("【全量订阅】前端{}成功订阅用户{}名下的{}个设备,设备列表:{}",
clientId, userId, validDeviceIds.size(), validDeviceIds);
return validDeviceIds.size();
} catch (Exception e) {
log.error("【全量订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e);
throw new RuntimeException("全量订阅失败:" + e.getMessage());
}
}
/**
*
* @param clientId
* @return MQTT
*/
public List<String> unsubscribeAllDevice(String clientId) {
// 1. 入参校验
if (!StringUtils.hasText(clientId)) {
log.error("【全量取消】clientId不能为空");
throw new IllegalArgumentException("clientId不能为空");
}
// 2. Redis连接可用性校验
try {
stringRedisTemplate.hasKey("test:connection");
} catch (Exception e) {
log.warn("【全量取消】Redis连接不可用取消操作跳过{}", e.getMessage());
return Collections.emptyList();
}
// 步骤1查询该前端订阅的所有设备ID生产环境用Scan替代Keys避免阻塞Redis
// 3. 查询该前端订阅的所有设备ID即用户名下所有设备
Set<String> deviceSet = stringRedisTemplate.opsForSet().members("subc:" + clientId);
if (deviceSet == null || deviceSet.isEmpty()) {
log.warn("【全量取消】前端{}无订阅的设备", clientId);
return Collections.emptyList();
}
// 4. 构建需要取消的MQTT主题列表
List<String> frontendTopics = new ArrayList<>();
for (String deviceId : deviceSet) {
String subKey = "sub:" + deviceId;
stringRedisTemplate.opsForSet().remove(subKey, clientId);
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
frontendTopics.add(frontendTopic);
log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId);
frontendTopics.add("frontend/" + clientId + "/dtu/" + deviceId + "/listener");
}
// 删除反向索引
stringRedisTemplate.delete("subc:" + clientId);
// 5. 批量删除Redis订阅关系兼容JDK 8的RedisCallback写法
try {
stringRedisTemplate.execute(new RedisCallback<Void>() {
@Override
public Void doInRedis(RedisConnection connection) throws DataAccessException {
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
// 步骤2清理该前端的分布式锁可选防止死锁
Set<String> lockKeys = scanRedisKeys("lock:*");
if (lockKeys != null && !lockKeys.isEmpty()) {
for (String lockKey : lockKeys) {
String lockValue = stringRedisTemplate.opsForValue().get(lockKey);
if (clientId.equals(lockValue)) {
stringRedisTemplate.delete(lockKey);
log.info("【批量取消】清理前端{}持有的锁:{}", clientId, lockKey);
// 开启事务
connection.multi();
// 5.1 批量删除设备→前端的订阅关系
for (String deviceId : deviceSet) {
byte[] subKey = serializer.serialize("sub:" + deviceId);
byte[] clientIdBytes = serializer.serialize(clientId);
connection.sRem(subKey, clientIdBytes);
}
// 5.2 删除前端→设备的反向索引(核心:清空该前端的所有订阅设备)
byte[] subcKey = serializer.serialize("subc:" + clientId);
connection.del(subcKey);
// 执行事务
connection.exec();
return null;
}
}
});
} catch (Exception e) {
log.error("【全量取消】Redis批量删除失败", e);
throw new RuntimeException("全量取消订阅失败:" + e.getMessage());
}
log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceSet.size());
// 6. 清理该前端持有的分布式锁(精准扫描,提升效率)
Set<String> lockKeys = scanRedisKeys("lock:*:" + clientId);
if (lockKeys != null && !lockKeys.isEmpty()) {
stringRedisTemplate.delete(lockKeys); // 批量删除锁,无需遍历
log.info("【全量取消】清理前端{}持有的{}个分布式锁", clientId, lockKeys.size());
}
log.info("【全量取消】前端{}成功取消{}个设备的订阅", clientId, deviceSet.size());
return frontendTopics;
}
// 生产环境用Scan替代Keys避免Redis阻塞
// 保留原有scanRedisKeys方法兼容JDK 8
private Set<String> scanRedisKeys(String pattern) {
Set<String> keys = new HashSet<>();
try {
stringRedisTemplate.executeWithStickyConnection((RedisConnection connection) -> {
ScanOptions scanOptions = ScanOptions.scanOptions()
.match(pattern)
.count(100)
.build();
Cursor<byte[]> cursor = connection.scan(scanOptions);
while (cursor.hasNext()) {
byte[] keyBytes = cursor.next();
String key = stringRedisTemplate.getStringSerializer().deserialize(keyBytes);
if (key != null) {
keys.add(key);
stringRedisTemplate.execute(new RedisCallback<Void>() {
@Override
public Void doInRedis(RedisConnection connection) throws DataAccessException {
RedisSerializer<String> serializer = stringRedisTemplate.getStringSerializer();
ScanOptions scanOptions = ScanOptions.scanOptions()
.match(pattern)
.count(100)
.build();
Cursor<byte[]> cursor = connection.scan(scanOptions);
while (cursor.hasNext()) {
byte[] keyBytes = cursor.next();
String key = serializer.deserialize(keyBytes);
if (key != null) {
keys.add(key);
}
}
cursor.close();
return null;
}
cursor.close();
return null;
});
} catch (Exception e) {
log.error("Redis Scan查询失败pattern={}", pattern, e);
@ -635,6 +764,21 @@ public class MqttMessageHandler implements SmartLifecycle {
return keys;
}
/**
* IDDAO/Service
* @return ID
*/
private List<String> queryImeiByUserId(Long userId) {
// 示例:替换为你项目中查询用户设备的实际代码
// 比如return deviceService.listDeviceIdsByUserId(userId);
List<SysAgriInfo> agriInfos = agriInfoService.lambdaQuery()
.eq(SysAgriInfo::getUserId, userId)
.list();
if (CollectionUtils.isEmpty(agriInfos)) {
return Collections.emptyList();
}
return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList());
}
// ========== 手动重连接口供Controller调用 ==========
/**