mqtt自动关
parent
251bfe63a6
commit
da8928cf9c
104
README.md
104
README.md
|
|
@ -1,95 +1,15 @@
|
|||
<p align="center">
|
||||
<img alt="logo" src="https://oscimg.oschina.net/oscnet/up-d3d0a9303e11d522a06cd263f3079027715.png">
|
||||
</p>
|
||||
<h1 align="center" style="margin: 30px 0 30px; font-weight: bold;">Agri v3.9.0</h1>
|
||||
<h4 align="center">基于SpringBoot+Vue前后端分离的Java快速开发框架</h4>
|
||||
<p align="center">
|
||||
<a href="https://gitee.com/y_project/Agri-Vue/stargazers"><img src="https://gitee.com/y_project/Agri-Vue/badge/star.svg?theme=dark"></a>
|
||||
<a href="https://gitee.com/y_project/Agri-Vue"><img src="https://img.shields.io/badge/Agri-v3.9.0-brightgreen.svg"></a>
|
||||
<a href="https://gitee.com/y_project/Agri-Vue/blob/master/LICENSE"><img src="https://img.shields.io/github/license/mashape/apistatus.svg"></a>
|
||||
</p>
|
||||
# MQTT 压测脚本说明
|
||||
|
||||
## 平台简介
|
||||
## 脚本列表
|
||||
- pressure_up.sh:设备状态上报压测
|
||||
- pressure_mix.sh:上报 + 控制混合压测
|
||||
|
||||
智能农业是一套全部开源的快速开发平台,毫无保留给个人及企业免费使用。
|
||||
## 使用说明
|
||||
1. 安装 mosquitto-clients
|
||||
2. 修改脚本中的 BROKER / IMEI 范围
|
||||
3. chmod +x *.sh
|
||||
4. 运行脚本
|
||||
|
||||
* 前端采用Vue、Element UI。
|
||||
* 后端采用Spring Boot、Spring Security、Redis & Jwt。
|
||||
* 权限认证使用Jwt,支持多终端认证系统。
|
||||
* 支持加载动态权限菜单,多方式轻松权限控制。
|
||||
* 高效率开发,使用代码生成器可以一键生成前后端代码。
|
||||
* 提供了技术栈([Vue3](https://v3.cn.vuejs.org) [Element Plus](https://element-plus.org/zh-CN) [Vite](https://cn.vitejs.dev))版本[RuoYi-Vue3](https://gitcode.com/yangzongzhuan/RuoYi-Vue3),保持同步更新。
|
||||
* 提供了单应用版本[RuoYi-Vue-fast](https://gitcode.com/yangzongzhuan/RuoYi-Vue-fast),Oracle版本[RuoYi-Vue-Oracle](https://gitcode.com/yangzongzhuan/RuoYi-Vue-Oracle),保持同步更新。
|
||||
* 不分离版本,请移步[RuoYi](https://gitee.com/y_project/RuoYi),微服务版本,请移步[RuoYi-Cloud](https://gitee.com/y_project/RuoYi-Cloud)
|
||||
* 阿里云折扣场:[点我进入](http://aly.ruoyi.vip),腾讯云秒杀场:[点我进入](http://txy.ruoyi.vip)
|
||||
|
||||
## 内置功能
|
||||
|
||||
1. 用户管理:用户是系统操作者,该功能主要完成系统用户配置。
|
||||
2. 部门管理:配置系统组织机构(公司、部门、小组),树结构展现支持数据权限。
|
||||
3. 岗位管理:配置系统用户所属担任职务。
|
||||
4. 菜单管理:配置系统菜单,操作权限,按钮权限标识等。
|
||||
5. 角色管理:角色菜单权限分配、设置角色按机构进行数据范围权限划分。
|
||||
6. 字典管理:对系统中经常使用的一些较为固定的数据进行维护。
|
||||
7. 参数管理:对系统动态配置常用参数。
|
||||
8. 通知公告:系统通知公告信息发布维护。
|
||||
9. 操作日志:系统正常操作日志记录和查询;系统异常信息日志记录和查询。
|
||||
10. 登录日志:系统登录日志记录查询包含登录异常。
|
||||
11. 在线用户:当前系统中活跃用户状态监控。
|
||||
12. 定时任务:在线(添加、修改、删除)任务调度包含执行结果日志。
|
||||
13. 代码生成:前后端代码的生成(java、html、xml、sql)支持CRUD下载 。
|
||||
14. 系统接口:根据业务代码自动生成相关的api接口文档。
|
||||
15. 服务监控:监视当前系统CPU、内存、磁盘、堆栈等相关信息。
|
||||
16. 缓存监控:对系统的缓存信息查询,命令统计等。
|
||||
17. 在线构建器:拖动表单元素生成相应的HTML代码。
|
||||
18. 连接池监视:监视当前系统数据库连接池状态,可进行分析SQL找出系统性能瓶颈。
|
||||
|
||||
## 在线体验
|
||||
|
||||
- admin/admin123
|
||||
- 陆陆续续收到一些打赏,为了更好的体验已用于演示服务器升级。谢谢各位小伙伴。
|
||||
|
||||
演示地址:http://vue.ruoyi.vip
|
||||
文档地址:http://doc.ruoyi.vip
|
||||
|
||||
## 演示图
|
||||
|
||||
<table>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/cd1f90be5f2684f4560c9519c0f2a232ee8.jpg"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/1cbcf0e6f257c7d3a063c0e3f2ff989e4b3.jpg"/></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-8074972883b5ba0622e13246738ebba237a.png"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-9f88719cdfca9af2e58b352a20e23d43b12.png"/></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-39bf2584ec3a529b0d5a3b70d15c9b37646.png"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-936ec82d1f4872e1bc980927654b6007307.png"/></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-b2d62ceb95d2dd9b3fbe157bb70d26001e9.png"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-d67451d308b7a79ad6819723396f7c3d77a.png"/></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/5e8c387724954459291aafd5eb52b456f53.jpg"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/644e78da53c2e92a95dfda4f76e6d117c4b.jpg"/></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-8370a0d02977eebf6dbf854c8450293c937.png"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-49003ed83f60f633e7153609a53a2b644f7.png"/></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-d4fe726319ece268d4746602c39cffc0621.png"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-c195234bbcd30be6927f037a6755e6ab69c.png"/></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/b6115bc8c31de52951982e509930b20684a.jpg"/></td>
|
||||
<td><img src="https://oscimg.oschina.net/oscnet/up-5e4daac0bb59612c5038448acbcef235e3a.png"/></td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
|
||||
## 智能农业前后端分离交流群
|
||||
|
||||
QQ群: [](https://jq.qq.com/?_wv=1027&k=5bVB1og) [](https://jq.qq.com/?_wv=1027&k=5eiA4DH) [](https://jq.qq.com/?_wv=1027&k=5AxMKlC) [](https://jq.qq.com/?_wv=1027&k=51G72yr) [](https://jq.qq.com/?_wv=1027&k=VvjN2nvu) [](https://jq.qq.com/?_wv=1027&k=5vYAqA05) [](https://jq.qq.com/?_wv=1027&k=kOIINEb5) [](https://jq.qq.com/?_wv=1027&k=UKtX5jhs) [](https://jq.qq.com/?_wv=1027&k=EI9an8lJ) [](https://jq.qq.com/?_wv=1027&k=SWCtLnMz) [](https://jq.qq.com/?_wv=1027&k=96Dkdq0k) [](https://jq.qq.com/?_wv=1027&k=0fsNiYZt) [](https://jq.qq.com/?_wv=1027&k=7xw4xUG1) [](https://jq.qq.com/?_wv=1027&k=eCx8eyoJ) [](https://jq.qq.com/?_wv=1027&k=SpyH2875) [](https://jq.qq.com/?_wv=1027&k=tKEt51dz) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=0vBbSb0ztbBgVtn3kJS-Q4HUNYwip89G&authKey=8irq5PhutrZmWIvsUsklBxhj57l%2F1nOZqjzigkXZVoZE451GG4JHPOqW7AW6cf0T&noverify=0&group_code=143961921) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=ZFAPAbp09S2ltvwrJzp7wGlbopsc0rwi&authKey=HB2cxpxP2yspk%2Bo3WKTBfktRCccVkU26cgi5B16u0KcAYrVu7sBaE7XSEqmMdFQp&noverify=0&group_code=174951577) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=Fn2aF5IHpwsy8j6VlalNJK6qbwFLFHat&authKey=uyIT%2B97x2AXj3odyXpsSpVaPMC%2Bidw0LxG5MAtEqlrcBcWJUA%2FeS43rsF1Tg7IRJ&noverify=0&group_code=161281055) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=XIzkm_mV2xTsUtFxo63bmicYoDBA6Ifm&authKey=dDW%2F4qsmw3x9govoZY9w%2FoWAoC4wbHqGal%2BbqLzoS6VBarU8EBptIgPKN%2FviyC8j&noverify=0&group_code=138988063) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=DkugnCg68PevlycJSKSwjhFqfIgrWWwR&authKey=pR1Pa5lPIeGF%2FFtIk6d%2FGB5qFi0EdvyErtpQXULzo03zbhopBHLWcuqdpwY241R%2F&noverify=0&group_code=151450850) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=F58bgRa-Dp-rsQJThiJqIYv8t4-lWfXh&authKey=UmUs4CVG5OPA1whvsa4uSespOvyd8%2FAr9olEGaWAfdLmfKQk%2FVBp2YU3u2xXXt76&noverify=0&group_code=224622315) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=Nxb2EQ5qozWa218Wbs7zgBnjLSNk_tVT&authKey=obBKXj6SBKgrFTJZx0AqQnIYbNOvBB2kmgwWvGhzxR67RoRr84%2Bus5OadzMcdJl5&noverify=0&group_code=287842588) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=numtK1M_I4eVd2Gvg8qtbuL8JgX42qNh&authKey=giV9XWMaFZTY%2FqPlmWbkB9g3fi0Ev5CwEtT9Tgei0oUlFFCQLDp4ozWRiVIzubIm&noverify=0&group_code=187944233) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=G6r5KGCaa3pqdbUSXNIgYloyb8e0_L0D&authKey=4w8tF1eGW7%2FedWn%2FHAypQksdrML%2BDHolQSx7094Agm7Luakj9EbfPnSTxSi2T1LQ&noverify=0&group_code=228578329) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=GsOo-OLz53J8y_9TPoO6XXSGNRTgbFxA&authKey=R7Uy%2Feq%2BZsoKNqHvRKhiXpypW7DAogoWapOawUGHokJSBIBIre2%2FoiAZeZBSLuBc&noverify=0&group_code=191164766) [](http://qm.qq.com/cgi-bin/qm/qr?_wv=1027&k=PmYavuzsOthVqfdAPbo4uAeIbu7Ttjgc&authKey=p52l8%2FXa4PS1JcEmS3VccKSwOPJUZ1ZfQ69MEKzbrooNUljRtlKjvsXf04bxNp3G&noverify=0&group_code=174569686) 点击按钮入群。
|
||||
## 停止压测
|
||||
```bash
|
||||
pkill mosquitto_pub
|
||||
|
|
@ -3,13 +3,16 @@ package com.agri.web.controller.mqtt;
|
|||
import com.agri.common.annotation.Log;
|
||||
import com.agri.common.core.domain.AjaxResult;
|
||||
import com.agri.common.enums.BusinessType;
|
||||
import com.agri.framework.manager.MqttAutoOffManager;
|
||||
import com.agri.framework.manager.MqttClientManager;
|
||||
import com.agri.framework.manager.MqttSubscriptionManager;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.DeleteMapping;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
|
|
@ -32,6 +35,8 @@ public class MqttController {
|
|||
@Resource
|
||||
private MqttClientManager mqttClientManager;
|
||||
|
||||
@Autowired
|
||||
private MqttAutoOffManager mqttAutoOffManager;
|
||||
/**
|
||||
* 单个订阅
|
||||
*/
|
||||
|
|
@ -125,12 +130,21 @@ public class MqttController {
|
|||
*/
|
||||
@GetMapping("/status")
|
||||
@Log(title = "手动触发MQTT重连", businessType = BusinessType.SELECT)
|
||||
public String getMqttStatus() {
|
||||
public AjaxResult getMqttStatus() {
|
||||
try {
|
||||
return mqttClientManager.getMqttStatus();
|
||||
return AjaxResult.success(mqttClientManager.getMqttStatus());
|
||||
} catch (Exception e) {
|
||||
log.error("查询MQTT连接状态异常", e);
|
||||
return "查询状态失败:" + e.getMessage();
|
||||
return AjaxResult.error("查询状态失败:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 2️⃣ 查询某个设备是否存在自动关任务
|
||||
*/
|
||||
@GetMapping("/device/{deviceId}")
|
||||
public AjaxResult device(@PathVariable String deviceId) {
|
||||
boolean hasTask = mqttAutoOffManager.hasAutoOffTask(deviceId);
|
||||
return AjaxResult.success(hasTask);
|
||||
}
|
||||
}
|
||||
|
|
@ -189,7 +189,7 @@ public class DeviceStatusHandler {
|
|||
latestTtlSeconds,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId);
|
||||
// log.debug("【设备状态包】写入Redis成功,deviceId={}", deviceId);
|
||||
}
|
||||
}
|
||||
// 非回执消息:正常转发给订阅前端
|
||||
|
|
|
|||
|
|
@ -1,19 +1,25 @@
|
|||
package com.agri.framework.interceptor;
|
||||
|
||||
import com.agri.framework.config.MqttConfig;
|
||||
import com.agri.framework.manager.MqttAutoOffManager;
|
||||
import com.agri.system.domain.SysAgriLimit;
|
||||
import com.agri.system.service.ISysAgriLimitService;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.TypeReference;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 前端控制指令处理器
|
||||
|
|
@ -43,6 +49,23 @@ public class FrontendControlHandler {
|
|||
@Resource
|
||||
private MqttConfig.MqttMessageSender mqttMessageSender;
|
||||
|
||||
@Resource
|
||||
private MqttAutoOffManager mqttAutoOffManager;
|
||||
|
||||
@Autowired
|
||||
private ISysAgriLimitService agriLimitService;
|
||||
|
||||
private static final Map<String, Function<SysAgriLimit, Integer>> LIMIT_MAP = new HashMap<>();
|
||||
static {
|
||||
LIMIT_MAP.put("jm1g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1gLimit())));
|
||||
LIMIT_MAP.put("jm2g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2gLimit())));
|
||||
LIMIT_MAP.put("jbg", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbgLimit())));
|
||||
LIMIT_MAP.put("jm3g", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3gLimit())));
|
||||
LIMIT_MAP.put("jm2k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm2kLimit())));
|
||||
LIMIT_MAP.put("jm3k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm3kLimit())));
|
||||
LIMIT_MAP.put("jbk", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJbkLimit())));
|
||||
LIMIT_MAP.put("jm1k", agriLimit -> Integer.parseInt(String.valueOf(agriLimit.getJm1kLimit())));
|
||||
}
|
||||
/**
|
||||
* 处理前端控制指令:权限校验+分布式锁+转发给设备
|
||||
*/
|
||||
|
|
@ -106,9 +129,40 @@ public class FrontendControlHandler {
|
|||
String deviceTopic = "dtu/" + deviceId + "/down";
|
||||
//todo
|
||||
mqttMessageSender.publish(deviceTopic, payload);
|
||||
// testAutoOffTask(deviceId,funcCodeMap);
|
||||
log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
|
||||
}
|
||||
|
||||
public void testAutoOffTask(String deviceId, Map<String,Integer> funcCodeMap) throws MqttException {
|
||||
|
||||
String funcType = funcCodeMap.keySet().iterator().next();
|
||||
Integer funcValue = funcCodeMap.get(funcType);
|
||||
// 释放对应功能的分布式锁
|
||||
String lockKey = "lock:" + deviceId + ":" + funcType;
|
||||
stringRedisTemplate.delete(lockKey);
|
||||
|
||||
// 回执成功且值=1时启动自动关闭任务(保留原有逻辑)
|
||||
if (StringUtils.hasText(funcType) && funcValue != null && funcValue == 1) {
|
||||
SysAgriLimit agriLimit = agriLimitService.lambdaQuery()
|
||||
.eq(SysAgriLimit::getImei, deviceId)
|
||||
.one();
|
||||
int autoOffSeconds = 0;
|
||||
if (agriLimit != null) {
|
||||
autoOffSeconds = LIMIT_MAP.getOrDefault(funcType, k -> 0).apply(agriLimit);
|
||||
}
|
||||
// 新增:判断是否真的需要执行自动关任务(延迟秒数>0才是有效任务)
|
||||
if (autoOffSeconds > 0) {
|
||||
mqttAutoOffManager.scheduleAutoOff(deviceId, funcType, autoOffSeconds);
|
||||
log.debug("【自动关任务】标记需要执行,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds);
|
||||
} else {
|
||||
log.debug("【自动关任务】标记不符合执行运行时间未配置,deviceId={}, funcType={}, delay={}s", deviceId, funcType, autoOffSeconds);
|
||||
}
|
||||
}
|
||||
|
||||
if (StringUtils.hasText(funcType) && funcValue != null && funcValue == 0) {
|
||||
mqttAutoOffManager.cancelAutoOff(deviceId, funcType);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 权限校验逻辑(示例)
|
||||
* 可根据业务需求扩展:
|
||||
|
|
|
|||
|
|
@ -189,7 +189,7 @@ public class MqttAutoOffManager {
|
|||
autoOffFutureMap.put(taskKey, newFuture);
|
||||
// ✅ 新任务创建成功:增加该设备的“未完成任务数”
|
||||
incAutoOffCnt(deviceId);
|
||||
|
||||
log.info("【当前任务队列】:{}",autoOffDeviceCnt);
|
||||
log.info("【自动关任务】已创建(多线程):deviceId={}, funcType={}, delay={}s", deviceId, funcType, delaySeconds);
|
||||
}
|
||||
|
||||
|
|
@ -255,6 +255,7 @@ public class MqttAutoOffManager {
|
|||
// 同设备同功能只保留最后一次任务:只有旧任务还没开始时才替换
|
||||
ScheduledFuture<?> oldFuture = autoOffFutureMap.get(taskKey);
|
||||
if (oldFuture != null) {
|
||||
log.info("存在定时任务:taskKey: "+taskKey);
|
||||
// cancel=false 说明任务已开始/已完成,避免双执行:不再创建新任务
|
||||
if (!oldFuture.cancel(false)) {
|
||||
return;
|
||||
|
|
@ -262,7 +263,31 @@ public class MqttAutoOffManager {
|
|||
// cancel成功:旧任务不会跑了,这时再remove并减计数
|
||||
autoOffFutureMap.remove(taskKey, oldFuture);
|
||||
decAutoOffCnt(deviceId);
|
||||
log.info("旧任务已退出:"+taskKey);
|
||||
} else {
|
||||
log.info("不存在定时任务:taskKey: "+taskKey);
|
||||
}
|
||||
}
|
||||
|
||||
// 自动关是否启用(你可以先写死 true / false)
|
||||
public boolean isEnabled() {
|
||||
return true; // 之后可接配置
|
||||
}
|
||||
|
||||
// 线程池是否初始化
|
||||
public boolean isExecutorInited() {
|
||||
return autoOffExecutor != null;
|
||||
}
|
||||
|
||||
// 当前所有未完成任务数
|
||||
public int getTotalTaskCount() {
|
||||
return autoOffFutureMap.size();
|
||||
}
|
||||
|
||||
// 当前有自动关任务的设备数量
|
||||
public int getDeviceTaskCount() {
|
||||
return autoOffDeviceCnt.size();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -17,6 +17,8 @@ import org.springframework.stereotype.Component;
|
|||
import javax.annotation.Resource;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -164,9 +166,9 @@ public class MqttClientManager implements SmartLifecycle {
|
|||
}
|
||||
|
||||
mqttBizPool.execute(() -> {
|
||||
log.debug("mqttBizPool active={}, queue={}",
|
||||
mqttBizPool.getActiveCount(),
|
||||
mqttBizPool.getQueue().size());
|
||||
// log.debug("mqttBizPool active={}, queue={}",
|
||||
// mqttBizPool.getActiveCount(),
|
||||
// mqttBizPool.getQueue().size());
|
||||
if (mqttBizPool.getActiveCount()>10 || mqttBizPool.getQueue().size()>1000) {
|
||||
WxUtil.pushText("线程池繁忙 正在处理中任务:"+mqttBizPool.getActiveCount()+", 剩余待进行任务:"+mqttBizPool.getQueue().size());
|
||||
}
|
||||
|
|
@ -247,10 +249,18 @@ public class MqttClientManager implements SmartLifecycle {
|
|||
/**
|
||||
* 获取当前MQTT连接状态
|
||||
*/
|
||||
public String getMqttStatus() {
|
||||
public Map<String,Object> getMqttStatus() {
|
||||
boolean connected = (mqttClient != null && mqttClient.isConnected());
|
||||
String status = connected ? "已连接" : "已断开";
|
||||
return String.format("MQTT连接状态:%s;clientId:%s", status, safeClientId());
|
||||
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("enabled", mqttAutoOffManager.isEnabled());
|
||||
data.put("mqtt", status);
|
||||
data.put("clientId", safeClientId());
|
||||
data.put("executorInited", mqttAutoOffManager.isExecutorInited());
|
||||
data.put("totalTaskCount", mqttAutoOffManager.getTotalTaskCount());
|
||||
data.put("deviceTaskCount", mqttAutoOffManager.getDeviceTaskCount());
|
||||
return data;
|
||||
}
|
||||
|
||||
// ======================== SmartLifecycle 生命周期管理(核心修复) ========================
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
# MQTT 压测准备说明
|
||||
|
||||
## 1. 目的
|
||||
在进行 MQTT 压测前,确保后端服务、Broker、Redis 处于稳定可控状态,
|
||||
避免因为环境问题导致压测结论失真。
|
||||
|
||||
## 2. 环境要求
|
||||
- MQTT Broker 已部署(服务器)
|
||||
- 后端服务已启动
|
||||
- Redis 可访问
|
||||
- 压测机器已安装 mosquitto-clients
|
||||
|
||||
## 3. 后端关键配置确认
|
||||
```yaml
|
||||
spring:
|
||||
mqtt:
|
||||
auto-off-enabled: false # 压测必须关闭自动关
|
||||
clean-session: true
|
||||
qos: 0
|
||||
max-inflight: 200
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
# MQTT 压测说明
|
||||
|
||||
## 1. 压测目标
|
||||
验证系统在大规模设备并发上报情况下的稳定性和处理能力。
|
||||
|
||||
## 2. 压测模型
|
||||
- 设备数量:1000
|
||||
- 上报周期:10 秒 / 台
|
||||
- 上报内容:8 路功能码状态包
|
||||
|
||||
## 3. 脚本位置
|
||||
scripts/mqtt/pressure_up.sh
|
||||
|
||||
## 4. 启动压测
|
||||
```bash
|
||||
chmod +x scripts/mqtt/pressure_up.sh
|
||||
./scripts/mqtt/pressure_up.sh
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
|
||||
```md
|
||||
# MQTT Topic 规范
|
||||
|
||||
## 设备 → 后端
|
||||
- dtu/{deviceId}/up
|
||||
|
||||
## 后端 → 设备
|
||||
- dtu/{deviceId}/down
|
||||
|
||||
## 前端 → 后端
|
||||
- frontend/{clientId}/control/{deviceId}
|
||||
|
||||
## 后端 → 前端
|
||||
- frontend/{clientId}/dtu/{deviceId}/listener
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
#(上报 + 控制)
|
||||
BROKER="your.mqtt.server.ip"
|
||||
PORT=1883
|
||||
|
||||
START_IMEI=862538065276000
|
||||
COUNT=1000
|
||||
CLIENT="web_001"
|
||||
|
||||
echo "Start mixed pressure test"
|
||||
|
||||
while true; do
|
||||
OFFSET=$((RANDOM % COUNT))
|
||||
DEVICE=$((START_IMEI + OFFSET))
|
||||
mosquitto_pub -h "$BROKER" -p "$PORT" \
|
||||
-t "frontend/${CLIENT}/control/${DEVICE}" \
|
||||
-m '{"jm1g":1}'
|
||||
sleep 0.2
|
||||
done
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
#!/usr/bin/env bash
|
||||
# (设备上报压测)
|
||||
# ===== Broker 配置 =====
|
||||
BROKER="your.mqtt.server.ip"
|
||||
PORT=1883
|
||||
|
||||
# ===== 设备配置(纯数字 IMEI)=====
|
||||
START_IMEI=862538065276000
|
||||
COUNT=1000 # 设备数量
|
||||
INTERVAL=10 # 每台设备上报间隔(秒)
|
||||
|
||||
echo "Start MQTT pressure test: $COUNT devices, interval=${INTERVAL}s"
|
||||
|
||||
for ((i=0; i<COUNT; i++)); do
|
||||
(
|
||||
DEVICE=$((START_IMEI + i))
|
||||
while true; do
|
||||
mosquitto_pub -h "$BROKER" -p "$PORT" \
|
||||
-t "dtu/${DEVICE}/up" \
|
||||
-m '{"jm1g":0,"jm2g":0,"jbg":0,"jm3g":0,"jm2k":0,"jm3k":0,"jbk":0,"jm1k":0}'
|
||||
sleep "$INTERVAL"
|
||||
done
|
||||
) &
|
||||
done
|
||||
|
||||
wait
|
||||
Loading…
Reference in New Issue