mqtt基础版

feasure
xce 2026-01-16 18:13:24 +08:00
parent 38e73ba0d2
commit 6fd96b188c
6 changed files with 329 additions and 20 deletions

View File

@ -1,7 +1,15 @@
package com.agri.web.controller.mqtt;
import com.agri.framework.interceptor.MqttMessageHandler;
import org.springframework.web.bind.annotation.*;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
@ -13,6 +21,8 @@ import java.util.List;
@RequestMapping("/api/mqtt")
public class MqttController {
private static final Logger log = LoggerFactory.getLogger(MqttController.class);
@Resource
private MqttMessageHandler mqttMessageHandler;
@ -21,8 +31,16 @@ public class MqttController {
*/
@PostMapping("/single")
public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) {
try {
mqttMessageHandler.subscribeDevice(clientId, deviceId);
return "订阅成功";
} catch (IllegalArgumentException e) {
log.error("MQTT单个订阅失败{}", e.getMessage());
return "订阅失败:" + e.getMessage();
} catch (Exception e) {
log.error("MQTT单个订阅异常", e);
return "订阅失败:系统异常";
}
}
/**
@ -30,8 +48,16 @@ public class MqttController {
*/
@DeleteMapping("/single")
public String unsubscribe(@RequestParam String clientId, @RequestParam String deviceId) {
try {
mqttMessageHandler.unsubscribeDevice(clientId, deviceId);
return "取消订阅成功";
} catch (IllegalArgumentException e) {
log.error("MQTT单个取消订阅失败{}", e.getMessage());
return "取消订阅失败:" + e.getMessage();
} catch (Exception e) {
log.error("MQTT单个取消订阅异常", e);
return "取消订阅失败:系统异常";
}
}
/**
@ -39,7 +65,44 @@ public class MqttController {
*/
@DeleteMapping("/batch")
public List<String> unsubscribeAll(@RequestParam String clientId) {
try {
// 返回前端需要取消的MQTT主题列表
return mqttMessageHandler.unsubscribeAllDevice(clientId);
} catch (IllegalArgumentException e) {
log.error("MQTT批量取消订阅失败{}", e.getMessage());
// 异常时返回空列表,避免前端解析失败
return Lists.newArrayList();
} catch (Exception e) {
log.error("MQTT批量取消订阅异常", e);
return Lists.newArrayList();
}
}
/**
* MQTT
*
*/
@GetMapping("/reconnect")
public String manualReconnect() {
try {
return mqttMessageHandler.manualReconnect();
} catch (Exception e) {
log.error("MQTT手动重连异常", e);
return "手动重连失败:" + e.getMessage();
}
}
/**
* MQTT
* 便
*/
@GetMapping("/status")
public String getMqttStatus() {
try {
return mqttMessageHandler.getMqttStatus();
} catch (Exception e) {
log.error("查询MQTT连接状态异常", e);
return "查询状态失败:" + e.getMessage();
}
}
}

View File

@ -5,8 +5,11 @@ spring:
ws-host: wss://mq.xiaoces.com/mqtt # 前端的WebSocket地址
username: admin # Mosquitto共用账号
password: Admin#12345678 # Mosquitto密码
client-id: springboot-backend-${random.uuid} # 后端客户端ID唯一
client-id: springboot-backend # 截取UUID前8位自动去横线
default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题
qos: 1 # 消息可靠性
timeout: 60 # 连接超时
keep-alive: 60 # 心跳间隔
# 新增重连配置
reconnect-interval: 5 # 重连间隔(秒)
max-reconnect-times: -1 # 最大重连次数(-1=无限重连)

View File

@ -1,11 +1,12 @@
package com.agri.common.utils.uuid;
import com.agri.common.exception.UtilException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import com.agri.common.exception.UtilException;
/**
* universally unique identifierUUID
@ -481,4 +482,21 @@ public final class UUID implements java.io.Serializable, Comparable<UUID>
{
return ThreadLocalRandom.current();
}
/**
* Spring ${random.alphanumeric:n}
* 0-9 + a-z + A-ZSpring
*/
public static String generateAlphanumericRandom(int length) {
// 定义字母数字混合的字符集和Spring ${random.alphanumeric}的字符集一致)
String chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
StringBuilder sb = new StringBuilder();
// 基于JVM的随机数生成保证随机性
for (int i = 0; i < length; i++) {
int randomIndex = (int) (Math.random() * chars.length());
sb.append(chars.charAt(randomIndex));
}
return sb.toString();
}
}

View File

@ -1,5 +1,6 @@
package com.agri.framework.config;
import com.agri.common.utils.uuid.UUID;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
@ -14,7 +15,6 @@ import org.springframework.util.StringUtils;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* MQTT
@ -92,7 +92,7 @@ public class MqttConfig {
MemoryPersistence persistence = new MemoryPersistence();
// 新增客户端ID拼接随机后缀避免多实例部署时冲突生产环境必备
String uniqueClientId = clientId + "_" + UUID.randomUUID().toString().substring(0, 8);
String uniqueClientId = clientId + "_" + UUID.generateAlphanumericRandom(8);
MqttClient mqttClient = new MqttClient(host, uniqueClientId, persistence);
// 3. 建立MQTT连接
@ -228,4 +228,13 @@ public class MqttConfig {
}
}
}
/**
* MQTTBean使
*/
@Bean
public MqttConnectOptions mqttConnectOptions() {
return getMqttConnectOptions();
}
}

View File

@ -111,7 +111,7 @@ public class SecurityConfig
.authorizeHttpRequests((requests) -> {
permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());
// 对于登录login 注册register 验证码captchaImage 允许匿名访问
requests.antMatchers("/login", "/register", "/captchaImage").permitAll()
requests.antMatchers("/login", "/register", "/captchaImage","/api/mqtt/status").permitAll()
// 静态资源,可匿名访问
.antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll()
.antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**").permitAll()

View File

@ -1,11 +1,14 @@
package com.agri.framework.interceptor;
import com.agri.common.utils.uuid.UUID;
import com.agri.framework.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@ -25,6 +28,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -62,11 +67,48 @@ public class MqttMessageHandler implements SmartLifecycle {
// 新增生命周期管理标识控制MQTT客户端启动/关闭
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** MQTT连接配置项从MqttConfig注入 */
@Resource
private MqttConnectOptions mqttConnectOptions;
// ========== 新增:重连相关配置 ==========
// 重连间隔(秒),可配置化
@Value("${spring.mqtt.reconnect-interval:5}")
private int reconnectInterval;
// 最大重连次数(-1表示无限重连
@Value("${spring.mqtt.max-reconnect-times:-1}")
private int maxReconnectTimes;
// 重连线程池(单线程,避免并发重连)
private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
// 当前重连次数计数
private int currentReconnectCount = 0;
// 配置错误标记(避免配置错时无限重连)
private volatile boolean isConfigError = false;
/**
* +
* @PostConstructSmartLifecyclestart()
*/
public void subscribeTopics() throws MqttException {
// 关键补充1判空重连后替换的新实例可能为空
if (mqttClient == null) {
log.error("【MQTT初始化】客户端实例为空无法订阅主题");
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
// 关键补充:先确保客户端已连接(如果没连则连接,已连则跳过)
if (!mqttClient.isConnected()) {
try {
// 使用注入的连接配置项连接Broker带用户名密码、重连等配置
mqttClient.connect(mqttConnectOptions);
log.info("【MQTT连接】客户端已成功连接到BrokerclientId{}", mqttClient.getClientId());
} catch (MqttException e) {
log.error("【MQTT连接】连接Broker失败clientId{}", mqttClient.getClientId(), e);
throw e; // 抛出异常让start()处理重连
}
}
// 解析配置的主题列表
String[] topics = defaultTopic.split(",");
int[] qosArray = new int[topics.length];
@ -75,6 +117,14 @@ public class MqttMessageHandler implements SmartLifecycle {
qosArray[i] = 1;
}
// 关键补充2先取消原有订阅避免重复订阅导致的消息重复接收
try {
mqttClient.unsubscribe(topics);
log.info("【MQTT初始化】已取消原有主题订阅准备重新订阅");
} catch (Exception e) {
log.warn("【MQTT初始化】取消原有订阅失败首次订阅可忽略{}", e.getMessage());
}
// 设置MQTT消息回调处理连接断开、消息接收、消息发布完成
mqttClient.setCallback(new MqttCallback() {
/**
@ -84,7 +134,11 @@ public class MqttMessageHandler implements SmartLifecycle {
@Override
public void connectionLost(Throwable cause) {
// 优化替换System.err为log.error
log.error("【MQTT连接异常】连接断开{}", cause.getMessage(), cause);
log.error("【MQTT连接异常】连接断开clientId{},原因:{}", mqttClient.getClientId(), cause.getMessage(), cause);
// 新增:触发自动重连(仅当客户端处于运行状态且非配置错误时)
if (isRunning.get() && !isConfigError) {
startReconnect();
}
}
/**
@ -107,15 +161,18 @@ public class MqttMessageHandler implements SmartLifecycle {
public void deliveryComplete(IMqttDeliveryToken token) {
// 优化替换System.out为log.info增加空值校验
if (token != null && token.getTopics() != null && token.getTopics().length > 0) {
log.info("【MQTT确认】消息发布完成主题:{}", token.getTopics()[0]);
log.info("【MQTT确认】消息发布完成clientId{}主题:{}", mqttClient.getClientId(), token.getTopics()[0]);
}
}
});
// 订阅主题
mqttClient.subscribe(topics, qosArray);
// 优化替换System.out为log.info
log.info("【MQTT初始化】订阅主题{}", String.join(",", topics));
// 重置重连计数和配置错误标记(连接成功后清零)
currentReconnectCount = 0;
isConfigError = false;
// 优化打印clientId方便排查重连后的实例是否替换成功
log.info("【MQTT初始化】订阅主题完成clientId{},订阅主题:{}", mqttClient.getClientId(), String.join(",", topics));
}
/**
@ -367,6 +424,148 @@ public class MqttMessageHandler implements SmartLifecycle {
return keys;
}
// ========== 新增:自动重连核心方法 ==========
/**
*
*/
private void startReconnect() {
// 终止条件:配置错误 或 达到最大重连次数
if (isConfigError || (maxReconnectTimes > 0 && currentReconnectCount >= maxReconnectTimes)) {
String reason = isConfigError ? "配置错误" : String.format("达到最大重连次数(%d)", maxReconnectTimes);
log.error("【MQTT重连】{},停止重连", reason);
// isRunning.set(false);
return;
}
// 极端场景兜底:客户端实例为空则终止重连
if (mqttClient == null) {
log.error("【MQTT重连】客户端实例为空终止重连");
return;
}
// 提交重连任务到单线程池
reconnectExecutor.schedule(() -> {
try {
currentReconnectCount++;
log.info("【MQTT重连】第{}次尝试重连(间隔{}秒)", currentReconnectCount, reconnectInterval);
// 兼容极端场景:先强制断开(无论当前状态),再重新连接
try {
if (mqttClient.isConnected()) {
mqttClient.disconnect();
log.info("【MQTT重连】已断开旧连接");
}
} catch (Exception e) {
log.warn("【MQTT重连】断开旧连接失败忽略{}", e.getMessage());
}
// 2. 生成新的clientId和MqttConfig中一致的规则原clientId + 随机后缀)
// 新代码(字母数字混合,和配置文件对齐)
String oldClientId = mqttClient.getClientId();
String originalClientId = oldClientId.contains("_") ? oldClientId.split("_")[0] : oldClientId;
// 调用工具方法生成8位字母数字随机串
String shortRandom = UUID.generateAlphanumericRandom(8);
String newClientId = originalClientId + "_" + shortRandom;
// 3. 创建新的MqttClient实例
MqttClient newMqttClient = new MqttClient(
mqttClient.getServerURI(),
newClientId,
new MemoryPersistence()
);
// 4. 用新客户端连接
newMqttClient.connect(mqttConnectOptions);
log.info("【MQTT重连】使用新clientId连接成功{}", newClientId);
// 5. 替换旧客户端实例,重新订阅
this.mqttClient = newMqttClient;
// 重新连接MQTT Broker + 重新订阅主题
subscribeTopics();
log.info("【MQTT重连】第{}次重连成功新clientId{}", currentReconnectCount, newClientId);
} catch (MqttException e) {
log.error("【MQTT重连】第{}次重连失败:{}", currentReconnectCount, e.getMessage(), e);
// 判断是否为配置类错误(永久错误)
judgeConfigError(e);
// 非配置错误则继续重连
if (!isConfigError) {
startReconnect();
}
}
}, reconnectInterval, TimeUnit.SECONDS);
}
/**
* MQTT/
* Paho
*/
private void judgeConfigError(MqttException e) {
int errorCode = e.getReasonCode();
switch (errorCode) {
// 21 = 认证失败(用户名/密码错误)→ 配置错误
case 21:
isConfigError = true;
log.error("【MQTT配置错误】认证失败请检查用户名/密码,错误码:{}", errorCode);
break;
// 32 = 客户端ID非法 → 配置错误
case 32:
isConfigError = true;
log.error("【MQTT配置错误】客户端ID非法请检查clientId配置错误码{}", errorCode);
break;
// 3 = 连接被拒绝(地址/端口错误)→ 配置错误
case 3:
isConfigError = true;
log.error("【MQTT配置错误】连接被拒绝请检查Broker地址/端口,错误码:{}", errorCode);
break;
// 31 = Broker不可达网络波动→ 继续重连
case 31:
log.warn("【MQTT错误】Broker不可达网络波动错误码{},继续重连", errorCode);
break;
// 其他错误 → 默认视为临时错误
default:
log.warn("【MQTT未知错误】错误码{},继续重连", errorCode);
break;
}
}
// ========== 新增手动重连接口供Controller调用 ==========
/**
* MQTT
*/
public String manualReconnect() {
// 重置配置错误标记(允许重连)
isConfigError = false;
currentReconnectCount = 0;
isRunning.set(true);
try {
// 强制断开旧连接(如果存在)
if (mqttClient.isConnected()) {
mqttClient.disconnect();
}
// 重新初始化订阅
subscribeTopics();
log.info("【手动重连】MQTT客户端重连成功");
return "MQTT手动重连成功";
} catch (MqttException e) {
log.error("【手动重连】MQTT客户端重连失败", e);
judgeConfigError(e);
return "MQTT手动重连失败" + e.getMessage();
}
}
/**
* MQTT
*/
public String getMqttStatus() {
boolean connected = mqttClient.isConnected();
String status = connected ? "已连接" : "已断开";
String configErrorStatus = isConfigError ? "(配置错误,已终止自动重连)" : "";
String reconnectStatus = String.format("当前重连次数:%d最大重连次数%d", currentReconnectCount, maxReconnectTimes);
return String.format("MQTT连接状态%s%s%s", status, configErrorStatus, reconnectStatus);
}
// ======================== SmartLifecycle 生命周期管理(核心修复) ========================
/**
* MQTTSpring/
@ -374,14 +573,22 @@ public class MqttMessageHandler implements SmartLifecycle {
*/
@Override
public void start() {
log.info("开始监听");
if (isRunning.compareAndSet(false, true)) {
log.info("开始监听111");
try {
// 重新初始化MQTT订阅和回调
// 核心修改:无论是否已连接,都执行订阅(设置回调+订阅主题)
// 移除原有的if (!mqttClient.isConnected()) 判断
log.info("开始监听222");
subscribeTopics();
log.info("【MQTT生命周期】客户端启动成功");
log.info("【MQTT生命周期】客户端启动成功(已设置回调+订阅主题)");
} catch (MqttException e) {
log.error("【MQTT生命周期】客户端启动失败", e);
isRunning.set(false);
judgeConfigError(e); // 启动失败时判断配置错误
if (!isConfigError) {
startReconnect();
}
}
}
}
@ -395,6 +602,12 @@ public class MqttMessageHandler implements SmartLifecycle {
// 修复JDK 8正确的compareAndSet写法无命名参数
if (isRunning.compareAndSet(true, false)) {
try {
// 新增:关闭重连线程池,避免内存泄漏
reconnectExecutor.shutdown();
if (!reconnectExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
reconnectExecutor.shutdownNow();
}
if (mqttClient != null && mqttClient.isConnected()) {
// 移除:取消订阅相关逻辑(避免依赖不存在的方法)
// 直接断开连接基础Paho支持的核心方法
@ -405,7 +618,10 @@ public class MqttMessageHandler implements SmartLifecycle {
}
// 移除resetConnection()避免Redis版本差异
// 替代方案无需主动重置Spring上下文重启会重新创建Redis连接
} catch (MqttException e) {
// 新增:重置重连相关状态
currentReconnectCount = 0;
isConfigError = false;
} catch (MqttException | InterruptedException e) {
log.error("【MQTT生命周期】客户端关闭失败", e);
}
}