首页在线状态推送,设备离线告警

master
lld 2026-03-07 22:45:38 +08:00
parent 8aea454417
commit b47415ab02
7 changed files with 335 additions and 66 deletions

View File

@ -33,7 +33,11 @@ public class SecurityUtils
{ {
try try
{ {
return getLoginUser().getUserId(); LoginUser loginUser = getLoginUser();
if (loginUser!= null) {
return loginUser.getUserId();
}
return null;
} }
catch (Exception e) catch (Exception e)
{ {
@ -63,7 +67,11 @@ public class SecurityUtils
{ {
try try
{ {
return getLoginUser().getUsername(); LoginUser loginUser = getLoginUser();
if (loginUser !=null ) {
return loginUser.getUsername();
}
return null;
} }
catch (Exception e) catch (Exception e)
{ {
@ -76,16 +84,18 @@ public class SecurityUtils
**/ **/
public static LoginUser getLoginUser() public static LoginUser getLoginUser()
{ {
LoginUser loginUser = null; try {
try Authentication authentication = getAuthentication();
{ if (authentication == null || !(authentication.getPrincipal() instanceof LoginUser)) {
loginUser = (LoginUser) getAuthentication().getPrincipal(); return null; // 无用户上下文时返回 null
} }
catch (Exception e) return (LoginUser) authentication.getPrincipal();
{
} catch (Exception e) {
log.error("获取用户信息异常: {}", HttpStatus.UNAUTHORIZED); log.error("获取用户信息异常: {}", HttpStatus.UNAUTHORIZED);
} }
return loginUser; return null;
} }
/** /**

View File

@ -5,6 +5,8 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -60,4 +62,17 @@ public class ThreadPoolConfig
} }
}; };
} }
@Bean("mqttPushExecutor")
public Executor mqttPushExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(2000);
executor.setThreadNamePrefix("mqtt-push-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
} }

View File

@ -186,7 +186,7 @@ public class MqttSubscriptionManager {
} }
// 3. 查询该用户名下的所有设备ID替换为你的实际设备查询逻辑 // 3. 查询该用户名下的所有设备ID替换为你的实际设备查询逻辑
List<String> deviceIds = new ArrayList<>(queryImeiByUserId(userId)); List<String> deviceIds = new ArrayList<>(agriInfoService.queryImeiByUserId(userId));
if (userId == 1) { if (userId == 1) {
deviceIds.add("862538065276061"); deviceIds.add("862538065276061");
} }
@ -313,21 +313,5 @@ public class MqttSubscriptionManager {
return frontendTopics; return frontendTopics;
} }
/**
* IDDAO/Service
* @return ID
*/
private List<String> queryImeiByUserId(Long userId) {
// 示例:替换为你项目中查询用户设备的实际代码
// 比如return deviceService.listDeviceIdsByUserId(userId);
SysAgriInfo sysAgriInfo = new SysAgriInfo();
if (!SecurityUtils.isAdmin()) {
sysAgriInfo.setUserId(userId);
}
List<SysAgriInfo> agriInfos = agriInfoService.findAgriByUser(sysAgriInfo);
if (CollectionUtils.isEmpty(agriInfos)) {
return Collections.emptyList();
}
return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList());
}
} }

View File

@ -1,14 +1,27 @@
package com.agri.quartz.task; package com.agri.quartz.task;
import com.agri.system.domain.SysDevOperLog; import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.core.RedisTemplate; import com.agri.framework.config.MqttConfig;
import com.agri.system.service.ISysAgriInfoService;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.time.LocalDateTime;
import java.util.Date; import java.time.ZoneOffset;
import java.util.List; import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
/** /**
* 线 线 * 线 线
@ -16,46 +29,265 @@ import java.util.List;
@Component @Component
public class AgriStatusTask { public class AgriStatusTask {
private static final Logger log = LoggerFactory.getLogger(AgriStatusTask.class);
// Redis 前缀常量
private static final String SUB_KEY_PREFIX = "sub:";
private static final String LOCK_KEY = "lock:device:online:push";
// 注入RedisTemplateSpring Boot自动配置
@Resource @Resource
private RedisTemplate<String, String> redisTemplate; private StringRedisTemplate stringRedisTemplate;
// 模拟从数据库/配置获取所有设备IMEI列表实际可替换为DB查询 @Resource
private List<String> getAllDeviceImeiList() { private MqttConfig.MqttMessageSender mqttMessageSender;
// 示例从数据库查询所有设备IMEI
// return deviceMapper.listAllImei(); @Value("${spring.mqtt.dtu-ctl-lock-ttl:15}")
return Arrays.asList("861234567890123", "869876543210987", "860000000000000"); private int lockTtl;
}
public void checkDeviceOnlineStatus() { @Autowired
// 步骤1获取所有需要检查的设备IMEI列表一次IO从DB/配置读取) private ISysAgriInfoService agriInfoService;
List<String> allImeiList = getAllDeviceImeiList();
if (allImeiList.isEmpty()) { // JSON序列化工具单例
System.out.println("无设备需要检查"); private final ObjectMapper objectMapper = new ObjectMapper();
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 10
* 1. sub: key IMEI
* 2. 线
* 3. 线 MQTT frontend/{imei}/online
*/
public void pushOnlineStatus() {
// 1. 加分布式锁,避免集群重复执行
Boolean lockSuccess = stringRedisTemplate.opsForValue()
.setIfAbsent(LOCK_KEY, "running", lockTtl, TimeUnit.SECONDS);
// 补充处理Redis连接异常的情况
if (lockSuccess == null) {
log.error("获取分布式锁失败Redis连接异常");
return; return;
} }
if (!lockSuccess) {
// 步骤2批量查询Redis核心仅一次IO操作 log.debug("其他节点正在执行,跳过本次推送");
// MGET命令批量获取多个Key的值返回值列表与入参Key列表一一对应 return;
List<String> redisValues = redisTemplate.opsForValue().multiGet(allImeiList); }
long startTime = System.currentTimeMillis();
// 步骤3解析结果区分在线/离线设备 try {
List<String> onlineImeiList = new ArrayList<>(); // 2. 安全遍历所有 sub: 开头的 key提取设备 IMEI
List<String> offlineImeiList = new ArrayList<>(); // List<String> allDeviceImeiList = scanAllSubDeviceImei();
// if (allDeviceImeiList.isEmpty()) {
for (int i = 0; i < allImeiList.size(); i++) { // log.info("没有找到任何设备订阅记录,结束任务");
String imei = allImeiList.get(i); // return;
// Redis中Key存在则值不为null我们存的是"online"不存在则为null // }
if (redisValues.get(i) != null) { // 查询大棚列表所有在线设备
onlineImeiList.add(imei); List<String> imeiList = agriInfoService.queryImeiByUserId(null);
} else { if (imeiList.isEmpty()) {
offlineImeiList.add(imei); log.info("大棚表无数据,结束推送");
return;
} }
log.info("从大棚表获取到合法IMEI总数{}", imeiList.size());
// 3. 批量查询设备在线状态Redis Pipeline一次网络往返
asyncBatchPushMqtt(batchCheckDeviceOnline(imeiList));
} catch (Exception e) {
log.error("设备在线状态推送任务异常", e);
// 可选:异常告警(如企业微信/钉钉)
// WxUtil.pushText("【设备在线状态推送异常】\n" + e.getMessage());
} finally {
// 释放锁可选也可依赖TTL自动过期
stringRedisTemplate.delete(LOCK_KEY);
log.info("设备在线状态推送任务完成,耗时:{}ms", System.currentTimeMillis() - startTime);
}
}
// ========== 批量查在线状态Pipeline 优化版JDK 8 适配) ==========
// 在线离线的都得推
private Map<String, Map<String, Boolean>> batchCheckDeviceOnline(List<String> imeiList) {
Map<String, Map<String, Boolean>> result = new HashMap<>();
if (imeiList.isEmpty()) {
return result;
} }
// JDK 8 显式声明 RedisCallback避免 Lambda 泛型问题
List<Object> results = stringRedisTemplate.executePipelined(
new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
StringRedisSerializer serializer = new StringRedisSerializer();
for (String imei : imeiList) {
byte[] onlineKeyBytes = serializer.serialize(SUB_KEY_PREFIX + imei);
connection.exists(onlineKeyBytes); // 批量执行 exists
connection.exists(serializer.serialize(imei));
}
return null;
}
},
new StringRedisSerializer()
);
// 步骤4处理结果示例打印日志实际可写入DB/推送告警等) // 解析结果每两个结果对应一个IMEIsubExist + imeiOnline
System.out.println("[" + new Date() + "] 在线设备:" + onlineImeiList); for (int i = 0; i < imeiList.size(); i++) {
System.out.println("[" + new Date() + "] 离线设备:" + offlineImeiList); String imei = imeiList.get(i);
// 初始化默认状态:不存在+离线
boolean subExist = false;
boolean imeiOnline = false;
// 越界判断避免IndexOutOfBoundsException
int subIndex = i * 2;
int imeiIndex = i * 2 + 1;
if (subIndex < results.size()) {
Object subResult = results.get(subIndex);
subExist = parseExistsResult(subResult);
}
if (imeiIndex < results.size()) {
Object imeiResult = results.get(imeiIndex);
imeiOnline = parseExistsResult(imeiResult);
}
result.put(imei, ImmutableMap.of("subExist", subExist, "imeiOnline", imeiOnline));
}
return result;
}
private boolean parseExistsResult(Object result) {
if (result instanceof Long) {
return ((Long) result) == 1;
} else if (result instanceof Boolean) {
return (Boolean) result;
}
return false;
}
// ========== 核心方法3异步批量推送在线状态到 MQTT线程池隔离 ==========
@Async("mqttPushExecutor")
public void asyncBatchPushMqtt(Map<String, Map<String, Boolean>> statusMap) {
if (statusMap.isEmpty()) {
log.info("不存在任何imei");
return;
}
int successCount = 0;
int failCount = 0;
String dateNow = LocalDateTime.now().format(DATE_TIME_FORMATTER);
// 在线状态
for (Map.Entry<String, Map<String, Boolean>> map : statusMap.entrySet()) {
String imei = map.getKey();
try {
// 按你的需求,直接推送到 frontend/{imei}/online 主题
Map<String, Boolean> imeiMap = map.getValue();
// 设备在线的 && 推送首页状态 离线在线都推
if (imeiMap.get("subExist")) {
// 构造首页消息用ObjectMapper序列化避免手动拼接JSON
Map<String, Object> onlineMsg = new HashMap<>();
onlineMsg.put("online", imeiMap.get("imeiOnline") ? "在线" : "离线");
onlineMsg.put("time", dateNow); // 毫秒时间戳
String onlineMessage = objectMapper.writeValueAsString(onlineMsg);
mqttMessageSender.publish("frontend/" + imei + "/online", onlineMessage);
}
// 无论设备是否在线 只要离线就推送设备状态
if (!imeiMap.get("imeiOnline")) {
Map<String, Object> alarmMsg = new HashMap<>();
alarmMsg.put("online", "设备离线");
alarmMsg.put("time", dateNow);
String alarmMessage = objectMapper.writeValueAsString(alarmMsg);
mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage);
}
successCount++;
} catch (Exception e) {
failCount++;
log.error("向设备 {} 推送在线状态失败", imei, e);
}
}
log.info("批量在线状态推送完成:成功={},失败={}", successCount, failCount);
}
/*
* LuaIMEIsub:{imei}
* 1Redis +
* @return IMEI线key=IMEIvalue=线
*/
public Map<String, Boolean> getGreenhouseOnlineStatusByLua() {
// 1. 从大棚表获取所有合法IMEI
List<String> allGreenhouseImeiList = agriInfoService.queryImeiByUserId(null);
if (allGreenhouseImeiList.isEmpty()) {
log.info("大棚表无合法IMEI返回空");
return new HashMap<>();
}
// 2. 简化版Lua脚本批量查sub:{imei}是否存在)
String luaScript = "" +
"local imeiList = ARGV\n" +
"local prefix = KEYS[1]\n" +
"local result = {}\n" +
"for _, imei in ipairs(imeiList) do\n" +
" local key = prefix .. imei\n" +
" local isOnline = redis.call('EXISTS', key)\n" +
" table.insert(result, imei)\n" +
" table.insert(result, tostring(isOnline))\n" +
"end\n" +
"return result";
// ===== 核心修改保留你指定的写法仅做JDK 8兼容 =====
DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(luaScript);
redisScript.setResultType(List.class); // 保留原有写法
// 3. 构造参数
List<String> keys = Collections.singletonList(SUB_KEY_PREFIX);
List<String> args = allGreenhouseImeiList;
try {
// 4. 执行脚本JDK 8 强制类型转换,安全兼容)
List resultList = stringRedisTemplate.execute(redisScript, keys, args.toArray(new String[0]));
// 5. 解析结果兼容JDK 8 原始List类型
Map<String, Boolean> onlineStatusMap = new HashMap<>();
if (resultList != null && resultList.size() >= 2) {
for (int i = 0; i < resultList.size(); i += 2) {
// JDK 8 显式转换为String避免类型异常
String imei = String.valueOf(resultList.get(i));
String isOnlineStr = String.valueOf(resultList.get(i + 1));
onlineStatusMap.put(imei, "1".equals(isOnlineStr));
}
}
log.info("Lua查询完成合法IMEI数={},在线数={}",
allGreenhouseImeiList.size(),
onlineStatusMap.values().stream().filter(Boolean::booleanValue).count());
return onlineStatusMap;
} catch (Exception e) {
log.error("Lua脚本执行失败", e);
return new HashMap<>();
}
}
// ========== 方式1用 ScanCursor 遍历 sub:* Key ==========
private List<String> scanAllSubDeviceImei() {
List<String> imeiList = new ArrayList<>();
// count 建议一次扫描1000个槽位
ScanOptions scanOptions = ScanOptions.scanOptions()
.match(SUB_KEY_PREFIX + "*")
.count(1000)
.build();
// JDK 8 需显式声明 Cursor<byte[]> 泛型try-with-resources 自动关闭游标
try (Cursor<byte[]> cursor = stringRedisTemplate.getConnectionFactory()
.getConnection()
.scan(scanOptions)) {
while (cursor.hasNext()) {
byte[] keyBytes = cursor.next();
String key = new String(keyBytes, StandardCharsets.UTF_8);
if (key.startsWith(SUB_KEY_PREFIX)) {
String imei = key.substring(SUB_KEY_PREFIX.length());
imeiList.add(imei);
}
}
} catch (Exception e) {
log.error("Cursor 扫描 sub: 前缀 key 失败", e);
}
return imeiList;
} }
} }

View File

@ -71,4 +71,7 @@ public interface ISysAgriInfoService extends IService<SysAgriInfo> {
Map<String,Object> addAgriFromMobile(SysAgriInfo sysAgriInfo); Map<String,Object> addAgriFromMobile(SysAgriInfo sysAgriInfo);
List<AgriAutoInfoVo> findAgriOfAutoInfo(); List<AgriAutoInfoVo> findAgriOfAutoInfo();
List<String> queryImeiByUserId(Long userId);
} }

View File

@ -19,6 +19,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
/** /**
* Service * Service
@ -262,4 +263,23 @@ public class SysAgriInfoServiceImpl extends ServiceImpl<SysAgriInfoMapper, SysAg
return baseMapper.findAgriOfAutoInfo(); return baseMapper.findAgriOfAutoInfo();
} }
/**
* IDDAO/Service
* @return ID
*/
@Override
public List<String> queryImeiByUserId(Long userId) {
SysAgriInfo sysAgriInfo = new SysAgriInfo();
if (!SecurityUtils.isAdmin()) {
sysAgriInfo.setUserId(userId);
}
List<SysAgriInfo> agriInfos = baseMapper.findAgriByUser(sysAgriInfo);
if (CollectionUtils.isEmpty(agriInfos)) {
return Collections.emptyList();
}
return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList());
}
} }

View File

@ -156,6 +156,11 @@ public class SysAutoTermServiceImpl extends ServiceImpl<SysAutoTermMapper, SysAu
return roller + "计算风口总长和手动设置风口长度至少填写一个!"; return roller + "计算风口总长和手动设置风口长度至少填写一个!";
} }
boolean isValidLen = (config.getManualTotalLen() != null && config.getManualTotalLen().compareTo(BigDecimal.valueOf(300))>0)
|| (config.getAutoTotalLen() != null && config.getAutoTotalLen().compareTo(BigDecimal.valueOf(300))>0);
if (isValidLen) {
return roller + "风口长度设置不能大于300cm";
}
if (config.getReservedLen() == null || config.getReservedLen().compareTo(BigDecimal.ZERO)<=0) { if (config.getReservedLen() == null || config.getReservedLen().compareTo(BigDecimal.ZERO)<=0) {
return roller + "预留风口长度未设置,请点击相应页签右上角设置后重试!"; return roller + "预留风口长度未设置,请点击相应页签右上角设置后重试!";
} }