diff --git a/agri-common/src/main/java/com/agri/common/utils/SecurityUtils.java b/agri-common/src/main/java/com/agri/common/utils/SecurityUtils.java index d439ccb..65b1d44 100644 --- a/agri-common/src/main/java/com/agri/common/utils/SecurityUtils.java +++ b/agri-common/src/main/java/com/agri/common/utils/SecurityUtils.java @@ -33,7 +33,11 @@ public class SecurityUtils { try { - return getLoginUser().getUserId(); + LoginUser loginUser = getLoginUser(); + if (loginUser!= null) { + return loginUser.getUserId(); + } + return null; } catch (Exception e) { @@ -63,7 +67,11 @@ public class SecurityUtils { try { - return getLoginUser().getUsername(); + LoginUser loginUser = getLoginUser(); + if (loginUser !=null ) { + return loginUser.getUsername(); + } + return null; } catch (Exception e) { @@ -76,16 +84,18 @@ public class SecurityUtils **/ public static LoginUser getLoginUser() { - LoginUser loginUser = null; - try - { - loginUser = (LoginUser) getAuthentication().getPrincipal(); - } - catch (Exception e) - { + try { + Authentication authentication = getAuthentication(); + if (authentication == null || !(authentication.getPrincipal() instanceof LoginUser)) { + return null; // 无用户上下文时返回 null + } + return (LoginUser) authentication.getPrincipal(); + + } catch (Exception e) { + log.error("获取用户信息异常: {}", HttpStatus.UNAUTHORIZED); } - return loginUser; + return null; } /** diff --git a/agri-framework/src/main/java/com/agri/framework/config/ThreadPoolConfig.java b/agri-framework/src/main/java/com/agri/framework/config/ThreadPoolConfig.java index 027fc68..bded484 100644 --- a/agri-framework/src/main/java/com/agri/framework/config/ThreadPoolConfig.java +++ b/agri-framework/src/main/java/com/agri/framework/config/ThreadPoolConfig.java @@ -5,6 +5,8 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; @@ -60,4 +62,17 @@ public class ThreadPoolConfig } }; } + + @Bean("mqttPushExecutor") + public Executor mqttPushExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(50); + executor.setQueueCapacity(2000); + executor.setThreadNamePrefix("mqtt-push-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + return executor; + } } diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java index 977d489..80a7644 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttSubscriptionManager.java @@ -186,7 +186,7 @@ public class MqttSubscriptionManager { } // 3. 查询该用户名下的所有设备ID(替换为你的实际设备查询逻辑) - List deviceIds = new ArrayList<>(queryImeiByUserId(userId)); + List deviceIds = new ArrayList<>(agriInfoService.queryImeiByUserId(userId)); if (userId == 1) { deviceIds.add("862538065276061"); } @@ -313,21 +313,5 @@ public class MqttSubscriptionManager { return frontendTopics; } - /** - * 实际业务中:查询指定用户名下的所有设备ID(需替换为你的DAO/Service逻辑) - * @return 设备ID列表 - */ - private List queryImeiByUserId(Long userId) { - // 示例:替换为你项目中查询用户设备的实际代码 - // 比如:return deviceService.listDeviceIdsByUserId(userId); - SysAgriInfo sysAgriInfo = new SysAgriInfo(); - if (!SecurityUtils.isAdmin()) { - sysAgriInfo.setUserId(userId); - } - List agriInfos = agriInfoService.findAgriByUser(sysAgriInfo); - if (CollectionUtils.isEmpty(agriInfos)) { - return Collections.emptyList(); - } - return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList()); - } + } \ No newline at end of file diff --git a/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java b/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java index 2766eb2..c792ee3 100644 --- a/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java +++ b/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java @@ -1,14 +1,27 @@ package com.agri.quartz.task; -import com.agri.system.domain.SysDevOperLog; -import org.springframework.data.redis.core.RedisTemplate; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.agri.framework.config.MqttConfig; +import com.agri.system.service.ISysAgriInfoService; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.*; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.List; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; /** * 在线状态监测 告警和离线 @@ -16,46 +29,265 @@ import java.util.List; @Component public class AgriStatusTask { + private static final Logger log = LoggerFactory.getLogger(AgriStatusTask.class); + + // Redis 前缀常量 + private static final String SUB_KEY_PREFIX = "sub:"; + private static final String LOCK_KEY = "lock:device:online:push"; - // 注入RedisTemplate(Spring Boot自动配置) @Resource - private RedisTemplate redisTemplate; + private StringRedisTemplate stringRedisTemplate; - // 模拟从数据库/配置获取所有设备IMEI列表(实际可替换为DB查询) - private List getAllDeviceImeiList() { - // 示例:从数据库查询所有设备IMEI - // return deviceMapper.listAllImei(); - return Arrays.asList("861234567890123", "869876543210987", "860000000000000"); - } - public void checkDeviceOnlineStatus() { - // 步骤1:获取所有需要检查的设备IMEI列表(一次IO,从DB/配置读取) - List allImeiList = getAllDeviceImeiList(); - if (allImeiList.isEmpty()) { - System.out.println("无设备需要检查"); + @Resource + private MqttConfig.MqttMessageSender mqttMessageSender; + + @Value("${spring.mqtt.dtu-ctl-lock-ttl:15}") + private int lockTtl; + + @Autowired + private ISysAgriInfoService agriInfoService; + + // JSON序列化工具(单例) + private final ObjectMapper objectMapper = new ObjectMapper(); + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** + * 定时任务:每10秒执行一次 + * 1. 扫描所有 sub: 开头的 key,提取 IMEI + * 2. 批量查询设备在线状态 + * 3. 异步批量推送在线状态到 MQTT 主题 frontend/{imei}/online + */ + public void pushOnlineStatus() { + // 1. 加分布式锁,避免集群重复执行 + Boolean lockSuccess = stringRedisTemplate.opsForValue() + .setIfAbsent(LOCK_KEY, "running", lockTtl, TimeUnit.SECONDS); + + // 补充:处理Redis连接异常的情况 + if (lockSuccess == null) { + log.error("获取分布式锁失败:Redis连接异常"); return; } - - // 步骤2:批量查询Redis(核心!仅一次IO操作) - // MGET命令:批量获取多个Key的值,返回值列表与入参Key列表一一对应 - List redisValues = redisTemplate.opsForValue().multiGet(allImeiList); - - // 步骤3:解析结果,区分在线/离线设备 - List onlineImeiList = new ArrayList<>(); - List offlineImeiList = new ArrayList<>(); - - for (int i = 0; i < allImeiList.size(); i++) { - String imei = allImeiList.get(i); - // Redis中Key存在则值不为null(我们存的是"online"),不存在则为null - if (redisValues.get(i) != null) { - onlineImeiList.add(imei); - } else { - offlineImeiList.add(imei); + if (!lockSuccess) { + log.debug("其他节点正在执行,跳过本次推送"); + return; + } + long startTime = System.currentTimeMillis(); + try { + // 2. 安全遍历所有 sub: 开头的 key,提取设备 IMEI +// List allDeviceImeiList = scanAllSubDeviceImei(); +// if (allDeviceImeiList.isEmpty()) { +// log.info("没有找到任何设备订阅记录,结束任务"); +// return; +// } + // 查询大棚列表所有在线设备 + List imeiList = agriInfoService.queryImeiByUserId(null); + if (imeiList.isEmpty()) { + log.info("大棚表无数据,结束推送"); + return; } + log.info("从大棚表获取到合法IMEI总数:{}", imeiList.size()); + + // 3. 批量查询设备在线状态(Redis Pipeline,一次网络往返) + asyncBatchPushMqtt(batchCheckDeviceOnline(imeiList)); + } catch (Exception e) { + log.error("设备在线状态推送任务异常", e); + // 可选:异常告警(如企业微信/钉钉) + // WxUtil.pushText("【设备在线状态推送异常】\n" + e.getMessage()); + } finally { + // 释放锁(可选,也可依赖TTL自动过期) + stringRedisTemplate.delete(LOCK_KEY); + log.info("设备在线状态推送任务完成,耗时:{}ms", System.currentTimeMillis() - startTime); + } + } + + // ========== 批量查在线状态(Pipeline 优化版,JDK 8 适配) ========== + // 在线离线的都得推 + private Map> batchCheckDeviceOnline(List imeiList) { + Map> result = new HashMap<>(); + if (imeiList.isEmpty()) { + return result; } + // JDK 8 显式声明 RedisCallback,避免 Lambda 泛型问题 + List results = stringRedisTemplate.executePipelined( + new RedisCallback() { + @Override + public Object doInRedis(RedisConnection connection) { + StringRedisSerializer serializer = new StringRedisSerializer(); + for (String imei : imeiList) { + byte[] onlineKeyBytes = serializer.serialize(SUB_KEY_PREFIX + imei); + connection.exists(onlineKeyBytes); // 批量执行 exists + connection.exists(serializer.serialize(imei)); + } + return null; + } + }, + new StringRedisSerializer() + ); - // 步骤4:处理结果(示例:打印日志,实际可写入DB/推送告警等) - System.out.println("[" + new Date() + "] 在线设备:" + onlineImeiList); - System.out.println("[" + new Date() + "] 离线设备:" + offlineImeiList); + // 解析结果:每两个结果对应一个IMEI(subExist + imeiOnline) + for (int i = 0; i < imeiList.size(); i++) { + String imei = imeiList.get(i); + // 初始化默认状态:不存在+离线 + boolean subExist = false; + boolean imeiOnline = false; + + // 越界判断:避免IndexOutOfBoundsException + int subIndex = i * 2; + int imeiIndex = i * 2 + 1; + if (subIndex < results.size()) { + Object subResult = results.get(subIndex); + subExist = parseExistsResult(subResult); + } + if (imeiIndex < results.size()) { + Object imeiResult = results.get(imeiIndex); + imeiOnline = parseExistsResult(imeiResult); + } + result.put(imei, ImmutableMap.of("subExist", subExist, "imeiOnline", imeiOnline)); + } + return result; + } + private boolean parseExistsResult(Object result) { + if (result instanceof Long) { + return ((Long) result) == 1; + } else if (result instanceof Boolean) { + return (Boolean) result; + } + return false; + } + // ========== 核心方法3:异步批量推送在线状态到 MQTT(线程池隔离) ========== + @Async("mqttPushExecutor") + public void asyncBatchPushMqtt(Map> statusMap) { + if (statusMap.isEmpty()) { + log.info("不存在任何imei"); + return; + } + int successCount = 0; + int failCount = 0; + String dateNow = LocalDateTime.now().format(DATE_TIME_FORMATTER); + // 在线状态 + for (Map.Entry> map : statusMap.entrySet()) { + String imei = map.getKey(); + try { + // 按你的需求,直接推送到 frontend/{imei}/online 主题 + Map imeiMap = map.getValue(); + + // 设备在线的 && 推送首页状态 离线在线都推 + if (imeiMap.get("subExist")) { + // 构造首页消息(用ObjectMapper序列化,避免手动拼接JSON) + Map onlineMsg = new HashMap<>(); + onlineMsg.put("online", imeiMap.get("imeiOnline") ? "在线" : "离线"); + onlineMsg.put("time", dateNow); // 毫秒时间戳 + String onlineMessage = objectMapper.writeValueAsString(onlineMsg); + mqttMessageSender.publish("frontend/" + imei + "/online", onlineMessage); + } + // 无论设备是否在线 只要离线就推送设备状态 + if (!imeiMap.get("imeiOnline")) { + Map alarmMsg = new HashMap<>(); + alarmMsg.put("online", "设备离线"); + alarmMsg.put("time", dateNow); + String alarmMessage = objectMapper.writeValueAsString(alarmMsg); + mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage); + } + successCount++; + } catch (Exception e) { + failCount++; + log.error("向设备 {} 推送在线状态失败", imei, e); + } + + } + + log.info("批量在线状态推送完成:成功={},失败={}", successCount, failCount); + } + + + /* + * 企业级Lua方案:基于大棚表IMEI列表,批量查sub:{imei}是否存在 + * 核心:1次Redis网络往返完成所有查询,极致性能 + 数据合法 + * @return 合法大棚IMEI的在线状态(key=IMEI,value=是否在线) + */ + public Map getGreenhouseOnlineStatusByLua() { + // 1. 从大棚表获取所有合法IMEI + List allGreenhouseImeiList = agriInfoService.queryImeiByUserId(null); + if (allGreenhouseImeiList.isEmpty()) { + log.info("大棚表无合法IMEI,返回空"); + return new HashMap<>(); + } + + // 2. 简化版Lua脚本(批量查sub:{imei}是否存在) + String luaScript = "" + + "local imeiList = ARGV\n" + + "local prefix = KEYS[1]\n" + + "local result = {}\n" + + "for _, imei in ipairs(imeiList) do\n" + + " local key = prefix .. imei\n" + + " local isOnline = redis.call('EXISTS', key)\n" + + " table.insert(result, imei)\n" + + " table.insert(result, tostring(isOnline))\n" + + "end\n" + + "return result"; + + // ===== 核心修改:保留你指定的写法,仅做JDK 8兼容 ===== + DefaultRedisScript redisScript = new DefaultRedisScript<>(); + redisScript.setScriptText(luaScript); + redisScript.setResultType(List.class); // 保留原有写法 + + // 3. 构造参数 + List keys = Collections.singletonList(SUB_KEY_PREFIX); + List args = allGreenhouseImeiList; + + try { + // 4. 执行脚本(JDK 8 强制类型转换,安全兼容) + List resultList = stringRedisTemplate.execute(redisScript, keys, args.toArray(new String[0])); + + // 5. 解析结果(兼容JDK 8 原始List类型) + Map onlineStatusMap = new HashMap<>(); + if (resultList != null && resultList.size() >= 2) { + for (int i = 0; i < resultList.size(); i += 2) { + // JDK 8 显式转换为String,避免类型异常 + String imei = String.valueOf(resultList.get(i)); + String isOnlineStr = String.valueOf(resultList.get(i + 1)); + onlineStatusMap.put(imei, "1".equals(isOnlineStr)); + } + } + + log.info("Lua查询完成:合法IMEI数={},在线数={}", + allGreenhouseImeiList.size(), + onlineStatusMap.values().stream().filter(Boolean::booleanValue).count()); + return onlineStatusMap; + } catch (Exception e) { + log.error("Lua脚本执行失败", e); + return new HashMap<>(); + } + } + + + // ========== 方式1:用 ScanCursor 遍历 sub:* Key ========== + private List scanAllSubDeviceImei() { + List imeiList = new ArrayList<>(); + // count 建议一次扫描1000个槽位 + ScanOptions scanOptions = ScanOptions.scanOptions() + .match(SUB_KEY_PREFIX + "*") + .count(1000) + .build(); + + // JDK 8 需显式声明 Cursor 泛型,try-with-resources 自动关闭游标 + try (Cursor cursor = stringRedisTemplate.getConnectionFactory() + .getConnection() + .scan(scanOptions)) { + + while (cursor.hasNext()) { + byte[] keyBytes = cursor.next(); + String key = new String(keyBytes, StandardCharsets.UTF_8); + if (key.startsWith(SUB_KEY_PREFIX)) { + String imei = key.substring(SUB_KEY_PREFIX.length()); + imeiList.add(imei); + } + } + } catch (Exception e) { + log.error("Cursor 扫描 sub: 前缀 key 失败", e); + } + return imeiList; } } diff --git a/agri-system/src/main/java/com/agri/system/service/ISysAgriInfoService.java b/agri-system/src/main/java/com/agri/system/service/ISysAgriInfoService.java index 1484afd..776cbe8 100644 --- a/agri-system/src/main/java/com/agri/system/service/ISysAgriInfoService.java +++ b/agri-system/src/main/java/com/agri/system/service/ISysAgriInfoService.java @@ -71,4 +71,7 @@ public interface ISysAgriInfoService extends IService { Map addAgriFromMobile(SysAgriInfo sysAgriInfo); List findAgriOfAutoInfo(); + + + List queryImeiByUserId(Long userId); } diff --git a/agri-system/src/main/java/com/agri/system/service/impl/SysAgriInfoServiceImpl.java b/agri-system/src/main/java/com/agri/system/service/impl/SysAgriInfoServiceImpl.java index 27d1e6c..74a9114 100644 --- a/agri-system/src/main/java/com/agri/system/service/impl/SysAgriInfoServiceImpl.java +++ b/agri-system/src/main/java/com/agri/system/service/impl/SysAgriInfoServiceImpl.java @@ -19,6 +19,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.*; +import java.util.stream.Collectors; /** * 大棚管理Service业务层处理 @@ -262,4 +263,23 @@ public class SysAgriInfoServiceImpl extends ServiceImpl queryImeiByUserId(Long userId) { + + SysAgriInfo sysAgriInfo = new SysAgriInfo(); + if (!SecurityUtils.isAdmin()) { + sysAgriInfo.setUserId(userId); + } + List agriInfos = baseMapper.findAgriByUser(sysAgriInfo); + if (CollectionUtils.isEmpty(agriInfos)) { + return Collections.emptyList(); + } + return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList()); + } } diff --git a/agri-system/src/main/java/com/agri/system/service/impl/SysAutoTermServiceImpl.java b/agri-system/src/main/java/com/agri/system/service/impl/SysAutoTermServiceImpl.java index 9beaf7d..ae18c87 100644 --- a/agri-system/src/main/java/com/agri/system/service/impl/SysAutoTermServiceImpl.java +++ b/agri-system/src/main/java/com/agri/system/service/impl/SysAutoTermServiceImpl.java @@ -156,6 +156,11 @@ public class SysAutoTermServiceImpl extends ServiceImpl0) + || (config.getAutoTotalLen() != null && config.getAutoTotalLen().compareTo(BigDecimal.valueOf(300))>0); + if (isValidLen) { + return roller + "风口长度设置不能大于300cm!"; + } if (config.getReservedLen() == null || config.getReservedLen().compareTo(BigDecimal.ZERO)<=0) { return roller + "预留风口长度未设置,请点击相应页签右上角设置后重试!"; }