设备状态离线和设备离线接到数据中心

master
lld 2026-03-31 20:03:24 +08:00
parent ad656d8cc3
commit 7cb223e15e
10 changed files with 254 additions and 409 deletions

View File

@ -1,6 +1,8 @@
package com.agri.framework.manager;
import com.agri.framework.config.MqttConfig;
import com.agri.system.domain.SysAgriInfo;
import com.agri.system.service.AgriService;
import com.agri.system.service.ISysAgriInfoService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
@ -42,6 +44,8 @@ public class AgriStatusManager {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Autowired
private AgriService agriService;
// ========== 批量查在线状态Pipeline 优化版JDK 8 适配) ==========
// 在线离线的都得推
@ -127,13 +131,7 @@ public class AgriStatusManager {
}
// 无论设备是否在线 只要离线就推送设备状态
if (!imeiMap.get("imeiOnline")) {
// todo 设备离线推送 发消息提醒
Map<String, Object> alarmMsg = new HashMap<>();
alarmMsg.put("online", "设备离线");
alarmMsg.put("time", dateNow);
alarmMsg.put("imei", imei);
String alarmMessage = objectMapper.writeValueAsString(alarmMsg);
mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage);
agriService.sendAlarmMessage("设备离线", imei, dateNow);
}
successCount++;
} catch (Exception e) {

View File

@ -1,6 +1,8 @@
package com.agri.quartz.task;
import com.agri.framework.manager.AgriStatusManager;
import com.agri.system.domain.SysAgriInfo;
import com.agri.system.service.AgriService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.agri.framework.config.MqttConfig;
import com.agri.system.service.ISysAgriInfoService;
@ -47,6 +49,8 @@ public class AgriStatusTask {
@Autowired
private AgriStatusManager agriStatusManager;
@Autowired
private AgriService agriService;
/**
* 10
@ -72,14 +76,21 @@ public class AgriStatusTask {
try {
// 查询大棚列表所有在线设备
List<String> imeiList = agriInfoService.queryImeiByUserId(null);
List<SysAgriInfo> agriInfos = agriInfoService.findAgriByUser(new SysAgriInfo());
List<String> imeiList = agriService.queryAllGreenhouseImei(agriInfos);
if (imeiList.isEmpty()) {
log.info("大棚表无数据,结束推送");
return;
}
Map<String, Map<String, Boolean>> statusMap
= agriStatusManager.batchCheckDeviceOnline(imeiList);
// 3. 批量查询设备在线状态Redis Pipeline一次网络往返
agriStatusManager.asyncBatchPushMqtt(agriStatusManager.batchCheckDeviceOnline(imeiList));
agriStatusManager.asyncBatchPushMqtt(statusMap);
// 4. 保存离线设备
List<SysAgriInfo> offlineDevices = findOfflineDevices(agriInfos, statusMap);
// 5. 保存消息中心
agriService.saveMessage(offlineDevices,"怀疑设备离线");
} catch (Exception e) {
log.error("设备在线状态推送任务异常", e);
// 可选:异常告警(如企业微信/钉钉)
@ -89,7 +100,22 @@ public class AgriStatusTask {
stringRedisTemplate.delete(LOCK_KEY);
}
}
// 查找离线设备
private List<SysAgriInfo> findOfflineDevices(List<SysAgriInfo> agriInfos,
Map<String, Map<String, Boolean>> statusMap) {
if (statusMap.isEmpty()) {
log.info("不存在任何imei");
return new ArrayList<>();
}
List<SysAgriInfo> offlineDevices = new ArrayList<>();
for (SysAgriInfo agriInfo : agriInfos) {
if (!statusMap.containsKey(agriInfo.getImei())) {
offlineDevices.add(agriInfo);
log.info("设备{} 不存在设备状态", agriInfo.getImei());
}
}
return offlineDevices;
}
/*
* LuaIMEIsub:{imei}

View File

@ -1,9 +1,9 @@
package com.agri.quartz.task;
import com.agri.framework.config.MqttConfig;
import com.agri.system.service.AgriService;
import com.agri.system.domain.SysAgriInfo;
import com.agri.system.service.ISysAgriInfoService;
import com.agri.system.service.ISysDtuDataService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,11 +15,9 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Component
public class AgriTempTask {
@ -39,31 +37,31 @@ public class AgriTempTask {
@Value("${spring.mqtt.dtu-ctl-lock-ttl:60}")
private int lockTtl;
@Resource
private MqttConfig.MqttMessageSender mqttMessageSender;
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private AgriService agriService;
public void checkTempStatus() {
if (!acquireLock()) {
return;
}
try {
List<String> imeiList = queryAllGreenhouseImei();
List<SysAgriInfo> agriInfos = agriInfoService.findAgriByUser(new SysAgriInfo());
List<String> imeiList = agriService.queryAllGreenhouseImei(agriInfos);
if (CollectionUtils.isEmpty(imeiList)) {
log.info("大棚表无数据,结束推送");
return;
}
Map<String, Object> latestDataMap = queryLatestDtuData(imeiList);
List<String> offlineDeviceList = findOfflineDevices(imeiList, latestDataMap);
pushOfflineAlarm(offlineDeviceList);
List<SysAgriInfo> offlineDevices = findOfflineDevices(agriInfos, latestDataMap);
pushOfflineAlarm(offlineDevices);
agriService.saveMessage(offlineDevices,"怀疑温度离线");
} catch (Exception e) {
log.error("设备在线状态推送任务异常", e);
} finally {
releaseLock();
}
}
// 获取分布式锁
private boolean acquireLock() {
Boolean lockSuccess = stringRedisTemplate.opsForValue()
.setIfAbsent(LOCK_KEY, "agriTempTask", lockTtl, TimeUnit.SECONDS);
@ -78,14 +76,13 @@ public class AgriTempTask {
return true;
}
// 释放分布式锁
private void releaseLock() {
stringRedisTemplate.delete(LOCK_KEY);
}
private List<String> queryAllGreenhouseImei() {
return agriInfoService.queryImeiByUserId(null);
}
// 查询所有大棚的最新温湿度数据
private Map<String, Object> queryLatestDtuData(List<String> imeiList) {
List<Map<String, Object>> dtuDataList = dtuDataService.getLastDtuDataByImeiList(imeiList);
Map<String, Object> dataMap = new HashMap<>();
@ -101,37 +98,33 @@ public class AgriTempTask {
return dataMap;
}
private List<String> findOfflineDevices(List<String> imeiList, Map<String, Object> latestDataMap) {
List<String> offlineList = new ArrayList<>();
for (String imei : imeiList) {
if (!latestDataMap.containsKey(imei)) {
offlineList.add(imei);
log.info("设备{} 不存在温湿度数据", imei);
// 查找离线设备
private List<SysAgriInfo> findOfflineDevices(List<SysAgriInfo> agriInfos,
Map<String, Object> latestDataMap) {
List<SysAgriInfo> offlineList = new ArrayList<>();
for (SysAgriInfo agriInfo : agriInfos) {
if (!latestDataMap.containsKey(agriInfo.getImei())) {
offlineList.add(agriInfo);
log.info("设备{} 不存在温湿度数据", agriInfo.getImei());
}
}
return offlineList;
}
private void pushOfflineAlarm(List<String> offlineDeviceList) {
if (CollectionUtils.isEmpty(offlineDeviceList)) {
// 推送离线告警
private void pushOfflineAlarm(List<SysAgriInfo> offlineList) {
if (CollectionUtils.isEmpty(offlineList)) {
return;
}
String timeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
for (String imei : offlineDeviceList) {
sendAlarmMessage(imei, timeStr);
for (SysAgriInfo agriInfo : offlineList) {
try {
agriService.sendAlarmMessage("温度离线",agriInfo.getImei(), timeStr);
} catch (Exception e) {
log.error("发送设备离线告警失败, imei={}", agriInfo.getImei(), e);
}
}
}
private void sendAlarmMessage(String imei, String timeStr) {
try {
Map<String, Object> alarmMsg = new HashMap<>();
alarmMsg.put("online", "温度离线");
alarmMsg.put("time", timeStr);
alarmMsg.put("imei", imei);
String alarmMessage = objectMapper.writeValueAsString(alarmMsg);
mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage);
} catch (Exception e) {
log.error("发送设备离线告警失败, imei={}", imei, e);
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.Date;
* @author agri
* @date 2026-01-08
*/
@Data
@TableName("sys_agri_info")
public class SysAgriInfo extends BaseEntity
{
@ -96,115 +97,19 @@ public class SysAgriInfo extends BaseEntity
*/
private Integer blindNum;
public Long getId() {
return id;
}
/** 温度上限(℃) */
@Excel(name = "温度上限(℃)")
private BigDecimal tempUp;
public void setId(Long id) {
this.id = id;
}
/** 温度下限(℃) */
@Excel(name = "温度下限(℃)")
private BigDecimal tempLow;
public String getImei() {
return imei;
}
/** 湿度上限(%RH) */
@Excel(name = "湿度上限(%RH)")
private BigDecimal humiUp;
public void setImei(String imei) {
this.imei = imei;
}
public String getAgriName() {
return agriName;
}
public void setAgriName(String agriName) {
this.agriName = agriName;
}
public Integer getWorkMode() {
return workMode;
}
public void setWorkMode(Integer workMode) {
this.workMode = workMode;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public Integer getAlarmStatus() {
return alarmStatus;
}
public void setAlarmStatus(Integer alarmStatus) {
this.alarmStatus = alarmStatus;
}
public Integer getDeviceStatus() {
return deviceStatus;
}
public void setDeviceStatus(Integer deviceStatus) {
this.deviceStatus = deviceStatus;
}
public Date getInstallTime() {
return installTime;
}
public void setInstallTime(Date installTime) {
this.installTime = installTime;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public Integer getIsDeleted() {
return isDeleted;
}
public void setIsDeleted(Integer isDeleted) {
this.isDeleted = isDeleted;
}
public Integer getSourceCode() {
return sourceCode;
}
public void setSourceCode(Integer sourceCode) {
this.sourceCode = sourceCode;
}
public Integer getQuiltNum() {
return quiltNum;
}
public void setQuiltNum(Integer quiltNum) {
this.quiltNum = quiltNum;
}
public Integer getFilmNum() {
return filmNum;
}
public void setFilmNum(Integer filmNum) {
this.filmNum = filmNum;
}
public Integer getBlindNum() {
return blindNum;
}
public void setBlindNum(Integer blindNum) {
this.blindNum = blindNum;
}
/** 湿度下限(%RH) */
@Excel(name = "湿度下限(%RH)")
private BigDecimal humiLow;
}

View File

@ -1,6 +1,7 @@
package com.agri.system.domain;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.baomidou.mybatisplus.annotation.TableField;
@ -14,6 +15,7 @@ import com.agri.common.core.domain.BaseEntity;
* @author lld
* @date 2026-03-26
*/
@Data
@TableName("sys_message")
public class SysMessage extends BaseEntity
{
@ -25,14 +27,14 @@ public class SysMessage extends BaseEntity
/** 接收人all=全体用户,其他=用户ID */
@Excel(name = "接收人all=全体用户,其他=用户ID")
private String receiver;
private Long receiver;
/** 消息标题 */
@Excel(name = "消息标题")
private String title;
/** 消息类型activity-活动 notice-通知 interact-互动消息 */
@Excel(name = "消息类型:activity-活动 notice-通知 interact-互动消息")
@Excel(name = "消息类型:status-状态notice-通知event-活动")
private String msgType;
/** 阅读状态0-未读 1-已读 */
@ -55,113 +57,4 @@ public class SysMessage extends BaseEntity
@Excel(name = "跳转链接地址")
private String linkUrl;
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setReceiver(String receiver)
{
this.receiver = receiver;
}
public String getReceiver()
{
return receiver;
}
public void setTitle(String title)
{
this.title = title;
}
public String getTitle()
{
return title;
}
public void setMsgType(String msgType)
{
this.msgType = msgType;
}
public String getMsgType()
{
return msgType;
}
public void setReadStatus(Long readStatus)
{
this.readStatus = readStatus;
}
public Long getReadStatus()
{
return readStatus;
}
public void setContent(String content)
{
this.content = content;
}
public String getContent()
{
return content;
}
public void setRichContent(String richContent)
{
this.richContent = richContent;
}
public String getRichContent()
{
return richContent;
}
public void setImgUrl(String imgUrl)
{
this.imgUrl = imgUrl;
}
public String getImgUrl()
{
return imgUrl;
}
public void setLinkUrl(String linkUrl)
{
this.linkUrl = linkUrl;
}
public String getLinkUrl()
{
return linkUrl;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("receiver", getReceiver())
.append("title", getTitle())
.append("msgType", getMsgType())
.append("readStatus", getReadStatus())
.append("content", getContent())
.append("richContent", getRichContent())
.append("imgUrl", getImgUrl())
.append("linkUrl", getLinkUrl())
.append("createBy", getCreateBy())
.append("createTime", getCreateTime())
.append("updateBy", getUpdateBy())
.append("updateTime", getUpdateTime())
.append("remark", getRemark())
.toString();
}
}

View File

@ -11,10 +11,12 @@ import com.baomidou.mybatisplus.annotation.Version;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.Data;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.Date;
import java.util.List;
/**
* - sys_user_agri
@ -22,6 +24,7 @@ import java.util.Date;
* @author lld
* @date 2026-01-25
*/
@Data
@TableName("sys_user_agri")
public class SysUserAgri extends BaseEntity
{
@ -81,146 +84,10 @@ public class SysUserAgri extends BaseEntity
@TableField(exist = false)
private Boolean disabled;
public Boolean getDisabled() {
return disabled;
}
@TableField(exist = false)
private List<Long> idList;
public void setDisabled(Boolean disabled) {
this.disabled = disabled;
}
public String getImei() {
return imei;
}
public void setImei(String imei) {
this.imei = imei;
}
public SysUser getSysUser() {
return sysUser;
}
public void setSysUser(SysUser sysUser) {
this.sysUser = sysUser;
}
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setAgriId(String agriId)
{
this.agriId = agriId;
}
public String getAgriId()
{
return agriId;
}
public void setUserId(Long userId)
{
this.userId = userId;
}
public Long getUserId()
{
return userId;
}
public void setRole(Integer role)
{
this.role = role;
}
public Integer getRole()
{
return role;
}
public void setStatus(Integer status)
{
this.status = status;
}
public Integer getStatus()
{
return status;
}
public void setInviteBy(Long inviteBy)
{
this.inviteBy = inviteBy;
}
public Long getInviteBy()
{
return inviteBy;
}
public void setInviteTime(Date inviteTime)
{
this.inviteTime = inviteTime;
}
public Date getInviteTime()
{
return inviteTime;
}
public void setAcceptTime(Date acceptTime)
{
this.acceptTime = acceptTime;
}
public Date getAcceptTime()
{
return acceptTime;
}
public Integer getPermMask() {
return permMask;
}
public void setPermMask(Integer permMask) {
this.permMask = permMask;
}
public Integer getVersion() {
return version;
}
public void setVersion(Integer version) {
this.version = version;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("agriId", getAgriId())
.append("sysUser", getSysUser())
.append("userId", getUserId())
.append("role", getRole())
.append("permMask", getPermMask())
.append("status", getStatus())
.append("inviteBy", getInviteBy())
.append("inviteTime", getInviteTime())
.append("acceptTime", getAcceptTime())
.append("remark", getRemark())
.append("createTime", getCreateTime())
.append("createBy", getCreateBy())
.append("updateTime", getUpdateTime())
.append("updateBy", getUpdateBy())
.append("version", getVersion())
.toString();
public SysUserAgri(List<Long> idList) {
this.idList = idList;
}
}

View File

@ -0,0 +1,16 @@
package com.agri.system.service;
import com.agri.system.domain.SysAgriInfo;
import java.util.List;
import java.util.Map;
public interface AgriService {
void sendAlarmMessage(String msg, String imei, String timeStr) throws Exception;
void saveMessage(List<SysAgriInfo> offlineList,String msg);
List<String> queryAllGreenhouseImei(List<SysAgriInfo> agriInfos);
}

View File

@ -0,0 +1,89 @@
package com.agri.system.service.impl;
import com.agri.framework.config.MqttConfig;
import com.agri.system.service.AgriService;
import com.agri.system.domain.SysAgriInfo;
import com.agri.system.domain.SysMessage;
import com.agri.system.domain.SysUserAgri;
import com.agri.system.service.ISysMessageService;
import com.agri.system.service.ISysUserAgriService;
import com.agri.system.util.UrlEncodeUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
@Service
public class AgriServiceImpl implements AgriService {
@Resource
private MqttConfig.MqttMessageSender mqttMessageSender;
@Resource
private ISysUserAgriService userAgriService;
@Resource
private ISysMessageService messageService;
private final ObjectMapper objectMapper = new ObjectMapper();
// 发送离线告警消息
@Override
public void sendAlarmMessage(String msg, String imei, String timeStr) throws Exception {
Map<String, Object> alarmMsg = new HashMap<>();
alarmMsg.put("online", msg);
alarmMsg.put("time", timeStr);
alarmMsg.put("imei", imei);
String alarmMessage = objectMapper.writeValueAsString(alarmMsg);
mqttMessageSender.publish("frontend/" + imei + "/alarm", alarmMessage);
}
// 保存离线设备消息
@Override
public void saveMessage(List<SysAgriInfo> offlineList,String msg) {
// 离线是否为空
if (CollectionUtils.isEmpty(offlineList)) {
return;
}
List<Long> idList
= offlineList.stream().map(SysAgriInfo::getId).collect(Collectors.toList());
List<SysUserAgri> agriUser
= userAgriService.findAgriUser(new SysUserAgri(idList));
List<SysMessage> msgList = new ArrayList<>();
for (SysAgriInfo agriInfo : offlineList) {
SysMessage message = new SysMessage();
message.setTitle(msg);
message.setMsgType("status");
message.setReadStatus(0L);
message.setContent("大棚【" + agriInfo.getAgriName() + "】" + msg + "!请检查设备状态。");
message.setImgUrl("");
message.setLinkUrl(UrlEncodeUtil.buildControlPageUrl(agriInfo, "/pages/home/control/index?agriInfo="));
for (SysUserAgri userAgri : agriUser){
message.setReceiver(userAgri.getUserId());
msgList.add(message);
}
}
if (CollectionUtils.isNotEmpty(msgList)) {
messageService.saveBatch(msgList);
}
}
// 查询所有大棚的imei号
@Override
public List<String> queryAllGreenhouseImei(List<SysAgriInfo> agriInfos) {
if (CollectionUtils.isEmpty(agriInfos)) {
return Collections.emptyList();
}
return agriInfos.stream().map(SysAgriInfo::getImei).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,51 @@
package com.agri.system.util;
import com.agri.system.domain.SysAgriInfo;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import java.net.URLEncoder;
/**
* JS
* JSON.stringify() + encodeURIComponent()
*/
public class UrlEncodeUtil {
/**
* JS URL
*/
public static String buildControlPageUrl(SysAgriInfo agriInfo, String linkUrl) {
try {
// 1. 构建和JS一模一样的JSON对象
JSONObject agriObj = new JSONObject();
agriObj.put("imei", agriInfo.getImei());
agriObj.put("agriName", agriInfo.getAgriName());
agriObj.put("agriId", agriInfo.getId());
agriObj.put("workMode", agriInfo.getWorkMode());
// 2. JSON.stringify
String agriJson = JSON.toJSONString(agriObj);
// 3. encodeURIComponent
String agriInfoEncoded = URLEncoder.encode(agriJson, "UTF-8");
// 4. 最终URL和你JS跳转一模一样
return linkUrl + agriInfoEncoded;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
// 通用版:任意对象转 encodeURIComponent 字符串
public static String encodeUriComponent(Object obj) {
try {
String json = JSON.toJSONString(obj);
return URLEncoder.encode(json, "UTF-8");
} catch (Exception e) {
return null;
}
}
}

View File

@ -189,6 +189,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="acceptTime != null "> and user_agri.accept_time = #{acceptTime}</if>
<if test="version != null "> and user_agri.version = #{version}</if>
<if test="sysUser != null ">
<if test="sysUser.idList != null and sysUser.idList.size() > 0">
`u`.user_id IN
<foreach collection="idList" item="id" open="(" separator="," close=")">
#{id}
</foreach>
</if>
<if test="sysUser.userId != null and sysUser.userId != 0">
AND `u`.user_id = #{sysUser.userId}
</if>