大棚关联用户

master
lld 2026-02-08 16:47:20 +08:00
parent 14f9770c2c
commit 2724342268
3 changed files with 54 additions and 42 deletions

View File

@ -15,10 +15,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap; import java.util.*;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
@ -47,10 +44,6 @@ public class DeviceAckHandler {
@Resource @Resource
private ISysAgriLimitService agriLimitService; private ISysAgriLimitService agriLimitService;
// 新增最新状态缓存TTL设备每10秒上报一次缓存一小段时间即可
@Value("${spring.mqtt.latest-ttl-seconds:120}")
private int latestTtlSeconds;
@Autowired @Autowired
private ISysDevOperLogService sysDevOperLogService; private ISysDevOperLogService sysDevOperLogService;
@ -82,11 +75,9 @@ public class DeviceAckHandler {
// 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}} // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}
String funcType = null; String funcType = null;
Integer funcValue = null; Integer funcValue = null;
boolean isAck = false;
// 新增:标记是否需要执行自动关任务(全局可用) // 新增:标记是否需要执行自动关任务(全局可用)
// 第二步设备回执处理逻辑完全移除Redis写入 // 第二步设备回执处理逻辑完全移除Redis写入
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
isAck = true;
JSONObject propObj = payloadObj.getJSONObject("prop"); JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) { if (propObj != null && !propObj.isEmpty()) {
// 提取prop中的第一个功能码 // 提取prop中的第一个功能码
@ -125,6 +116,7 @@ public class DeviceAckHandler {
if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) { if (suc && StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) {
mqttAutoOffManager.cancelAutoOff(deviceId, funcType); mqttAutoOffManager.cancelAutoOff(deviceId, funcType);
} }
boolean isTask = (Objects.equals(funcValue, 1)) && (runTime > 0);
sysDevOperLogService.lambdaUpdate() sysDevOperLogService.lambdaUpdate()
.eq(SysDevOperLog::getImei, deviceId) .eq(SysDevOperLog::getImei, deviceId)
.eq(SysDevOperLog::getFuncCode, funcType) .eq(SysDevOperLog::getFuncCode, funcType)
@ -135,37 +127,13 @@ public class DeviceAckHandler {
.set(SysDevOperLog::getAckReceived,1) .set(SysDevOperLog::getAckReceived,1)
.set(SysDevOperLog::getAckSuc, suc?1:0) .set(SysDevOperLog::getAckSuc, suc?1:0)
.set(SysDevOperLog::getIsLockSuc,delete?1:0) .set(SysDevOperLog::getIsLockSuc,delete?1:0)
.set(SysDevOperLog::getIsTask,runTime > 0?1:0) .set(SysDevOperLog::getIsTask,isTask?1:0)
.set(SysDevOperLog::getRunTime, runTime) .set(isTask,SysDevOperLog::getRunTime, runTime)
.set(SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】") .set(isTask,SysDevOperLog::getNoTaskReason,runTime > 0?null:"【自动关任务】标记不符合执行运行时间未配置,当前运行时间:【"+runTime+" s】")
.set(SysDevOperLog::getUpdateBy,"设备回执") .set(SysDevOperLog::getUpdateBy,"设备回执")
.set(SysDevOperLog::getAck, payload) .set(SysDevOperLog::getAck, payload)
.update(); .update();
} }
} }
// 第三步仅处理非回执的设备状态包且仅当是8个功能码结构就写入Redis
// 有没有人订阅都得写,只要发送设备开的指令成功了就得写
if (!isAck) {
// 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入
boolean isValidStatus = true;
for (String validCode : VALID_FUNC_CODES) {
if (!payloadObj.containsKey(validCode)) {
isValidStatus = false;
// log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId, payload);
break;
}
}
if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) {
// ✅ 8个功能码状态包无条件写device:latest:{deviceId},避免自动关读不到最新状态
stringRedisTemplate.opsForValue().set(
"device:latest:" + deviceId,
payload, // 完整的8功能码JSON
latestTtlSeconds,
TimeUnit.SECONDS
);
// log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
}
}
} }
} }

View File

@ -65,11 +65,32 @@ public class DeviceStatusHandler {
@Resource @Resource
private MqttSubscriptionManager mqttSubscriptionManager; private MqttSubscriptionManager mqttSubscriptionManager;
@Autowired @Autowired
private DeviceAckHandler deviceAckHandler; private DeviceAckHandler deviceAckHandler;
// 新增最新状态缓存TTL设备每10秒上报一次缓存一小段时间即可
@Value("${spring.mqtt.latest-ttl-seconds:120}")
private int latestTtlSeconds;
/**
* /
*/
@Resource
private MqttAutoOffManager mqttAutoOffManager;
private static final Set<String> VALID_FUNC_CODES = new HashSet<>();
static {
VALID_FUNC_CODES.add("jm1g");
VALID_FUNC_CODES.add("jm2g");
VALID_FUNC_CODES.add("jbg");
VALID_FUNC_CODES.add("jm3g");
VALID_FUNC_CODES.add("jm2k");
VALID_FUNC_CODES.add("jm3k");
VALID_FUNC_CODES.add("jbk");
VALID_FUNC_CODES.add("jm1k");
}
/** /**
* *
*/ */
@ -167,6 +188,30 @@ public class DeviceStatusHandler {
// 优化替换System.out为log.info // 优化替换System.out为log.info
// log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
} }
// 第三步仅处理非回执的设备状态包且仅当是8个功能码结构就写入Redis
// 有没有人订阅都得写,只要发送设备开的指令成功了就得写
if (!isAck) {
// 1) 先校验状态包是否包含8个固定功能码核心只有这种结构才写入
boolean isValidStatus = true;
for (String validCode : VALID_FUNC_CODES) {
if (!payloadObj.containsKey(validCode)) {
isValidStatus = false;
// log.debug("【设备状态包】结构不合法非8个功能码跳过Redis写入deviceId={}payload={}", deviceId, payload);
break;
}
}
if (mqttAutoOffManager.hasAutoOffTask(deviceId) && isValidStatus) {
// ✅ 8个功能码状态包无条件写device:latest:{deviceId},避免自动关读不到最新状态
stringRedisTemplate.opsForValue().set(
"device:latest:" + deviceId,
payload, // 完整的8功能码JSON
latestTtlSeconds,
TimeUnit.SECONDS
);
// log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
}
}
} catch (MqttException e) { } catch (MqttException e) {
WxUtil.pushText( WxUtil.pushText(
"【消息转发失败】\n deviceId: "+deviceId+"\n payload: "+payload+"\n cause: "+e); "【消息转发失败】\n deviceId: "+deviceId+"\n payload: "+payload+"\n cause: "+e);

View File

@ -216,7 +216,6 @@ public class MqttAutoOffManager {
String skipReason = ""; String skipReason = "";
if (!StringUtils.hasText(latest)) { if (!StringUtils.hasText(latest)) {
//todo //todo
skipReason = "【自动关任务】无最新状态";
log.warn("【自动关任务】无最新状态跳过deviceId={}, funcType={}", deviceId, funcType); log.warn("【自动关任务】无最新状态跳过deviceId={}, funcType={}", deviceId, funcType);
return; return;
} }
@ -225,12 +224,12 @@ public class MqttAutoOffManager {
try { try {
latestObj = JSON.parseObject(latest); latestObj = JSON.parseObject(latest);
} catch (Exception e) { } catch (Exception e) {
skipReason = "【自动关任务】执行报错-解析异常";
WxUtil.pushText("自动关任务执行报错-解析异常:\n deviceId: " + deviceId + "\n funcType:" + funcType+"\n Cause: "+e); WxUtil.pushText("自动关任务执行报错-解析异常:\n deviceId: " + deviceId + "\n funcType:" + funcType+"\n Cause: "+e);
log.warn("【自动关任务】最新状态JSON解析失败跳过deviceId={}, funcType={}", deviceId, funcType); log.warn("【自动关任务】最新状态JSON解析失败跳过deviceId={}, funcType={}", deviceId, funcType);
return; return;
} }
if (latestObj == null || latestObj.isEmpty()) { if (latestObj == null || latestObj.isEmpty()) {
log.info("【自动关任务】最新状态为空");
skipReason = "【自动关任务】最新状态为空"; skipReason = "【自动关任务】最新状态为空";
return; return;
} }
@ -260,7 +259,7 @@ public class MqttAutoOffManager {
.set(SysDevOperLog::getExecResult,1) .set(SysDevOperLog::getExecResult,1)
.set(SysDevOperLog::getLatestState, latest) .set(SysDevOperLog::getLatestState, latest)
.set(SysDevOperLog::getUpdateBy,"自动关") .set(SysDevOperLog::getUpdateBy,"自动关")
.set(!skipReason.isEmpty(), SysDevOperLog::getSkipReason, skipReason) .set(SysDevOperLog::getSkipReason, skipReason.isEmpty()?"解析报错":skipReason)
.update(); .update();
if (current != null && current == 1) { if (current != null && current == 1) {