diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java index 01bdf5a..911e6d1 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java @@ -219,7 +219,6 @@ public class MqttAutoOffManager { log.warn("【自动关任务】无最新状态,跳过:deviceId={}, funcType={}", deviceId, funcType); return; } - JSONObject latestObj; try { latestObj = JSON.parseObject(latest); @@ -244,8 +243,11 @@ public class MqttAutoOffManager { } catch (Exception ignore) { skipReason = "【自动关任务】最新状态功能码获取失败"; } - if (current == null || current!=1) { - skipReason = "【自动关任务】检测未运行或状态未知"; + if (current == null) { + skipReason = "【自动关任务】状态未知"; + } + if (current != 1) { + skipReason = "【自动关任务】未运行"; } sysDevOperLogService.lambdaUpdate() .eq(SysDevOperLog::getImei, deviceId) @@ -256,49 +258,48 @@ public class MqttAutoOffManager { .eq(SysDevOperLog::getIsTask, 1) .orderByDesc(SysDevOperLog::getCreateTime) .last("LIMIT 1") - .set(SysDevOperLog::getExecResult,1) - .set(SysDevOperLog::getLatestState, latest) - .set(SysDevOperLog::getUpdateBy,"自动关") - .set(SysDevOperLog::getSkipReason, skipReason.isEmpty()?"解析报错":skipReason) + .set(SysDevOperLog::getExecResult,skipReason.isEmpty()?1:0) + .set(SysDevOperLog::getUpdateBy,"自动关更新数据") + .set(SysDevOperLog::getSkipReason, skipReason.isEmpty()?"执行成功":skipReason) .update(); - - if (current != null && current == 1) { - // 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖) - String lockKey = "lock:" + deviceId + ":" + funcType; - Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( - lockKey, "autooff", dtuCtlLockTTL, TimeUnit.SECONDS - ); - if (lockSuccess == null || !lockSuccess) { - log.info("【自动关任务】{}功能忙(锁占用),跳过自动关闭:deviceId={}, funcType={}", funcType, deviceId, funcType); - return; - } - String deviceTopic = "dtu/" + deviceId + "/down"; - JSONObject down = new JSONObject(); - down.put(funcType, 0); - mqttMessageSender.publish(deviceTopic, down.toJSONString()); - - SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery() - .eq(SysAgriInfo::getImei, deviceId) - .one(); - String agriName = (agriInfo!=null && ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null; - SysDevOperLog logDto = new SysDevOperLog(); - logDto.setAgriName(agriName); - logDto.setImei(deviceId); - logDto.setFuncCode(funcType); - logDto.setOpType(0); - logDto.setOpSource(2); - logDto.setPayload(down.toJSONString()); - logDto.setLockAcquired(1); - logDto.setLockHolder("autoOff"); - logDto.setExecResult(1); - logDto.setLatestState(latest); - logDto.setCreateBy("自动关"); - logDto.setTaskStatus(getFutureStatus().toString()); - sysDevOperLogService.save(logDto); - log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); - } else { + if (!skipReason.isEmpty()) { log.info("【自动关任务】检测未运行或状态未知,跳过关闭:deviceId={}, funcType={}, current={}", deviceId, funcType, current); + return; } + + // 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖) + String lockKey = "lock:" + deviceId + ":" + funcType; + Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( + lockKey, "autooff", dtuCtlLockTTL, TimeUnit.SECONDS + ); + if (lockSuccess == null || !lockSuccess) { + log.info("【自动关任务】{}功能忙(锁占用),跳过自动关闭:deviceId={}, funcType={}", funcType, deviceId, funcType); + return; + } + String deviceTopic = "dtu/" + deviceId + "/down"; + JSONObject down = new JSONObject(); + down.put(funcType, 0); + mqttMessageSender.publish(deviceTopic, down.toJSONString()); + SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery() + .eq(SysAgriInfo::getImei, deviceId) + .one(); + String agriName = (agriInfo!=null && ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null; + SysDevOperLog logDto = new SysDevOperLog(); + logDto.setAgriName(agriName); + logDto.setImei(deviceId); + logDto.setFuncCode(funcType); + logDto.setOpType(0); + logDto.setOpSource(2); + logDto.setPayload(down.toJSONString()); + logDto.setLockAcquired(1); + logDto.setLockHolder("autoOff"); + logDto.setExecResult(1); + logDto.setLatestState(latest); + logDto.setCreateBy("自动关"); + logDto.setTaskStatus(getFutureStatus().toString()); + sysDevOperLogService.save(logDto); + log.info("【自动关任务】检测仍在运行,已下发关闭:deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString()); + } // 新增:收到“关”指令时,尝试取消对应自动关任务(优化:减少无意义任务执行;正确性仍以到点状态判断为准) diff --git a/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java b/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java index 792041c..2766eb2 100644 --- a/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java +++ b/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java @@ -1,4 +1,61 @@ package com.agri.quartz.task; +import com.agri.system.domain.SysDevOperLog; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +/** + * 在线状态监测 告警和离线 + */ +@Component public class AgriStatusTask { + + + // 注入RedisTemplate(Spring Boot自动配置) + @Resource + private RedisTemplate redisTemplate; + + // 模拟从数据库/配置获取所有设备IMEI列表(实际可替换为DB查询) + private List getAllDeviceImeiList() { + // 示例:从数据库查询所有设备IMEI + // return deviceMapper.listAllImei(); + return Arrays.asList("861234567890123", "869876543210987", "860000000000000"); + } + public void checkDeviceOnlineStatus() { + // 步骤1:获取所有需要检查的设备IMEI列表(一次IO,从DB/配置读取) + List allImeiList = getAllDeviceImeiList(); + if (allImeiList.isEmpty()) { + System.out.println("无设备需要检查"); + return; + } + + // 步骤2:批量查询Redis(核心!仅一次IO操作) + // MGET命令:批量获取多个Key的值,返回值列表与入参Key列表一一对应 + List redisValues = redisTemplate.opsForValue().multiGet(allImeiList); + + // 步骤3:解析结果,区分在线/离线设备 + List onlineImeiList = new ArrayList<>(); + List offlineImeiList = new ArrayList<>(); + + for (int i = 0; i < allImeiList.size(); i++) { + String imei = allImeiList.get(i); + // Redis中Key存在则值不为null(我们存的是"online"),不存在则为null + if (redisValues.get(i) != null) { + onlineImeiList.add(imei); + } else { + offlineImeiList.add(imei); + } + } + + + // 步骤4:处理结果(示例:打印日志,实际可写入DB/推送告警等) + System.out.println("[" + new Date() + "] 在线设备:" + onlineImeiList); + System.out.println("[" + new Date() + "] 离线设备:" + offlineImeiList); + } } diff --git a/agri-quartz/src/main/java/com/agri/quartz/task/RollerAutoTask.java b/agri-quartz/src/main/java/com/agri/quartz/task/RollerAutoTask.java index d3a8a37..99d4fb1 100644 --- a/agri-quartz/src/main/java/com/agri/quartz/task/RollerAutoTask.java +++ b/agri-quartz/src/main/java/com/agri/quartz/task/RollerAutoTask.java @@ -1,16 +1,29 @@ package com.agri.quartz.task; +import com.agri.common.core.domain.entity.SysUser; import com.agri.common.enums.TempCommandStatus; +import com.agri.common.utils.RollerTimeCalculator; import com.agri.common.utils.TempJudgeUtil; import com.agri.common.utils.TimeConvertUtil; import com.agri.common.utils.TimeRangeUtil; +import com.agri.framework.config.MqttConfig; +import com.agri.framework.interceptor.FrontendControlHandler; +import com.agri.framework.manager.MqttAutoOffManager; import com.agri.system.domain.SysAgriInfo; +import com.agri.system.domain.SysDevOperLog; import com.agri.system.domain.vo.RollerTermVO; import com.agri.system.service.ISysAgriInfoService; +import com.agri.system.service.ISysDevOperLogService; import com.agri.system.service.ISysDtuDataService; import com.agri.system.service.ISysRollerParamService; +import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; import org.apache.commons.collections4.CollectionUtils; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -18,6 +31,8 @@ import java.math.BigDecimal; import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -44,7 +59,25 @@ public class RollerAutoTask { @Resource private MqttConfig.MqttMessageSender mqttMessageSender; + @Autowired + private ISysDevOperLogService devOperLogService; + @Autowired + private MqttAutoOffManager autoOffManager; + + /** + * Redis模板,用于存储订阅关系、设备在线状态、分布式锁 + */ + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** + * 优化:统一使用SLF4J日志(JDK 8兼容) + */ + private static final Logger log = LoggerFactory.getLogger(RollerAutoTask.class); + + @Value("${spring.mqtt.dtu-ctl-lock-ttl}") + private int dtuCtlLockTTL; public void checkAutoTerm() { // 查询自动模式的大棚 @@ -62,7 +95,8 @@ public class RollerAutoTask { List> dtuDataList = dtuDataService.getLastDtuDataByImeiList(imeiList); if (CollectionUtils.isEmpty(dtuDataList)) return; // 根据温湿度imei分组 - Map>> dtuDataByImeiMap = dtuDataList.stream().collect(Collectors.groupingBy(map -> (String) map.get("imei"))); + Map>> dtuDataByImeiMap + = dtuDataList.stream().collect(Collectors.groupingBy(map -> (String) map.get("imei"))); // 获取所有开启自动化模式的大棚列表的卷膜参数和条件设置 List rollerTermList = rollerParamService.getRollerTerms(imeiList); @@ -77,6 +111,8 @@ public class RollerAutoTask { Collectors.groupingBy(RollerTermVO::getRoller) )); + // 查询每个IMEI今天的第一条日志 + Map> todayLogCountByImeiMap = devOperLogService.getTodayLogCountByImeiMap(imeiList); // 循环所有开启自动化的大棚 for (String imei : imeiList) { // 获取温湿度指定imei的数据 @@ -86,20 +122,36 @@ public class RollerAutoTask { // todo 该大棚下1分钟内无最新温湿度,怀疑离线 break; } + // 获取今天对应imei的日志 + Map todayLogByRoller = todayLogCountByImeiMap.get(imei); // 最后一条对应imei对应温度 Map dtuData = dtuDataInfo.get(0); // 获取当前imei下的所有参数设置以及卷膜自动化条件设置 Map> configTermByRollerMap = rollerTermMap.get(imei); - + if (configTermByRollerMap.isEmpty()) { + // todo 当前大棚下没有设置条件或者参数 + break; + } configTermByRollerMap.forEach((config, terms) ->{ // 每个卷膜分组只会有一个卷膜参数设置,所有取第一个即可 RollerTermVO rollerConfig = terms.get(0); - BigDecimal ventTotalLen = rollerConfig.getVentTotalLen(); // 风口总长 - String roller = rollerConfig.getRoller(); // 卷膜 + // 整体参数 String refTempCode = rollerConfig.getRefTempCode(); - BigDecimal reservedLen = rollerConfig.getReservedLen(); // 预留封口 + String roller = rollerConfig.getRoller(); // 卷膜 + BigDecimal ventTotalLen = rollerConfig.getVentTotalLen(); // 风口总长 + BigDecimal reservedLen = rollerConfig.getReservedLen(); // 预留风口 + // 判断对应卷膜是否是今天第一次操作 + boolean isFirstRun = true; + // todayLogByRoller为空铁定第一次操作,否则,就是todayLogByRoller.getRoller为空是第一次操作 + if (!todayLogByRoller.isEmpty()) { + Integer logOfRoller = todayLogByRoller.getOrDefault(roller, 0); + if (logOfRoller>0) { + isFirstRun = false; + } + } terms.forEach(term -> { + // 求温度上报时间 LocalDateTime dtuTime = TimeConvertUtil.strToLocalDateTimeSafe((String) dtuData.get("time")); if (dtuTime == null) { return; // 跳过该设备 @@ -108,14 +160,17 @@ public class RollerAutoTask { boolean inRange = TimeRangeUtil.isTimeInRange(dtuTime, term.getStartTime(), term.getEndTime()); // 在范围内 if (inRange) { - //判断温度是否在 + //判断温度是否在适宜温度内 String redTempCode = dtuData.get(refTempCode).toString(); BigDecimal currentTemp = new BigDecimal(dtuData.get(redTempCode).toString()); TempCommandStatus tempCommandStatus = TempJudgeUtil.judgeTempCommand(currentTemp, term.getTemp()); if (tempCommandStatus==TempCommandStatus.OPEN) { // 开指令 + sendOpenCommand(imei, roller, isFirstRun, term.getVent(),reservedLen); + isFirstRun = false; } else if (tempCommandStatus==TempCommandStatus.CLOSE) { // 关指令 + sendCloseCommand(imei, roller, isFirstRun, term.getVent()); } } // 不在掠过 @@ -126,4 +181,87 @@ public class RollerAutoTask { } } + /** + * @param isFirstRun 是否第一次执行 + * @param vent 每次执行的风口大小 + */ + private void sendOpenCommand(String imei, String roller, boolean isFirstRun, + BigDecimal vent, BigDecimal reservedLen) { + try { + // 默认 + BigDecimal openLen = vent; + // 是第一次,需要vent+预留风口 + if (isFirstRun) { + log.info("自动模式:设备【{}】开启自动模式。触发自动化条件,卷膜【{}】是今天第一次执行",imei,roller); + openLen = openLen.add(reservedLen); + } + String funcType = roller + "k1"; + + + String lockKey = "lock:" + imei + ":" + funcType; + Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( + lockKey, "auto_mode", dtuCtlLockTTL, TimeUnit.SECONDS // 延长至15秒,适配设备回执场景 + ); + if (lockSuccess == null || !lockSuccess) { + log.warn("【分布式锁】前端【自动模式】下触发【{}】操作设备【{}】的【{}】功能失败;可能其他用户正在操作此功能", roller, imei, funcType); + return; + } + + // 3. 记录日志 + log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}", + clientId, LocalDateTime.now(), deviceId, funcType, payload); + SysUser sysUser = sysUserService.lambdaQuery() + .eq(SysUser::getClientId, clientId) + .one(); + String operator = "手动控制"; + if (sysUser!=null) { + operator = sysUser.getUserName(); + } + SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery() + .eq(SysAgriInfo::getImei, deviceId) + .one(); + String agriName = (agriInfo!=null && org.apache.commons.lang3.ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null; + SysDevOperLog logDto = new SysDevOperLog(); + logDto.setAgriName(agriName); + logDto.setImei(deviceId); + logDto.setFuncCode(funcType); + logDto.setOpType(funcCodeMap.get(funcType)); + logDto.setOpSource(1); + logDto.setPayload(payload); + logDto.setLockAcquired(1); + logDto.setLockHolder(clientId); + logDto.setCreateBy(operator); + sysDevOperLogService.save(logDto); + + + mqttMessageSender.publish("dtu/"+imei+"/down", "{\""+funcType+"\":1}"); + // 获取运行时间 + int runTime = RollerTimeCalculator.calculateRunTime(openLen); + if (runTime>0) { + autoOffManager.scheduleAutoOff(imei, roller+"k",runTime); + } + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + + /** + * @param isFirstRun 是否第一次执行 + * @param vent 每次执行的风口大小 + */ + private void sendCloseCommand(String imei, String roller, boolean isFirstRun, BigDecimal vent) { + try { + // 如果是第一次直接返回 + if (isFirstRun) return; + String funcType = roller + "g1"; + mqttMessageSender.publish("dtu/"+imei+"/down", "{\""+funcType+"\":1}"); + // 获取运行时间 + int runTime = RollerTimeCalculator.calculateRunTime(vent); + if (runTime > 0) { + autoOffManager.scheduleAutoOff(imei, roller+"g",runTime); + } + } catch (MqttException e) { + throw new RuntimeException(e); + } + } } diff --git a/agri-system/src/main/java/com/agri/system/service/ISysDevOperLogService.java b/agri-system/src/main/java/com/agri/system/service/ISysDevOperLogService.java index 2f87a3b..1b44e25 100644 --- a/agri-system/src/main/java/com/agri/system/service/ISysDevOperLogService.java +++ b/agri-system/src/main/java/com/agri/system/service/ISysDevOperLogService.java @@ -1,6 +1,8 @@ package com.agri.system.service; import java.util.List; +import java.util.Map; + import com.baomidou.mybatisplus.extension.service.IService; import com.agri.system.domain.SysDevOperLog; @@ -58,4 +60,7 @@ public interface ISysDevOperLogService extends IService { * @return 结果 */ public int deleteSysDevOperLogById(Long id); + + + Map> getTodayLogCountByImeiMap(List imeiList); } diff --git a/agri-system/src/main/java/com/agri/system/service/impl/SysDevOperLogServiceImpl.java b/agri-system/src/main/java/com/agri/system/service/impl/SysDevOperLogServiceImpl.java index bdb48d0..b4ccaef 100644 --- a/agri-system/src/main/java/com/agri/system/service/impl/SysDevOperLogServiceImpl.java +++ b/agri-system/src/main/java/com/agri/system/service/impl/SysDevOperLogServiceImpl.java @@ -1,7 +1,16 @@ package com.agri.system.service.impl; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import com.agri.common.utils.DateUtils; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.stereotype.Service; import com.agri.system.mapper.SysDevOperLogMapper; @@ -91,4 +100,50 @@ public class SysDevOperLogServiceImpl extends ServiceImpl> getTodayLogCountByImeiMap(List imeiList) { + // 1. 构造今日时间范围(精准过滤,避免毫秒级漏查) + LocalDateTime todayStart = LocalDateTime.of(LocalDate.now(), LocalTime.MIN); + LocalDateTime tomorrowStart = LocalDateTime.of(LocalDate.now().plusDays(1), LocalTime.MIN); + + // 2. 构建SQL层面的分组统计查询(仅查必要字段,利用索引) + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.select("imei", "func_code", "COUNT(*) as count") // 仅统计必需字段 + .in("imei", imeiList) // 过滤固定IMEI列表(命中idx_imei_time/idx_func的imei前缀) + .ge("create_time", todayStart) // 今日0点后(命中idx_imei_time的create_time) + .lt("create_time", tomorrowStart) // 明日0点前(避免23:59:59.999漏查) + .groupBy("imei", "func_code"); // 按IMEI+func_code分组(利用idx_func索引) + + // 3. 执行查询(返回Map列表,仅包含imei/func_code/count三个字段) + List> sqlResult = baseMapper.selectMaps(queryWrapper); + + // 4. 组装嵌套Map:Map> + Map> nestedCountMap = new HashMap<>(imeiList.size()); + + // 4.1 初始化所有固定IMEI的内层Map(避免取值时NPE) + for (String imei : imeiList) { + nestedCountMap.put(imei, new HashMap<>()); + } + // 4.2 填充统计结果 + for (Map row : sqlResult) { + String imei = (String) row.get("imei"); + String funcCode = (String) row.get("func_code"); + Integer count = ((Number) row.get("count")).intValue(); + nestedCountMap.get(imei).put(funcCode, count); + } + return nestedCountMap; + } }