feasure
xce 2026-01-16 01:55:40 +08:00
parent 6e9e54bc41
commit 913365927f
3 changed files with 133 additions and 288 deletions

View File

@ -1,21 +1,14 @@
package com.agri.web.controller.mqtt;
import com.agri.framework.interceptor.MqttMessageHandler;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.Map;
import java.util.List;
/**
* @Auther: jone
* @Date: 2026/1/15 - 01 - 15 - 23:45
* @Description: com.agri.web.controller.mqtt
* @version: 1.0
* MQTT
*/
@RestController
@RequestMapping("/api/mqtt")
public class MqttController {
@ -24,30 +17,29 @@ public class MqttController {
private MqttMessageHandler mqttMessageHandler;
/**
*
*
*/
@PostMapping("/subscribe")
public String subscribe(@RequestBody Map<String, String> params) {
String clientId = params.get("clientId");
String deviceId = params.get("deviceId");
if (clientId == null || deviceId == null) {
return "参数错误";
}
@PostMapping("/single")
public String subscribe(@RequestParam String clientId, @RequestParam String deviceId) {
mqttMessageHandler.subscribeDevice(clientId, deviceId);
return "订阅成功";
}
/**
*
*
*/
@PostMapping("/unsubscribe")
public String unsubscribe(@RequestBody Map<String, String> params) {
String clientId = params.get("clientId");
String deviceId = params.get("deviceId");
if (clientId == null || deviceId == null) {
return "参数错误";
}
@DeleteMapping("/single")
public String unsubscribe(@RequestParam String clientId, @RequestParam String deviceId) {
mqttMessageHandler.unsubscribeDevice(clientId, deviceId);
return "取消订阅成功";
}
/**
*
*/
@DeleteMapping("/batch")
public List<String> unsubscribeAll(@RequestParam String clientId) {
// 返回前端需要取消的MQTT主题列表
return mqttMessageHandler.unsubscribeAllDevice(clientId);
}
}

View File

@ -5,6 +5,12 @@ spring:
port: 6379
password: lld123
database: 1
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 2
max-wait: 10000ms
# MQTT配置
mqtt:
host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址
@ -12,7 +18,7 @@ spring:
username: admin # Mosquitto共用账号
password: Admin#12345678 # Mosquitto密码
client-id: springboot-backend-${random.uuid} # 后端客户端ID唯一
default-topic: device/+/status,device/+/heartbeat,frontend/+/control/+ # 后端监听的主题
default-topic: dtu/+/up,frontend/+/control/+ # 后端监听的主题
qos: 1 # 消息可靠性
timeout: 60 # 连接超时
keep-alive: 60 # 心跳间隔

View File

@ -9,27 +9,23 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* MQTT
* MQTT
*
* 1. /
* 2.
* 3. 线4G
* 4. ++
* 5. 线
*
* @Auther: lld
* @Date: 2026/1/15 - 01 - 15 - 23:43
* @version: 1.0
* 1.
* 2. -Redis
* 3.
* 4. ++
* JDK 8
*/
@Component
public class MqttMessageHandler {
@ -46,34 +42,22 @@ public class MqttMessageHandler {
@Resource
private StringRedisTemplate stringRedisTemplate;
/** 4G平台API地址用于上报设备在线/离线状态 */
private static final String FOUR_G_API = "http://你的4G平台IP/api/device/status";
/** 心跳超时时间(秒):设备超过该时间未发心跳则判定为离线 */
private static final long HEARTBEAT_TIMEOUT = 60;
@Value("${spring.mqtt.default-topic}")
// 读取配置文件中的默认订阅主题(移除心跳主题)
@Value("${spring.mqtt.default-topic:device/+/status,frontend/+/control/+}")
private String defaultTopic;
/**
*
* 1. MQTT
* 2.
* 3. 线线
*
* @throws MqttException MQTT
* +
*/
@PostConstruct
public void subscribeTopics() throws MqttException {
// 定义需要监听的MQTT主题数组
// device/+/status所有设备的业务状态温湿度、开关等
// device/+/heartbeat所有设备的心跳包用于判定在线状态
// frontend/+/control/+:所有前端发送的设备控制指令
// 解析配置文件中的主题列表(逗号分隔)
// 解析配置的主题列表
String[] topics = defaultTopic.split(",");
// 对应主题的QoS级别所有主题使用相同QoS也可自定义多QoS配置
int[] qos = new int[topics.length];
// 所有主题QoS=1
Arrays.fill(qos, 1);
int[] qosArray = new int[topics.length];
// 按主题类型设置QoS控制指令/状态用QoS 1
for (int i = 0; i < topics.length; i++) {
qosArray[i] = 1;
}
// 设置MQTT消息回调处理连接断开、消息接收、消息发布完成
mqttClient.setCallback(new MqttCallback() {
@ -104,44 +88,30 @@ public class MqttMessageHandler {
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 可添加消息发布成功的日志
// System.out.println("消息发布完成:" + token.getTopics()[0]);
// QoS 1确认消息已被服务端接收
System.out.println("【MQTT确认】消息发布完成主题" + token.getTopics()[0]);
}
});
// 订阅主题:阻塞式操作,订阅成功后继续执行
mqttClient.subscribe(topics, qos);
System.out.println("【MQTT初始化】核心主题订阅成功订阅列表" + String.join(",", topics));
// 启动离线设备检查线程:独立线程,避免阻塞主线程
// 线程名offline-check-thread便于日志排查
new Thread(new Runnable() {
@Override
public void run() {
checkOfflineDevice();
}
}, "offline-check-thread").start();
System.out.println("【MQTT初始化】离线设备检查线程已启动");
// 订阅主题
mqttClient.subscribe(topics, qosArray);
System.out.println("【MQTT初始化】订阅主题" + String.join(",", topics));
}
/**
*
* \
* @param topic
* @param payload JSON
*/
private void handleMessage(String topic, String payload) {
try {
System.out.println("【MQTT消息接收】topic=" + topic + ", payload=" + payload);
System.out.println("【MQTT接收】topic=" + topic + ", payload=" + payload);
// 1. 处理设备业务状态主题device/{deviceId}/status
// 设备状态主题device/{deviceId}/status
if (topic.matches("device/\\w+/status")) {
handleDeviceStatus(topic, payload);
}
// 2. 处理设备心跳主题device/{deviceId}/heartbeat
else if (topic.matches("device/\\w+/heartbeat")) {
handleDeviceHeartbeat(topic, payload);
}
// 3. 处理前端控制指令主题frontend/{clientId}/control/{deviceId}
// 处理前端控制指令主题frontend/{clientId}/control/{deviceId}
else if (topic.matches("frontend/\\w+/control/\\w+")) {
handleFrontendControl(topic, payload);
}
@ -152,38 +122,21 @@ public class MqttMessageHandler {
}
/**
*
*
* 1. ID
* 2. 线
* 3.
* 4.
*
* @param topic device/{deviceId}/status
* @param payload JSON
* @throws MqttException
*
*/
private void handleDeviceStatus(String topic, String payload) throws MqttException {
// 解析设备ID主题格式为device/{deviceId}/status分割后第2个元素是设备ID
String deviceId = topic.split("/")[1];
// 补充设备在线状态到payload兼容JSON格式
// 从Redis获取设备在线状态device:online:{deviceId} → true/false
String onlineStatus = stringRedisTemplate.opsForValue().get("device:online:" + deviceId);
// 若Redis中无记录默认离线
String finalOnlineStatus = (onlineStatus == null) ? "false" : onlineStatus;
// 拼接在线状态到JSON末尾兼容无空格的JSON格式
String newPayload = payload.replace("}", ",\"online\":\"" + finalOnlineStatus + "\"}");
// 查询Redis中订阅该设备的前端clientId列表sub:{deviceId} → Set<String>
// 查询Redis中订阅该设备的前端列表sub:{deviceId}
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
if (subscribedClients != null && !subscribedClients.isEmpty()) {
// 遍历所有订阅的前端,推送消息到前端专属主题
// 推送给每个订阅的前端
for (String clientId : subscribedClients) {
// 前端专属主题frontend/{clientId}/device/{deviceId}/status
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status";
// 发布消息(保留最新消息,前端订阅后可立即获取)
mqttMessageSender.publish(frontendTopic, newPayload);
// 发布消息
mqttMessageSender.publish(frontendTopic, payload);
System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic);
}
} else {
@ -192,109 +145,44 @@ public class MqttMessageHandler {
}
/**
*
*
* 1. ID
* 2. Redis线
* 3. 线4GMQTT
*
* @param topic device/{deviceId}/heartbeat
* @param payload JSONtimestamp
*/
private void handleDeviceHeartbeat(String topic, String payload) {
// 解析设备ID主题格式为device/{deviceId}/heartbeat分割后第2个元素是设备ID
String deviceId = topic.split("/")[1];
// 获取当前时间戳(秒)
long currentTime = System.currentTimeMillis() / 1000;
// 更新Redis存储最后心跳时间 → device:last_heartbeat:{deviceId}
stringRedisTemplate.opsForValue().set("device:last_heartbeat:" + deviceId, String.valueOf(currentTime));
// 更新Redis存储在线状态设置过期时间心跳超时+10秒避免Redis数据堆积
stringRedisTemplate.opsForValue().set(
"device:online:" + deviceId,
"true",
HEARTBEAT_TIMEOUT + 10,
TimeUnit.SECONDS
);
// 异步上报4G平台使用独立线程避免阻塞MQTT消息处理线程
new Thread(new Runnable() {
@Override
public void run() {
try {
// 构造上报4G平台的JSON数据
String statusJson = String.format(
"{\"device_id\":\"%s\",\"online\":true,\"timestamp\":%d}",
deviceId, currentTime
);
// 调用4G平台APIPOST请求
RestTemplate restTemplate = new RestTemplate();
String response = restTemplate.postForObject(FOUR_G_API, statusJson, String.class);
System.out.println("【4G平台上报】设备" + deviceId + "在线状态上报成功,响应:" + response);
} catch (Exception e) {
System.err.println("【4G平台上报】设备" + deviceId + "在线状态上报失败,异常:" + e.getMessage());
}
}
}).start();
}
/**
*
*
* 1. clientIdID
* 2.
* 3.
* 4.
* 5.
*
* @param topic frontend/{clientId}/control/{deviceId}
* @param payload JSON
* @throws MqttException
* ++
*/
private void handleFrontendControl(String topic, String payload) throws MqttException {
// 解析主题frontend/{clientId}/control/{deviceId}
// 解析前端clientId、设备ID
String[] parts = topic.split("/");
String clientId = parts[1]; // 前端唯一标识
String deviceId = parts[3]; // 目标设备ID
String clientId = parts[1];
String deviceId = parts[3];
// 1. 权限校验:失败则推送错误消息给前端
// 1. 权限校验示例admin开头有全权限
if (!checkPermission(clientId, deviceId)) {
String errorTopic = "frontend/" + clientId + "/error/" + deviceId;
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "权限校验失败");
System.err.println("【权限校验】前端" + clientId + "操作设备" + deviceId + "失败");
return;
}
// 2. 分布式锁:lock:{deviceId}过期时间10秒避免死锁
// 2. 分布式锁:避免多前端并发控制
String lockKey = "lock:" + deviceId;
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
lockKey,
clientId,
10,
TimeUnit.SECONDS
lockKey, clientId, 10, TimeUnit.SECONDS
);
// 锁获取失败:设备忙,推送错误消息给前端
if (lockSuccess == null || !lockSuccess) {
String errorTopic = "frontend/" + clientId + "/error/" + deviceId;
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}");
System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "获取锁失败(设备忙)");
System.err.println("【分布式锁】前端" + clientId + "操作设备" + deviceId + "失败");
return;
}
// 3. 记录操作日志(示例:可替换为数据库存储)
// 3. 记录日志
System.out.println(String.format(
"【前端指令处理】前端%s于%s控制设备%s指令%s",
"【指令处理】前端%s于%s控制设备%s指令%s",
clientId, LocalDateTime.now(), deviceId, payload
));
// 4. 转发指令到设备专属主题device/{deviceId}/control
// 4. 转发指令到设备
String deviceTopic = "device/" + deviceId + "/control";
mqttMessageSender.publish(deviceTopic, payload);
System.out.println("【前端指令转发】前端" + clientId + " → 设备" + deviceId + ",主题:" + deviceTopic);
System.out.println("【指令转发】前端" + clientId + " → 设备" + deviceId);
}
/**
@ -312,117 +200,27 @@ public class MqttMessageHandler {
if (clientId.startsWith("admin_")) {
return true;
}
// 普通用户权限校验Redis中是否绑定该设备
return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId));
// 普通用户权限校验Redis中是否绑定该设备校验Redis中user_device:{clientId}是否包含该设备ID
return stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId);
}
/**
* 线10
*
* 1.
* 2. 线 - >
* 3. Redis线线
* 4. 线4G
*/
private void checkOfflineDevice() {
while (true) {
try {
// 获取Redis中所有设备的最后心跳记录device:last_heartbeat:*
Set<String> heartbeatKeys = stringRedisTemplate.keys("device:last_heartbeat:*");
// 无设备心跳记录休眠10秒后继续
if (heartbeatKeys == null || heartbeatKeys.isEmpty()) {
Thread.sleep(10000);
continue;
}
// 当前时间戳(秒)
long currentTime = System.currentTimeMillis() / 1000;
// 遍历所有设备心跳记录
for (String key : heartbeatKeys) {
// 解析设备IDkey格式为device:last_heartbeat:{deviceId}
String deviceId = key.split(":")[2];
// 获取最后心跳时间
String lastHeartbeatStr = stringRedisTemplate.opsForValue().get(key);
// 无心跳记录,跳过
if (lastHeartbeatStr == null) {
continue;
}
// 转换为长整型
long lastHeartbeat = Long.parseLong(lastHeartbeatStr);
// 判定离线:超过心跳超时时间未发心跳
if (currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT) {
// 更新Redis在线状态为离线
stringRedisTemplate.opsForValue().set("device:online:" + deviceId, "false");
// 构造上报4G平台的JSON数据
String statusJson = String.format(
"{\"device_id\":\"%s\",\"online\":false,\"timestamp\":%d}",
deviceId, currentTime
);
// 调用4G平台API上报离线状态
RestTemplate restTemplate = new RestTemplate();
restTemplate.postForObject(FOUR_G_API, statusJson, String.class);
System.out.println("【离线设备检查】设备" + deviceId + "判定为离线已上报4G平台");
}
}
// 每10秒检查一次
Thread.sleep(10000);
} catch (InterruptedException e) {
// 线程中断,退出循环
System.err.println("【离线设备检查】线程被中断,停止检查:" + e.getMessage());
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
System.err.println("【离线设备检查】异常:" + e.getMessage());
// 异常时休眠10秒避免无限循环报错
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
/**
* Controller
*
* 1. clientId
* 2.
*
* @param clientId
* @param deviceId ID
* Controller
*/
public void subscribeDevice(String clientId, String deviceId) {
// 将前端clientId添加到设备的订阅列表sub:{deviceId}
// 保存订阅关系到Redis
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
System.out.println("【前端订阅】前端" + clientId + "订阅设备" + deviceId + "成功");
System.out.println("【订阅管理】前端" + clientId + "订阅设备" + deviceId + "成功");
// 推送设备最新状态给前端(立即)
try {
// 从Redis获取设备最新状态device:latest:{deviceId}
String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
if (latestStatus != null) {
// 前端专属主题
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status";
// 推送设备最新状态(可选)
String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
if (latestStatus != null) {
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status";
try {
mqttMessageSender.publish(frontendTopic, latestStatus);
System.out.println("【前端订阅】推送设备" + deviceId + "最新状态给前端" + clientId);
} else {
System.out.println("【前端订阅】设备" + deviceId + "无最新状态,跳过推送");
} catch (MqttException e) {
System.err.println("【订阅推送】设备" + deviceId + "状态推送失败:" + e.getMessage());
}
} catch (MqttException e) {
System.err.println("【前端订阅】推送设备" + deviceId + "状态给前端" + clientId + "失败:" + e.getMessage());
e.printStackTrace();
}
}
@ -434,8 +232,57 @@ public class MqttMessageHandler {
* @param deviceId ID
*/
public void unsubscribeDevice(String clientId, String deviceId) {
// 从设备订阅列表移除前端clientId
// 从Redis删除订阅关系
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
System.out.println("【前端取消订阅】前端" + clientId + "取消订阅设备" + deviceId + "成功");
}
/**
* clientId
* @param clientId wx_123
* @return MQTT
*/
public List<String> unsubscribeAllDevice(String clientId) {
// 步骤1查询该前端订阅的所有设备IDRedis中所有sub:*集合中包含该clientId的key
// 注意生产环境建议用scan代替keys避免阻塞Redis
Set<String> subKeys = stringRedisTemplate.keys("sub:*");
List<String> deviceIds = new ArrayList<>();
List<String> frontendTopics = new ArrayList<>();
if (subKeys != null && !subKeys.isEmpty()) {
for (String subKey : subKeys) {
// 检查该sub:{deviceId}集合中是否包含当前clientId
Boolean isMember = stringRedisTemplate.opsForSet().isMember(subKey, clientId);
if (Boolean.TRUE.equals(isMember)) {
// 解析设备IDsub:1001 → 1001
String deviceId = subKey.split(":")[1];
deviceIds.add(deviceId);
// 构建前端需要取消的MQTT主题
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status";
frontendTopics.add(frontendTopic);
// 从该设备的订阅列表中移除clientId
stringRedisTemplate.opsForSet().remove(subKey, clientId);
System.out.println("【批量取消】前端" + clientId + "取消设备" + deviceId + "订阅");
}
}
}
// 步骤2清理该前端的分布式锁可选防止死锁
Set<String> lockKeys = stringRedisTemplate.keys("lock:*");
if (lockKeys != null && !lockKeys.isEmpty()) {
for (String lockKey : lockKeys) {
String lockValue = stringRedisTemplate.opsForValue().get(lockKey);
if (clientId.equals(lockValue)) {
stringRedisTemplate.delete(lockKey);
System.out.println("【批量取消】清理前端" + clientId + "持有的锁:" + lockKey);
}
}
}
System.out.println("【批量取消】前端" + clientId + "共取消" + deviceIds.size() + "个设备订阅");
return frontendTopics;
}
}