在线状态

master
lld 2026-03-08 18:29:25 +08:00
parent a2cb8ddee5
commit 97da9f0abb
11 changed files with 224 additions and 712 deletions

View File

@ -1,104 +0,0 @@
package com.agri.system.controller;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.agri.common.annotation.Log;
import com.agri.common.core.controller.BaseController;
import com.agri.common.core.domain.AjaxResult;
import com.agri.common.enums.BusinessType;
import com.agri.system.domain.SysRollerAir;
import com.agri.system.service.ISysRollerAirService;
import com.agri.common.utils.poi.ExcelUtil;
import com.agri.common.core.page.TableDataInfo;
/**
* Controller
*
* @author lld
* @date 2026-03-04
*/
@RestController
@RequestMapping("/assets/air")
public class SysRollerAirController extends BaseController
{
@Autowired
private ISysRollerAirService sysRollerAirService;
/**
*
*/
@PreAuthorize("@ss.hasPermi('assets:air:list')")
@GetMapping("/list")
public TableDataInfo list(SysRollerAir sysRollerAir)
{
startPage();
List<SysRollerAir> list = sysRollerAirService.selectSysRollerAirList(sysRollerAir);
return getDataTable(list);
}
/**
*
*/
@PreAuthorize("@ss.hasPermi('assets:air:export')")
@Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, SysRollerAir sysRollerAir)
{
List<SysRollerAir> list = sysRollerAirService.selectSysRollerAirList(sysRollerAir);
ExcelUtil<SysRollerAir> util = new ExcelUtil<SysRollerAir>(SysRollerAir.class);
util.exportExcel(response, list, "自动化卷膜风口大小设置数据");
}
/**
*
*/
@PreAuthorize("@ss.hasPermi('assets:air:query')")
@GetMapping(value = "/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id)
{
return success(sysRollerAirService.selectSysRollerAirById(id));
}
/**
*
*/
@PreAuthorize("@ss.hasPermi('assets:air:add')")
@Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody SysRollerAir sysRollerAir)
{
return toAjax(sysRollerAirService.insertSysRollerAir(sysRollerAir));
}
/**
*
*/
@PreAuthorize("@ss.hasPermi('assets:air:edit')")
@Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody SysRollerAir sysRollerAir)
{
return toAjax(sysRollerAirService.updateSysRollerAir(sysRollerAir));
}
/**
*
*/
@PreAuthorize("@ss.hasPermi('assets:air:remove')")
@Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable Long[] ids)
{
return toAjax(sysRollerAirService.deleteSysRollerAirByIds(ids));
}
}

View File

@ -1,139 +0,0 @@
package com.agri.system.domain;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.agri.common.annotation.Excel;
import com.agri.common.core.domain.BaseEntity;
/**
* sys_roller_air
*
* @author lld
* @date 2026-03-04
*/
@TableName("sys_roller_air")
public class SysRollerAir extends BaseEntity
{
@TableField(exist = false)
private static final long serialVersionUID = 1L;
/** 主键ID */
private Long id;
/** 设备IMEI码 */
@Excel(name = "设备IMEI码")
private String imei;
/** 卷膜器编号/标识 */
@Excel(name = "卷膜器编号/标识")
private String roller;
/** 操作类型 0-停止 1-运行 2-查询 3-重置 */
@Excel(name = "操作类型 0-停止 1-运行 2-查询 3-重置")
private Long opType;
/** 操作参数(JSON格式存储风口大小等配置) */
@Excel(name = "操作参数(JSON格式存储风口大小等配置)")
private String payload;
/** 客户端ID */
@Excel(name = "客户端ID")
private String clientid;
/** 操作执行时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
@Excel(name = "操作执行时间", width = 30, dateFormat = "yyyy-MM-dd")
private Date opTime;
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setImei(String imei)
{
this.imei = imei;
}
public String getImei()
{
return imei;
}
public void setRoller(String roller)
{
this.roller = roller;
}
public String getRoller()
{
return roller;
}
public void setOpType(Long opType)
{
this.opType = opType;
}
public Long getOpType()
{
return opType;
}
public void setPayload(String payload)
{
this.payload = payload;
}
public String getPayload()
{
return payload;
}
public void setClientid(String clientid)
{
this.clientid = clientid;
}
public String getClientid()
{
return clientid;
}
public void setOpTime(Date opTime)
{
this.opTime = opTime;
}
public Date getOpTime()
{
return opTime;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("imei", getImei())
.append("roller", getRoller())
.append("opType", getOpType())
.append("payload", getPayload())
.append("clientid", getClientid())
.append("opTime", getOpTime())
.append("createBy", getCreateBy())
.append("createTime", getCreateTime())
.append("updateBy", getUpdateBy())
.append("updateTime", getUpdateTime())
.toString();
}
}

View File

@ -1,62 +0,0 @@
package com.agri.system.mapper;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.agri.system.domain.SysRollerAir;
/**
* Mapper
*
* @author lld
* @date 2026-03-04
*/
public interface SysRollerAirMapper extends BaseMapper<SysRollerAir>
{
/**
*
*
* @param id
* @return
*/
public SysRollerAir selectSysRollerAirById(Long id);
/**
*
*
* @param sysRollerAir
* @return
*/
public List<SysRollerAir> selectSysRollerAirList(SysRollerAir sysRollerAir);
/**
*
*
* @param sysRollerAir
* @return
*/
public int insertSysRollerAir(SysRollerAir sysRollerAir);
/**
*
*
* @param sysRollerAir
* @return
*/
public int updateSysRollerAir(SysRollerAir sysRollerAir);
/**
*
*
* @param id
* @return
*/
public int deleteSysRollerAirById(Long id);
/**
*
*
* @param ids
* @return
*/
public int deleteSysRollerAirByIds(Long[] ids);
}

View File

@ -1,61 +0,0 @@
package com.agri.system.service;
import java.util.List;
import com.baomidou.mybatisplus.extension.service.IService;
import com.agri.system.domain.SysRollerAir;
/**
* Service
*
* @author lld
* @date 2026-03-04
*/
public interface ISysRollerAirService extends IService<SysRollerAir> {
/**
*
*
* @param id
* @return
*/
public SysRollerAir selectSysRollerAirById(Long id);
/**
*
*
* @param sysRollerAir
* @return
*/
public List<SysRollerAir> selectSysRollerAirList(SysRollerAir sysRollerAir);
/**
*
*
* @param sysRollerAir
* @return
*/
public int insertSysRollerAir(SysRollerAir sysRollerAir);
/**
*
*
* @param sysRollerAir
* @return
*/
public int updateSysRollerAir(SysRollerAir sysRollerAir);
/**
*
*
* @param ids
* @return
*/
public int deleteSysRollerAirByIds(Long[] ids);
/**
*
*
* @param id
* @return
*/
public int deleteSysRollerAirById(Long id);
}

View File

@ -1,94 +0,0 @@
package com.agri.system.service.impl;
import java.util.List;
import com.agri.common.utils.DateUtils;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import com.agri.system.mapper.SysRollerAirMapper;
import com.agri.system.domain.SysRollerAir;
import com.agri.system.service.ISysRollerAirService;
/**
* Service
*
* @author lld
* @date 2026-03-04
*/
@Service
public class SysRollerAirServiceImpl extends ServiceImpl<SysRollerAirMapper, SysRollerAir> implements ISysRollerAirService
{
/**
*
*
* @param id
* @return
*/
@Override
public SysRollerAir selectSysRollerAirById(Long id)
{
return baseMapper.selectSysRollerAirById(id);
}
/**
*
*
* @param sysRollerAir
* @return
*/
@Override
public List<SysRollerAir> selectSysRollerAirList(SysRollerAir sysRollerAir)
{
return baseMapper.selectSysRollerAirList(sysRollerAir);
}
/**
*
*
* @param sysRollerAir
* @return
*/
@Override
public int insertSysRollerAir(SysRollerAir sysRollerAir)
{
sysRollerAir.setCreateTime(DateUtils.getNowDate());
return baseMapper.insertSysRollerAir(sysRollerAir);
}
/**
*
*
* @param sysRollerAir
* @return
*/
@Override
public int updateSysRollerAir(SysRollerAir sysRollerAir)
{
sysRollerAir.setUpdateTime(DateUtils.getNowDate());
return baseMapper.updateSysRollerAir(sysRollerAir);
}
/**
*
*
* @param ids
* @return
*/
@Override
public int deleteSysRollerAirByIds(Long[] ids)
{
return baseMapper.deleteSysRollerAirByIds(ids);
}
/**
*
*
* @param id
* @return
*/
@Override
public int deleteSysRollerAirById(Long id)
{
return baseMapper.deleteSysRollerAirById(id);
}
}

View File

@ -3,6 +3,7 @@ package com.agri.web.controller.mqtt;
import com.agri.common.annotation.Log; import com.agri.common.annotation.Log;
import com.agri.common.core.domain.AjaxResult; import com.agri.common.core.domain.AjaxResult;
import com.agri.common.enums.BusinessType; import com.agri.common.enums.BusinessType;
import com.agri.framework.manager.AgriStatusManager;
import com.agri.framework.manager.MqttAutoOffManager; import com.agri.framework.manager.MqttAutoOffManager;
import com.agri.framework.manager.MqttClientManager; import com.agri.framework.manager.MqttClientManager;
import com.agri.framework.manager.MqttSubscriptionManager; import com.agri.framework.manager.MqttSubscriptionManager;
@ -10,13 +11,7 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List; import java.util.List;
@ -37,6 +32,10 @@ public class MqttController {
@Autowired @Autowired
private MqttAutoOffManager mqttAutoOffManager; private MqttAutoOffManager mqttAutoOffManager;
@Autowired
private AgriStatusManager agriStatusManager;
/** /**
* *
*/ */
@ -146,4 +145,15 @@ public class MqttController {
boolean hasTask = mqttAutoOffManager.hasAutoOffTask(deviceId); boolean hasTask = mqttAutoOffManager.hasAutoOffTask(deviceId);
return AjaxResult.success(hasTask); return AjaxResult.success(hasTask);
} }
@PostMapping("/getAgriStatus")
public void getAgriStatus(@RequestBody List<String> imeiList) {
if (imeiList.isEmpty()) {
log.info("大棚表无数据,结束推送");
return;
}
agriStatusManager.asyncBatchPushMqtt(agriStatusManager.batchCheckDeviceOnline(imeiList));
}
} }

View File

@ -1,97 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.agri.system.mapper.SysRollerAirMapper">
<resultMap type="SysRollerAir" id="SysRollerAirResult">
<result property="id" column="id" />
<result property="imei" column="imei" />
<result property="roller" column="roller" />
<result property="opType" column="op_type" />
<result property="payload" column="payload" />
<result property="clientid" column="clientid" />
<result property="opTime" column="op_time" />
<result property="createBy" column="create_by" />
<result property="createTime" column="create_time" />
<result property="updateBy" column="update_by" />
<result property="updateTime" column="update_time" />
</resultMap>
<sql id="selectSysRollerAirVo">
select id, imei, roller, op_type, payload, clientid, op_time, create_by, create_time, update_by, update_time from sys_roller_air
</sql>
<select id="selectSysRollerAirList" parameterType="SysRollerAir" resultMap="SysRollerAirResult">
<include refid="selectSysRollerAirVo"/>
<where>
<if test="imei != null and imei != ''"> and imei = #{imei}</if>
<if test="roller != null and roller != ''"> and roller = #{roller}</if>
<if test="opType != null "> and op_type = #{opType}</if>
<if test="payload != null and payload != ''"> and payload = #{payload}</if>
<if test="clientid != null and clientid != ''"> and clientid = #{clientid}</if>
<if test="opTime != null "> and op_time = #{opTime}</if>
</where>
</select>
<select id="selectSysRollerAirById" parameterType="Long" resultMap="SysRollerAirResult">
<include refid="selectSysRollerAirVo"/>
where id = #{id}
</select>
<insert id="insertSysRollerAir" parameterType="SysRollerAir" useGeneratedKeys="true" keyProperty="id">
insert into sys_roller_air
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="imei != null and imei != ''">imei,</if>
<if test="roller != null and roller != ''">roller,</if>
<if test="opType != null">op_type,</if>
<if test="payload != null">payload,</if>
<if test="clientid != null">clientid,</if>
<if test="opTime != null">op_time,</if>
create_by,
create_time,
update_by,
update_time,
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="imei != null and imei != ''">#{imei},</if>
<if test="roller != null and roller != ''">#{roller},</if>
<if test="opType != null">#{opType},</if>
<if test="payload != null">#{payload},</if>
<if test="clientid != null">#{clientid},</if>
<if test="opTime != null">#{opTime},</if>
#{createBy},
#{createTime},
#{updateBy},
#{updateTime},
</trim>
</insert>
<update id="updateSysRollerAir" parameterType="SysRollerAir">
update sys_roller_air
<trim prefix="SET" suffixOverrides=",">
<if test="imei != null and imei != ''">imei = #{imei},</if>
<if test="roller != null and roller != ''">roller = #{roller},</if>
<if test="opType != null">op_type = #{opType},</if>
<if test="payload != null">payload = #{payload},</if>
<if test="clientid != null">clientid = #{clientid},</if>
<if test="opTime != null">op_time = #{opTime},</if>
<if test="createBy != null">create_by = #{createBy},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="updateBy != null">update_by = #{updateBy},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
</trim>
where id = #{id}
</update>
<delete id="deleteSysRollerAirById" parameterType="Long">
delete from sys_roller_air where id = #{id}
</delete>
<delete id="deleteSysRollerAirByIds" parameterType="String">
delete from sys_roller_air where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</delete>
</mapper>

View File

@ -0,0 +1,148 @@
package com.agri.framework.manager;
import com.agri.framework.config.MqttConfig;
import com.agri.system.service.ISysAgriInfoService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class AgriStatusManager {
private static final Logger log = LoggerFactory.getLogger(AgriStatusManager.class);
// Redis 前缀常量
private static final String SUB_KEY_PREFIX = "sub:";
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private MqttConfig.MqttMessageSender mqttMessageSender;
// JSON序列化工具单例
private final ObjectMapper objectMapper = new ObjectMapper();
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// ========== 批量查在线状态Pipeline 优化版JDK 8 适配) ==========
// 在线离线的都得推
public Map<String, Map<String, Boolean>> batchCheckDeviceOnline(List<String> imeiList) {
Map<String, Map<String, Boolean>> result = new HashMap<>();
if (imeiList.isEmpty()) {
return result;
}
// JDK 8 显式声明 RedisCallback避免 Lambda 泛型问题
List<Object> results = stringRedisTemplate.executePipelined(
new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
StringRedisSerializer serializer = new StringRedisSerializer();
for (String imei : imeiList) {
byte[] onlineKeyBytes = serializer.serialize(SUB_KEY_PREFIX + imei);
connection.exists(onlineKeyBytes); // 批量执行 exists
connection.exists(serializer.serialize(imei));
}
return null;
}
},
new StringRedisSerializer()
);
// 解析结果每两个结果对应一个IMEIsubExist + imeiOnline
for (int i = 0; i < imeiList.size(); i++) {
String imei = imeiList.get(i);
// 初始化默认状态:不存在+离线
boolean subExist = false;
boolean imeiOnline = false;
// 越界判断避免IndexOutOfBoundsException
int subIndex = i * 2;
int imeiIndex = i * 2 + 1;
if (subIndex < results.size()) {
Object subResult = results.get(subIndex);
subExist = parseExistsResult(subResult);
}
if (imeiIndex < results.size()) {
Object imeiResult = results.get(imeiIndex);
imeiOnline = parseExistsResult(imeiResult);
}
result.put(imei, ImmutableMap.of("subExist", subExist, "imeiOnline", imeiOnline));
}
return result;
}
private boolean parseExistsResult(Object result) {
if (result instanceof Long) {
return ((Long) result) == 1;
} else if (result instanceof Boolean) {
return (Boolean) result;
}
return false;
}
// ========== 核心方法3异步批量推送在线状态到 MQTT线程池隔离 ==========
@Async("mqttPushExecutor")
public void asyncBatchPushMqtt(Map<String, Map<String, Boolean>> statusMap) {
if (statusMap.isEmpty()) {
log.info("不存在任何imei");
return;
}
int successCount = 0;
int failCount = 0;
String dateNow = LocalDateTime.now().format(DATE_TIME_FORMATTER);
// 在线状态
for (Map.Entry<String, Map<String, Boolean>> map : statusMap.entrySet()) {
String imei = map.getKey();
try {
// 按你的需求,直接推送到 frontend/{imei}/online 主题
Map<String, Boolean> imeiMap = map.getValue();
// 设备在线的 && 推送首页状态 离线在线都推
if (imeiMap.get("subExist")) {
// 构造首页消息用ObjectMapper序列化避免手动拼接JSON
Map<String, Object> onlineMsg = new HashMap<>();
onlineMsg.put("online", imeiMap.get("imeiOnline") ? "在线" : "离线");
onlineMsg.put("time", dateNow); // 毫秒时间戳
onlineMsg.put("imei", imei);
String onlineMessage = objectMapper.writeValueAsString(onlineMsg);
mqttMessageSender.publish("device/" + imei + "/status", onlineMessage);
}
// 无论设备是否在线 只要离线就推送设备状态
if (!imeiMap.get("imeiOnline")) {
// todo 设备离线推送 发消息提醒
Map<String, Object> alarmMsg = new HashMap<>();
alarmMsg.put("online", "设备离线");
alarmMsg.put("time", dateNow);
alarmMsg.put("imei", imei);
String alarmMessage = objectMapper.writeValueAsString(alarmMsg);
mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage);
}
successCount++;
} catch (Exception e) {
failCount++;
log.error("向设备 {} 推送在线状态失败", imei, e);
}
}
log.info("批量在线状态推送完成:成功={},失败={}", successCount, failCount);
}
}

View File

@ -1,5 +1,6 @@
package com.agri.quartz.task; package com.agri.quartz.task;
import com.agri.framework.manager.AgriStatusManager;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.agri.framework.config.MqttConfig; import com.agri.framework.config.MqttConfig;
import com.agri.system.service.ISysAgriInfoService; import com.agri.system.service.ISysAgriInfoService;
@ -38,19 +39,14 @@ public class AgriStatusTask {
@Resource @Resource
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
@Resource
private MqttConfig.MqttMessageSender mqttMessageSender;
@Value("${spring.mqtt.dtu-ctl-lock-ttl:15}") @Value("${spring.mqtt.dtu-ctl-lock-ttl:15}")
private int lockTtl; private int lockTtl;
@Autowired @Autowired
private ISysAgriInfoService agriInfoService; private ISysAgriInfoService agriInfoService;
// JSON序列化工具单例 @Autowired
private final ObjectMapper objectMapper = new ObjectMapper(); private AgriStatusManager agriStatusManager;
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/** /**
* 10 * 10
@ -58,7 +54,7 @@ public class AgriStatusTask {
* 2. 线 * 2. 线
* 3. 线 MQTT frontend/{imei}/online * 3. 线 MQTT frontend/{imei}/online
*/ */
public void pushOnlineStatus() { public void pushDeviceStatus() {
// 1. 加分布式锁,避免集群重复执行 // 1. 加分布式锁,避免集群重复执行
Boolean lockSuccess = stringRedisTemplate.opsForValue() Boolean lockSuccess = stringRedisTemplate.opsForValue()
.setIfAbsent(LOCK_KEY, "running", lockTtl, TimeUnit.SECONDS); .setIfAbsent(LOCK_KEY, "running", lockTtl, TimeUnit.SECONDS);
@ -89,7 +85,7 @@ public class AgriStatusTask {
log.info("从大棚表获取到合法IMEI总数{}", imeiList.size()); log.info("从大棚表获取到合法IMEI总数{}", imeiList.size());
// 3. 批量查询设备在线状态Redis Pipeline一次网络往返 // 3. 批量查询设备在线状态Redis Pipeline一次网络往返
asyncBatchPushMqtt(batchCheckDeviceOnline(imeiList)); agriStatusManager.asyncBatchPushMqtt(agriStatusManager.batchCheckDeviceOnline(imeiList));
} catch (Exception e) { } catch (Exception e) {
log.error("设备在线状态推送任务异常", e); log.error("设备在线状态推送任务异常", e);
// 可选:异常告警(如企业微信/钉钉) // 可选:异常告警(如企业微信/钉钉)
@ -101,109 +97,6 @@ public class AgriStatusTask {
} }
} }
// ========== 批量查在线状态Pipeline 优化版JDK 8 适配) ==========
// 在线离线的都得推
private Map<String, Map<String, Boolean>> batchCheckDeviceOnline(List<String> imeiList) {
Map<String, Map<String, Boolean>> result = new HashMap<>();
if (imeiList.isEmpty()) {
return result;
}
// JDK 8 显式声明 RedisCallback避免 Lambda 泛型问题
List<Object> results = stringRedisTemplate.executePipelined(
new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) {
StringRedisSerializer serializer = new StringRedisSerializer();
for (String imei : imeiList) {
byte[] onlineKeyBytes = serializer.serialize(SUB_KEY_PREFIX + imei);
connection.exists(onlineKeyBytes); // 批量执行 exists
connection.exists(serializer.serialize(imei));
}
return null;
}
},
new StringRedisSerializer()
);
// 解析结果每两个结果对应一个IMEIsubExist + imeiOnline
for (int i = 0; i < imeiList.size(); i++) {
String imei = imeiList.get(i);
// 初始化默认状态:不存在+离线
boolean subExist = false;
boolean imeiOnline = false;
// 越界判断避免IndexOutOfBoundsException
int subIndex = i * 2;
int imeiIndex = i * 2 + 1;
if (subIndex < results.size()) {
Object subResult = results.get(subIndex);
subExist = parseExistsResult(subResult);
}
if (imeiIndex < results.size()) {
Object imeiResult = results.get(imeiIndex);
imeiOnline = parseExistsResult(imeiResult);
}
result.put(imei, ImmutableMap.of("subExist", subExist, "imeiOnline", imeiOnline));
}
return result;
}
private boolean parseExistsResult(Object result) {
if (result instanceof Long) {
return ((Long) result) == 1;
} else if (result instanceof Boolean) {
return (Boolean) result;
}
return false;
}
// ========== 核心方法3异步批量推送在线状态到 MQTT线程池隔离 ==========
@Async("mqttPushExecutor")
public void asyncBatchPushMqtt(Map<String, Map<String, Boolean>> statusMap) {
if (statusMap.isEmpty()) {
log.info("不存在任何imei");
return;
}
int successCount = 0;
int failCount = 0;
String dateNow = LocalDateTime.now().format(DATE_TIME_FORMATTER);
// 在线状态
for (Map.Entry<String, Map<String, Boolean>> map : statusMap.entrySet()) {
String imei = map.getKey();
try {
// 按你的需求,直接推送到 frontend/{imei}/online 主题
Map<String, Boolean> imeiMap = map.getValue();
// 设备在线的 && 推送首页状态 离线在线都推
if (imeiMap.get("subExist")) {
// 构造首页消息用ObjectMapper序列化避免手动拼接JSON
Map<String, Object> onlineMsg = new HashMap<>();
onlineMsg.put("online", imeiMap.get("imeiOnline") ? "在线" : "离线");
onlineMsg.put("time", dateNow); // 毫秒时间戳
onlineMsg.put("imei", imei);
String onlineMessage = objectMapper.writeValueAsString(onlineMsg);
mqttMessageSender.publish("device/" + imei + "/status", onlineMessage);
}
// 无论设备是否在线 只要离线就推送设备状态
if (!imeiMap.get("imeiOnline")) {
// todo 设备离线推送 发消息提醒
Map<String, Object> alarmMsg = new HashMap<>();
alarmMsg.put("online", "设备离线");
alarmMsg.put("time", dateNow);
alarmMsg.put("imei", imei);
String alarmMessage = objectMapper.writeValueAsString(alarmMsg);
mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage);
}
successCount++;
} catch (Exception e) {
failCount++;
log.error("向设备 {} 推送在线状态失败", imei, e);
}
}
log.info("批量在线状态推送完成:成功={},失败={}", successCount, failCount);
}
/* /*
* LuaIMEIsub:{imei} * LuaIMEIsub:{imei}

View File

@ -2,52 +2,73 @@ package com.agri.quartz.task;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection; import org.springframework.beans.BeansException;
import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
/**
* JDK 8 Redis
* 1. RedisTemplate 2. Cursor<byte[]> 3.
* Redis sub:
*/
@Component("agriTask") @Component("agriTask")
public class AgriTask { public class AgriTask {
@Resource // 核心修复:指定注入的 Bean 名称为 "redisTemplate"(匹配自定义配置的 Bean
private RedisConnectionFactory redisConnectionFactory; @Autowired
private final static Logger log = LoggerFactory.getLogger(AgriTask.class); private RedisTemplate<String, String> redisTemplate;
private static final Logger log = LoggerFactory.getLogger(AgriTask.class);
/** /**
* * sub: Redis
* redis sub:
*/ */
public void clearInvalidCache() { public void clearInvalidCache() {
log.info("===== 开始执行Redis sub: 键清理任务 ====="); log.info("===== 开始执行Redis sub: 键清理任务 =====");
RedisConnection connection = null;
Cursor<byte[]> cursor = null;
int deletedCount = 0; int deletedCount = 0;
// 兜底获取 RedisTemplate指定 Bean 名称,解决多实例冲突)
if (redisTemplate == null) {
log.error("RedisTemplate 初始化失败,清理任务终止");
return;
}
try { try {
// 获取Redis连接 // 配置 SCAN 参数:匹配 sub:*分批遍历每次1000条
connection = redisConnectionFactory.getConnection();
// 配置SCAN参数匹配sub:*分批遍历每次1000条
ScanOptions scanOptions = ScanOptions.scanOptions() ScanOptions scanOptions = ScanOptions.scanOptions()
.match("sub:*") .match("sub:*")
.count(1000) .count(1000)
.build(); .build();
// 遍历所有匹配的键 // 接收 Cursor<byte[]> 类型JDK 8 下原生返回类型)
cursor = connection.scan(scanOptions); Cursor<byte[]> cursor = redisTemplate.executeWithStickyConnection(connection ->
List<byte[]> batchKeys = new ArrayList<>(1000); // 批量删除缓冲区 connection.scan(scanOptions)
);
while (cursor.hasNext()) { // 批量删除缓冲区(存储 String 类型键)
batchKeys.add(cursor.next()); List<String> batchKeys = new ArrayList<>(1000);
// 每攒1000个键批量删除减少网络交互
// 遍历字节数组类型的键,转换为 String
while (cursor != null && cursor.hasNext()) {
byte[] keyBytes = cursor.next();
// 转换 byte[] -> StringUTF-8 编码,避免乱码)
String key = new String(keyBytes, StandardCharsets.UTF_8);
batchKeys.add(key);
// 每攒1000个键批量删除
if (batchKeys.size() >= 1000) { if (batchKeys.size() >= 1000) {
deletedCount += batchKeys.size(); deletedCount += batchKeys.size();
connection.del(batchKeys.toArray(new byte[0][])); redisTemplate.delete(batchKeys);
log.info("批量删除 {} 个sub: 键", batchKeys.size()); log.info("批量删除 {} 个sub: 键", batchKeys.size());
batchKeys.clear(); batchKeys.clear();
} }
@ -56,27 +77,24 @@ public class AgriTask {
// 删除剩余的键 // 删除剩余的键
if (!batchKeys.isEmpty()) { if (!batchKeys.isEmpty()) {
deletedCount += batchKeys.size(); deletedCount += batchKeys.size();
connection.del(batchKeys.toArray(new byte[0][])); redisTemplate.delete(batchKeys);
log.info("批量删除剩余 {} 个sub: 键", batchKeys.size()); log.info("批量删除剩余 {} 个sub: 键", batchKeys.size());
} }
// 关闭游标(避免资源泄漏)
if (cursor != null) {
try {
cursor.close();
} catch (Exception e) {
log.error("关闭 Redis 游标失败", e);
}
}
log.info("===== Redis sub: 键清理完成,总计删除 {} 个键 =====", deletedCount); log.info("===== Redis sub: 键清理完成,总计删除 {} 个键 =====", deletedCount);
} catch (Exception e) { } catch (Exception e) {
log.error("Redis sub: 键清理失败", e); log.error("Redis sub: 键清理失败", e);
// 可选:添加告警逻辑(如钉钉/邮件通知)
} finally {
// 关闭游标和连接
if (cursor != null) {
try {
cursor.close();
} catch (Exception e) {
log.error("关闭游标失败", e);
}
}
if (connection != null) {
connection.close();
}
} }
} }
}
}

View File

@ -17,7 +17,7 @@ public class TempTask {
public void ryNoParams() public void ryNoParams()
{ {
dtuDataService.lambdaUpdate() dtuDataService.lambdaUpdate()
.eq(SysDtuData::getImei, "864536071851206") .eq(SysDtuData::getImei, "864865085008523")
.orderByDesc(SysDtuData::getId) .orderByDesc(SysDtuData::getId)
.last("limit 1") .last("limit 1")
.set(SysDtuData::getTime, new Date()) .set(SysDtuData::getTime, new Date())