diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java index a947b66..7bdf95d 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java @@ -1,5 +1,6 @@ package com.agri.framework.interceptor; +import com.agri.common.utils.wechat.WxUtil; import com.agri.framework.config.MqttConfig; import com.agri.framework.manager.MqttAutoOffManager; import com.agri.framework.manager.MqttSubscriptionManager; @@ -101,10 +102,9 @@ public class DeviceStatusHandler { /** * 处理设备状态:转发给订阅的前端、处理回执、触发自动关 */ - public void handle(String topic, String payload) throws MqttException { - if (!payload.trim().startsWith("{")) { - return; - } + public void handle(String topic, String payload) { + + if (!isJsonObjectLike(payload)) return; // 第一步:解析JSON,非有效JSON直接return JSONObject payloadObj; try { @@ -118,16 +118,23 @@ public class DeviceStatusHandler { return; } -// log.info("【设备处理】JSON解析:{}",payloadObj); + // log.info("【设备处理】JSON解析:{}",payloadObj); // 解析设备ID:主题格式为dtu/{deviceId}/up,分割后第2个元素是设备ID - String deviceId = topic.split("/")[1]; + String deviceId = extractDeviceId(topic); + if (deviceId == null) return; + // 转发消息 + forwardPayload(deviceId, payload); + isStartAutoOffTask(payloadObj,deviceId,payload); + + } + + private void isStartAutoOffTask(JSONObject payloadObj,String deviceId,String payload) { // 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}) String funcType = null; Integer funcValue = null; boolean isAck = false; // 新增:标记是否需要执行自动关任务(全局可用) - // 第二步:设备回执处理逻辑(完全移除Redis写入) if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) { isAck = true; @@ -195,44 +202,72 @@ public class DeviceStatusHandler { // log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); } } - // 非回执消息:正常转发给订阅前端 - // 查询Redis中订阅该设备的前端列表:sub:{deviceId} - Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); + } - if (subscribedClients != null && !subscribedClients.isEmpty()) { - // 推送给每个订阅的前端 - // 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底) - List clients = new ArrayList<>(subscribedClients); - // 判断subc是否还存在 一次性查全部 获取失效的clientId - List stillSubs = mqttSubscriptionManager.pipeIsMemberSubc(clients, deviceId); + private void forwardPayload(String deviceId,String payload) { + try { + // 非回执消息:正常转发给订阅前端 + // 查询Redis中订阅该设备的前端列表:sub:{deviceId} + Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); - // 关系不存在:清理sub:{deviceId}残留,避免一直给前端发 - List stale = null; + if (subscribedClients != null && !subscribedClients.isEmpty()) { + // 推送给每个订阅的前端 + // 方案B:不再依赖online:;改为校验subc:{clientId}是否仍包含deviceId(取消订阅失败/异常退出兜底) + List clients = new ArrayList<>(subscribedClients); + // 判断subc是否还存在 一次性查全部 获取失效的clientId + List stillSubs = mqttSubscriptionManager.pipeIsMemberSubc(clients, deviceId); - for (int i = 0; i < clients.size(); i++) { - String clientId = clients.get(i); - boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i)); - if (!stillSub) { - if (stale == null) { - stale = new ArrayList<>(); + // 关系不存在:清理sub:{deviceId}残留,避免一直给前端发 + List stale = null; + + for (int i = 0; i < clients.size(); i++) { + String clientId = clients.get(i); + boolean stillSub = i < stillSubs.size() && Boolean.TRUE.equals(stillSubs.get(i)); + if (!stillSub) { + if (stale == null) { + stale = new ArrayList<>(); + } + // false不存在添加队列 + stale.add(clientId); + continue; } - // false不存在添加队列 - stale.add(clientId); - continue; + // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/listener + String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; + // 发布消息 + mqttMessageSender.publish(frontendTopic, payload); + // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); } - // 前端专属主题:frontend/{clientId}/dtu/{deviceId}/listener - String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/listener"; - // 发布消息 - mqttMessageSender.publish(frontendTopic, payload); - // log.info("【设备状态转发】设备{} → 前端{},主题:{}", deviceId, clientId, frontendTopic); + // 删掉设备对应的客户端 + if (stale != null && !stale.isEmpty()) { + mqttSubscriptionManager.pipeSRemSub(deviceId, stale); + } + } else { + // 优化:替换System.out为log.info + // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); } - // 删掉设备对应的客户端 - if (stale != null && !stale.isEmpty()) { - mqttSubscriptionManager.pipeSRemSub(deviceId, stale); - } - } else { - // 优化:替换System.out为log.info - // log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); + } catch (MqttException e) { + WxUtil.pushText( + "【消息转发失败】\n deviceId: "+deviceId+"\n payload: "+payload+"\n msg: "+e.getMessage()+"\n cause: "+e.getCause()); + log.error("【消息转发失败】deviceId={}, msg={}", deviceId, e.getMessage(), e); } } + + private String extractDeviceId(String topic) { + int first = topic.indexOf('/'); + if (first < 0) return null; + int second = topic.indexOf('/', first + 1); + if (second < 0) return null; + return topic.substring(first + 1, second); + } + + private boolean isJsonObjectLike(String s) { + if (s == null) return false; + int n = s.length(); + for (int i = 0; i < n; i++) { + char c = s.charAt(i); + if (!Character.isWhitespace(c)) return c == '{'; + } + return false; + } + } \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java index ef76fc4..abc234d 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java @@ -175,7 +175,7 @@ public class MqttAutoOffManager { try { runAutoOff(deviceId, funcType); } catch (Exception e) { - WxUtil.pushText("【自动关任务】提交任务失败! deviceId: "+deviceId+", funcType: "+funcType+", msg: "+e.getMessage()); + WxUtil.pushText("【自动关任务】提交任务失败! \n deviceId: "+deviceId+"\n funcType: "+funcType+"\n msg: "+e.getMessage()+"\n cause: "+e.getCause()); log.error("【自动关任务】执行失败,deviceId={}, funcType={}", deviceId, funcType, e); } finally { // 任务执行完成后移除映射 @@ -207,7 +207,7 @@ public class MqttAutoOffManager { try { latestObj = JSON.parseObject(latest); } catch (Exception e) { - WxUtil.pushText("自动关任务执行报错-解析异常:deviceId:" + deviceId + ", funcType:" + funcType+"异常:"+e.getMessage()); + WxUtil.pushText("自动关任务执行报错-解析异常:\n deviceId: " + deviceId + "\n funcType:" + funcType+"\n 异常:"+e.getMessage()+"\n Cause: "+e.getCause()); log.warn("【自动关任务】最新状态JSON解析失败,跳过:deviceId={}, funcType={}", deviceId, funcType); return; } diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java index e60f3d9..4abc56f 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java @@ -142,7 +142,7 @@ public class MqttClientManager implements SmartLifecycle { log.error("【MQTT连接异常】连接断开,clientId:{},原因:{}", safeClientId(), (cause == null ? "unknown" : cause.getMessage()), cause); - WxUtil.pushText("【MQTT连接异常】连接断开,clientId:"+safeClientId()+",原因:"+(cause == null ? "unknown" : cause.getMessage())); + WxUtil.pushText("【MQTT连接异常】连接断开:\n clientId:"+safeClientId()+"\n 原因:"+(cause == null ? "unknown" : cause.getMessage())+"\n Cause: "+cause.getCause()); // 【方案A】不再触发自写重连;Paho自动重连会接管重连过程 // 这里只记录日志即可 if (isRunning.get()) { @@ -170,7 +170,7 @@ public class MqttClientManager implements SmartLifecycle { // mqttBizPool.getActiveCount(), // mqttBizPool.getQueue().size()); if (mqttBizPool.getActiveCount()>10 || mqttBizPool.getQueue().size()>1000) { - WxUtil.pushText("线程池繁忙 正在处理中任务:"+mqttBizPool.getActiveCount()+", 剩余待进行任务:"+mqttBizPool.getQueue().size()); + WxUtil.pushText("线程池繁忙! \n 正在处理中任务:"+mqttBizPool.getActiveCount()+"\n 剩余待进行任务:"+mqttBizPool.getQueue().size()); } try { // 优化:显式指定UTF-8编码,避免乱码(JDK 8兼容) diff --git a/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java b/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java index 8a23191..b2f1665 100644 --- a/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java +++ b/agri-framework/src/main/java/com/agri/framework/web/dispatcher/MqttMessageDispatcher.java @@ -62,6 +62,7 @@ public class MqttMessageDispatcher { else if (topic.matches("frontend/\\w+/online")) { frontendOnlineHandler.handle(topic, payload); } + // todo 是否加回复主题?? } catch (Exception e) { log.error("【MQTT消息处理异常】topic={}", topic, e); }