feasure
parent
0201c71a56
commit
6e9e54bc41
|
|
@ -0,0 +1,53 @@
|
||||||
|
package com.agri.web.controller.mqtt;
|
||||||
|
|
||||||
|
import com.agri.framework.interceptor.MqttMessageHandler;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Auther: jone
|
||||||
|
* @Date: 2026/1/15 - 01 - 15 - 23:45
|
||||||
|
* @Description: com.agri.web.controller.mqtt
|
||||||
|
* @version: 1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/mqtt")
|
||||||
|
public class MqttController {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private MqttMessageHandler mqttMessageHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 前端订阅设备状态
|
||||||
|
*/
|
||||||
|
@PostMapping("/subscribe")
|
||||||
|
public String subscribe(@RequestBody Map<String, String> params) {
|
||||||
|
String clientId = params.get("clientId");
|
||||||
|
String deviceId = params.get("deviceId");
|
||||||
|
if (clientId == null || deviceId == null) {
|
||||||
|
return "参数错误";
|
||||||
|
}
|
||||||
|
mqttMessageHandler.subscribeDevice(clientId, deviceId);
|
||||||
|
return "订阅成功";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 前端取消订阅设备状态
|
||||||
|
*/
|
||||||
|
@PostMapping("/unsubscribe")
|
||||||
|
public String unsubscribe(@RequestBody Map<String, String> params) {
|
||||||
|
String clientId = params.get("clientId");
|
||||||
|
String deviceId = params.get("deviceId");
|
||||||
|
if (clientId == null || deviceId == null) {
|
||||||
|
return "参数错误";
|
||||||
|
}
|
||||||
|
mqttMessageHandler.unsubscribeDevice(clientId, deviceId);
|
||||||
|
return "取消订阅成功";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
spring:
|
||||||
|
# Redis配置(分布式锁/订阅关系)
|
||||||
|
redis:
|
||||||
|
host: 122.51.109.52
|
||||||
|
port: 6379
|
||||||
|
password: lld123
|
||||||
|
database: 1
|
||||||
|
# MQTT配置
|
||||||
|
mqtt:
|
||||||
|
host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址
|
||||||
|
ws-host: wss://mq.xiaoces.com/mqtt # 前端的WebSocket地址
|
||||||
|
username: admin # Mosquitto共用账号
|
||||||
|
password: Admin#12345678 # Mosquitto密码
|
||||||
|
client-id: springboot-backend-${random.uuid} # 后端客户端ID(唯一)
|
||||||
|
default-topic: device/+/status,device/+/heartbeat,frontend/+/control/+ # 后端监听的主题
|
||||||
|
qos: 1 # 消息可靠性
|
||||||
|
timeout: 60 # 连接超时
|
||||||
|
keep-alive: 60 # 心跳间隔
|
||||||
|
|
@ -61,7 +61,7 @@ spring:
|
||||||
# 国际化资源文件路径
|
# 国际化资源文件路径
|
||||||
basename: i18n/messages
|
basename: i18n/messages
|
||||||
profiles:
|
profiles:
|
||||||
active: druid
|
active: druid,mqtt
|
||||||
# 文件上传
|
# 文件上传
|
||||||
servlet:
|
servlet:
|
||||||
multipart:
|
multipart:
|
||||||
|
|
|
||||||
|
|
@ -130,7 +130,28 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.baomidou</groupId>
|
<groupId>com.baomidou</groupId>
|
||||||
<artifactId>mybatis-plus-boot-starter</artifactId>
|
<artifactId>mybatis-plus-boot-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- MQTT客户端依赖 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||||
|
<version>1.2.5</version>
|
||||||
|
</dependency>
|
||||||
|
<!-- SpringBoot整合MQTT(可选,简化配置) -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.integration</groupId>
|
||||||
|
<artifactId>spring-integration-mqtt</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- Redis(分布式锁/订阅关系) -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-data-redis</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- HTTP客户端(上报4G平台) -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,179 @@
|
||||||
|
package com.agri.framework.config;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT核心配置类
|
||||||
|
* 功能:
|
||||||
|
* 1. 初始化MQTT客户端连接(对接Mosquitto)
|
||||||
|
* 2. 提供MQTT消息发送工具类
|
||||||
|
* 3. 配置连接参数(账号、密码、重连、保活等)
|
||||||
|
* 适配JDK 8语法,兼容SpringBoot环境
|
||||||
|
*
|
||||||
|
* @Author: jone
|
||||||
|
* @Date: 2026/1/16
|
||||||
|
* @Version: 1.0
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class MqttConfig {
|
||||||
|
|
||||||
|
/** Mosquitto服务器地址(TCP协议):格式为 tcp://IP:端口 */
|
||||||
|
@Value("${spring.mqtt.host}")
|
||||||
|
private String host;
|
||||||
|
|
||||||
|
/** MQTT认证用户名(Mosquitto配置的共用账号) */
|
||||||
|
@Value("${spring.mqtt.username}")
|
||||||
|
private String username;
|
||||||
|
|
||||||
|
/** MQTT认证密码(Mosquitto配置的共用密码) */
|
||||||
|
@Value("${spring.mqtt.password}")
|
||||||
|
private String password;
|
||||||
|
|
||||||
|
/** 后端MQTT客户端ID:需唯一,避免重复连接 */
|
||||||
|
@Value("${spring.mqtt.client-id}")
|
||||||
|
private String clientId;
|
||||||
|
|
||||||
|
/** MQTT消息QoS级别:1=至少一次送达,0=最多一次,2=恰好一次 */
|
||||||
|
@Value("${spring.mqtt.qos:1}")
|
||||||
|
private int qos;
|
||||||
|
|
||||||
|
/** MQTT连接超时时间(秒):超过该时间未连接成功则判定为失败 */
|
||||||
|
@Value("${spring.mqtt.timeout:60}")
|
||||||
|
private int timeout;
|
||||||
|
|
||||||
|
/** MQTT保活间隔(秒):客户端定期发送心跳给服务端,维持连接 */
|
||||||
|
@Value("${spring.mqtt.keep-alive:60}")
|
||||||
|
private int keepAlive;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建MQTT客户端实例(Spring Bean)
|
||||||
|
* 核心逻辑:
|
||||||
|
* 1. 配置连接参数(账号、密码、重连、保活等)
|
||||||
|
* 2. 初始化客户端并建立连接
|
||||||
|
* 3. 返回客户端实例供其他类注入使用
|
||||||
|
*
|
||||||
|
* @return MqttClient MQTT客户端实例
|
||||||
|
* @throws MqttException MQTT连接/初始化异常
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public MqttClient mqttClient() throws MqttException {
|
||||||
|
// 1. 初始化连接配置项
|
||||||
|
MqttConnectOptions connectOptions = getMqttConnectOptions();
|
||||||
|
|
||||||
|
// 2. 初始化MQTT客户端
|
||||||
|
// MemoryPersistence:使用内存存储会话,不持久化到磁盘(适合后端服务)
|
||||||
|
MemoryPersistence persistence = new MemoryPersistence();
|
||||||
|
MqttClient mqttClient = new MqttClient(host, clientId, persistence);
|
||||||
|
|
||||||
|
// 3. 建立MQTT连接
|
||||||
|
if (!mqttClient.isConnected()) {
|
||||||
|
mqttClient.connect(connectOptions);
|
||||||
|
System.out.println("【MQTT连接成功】服务器地址:" + host + ",客户端ID:" + clientId);
|
||||||
|
} else {
|
||||||
|
System.out.println("【MQTT连接状态】已连接,无需重复初始化");
|
||||||
|
}
|
||||||
|
|
||||||
|
return mqttClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
private MqttConnectOptions getMqttConnectOptions() {
|
||||||
|
MqttConnectOptions connectOptions = new MqttConnectOptions();
|
||||||
|
// 设置MQTT认证账号
|
||||||
|
connectOptions.setUserName(username);
|
||||||
|
// 设置MQTT认证密码(转换为字符数组,符合API要求)
|
||||||
|
connectOptions.setPassword(password.toCharArray());
|
||||||
|
// 设置连接超时时间(秒)
|
||||||
|
connectOptions.setConnectionTimeout(timeout);
|
||||||
|
// 设置保活间隔(秒):客户端每隔该时间发送一次心跳
|
||||||
|
connectOptions.setKeepAliveInterval(keepAlive);
|
||||||
|
// 关闭清除会话:false=重连后保留订阅关系(若不需要离线消息可设为true)
|
||||||
|
connectOptions.setCleanSession(true);
|
||||||
|
// 开启自动重连:连接断开后自动尝试重连,提升稳定性
|
||||||
|
connectOptions.setAutomaticReconnect(true);
|
||||||
|
// 设置最大重连间隔(秒):避免频繁重连消耗资源
|
||||||
|
connectOptions.setMaxReconnectDelay(30);
|
||||||
|
return connectOptions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建MQTT消息发送工具类(Spring Bean)
|
||||||
|
* 封装消息发布逻辑,供业务层调用
|
||||||
|
*
|
||||||
|
* @param mqttClient MQTT客户端实例(自动注入)
|
||||||
|
* @return MqttMessageSender 消息发送工具类
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public MqttMessageSender mqttMessageSender(MqttClient mqttClient) {
|
||||||
|
return new MqttMessageSender(mqttClient, qos);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT消息发送工具类(内部类)
|
||||||
|
* 封装消息发布的核心逻辑,简化业务层调用
|
||||||
|
*/
|
||||||
|
public static class MqttMessageSender {
|
||||||
|
/** MQTT客户端实例 */
|
||||||
|
private final MqttClient client;
|
||||||
|
/** 默认QoS级别 */
|
||||||
|
private final int defaultQos;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构造方法
|
||||||
|
* @param client MQTT客户端实例
|
||||||
|
* @param defaultQos 默认QoS级别
|
||||||
|
*/
|
||||||
|
public MqttMessageSender(MqttClient client, int defaultQos) {
|
||||||
|
this.client = client;
|
||||||
|
this.defaultQos = defaultQos;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发布MQTT消息(使用默认QoS)
|
||||||
|
* @param topic 消息主题
|
||||||
|
* @param payload 消息内容(JSON字符串)
|
||||||
|
* @throws MqttException 消息发布异常
|
||||||
|
*/
|
||||||
|
public void publish(String topic, String payload) throws MqttException {
|
||||||
|
publish(topic, payload, defaultQos);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发布MQTT消息(自定义QoS)
|
||||||
|
* 核心逻辑:
|
||||||
|
* 1. 校验客户端连接状态
|
||||||
|
* 2. 构建MQTT消息对象
|
||||||
|
* 3. 发布消息到指定主题
|
||||||
|
*
|
||||||
|
* @param topic 消息主题
|
||||||
|
* @param payload 消息内容(JSON字符串)
|
||||||
|
* @param qos 消息QoS级别
|
||||||
|
* @throws MqttException 消息发布异常(连接断开、主题无效等)
|
||||||
|
*/
|
||||||
|
public void publish(String topic, String payload, int qos) throws MqttException {
|
||||||
|
// 1. 校验客户端是否已连接
|
||||||
|
if (!client.isConnected()) {
|
||||||
|
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 构建MQTT消息对象
|
||||||
|
MqttMessage message = new MqttMessage();
|
||||||
|
// 设置消息内容(转换为字节数组)
|
||||||
|
message.setPayload(payload.getBytes());
|
||||||
|
// 设置QoS级别
|
||||||
|
message.setQos(qos);
|
||||||
|
// 设置保留消息:true=服务端保留该主题的最新消息,新订阅者可立即获取
|
||||||
|
message.setRetained(true);
|
||||||
|
|
||||||
|
// 3. 发布消息
|
||||||
|
client.publish(topic, message);
|
||||||
|
System.out.println("【MQTT消息发布成功】主题:" + topic + ",内容:" + payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,441 @@
|
||||||
|
package com.agri.framework.interceptor;
|
||||||
|
|
||||||
|
import com.agri.framework.config.MqttConfig;
|
||||||
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT消息处理器
|
||||||
|
* 核心功能:
|
||||||
|
* 1. 订阅设备状态/心跳、前端控制指令主题
|
||||||
|
* 2. 转发设备状态到订阅的前端
|
||||||
|
* 3. 处理设备心跳,维护在线状态并上报4G平台
|
||||||
|
* 4. 处理前端控制指令,权限校验+分布式锁+转发给设备
|
||||||
|
* 5. 定时检查离线设备,更新状态并上报
|
||||||
|
*
|
||||||
|
* @Auther: lld
|
||||||
|
* @Date: 2026/1/15 - 01 - 15 - 23:43
|
||||||
|
* @version: 1.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class MqttMessageHandler {
|
||||||
|
|
||||||
|
/** MQTT客户端(由MqttConfig配置类注入) */
|
||||||
|
@Resource
|
||||||
|
private MqttClient mqttClient;
|
||||||
|
|
||||||
|
/** MQTT消息发送工具类(由MqttConfig配置类注入) */
|
||||||
|
@Resource
|
||||||
|
private MqttConfig.MqttMessageSender mqttMessageSender;
|
||||||
|
|
||||||
|
/** Redis模板,用于存储订阅关系、设备在线状态、分布式锁 */
|
||||||
|
@Resource
|
||||||
|
private StringRedisTemplate stringRedisTemplate;
|
||||||
|
|
||||||
|
/** 4G平台API地址:用于上报设备在线/离线状态 */
|
||||||
|
private static final String FOUR_G_API = "http://你的4G平台IP/api/device/status";
|
||||||
|
|
||||||
|
/** 心跳超时时间(秒):设备超过该时间未发心跳则判定为离线 */
|
||||||
|
private static final long HEARTBEAT_TIMEOUT = 60;
|
||||||
|
|
||||||
|
@Value("${spring.mqtt.default-topic}")
|
||||||
|
private String defaultTopic;
|
||||||
|
/**
|
||||||
|
* 初始化方法:项目启动时执行
|
||||||
|
* 1. 设置MQTT回调函数
|
||||||
|
* 2. 订阅核心主题
|
||||||
|
* 3. 启动离线设备检查线程
|
||||||
|
*
|
||||||
|
* @throws MqttException MQTT订阅失败异常
|
||||||
|
*/
|
||||||
|
@PostConstruct
|
||||||
|
public void subscribeTopics() throws MqttException {
|
||||||
|
// 定义需要监听的MQTT主题数组
|
||||||
|
// device/+/status:所有设备的业务状态(温湿度、开关等)
|
||||||
|
// device/+/heartbeat:所有设备的心跳包(用于判定在线状态)
|
||||||
|
// frontend/+/control/+:所有前端发送的设备控制指令
|
||||||
|
// 解析配置文件中的主题列表(逗号分隔)
|
||||||
|
String[] topics = defaultTopic.split(",");
|
||||||
|
// 对应主题的QoS级别(所有主题使用相同QoS,也可自定义多QoS配置)
|
||||||
|
int[] qos = new int[topics.length];
|
||||||
|
// 所有主题QoS=1
|
||||||
|
Arrays.fill(qos, 1);
|
||||||
|
|
||||||
|
// 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成
|
||||||
|
mqttClient.setCallback(new MqttCallback() {
|
||||||
|
/**
|
||||||
|
* MQTT连接断开回调
|
||||||
|
* @param cause 断开原因
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable cause) {
|
||||||
|
System.err.println("【MQTT连接异常】连接断开:" + cause.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 收到MQTT消息回调:核心处理入口
|
||||||
|
* @param topic 消息主题
|
||||||
|
* @param message 消息内容
|
||||||
|
* @throws Exception 消息处理异常
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||||
|
// 将字节数组转换为字符串,分发处理不同主题的消息
|
||||||
|
handleMessage(topic, new String(message.getPayload()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息发布完成回调(仅日志记录,无业务逻辑)
|
||||||
|
* @param token 发布令牌
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||||
|
// 可添加消息发布成功的日志
|
||||||
|
// System.out.println("消息发布完成:" + token.getTopics()[0]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 订阅主题:阻塞式操作,订阅成功后继续执行
|
||||||
|
mqttClient.subscribe(topics, qos);
|
||||||
|
System.out.println("【MQTT初始化】核心主题订阅成功,订阅列表:" + String.join(",", topics));
|
||||||
|
|
||||||
|
// 启动离线设备检查线程:独立线程,避免阻塞主线程
|
||||||
|
// 线程名:offline-check-thread,便于日志排查
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
checkOfflineDevice();
|
||||||
|
}
|
||||||
|
}, "offline-check-thread").start();
|
||||||
|
System.out.println("【MQTT初始化】离线设备检查线程已启动");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息分发处理:根据主题类型路由到不同处理方法
|
||||||
|
* @param topic 消息主题
|
||||||
|
* @param payload 消息内容(JSON字符串)
|
||||||
|
*/
|
||||||
|
private void handleMessage(String topic, String payload) {
|
||||||
|
try {
|
||||||
|
System.out.println("【MQTT消息接收】topic=" + topic + ", payload=" + payload);
|
||||||
|
|
||||||
|
// 1. 处理设备业务状态主题:device/{deviceId}/status
|
||||||
|
if (topic.matches("device/\\w+/status")) {
|
||||||
|
handleDeviceStatus(topic, payload);
|
||||||
|
}
|
||||||
|
// 2. 处理设备心跳主题:device/{deviceId}/heartbeat
|
||||||
|
else if (topic.matches("device/\\w+/heartbeat")) {
|
||||||
|
handleDeviceHeartbeat(topic, payload);
|
||||||
|
}
|
||||||
|
// 3. 处理前端控制指令主题:frontend/{clientId}/control/{deviceId}
|
||||||
|
else if (topic.matches("frontend/\\w+/control/\\w+")) {
|
||||||
|
handleFrontendControl(topic, payload);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("【MQTT消息处理异常】topic=" + topic + ", 异常信息:" + e.getMessage());
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理设备业务状态消息
|
||||||
|
* 逻辑:
|
||||||
|
* 1. 解析设备ID
|
||||||
|
* 2. 补充设备在线状态到消息体
|
||||||
|
* 3. 查询订阅该设备的前端列表
|
||||||
|
* 4. 转发消息到每个前端的专属主题
|
||||||
|
*
|
||||||
|
* @param topic 消息主题(device/{deviceId}/status)
|
||||||
|
* @param payload 设备状态JSON字符串
|
||||||
|
* @throws MqttException 消息发布异常
|
||||||
|
*/
|
||||||
|
private void handleDeviceStatus(String topic, String payload) throws MqttException {
|
||||||
|
// 解析设备ID:主题格式为device/{deviceId}/status,分割后第2个元素是设备ID
|
||||||
|
String deviceId = topic.split("/")[1];
|
||||||
|
|
||||||
|
// 补充设备在线状态到payload(兼容JSON格式)
|
||||||
|
// 从Redis获取设备在线状态:device:online:{deviceId} → true/false
|
||||||
|
String onlineStatus = stringRedisTemplate.opsForValue().get("device:online:" + deviceId);
|
||||||
|
// 若Redis中无记录,默认离线
|
||||||
|
String finalOnlineStatus = (onlineStatus == null) ? "false" : onlineStatus;
|
||||||
|
// 拼接在线状态到JSON末尾(兼容无空格的JSON格式)
|
||||||
|
String newPayload = payload.replace("}", ",\"online\":\"" + finalOnlineStatus + "\"}");
|
||||||
|
|
||||||
|
// 查询Redis中订阅该设备的前端clientId列表:sub:{deviceId} → Set<String>
|
||||||
|
Set<String> subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId);
|
||||||
|
if (subscribedClients != null && !subscribedClients.isEmpty()) {
|
||||||
|
// 遍历所有订阅的前端,推送消息到前端专属主题
|
||||||
|
for (String clientId : subscribedClients) {
|
||||||
|
// 前端专属主题:frontend/{clientId}/device/{deviceId}/status
|
||||||
|
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status";
|
||||||
|
// 发布消息(保留最新消息,前端订阅后可立即获取)
|
||||||
|
mqttMessageSender.publish(frontendTopic, newPayload);
|
||||||
|
System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
System.out.println("【设备状态转发】设备" + deviceId + "无订阅前端,跳过转发");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理设备心跳消息
|
||||||
|
* 逻辑:
|
||||||
|
* 1. 解析设备ID
|
||||||
|
* 2. 更新Redis中设备最后心跳时间、在线状态
|
||||||
|
* 3. 异步上报在线状态到4G平台(避免阻塞MQTT消息处理)
|
||||||
|
*
|
||||||
|
* @param topic 消息主题(device/{deviceId}/heartbeat)
|
||||||
|
* @param payload 心跳包JSON字符串(可包含timestamp等字段)
|
||||||
|
*/
|
||||||
|
private void handleDeviceHeartbeat(String topic, String payload) {
|
||||||
|
// 解析设备ID:主题格式为device/{deviceId}/heartbeat,分割后第2个元素是设备ID
|
||||||
|
String deviceId = topic.split("/")[1];
|
||||||
|
|
||||||
|
// 获取当前时间戳(秒)
|
||||||
|
long currentTime = System.currentTimeMillis() / 1000;
|
||||||
|
|
||||||
|
// 更新Redis:存储最后心跳时间 → device:last_heartbeat:{deviceId}
|
||||||
|
stringRedisTemplate.opsForValue().set("device:last_heartbeat:" + deviceId, String.valueOf(currentTime));
|
||||||
|
// 更新Redis:存储在线状态,设置过期时间(心跳超时+10秒),避免Redis数据堆积
|
||||||
|
stringRedisTemplate.opsForValue().set(
|
||||||
|
"device:online:" + deviceId,
|
||||||
|
"true",
|
||||||
|
HEARTBEAT_TIMEOUT + 10,
|
||||||
|
TimeUnit.SECONDS
|
||||||
|
);
|
||||||
|
|
||||||
|
// 异步上报4G平台:使用独立线程,避免阻塞MQTT消息处理线程
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
// 构造上报4G平台的JSON数据
|
||||||
|
String statusJson = String.format(
|
||||||
|
"{\"device_id\":\"%s\",\"online\":true,\"timestamp\":%d}",
|
||||||
|
deviceId, currentTime
|
||||||
|
);
|
||||||
|
|
||||||
|
// 调用4G平台API(POST请求)
|
||||||
|
RestTemplate restTemplate = new RestTemplate();
|
||||||
|
String response = restTemplate.postForObject(FOUR_G_API, statusJson, String.class);
|
||||||
|
|
||||||
|
System.out.println("【4G平台上报】设备" + deviceId + "在线状态上报成功,响应:" + response);
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("【4G平台上报】设备" + deviceId + "在线状态上报失败,异常:" + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理前端控制指令
|
||||||
|
* 逻辑:
|
||||||
|
* 1. 解析前端clientId、设备ID
|
||||||
|
* 2. 权限校验:验证前端是否有权操作该设备
|
||||||
|
* 3. 分布式锁:避免多前端同时控制同一设备
|
||||||
|
* 4. 记录操作日志
|
||||||
|
* 5. 转发指令到设备专属主题
|
||||||
|
*
|
||||||
|
* @param topic 消息主题(frontend/{clientId}/control/{deviceId})
|
||||||
|
* @param payload 控制指令JSON字符串
|
||||||
|
* @throws MqttException 消息发布异常
|
||||||
|
*/
|
||||||
|
private void handleFrontendControl(String topic, String payload) throws MqttException {
|
||||||
|
// 解析主题:frontend/{clientId}/control/{deviceId}
|
||||||
|
String[] parts = topic.split("/");
|
||||||
|
String clientId = parts[1]; // 前端唯一标识
|
||||||
|
String deviceId = parts[3]; // 目标设备ID
|
||||||
|
|
||||||
|
// 1. 权限校验:失败则推送错误消息给前端
|
||||||
|
if (!checkPermission(clientId, deviceId)) {
|
||||||
|
String errorTopic = "frontend/" + clientId + "/error/" + deviceId;
|
||||||
|
mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}");
|
||||||
|
System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "权限校验失败");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 分布式锁:lock:{deviceId},过期时间10秒(避免死锁)
|
||||||
|
String lockKey = "lock:" + deviceId;
|
||||||
|
Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent(
|
||||||
|
lockKey,
|
||||||
|
clientId,
|
||||||
|
10,
|
||||||
|
TimeUnit.SECONDS
|
||||||
|
);
|
||||||
|
|
||||||
|
// 锁获取失败:设备忙,推送错误消息给前端
|
||||||
|
if (lockSuccess == null || !lockSuccess) {
|
||||||
|
String errorTopic = "frontend/" + clientId + "/error/" + deviceId;
|
||||||
|
mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}");
|
||||||
|
System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "获取锁失败(设备忙)");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 记录操作日志(示例:可替换为数据库存储)
|
||||||
|
System.out.println(String.format(
|
||||||
|
"【前端指令处理】前端%s于%s控制设备%s,指令:%s",
|
||||||
|
clientId, LocalDateTime.now(), deviceId, payload
|
||||||
|
));
|
||||||
|
|
||||||
|
// 4. 转发指令到设备专属主题:device/{deviceId}/control
|
||||||
|
String deviceTopic = "device/" + deviceId + "/control";
|
||||||
|
mqttMessageSender.publish(deviceTopic, payload);
|
||||||
|
System.out.println("【前端指令转发】前端" + clientId + " → 设备" + deviceId + ",主题:" + deviceTopic);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 权限校验逻辑(示例)
|
||||||
|
* 可根据业务需求扩展:
|
||||||
|
* 1. 管理员前端(clientId以admin_开头)拥有所有权限
|
||||||
|
* 2. 普通前端仅能操作Redis中绑定的设备(user_device:{clientId} → deviceId集合)
|
||||||
|
*
|
||||||
|
* @param clientId 前端唯一标识
|
||||||
|
* @param deviceId 设备ID
|
||||||
|
* @return true=有权限,false=无权限
|
||||||
|
*/
|
||||||
|
private boolean checkPermission(String clientId, String deviceId) {
|
||||||
|
// 管理员权限:clientId以admin_开头
|
||||||
|
if (clientId.startsWith("admin_")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 普通用户权限:校验Redis中是否绑定该设备
|
||||||
|
return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 定时检查离线设备(无限循环,每10秒执行一次)
|
||||||
|
* 逻辑:
|
||||||
|
* 1. 获取所有设备的最后心跳记录
|
||||||
|
* 2. 判定是否离线(当前时间 - 最后心跳时间 > 心跳超时)
|
||||||
|
* 3. 更新Redis在线状态为离线
|
||||||
|
* 4. 上报离线状态到4G平台
|
||||||
|
*/
|
||||||
|
private void checkOfflineDevice() {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
// 获取Redis中所有设备的最后心跳记录:device:last_heartbeat:*
|
||||||
|
Set<String> heartbeatKeys = stringRedisTemplate.keys("device:last_heartbeat:*");
|
||||||
|
|
||||||
|
// 无设备心跳记录,休眠10秒后继续
|
||||||
|
if (heartbeatKeys == null || heartbeatKeys.isEmpty()) {
|
||||||
|
Thread.sleep(10000);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 当前时间戳(秒)
|
||||||
|
long currentTime = System.currentTimeMillis() / 1000;
|
||||||
|
|
||||||
|
// 遍历所有设备心跳记录
|
||||||
|
for (String key : heartbeatKeys) {
|
||||||
|
// 解析设备ID:key格式为device:last_heartbeat:{deviceId}
|
||||||
|
String deviceId = key.split(":")[2];
|
||||||
|
// 获取最后心跳时间
|
||||||
|
String lastHeartbeatStr = stringRedisTemplate.opsForValue().get(key);
|
||||||
|
|
||||||
|
// 无心跳记录,跳过
|
||||||
|
if (lastHeartbeatStr == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 转换为长整型
|
||||||
|
long lastHeartbeat = Long.parseLong(lastHeartbeatStr);
|
||||||
|
|
||||||
|
// 判定离线:超过心跳超时时间未发心跳
|
||||||
|
if (currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT) {
|
||||||
|
// 更新Redis在线状态为离线
|
||||||
|
stringRedisTemplate.opsForValue().set("device:online:" + deviceId, "false");
|
||||||
|
|
||||||
|
// 构造上报4G平台的JSON数据
|
||||||
|
String statusJson = String.format(
|
||||||
|
"{\"device_id\":\"%s\",\"online\":false,\"timestamp\":%d}",
|
||||||
|
deviceId, currentTime
|
||||||
|
);
|
||||||
|
|
||||||
|
// 调用4G平台API上报离线状态
|
||||||
|
RestTemplate restTemplate = new RestTemplate();
|
||||||
|
restTemplate.postForObject(FOUR_G_API, statusJson, String.class);
|
||||||
|
|
||||||
|
System.out.println("【离线设备检查】设备" + deviceId + "判定为离线,已上报4G平台");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 每10秒检查一次
|
||||||
|
Thread.sleep(10000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// 线程中断,退出循环
|
||||||
|
System.err.println("【离线设备检查】线程被中断,停止检查:" + e.getMessage());
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("【离线设备检查】异常:" + e.getMessage());
|
||||||
|
// 异常时休眠10秒,避免无限循环报错
|
||||||
|
try {
|
||||||
|
Thread.sleep(10000);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 前端订阅设备状态接口(供Controller层调用)
|
||||||
|
* 逻辑:
|
||||||
|
* 1. 将前端clientId加入设备的订阅列表
|
||||||
|
* 2. 推送设备最新状态给前端(立即)
|
||||||
|
*
|
||||||
|
* @param clientId 前端唯一标识
|
||||||
|
* @param deviceId 设备ID
|
||||||
|
*/
|
||||||
|
public void subscribeDevice(String clientId, String deviceId) {
|
||||||
|
// 将前端clientId添加到设备的订阅列表:sub:{deviceId}
|
||||||
|
stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId);
|
||||||
|
System.out.println("【前端订阅】前端" + clientId + "订阅设备" + deviceId + "成功");
|
||||||
|
|
||||||
|
// 推送设备最新状态给前端(立即)
|
||||||
|
try {
|
||||||
|
// 从Redis获取设备最新状态:device:latest:{deviceId}
|
||||||
|
String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId);
|
||||||
|
if (latestStatus != null) {
|
||||||
|
// 前端专属主题
|
||||||
|
String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status";
|
||||||
|
mqttMessageSender.publish(frontendTopic, latestStatus);
|
||||||
|
System.out.println("【前端订阅】推送设备" + deviceId + "最新状态给前端" + clientId);
|
||||||
|
} else {
|
||||||
|
System.out.println("【前端订阅】设备" + deviceId + "无最新状态,跳过推送");
|
||||||
|
}
|
||||||
|
} catch (MqttException e) {
|
||||||
|
System.err.println("【前端订阅】推送设备" + deviceId + "状态给前端" + clientId + "失败:" + e.getMessage());
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 前端取消订阅设备状态接口(供Controller层调用)
|
||||||
|
* 逻辑:从设备的订阅列表移除前端clientId
|
||||||
|
*
|
||||||
|
* @param clientId 前端唯一标识
|
||||||
|
* @param deviceId 设备ID
|
||||||
|
*/
|
||||||
|
public void unsubscribeDevice(String clientId, String deviceId) {
|
||||||
|
// 从设备订阅列表移除前端clientId
|
||||||
|
stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId);
|
||||||
|
System.out.println("【前端取消订阅】前端" + clientId + "取消订阅设备" + deviceId + "成功");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -7,8 +7,10 @@ import com.agri.generator.dto.ApiConfigDTO;
|
||||||
import freemarker.template.Configuration;
|
import freemarker.template.Configuration;
|
||||||
import freemarker.template.Template;
|
import freemarker.template.Template;
|
||||||
import freemarker.template.TemplateException;
|
import freemarker.template.TemplateException;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.core.convert.ConversionService;
|
import org.springframework.core.convert.ConversionService;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
|
|
@ -16,8 +18,12 @@ import java.io.IOException;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
import java.util.stream.Collectors;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 代码生成工具类
|
* 代码生成工具类
|
||||||
|
|
@ -35,7 +41,7 @@ public class CodeGenerator {
|
||||||
// 日期格式化器
|
// 日期格式化器
|
||||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||||
|
|
||||||
public CodeGenerator(ConversionService conversionService) {
|
public CodeGenerator(@Qualifier("mvcConversionService")ConversionService conversionService) {
|
||||||
this.conversionService = conversionService;
|
this.conversionService = conversionService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue