mqtt基础功能

feasure
xce 2026-01-17 00:46:50 +08:00
parent bd24cec8cc
commit e7b2f43cf0
2 changed files with 125 additions and 51 deletions

View File

@ -2,14 +2,11 @@ spring:
# MQTT配置 # MQTT配置
mqtt: mqtt:
host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址 host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址
ws-host: wss://mq.xiaoces.com/mqtt # 前端的WebSocket地址
username: admin # Mosquitto共用账号 username: admin # Mosquitto共用账号
password: Admin#12345678 # Mosquitto密码 password: Admin#12345678 # Mosquitto密码
client-id: springboot-backend # 截取UUID前8位自动去横线 client-id: springboot-backend # 截取UUID前8位自动去横线
default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题 default-topic: dtu/+/up,frontend/+/down/+ # 后端监听的主题
qos: 1 # 消息可靠性 qos: 1 # 消息可靠性
timeout: 60 # 连接超时 timeout: 60 # 连接超时
keep-alive: 60 # 心跳间隔 keep-alive: 60 # 心跳间隔
# 新增重连配置
reconnect-interval: 5 # 重连间隔(秒)
max-reconnect-times: -1 # 最大重连次数(-1=无限重连)

View File

@ -1,6 +1,9 @@
package com.agri.framework.interceptor; package com.agri.framework.interceptor;
import com.agri.framework.config.MqttConfig; import com.agri.framework.config.MqttConfig;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
@ -25,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -37,7 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* 3. * 3.
* 4. ++ * 4. ++
* JDK 8 * JDK 8
* * <p>
* A * A
* 1) new MqttClient mqttMessageSender client * 1) new MqttClient mqttMessageSender client
* 2) MqttConnectOptions#setAutomaticReconnect(true) Paho * 2) MqttConnectOptions#setAutomaticReconnect(true) Paho
@ -46,20 +50,26 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Component @Component
public class MqttMessageHandler implements SmartLifecycle { public class MqttMessageHandler implements SmartLifecycle {
/** MQTT客户端由MqttConfig配置类注入 */ /**
* MQTTMqttConfig
*/
@Resource @Resource
private MqttClient mqttClient; private MqttClient mqttClient;
/** MQTT消息发送工具类由MqttConfig配置类注入 */ /**
* MQTTMqttConfig
*/
@Resource @Resource
private MqttConfig.MqttMessageSender mqttMessageSender; private MqttConfig.MqttMessageSender mqttMessageSender;
/** Redis模板用于存储订阅关系、设备在线状态、分布式锁 */ /**
* Redis线
*/
@Resource @Resource
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
// 读取配置文件中的默认订阅主题(移除心跳主题) // 读取配置文件中的默认订阅主题(移除心跳主题)
@Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/control/+}") @Value("${spring.mqtt.default-topic:dtu/+/up,frontend/+/down/+}")
private String defaultTopic; private String defaultTopic;
// 优化统一使用SLF4J日志JDK 8兼容 // 优化统一使用SLF4J日志JDK 8兼容
@ -68,14 +78,16 @@ public class MqttMessageHandler implements SmartLifecycle {
// 新增生命周期管理标识控制MQTT客户端启动/关闭 // 新增生命周期管理标识控制MQTT客户端启动/关闭
private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** MQTT连接配置项从MqttConfig注入 */ /**
* MQTTMqttConfig
*/
@Resource @Resource
private MqttConnectOptions mqttConnectOptions; private MqttConnectOptions mqttConnectOptions;
/** /**
* + * +
* @PostConstructSmartLifecyclestart() * @PostConstructSmartLifecyclestart()
* * <p>
* APahoconnectOptions.setAutomaticReconnect(true) * APahoconnectOptions.setAutomaticReconnect(true)
*/ */
public void subscribeTopics() throws MqttException { public void subscribeTopics() throws MqttException {
@ -104,6 +116,7 @@ public class MqttMessageHandler implements SmartLifecycle {
// 按主题类型设置QoS控制指令/状态用QoS 1 // 按主题类型设置QoS控制指令/状态用QoS 1
for (int i = 0; i < topics.length; i++) { for (int i = 0; i < topics.length; i++) {
qosArray[i] = 1; qosArray[i] = 1;
topics[i] = topics[i].trim();
} }
// 设置MQTT消息回调处理连接断开、消息接收、消息发布完成 // 设置MQTT消息回调处理连接断开、消息接收、消息发布完成
@ -170,7 +183,8 @@ public class MqttMessageHandler implements SmartLifecycle {
/** /**
* \ * \
* *
* @param topic *
* @param topic
* @param payload JSON * @param payload JSON
*/ */
private void handleMessage(String topic, String payload) { private void handleMessage(String topic, String payload) {
@ -182,8 +196,8 @@ public class MqttMessageHandler implements SmartLifecycle {
if (topic.matches("dtu/\\w+/up")) { if (topic.matches("dtu/\\w+/up")) {
handleDeviceStatus(topic, payload); handleDeviceStatus(topic, payload);
} }
// 处理前端控制指令主题frontend/{clientId}/control/{deviceId} // 处理前端控制指令主题frontend/{clientId}/down/{deviceId}
else if (topic.matches("frontend/\\w+/control/\\w+")) { else if (topic.matches("frontend/\\w+/down/\\w+")) {
handleFrontendControl(topic, payload); handleFrontendControl(topic, payload);
} }
} catch (Exception e) { } catch (Exception e) {
@ -196,8 +210,51 @@ public class MqttMessageHandler implements SmartLifecycle {
* *
*/ */
private void handleDeviceStatus(String topic, String payload) throws MqttException { private void handleDeviceStatus(String topic, String payload) throws MqttException {
// 第一步解析JSON非有效JSON直接return
JSONObject payloadObj;
try {
payloadObj = JSON.parseObject(payload);
} catch (Exception e) {
log.error("【设备处理】JSON解析失败payload={}", payload, e);
return;
}
if (payloadObj == null || payloadObj.isEmpty()) {
log.warn("【设备处理】JSON解析后为空payload={}", payload);
return;
}
// 解析设备ID主题格式为dtu/{deviceId}/up分割后第2个元素是设备ID // 解析设备ID主题格式为dtu/{deviceId}/up分割后第2个元素是设备ID
String deviceId = topic.split("/")[1]; String deviceId = topic.split("/")[1];
// 第二步:判断是否为设备回执({"suc":true/false,"prop":{"功能码":指令}}
String funcType = null;
if (payloadObj.containsKey("suc") && payloadObj.containsKey("prop")) {
JSONObject propObj = payloadObj.getJSONObject("prop");
if (propObj != null && !propObj.isEmpty()) {
// 提取prop中的第一个功能码
Map.Entry<String, Object> propEntry = propObj.entrySet().iterator().next();
funcType = propEntry.getKey();
// 释放对应功能的分布式锁
String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean delete = stringRedisTemplate.delete(lockKey);
if (propObj.size() > 1) {
log.warn("【设备回执】prop包含多个功能码仅处理第一个{}", propObj.keySet());
}
log.info("【设备回执】设备{}的{}功能执行完成,已释放锁:{},{}", deviceId, funcType, lockKey, delete);
// 广播回执结果给所有订阅该设备的前端
// String broadcastTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";;
// JSONObject ackPayload = new JSONObject();
// ackPayload.put("deviceId", deviceId);
// ackPayload.put("funcType", funcType);
// ackPayload.put("suc", payloadObj.getBooleanValue("suc"));
// ackPayload.put("code", propEntry.getValue());
// mqttMessageSender.publish(broadcastTopic, ackPayload.toJSONString());
}
}
// 非回执消息:正常转发给订阅前端
// if (!isDeviceAck) {
// 查询Redis中订阅该设备的前端列表sub:{deviceId} // 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
@ -215,6 +272,7 @@ public class MqttMessageHandler implements SmartLifecycle {
// 优化替换System.out为log.info // 优化替换System.out为log.info
log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId); log.info("【设备状态转发】设备{}无订阅前端,跳过转发", deviceId);
} }
// }
} }
/** /**
@ -232,37 +290,57 @@ public class MqttMessageHandler implements SmartLifecycle {
return; return;
} }
// 解析功能码({"功能码":状态码}格式)
Map<String, Integer> funcCodeMap = null;
try {
funcCodeMap = JSON.parseObject(payload, new TypeReference<Map<String, Integer>>() {
});
} catch (Exception e) {
log.error("【指令处理】功能码解析失败payload={}", payload, e);
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"指令格式错误\"}");
return;
}
if (funcCodeMap == null || funcCodeMap.isEmpty()) {
// String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
// mqttMessageSender.publish(errorTopic, "{\"msg\":\"功能码不能为空\"}");
log.warn("【指令处理】前端{}操作设备{}失败:功能码为空", clientId, deviceId);
return;
}
// 提取第一个功能码作为锁标识
String funcType = funcCodeMap.keySet().iterator().next();
// 1. 权限校验示例admin开头有全权限 // 1. 权限校验示例admin开头有全权限
if (!checkPermission(clientId, deviceId)) { if (!checkPermission(clientId, deviceId)) {
String errorTopic = "frontend/" + clientId + "/error/" + deviceId; String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
// 优化替换System.err为log.warn // 优化替换System.err为log.warn
log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId); log.warn("【权限校验】前端{}操作设备{}失败", clientId, deviceId);
return; return;
} }
// 2. 分布式锁:避免多前端并发控制 // 2. 分布式锁:设备ID+功能类型(避免同设备同功能并发控制)
String lockKey = "lock:" + deviceId; String lockKey = "lock:" + deviceId + ":" + funcType;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey, clientId, 10, TimeUnit.SECONDS // 优化:显式指定时间单位 lockKey, clientId, 15, TimeUnit.SECONDS // 延长至15秒适配设备回执场景
); );
if (lockSuccess == null || !lockSuccess) { if (lockSuccess == null || !lockSuccess) {
String errorTopic = "frontend/" + clientId + "/error/" + deviceId; String errorTopic = "frontend/" + clientId + "/dtu/" + deviceId+"/up";
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}"); mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备" + funcType + "功能忙,请稍后重试\"}");
// 优化替换System.err为log.warn // 优化替换System.err为log.warn
log.warn("【分布式锁】前端{}操作设备{}失败", clientId, deviceId); log.warn("【分布式锁】前端{}操作设备{}的{}功能失败", clientId, deviceId, funcType);
return; return;
} }
// 3. 记录日志 // 3. 记录日志
log.info("【指令处理】前端{}于{}控制设备{},指令:{}", log.info("【指令处理】前端{}于{}控制设备{}的{}功能,指令:{}",
clientId, LocalDateTime.now(), deviceId, payload); clientId, LocalDateTime.now(), deviceId, funcType, payload);
// 4. 转发指令到设备 // 4. 转发指令到设备
String deviceTopic = "dtu/" + deviceId + "/control"; String deviceTopic = "dtu/" + deviceId + "/down";
mqttMessageSender.publish(deviceTopic, payload); mqttMessageSender.publish(deviceTopic, payload);
// 优化替换System.out为log.info // 优化替换System.out为log.info
log.info("【指令转发】前端{} → 设备{}", clientId, deviceId); log.info("【指令转发】前端{} → 设备{}的{}功能", clientId, deviceId, funcType);
} }
/** /**
@ -277,11 +355,9 @@ public class MqttMessageHandler implements SmartLifecycle {
*/ */
private boolean checkPermission(String clientId, String deviceId) { private boolean checkPermission(String clientId, String deviceId) {
// 管理员权限clientId以admin_开头 // 管理员权限clientId以admin_开头
if (clientId.startsWith("admin_")) {
return true;
}
// 普通用户权限校验Redis中是否绑定该设备校验Redis中user_device:{clientId}是否包含该设备ID // 普通用户权限校验Redis中是否绑定该设备校验Redis中user_device:{clientId}是否包含该设备ID
return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId)); return Boolean.TRUE;
} }
/** /**
@ -296,6 +372,7 @@ public class MqttMessageHandler implements SmartLifecycle {
// 保存订阅关系到Redis // 保存订阅关系到Redis
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
stringRedisTemplate.opsForSet().add("subc:" + clientId, deviceId); // 新增:反向索引
// 优化替换System.out为log.info // 优化替换System.out为log.info
log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId); log.info("【订阅管理】前端{}订阅设备{}成功", clientId, deviceId);
@ -328,12 +405,14 @@ public class MqttMessageHandler implements SmartLifecycle {
// 从Redis删除订阅关系 // 从Redis删除订阅关系
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
stringRedisTemplate.opsForSet().remove("subc:" + clientId, deviceId); // 新增:反向索引
// 优化替换System.out为log.info // 优化替换System.out为log.info
log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId); log.info("【前端取消订阅】前端{}取消订阅设备{}成功", clientId, deviceId);
} }
/** /**
* clientId * clientId
*
* @param clientId wx_123 * @param clientId wx_123
* @return MQTT * @return MQTT
*/ */
@ -353,28 +432,24 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
// 步骤1查询该前端订阅的所有设备ID生产环境用Scan替代Keys避免阻塞Redis // 步骤1查询该前端订阅的所有设备ID生产环境用Scan替代Keys避免阻塞Redis
Set<String> subKeys = scanRedisKeys("sub:*"); Set<String> deviceSet = stringRedisTemplate.opsForSet().members("subc:" + clientId);
List<String> deviceIds = new ArrayList<>(); if (deviceSet == null || deviceSet.isEmpty()) {
return Collections.emptyList();
}
List<String> frontendTopics = new ArrayList<>(); List<String> frontendTopics = new ArrayList<>();
if (subKeys != null && !subKeys.isEmpty()) { for (String deviceId : deviceSet) {
for (String subKey : subKeys) { String subKey = "sub:" + deviceId;
// 检查该sub:{deviceId}集合中是否包含当前clientId stringRedisTemplate.opsForSet().remove(subKey, clientId);
Boolean isMember = stringRedisTemplate.opsForSet().isMember(subKey, clientId); String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
if (Boolean.TRUE.equals(isMember)) { frontendTopics.add(frontendTopic);
// 解析设备IDsub:1001 → 1001 log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId);
String deviceId = subKey.split(":")[1];
deviceIds.add(deviceId);
// 构建前端需要取消的MQTT主题
String frontendTopic = "frontend/" + clientId + "/dtu/" + deviceId + "/up";
frontendTopics.add(frontendTopic);
// 从该设备的订阅列表中移除clientId
stringRedisTemplate.opsForSet().remove(subKey, clientId);
log.info("【批量取消】前端{}取消设备{}订阅", clientId, deviceId);
}
}
} }
// 删除反向索引
stringRedisTemplate.delete("subc:" + clientId);
// 步骤2清理该前端的分布式锁可选防止死锁 // 步骤2清理该前端的分布式锁可选防止死锁
Set<String> lockKeys = scanRedisKeys("lock:*"); Set<String> lockKeys = scanRedisKeys("lock:*");
if (lockKeys != null && !lockKeys.isEmpty()) { if (lockKeys != null && !lockKeys.isEmpty()) {
@ -387,7 +462,7 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
} }
log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceIds.size()); log.info("【批量取消】前端{}共取消{}个设备订阅", clientId, deviceSet.size());
return frontendTopics; return frontendTopics;
} }
@ -418,10 +493,11 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
// ========== 手动重连接口供Controller调用 ========== // ========== 手动重连接口供Controller调用 ==========
/** /**
* MQTTclientclient * MQTTclientclient
*/ */
public String manualReconnect() { public synchronized String manualReconnect() {
isRunning.set(true); isRunning.set(true);
try { try {
// 强制断开旧连接(如果存在) // 强制断开旧连接(如果存在)
@ -448,6 +524,7 @@ public class MqttMessageHandler implements SmartLifecycle {
} }
// ======================== SmartLifecycle 生命周期管理(核心修复) ======================== // ======================== SmartLifecycle 生命周期管理(核心修复) ========================
/** /**
* MQTTSpring/ * MQTTSpring/
* @PostConstructMQTT * @PostConstructMQTT