diff --git a/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java new file mode 100644 index 0000000..f572ba7 --- /dev/null +++ b/agri-admin/src/main/java/com/agri/web/controller/mqtt/MqttController.java @@ -0,0 +1,53 @@ +package com.agri.web.controller.mqtt; + +import com.agri.framework.interceptor.MqttMessageHandler; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.util.Map; + +/** + * @Auther: jone + * @Date: 2026/1/15 - 01 - 15 - 23:45 + * @Description: com.agri.web.controller.mqtt + * @version: 1.0 + */ + +@RestController +@RequestMapping("/api/mqtt") +public class MqttController { + + @Resource + private MqttMessageHandler mqttMessageHandler; + + /** + * 前端订阅设备状态 + */ + @PostMapping("/subscribe") + public String subscribe(@RequestBody Map params) { + String clientId = params.get("clientId"); + String deviceId = params.get("deviceId"); + if (clientId == null || deviceId == null) { + return "参数错误"; + } + mqttMessageHandler.subscribeDevice(clientId, deviceId); + return "订阅成功"; + } + + /** + * 前端取消订阅设备状态 + */ + @PostMapping("/unsubscribe") + public String unsubscribe(@RequestBody Map params) { + String clientId = params.get("clientId"); + String deviceId = params.get("deviceId"); + if (clientId == null || deviceId == null) { + return "参数错误"; + } + mqttMessageHandler.unsubscribeDevice(clientId, deviceId); + return "取消订阅成功"; + } +} \ No newline at end of file diff --git a/agri-admin/src/main/resources/application-mqtt.yml b/agri-admin/src/main/resources/application-mqtt.yml new file mode 100644 index 0000000..0e1918e --- /dev/null +++ b/agri-admin/src/main/resources/application-mqtt.yml @@ -0,0 +1,18 @@ +spring: + # Redis配置(分布式锁/订阅关系) + redis: + host: 122.51.109.52 + port: 6379 + password: lld123 + database: 1 + # MQTT配置 + mqtt: + host: tcp://122.51.109.52:1883 # 设备/后端的MQTT TCP地址 + ws-host: wss://mq.xiaoces.com/mqtt # 前端的WebSocket地址 + username: admin # Mosquitto共用账号 + password: Admin#12345678 # Mosquitto密码 + client-id: springboot-backend-${random.uuid} # 后端客户端ID(唯一) + default-topic: device/+/status,device/+/heartbeat,frontend/+/control/+ # 后端监听的主题 + qos: 1 # 消息可靠性 + timeout: 60 # 连接超时 + keep-alive: 60 # 心跳间隔 \ No newline at end of file diff --git a/agri-admin/src/main/resources/application.yml b/agri-admin/src/main/resources/application.yml index eee0f13..b1ddf2c 100644 --- a/agri-admin/src/main/resources/application.yml +++ b/agri-admin/src/main/resources/application.yml @@ -61,7 +61,7 @@ spring: # 国际化资源文件路径 basename: i18n/messages profiles: - active: druid + active: druid,mqtt # 文件上传 servlet: multipart: diff --git a/agri-common/pom.xml b/agri-common/pom.xml index af02191..5034cd1 100644 --- a/agri-common/pom.xml +++ b/agri-common/pom.xml @@ -130,7 +130,28 @@ com.baomidou mybatis-plus-boot-starter + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + org.springframework.integration + spring-integration-mqtt + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + org.springframework.boot + spring-boot-starter-web diff --git a/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java new file mode 100644 index 0000000..9d4ed23 --- /dev/null +++ b/agri-framework/src/main/java/com/agri/framework/config/MqttConfig.java @@ -0,0 +1,179 @@ +package com.agri.framework.config; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * MQTT核心配置类 + * 功能: + * 1. 初始化MQTT客户端连接(对接Mosquitto) + * 2. 提供MQTT消息发送工具类 + * 3. 配置连接参数(账号、密码、重连、保活等) + * 适配JDK 8语法,兼容SpringBoot环境 + * + * @Author: jone + * @Date: 2026/1/16 + * @Version: 1.0 + */ +@Configuration +public class MqttConfig { + + /** Mosquitto服务器地址(TCP协议):格式为 tcp://IP:端口 */ + @Value("${spring.mqtt.host}") + private String host; + + /** MQTT认证用户名(Mosquitto配置的共用账号) */ + @Value("${spring.mqtt.username}") + private String username; + + /** MQTT认证密码(Mosquitto配置的共用密码) */ + @Value("${spring.mqtt.password}") + private String password; + + /** 后端MQTT客户端ID:需唯一,避免重复连接 */ + @Value("${spring.mqtt.client-id}") + private String clientId; + + /** MQTT消息QoS级别:1=至少一次送达,0=最多一次,2=恰好一次 */ + @Value("${spring.mqtt.qos:1}") + private int qos; + + /** MQTT连接超时时间(秒):超过该时间未连接成功则判定为失败 */ + @Value("${spring.mqtt.timeout:60}") + private int timeout; + + /** MQTT保活间隔(秒):客户端定期发送心跳给服务端,维持连接 */ + @Value("${spring.mqtt.keep-alive:60}") + private int keepAlive; + + /** + * 创建MQTT客户端实例(Spring Bean) + * 核心逻辑: + * 1. 配置连接参数(账号、密码、重连、保活等) + * 2. 初始化客户端并建立连接 + * 3. 返回客户端实例供其他类注入使用 + * + * @return MqttClient MQTT客户端实例 + * @throws MqttException MQTT连接/初始化异常 + */ + @Bean + public MqttClient mqttClient() throws MqttException { + // 1. 初始化连接配置项 + MqttConnectOptions connectOptions = getMqttConnectOptions(); + + // 2. 初始化MQTT客户端 + // MemoryPersistence:使用内存存储会话,不持久化到磁盘(适合后端服务) + MemoryPersistence persistence = new MemoryPersistence(); + MqttClient mqttClient = new MqttClient(host, clientId, persistence); + + // 3. 建立MQTT连接 + if (!mqttClient.isConnected()) { + mqttClient.connect(connectOptions); + System.out.println("【MQTT连接成功】服务器地址:" + host + ",客户端ID:" + clientId); + } else { + System.out.println("【MQTT连接状态】已连接,无需重复初始化"); + } + + return mqttClient; + } + + private MqttConnectOptions getMqttConnectOptions() { + MqttConnectOptions connectOptions = new MqttConnectOptions(); + // 设置MQTT认证账号 + connectOptions.setUserName(username); + // 设置MQTT认证密码(转换为字符数组,符合API要求) + connectOptions.setPassword(password.toCharArray()); + // 设置连接超时时间(秒) + connectOptions.setConnectionTimeout(timeout); + // 设置保活间隔(秒):客户端每隔该时间发送一次心跳 + connectOptions.setKeepAliveInterval(keepAlive); + // 关闭清除会话:false=重连后保留订阅关系(若不需要离线消息可设为true) + connectOptions.setCleanSession(true); + // 开启自动重连:连接断开后自动尝试重连,提升稳定性 + connectOptions.setAutomaticReconnect(true); + // 设置最大重连间隔(秒):避免频繁重连消耗资源 + connectOptions.setMaxReconnectDelay(30); + return connectOptions; + } + + /** + * 创建MQTT消息发送工具类(Spring Bean) + * 封装消息发布逻辑,供业务层调用 + * + * @param mqttClient MQTT客户端实例(自动注入) + * @return MqttMessageSender 消息发送工具类 + */ + @Bean + public MqttMessageSender mqttMessageSender(MqttClient mqttClient) { + return new MqttMessageSender(mqttClient, qos); + } + + /** + * MQTT消息发送工具类(内部类) + * 封装消息发布的核心逻辑,简化业务层调用 + */ + public static class MqttMessageSender { + /** MQTT客户端实例 */ + private final MqttClient client; + /** 默认QoS级别 */ + private final int defaultQos; + + /** + * 构造方法 + * @param client MQTT客户端实例 + * @param defaultQos 默认QoS级别 + */ + public MqttMessageSender(MqttClient client, int defaultQos) { + this.client = client; + this.defaultQos = defaultQos; + } + + /** + * 发布MQTT消息(使用默认QoS) + * @param topic 消息主题 + * @param payload 消息内容(JSON字符串) + * @throws MqttException 消息发布异常 + */ + public void publish(String topic, String payload) throws MqttException { + publish(topic, payload, defaultQos); + } + + /** + * 发布MQTT消息(自定义QoS) + * 核心逻辑: + * 1. 校验客户端连接状态 + * 2. 构建MQTT消息对象 + * 3. 发布消息到指定主题 + * + * @param topic 消息主题 + * @param payload 消息内容(JSON字符串) + * @param qos 消息QoS级别 + * @throws MqttException 消息发布异常(连接断开、主题无效等) + */ + public void publish(String topic, String payload, int qos) throws MqttException { + // 1. 校验客户端是否已连接 + if (!client.isConnected()) { + throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); + } + + // 2. 构建MQTT消息对象 + MqttMessage message = new MqttMessage(); + // 设置消息内容(转换为字节数组) + message.setPayload(payload.getBytes()); + // 设置QoS级别 + message.setQos(qos); + // 设置保留消息:true=服务端保留该主题的最新消息,新订阅者可立即获取 + message.setRetained(true); + + // 3. 发布消息 + client.publish(topic, message); + System.out.println("【MQTT消息发布成功】主题:" + topic + ",内容:" + payload); + } + } +} \ No newline at end of file diff --git a/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java new file mode 100644 index 0000000..6d7900a --- /dev/null +++ b/agri-framework/src/main/java/com/agri/framework/interceptor/MqttMessageHandler.java @@ -0,0 +1,441 @@ +package com.agri.framework.interceptor; + +import com.agri.framework.config.MqttConfig; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * MQTT消息处理器 + * 核心功能: + * 1. 订阅设备状态/心跳、前端控制指令主题 + * 2. 转发设备状态到订阅的前端 + * 3. 处理设备心跳,维护在线状态并上报4G平台 + * 4. 处理前端控制指令,权限校验+分布式锁+转发给设备 + * 5. 定时检查离线设备,更新状态并上报 + * + * @Auther: lld + * @Date: 2026/1/15 - 01 - 15 - 23:43 + * @version: 1.0 + */ +@Component +public class MqttMessageHandler { + + /** MQTT客户端(由MqttConfig配置类注入) */ + @Resource + private MqttClient mqttClient; + + /** MQTT消息发送工具类(由MqttConfig配置类注入) */ + @Resource + private MqttConfig.MqttMessageSender mqttMessageSender; + + /** Redis模板,用于存储订阅关系、设备在线状态、分布式锁 */ + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** 4G平台API地址:用于上报设备在线/离线状态 */ + private static final String FOUR_G_API = "http://你的4G平台IP/api/device/status"; + + /** 心跳超时时间(秒):设备超过该时间未发心跳则判定为离线 */ + private static final long HEARTBEAT_TIMEOUT = 60; + + @Value("${spring.mqtt.default-topic}") + private String defaultTopic; + /** + * 初始化方法:项目启动时执行 + * 1. 设置MQTT回调函数 + * 2. 订阅核心主题 + * 3. 启动离线设备检查线程 + * + * @throws MqttException MQTT订阅失败异常 + */ + @PostConstruct + public void subscribeTopics() throws MqttException { + // 定义需要监听的MQTT主题数组 + // device/+/status:所有设备的业务状态(温湿度、开关等) + // device/+/heartbeat:所有设备的心跳包(用于判定在线状态) + // frontend/+/control/+:所有前端发送的设备控制指令 + // 解析配置文件中的主题列表(逗号分隔) + String[] topics = defaultTopic.split(","); + // 对应主题的QoS级别(所有主题使用相同QoS,也可自定义多QoS配置) + int[] qos = new int[topics.length]; + // 所有主题QoS=1 + Arrays.fill(qos, 1); + + // 设置MQTT消息回调:处理连接断开、消息接收、消息发布完成 + mqttClient.setCallback(new MqttCallback() { + /** + * MQTT连接断开回调 + * @param cause 断开原因 + */ + @Override + public void connectionLost(Throwable cause) { + System.err.println("【MQTT连接异常】连接断开:" + cause.getMessage()); + } + + /** + * 收到MQTT消息回调:核心处理入口 + * @param topic 消息主题 + * @param message 消息内容 + * @throws Exception 消息处理异常 + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // 将字节数组转换为字符串,分发处理不同主题的消息 + handleMessage(topic, new String(message.getPayload())); + } + + /** + * 消息发布完成回调(仅日志记录,无业务逻辑) + * @param token 发布令牌 + */ + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // 可添加消息发布成功的日志 + // System.out.println("消息发布完成:" + token.getTopics()[0]); + } + }); + + // 订阅主题:阻塞式操作,订阅成功后继续执行 + mqttClient.subscribe(topics, qos); + System.out.println("【MQTT初始化】核心主题订阅成功,订阅列表:" + String.join(",", topics)); + + // 启动离线设备检查线程:独立线程,避免阻塞主线程 + // 线程名:offline-check-thread,便于日志排查 + new Thread(new Runnable() { + @Override + public void run() { + checkOfflineDevice(); + } + }, "offline-check-thread").start(); + System.out.println("【MQTT初始化】离线设备检查线程已启动"); + } + + /** + * 消息分发处理:根据主题类型路由到不同处理方法 + * @param topic 消息主题 + * @param payload 消息内容(JSON字符串) + */ + private void handleMessage(String topic, String payload) { + try { + System.out.println("【MQTT消息接收】topic=" + topic + ", payload=" + payload); + + // 1. 处理设备业务状态主题:device/{deviceId}/status + if (topic.matches("device/\\w+/status")) { + handleDeviceStatus(topic, payload); + } + // 2. 处理设备心跳主题:device/{deviceId}/heartbeat + else if (topic.matches("device/\\w+/heartbeat")) { + handleDeviceHeartbeat(topic, payload); + } + // 3. 处理前端控制指令主题:frontend/{clientId}/control/{deviceId} + else if (topic.matches("frontend/\\w+/control/\\w+")) { + handleFrontendControl(topic, payload); + } + } catch (Exception e) { + System.err.println("【MQTT消息处理异常】topic=" + topic + ", 异常信息:" + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * 处理设备业务状态消息 + * 逻辑: + * 1. 解析设备ID + * 2. 补充设备在线状态到消息体 + * 3. 查询订阅该设备的前端列表 + * 4. 转发消息到每个前端的专属主题 + * + * @param topic 消息主题(device/{deviceId}/status) + * @param payload 设备状态JSON字符串 + * @throws MqttException 消息发布异常 + */ + private void handleDeviceStatus(String topic, String payload) throws MqttException { + // 解析设备ID:主题格式为device/{deviceId}/status,分割后第2个元素是设备ID + String deviceId = topic.split("/")[1]; + + // 补充设备在线状态到payload(兼容JSON格式) + // 从Redis获取设备在线状态:device:online:{deviceId} → true/false + String onlineStatus = stringRedisTemplate.opsForValue().get("device:online:" + deviceId); + // 若Redis中无记录,默认离线 + String finalOnlineStatus = (onlineStatus == null) ? "false" : onlineStatus; + // 拼接在线状态到JSON末尾(兼容无空格的JSON格式) + String newPayload = payload.replace("}", ",\"online\":\"" + finalOnlineStatus + "\"}"); + + // 查询Redis中订阅该设备的前端clientId列表:sub:{deviceId} → Set + Set subscribedClients = stringRedisTemplate.opsForSet().members("sub:" + deviceId); + if (subscribedClients != null && !subscribedClients.isEmpty()) { + // 遍历所有订阅的前端,推送消息到前端专属主题 + for (String clientId : subscribedClients) { + // 前端专属主题:frontend/{clientId}/device/{deviceId}/status + String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + // 发布消息(保留最新消息,前端订阅后可立即获取) + mqttMessageSender.publish(frontendTopic, newPayload); + System.out.println("【设备状态转发】设备" + deviceId + " → 前端" + clientId + ",主题:" + frontendTopic); + } + } else { + System.out.println("【设备状态转发】设备" + deviceId + "无订阅前端,跳过转发"); + } + } + + /** + * 处理设备心跳消息 + * 逻辑: + * 1. 解析设备ID + * 2. 更新Redis中设备最后心跳时间、在线状态 + * 3. 异步上报在线状态到4G平台(避免阻塞MQTT消息处理) + * + * @param topic 消息主题(device/{deviceId}/heartbeat) + * @param payload 心跳包JSON字符串(可包含timestamp等字段) + */ + private void handleDeviceHeartbeat(String topic, String payload) { + // 解析设备ID:主题格式为device/{deviceId}/heartbeat,分割后第2个元素是设备ID + String deviceId = topic.split("/")[1]; + + // 获取当前时间戳(秒) + long currentTime = System.currentTimeMillis() / 1000; + + // 更新Redis:存储最后心跳时间 → device:last_heartbeat:{deviceId} + stringRedisTemplate.opsForValue().set("device:last_heartbeat:" + deviceId, String.valueOf(currentTime)); + // 更新Redis:存储在线状态,设置过期时间(心跳超时+10秒),避免Redis数据堆积 + stringRedisTemplate.opsForValue().set( + "device:online:" + deviceId, + "true", + HEARTBEAT_TIMEOUT + 10, + TimeUnit.SECONDS + ); + + // 异步上报4G平台:使用独立线程,避免阻塞MQTT消息处理线程 + new Thread(new Runnable() { + @Override + public void run() { + try { + // 构造上报4G平台的JSON数据 + String statusJson = String.format( + "{\"device_id\":\"%s\",\"online\":true,\"timestamp\":%d}", + deviceId, currentTime + ); + + // 调用4G平台API(POST请求) + RestTemplate restTemplate = new RestTemplate(); + String response = restTemplate.postForObject(FOUR_G_API, statusJson, String.class); + + System.out.println("【4G平台上报】设备" + deviceId + "在线状态上报成功,响应:" + response); + } catch (Exception e) { + System.err.println("【4G平台上报】设备" + deviceId + "在线状态上报失败,异常:" + e.getMessage()); + } + } + }).start(); + } + + /** + * 处理前端控制指令 + * 逻辑: + * 1. 解析前端clientId、设备ID + * 2. 权限校验:验证前端是否有权操作该设备 + * 3. 分布式锁:避免多前端同时控制同一设备 + * 4. 记录操作日志 + * 5. 转发指令到设备专属主题 + * + * @param topic 消息主题(frontend/{clientId}/control/{deviceId}) + * @param payload 控制指令JSON字符串 + * @throws MqttException 消息发布异常 + */ + private void handleFrontendControl(String topic, String payload) throws MqttException { + // 解析主题:frontend/{clientId}/control/{deviceId} + String[] parts = topic.split("/"); + String clientId = parts[1]; // 前端唯一标识 + String deviceId = parts[3]; // 目标设备ID + + // 1. 权限校验:失败则推送错误消息给前端 + if (!checkPermission(clientId, deviceId)) { + String errorTopic = "frontend/" + clientId + "/error/" + deviceId; + mqttMessageSender.publish(errorTopic, "{\"msg\":\"无设备操作权限\"}"); + System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "权限校验失败"); + return; + } + + // 2. 分布式锁:lock:{deviceId},过期时间10秒(避免死锁) + String lockKey = "lock:" + deviceId; + Boolean lockSuccess = stringRedisTemplate.opsForValue().setIfAbsent( + lockKey, + clientId, + 10, + TimeUnit.SECONDS + ); + + // 锁获取失败:设备忙,推送错误消息给前端 + if (lockSuccess == null || !lockSuccess) { + String errorTopic = "frontend/" + clientId + "/error/" + deviceId; + mqttMessageSender.publish(errorTopic, "{\"msg\":\"设备忙,请稍后重试\"}"); + System.err.println("【前端指令处理】前端" + clientId + "操作设备" + deviceId + "获取锁失败(设备忙)"); + return; + } + + // 3. 记录操作日志(示例:可替换为数据库存储) + System.out.println(String.format( + "【前端指令处理】前端%s于%s控制设备%s,指令:%s", + clientId, LocalDateTime.now(), deviceId, payload + )); + + // 4. 转发指令到设备专属主题:device/{deviceId}/control + String deviceTopic = "device/" + deviceId + "/control"; + mqttMessageSender.publish(deviceTopic, payload); + System.out.println("【前端指令转发】前端" + clientId + " → 设备" + deviceId + ",主题:" + deviceTopic); + } + + /** + * 权限校验逻辑(示例) + * 可根据业务需求扩展: + * 1. 管理员前端(clientId以admin_开头)拥有所有权限 + * 2. 普通前端仅能操作Redis中绑定的设备(user_device:{clientId} → deviceId集合) + * + * @param clientId 前端唯一标识 + * @param deviceId 设备ID + * @return true=有权限,false=无权限 + */ + private boolean checkPermission(String clientId, String deviceId) { + // 管理员权限:clientId以admin_开头 + if (clientId.startsWith("admin_")) { + return true; + } + + // 普通用户权限:校验Redis中是否绑定该设备 + return Boolean.TRUE.equals(stringRedisTemplate.opsForSet().isMember("user_device:" + clientId, deviceId)); + } + + /** + * 定时检查离线设备(无限循环,每10秒执行一次) + * 逻辑: + * 1. 获取所有设备的最后心跳记录 + * 2. 判定是否离线(当前时间 - 最后心跳时间 > 心跳超时) + * 3. 更新Redis在线状态为离线 + * 4. 上报离线状态到4G平台 + */ + private void checkOfflineDevice() { + while (true) { + try { + // 获取Redis中所有设备的最后心跳记录:device:last_heartbeat:* + Set heartbeatKeys = stringRedisTemplate.keys("device:last_heartbeat:*"); + + // 无设备心跳记录,休眠10秒后继续 + if (heartbeatKeys == null || heartbeatKeys.isEmpty()) { + Thread.sleep(10000); + continue; + } + + // 当前时间戳(秒) + long currentTime = System.currentTimeMillis() / 1000; + + // 遍历所有设备心跳记录 + for (String key : heartbeatKeys) { + // 解析设备ID:key格式为device:last_heartbeat:{deviceId} + String deviceId = key.split(":")[2]; + // 获取最后心跳时间 + String lastHeartbeatStr = stringRedisTemplate.opsForValue().get(key); + + // 无心跳记录,跳过 + if (lastHeartbeatStr == null) { + continue; + } + + // 转换为长整型 + long lastHeartbeat = Long.parseLong(lastHeartbeatStr); + + // 判定离线:超过心跳超时时间未发心跳 + if (currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT) { + // 更新Redis在线状态为离线 + stringRedisTemplate.opsForValue().set("device:online:" + deviceId, "false"); + + // 构造上报4G平台的JSON数据 + String statusJson = String.format( + "{\"device_id\":\"%s\",\"online\":false,\"timestamp\":%d}", + deviceId, currentTime + ); + + // 调用4G平台API上报离线状态 + RestTemplate restTemplate = new RestTemplate(); + restTemplate.postForObject(FOUR_G_API, statusJson, String.class); + + System.out.println("【离线设备检查】设备" + deviceId + "判定为离线,已上报4G平台"); + } + } + + // 每10秒检查一次 + Thread.sleep(10000); + } catch (InterruptedException e) { + // 线程中断,退出循环 + System.err.println("【离线设备检查】线程被中断,停止检查:" + e.getMessage()); + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + System.err.println("【离线设备检查】异常:" + e.getMessage()); + // 异常时休眠10秒,避免无限循环报错 + try { + Thread.sleep(10000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + /** + * 前端订阅设备状态接口(供Controller层调用) + * 逻辑: + * 1. 将前端clientId加入设备的订阅列表 + * 2. 推送设备最新状态给前端(立即) + * + * @param clientId 前端唯一标识 + * @param deviceId 设备ID + */ + public void subscribeDevice(String clientId, String deviceId) { + // 将前端clientId添加到设备的订阅列表:sub:{deviceId} + stringRedisTemplate.opsForSet().add("sub:" + deviceId, clientId); + System.out.println("【前端订阅】前端" + clientId + "订阅设备" + deviceId + "成功"); + + // 推送设备最新状态给前端(立即) + try { + // 从Redis获取设备最新状态:device:latest:{deviceId} + String latestStatus = stringRedisTemplate.opsForValue().get("device:latest:" + deviceId); + if (latestStatus != null) { + // 前端专属主题 + String frontendTopic = "frontend/" + clientId + "/device/" + deviceId + "/status"; + mqttMessageSender.publish(frontendTopic, latestStatus); + System.out.println("【前端订阅】推送设备" + deviceId + "最新状态给前端" + clientId); + } else { + System.out.println("【前端订阅】设备" + deviceId + "无最新状态,跳过推送"); + } + } catch (MqttException e) { + System.err.println("【前端订阅】推送设备" + deviceId + "状态给前端" + clientId + "失败:" + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * 前端取消订阅设备状态接口(供Controller层调用) + * 逻辑:从设备的订阅列表移除前端clientId + * + * @param clientId 前端唯一标识 + * @param deviceId 设备ID + */ + public void unsubscribeDevice(String clientId, String deviceId) { + // 从设备订阅列表移除前端clientId + stringRedisTemplate.opsForSet().remove("sub:" + deviceId, clientId); + System.out.println("【前端取消订阅】前端" + clientId + "取消订阅设备" + deviceId + "成功"); + } +} \ No newline at end of file diff --git a/agri-generator/src/main/java/com/agri/generator/util/CodeGenerator.java b/agri-generator/src/main/java/com/agri/generator/util/CodeGenerator.java index 54cc35d..65e109d 100644 --- a/agri-generator/src/main/java/com/agri/generator/util/CodeGenerator.java +++ b/agri-generator/src/main/java/com/agri/generator/util/CodeGenerator.java @@ -7,8 +7,10 @@ import com.agri.generator.dto.ApiConfigDTO; import freemarker.template.Configuration; import freemarker.template.Template; import freemarker.template.TemplateException; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.convert.ConversionService; import org.springframework.stereotype.Component; + import javax.annotation.PostConstruct; import java.io.File; import java.io.FileWriter; @@ -16,8 +18,12 @@ import java.io.IOException; import java.io.Writer; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.*; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; /** * 代码生成工具类 @@ -35,7 +41,7 @@ public class CodeGenerator { // 日期格式化器 private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - public CodeGenerator(ConversionService conversionService) { + public CodeGenerator(@Qualifier("mvcConversionService")ConversionService conversionService) { this.conversionService = conversionService; }