目录
- MQTT-SpringBoot
- 创建简单 SpringBoot 项目
- 导入必须依赖
- 增加MQTT相关配置
- 编写对应Java类
- 配置类
- 启动类增加开启配置
- 创建MQTT连接工厂类
- 接收消息处理类
- 接收消息配置类
- 发送消息配置类
- 发送消息网关接口类
- 发送消息服务类
- 测试验证
- 订阅消息验证
- 发送消息
- 订阅收到消息
- 发送消息验证
- 创建订阅者
- 订阅者接收消息
MQTT-SpringBoot
创建简单 SpringBoot 项目
导入必须依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + MQTT demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.study</groupId>
    <artifactId>MqttDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootMqttDemo</name>
    <description>SpringBootMqttDemo</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot starter for web development -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Base Spring Integration starter for messaging middleware -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!-- Spring Integration MQTT client support -->
        <!-- NOTE(review): the explicit 5.4.3 overrides any version managed by the imported
             spring-boot-dependencies BOM; confirm it matches the Spring Integration version
             resolved by the integration starter. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.4.3</version>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- Imports the Spring Boot BOM so the starters above need no explicit version -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.study.mqtt.demo.MqttDemoApplication</mainClass>
                    <!-- NOTE(review): skip=true appears to disable this plugin's execution, so the
                         repackage goal below would not build an executable jar; confirm this is intended. -->
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
增加MQTT相关配置
application.yml
# MQTT settings; the keys must be nested under spring.mqtt so that
# @ConfigurationProperties(prefix = "spring.mqtt") on MqttConfig can bind them.
spring:
  mqtt:
    # MQTT broker address
    url: tcp://192.168.40.128:1883
    # Client ID used by the subscribing (inbound) adapter
    subClientId: sub_client_id_1
    # Topic(s) to subscribe to; the inbound configuration splits this value on ","
    subTopic: lq/iot/demo/
    # Client ID used by the publishing (outbound) handler
    pubClientId: pub_client_id_1
    # User name
    username: admin
    # Password
    password: admin123456
编写对应Java类
配置类
MqttConfig.java
package com.study.mqtt.demo.domain;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * MQTT settings bound from the {@code spring.mqtt} configuration prefix.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {

    /** User name for the broker connection. */
    private String username;

    /** Password for the broker connection. */
    private String password;

    /** Broker URL. */
    private String url;

    /** Client ID for the subscribing side. */
    private String subClientId;

    /** Topic(s) to subscribe to. */
    private String subTopic;

    /** Client ID for the publishing side. */
    private String pubClientId;
}
启动类增加开启配置
MqttDemoApplication.java
package com.study.mqtt.demo;
import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
/**
 * Application entry point; registers {@link MqttConfig} as a configuration-properties bean.
 */
@SpringBootApplication
@EnableConfigurationProperties(MqttConfig.class)
public class MqttDemoApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MqttDemoApplication.class, args);
    }
}
创建MQTT连接工厂类
MqttFactory.java
package com.study.mqtt.demo.factory;
import com.study.mqtt.demo.domain.MqttConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
/**
 * Declares the MQTT client factory bean built from {@link MqttConfig}.
 */
@Configuration
public class MqttFactory {

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Builds a Paho client factory whose connection options carry the configured
     * credentials and broker URL, with clean-session enabled.
     *
     * @return the configured client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfig.getUsername());

        // The password is optional in configuration; calling toCharArray() on a
        // missing value would throw a NullPointerException during startup.
        String password = mqttConfig.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        options.setServerURIs(new String[] {mqttConfig.getUrl()});
        options.setCleanSession(true);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }
}
接收消息处理类
ReceiveMsgHandler.java
package com.study.mqtt.demo.handler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 * Message handler that prints each received message, its topic header and its payload
 * to standard output.
 */
@Component
public class ReceiveMsgHandler implements MessageHandler {

    /**
     * Prints the whole message, then the value of the {@code mqtt_receivedTopic} header,
     * then the payload.
     *
     * @param message the incoming message
     * @throws MessagingException declared by the {@link MessageHandler} contract
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("接收到消息对象:" + message);

        Object topic = message.getHeaders().get("mqtt_receivedTopic");
        System.out.println("接收的消息主题:" + topic);
        System.out.println("接收的消息内容:" + message.getPayload());
    }
}
接收消息配置类
MqttInboundConfig.java
package com.study.mqtt.demo.inbound;
import com.study.mqtt.demo.domain.MqttConfig;
import com.study.mqtt.demo.factory.MqttFactory;
import com.study.mqtt.demo.handler.ReceiveMsgHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttInboundConfig {
@Autowired
private编程客栈 MqttConfig mqttConfig ;
@Autowired
private ReceiveMsgHandler receiveMsgHandler;
/**
* 配置消息接收通道
* @return
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 配置接收适配器
*/
@Bean
public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl() ,
mqttConfig.getSubClientId() ,
mqttPahoClientFactory , mqttConfig.getSubTopic().split(",")) ;
adapter.setConverter(new DefaultPahoMessageConverter());
// 质量服务等级
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter ;
}
/**
* 配置接收消息处理器
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") // 指定处理消息使用得通道
public MessageHandler messageHandler() {
return this.receiveMsgHandler ;
}
}
发送消息配置类
MqttOutboundConfig.java
package com.study.mqtt.demo.outbound;
import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Outbound (publish) side of the MQTT integration: the output channel and the
 * handler that consumes from it.
 */
@Configuration
public class MqttOutboundConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Autowired
    private MqttPahoClientFactory pahoClientFactory;

    /**
     * Channel that carries messages to be published.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    /**
     * Outbound handler attached to {@code mqttOutputChannel}; configured as asynchronous
     * with a default QoS of 0 and a default topic of {@code "default"}.
     *
     * @return the configured handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutboundMassageHandler() {
        String brokerUrl = mqttConfig.getUrl();
        String clientId = mqttConfig.getPubClientId();

        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(brokerUrl, clientId, pahoClientFactory);
        handler.setAsync(true);
        handler.setDefaultQos(0);
        handler.setDefaultTopic("default");
        return handler;
    }
}
发送消息网关接口类
MqttGateway.java
package com.study.mqtt.demo.gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Messaging gateway whose requests are sent to the {@code mqttOutputChannel} channel.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {

    /**
     * Sends an MQTT message.
     *
     * @param topic   topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param payload message content
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends an MQTT message with an explicit QoS level.
     *
     * @param topic   topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     delivery mechanism, passed as the {@link MqttHeaders#QOS} header:
     *                <ul>
     *                  <li>0 - done once sent; messages may be lost</li>
     *                  <li>1 - adds retry; a failed send is repeated, so duplicates may occur</li>
     *                  <li>2 - adds de-duplication so the subscriber gets the message only once</li>
     *                </ul>
     * @param payload message body
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            String payload);
}
发送消息服务类
MqttMsgSenderService.java
package com.study.mqtt.demo.service;
import com.study.mqtt.demo.gateway.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Service that publishes MQTT messages by delegating to {@link MqttGateway}.
 */
@Service
public class MqttMsgSenderService {

    @Autowired
    private MqttGateway mqttGateway;

    /**
     * Publishes a payload to a topic.
     *
     * @param topic   target topic
     * @param payload message content
     */
    public void send(String topic, String payload) {
        this.mqttGateway.sendToMqtt(topic, payload);
    }

    /**
     * Publishes a payload to a topic with an explicit QoS level.
     *
     * @param topic   target topic
     * @param qos     QoS level forwarded to the gateway
     * @param payload message content
     */
    public void send(String topic, int qos, String payload) {
        this.mqttGateway.sendToMqtt(topic, qos, payload);
    }
}
测试验证
订阅消息验证
启动项目

发送消息
- 主题为配置文件中配置的订阅主题:lq/iot/demo/
- 发送时间:2025-05-25 21:29:26:439

订阅收到消息
- 接收到消息的时间:Sun May 25 21:29:26 GMT+08:00 2025
- 接收到的主题:lq/iot/demo/
- 接收到的内容:{ "msg":"spring boot mqtt demo" }

发送消息验证
- 编写测试类
  - 发送主题:sb/mqtt/test
  - 发送内容:hello world ! => 当前时间
package com.study.mqtt.demo;
import com.study.mqtt.demo.service.MqttMsgSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
/**
 * Spring Boot tests for the demo: a context-load check and a publish to {@code sb/mqtt/test}.
 */
@SpringBootTest(classes = MqttDemoApplication.class)
class MqttDemoApplicationTests {

    @Autowired
    private MqttMsgSenderService mqttMsgSenderService;

    /** Passes when the application context starts. */
    @Test
    void contextLoads() {
    }

    /** Publishes a greeting followed by the current date to the test topic. */
    @Test
    void sendMsg() {
        String payload = "hello world ! => " + new Date();
        mqttMsgSenderService.send("sb/mqtt/test", payload);
    }
}
创建订阅者
订阅主题: sb/mqtt/test

运行测试类

订阅者接收消息
主题:sb/mqtt/test

到此这篇关于MQTT SpringBoot整合的文章就介绍到这了,更多相关MQTT SpringBoot整合内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章,希望大家以后多多支持编程客栈(www.devze.com)!
加载中,请稍候......
精彩评论