目录
- 前言
- 一、环境准备
- 二、示例—Springboot集成mq(手动连接)
- 2.1 新建项目
- 2.2 引入依赖
- 2.3 生产者发送消息
- 2.4 消费者消费消息
- 三、示例2—Springboot集成mq(配置连接)
- 3.1 配置文件修改
- 3.2 添加依赖
- 3.3 消费者service类
- 3.4 生产者service类
- 3.5 测试controller类
- 3.6 启动服务
- 3.7 测试 同步消息1
- 四、结束语
前言
在分布式系统中,消息中间件是解耦服务、实现异步通信的核心组件。RocketMQ 作为阿里巴巴开源的高性能分布式消息中间件,凭借其高吞吐、低延迟、高可靠等特性,成为企业级应用的首选。而 Spring Boot 通过其“约定优于配置”的设计理念,极大简化了项目开发的复杂度。本文将通过 手动连接 和 配置连接 两种方式,详细讲解如何在 Spring Boot 中集成 RocketMQ,实现消息的同步与异步发送,并提供完整示例代码。
一、环境准备
在开始前,请确保:
- JDK 17、Maven 3.6+、Spring Boot 2.7+。
- 安装RocketMQ服务(本地或远程),推荐使用RocketMQ docker镜像快速搭建(可参考之前文章)。
二、示例—Springboot集成mq(手动连接)
通过编码方式初始化生产者,适用于需要动态控制资源的场景。
2.1 新建项目
‍
2.2 引入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.4</version> </dependency>
2.3 生产者发送消息
- 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为jihaiProducer;
- 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
- producer.start() 启动生产者
- 构建一个内容为:技海拾贝的消息www.devze.com1,然后指定这个消息往jihaishibei这个topic发送
- producer.sen编程客栈d(msg):发送消息,打印结果
- 关闭生产者
public class Producer { public static void main(String[] args) throws Exception { //创建一个生产者,指定生产者组为jihaiProducer DefaultMQProducer producer = new DefaultMQProducer("jihaiProducer"); // 指定NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 第一次发送可能会超时,设置的比较大 producer.setSendMsgTimeout(60000); // 启动生产者 producer.start(); // 创建一条消息 // topic为 jihaishibei // 消息内容为 技海拾贝的消息1 // tags 为 TagA Message msg = new Message("jihaishibei", "TagA", "技海拾贝的消息1 ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息并得到消息的发送结果,然后打印 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); // 关闭生产者 producer.shutdown(); } }
启动,发送消息
在控制台可以看到这条消息
这里就能看到发送消息的详细信息。
左下角消息的消费的消费,因为我们还没有消费者订阅这个topic,所以左下角没数据。
2.4 消费者消费消息
- 创建一个消费者实例对象,指定消费者组为jihaiConsumer
- 指定NameServer的地址:服务器的ip:9876
- 订阅 jihaishibei这个topic的所有信息
- consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。
- 启动消费者
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 通过www.devze.compush模式消费消息,指定消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jihaiConsumer"); // 指定NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅这个topic下的所有的消息 consumer.subscribe("jihaishibei", "*"); // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); System.out.printf("Consumer Started.%n"); } }
启动服务,进行消费
在控制台,发现被jihaiConsumer这个消费者组给消费了。
三、示例2—Springboot集成mq(配置连接)
在 Spring Boot 中,可以通过配置文件简化 RocketMQ 的连接配置。以下是在 application.yml
文件中进行的配置:
3.1 配置文件修改
spring: application: name: rocket-mq-demo rocketmq: name-server: 127.0.0.1:9876 producer: group: rocket-mq-demo-producer send-message-timeout: 10000 comsumer: group: rocket-mq-demo-comsumer send-message-timeout: 10000
3.2 添加依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.0</version> </dependency>
根据需要选择最新版本,从中央仓库可以查看
https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter
备注:如果添加rocketmq-client依赖,先注释这个依赖
3.3 消费者service类
import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * messageModel=MessageModel.CLUSTERING * 监听模式,有消息就会消费 */ @Service @RocketMQMessageListener(topic = "jihaishibei-topic", consumerGroup = "rocket-mq-demo-comsumer", messageModel = MessageModel.CLUSTERING) public class RocketMQConsumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.printf("收到消息: %s\n", s); } }
3.4 生产者service类
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; @Service public class RocketMQProducer { @Autowired private RocketMQTemplate rocketMQTemplate; private final String topic = "jihaishibei-topic"; // 1.同步发送消息 // 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。 public void createAndSend(String message){ rocketMQTemplate.convertAndSend(topic, message); System.out.printf("同步发送结果: %s\n", message); } // 1.同步发送消息 // 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。 public void sendSyncMessage(String message){ SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build()); System.out.println(sendResult.getMsgId()); System.out.printf("同步发送结果: %s\n", message); } // 2.异步发送消息 // 异步发送是指发送方发送消息后,不等待服务器返回确认信息,而是通过回调接口处理返回结果。这种方式适用于对响应时间要求较高的场景。 public void sendAsyncMessage(String message){ rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("异步发送成功: %s\n", sendResult); } @Override public void onException(Throwable throwable) { System.out.printf("异步发送失败: %s\n", throjavascriptwable.getMessage()); } }); } // 3.单向发送消息 // 单向发送是指发送方只负责发送消息,不关心服务器的响应。该方式适用于对可靠性要求不高的场景,如日志收集。 public void sendOneWayMessage(String message){ rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build()); System.out.println("单向消息发送成功"); } }
3.5 测试controller类
@RequestMapping("api") @RestController public class RocketController { @Autowired private RocketMQProducer rocketMQProducer; @GetMapping("/createAndSend") public String createAndSend(@RequestParam String message) { rocketMQProducer.createAndSend(message); return "同步消息发送成功"; } @GetMapping("/sendSync") public String sendSync(@RequestParam String message) { rocketMQProducer.sendSyncMessage(message); return "同步消息发送成功"; } @GetMapping("/sendAsync") public String sendAsync(@RequestParam String message) { rocketMQProducer.sendAsyncMessage(message); return "异步消息发送中"; } @GetMapping("/sendOneWay") public String sendOneWay(@RequestParam String message) { rocketMQProducer.sendOneWayMessage(message); return "单向消息发送成功"; } }
3.6 启动服务
3.7 测试 同步消息1
同步消息2
异步消息
单向发送消息
四、结束语
本文通过手动连接与配置连接两种方式,展示了Spring Boot与RocketMQ的集成实践。手动连接帮助开发者理解底层API逻辑,而Spring Boot的配置化集成则极大简化了开发流程。无论是同步消息的可靠性保障,还是异步消息的性能优化,RocketMQ均能与Spring Boot无缝协作,为分布式系统提供高效的消息通信能力。
未来可进一步探索集群部署、消息重试机制及监控告警,以实现更健壮的消息服务。希望本文能为开发者快速构建高可用的消息系统提供参考!
到此这篇关于Spring Boot 集成 RocketMQ 全流程指南(从依赖引入到消息收发)的文章就介绍到这了,更多相关Spring Boot 集成 RocketMQ内容请搜索编程客栈(www.devze.comhttp://www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论