转发逻辑修改,先转发、再处理逻辑;企业微信报警信息修改

master
lld 2026-01-29 16:44:09 +08:00
parent f47cf01cb0
commit c9652702db
4 changed files with 79 additions and 43 deletions

View File

@ -1,5 +1,6 @@
package com.agri.framework.interceptor;
import com.agri.common.utils.wechat.WxUtil;
import com.agri.framework.config.MqttConfig;
import com.agri.framework.manager.MqttAutoOffManager;
import com.agri.framework.manager.MqttSubscriptionManager;
@ -101,10 +102,9 @@ public class DeviceStatusHandler {
/**
*
*/
public void handle(String topic, String payload) throws MqttException {
if (!payload.trim().startsWith("{")) {
return;
}
public void handle(String topic, String payload) {
if (!isJsonObjectLike(payload)) return;
// 第一步解析JSON非有效JSON直接return
JSONObject payloadObj;
try {
@ -118,16 +118,23 @@ public class DeviceStatusHandler {
return;
}
// log.info("【设备处理】JSON解析{}",payloadObj);
// log.info("【设备处理】JSON解析{}",payloadObj);
// 解析设备ID主题格式为dtu/{deviceId}/up分割后第2个元素是设备ID
String deviceId = topic.split("/")[1];
String deviceId = extractDeviceId(topic);
if (deviceId == null) return;
// 转发消息
forwardPayload(deviceId, payload);
isStartAutoOffTask(payloadObj,deviceId,payload);
}
private void isStartAutoOffTask(JSONObject payloadObj,String deviceId,String payload) {
// 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}
String funcType = null;
Integer funcValue = null;
boolean isAck = false;
// 新增:标记是否需要执行自动关任务(全局可用)
// 第二步设备回执处理逻辑完全移除Redis写入
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
isAck = true;
@ -195,44 +202,72 @@ public class DeviceStatusHandler {
// log.debug("【设备状态包】写入Redis成功deviceId={}", deviceId);
}
}
// 非回执消息:正常转发给订阅前端
// 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
}
if (subscribedClients != null && !subscribedClients.isEmpty()) {
// 推送给每个订阅的前端
// 方案B不再依赖online:改为校验subc:{clientId}是否仍包含deviceId取消订阅失败/异常退出兜底)
List<String> clients = new ArrayList<>(subscribedClients);
// 判断subc是否还存在 一次性查全部 获取失效的clientId
List<Boolean> stillSubs = mqttSubscriptionManager.pipeIsMemberSubc(clients, deviceId);
private void forwardPayload(String deviceId,String payload) {
try {
// 非回执消息:正常转发给订阅前端
// 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
// 关系不存在清理sub:{deviceId}残留,避免一直给前端发
List<String> stale = null;
if (subscribedClients != null && !subscribedClients.isEmpty()) {
// 推送给每个订阅的前端
// 方案B不再依赖online:改为校验subc:{clientId}是否仍包含deviceId取消订阅失败/异常退出兜底)
List<String> clients = new ArrayList<>(subscribedClients);
// 判断subc是否还存在 一次性查全部 获取失效的clientId
List<Boolean> stillSubs = mqttSubscriptionManager.pipeIsMemberSubc(clients, deviceId);
for (int i = 0; i < clients.size(); i++) {
String clientId = clients.get(i);
boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i));
if (!stillSub) {
if (stale == null) {
stale = new ArrayList<>();
// 关系不存在清理sub:{deviceId}残留,避免一直给前端发
List<String> stale = null;
for (int i = 0; i < clients.size(); i++) {
String clientId = clients.get(i);
boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i));
if (!stillSub) {
if (stale == null) {
stale = new ArrayList<>();
}
// false不存在添加队列
stale.add(clientId);
continue;
}
// false不存在添加队列
stale.add(clientId);
continue;
// 前端专属主题frontend/{clientId}/dtu/{deviceId}/listener
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
// 发布消息
mqttMessageSender.publish(frontendTopic, payload);
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
}
// 前端专属主题frontend/{clientId}/dtu/{deviceId}/listener
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener";
// 发布消息
mqttMessageSender.publish(frontendTopic, payload);
// log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic);
// 删掉设备对应的客户端
if (stale != null && !stale.isEmpty()) {
mqttSubscriptionManager.pipeSRemSub(deviceId, stale);
}
} else {
// 优化替换System.out为log.info
// log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
}
// 删掉设备对应的客户端
if (stale != null && !stale.isEmpty()) {
mqttSubscriptionManager.pipeSRemSub(deviceId, stale);
}
} else {
// 优化替换System.out为log.info
// log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
} catch (MqttException e) {
WxUtil.pushText(
"【消息转发失败】\n deviceId: "+deviceId+"\n payload: "+payload+"\n msg: "+e.getMessage()+"\n cause: "+e.getCause());
log.error("【消息转发失败】deviceId={}, msg={}", deviceId, e.getMessage(), e);
}
}
private String extractDeviceId(String topic) {
int first = topic.indexOf('/');
if (first < 0) return null;
int second = topic.indexOf('/', first + 1);
if (second < 0) return null;
return topic.substring(first + 1, second);
}
private boolean isJsonObjectLike(String s) {
if (s == null) return false;
int n = s.length();
for (int i = 0; i < n; i++) {
char c = s.charAt(i);
if (!Character.isWhitespace(c)) return c == '{';
}
return false;
}
}

View File

@ -175,7 +175,7 @@ public class MqttAutoOffManager {
try {
runAutoOff(deviceId, funcType);
} catch (Exception e) {
WxUtil.pushText("【自动关任务】提交任务失败! deviceId: "+deviceId+", funcType: "+funcType+", msg: "+e.getMessage());
WxUtil.pushText("【自动关任务】提交任务失败! \n deviceId: "+deviceId+"\n funcType: "+funcType+"\n msg: "+e.getMessage()+"\n cause: "+e.getCause());
log.error("【自动关任务】执行失败deviceId={}, funcType={}", deviceId, funcType, e);
} finally {
// 任务执行完成后移除映射
@ -207,7 +207,7 @@ public class MqttAutoOffManager {
try {
latestObj = JSON.parseObject(latest);
} catch (Exception e) {
WxUtil.pushText("自动关任务执行报错-解析异常:deviceId:" + deviceId + ", funcType:" + funcType+"异常:"+e.getMessage());
WxUtil.pushText("自动关任务执行报错-解析异常:\n deviceId: " + deviceId + "\n funcType:" + funcType+"\n 异常:"+e.getMessage()+"\n Cause: "+e.getCause());
log.warn("【自动关任务】最新状态JSON解析失败跳过deviceId={}, funcType={}", deviceId, funcType);
return;
}

View File

@ -142,7 +142,7 @@ public class MqttClientManager implements SmartLifecycle {
log.error("【MQTT连接异常】连接断开clientId{},原因:{}",
safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause);
WxUtil.pushText("【MQTT连接异常】连接断开clientId"+safeClientId()+"原因:"+(cause == null ? "unknown" : cause.getMessage()));
WxUtil.pushText("【MQTT连接异常】连接断开\n clientId"+safeClientId()+"\n 原因:"+(cause == null ? "unknown" : cause.getMessage())+"\n Cause: "+cause.getCause());
// 【方案A】不再触发自写重连Paho自动重连会接管重连过程
// 这里只记录日志即可
if (isRunning.get()) {
@ -170,7 +170,7 @@ public class MqttClientManager implements SmartLifecycle {
// mqttBizPool.getActiveCount(),
// mqttBizPool.getQueue().size());
if (mqttBizPool.getActiveCount()>10 || mqttBizPool.getQueue().size()>1000) {
WxUtil.pushText("线程池繁忙 正在处理中任务:"+mqttBizPool.getActiveCount()+", 剩余待进行任务:"+mqttBizPool.getQueue().size());
WxUtil.pushText("线程池繁忙! \n 正在处理中任务:"+mqttBizPool.getActiveCount()+"\n 剩余待进行任务:"+mqttBizPool.getQueue().size());
}
try {
// 优化显式指定UTF-8编码避免乱码JDK 8兼容

View File

@ -62,6 +62,7 @@ public class MqttMessageDispatcher {
else if (topic.matches("frontend/\\w+/online")) {
frontendOnlineHandler.handle(topic, payload);
}
// todo 是否加回复主题??
} catch (Exception e) {
log.error("【MQTT消息处理异常】topic={}", topic, e);
}