diff --git a/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java b/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java index 834ba09..4c6fef0 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/monitor/CacheController.java @@ -1,13 +1,9 @@ package com.agri.web.controller.monitor; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeSet; +import com.agri.common.constant.CacheConstants; +import com.agri.common.core.domain.AjaxResult; +import com.agri.common.utils.StringUtils; +import com.agri.system.domain.SysCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; @@ -17,10 +13,15 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import com.agri.common.constant.CacheConstants; -import com.agri.common.core.domain.AjaxResult; -import com.agri.common.utils.StringUtils; -import com.agri.system.domain.SysCache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; /** * 缓存监控 @@ -43,6 +44,8 @@ public class CacheController caches.add(new SysCache(CacheConstants.REPEAT_SUBMIT_KEY, "防重提交")); caches.add(new SysCache(CacheConstants.RATE_LIMIT_KEY, "限流处理")); caches.add(new SysCache(CacheConstants.PWD_ERR_CNT_KEY, "密码错误次数")); + caches.add(new SysCache(CacheConstants.SUB_CLIENT_ID, "Redis订阅关系")); + caches.add(new SysCache(CacheConstants.SUB_IMEI, "Redis订阅关系【反向】")); } @PreAuthorize("@ss.hasPermi('monitor:cache:list')") diff --git a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java index 0a85317..d6fe9cb 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java @@ -1,5 +1,8 @@ package com.agri.web.controller.mqtt; +import com.agri.common.annotation.Log; +import com.agri.common.core.domain.AjaxResult; +import com.agri.common.enums.BusinessType; import com.agri.framework.interceptor.MqttMessageHandler; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -30,6 +33,7 @@ public class MqttController { * 单个订阅 */ @PostMapping("/single") + @Log(title = "订阅主题", businessType = BusinessType.INSERT) public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) { try { mqttMessageHandler.subscribeDevice(clientId, deviceId); @@ -63,7 +67,27 @@ public class MqttController { /** * 批量取消当前用户的所有设备订阅 */ - @DeleteMapping("/batch") + @PostMapping("/batchSubscribe") + @Log(title = "批量订阅主题", businessType = BusinessType.INSERT) + public AjaxResult subscribeAll(@RequestParam String clientId) { + try { + // 返回前端需要取消的MQTT主题列表 + return AjaxResult.success(mqttMessageHandler.subscribeAllDeviceByUserId(clientId)); + } catch (IllegalArgumentException e) { + log.error("MQTT批量订阅失败:{}", e.getMessage()); + // 异常时返回空列表,避免前端解析失败 + return AjaxResult.error("MQTT批量订阅失败",0); + } catch (Exception e) { + log.error("MQTT批量订阅异常", e); + return AjaxResult.error("MQTT批量订阅异常",0); + } + } + + /** + * 批量取消当前用户的所有设备订阅 + */ + @DeleteMapping("/batchUnsubscribe") + @Log(title = "批量取消订阅主题", businessType = BusinessType.DELETE) public List unsubscribeAll(@RequestParam String clientId) { try { // 返回前端需要取消的MQTT主题列表 @@ -77,12 +101,12 @@ public class MqttController { return Lists.newArrayList(); } } - /** * 手动触发MQTT重连 * 场景:配置修正后,手动恢复连接(无需重启服务) */ @GetMapping("/reconnect") + @Log(title = "手动触发MQTT重连", businessType = BusinessType.OTHER) public String manualReconnect() { try { return mqttMessageHandler.manualReconnect(); @@ -97,6 +121,7 @@ public class MqttController { * 便于排查连接问题 */ @GetMapping("/status") + @Log(title = "手动触发MQTT重连", businessType = BusinessType.SELECT) public String getMqttStatus() { try { return mqttMessageHandler.getMqttStatus(); diff --git a/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java b/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java index 8f761f8..5013b6e 100644 --- a/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java +++ b/agri-common/src/main/java/com/agri/common/constant/CacheConstants.java @@ -41,4 +41,6 @@ public class CacheConstants * 登录账户密码错误次数 redis key */ public static final String PWD_ERR_CNT_KEY = "pwd_err_cnt:"; + public static final String SUB_CLIENT_ID = "subc:"; + public static final String SUB_IMEI = "sub:"; } diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java index e07cf2a..d54ffc0 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -1,11 +1,15 @@ package com.agri.framework.interceptor; +import com.agri.common.utils.SecurityUtils; import com.agri.framework.config.MqttConfig; +import com.agri.system.domain.SysAgriInfo; import com.agri.system.domain.SysAgriLimit; +import com.agri.system.service.ISysAgriInfoService; import com.agri.system.service.ISysAgriLimitService; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; +import org.apache.commons.collections4.CollectionUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -17,10 +21,13 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.SmartLifecycle; +import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -41,8 +48,12 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Collectors; /** + * + * * 前端监听: "frontend/" + clientId + "/dtu/" + deviceId + "/listener" + * * 前端发布主题:frontend/+/control/+ * MQTT消息处理器(无心跳包版本) * 核心功能: * 1. 订阅设备状态、前端控制指令主题 @@ -112,8 +123,11 @@ public class MqttMessageHandler implements SmartLifecycle { @Autowired private ISysAgriLimitService agriLimitService; + @Autowired + private ISysAgriInfoService agriInfoService; // 初始化映射(建议放在类初始化块/构造方法中,只初始化一次) private static final Map> LIMIT_MAP = new HashMap<>(); + private static final Set VALID_FUNC_CODES = new HashSet<>(); static { LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); @@ -123,6 +137,15 @@ public class MqttMessageHandler implements SmartLifecycle { LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); + + VALID_FUNC_CODES.add("jm1g"); + VALID_FUNC_CODES.add("jm2g"); + VALID_FUNC_CODES.add("jbg"); + VALID_FUNC_CODES.add("jm3g"); + VALID_FUNC_CODES.add("jm2k"); + VALID_FUNC_CODES.add("jm3k"); + VALID_FUNC_CODES.add("jbk"); + VALID_FUNC_CODES.add("jm1k"); } /** @@ -431,6 +454,7 @@ public class MqttMessageHandler implements SmartLifecycle { down.put(funcType, 0); String deviceTopic = "dtu/" + deviceId + "/down"; + //todo mqttMessageSender.publish(deviceTopic, down.toJSONString()); log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); } else { @@ -500,7 +524,7 @@ public class MqttMessageHandler implements SmartLifecycle { // 4. 转发指令到设备 String deviceTopic = "dtu/" + deviceId + "/down"; //todo - // mqttMessageSender.publish(deviceTopic, payload); + mqttMessageSender.publish(deviceTopic, payload); log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); } @@ -555,79 +579,184 @@ public class MqttMessageHandler implements SmartLifecycle { } /** - * 批量取消前端的所有设备订阅(核心:根据clientId清理所有订阅关系) - * - * @param clientId 前端唯一标识(如wx_123) - * @return 前端需要取消的MQTT主题列表(供前端批量取消) + * 全量订阅:前端订阅指定用户名下的所有设备(Controller调用) + * @param clientId 前端唯一标识(如web_001、app_002) + * @return 订阅成功的设备数量 */ - public List unsubscribeAllDevice(String clientId) { + public int subscribeAllDeviceByUserId(String clientId) { + // 1. 入参校验 if (!StringUtils.hasText(clientId)) { - log.error("【批量取消】clientId不能为空"); + log.error("【全量订阅】clientId不能为空"); throw new IllegalArgumentException("clientId不能为空"); } - // 适配低版本的Redis连接可用性校验(替换掉isRunning()) + + // 2. Redis连接可用性校验 try { stringRedisTemplate.hasKey("test:connection"); } catch (Exception e) { - log.warn("Redis连接不可用,取消订阅操作跳过:{}", e.getMessage()); + log.warn("【全量订阅】Redis连接不可用,订阅操作跳过:{}", e.getMessage()); + return 0; + } + Long userId = SecurityUtils.getLoginUser().getUserId(); + // 3. 查询该用户名下的所有设备ID(替换为你的实际设备查询逻辑) + List deviceIds = queryImeiByUserId(userId); + if (userId == 1) { + deviceIds.add("862538065276061"); + } + if (deviceIds == null || deviceIds.isEmpty()) { + log.warn("【全量订阅】用户{}名下无可用设备", userId); + return 0; + } + // 过滤空设备ID,避免无效操作 + List validDeviceIds = deviceIds.stream() + .filter(StringUtils::hasText) + .distinct() + .collect(Collectors.toList()); + if (validDeviceIds.isEmpty()) { + log.warn("【全量订阅】用户{}名下无有效设备ID", userId); + return 0; + } + + // 4. 批量写入Redis订阅关系(兼容JDK 8的RedisCallback写法) + try { + stringRedisTemplate.execute(new RedisCallback() { + @Override + public Void doInRedis(RedisConnection connection) throws DataAccessException { + // 获取String序列化器(和stringRedisTemplate保持一致) + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + + // 开启Redis事务 + connection.multi(); + + // 4.1 设备→前端:给每个设备的订阅集合添加clientId + for (String deviceId : validDeviceIds) { + byte[] subKey = serializer.serialize("sub:" + deviceId); + byte[] clientIdBytes = serializer.serialize(clientId); + connection.sAdd(subKey, clientIdBytes); + } + + // 4.2 前端→设备:给前端的订阅集合批量添加所有设备ID + byte[] subcKey = serializer.serialize("subc:" + clientId); + byte[][] deviceIdBytesArray = new byte[validDeviceIds.size()][]; + for (int i = 0; i < validDeviceIds.size(); i++) { + deviceIdBytesArray[i] = serializer.serialize(validDeviceIds.get(i)); + } + connection.sAdd(subcKey, deviceIdBytesArray); + + // 执行事务 + connection.exec(); + return null; + } + }); + + log.info("【全量订阅】前端{}成功订阅用户{}名下的{}个设备,设备列表:{}", + clientId, userId, validDeviceIds.size(), validDeviceIds); + return validDeviceIds.size(); + } catch (Exception e) { + log.error("【全量订阅】前端{}订阅用户{}名下设备失败", clientId, userId, e); + throw new RuntimeException("全量订阅失败:" + e.getMessage()); + } + } + + /** + * 全量取消:前端取消订阅的所有设备(即用户名下所有设备) + * @param clientId 前端唯一标识 + * @return 需要前端取消监听的MQTT主题列表 + */ + public List unsubscribeAllDevice(String clientId) { + // 1. 入参校验 + if (!StringUtils.hasText(clientId)) { + log.error("【全量取消】clientId不能为空"); + throw new IllegalArgumentException("clientId不能为空"); + } + + // 2. Redis连接可用性校验 + try { + stringRedisTemplate.hasKey("test:connection"); + } catch (Exception e) { + log.warn("【全量取消】Redis连接不可用,取消操作跳过:{}", e.getMessage()); return Collections.emptyList(); } - // 步骤1:查询该前端订阅的所有设备ID(生产环境用Scan替代Keys,避免阻塞Redis) + // 3. 查询该前端订阅的所有设备ID(即用户名下所有设备) Set deviceSet = stringRedisTemplate.opsForSet().members("subc:" + clientId); if (deviceSet == null || deviceSet.isEmpty()) { + log.warn("【全量取消】前端{}无订阅的设备", clientId); return Collections.emptyList(); } + // 4. 构建需要取消的MQTT主题列表 List frontendTopics = new ArrayList<>(); - for (String deviceId : deviceSet) { - String subKey = "sub:" + deviceId; - stringRedisTemplate.opsForSet().remove(subKey, clientId); - String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - frontendTopics.add(frontendTopic); - log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId); + frontendTopics.add("frontend/" + clientId + "/dtu/" + deviceId + "/listener"); } - // 删除反向索引 - stringRedisTemplate.delete("subc:" + clientId); + // 5. 批量删除Redis订阅关系(兼容JDK 8的RedisCallback写法) + try { + stringRedisTemplate.execute(new RedisCallback() { + @Override + public Void doInRedis(RedisConnection connection) throws DataAccessException { + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); - // 步骤2:清理该前端的分布式锁(可选,防止死锁) - Set lockKeys = scanRedisKeys("lock:*"); - if (lockKeys != null && !lockKeys.isEmpty()) { - for (String lockKey : lockKeys) { - String lockValue = stringRedisTemplate.opsForValue().get(lockKey); - if (clientId.equals(lockValue)) { - stringRedisTemplate.delete(lockKey); - log.info("【批量取消】清理前端{}持有的锁:{}", clientId, lockKey); + // 开启事务 + connection.multi(); + + // 5.1 批量删除设备→前端的订阅关系 + for (String deviceId : deviceSet) { + byte[] subKey = serializer.serialize("sub:" + deviceId); + byte[] clientIdBytes = serializer.serialize(clientId); + connection.sRem(subKey, clientIdBytes); + } + + // 5.2 删除前端→设备的反向索引(核心:清空该前端的所有订阅设备) + byte[] subcKey = serializer.serialize("subc:" + clientId); + connection.del(subcKey); + + // 执行事务 + connection.exec(); + return null; } - } + }); + } catch (Exception e) { + log.error("【全量取消】Redis批量删除失败", e); + throw new RuntimeException("全量取消订阅失败:" + e.getMessage()); } - log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceSet.size()); + // 6. 清理该前端持有的分布式锁(精准扫描,提升效率) + Set lockKeys = scanRedisKeys("lock:*:" + clientId); + if (lockKeys != null && !lockKeys.isEmpty()) { + stringRedisTemplate.delete(lockKeys); // 批量删除锁,无需遍历 + log.info("【全量取消】清理前端{}持有的{}个分布式锁", clientId, lockKeys.size()); + } + + log.info("【全量取消】前端{}成功取消{}个设备的订阅", clientId, deviceSet.size()); return frontendTopics; } - // 生产环境用Scan替代Keys,避免Redis阻塞 + // 保留原有scanRedisKeys方法(兼容JDK 8) private Set scanRedisKeys(String pattern) { Set keys = new HashSet<>(); try { - stringRedisTemplate.executeWithStickyConnection((RedisConnection connection) -> { - ScanOptions scanOptions = ScanOptions.scanOptions() - .match(pattern) - .count(100) - .build(); - Cursor cursor = connection.scan(scanOptions); - while (cursor.hasNext()) { - byte[] keyBytes = cursor.next(); - String key = stringRedisTemplate.getStringSerializer().deserialize(keyBytes); - if (key != null) { - keys.add(key); + stringRedisTemplate.execute(new RedisCallback() { + @Override + public Void doInRedis(RedisConnection connection) throws DataAccessException { + RedisSerializer serializer = stringRedisTemplate.getStringSerializer(); + ScanOptions scanOptions = ScanOptions.scanOptions() + .match(pattern) + .count(100) + .build(); + Cursor cursor = connection.scan(scanOptions); + while (cursor.hasNext()) { + byte[] keyBytes = cursor.next(); + String key = serializer.deserialize(keyBytes); + if (key != null) { + keys.add(key); + } } + cursor.close(); + return null; } - cursor.close(); - return null; }); } catch (Exception e) { log.error("Redis Scan查询失败,pattern={}", pattern, e); @@ -635,6 +764,21 @@ public class MqttMessageHandler implements SmartLifecycle { return keys; } + /** + * 实际业务中:查询指定用户名下的所有设备ID(需替换为你的DAO/Service逻辑) + * @return 设备ID列表 + */ + private List queryImeiByUserId(Long userId) { + // 示例:替换为你项目中查询用户设备的实际代码 + // 比如:return deviceService.listDeviceIdsByUserId(userId); + List agriInfos = agriInfoService.lambdaQuery() + .eq(SysAgriInfo::getUserId, userId) + .list(); + if (CollectionUtils.isEmpty(agriInfos)) { + return Collections.emptyList(); + } + return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList()); + } // ========== 手动重连接口(供Controller调用) ========== /**