分布式锁逻辑执行后删除

自动模式不走是否启动定时任务逻辑、不走是否开启自动关逻辑
master
lld 2026-03-29 17:33:22 +08:00
parent 575467d76a
commit 76469decc5
6 changed files with 167 additions and 117 deletions

View File

@ -92,7 +92,7 @@ public class DeviceAckHandler {
String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean delete = stringRedisTemplate.delete(lockKey);
if (propObj.size() > 1) {
log.warn("【设备回执】prop包含多个功能码仅处理第一个{}", propObj.keySet());
log.warn("【设备回执】prop包含多个功能码仅处理第一个{}, {}", propObj,payload);
}
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
int runTime = 0;

View File

@ -109,8 +109,31 @@ public class DeviceStatusHandler {
return;
}
boolean isAck = payloadObj.containsKey("suc") && payloadObj.containsKey("prop");
JSONObject sendObj = payloadObj; // 默认直接用原对象
// 如果是回执,先拿 funcType
if (isAck) {
JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) {
String funcType = propObj.entrySet().iterator().next().getKey();
String lockKey = "lock:" + deviceId + ":" + funcType;
// 读取锁的 value比如 autooff / user:1001
String lockHolder = stringRedisTemplate.opsForValue().get(lockKey);
if (lockHolder != null) {
sendObj = new JSONObject(payloadObj); // 只在需要时复制
sendObj.put("clientId", lockHolder);
// 如果相等则为自动模式直接退出方法 不转发ack消息 自动关也不应该转发
if (AUTO_MODE.equals(lockHolder) || AUTO_OFF.equals(lockHolder)) {
return;
}
}
}
}
// 转发消息
forwardPayload(deviceId, payload,payloadObj,action);
forwardPayload(deviceId, payload,payloadObj,action, sendObj, isAck);
// 获取第二个动态段,如"up"或"ack"
if ("ack".equals(action)) {
@ -118,30 +141,10 @@ public class DeviceStatusHandler {
}
}
private void forwardPayload(String deviceId,String payload,JSONObject payloadObj,String action) {
private void forwardPayload(String deviceId,String payload,
JSONObject payloadObj,String action, JSONObject sendObj, boolean isAck) {
try {
boolean isAck = payloadObj.containsKey("suc") && payloadObj.containsKey("prop");
JSONObject sendObj = payloadObj; // 默认直接用原对象
// 如果是回执,先拿 funcType
if (isAck) {
JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) {
String funcType = propObj.entrySet().iterator().next().getKey();
String lockKey = "lock:" + deviceId + ":" + funcType;
// 读取锁的 value比如 autooff / user:1001
String lockHolder = stringRedisTemplate.opsForValue().get(lockKey);
if (lockHolder != null) {
sendObj = new JSONObject(payloadObj); // 只在需要时复制
sendObj.put("clientId", lockHolder);
// 如果相等则为自动模式直接退出方法 不转发ack消息 自动关也不应该转发
if (AUTO_MODE.equals(lockHolder) || AUTO_OFF.equals(lockHolder)) {
return;
}
}
}
}
// 非回执消息:正常转发给订阅前端
// 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);

View File

@ -87,51 +87,55 @@ public class FrontendConfigHandler {
log.warn("【分布式锁】前端{}操作设备{}的{}功能失败;可能其他用户正在操作此功能", clientId, deviceId, funcType);
return;
}
// 转发前端指令
String deviceTopic = "dtu/" + deviceId + "/down";
mqttMessageSender.publish(deviceTopic, payload);
LocalDateTime currentTime = LocalDateTime.now();
// 3. 记录日志
log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}",
clientId, currentTime, deviceId, funcType, payload);
try {
// 转发前端指令
String deviceTopic = "dtu/" + deviceId + "/down";
mqttMessageSender.publish(deviceTopic, payload);
LocalDateTime currentTime = LocalDateTime.now();
// 3. 记录日志
log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}",
clientId, currentTime, deviceId, funcType, payload);
String funcName = funcType.substring(0, funcType.length() - 1);
Integer funcCode = funcCodeMap.get(funcType);
String funcName = funcType.substring(0, funcType.length() - 1);
Integer funcCode = funcCodeMap.get(funcType);
// 当卷膜 开 暂停,才执行的逻辑
if (funcCode == 0 && funcType.contains("k")) {
SysRollerAir sysRollerAir = sysRollerAirService.lambdaQuery()
.eq(SysRollerAir::getImei, deviceId)
.eq(SysRollerAir::getRoller, funcName)
.eq(SysRollerAir::getOpType, 1)
.orderByDesc(SysRollerAir::getId)
.last("limit 1")
.one();
BigDecimal ventTotalLen = BigDecimal.ZERO;
if (ObjectUtils.isNotEmpty(sysRollerAir)
&& sysRollerAir.getOpTime().isBefore(currentTime)) {
// 当卷膜 开 暂停,才执行的逻辑
if (funcCode == 0 && funcType.contains("k")) {
SysRollerAir sysRollerAir = sysRollerAirService.lambdaQuery()
.eq(SysRollerAir::getImei, deviceId)
.eq(SysRollerAir::getRoller, funcName)
.eq(SysRollerAir::getOpType, 1)
.orderByDesc(SysRollerAir::getId)
.last("limit 1")
.one();
BigDecimal ventTotalLen = BigDecimal.ZERO;
if (ObjectUtils.isNotEmpty(sysRollerAir)
&& sysRollerAir.getOpTime().isBefore(currentTime)) {
long time = Math.abs(Duration.between(currentTime, sysRollerAir.getOpTime()).getSeconds());
BigDecimal ventTotalTime = BigDecimal.valueOf(time);
long time = Math.abs(Duration.between(currentTime, sysRollerAir.getOpTime()).getSeconds());
BigDecimal ventTotalTime = BigDecimal.valueOf(time);
// 除以一圈的时间乘以一圈的长度
ventTotalLen = ventTotalTime.divide(perLapSec, 2, RoundingMode.HALF_UP)
.multiply(perLapLen).setScale(2, RoundingMode.HALF_UP);
log.info("【自动化参数】卷膜校准时间:{}; 一圈秒数:{}; 一圈长度: {}", ventTotalTime,perLapSec,perLapLen);
}
String config = "{\"ventTotalLen\": " + ventTotalLen +",\"clientId\":\""+clientId+"\"}";
// 查数据库、最后一条卷膜开暂停。计算时间发送自动关时间
mqttMessageSender.publish("frontend/"+clientId+"/dtu/"+deviceId+"/config", config);
}
// 插入记录
SysRollerAir rollerAir = new SysRollerAir();
rollerAir.setImei(deviceId);
rollerAir.setRoller(funcName);
rollerAir.setOpType(funcCode);
rollerAir.setPayload(payload);
rollerAir.setClientid(clientId);
rollerAir.setOpTime(currentTime);
sysRollerAirService.save(rollerAir);
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
// 除以一圈的时间乘以一圈的长度
ventTotalLen = ventTotalTime.divide(perLapSec, 2, RoundingMode.HALF_UP)
.multiply(perLapLen).setScale(2, RoundingMode.HALF_UP);
log.info("【自动化参数】卷膜校准时间:{}; 一圈秒数:{}; 一圈长度: {}", ventTotalTime,perLapSec,perLapLen);
}
String config = "{\"ventTotalLen\": " + ventTotalLen +",\"clientId\":\""+clientId+"\"}";
// 查数据库、最后一条卷膜开暂停。计算时间发送自动关时间
mqttMessageSender.publish("frontend/"+clientId+"/dtu/"+deviceId+"/config", config);
}
// 插入记录
SysRollerAir rollerAir = new SysRollerAir();
rollerAir.setImei(deviceId);
rollerAir.setRoller(funcName);
rollerAir.setOpType(funcCode);
rollerAir.setPayload(payload);
rollerAir.setClientid(clientId);
rollerAir.setOpTime(currentTime);
sysRollerAirService.save(rollerAir);
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
} finally {
stringRedisTemplate.delete(lockKey);
}
}
}

View File

@ -1,6 +1,7 @@
package com.agri.framework.interceptor;
import com.agri.common.core.domain.entity.SysUser;
import com.agri.common.utils.wechat.WxUtil;
import com.agri.framework.config.MqttConfig;
import com.agri.framework.manager.MqttAutoOffManager;
import com.agri.system.domain.SysAgriInfo;
@ -153,37 +154,41 @@ public class FrontendControlHandler {
return;
}
// 3. 记录日志
log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}",
clientId, LocalDateTime.now(), deviceId, funcType, payload);
SysUser sysUser = sysUserService.lambdaQuery()
.eq(SysUser::getClientId, clientId)
.one();
String operator = "手动控制";
if (sysUser!=null) {
operator = sysUser.getUserName();
}
SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery()
.eq(SysAgriInfo::getImei, deviceId)
.one();
String agriName = (agriInfo!=null && ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null;
SysDevOperLog logDto = new SysDevOperLog();
logDto.setAgriName(agriName);
logDto.setImei(deviceId);
logDto.setFuncCode(funcType);
logDto.setOpType(funcCodeMap.get(funcType));
logDto.setOpSource(1);
logDto.setPayload(payload);
logDto.setLockAcquired(1);
logDto.setLockHolder(clientId);
logDto.setCreateBy(operator);
sysDevOperLogService.save(logDto);
//todo
mqttMessageSender.publish(deviceTopic, payload);
try {
// 3. 记录日志
log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}",
clientId, LocalDateTime.now(), deviceId, funcType, payload);
SysUser sysUser = sysUserService.lambdaQuery()
.eq(SysUser::getClientId, clientId)
.one();
String operator = "手动控制";
if (sysUser!=null) {
operator = sysUser.getUserName();
}
SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery()
.eq(SysAgriInfo::getImei, deviceId)
.one();
String agriName = (agriInfo!=null && ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null;
SysDevOperLog logDto = new SysDevOperLog();
logDto.setAgriName(agriName);
logDto.setImei(deviceId);
logDto.setFuncCode(funcType);
logDto.setOpType(funcCodeMap.get(funcType));
logDto.setOpSource(1);
logDto.setPayload(payload);
logDto.setLockAcquired(1);
logDto.setLockHolder(clientId);
logDto.setCreateBy(operator);
sysDevOperLogService.save(logDto);
//todo
mqttMessageSender.publish(deviceTopic, payload);
// if (save) {
// testAutoOffTask(deviceId,funcCodeMap);
// }
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
} finally {
stringRedisTemplate.delete(lockKey);
}
}
public void testAutoOffTask(String deviceId, Map<String,Integer> funcCodeMap) throws MqttException {

View File

@ -276,16 +276,27 @@ public class MqttAutoOffManager {
log.info("【自动关任务】{}功能忙锁占用跳过自动关闭deviceId={}, funcType={}", funcType, deviceId, funcType);
return;
}
String deviceTopic = "dtu/" + deviceId + "/down";
JSONObject down = new JSONObject();
down.put(funcType, 0);
log.info("触发自动化条件");
//todo
mqttMessageSender.publish(deviceTopic, down.toJSONString());
try {
String deviceTopic = "dtu/" + deviceId + "/down";
JSONObject down = new JSONObject();
down.put(funcType, 0);
log.info("触发自动化条件");
mqttMessageSender.publish(deviceTopic, down.toJSONString());
saveDevLog(deviceId, funcType, down, latest);
log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
} catch (Exception e){
WxUtil.pushText("自动关任务执行报错-下发关闭指令:\n deviceId: " + deviceId + "\n funcType:" + funcType+"\n Cause: "+e);
log.warn("【自动关任务】下发关闭指令失败跳过deviceId={}, funcType={}", deviceId, funcType);
} finally {
stringRedisTemplate.delete(lockKey);
}
}
private void saveDevLog(String deviceId, String funcType, JSONObject down, String latest) {
SysAgriInfo agriInfo = sysAgriInfoService.lambdaQuery()
.eq(SysAgriInfo::getImei, deviceId)
.one();
String agriName = (agriInfo!=null && ObjectUtils.isNotEmpty(agriInfo.getAgriName()))?agriInfo.getAgriName():null;
String agriName = (agriInfo != null && ObjectUtils.isNotEmpty(agriInfo.getAgriName())) ? agriInfo.getAgriName() : null;
SysDevOperLog logDto = new SysDevOperLog();
logDto.setAgriName(agriName);
logDto.setImei(deviceId);
@ -300,8 +311,6 @@ public class MqttAutoOffManager {
logDto.setCreateBy("自动关");
logDto.setTaskStatus(getFutureStatus().toString());
sysDevOperLogService.save(logDto);
log.info("【自动关任务】检测仍在运行已下发关闭deviceId={}, funcType={}, payload={}", deviceId, funcType, down.toJSONString());
}
// 新增:收到“关”指令时,尝试取消对应自动关任务(优化:减少无意义任务执行;正确性仍以到点状态判断为准)

View File

@ -7,13 +7,12 @@ import com.agri.common.utils.wechat.WxUtil;
import com.agri.framework.config.MqttConfig;
import com.agri.framework.manager.MqttAutoOffManager;
import com.agri.system.domain.SysAgriInfo;
import com.agri.system.domain.SysAgriLimit;
import com.agri.system.domain.SysDevOperLog;
import com.agri.system.domain.vo.RollerTermVO;
import com.agri.system.service.ISysAgriInfoService;
import com.agri.system.service.ISysDevOperLogService;
import com.agri.system.service.ISysDtuDataService;
import com.agri.system.service.ISysRollerParamService;
import com.agri.system.service.*;
import org.apache.commons.collections4.CollectionUtils;
import org.checkerframework.checker.units.qual.A;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,10 +25,9 @@ import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -74,12 +72,30 @@ public class RollerAutoTask {
private StringRedisTemplate stringRedisTemplate;
@Autowired
private ISysAgriLimitService agriLimitService;
@Value("${spring.mqtt.dtu-ctl-lock-ttl}")
private int dtuCtlLockTTL;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
private static final Set<String> VALID_FUNC_CODES = new HashSet<>();
static {
LIMIT_MAP.put("jm1g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
LIMIT_MAP.put("jm2g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
LIMIT_MAP.put("jbg1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
LIMIT_MAP.put("jm3g1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
LIMIT_MAP.put("jm2k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
LIMIT_MAP.put("jm3k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
LIMIT_MAP.put("jbk1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
LIMIT_MAP.put("jm1k1", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
}
// ========== 常量定义(新增) ==========
private static final int WORK_MODE_AUTO = 1; // 自动模式
private static final int NOT_DELETED = 0; // 未删除
@ -88,6 +104,7 @@ public class RollerAutoTask {
private static final String CREATE_BY = "条件控制";
private static final int OP_SOURCE = 3; // 操作来源:条件控制
private static final int LOCK_ACQUIRED = 1; // 是否获取锁
private static final int DEFAULT_RUN_TIME = 300; // 默认运行时间
public void checkAutoTerm() {
try {
@ -359,22 +376,28 @@ public class RollerAutoTask {
// ========== 2. 记录操作日志 ==========
log.error("【指令处理】自动模式下触发{}设备{}的{}功能,指令:{}",
isOpen ? "开启" : "关闭", imei, funcType, message);
saveOperLog(imei, agriName, funcType, message, isOpen ? 1 : 0);
// ========== 3. 发布MQTT指令 ==========todo
mqttMessageSender.publish("dtu/" + imei + "/down", message);
if (isCancelOff) {
log.debug("【自动关调度】设备{}卷膜{}:触发最后一条自动化条件,下发关闭指令,无需暂停!时间:{}", imei, roller, LocalDateTime.now());
return;
}
// ========== 4. 计算运行时间并调度自动关 ==========
int runTime = RollerTimeCalculator.calculateRunTime(len);
if (isCancelOff) {
// 如果最后一条则遵循手动限位设置如果没设置默认300秒
SysAgriLimit agriLimit = agriLimitService.lambdaQuery()
.eq(SysAgriLimit::getImei, imei)
.one();
if (agriLimit != null) {
runTime = LIMIT_MAP.getOrDefault(funcType, k -> DEFAULT_RUN_TIME).apply(agriLimit);
}
log.debug("【自动关调度】设备{}卷膜{}:触发最后一条自动化条件,遵循手动限位设置!时间:{}", imei, roller, LocalDateTime.now());
}
if (runTime > 0) {
String autoOffKey = roller + (isOpen ? "k1" : "g1");
autoOffManager.scheduleAutoOff(imei, autoOffKey, runTime);
log.debug("【自动关调度】设备{}卷膜「{}:{}」调度{}秒后自动关闭", imei, roller,(isOpen?"开":"关"), runTime);
}
saveOperLog(imei, agriName, funcType, message, isOpen ? 1 : 0, isCancelOff, runTime);
}
// catch (MqttException e) {
// // ========== 异常处理:记录详细日志,不抛运行时异常 ==========
@ -397,7 +420,8 @@ public class RollerAutoTask {
* @param payload
* @param opType 1-0-
*/
private void saveOperLog(String imei, String agriName, String funcCode, String payload, int opType) {
private void saveOperLog(String imei, String agriName,
String funcCode, String payload, int opType, boolean isCancelOff, int runTime) {
SysDevOperLog logDto = new SysDevOperLog();
logDto.setAgriName(agriName);
logDto.setImei(imei);
@ -405,8 +429,13 @@ public class RollerAutoTask {
logDto.setOpType(opType);
logDto.setOpSource(OP_SOURCE);
logDto.setPayload(payload);
logDto.setRunTime(runTime);
logDto.setLockAcquired(LOCK_ACQUIRED); // 已获取锁
logDto.setLockHolder(AUTO_MODE);
logDto.setIsTask(isCancelOff?0:1);
if (isCancelOff) {
logDto.setNoTaskReason("触发最后一条自动化指令遵循手动限位设置默认300秒");
}
logDto.setCreateBy(CREATE_BY);
// 可选:增加异常捕获,避免日志保存失败影响指令执行
try {