暂时提交

master
lld 2026-03-06 21:23:45 +08:00
parent 2b140bd180
commit 6d62735dc4
5 changed files with 305 additions and 49 deletions

View File

@ -219,7 +219,6 @@ public class MqttAutoOffManager {
log.warn("【自动关任务】无最新状态跳过deviceId={}, funcType={}", deviceId, funcType);
return;
}
JSONObject latestObj;
try {
latestObj = JSON.parseObject(latest);
@ -244,8 +243,11 @@ public class MqttAutoOffManager {
} catch (Exception ignore) {
skipReason = "【自动关任务】最新状态功能码获取失败";
}
if (current == null || current!=1) {
skipReason = "【自动关任务】检测未运行或状态未知";
if (current == null) {
skipReason = "【自动关任务】状态未知";
}
if (current != 1) {
skipReason = "【自动关任务】未运行";
}
sysDevOperLogService.lambdaUpdate()
.eq(SysDevOperLog::getImei, deviceId)
@ -256,49 +258,48 @@ public class MqttAutoOffManager {
.eq(SysDevOperLog::getIsTask, 1)
.orderByDesc(SysDevOperLog::getCreateTime)
.last("LIMIT 1")
.set(SysDevOperLog::getExecResult,1)
.set(SysDevOperLog::getLatestState, latest)
.set(SysDevOperLog::getUpdateBy,"自动关")
.set(SysDevOperLog::getSkipReason, skipReason.isEmpty()?"解析报错":skipReason)
.set(SysDevOperLog::getExecResult,skipReason.isEmpty()?1:0)
.set(SysDevOperLog::getUpdateBy,"自动关更新数据")
.set(SysDevOperLog::getSkipReason, skipReason.isEmpty()?"执行成功":skipReason)
.update();
if (current != null && current == 1) {
// 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖)
String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey, "autooff", dtuCtlLockTTL, TimeUnit.SECONDS
);
if (lockSuccess == null || !lockSuccess) {
log.info("【自动关任务】{}功能忙锁占用跳过自动关闭deviceId={}, funcType={}", funcType, deviceId, funcType);
return;
}
String deviceTopic = "dtu/" + deviceId + "/down";
JSONObject down = new JSONObject();
down.put(funcType, 0);
mqttMessageSender.publish(deviceTopic, down.toJSONString());
SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery()
.eq(SysAgriInfo::getImei, deviceId)
.one();
String agriName = (agriInfo!=null && ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null;
SysDevOperLog logDto = new SysDevOperLog();
logDto.setAgriName(agriName);
logDto.setImei(deviceId);
logDto.setFuncCode(funcType);
logDto.setOpType(0);
logDto.setOpSource(2);
logDto.setPayload(down.toJSONString());
logDto.setLockAcquired(1);
logDto.setLockHolder("autoOff");
logDto.setExecResult(1);
logDto.setLatestState(latest);
logDto.setCreateBy("自动关");
logDto.setTaskStatus(getFutureStatus().toString());
sysDevOperLogService.save(logDto);
log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
} else {
if (!skipReason.isEmpty()) {
log.info("【自动关任务】检测未运行或状态未知跳过关闭deviceId={}, funcType={}, current={}", deviceId, funcType, current);
return;
}
// 新增:自动关也走分布式锁(避免与前端并发控制同一功能导致乱序/互相覆盖)
String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey, "autooff", dtuCtlLockTTL, TimeUnit.SECONDS
);
if (lockSuccess == null || !lockSuccess) {
log.info("【自动关任务】{}功能忙锁占用跳过自动关闭deviceId={}, funcType={}", funcType, deviceId, funcType);
return;
}
String deviceTopic = "dtu/" + deviceId + "/down";
JSONObject down = new JSONObject();
down.put(funcType, 0);
mqttMessageSender.publish(deviceTopic, down.toJSONString());
SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery()
.eq(SysAgriInfo::getImei, deviceId)
.one();
String agriName = (agriInfo!=null && ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null;
SysDevOperLog logDto = new SysDevOperLog();
logDto.setAgriName(agriName);
logDto.setImei(deviceId);
logDto.setFuncCode(funcType);
logDto.setOpType(0);
logDto.setOpSource(2);
logDto.setPayload(down.toJSONString());
logDto.setLockAcquired(1);
logDto.setLockHolder("autoOff");
logDto.setExecResult(1);
logDto.setLatestState(latest);
logDto.setCreateBy("自动关");
logDto.setTaskStatus(getFutureStatus().toString());
sysDevOperLogService.save(logDto);
log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
}
// 新增:收到“关”指令时,尝试取消对应自动关任务(优化:减少无意义任务执行;正确性仍以到点状态判断为准)

View File

@ -1,4 +1,61 @@
package com.agri.quartz.task;
import com.agri.system.domain.SysDevOperLog;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* 线 线
*/
@Component
public class AgriStatusTask {
// 注入RedisTemplateSpring Boot自动配置
@Resource
private RedisTemplate<String, String> redisTemplate;
// 模拟从数据库/配置获取所有设备IMEI列表实际可替换为DB查询
private List<String> getAllDeviceImeiList() {
// 示例从数据库查询所有设备IMEI
// return deviceMapper.listAllImei();
return Arrays.asList("861234567890123", "869876543210987", "860000000000000");
}
public void checkDeviceOnlineStatus() {
// 步骤1获取所有需要检查的设备IMEI列表一次IO从DB/配置读取)
List<String> allImeiList = getAllDeviceImeiList();
if (allImeiList.isEmpty()) {
System.out.println("无设备需要检查");
return;
}
// 步骤2批量查询Redis核心仅一次IO操作
// MGET命令批量获取多个Key的值返回值列表与入参Key列表一一对应
List<String> redisValues = redisTemplate.opsForValue().multiGet(allImeiList);
// 步骤3解析结果区分在线/离线设备
List<String> onlineImeiList = new ArrayList<>();
List<String> offlineImeiList = new ArrayList<>();
for (int i = 0; i < allImeiList.size(); i++) {
String imei = allImeiList.get(i);
// Redis中Key存在则值不为null我们存的是"online"不存在则为null
if (redisValues.get(i) != null) {
onlineImeiList.add(imei);
} else {
offlineImeiList.add(imei);
}
}
// 步骤4处理结果示例打印日志实际可写入DB/推送告警等)
System.out.println("[" + new Date() + "] 在线设备:" + onlineImeiList);
System.out.println("[" + new Date() + "] 离线设备:" + offlineImeiList);
}
}

View File

@ -1,16 +1,29 @@
package com.agri.quartz.task;
import com.agri.common.core.domain.entity.SysUser;
import com.agri.common.enums.TempCommandStatus;
import com.agri.common.utils.RollerTimeCalculator;
import com.agri.common.utils.TempJudgeUtil;
import com.agri.common.utils.TimeConvertUtil;
import com.agri.common.utils.TimeRangeUtil;
import com.agri.framework.config.MqttConfig;
import com.agri.framework.interceptor.FrontendControlHandler;
import com.agri.framework.manager.MqttAutoOffManager;
import com.agri.system.domain.SysAgriInfo;
import com.agri.system.domain.SysDevOperLog;
import com.agri.system.domain.vo.RollerTermVO;
import com.agri.system.service.ISysAgriInfoService;
import com.agri.system.service.ISysDevOperLogService;
import com.agri.system.service.ISysDtuDataService;
import com.agri.system.service.ISysRollerParamService;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@ -18,6 +31,8 @@ import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@ -44,7 +59,25 @@ public class RollerAutoTask {
@Resource
private MqttConfig.MqttMessageSender mqttMessageSender;
@Autowired
private ISysDevOperLogService devOperLogService;
@Autowired
private MqttAutoOffManager autoOffManager;
/**
* Redis线
*/
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 使SLF4JJDK 8
*/
private static final Logger log = LoggerFactory.getLogger(RollerAutoTask.class);
@Value("${spring.mqtt.dtu-ctl-lock-ttl}")
private int dtuCtlLockTTL;
public void checkAutoTerm() {
// 查询自动模式的大棚
@ -62,7 +95,8 @@ public class RollerAutoTask {
List<Map<String, Object>> dtuDataList = dtuDataService.getLastDtuDataByImeiList(imeiList);
if (CollectionUtils.isEmpty(dtuDataList)) return;
// 根据温湿度imei分组
Map<String, List<Map<String, Object>>> dtuDataByImeiMap = dtuDataList.stream().collect(Collectors.groupingBy(map -> (String) map.get("imei")));
Map<String, List<Map<String, Object>>> dtuDataByImeiMap
= dtuDataList.stream().collect(Collectors.groupingBy(map -> (String) map.get("imei")));
// 获取所有开启自动化模式的大棚列表的卷膜参数和条件设置
List<RollerTermVO> rollerTermList = rollerParamService.getRollerTerms(imeiList);
@ -77,6 +111,8 @@ public class RollerAutoTask {
Collectors.groupingBy(RollerTermVO::getRoller)
));
// 查询每个IMEI今天的第一条日志
Map<String, Map<String, Integer>> todayLogCountByImeiMap = devOperLogService.getTodayLogCountByImeiMap(imeiList);
// 循环所有开启自动化的大棚
for (String imei : imeiList) {
// 获取温湿度指定imei的数据
@ -86,20 +122,36 @@ public class RollerAutoTask {
// todo 该大棚下1分钟内无最新温湿度怀疑离线
break;
}
// 获取今天对应imei的日志
Map<String, Integer> todayLogByRoller = todayLogCountByImeiMap.get(imei);
// 最后一条对应imei对应温度
Map<String, Object> dtuData = dtuDataInfo.get(0);
// 获取当前imei下的所有参数设置以及卷膜自动化条件设置
Map<String, List<RollerTermVO>> configTermByRollerMap = rollerTermMap.get(imei);
if (configTermByRollerMap.isEmpty()) {
// todo 当前大棚下没有设置条件或者参数
break;
}
configTermByRollerMap.forEach((config, terms) ->{
// 每个卷膜分组只会有一个卷膜参数设置,所有取第一个即可
RollerTermVO rollerConfig = terms.get(0);
BigDecimal ventTotalLen = rollerConfig.getVentTotalLen(); // 风口总长
String roller = rollerConfig.getRoller(); // 卷膜
// 整体参数
String refTempCode = rollerConfig.getRefTempCode();
BigDecimal reservedLen = rollerConfig.getReservedLen(); // 预留封口
String roller = rollerConfig.getRoller(); // 卷膜
BigDecimal ventTotalLen = rollerConfig.getVentTotalLen(); // 风口总长
BigDecimal reservedLen = rollerConfig.getReservedLen(); // 预留风口
// 判断对应卷膜是否是今天第一次操作
boolean isFirstRun = true;
// todayLogByRoller为空铁定第一次操作否则就是todayLogByRoller.getRoller为空是第一次操作
if (!todayLogByRoller.isEmpty()) {
Integer logOfRoller = todayLogByRoller.getOrDefault(roller, 0);
if (logOfRoller>0) {
isFirstRun = false;
}
}
terms.forEach(term -> {
// 求温度上报时间
LocalDateTime dtuTime = TimeConvertUtil.strToLocalDateTimeSafe((String) dtuData.get("time"));
if (dtuTime == null) {
return; // 跳过该设备
@ -108,14 +160,17 @@ public class RollerAutoTask {
boolean inRange = TimeRangeUtil.isTimeInRange(dtuTime, term.getStartTime(), term.getEndTime());
// 在范围内
if (inRange) {
//判断温度是否在
//判断温度是否在适宜温度内
String redTempCode = dtuData.get(refTempCode).toString();
BigDecimal currentTemp = new BigDecimal(dtuData.get(redTempCode).toString());
TempCommandStatus tempCommandStatus = TempJudgeUtil.judgeTempCommand(currentTemp, term.getTemp());
if (tempCommandStatus==TempCommandStatus.OPEN) {
// 开指令
sendOpenCommand(imei, roller, isFirstRun, term.getVent(),reservedLen);
isFirstRun = false;
} else if (tempCommandStatus==TempCommandStatus.CLOSE) {
// 关指令
sendCloseCommand(imei, roller, isFirstRun, term.getVent());
}
}
// 不在掠过
@ -126,4 +181,87 @@ public class RollerAutoTask {
}
}
/**
* @param isFirstRun
* @param vent
*/
private void sendOpenCommand(String imei, String roller, boolean isFirstRun,
BigDecimal vent, BigDecimal reservedLen) {
try {
// 默认
BigDecimal openLen = vent;
// 是第一次需要vent+预留风口
if (isFirstRun) {
log.info("自动模式:设备【{}】开启自动模式。触发自动化条件,卷膜【{}】是今天第一次执行",imei,roller);
openLen = openLen.add(reservedLen);
}
String funcType = roller + "k1";
String lockKey = "lock:" + imei + ":" + funcType;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey, "auto_mode", dtuCtlLockTTL, TimeUnit.SECONDS // 延长至15秒适配设备回执场景
);
if (lockSuccess == null || !lockSuccess) {
log.warn("【分布式锁】前端【自动模式】下触发【{}】操作设备【{}】的【{}】功能失败;可能其他用户正在操作此功能", roller, imei, funcType);
return;
}
// 3. 记录日志
log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}",
clientId, LocalDateTime.now(), deviceId, funcType, payload);
SysUser sysUser = sysUserService.lambdaQuery()
.eq(SysUser::getClientId, clientId)
.one();
String operator = "手动控制";
if (sysUser!=null) {
operator = sysUser.getUserName();
}
SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery()
.eq(SysAgriInfo::getImei, deviceId)
.one();
String agriName = (agriInfo!=null && org.apache.commons.lang3.ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null;
SysDevOperLog logDto = new SysDevOperLog();
logDto.setAgriName(agriName);
logDto.setImei(deviceId);
logDto.setFuncCode(funcType);
logDto.setOpType(funcCodeMap.get(funcType));
logDto.setOpSource(1);
logDto.setPayload(payload);
logDto.setLockAcquired(1);
logDto.setLockHolder(clientId);
logDto.setCreateBy(operator);
sysDevOperLogService.save(logDto);
mqttMessageSender.publish("dtu/"+imei+"/down", "{\""+funcType+"\":1}");
// 获取运行时间
int runTime = RollerTimeCalculator.calculateRunTime(openLen);
if (runTime>0) {
autoOffManager.scheduleAutoOff(imei, roller+"k",runTime);
}
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
/**
* @param isFirstRun
* @param vent
*/
private void sendCloseCommand(String imei, String roller, boolean isFirstRun, BigDecimal vent) {
try {
// 如果是第一次直接返回
if (isFirstRun) return;
String funcType = roller + "g1";
mqttMessageSender.publish("dtu/"+imei+"/down", "{\""+funcType+"\":1}");
// 获取运行时间
int runTime = RollerTimeCalculator.calculateRunTime(vent);
if (runTime > 0) {
autoOffManager.scheduleAutoOff(imei, roller+"g",runTime);
}
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,6 +1,8 @@
package com.agri.system.service;
import java.util.List;
import java.util.Map;
import com.baomidou.mybatisplus.extension.service.IService;
import com.agri.system.domain.SysDevOperLog;
@ -58,4 +60,7 @@ public interface ISysDevOperLogService extends IService<SysDevOperLog> {
* @return
*/
public int deleteSysDevOperLogById(Long id);
Map<String, Map<String, Integer>> getTodayLogCountByImeiMap(List<String> imeiList);
}

View File

@ -1,7 +1,16 @@
package com.agri.system.service.impl;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.agri.common.utils.DateUtils;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import com.agri.system.mapper.SysDevOperLogMapper;
@ -91,4 +100,50 @@ public class SysDevOperLogServiceImpl extends ServiceImpl<SysDevOperLogMapper, S
{
return baseMapper.deleteSysDevOperLogById(id);
}
/**
* IMEI lambdaQuery IMEI
* super.lambdaQuery() new LambdaQueryWrapper
*/
/**
* IMEI MapIMEIroller
*
*/
/**
* SQLIMEIfunc_code
* idx_imei_time/idx_funcJSON
*/
public Map<String, Map<String, Integer>> getTodayLogCountByImeiMap(List<String> imeiList) {
// 1. 构造今日时间范围(精准过滤,避免毫秒级漏查)
LocalDateTime todayStart = LocalDateTime.of(LocalDate.now(), LocalTime.MIN);
LocalDateTime tomorrowStart = LocalDateTime.of(LocalDate.now().plusDays(1), LocalTime.MIN);
// 2. 构建SQL层面的分组统计查询仅查必要字段利用索引
QueryWrapper<SysDevOperLog> queryWrapper = new QueryWrapper<>();
queryWrapper.select("imei", "func_code", "COUNT(*) as count") // 仅统计必需字段
.in("imei", imeiList) // 过滤固定IMEI列表命中idx_imei_time/idx_func的imei前缀
.ge("create_time", todayStart) // 今日0点后命中idx_imei_time的create_time
.lt("create_time", tomorrowStart) // 明日0点前避免23:59:59.999漏查)
.groupBy("imei", "func_code"); // 按IMEI+func_code分组利用idx_func索引
// 3. 执行查询返回Map列表仅包含imei/func_code/count三个字段
List<Map<String, Object>> sqlResult = baseMapper.selectMaps(queryWrapper);
// 4. 组装嵌套MapMap<IMEI, Map<func_code, 今日条数>>
Map<String, Map<String, Integer>> nestedCountMap = new HashMap<>(imeiList.size());
// 4.1 初始化所有固定IMEI的内层Map避免取值时NPE
for (String imei : imeiList) {
nestedCountMap.put(imei, new HashMap<>());
}
// 4.2 填充统计结果
for (Map<String, Object> row : sqlResult) {
String imei = (String) row.get("imei");
String funcCode = (String) row.get("func_code");
Integer count = ((Number) row.get("count")).intValue();
nestedCountMap.get(imei).put(funcCode, count);
}
return nestedCountMap;
}
}