From da8928cf9c8d6115c122c0095e294798ac3e537e Mon Sep 17 00:00:00 2001 From: xce Date: Sun, 25 Jan 2026 00:17:08 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E8=87=AA=E5=8A=A8=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 104 ++---------------- .../web/controller/mqtt/MqttController.java | 20 +++- .../interceptor/DeviceStatusHandler.java | 2 +- .../interceptor/FrontendControlHandler.java | 54 +++++++++ .../framework/manager/MqttAutoOffManager.java | 27 ++++- .../framework/manager/MqttClientManager.java | 20 +++- doc/mqtt/mqtt-prepare.md | 20 ++++ doc/mqtt/mqtt-pressure-test.md | 17 +++ doc/mqtt/mqtt-topics.md | 15 +++ scripts/mqtt/pressure_mix.sh | 20 ++++ scripts/mqtt/pressure_up.sh | 26 +++++ 11 files changed, 223 insertions(+), 102 deletions(-) create mode 100644 doc/mqtt/mqtt-prepare.md create mode 100644 doc/mqtt/mqtt-pressure-test.md create mode 100644 doc/mqtt/mqtt-topics.md create mode 100644 scripts/mqtt/pressure_mix.sh create mode 100644 scripts/mqtt/pressure_up.sh diff --git a/README.md b/README.md index 94318d1..8faf5f4 100644 --- a/README.md +++ b/README.md @@ -1,95 +1,15 @@ -

- logo -

-

Agri v3.9.0

-

基于SpringBoot+Vue前后端分离的Java快速开发框架

-

- - - -

+# MQTT 压测脚本说明 -## 平台简介 +## 脚本列表 +- pressure_up.sh:设备状态上报压测 +- pressure_mix.sh:上报 + 控制混合压测 -智能农业是一套全部开源的快速开发平台,毫无保留给个人及企业免费使用。 +## 使用说明 +1. 安装 mosquitto-clients +2. 修改脚本中的 BROKER / IMEI 范围 +3. chmod +x *.sh +4. 运行脚本 -* 前端采用Vue、Element UI。 -* 后端采用Spring Boot、Spring Security、Redis & Jwt。 -* 权限认证使用Jwt,支持多终端认证系统。 -* 支持加载动态权限菜单,多方式轻松权限控制。 -* 高效率开发,使用代码生成器可以一键生成前后端代码。 -* 提供了技术栈([Vue3](https://v3.cn.vuejs.org) [Element Plus](https://element-plus.org/zh-CN) [Vite](https://cn.vitejs.dev))版本[RuoYi-Vue3](https://gitcode.com/yangzongzhuan/RuoYi-Vue3),保持同步更新。 -* 提供了单应用版本[RuoYi-Vue-fast](https://gitcode.com/yangzongzhuan/RuoYi-Vue-fast),Oracle版本[RuoYi-Vue-Oracle](https://gitcode.com/yangzongzhuan/RuoYi-Vue-Oracle),保持同步更新。 -* 不分离版本,请移步[RuoYi](https://gitee.com/y_project/RuoYi),微服务版本,请移步[RuoYi-Cloud](https://gitee.com/y_project/RuoYi-Cloud) -* 阿里云折扣场:[点我进入](http://aly.ruoyi.vip),腾讯云秒杀场:[点我进入](http://txy.ruoyi.vip)   - -## 内置功能 - -1. 用户管理:用户是系统操作者,该功能主要完成系统用户配置。 -2. 部门管理:配置系统组织机构(公司、部门、小组),树结构展现支持数据权限。 -3. 岗位管理:配置系统用户所属担任职务。 -4. 菜单管理:配置系统菜单,操作权限,按钮权限标识等。 -5. 角色管理:角色菜单权限分配、设置角色按机构进行数据范围权限划分。 -6. 字典管理:对系统中经常使用的一些较为固定的数据进行维护。 -7. 参数管理:对系统动态配置常用参数。 -8. 通知公告:系统通知公告信息发布维护。 -9. 操作日志:系统正常操作日志记录和查询;系统异常信息日志记录和查询。 -10. 登录日志:系统登录日志记录查询包含登录异常。 -11. 在线用户:当前系统中活跃用户状态监控。 -12. 定时任务:在线(添加、修改、删除)任务调度包含执行结果日志。 -13. 代码生成:前后端代码的生成(java、html、xml、sql)支持CRUD下载 。 -14. 系统接口:根据业务代码自动生成相关的api接口文档。 -15. 服务监控:监视当前系统CPU、内存、磁盘、堆栈等相关信息。 -16. 缓存监控:对系统的缓存信息查询,命令统计等。 -17. 在线构建器:拖动表单元素生成相应的HTML代码。 -18. 连接池监视:监视当前系统数据库连接池状态,可进行分析SQL找出系统性能瓶颈。 - -## 在线体验 - -- admin/admin123 -- 陆陆续续收到一些打赏,为了更好的体验已用于演示服务器升级。谢谢各位小伙伴。 - -演示地址:http://vue.ruoyi.vip -文档地址:http://doc.ruoyi.vip - -## 演示图 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- - -## 智能农业前后端分离交流群 - -QQ群: [![加入QQ群](https://img.shields.io/badge/已满-937441-blue.svg)](https://jq.qq.com/?_wv=1027&k=5bVB1og) [![加入QQ群](https://img.shields.io/badge/已满-887144332-blue.svg)](https://jq.qq.com/?_wv=1027&k=5eiA4DH) [![加入QQ群](https://img.shields.io/badge/已满-180251782-blue.svg)](https://jq.qq.com/?_wv=1027&k=5AxMKlC) [![加入QQ群](https://img.shields.io/badge/已满-104180207-blue.svg)](https://jq.qq.com/?_wv=1027&k=51G72yr) [![加入QQ群](https://img.shields.io/badge/已满-186866453-blue.svg)](https://jq.qq.com/?_wv=1027&k=VvjN2nvu) [![加入QQ群](https://img.shields.io/badge/已满-201396349-blue.svg)](https://jq.qq.com/?_wv=1027&k=5vYAqA05) [![加入QQ群](https://img.shields.io/badge/已满-101456076-blue.svg)](https://jq.qq.com/?_wv=1027&k=kOIINEb5) [![加入QQ群](https://img.shields.io/badge/已满-101539465-blue.svg)](https://jq.qq.com/?_wv=1027&k=UKtX5jhs) [![加入QQ群](https://img.shields.io/badge/已满-264312783-blue.svg)](https://jq.qq.com/?_wv=1027&k=EI9an8lJ) [![加入QQ群](https://img.shields.io/badge/已满-167385320-blue.svg)](https://jq.qq.com/?_wv=1027&k=SWCtLnMz) [![加入QQ群](https://img.shields.io/badge/已满-104748341-blue.svg)](https://jq.qq.com/?_wv=1027&k=96Dkdq0k) [![加入QQ群](https://img.shields.io/badge/已满-160110482-blue.svg)](https://jq.qq.com/?_wv=1027&k=0fsNiYZt) [![加入QQ群](https://img.shields.io/badge/已满-170801498-blue.svg)](https://jq.qq.com/?_wv=1027&k=7xw4xUG1) [![加入QQ群](https://img.shields.io/badge/已满-108482800-blue.svg)](https://jq.qq.com/?_wv=1027&k=eCx8eyoJ) [![加入QQ群](https://img.shields.io/badge/已满-101046199-blue.svg)](https://jq.qq.com/?_wv=1027&k=SpyH2875) [![加入QQ群](https://img.shields.io/badge/已满-136919097-blue.svg)](https://jq.qq.com/?_wv=1027&k=tKEt51dz) [![加入QQ群](https://img.shields.io/badge/已满-143961921-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=0vBbSb0ztbBgVtn3kJS-Q4HUNYwip89G&authKey=8irq5PhutrZmWIvsUsklBxhj57l%2F1nOZqjzigkXZVoZE451GG4JHPOqW7AW6cf0T&noverify=0&group_code=143961921) [![加入QQ群](https://img.shields.io/badge/已满-174951577-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=ZFAPAbp09S2ltvwrJzp7wGlbopsc0rwi&authKey=HB2cxpxP2yspk%2Bo3WKTBfktRCccVkU26cgi5B16u0KcAYrVu7sBaE7XSEqmMdFQp&noverify=0&group_code=174951577) [![加入QQ群](https://img.shields.io/badge/已满-161281055-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=Fn2aF5IHpwsy8j6VlalNJK6qbwFLFHat&authKey=uyIT%2B97x2AXj3odyXpsSpVaPMC%2Bidw0LxG5MAtEqlrcBcWJUA%2FeS43rsF1Tg7IRJ&noverify=0&group_code=161281055) [![加入QQ群](https://img.shields.io/badge/已满-138988063-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=XIzkm_mV2xTsUtFxo63bmicYoDBA6Ifm&authKey=dDW%2F4qsmw3x9govoZY9w%2FoWAoC4wbHqGal%2BbqLzoS6VBarU8EBptIgPKN%2FviyC8j&noverify=0&group_code=138988063) [![加入QQ群](https://img.shields.io/badge/已满-151450850-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=DkugnCg68PevlycJSKSwjhFqfIgrWWwR&authKey=pR1Pa5lPIeGF%2FFtIk6d%2FGB5qFi0EdvyErtpQXULzo03zbhopBHLWcuqdpwY241R%2F&noverify=0&group_code=151450850) [![加入QQ群](https://img.shields.io/badge/已满-224622315-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=F58bgRa-Dp-rsQJThiJqIYv8t4-lWfXh&authKey=UmUs4CVG5OPA1whvsa4uSespOvyd8%2FAr9olEGaWAfdLmfKQk%2FVBp2YU3u2xXXt76&noverify=0&group_code=224622315) [![加入QQ群](https://img.shields.io/badge/已满-287842588-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=Nxb2EQ5qozWa218Wbs7zgBnjLSNk_tVT&authKey=obBKXj6SBKgrFTJZx0AqQnIYbNOvBB2kmgwWvGhzxR67RoRr84%2Bus5OadzMcdJl5&noverify=0&group_code=287842588) [![加入QQ群](https://img.shields.io/badge/已满-187944233-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=numtK1M_I4eVd2Gvg8qtbuL8JgX42qNh&authKey=giV9XWMaFZTY%2FqPlmWbkB9g3fi0Ev5CwEtT9Tgei0oUlFFCQLDp4ozWRiVIzubIm&noverify=0&group_code=187944233) [![加入QQ群](https://img.shields.io/badge/已满-228578329-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=G6r5KGCaa3pqdbUSXNIgYloyb8e0_L0D&authKey=4w8tF1eGW7%2FedWn%2FHAypQksdrML%2BDHolQSx7094Agm7Luakj9EbfPnSTxSi2T1LQ&noverify=0&group_code=228578329) [![加入QQ群](https://img.shields.io/badge/已满-191164766-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=GsOo-OLz53J8y_9TPoO6XXSGNRTgbFxA&authKey=R7Uy%2Feq%2BZsoKNqHvRKhiXpypW7DAogoWapOawUGHokJSBIBIre2%2FoiAZeZBSLuBc&noverify=0&group_code=191164766) [![加入QQ群](https://img.shields.io/badge/174569686-blue.svg)](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=PmYavuzsOthVqfdAPbo4uAeIbu7Ttjgc&authKey=p52l8%2FXa4PS1JcEmS3VccKSwOPJUZ1ZfQ69MEKzbrooNUljRtlKjvsXf04bxNp3G&noverify=0&group_code=174569686) 点击按钮入群。 \ No newline at end of file +## 停止压测 +```bash +pkill mosquitto_pub \ No newline at end of file diff --git a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java index 66b521b..3b5786b 100644 --- a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java +++ b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java @@ -3,13 +3,16 @@ package com.agri.web.controller.mqtt; import com.agri.common.annotation.Log; import com.agri.common.core.domain.AjaxResult; import com.agri.common.enums.BusinessType; +import com.agri.framework.manager.MqttAutoOffManager; import com.agri.framework.manager.MqttClientManager; import com.agri.framework.manager.MqttSubscriptionManager; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -32,6 +35,8 @@ public class MqttController { @Resource private MqttClientManager mqttClientManager; + @Autowired + private MqttAutoOffManager mqttAutoOffManager; /** * 单个订阅 */ @@ -125,12 +130,21 @@ public class MqttController { */ @GetMapping("/status") @Log(title = "手动触发MQTT重连", businessType = BusinessType.SELECT) - public String getMqttStatus() { + public AjaxResult getMqttStatus() { try { - return mqttClientManager.getMqttStatus(); + return AjaxResult.success(mqttClientManager.getMqttStatus()); } catch (Exception e) { log.error("查询MQTT连接状态异常", e); - return "查询状态失败:" + e.getMessage(); + return AjaxResult.error("查询状态失败:" + e.getMessage()); } } + + /** + * 2️⃣ 查询某个设备是否存在自动关任务 + */ + @GetMapping("/device/{deviceId}") + public AjaxResult device(@PathVariable String deviceId) { + boolean hasTask = mqttAutoOffManager.hasAutoOffTask(deviceId); + return AjaxResult.success(hasTask); + } } \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java index 0633bfe..479c301 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/DeviceStatusHandler.java @@ -189,7 +189,7 @@ public class DeviceStatusHandler { latestTtlSeconds, TimeUnit.SECONDS ); - log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); + // log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId); } } // 非回执消息:正常转发给订阅前端 diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java index f852352..3a02818 100644 --- a/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/FrontendControlHandler.java @@ -1,19 +1,25 @@ package com.agri.framework.interceptor; import com.agri.framework.config.MqttConfig; +import com.agri.framework.manager.MqttAutoOffManager; +import com.agri.system.domain.SysAgriLimit; +import com.agri.system.service.ISysAgriLimitService; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.time.LocalDateTime; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * 前端控制指令处理器 @@ -43,6 +49,23 @@ public class FrontendControlHandler { @Resource private MqttConfig.MqttMessageSender mqttMessageSender; + @Resource + private MqttAutoOffManager mqttAutoOffManager; + + @Autowired + private ISysAgriLimitService agriLimitService; + + private static final Map> LIMIT_MAP = new HashMap<>(); + static { + LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit()))); + LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit()))); + LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit()))); + LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit()))); + LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit()))); + LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit()))); + LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit()))); + LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit()))); + } /** * 处理前端控制指令:权限校验+分布式锁+转发给设备 */ @@ -106,9 +129,40 @@ public class FrontendControlHandler { String deviceTopic = "dtu/" + deviceId + "/down"; //todo mqttMessageSender.publish(deviceTopic, payload); + // testAutoOffTask(deviceId,funcCodeMap); log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType); } + public void testAutoOffTask(String deviceId, Map funcCodeMap) throws MqttException { + + String funcType = funcCodeMap.keySet().iterator().next(); + Integer funcValue = funcCodeMap.get(funcType); + // 释放对应功能的分布式锁 + String lockKey = "lock:" + deviceId + ":" + funcType; + stringRedisTemplate.delete(lockKey); + + // 回执成功且值=1时启动自动关闭任务(保留原有逻辑) + if (StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) { + SysAgriLimit agriLimit = agriLimitService.lambdaQuery() + .eq(SysAgriLimit::getImei, deviceId) + .one(); + int autoOffSeconds = 0; + if (agriLimit != null) { + autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit); + } + // 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务) + if (autoOffSeconds > 0) { + mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds); + log.debug("【自动关任务】标记需要执行,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds); + } else { + log.debug("【自动关任务】标记不符合执行运行时间未配置,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds); + } + } + + if (StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) { + mqttAutoOffManager.cancelAutoOff(deviceId, funcType); + } + } /** * 权限校验逻辑(示例) * 可根据业务需求扩展: diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java index cd030de..ef76fc4 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttAutoOffManager.java @@ -189,7 +189,7 @@ public class MqttAutoOffManager { autoOffFutureMap.put(taskKey, newFuture); // ✅ 新任务创建成功:增加该设备的“未完成任务数” incAutoOffCnt(deviceId); - + log.info("【当前任务队列】:{}",autoOffDeviceCnt); log.info("【自动关任务】已创建(多线程):deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds); } @@ -255,6 +255,7 @@ public class MqttAutoOffManager { // 同设备同功能只保留最后一次任务:只有旧任务还没开始时才替换 ScheduledFuture oldFuture = autoOffFutureMap.get(taskKey); if (oldFuture != null) { + log.info("存在定时任务:taskKey: "+taskKey); // cancel=false 说明任务已开始/已完成,避免双执行:不再创建新任务 if (!oldFuture.cancel(false)) { return; @@ -262,7 +263,31 @@ public class MqttAutoOffManager { // cancel成功:旧任务不会跑了,这时再remove并减计数 autoOffFutureMap.remove(taskKey, oldFuture); decAutoOffCnt(deviceId); + log.info("旧任务已退出:"+taskKey); + } else { + log.info("不存在定时任务:taskKey: "+taskKey); } } + // 自动关是否启用(你可以先写死 true / false) + public boolean isEnabled() { + return true; // 之后可接配置 + } + + // 线程池是否初始化 + public boolean isExecutorInited() { + return autoOffExecutor != null; + } + + // 当前所有未完成任务数 + public int getTotalTaskCount() { + return autoOffFutureMap.size(); + } + + // 当前有自动关任务的设备数量 + public int getDeviceTaskCount() { + return autoOffDeviceCnt.size(); + } + + } \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java b/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java index 18852fb..e60f3d9 100644 --- a/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java +++ b/agri-framework/src/main/java/com/agri/framework/manager/MqttClientManager.java @@ -17,6 +17,8 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -164,9 +166,9 @@ public class MqttClientManager implements SmartLifecycle { } mqttBizPool.execute(() -> { - log.debug("mqttBizPool active={}, queue={}", - mqttBizPool.getActiveCount(), - mqttBizPool.getQueue().size()); + // log.debug("mqttBizPool active={}, queue={}", + // mqttBizPool.getActiveCount(), + // mqttBizPool.getQueue().size()); if (mqttBizPool.getActiveCount()>10 || mqttBizPool.getQueue().size()>1000) { WxUtil.pushText("线程池繁忙 正在处理中任务:"+mqttBizPool.getActiveCount()+", 剩余待进行任务:"+mqttBizPool.getQueue().size()); } @@ -247,10 +249,18 @@ public class MqttClientManager implements SmartLifecycle { /** * 获取当前MQTT连接状态 */ - public String getMqttStatus() { + public Map getMqttStatus() { boolean connected = (mqttClient != null && mqttClient.isConnected()); String status = connected ? "已连接" : "已断开"; - return String.format("MQTT连接状态:%s;clientId:%s", status, safeClientId()); + + Map data = new HashMap<>(); + data.put("enabled", mqttAutoOffManager.isEnabled()); + data.put("mqtt", status); + data.put("clientId", safeClientId()); + data.put("executorInited", mqttAutoOffManager.isExecutorInited()); + data.put("totalTaskCount", mqttAutoOffManager.getTotalTaskCount()); + data.put("deviceTaskCount", mqttAutoOffManager.getDeviceTaskCount()); + return data; } // ======================== SmartLifecycle 生命周期管理(核心修复) ======================== diff --git a/doc/mqtt/mqtt-prepare.md b/doc/mqtt/mqtt-prepare.md new file mode 100644 index 0000000..5307f92 --- /dev/null +++ b/doc/mqtt/mqtt-prepare.md @@ -0,0 +1,20 @@ +# MQTT 压测准备说明 + +## 1. 目的 +在进行 MQTT 压测前,确保后端服务、Broker、Redis 处于稳定可控状态, +避免因为环境问题导致压测结论失真。 + +## 2. 环境要求 +- MQTT Broker 已部署(服务器) +- 后端服务已启动 +- Redis 可访问 +- 压测机器已安装 mosquitto-clients + +## 3. 后端关键配置确认 +```yaml +spring: + mqtt: + auto-off-enabled: false # 压测必须关闭自动关 + clean-session: true + qos: 0 + max-inflight: 200 diff --git a/doc/mqtt/mqtt-pressure-test.md b/doc/mqtt/mqtt-pressure-test.md new file mode 100644 index 0000000..f20e095 --- /dev/null +++ b/doc/mqtt/mqtt-pressure-test.md @@ -0,0 +1,17 @@ +# MQTT 压测说明 + +## 1. 压测目标 +验证系统在大规模设备并发上报情况下的稳定性和处理能力。 + +## 2. 压测模型 +- 设备数量:1000 +- 上报周期:10 秒 / 台 +- 上报内容:8 路功能码状态包 + +## 3. 脚本位置 +scripts/mqtt/pressure_up.sh + +## 4. 启动压测 +```bash +chmod +x scripts/mqtt/pressure_up.sh +./scripts/mqtt/pressure_up.sh \ No newline at end of file diff --git a/doc/mqtt/mqtt-topics.md b/doc/mqtt/mqtt-topics.md new file mode 100644 index 0000000..a969b18 --- /dev/null +++ b/doc/mqtt/mqtt-topics.md @@ -0,0 +1,15 @@ + +```md +# MQTT Topic 规范 + +## 设备 → 后端 +- dtu/{deviceId}/up + +## 后端 → 设备 +- dtu/{deviceId}/down + +## 前端 → 后端 +- frontend/{clientId}/control/{deviceId} + +## 后端 → 前端 +- frontend/{clientId}/dtu/{deviceId}/listener diff --git a/scripts/mqtt/pressure_mix.sh b/scripts/mqtt/pressure_mix.sh new file mode 100644 index 0000000..32aada5 --- /dev/null +++ b/scripts/mqtt/pressure_mix.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +#(上报 + 控制) +BROKER="your.mqtt.server.ip" +PORT=1883 + +START_IMEI=862538065276000 +COUNT=1000 +CLIENT="web_001" + +echo "Start mixed pressure test" + +while true; do + OFFSET=$((RANDOM % COUNT)) + DEVICE=$((START_IMEI + OFFSET)) + mosquitto_pub -h "$BROKER" -p "$PORT" \ + -t "frontend/${CLIENT}/control/${DEVICE}" \ + -m '{"jm1g":1}' + sleep 0.2 +done diff --git a/scripts/mqtt/pressure_up.sh b/scripts/mqtt/pressure_up.sh new file mode 100644 index 0000000..516090a --- /dev/null +++ b/scripts/mqtt/pressure_up.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# (设备上报压测) +# ===== Broker 配置 ===== +BROKER="your.mqtt.server.ip" +PORT=1883 + +# ===== 设备配置(纯数字 IMEI)===== +START_IMEI=862538065276000 +COUNT=1000 # 设备数量 +INTERVAL=10 # 每台设备上报间隔(秒) + +echo "Start MQTT pressure test: $COUNT devices, interval=${INTERVAL}s" + +for ((i=0; i