首页
学习
活动
专区
圈层
工具
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spring Boot集成EMQX V3动态主题发布订阅

1. 添加依赖

在pom.xml中添加MQTT客户端依赖(如Eclipse Paho):

  <groupId>org.springframework.boot</groupId>

  <artifactId>spring-boot-starter-integration</artifactId>

  <groupId>org.springframework.integration</groupId>

  <artifactId>spring-integration-mqtt</artifactId>

  <groupId>org.eclipse.paho</groupId>

  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>

  <version>1.2.5</version>

2. 配置MQTT连接参数

在application.yaml中配置EMQX连接信息:

# MQTT 配置

mqtt:

broker:

url: tcp://localhost:1883

client:

id: lhzz_client_a19758b6

username: admin # 认证用户名(如果 Broker 需要)

password: public # 认证密码(如果 Broker 需要)

3. 创建MQTT配置类

配置MqttClient Bean并处理连接:

package com.writchie.aiot.module.mqtt.config;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class MqttConfig {

  @Value("${mqtt.broker.url}")

  private String brokerUrl;

  @Value("${mqtt.client.id}")

  private String clientId;

  @Value("${mqtt.username}")

  private String username;

  @Value("${mqtt.password}")

  private String password;

  // 定义 MqttConnectOptions Bean

  @Bean

  public MqttConnectOptions mqttConnectOptions() {

      MqttConnectOptions options = new MqttConnectOptions();

      options.setUserName(username);

      options.setPassword(password.toCharArray());

      options.setConnectionTimeout(30);

      options.setKeepAliveInterval(60);

      options.setAutomaticReconnect(true);

      return options;

  }

  // 通过参数注入 MqttConnectOptions

  @Bean

  public MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {

      MqttClient client = new MqttClient(brokerUrl, clientId);

      client.connect(mqttConnectOptions); // 使用注入的配置

      return client;

  }

}

4. 实现动态主题发布

创建服务类用于发布消息到动态主题:

package com.writchie.aiot.module.mqtt.service;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class MqttDynamicPublisher {

  @Autowired

  private MqttClient mqttClient;

  public void publish(String topic, String payload,int qos) throws MqttException {

      MqttMessage message = new MqttMessage(payload.getBytes());

      message.setQos(qos); // 设置QoS级别

      mqttClient.publish(topic, message);

  }

}

5. 实现动态主题订阅

方案:运行时动态订阅

允许在运行时动态订阅指定主题:

package com.writchie.aiot.module.mqtt.service;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class MqttDynamicSubscriber {

  @Autowired

  private MqttClient mqttClient;

  @Autowired

  private MqttConnectOptions mqttConnectOptions; // 直接注入 Bean

  public void subscribe(String topic) throws MqttException {

      System.out.println("*****订阅初始化开始*******");

      if (!mqttClient.isConnected()) {

          mqttClient.connect(mqttConnectOptions); // 使用注入的配置重连

          System.out.println("*****使用注入的配置重连*****");

      }

      mqttClient.subscribe(topic, 1, (t, message) -> {

          System.out.println("**********订阅有新的消息**********");

          System.out.println("*****订阅主题: " + topic);

          System.out.println("*****订阅消息: " + new String(message.getPayload()));

          //TODO  具体业务逻辑

      });

      System.out.println("*****订阅初始化完成*****");

  }

}

6、使用示例

package com.writchie.aiot.module.mqtt.controller.admin;

import com.writchie.aiot.module.mqtt.service.MqttDynamicSubscriber;

import com.writchie.aiot.module.mqtt.service.MqttDynamicPublisher;

import io.swagger.v3.oas.annotations.Operation;

import io.swagger.v3.oas.annotations.tags.Tag;

import lombok.extern.slf4j.Slf4j;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.validation.annotation.Validated;

import org.springframework.web.bind.annotation.*;

@Tag(name = "管理后台 - MQTT ")

@RestController

@RequestMapping("/mqtt")

@Validated

@Slf4j

public class MqttController {

  @Autowired

  private MqttDynamicSubscriber mqttDynamicSubscriber;

  @Autowired

  private MqttDynamicPublisher mqttDynamicPublisher;

  @Operation(summary = "动态消息订阅消息")

  @PostMapping("/subscribeByTopic")

  public String subscribeByTopic(@RequestParam String topic) throws MqttException {

      mqttDynamicSubscriber.subscribe(topic);

      return "订阅主题 :" + topic;

  }

  @Operation(summary = "动态消息发布消息")

  @PostMapping("/publishMessage")

  public String publishMessage(@RequestParam String topic, @RequestParam String message, @RequestParam(defaultValue = "1") String qos) throws MqttException {

      mqttDynamicPublisher.publish(topic, message,Integer.parseInt(qos));

      System.out.printf("主题:[%s],消息:[%s],qos:[%s] \n", topic, message, qos);

      return "向主题为: " + topic+"\t 发达消息内容为:"+message+"qos为:"+qos;

  }

}

7、测试结果

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OsAOC_hKBoGDKgP_8441SqQg0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券