diff --git a/agri-admin/src/main/java/com/agri/system/controller/SysRollerAirController.java b/agri-admin/src/main/java/com/agri/system/controller/SysRollerAirController.java deleted file mode 100644 index ee02ef7..0000000 --- a/agri-admin/src/main/java/com/agri/system/controller/SysRollerAirController.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.agri.system.controller; - -import java.util.List; -import javax.servlet.http.HttpServletResponse; -import org.springframework.security.access.prepost.PreAuthorize; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.PutMapping; -import org.springframework.web.bind.annotation.DeleteMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import com.agri.common.annotation.Log; -import com.agri.common.core.controller.BaseController; -import com.agri.common.core.domain.AjaxResult; -import com.agri.common.enums.BusinessType; -import com.agri.system.domain.SysRollerAir; -import com.agri.system.service.ISysRollerAirService; -import com.agri.common.utils.poi.ExcelUtil; -import com.agri.common.core.page.TableDataInfo; - -/** - * 自动化卷膜风口大小设置Controller - * - * @author lld - * @date 2026-03-04 - */ -@RestController -@RequestMapping("/assets/air") -public class SysRollerAirController extends BaseController -{ - @Autowired - private ISysRollerAirService sysRollerAirService; - - /** - * 查询自动化卷膜风口大小设置列表 - */ - @PreAuthorize("@ss.hasPermi('assets:air:list')") - @GetMapping("/list") - public TableDataInfo list(SysRollerAir sysRollerAir) - { - startPage(); - List list = sysRollerAirService.selectSysRollerAirList(sysRollerAir); - return getDataTable(list); - } - - /** - * 导出自动化卷膜风口大小设置列表 - */ - @PreAuthorize("@ss.hasPermi('assets:air:export')") - @Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.EXPORT) - @PostMapping("/export") - public void export(HttpServletResponse response, SysRollerAir sysRollerAir) - { - List list = sysRollerAirService.selectSysRollerAirList(sysRollerAir); - ExcelUtil util = new ExcelUtil(SysRollerAir.class); - util.exportExcel(response, list, "自动化卷膜风口大小设置数据"); - } - - /** - * 获取自动化卷膜风口大小设置详细信息 - */ - @PreAuthorize("@ss.hasPermi('assets:air:query')") - @GetMapping(value = "/{id}") - public AjaxResult getInfo(@PathVariable("id") Long id) - { - return success(sysRollerAirService.selectSysRollerAirById(id)); - } - - /** - * 新增自动化卷膜风口大小设置 - */ - @PreAuthorize("@ss.hasPermi('assets:air:add')") - @Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.INSERT) - @PostMapping - public AjaxResult add(@RequestBody SysRollerAir sysRollerAir) - { - return toAjax(sysRollerAirService.insertSysRollerAir(sysRollerAir)); - } - - /** - * 修改自动化卷膜风口大小设置 - */ - @PreAuthorize("@ss.hasPermi('assets:air:edit')") - @Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.UPDATE) - @PutMapping - public AjaxResult edit(@RequestBody SysRollerAir sysRollerAir) - { - return toAjax(sysRollerAirService.updateSysRollerAir(sysRollerAir)); - } - - /** - * 删除自动化卷膜风口大小设置 - */ - @PreAuthorize("@ss.hasPermi('assets:air:remove')") - @Log(title = "自动化卷膜风口大小设置", businessType = BusinessType.DELETE) - @DeleteMapping("/{ids}") - public AjaxResult remove(@PathVariable Long[] ids) - { - return toAjax(sysRollerAirService.deleteSysRollerAirByIds(ids)); - } -} diff --git a/agri-admin/src/main/java/com/agri/system/domain/SysRollerAir.java b/agri-admin/src/main/java/com/agri/system/domain/SysRollerAir.java deleted file mode 100644 index c72692e..0000000 --- a/agri-admin/src/main/java/com/agri/system/domain/SysRollerAir.java +++ /dev/null @@ -1,139 +0,0 @@ -package com.agri.system.domain; - -import java.util.Date; -import com.fasterxml.jackson.annotation.JsonFormat; -import com.baomidou.mybatisplus.annotation.TableName; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import com.baomidou.mybatisplus.annotation.TableField; -import com.baomidou.mybatisplus.annotation.TableName; -import com.agri.common.annotation.Excel; -import com.agri.common.core.domain.BaseEntity; - -/** - * 自动化卷膜风口大小设置对象 sys_roller_air - * - * @author lld - * @date 2026-03-04 - */ -@TableName("sys_roller_air") -public class SysRollerAir extends BaseEntity -{ - @TableField(exist = false) - private static final long serialVersionUID = 1L; - - /** 主键ID */ - private Long id; - - /** 设备IMEI码 */ - @Excel(name = "设备IMEI码") - private String imei; - - /** 卷膜器编号/标识 */ - @Excel(name = "卷膜器编号/标识") - private String roller; - - /** 操作类型 0-停止 1-运行 2-查询 3-重置 */ - @Excel(name = "操作类型 0-停止 1-运行 2-查询 3-重置") - private Long opType; - - /** 操作参数(JSON格式,存储风口大小等配置) */ - @Excel(name = "操作参数(JSON格式,存储风口大小等配置)") - private String payload; - - /** 客户端ID */ - @Excel(name = "客户端ID") - private String clientid; - - /** 操作执行时间 */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") - @Excel(name = "操作执行时间", width = 30, dateFormat = "yyyy-MM-dd") - private Date opTime; - - public void setId(Long id) - { - this.id = id; - } - - public Long getId() - { - return id; - } - - public void setImei(String imei) - { - this.imei = imei; - } - - public String getImei() - { - return imei; - } - - public void setRoller(String roller) - { - this.roller = roller; - } - - public String getRoller() - { - return roller; - } - - public void setOpType(Long opType) - { - this.opType = opType; - } - - public Long getOpType() - { - return opType; - } - - public void setPayload(String payload) - { - this.payload = payload; - } - - public String getPayload() - { - return payload; - } - - public void setClientid(String clientid) - { - this.clientid = clientid; - } - - public String getClientid() - { - return clientid; - } - - public void setOpTime(Date opTime) - { - this.opTime = opTime; - } - - public Date getOpTime() - { - return opTime; - } - - @Override - public String toString() { - return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) - .append("id", getId()) - .append("imei", getImei()) - .append("roller", getRoller()) - .append("opType", getOpType()) - .append("payload", getPayload()) - .append("clientid", getClientid()) - .append("opTime", getOpTime()) - .append("createBy", getCreateBy()) - .append("createTime", getCreateTime()) - .append("updateBy", getUpdateBy()) - .append("updateTime", getUpdateTime()) - .toString(); - } -} diff --git a/agri-admin/src/main/java/com/agri/system/mapper/SysRollerAirMapper.java b/agri-admin/src/main/java/com/agri/system/mapper/SysRollerAirMapper.java deleted file mode 100644 index c90a013..0000000 --- a/agri-admin/src/main/java/com/agri/system/mapper/SysRollerAirMapper.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.agri.system.mapper; - -import java.util.List; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.agri.system.domain.SysRollerAir; - -/** - * 自动化卷膜风口大小设置Mapper接口 - * - * @author lld - * @date 2026-03-04 - */ -public interface SysRollerAirMapper extends BaseMapper -{ - /** - * 查询自动化卷膜风口大小设置 - * - * @param id 自动化卷膜风口大小设置主键 - * @return 自动化卷膜风口大小设置 - */ - public SysRollerAir selectSysRollerAirById(Long id); - - /** - * 查询自动化卷膜风口大小设置列表 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 自动化卷膜风口大小设置集合 - */ - public List selectSysRollerAirList(SysRollerAir sysRollerAir); - - /** - * 新增自动化卷膜风口大小设置 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 结果 - */ - public int insertSysRollerAir(SysRollerAir sysRollerAir); - - /** - * 修改自动化卷膜风口大小设置 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 结果 - */ - public int updateSysRollerAir(SysRollerAir sysRollerAir); - - /** - * 删除自动化卷膜风口大小设置 - * - * @param id 自动化卷膜风口大小设置主键 - * @return 结果 - */ - public int deleteSysRollerAirById(Long id); - - /** - * 批量删除自动化卷膜风口大小设置 - * - * @param ids 需要删除的数据主键集合 - * @return 结果 - */ - public int deleteSysRollerAirByIds(Long[] ids); -} diff --git a/agri-admin/src/main/java/com/agri/system/service/ISysRollerAirService.java b/agri-admin/src/main/java/com/agri/system/service/ISysRollerAirService.java deleted file mode 100644 index f679ddf..0000000 --- a/agri-admin/src/main/java/com/agri/system/service/ISysRollerAirService.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.agri.system.service; - -import java.util.List; -import com.baomidou.mybatisplus.extension.service.IService; -import com.agri.system.domain.SysRollerAir; - -/** - * 自动化卷膜风口大小设置Service接口 - * - * @author lld - * @date 2026-03-04 - */ -public interface ISysRollerAirService extends IService { - /** - * 查询自动化卷膜风口大小设置 - * - * @param id 自动化卷膜风口大小设置主键 - * @return 自动化卷膜风口大小设置 - */ - public SysRollerAir selectSysRollerAirById(Long id); - - /** - * 查询自动化卷膜风口大小设置列表 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 自动化卷膜风口大小设置集合 - */ - public List selectSysRollerAirList(SysRollerAir sysRollerAir); - - /** - * 新增自动化卷膜风口大小设置 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 结果 - */ - public int insertSysRollerAir(SysRollerAir sysRollerAir); - - /** - * 修改自动化卷膜风口大小设置 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 结果 - */ - public int updateSysRollerAir(SysRollerAir sysRollerAir); - - /** - * 批量删除自动化卷膜风口大小设置 - * - * @param ids 需要删除的自动化卷膜风口大小设置主键集合 - * @return 结果 - */ - public int deleteSysRollerAirByIds(Long[] ids); - - /** - * 删除自动化卷膜风口大小设置信息 - * - * @param id 自动化卷膜风口大小设置主键 - * @return 结果 - */ - public int deleteSysRollerAirById(Long id); -} diff --git a/agri-admin/src/main/java/com/agri/system/service/impl/SysRollerAirServiceImpl.java b/agri-admin/src/main/java/com/agri/system/service/impl/SysRollerAirServiceImpl.java deleted file mode 100644 index abdb4f4..0000000 --- a/agri-admin/src/main/java/com/agri/system/service/impl/SysRollerAirServiceImpl.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.agri.system.service.impl; - -import java.util.List; -import com.agri.common.utils.DateUtils; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import org.springframework.stereotype.Service; -import com.agri.system.mapper.SysRollerAirMapper; -import com.agri.system.domain.SysRollerAir; -import com.agri.system.service.ISysRollerAirService; - -/** - * 自动化卷膜风口大小设置Service业务层处理 - * - * @author lld - * @date 2026-03-04 - */ -@Service -public class SysRollerAirServiceImpl extends ServiceImpl implements ISysRollerAirService -{ - - /** - * 查询自动化卷膜风口大小设置 - * - * @param id 自动化卷膜风口大小设置主键 - * @return 自动化卷膜风口大小设置 - */ - @Override - public SysRollerAir selectSysRollerAirById(Long id) - { - return baseMapper.selectSysRollerAirById(id); - } - - /** - * 查询自动化卷膜风口大小设置列表 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 自动化卷膜风口大小设置 - */ - @Override - public List selectSysRollerAirList(SysRollerAir sysRollerAir) - { - return baseMapper.selectSysRollerAirList(sysRollerAir); - } - - /** - * 新增自动化卷膜风口大小设置 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 结果 - */ - @Override - public int insertSysRollerAir(SysRollerAir sysRollerAir) - { - sysRollerAir.setCreateTime(DateUtils.getNowDate()); - return baseMapper.insertSysRollerAir(sysRollerAir); - } - - /** - * 修改自动化卷膜风口大小设置 - * - * @param sysRollerAir 自动化卷膜风口大小设置 - * @return 结果 - */ - @Override - public int updateSysRollerAir(SysRollerAir sysRollerAir) - { - sysRollerAir.setUpdateTime(DateUtils.getNowDate()); - return baseMapper.updateSysRollerAir(sysRollerAir); - } - - /** - * 批量删除自动化卷膜风口大小设置 - * - * @param ids 需要删除的自动化卷膜风口大小设置主键 - * @return 结果 - */ - @Override - public int deleteSysRollerAirByIds(Long[] ids) - { - return baseMapper.deleteSysRollerAirByIds(ids); - } - - /** - * 删除自动化卷膜风口大小设置信息 - * - * @param id 自动化卷膜风口大小设置主键 - * @return 结果 - */ - @Override - public int deleteSysRollerAirById(Long id) - { - return baseMapper.deleteSysRollerAirById(id); - } -} diff --git a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java index 7d7c608..58b9161 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java @@ -3,6 +3,7 @@ package com.agri.web.controller.mqtt; import com.agri.common.annotation.Log; import com.agri.common.core.domain.AjaxResult; import com.agri.common.enums.BusinessType; +import com.agri.framework.manager.AgriStatusManager; import com.agri.framework.manager.MqttAutoOffManager; import com.agri.framework.manager.MqttClientManager; import com.agri.framework.manager.MqttSubscriptionManager; @@ -10,13 +11,7 @@ import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.DeleteMapping; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.List; @@ -37,6 +32,10 @@ public class MqttController { @Autowired private MqttAutoOffManager mqttAutoOffManager; + + @Autowired + private AgriStatusManager agriStatusManager; + /** * 单个订阅 */ @@ -146,4 +145,15 @@ public class MqttController { boolean hasTask = mqttAutoOffManager.hasAutoOffTask(deviceId); return AjaxResult.success(hasTask); } + + + @PostMapping("/getAgriStatus") + public void getAgriStatus(@RequestBody List imeiList) { + + if (imeiList.isEmpty()) { + log.info("大棚表无数据,结束推送"); + return; + } + agriStatusManager.asyncBatchPushMqtt(agriStatusManager.batchCheckDeviceOnline(imeiList)); + } } \ No newline at end of file diff --git a/agri-admin/src/main/resources/mapper/assets/SysRollerAirMapper.xml b/agri-admin/src/main/resources/mapper/assets/SysRollerAirMapper.xml deleted file mode 100644 index c5580a1..0000000 --- a/agri-admin/src/main/resources/mapper/assets/SysRollerAirMapper.xml +++ /dev/null @@ -1,97 +0,0 @@ - - - - - - - - - - - - - - - - - - - - select id, imei, roller, op_type, payload, clientid, op_time, create_by, create_time, update_by, update_time from sys_roller_air - - - - - - - - insert into sys_roller_air - - imei, - roller, - op_type, - payload, - clientid, - op_time, - create_by, - create_time, - update_by, - update_time, - - - #{imei}, - #{roller}, - #{opType}, - #{payload}, - #{clientid}, - #{opTime}, - #{createBy}, - #{createTime}, - #{updateBy}, - #{updateTime}, - - - - - update sys_roller_air - - imei = #{imei}, - roller = #{roller}, - op_type = #{opType}, - payload = #{payload}, - clientid = #{clientid}, - op_time = #{opTime}, - create_by = #{createBy}, - create_time = #{createTime}, - update_by = #{updateBy}, - update_time = #{updateTime}, - - where id = #{id} - - - - delete from sys_roller_air where id = #{id} - - - - delete from sys_roller_air where id in - - #{id} - - - \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/manager/AgriStatusManager.java b/agri-framework/src/main/java/com/agri/framework/manager/AgriStatusManager.java new file mode 100644 index 0000000..5b7b6c2 --- /dev/null +++ b/agri-framework/src/main/java/com/agri/framework/manager/AgriStatusManager.java @@ -0,0 +1,148 @@ +package com.agri.framework.manager; + +import com.agri.framework.config.MqttConfig; +import com.agri.system.service.ISysAgriInfoService; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class AgriStatusManager { + + private static final Logger log = LoggerFactory.getLogger(AgriStatusManager.class); + + // Redis 前缀常量 + private static final String SUB_KEY_PREFIX = "sub:"; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private MqttConfig.MqttMessageSender mqttMessageSender; + + + // JSON序列化工具(单例) + private final ObjectMapper objectMapper = new ObjectMapper(); + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + + // ========== 批量查在线状态(Pipeline 优化版,JDK 8 适配) ========== + // 在线离线的都得推 + public Map> batchCheckDeviceOnline(List imeiList) { + Map> result = new HashMap<>(); + if (imeiList.isEmpty()) { + return result; + } + + // JDK 8 显式声明 RedisCallback,避免 Lambda 泛型问题 + List results = stringRedisTemplate.executePipelined( + new RedisCallback() { + @Override + public Object doInRedis(RedisConnection connection) { + StringRedisSerializer serializer = new StringRedisSerializer(); + for (String imei : imeiList) { + byte[] onlineKeyBytes = serializer.serialize(SUB_KEY_PREFIX + imei); + connection.exists(onlineKeyBytes); // 批量执行 exists + connection.exists(serializer.serialize(imei)); + } + return null; + } + }, + new StringRedisSerializer() + ); + + // 解析结果:每两个结果对应一个IMEI(subExist + imeiOnline) + for (int i = 0; i < imeiList.size(); i++) { + String imei = imeiList.get(i); + // 初始化默认状态:不存在+离线 + boolean subExist = false; + boolean imeiOnline = false; + + // 越界判断:避免IndexOutOfBoundsException + int subIndex = i * 2; + int imeiIndex = i * 2 + 1; + if (subIndex < results.size()) { + Object subResult = results.get(subIndex); + subExist = parseExistsResult(subResult); + } + if (imeiIndex < results.size()) { + Object imeiResult = results.get(imeiIndex); + imeiOnline = parseExistsResult(imeiResult); + } + result.put(imei, ImmutableMap.of("subExist", subExist, "imeiOnline", imeiOnline)); + } + return result; + } + private boolean parseExistsResult(Object result) { + if (result instanceof Long) { + return ((Long) result) == 1; + } else if (result instanceof Boolean) { + return (Boolean) result; + } + return false; + } + // ========== 核心方法3:异步批量推送在线状态到 MQTT(线程池隔离) ========== + @Async("mqttPushExecutor") + public void asyncBatchPushMqtt(Map> statusMap) { + if (statusMap.isEmpty()) { + log.info("不存在任何imei"); + return; + } + int successCount = 0; + int failCount = 0; + String dateNow = LocalDateTime.now().format(DATE_TIME_FORMATTER); + // 在线状态 + for (Map.Entry> map : statusMap.entrySet()) { + String imei = map.getKey(); + try { + // 按你的需求,直接推送到 frontend/{imei}/online 主题 + Map imeiMap = map.getValue(); + + // 设备在线的 && 推送首页状态 离线在线都推 + if (imeiMap.get("subExist")) { + // 构造首页消息(用ObjectMapper序列化,避免手动拼接JSON) + Map onlineMsg = new HashMap<>(); + onlineMsg.put("online", imeiMap.get("imeiOnline") ? "在线" : "离线"); + onlineMsg.put("time", dateNow); // 毫秒时间戳 + onlineMsg.put("imei", imei); + String onlineMessage = objectMapper.writeValueAsString(onlineMsg); + mqttMessageSender.publish("device/" + imei + "/status", onlineMessage); + } + // 无论设备是否在线 只要离线就推送设备状态 + if (!imeiMap.get("imeiOnline")) { + // todo 设备离线推送 发消息提醒 + Map alarmMsg = new HashMap<>(); + alarmMsg.put("online", "设备离线"); + alarmMsg.put("time", dateNow); + alarmMsg.put("imei", imei); + String alarmMessage = objectMapper.writeValueAsString(alarmMsg); + mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage); + } + successCount++; + } catch (Exception e) { + failCount++; + log.error("向设备 {} 推送在线状态失败", imei, e); + } + + } + + log.info("批量在线状态推送完成:成功={},失败={}", successCount, failCount); + } +} diff --git a/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java b/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java index 9078c6b..3139dd9 100644 --- a/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java +++ b/agri-quartz/src/main/java/com/agri/quartz/task/AgriStatusTask.java @@ -1,5 +1,6 @@ package com.agri.quartz.task; +import com.agri.framework.manager.AgriStatusManager; import com.fasterxml.jackson.databind.ObjectMapper; import com.agri.framework.config.MqttConfig; import com.agri.system.service.ISysAgriInfoService; @@ -38,19 +39,14 @@ public class AgriStatusTask { @Resource private StringRedisTemplate stringRedisTemplate; - @Resource - private MqttConfig.MqttMessageSender mqttMessageSender; - @Value("${spring.mqtt.dtu-ctl-lock-ttl:15}") private int lockTtl; @Autowired private ISysAgriInfoService agriInfoService; - // JSON序列化工具(单例) - private final ObjectMapper objectMapper = new ObjectMapper(); - - private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + @Autowired + private AgriStatusManager agriStatusManager; /** * 定时任务:每10秒执行一次 @@ -58,7 +54,7 @@ public class AgriStatusTask { * 2. 批量查询设备在线状态 * 3. 异步批量推送在线状态到 MQTT 主题 frontend/{imei}/online */ - public void pushOnlineStatus() { + public void pushDeviceStatus() { // 1. 加分布式锁,避免集群重复执行 Boolean lockSuccess = stringRedisTemplate.opsForValue() .setIfAbsent(LOCK_KEY, "running", lockTtl, TimeUnit.SECONDS); @@ -89,7 +85,7 @@ public class AgriStatusTask { log.info("从大棚表获取到合法IMEI总数:{}", imeiList.size()); // 3. 批量查询设备在线状态(Redis Pipeline,一次网络往返) - asyncBatchPushMqtt(batchCheckDeviceOnline(imeiList)); + agriStatusManager.asyncBatchPushMqtt(agriStatusManager.batchCheckDeviceOnline(imeiList)); } catch (Exception e) { log.error("设备在线状态推送任务异常", e); // 可选:异常告警(如企业微信/钉钉) @@ -101,109 +97,6 @@ public class AgriStatusTask { } } - // ========== 批量查在线状态(Pipeline 优化版,JDK 8 适配) ========== - // 在线离线的都得推 - private Map> batchCheckDeviceOnline(List imeiList) { - Map> result = new HashMap<>(); - if (imeiList.isEmpty()) { - return result; - } - - // JDK 8 显式声明 RedisCallback,避免 Lambda 泛型问题 - List results = stringRedisTemplate.executePipelined( - new RedisCallback() { - @Override - public Object doInRedis(RedisConnection connection) { - StringRedisSerializer serializer = new StringRedisSerializer(); - for (String imei : imeiList) { - byte[] onlineKeyBytes = serializer.serialize(SUB_KEY_PREFIX + imei); - connection.exists(onlineKeyBytes); // 批量执行 exists - connection.exists(serializer.serialize(imei)); - } - return null; - } - }, - new StringRedisSerializer() - ); - - // 解析结果:每两个结果对应一个IMEI(subExist + imeiOnline) - for (int i = 0; i < imeiList.size(); i++) { - String imei = imeiList.get(i); - // 初始化默认状态:不存在+离线 - boolean subExist = false; - boolean imeiOnline = false; - - // 越界判断:避免IndexOutOfBoundsException - int subIndex = i * 2; - int imeiIndex = i * 2 + 1; - if (subIndex < results.size()) { - Object subResult = results.get(subIndex); - subExist = parseExistsResult(subResult); - } - if (imeiIndex < results.size()) { - Object imeiResult = results.get(imeiIndex); - imeiOnline = parseExistsResult(imeiResult); - } - result.put(imei, ImmutableMap.of("subExist", subExist, "imeiOnline", imeiOnline)); - } - return result; - } - private boolean parseExistsResult(Object result) { - if (result instanceof Long) { - return ((Long) result) == 1; - } else if (result instanceof Boolean) { - return (Boolean) result; - } - return false; - } - // ========== 核心方法3:异步批量推送在线状态到 MQTT(线程池隔离) ========== - @Async("mqttPushExecutor") - public void asyncBatchPushMqtt(Map> statusMap) { - if (statusMap.isEmpty()) { - log.info("不存在任何imei"); - return; - } - int successCount = 0; - int failCount = 0; - String dateNow = LocalDateTime.now().format(DATE_TIME_FORMATTER); - // 在线状态 - for (Map.Entry> map : statusMap.entrySet()) { - String imei = map.getKey(); - try { - // 按你的需求,直接推送到 frontend/{imei}/online 主题 - Map imeiMap = map.getValue(); - - // 设备在线的 && 推送首页状态 离线在线都推 - if (imeiMap.get("subExist")) { - // 构造首页消息(用ObjectMapper序列化,避免手动拼接JSON) - Map onlineMsg = new HashMap<>(); - onlineMsg.put("online", imeiMap.get("imeiOnline") ? "在线" : "离线"); - onlineMsg.put("time", dateNow); // 毫秒时间戳 - onlineMsg.put("imei", imei); - String onlineMessage = objectMapper.writeValueAsString(onlineMsg); - mqttMessageSender.publish("device/" + imei + "/status", onlineMessage); - } - // 无论设备是否在线 只要离线就推送设备状态 - if (!imeiMap.get("imeiOnline")) { - // todo 设备离线推送 发消息提醒 - Map alarmMsg = new HashMap<>(); - alarmMsg.put("online", "设备离线"); - alarmMsg.put("time", dateNow); - alarmMsg.put("imei", imei); - String alarmMessage = objectMapper.writeValueAsString(alarmMsg); - mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage); - } - successCount++; - } catch (Exception e) { - failCount++; - log.error("向设备 {} 推送在线状态失败", imei, e); - } - - } - - log.info("批量在线状态推送完成:成功={},失败={}", successCount, failCount); - } - /* * 企业级Lua方案:基于大棚表IMEI列表,批量查sub:{imei}是否存在 diff --git a/agri-quartz/src/main/java/com/agri/quartz/task/AgriTask.java b/agri-quartz/src/main/java/com/agri/quartz/task/AgriTask.java index 692e744..a67a6d1 100644 --- a/agri-quartz/src/main/java/com/agri/quartz/task/AgriTask.java +++ b/agri-quartz/src/main/java/com/agri/quartz/task/AgriTask.java @@ -2,52 +2,73 @@ package com.agri.quartz.task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ScanOptions; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +/** + * JDK 8 适配版 Redis 缓存清理任务 + * 解决:1. RedisTemplate 多实例冲突 2. Cursor 类型不匹配 3. 空指针问题 + * 功能:批量删除 Redis 中 sub: 开头的键 + */ @Component("agriTask") public class AgriTask { - @Resource - private RedisConnectionFactory redisConnectionFactory; - private final static Logger log = LoggerFactory.getLogger(AgriTask.class); + // 核心修复:指定注入的 Bean 名称为 "redisTemplate"(匹配自定义配置的 Bean) + @Autowired + private RedisTemplate redisTemplate; + + private static final Logger log = LoggerFactory.getLogger(AgriTask.class); + /** - * 每日零点执行 - * 删除 redis 所有sub: 开头的数据 + * 每日零点执行:清理 sub: 开头的 Redis 键 */ public void clearInvalidCache() { - log.info("===== 开始执行Redis sub: 键清理任务 ====="); - RedisConnection connection = null; - Cursor cursor = null; int deletedCount = 0; + // 兜底获取 RedisTemplate(指定 Bean 名称,解决多实例冲突) + if (redisTemplate == null) { + log.error("RedisTemplate 初始化失败,清理任务终止"); + return; + } + try { - // 获取Redis连接 - connection = redisConnectionFactory.getConnection(); - // 配置SCAN参数:匹配sub:*,分批遍历(每次1000条) + // 配置 SCAN 参数:匹配 sub:*,分批遍历(每次1000条) ScanOptions scanOptions = ScanOptions.scanOptions() .match("sub:*") .count(1000) .build(); - // 遍历所有匹配的键 - cursor = connection.scan(scanOptions); - List batchKeys = new ArrayList<>(1000); // 批量删除缓冲区 + // 接收 Cursor 类型(JDK 8 下原生返回类型) + Cursor cursor = redisTemplate.executeWithStickyConnection(connection -> + connection.scan(scanOptions) + ); - while (cursor.hasNext()) { - batchKeys.add(cursor.next()); - // 每攒1000个键批量删除(减少网络交互) + // 批量删除缓冲区(存储 String 类型键) + List batchKeys = new ArrayList<>(1000); + + // 遍历字节数组类型的键,转换为 String + while (cursor != null && cursor.hasNext()) { + byte[] keyBytes = cursor.next(); + // 转换 byte[] -> String(UTF-8 编码,避免乱码) + String key = new String(keyBytes, StandardCharsets.UTF_8); + batchKeys.add(key); + + // 每攒1000个键批量删除 if (batchKeys.size() >= 1000) { deletedCount += batchKeys.size(); - connection.del(batchKeys.toArray(new byte[0][])); + redisTemplate.delete(batchKeys); log.info("批量删除 {} 个sub: 键", batchKeys.size()); batchKeys.clear(); } @@ -56,27 +77,24 @@ public class AgriTask { // 删除剩余的键 if (!batchKeys.isEmpty()) { deletedCount += batchKeys.size(); - connection.del(batchKeys.toArray(new byte[0][])); + redisTemplate.delete(batchKeys); log.info("批量删除剩余 {} 个sub: 键", batchKeys.size()); } + // 关闭游标(避免资源泄漏) + if (cursor != null) { + try { + cursor.close(); + } catch (Exception e) { + log.error("关闭 Redis 游标失败", e); + } + } + log.info("===== Redis sub: 键清理完成,总计删除 {} 个键 =====", deletedCount); } catch (Exception e) { log.error("Redis sub: 键清理失败", e); - // 可选:添加告警逻辑(如钉钉/邮件通知) - } finally { - // 关闭游标和连接 - if (cursor != null) { - try { - cursor.close(); - } catch (Exception e) { - log.error("关闭游标失败", e); - } - } - if (connection != null) { - connection.close(); - } } } -} + +} \ No newline at end of file diff --git a/agri-quartz/src/main/java/com/agri/quartz/task/TempTask.java b/agri-quartz/src/main/java/com/agri/quartz/task/TempTask.java index 7eecfbe..9e86061 100644 --- a/agri-quartz/src/main/java/com/agri/quartz/task/TempTask.java +++ b/agri-quartz/src/main/java/com/agri/quartz/task/TempTask.java @@ -17,7 +17,7 @@ public class TempTask { public void ryNoParams() { dtuDataService.lambdaUpdate() - .eq(SysDtuData::getImei, "864536071851206") + .eq(SysDtuData::getImei, "864865085008523") .orderByDesc(SysDtuData::getId) .last("limit 1") .set(SysDtuData::getTime, new Date())