开发者

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

开发者 https://www.devze.com 2025-08-21 10:37 出处:网络 作者: 代码怪兽大作战
目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队列插件 插件名称:1️⃣ 下载插件2️⃣ 复制插件到插件目录3️⃣ 启用插件4️⃣ 重启 RabbitMQ 容器(建议)5️⃣ 验证插件
目录
  • 一、什么是 RabbitMQ 延时队列?
  • 二、安装前准备
    • ✅ RabbitMQ 环境要求
  • 三、安装延时队列插件
    • 插件名称:
    • 1️⃣ 下载插件
    • 2️⃣ 复制插件到插件目录
    • 3️⃣ 启用插件
    • 4️⃣ 重启 RabbitMQ 容器(建议)
    • 5️⃣ 验证插件是否启用成功
    • 6️⃣ 配置容器重启时自动加载插件
  • 四、使用延时队列功能(Java 示例)
    • 创建延时交换机与队列
    • 发送延时消息
  • 五、注意事项
    • 六、Web 管理界面配置示例
      • 七、延时队列 VS RabbitMQ 原生 TTL 实现
        • ✅ 八、常见问题javascript FAQ
          • 1. 插件安装后没有效果?
          • 2. 插件是否与集群兼容?
        • 总结

          RabbitMQ 默认并不支持“真正意义上的延迟队列”。实现延时消息最常用的方式就是安装 rabbitmq_delayed_message_exchange 插件。

          本文将从插件安装、启用、配置、使用全流程带你掌握 RabbitMQ 延时队列的正确姿势。

          一、什么是 RabbitMQ 延时队列?

          延时队列的核心功能就是:让消息延迟一定时间后再投递到消费者

          典型应用场景包括

          • 订单支付超时取消
          • 秒杀未支付自动释放库存
          • 用户注册后延时发送欢迎邮件
          • 自动关闭未操作的工单等

          二、安装前准备

          ✅ RabbitMQ 环境要求

          • RabbitMQ ≥ 3.6.x
          • Erlang ≥ 19.x(越新越好)
          • 插件版本需与 RabbitMQ 版本匹配(注意版本兼容

          docker 搭建 RabbitMQ

          三、安装延时队列插件

          插件名称:

          rabbitmq_delayed_message_exchange

          1️⃣ 下载插件

          你可以从 github 或 RabbitMQ 官网下载预编译的 .ez 文件。

          插件发布地址:

          • https://www.rabbitmq.com/community-plugins.html

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          • GitHub 地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

          进入web端后左上角有显示当前安装的RabbitMQ版本

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          需要保证RabbitMQ插件的大版本与RabbitMQ保持一致,否则会启动失败

          例如(RabbitMQ 4.0.*):

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v4.0.7/rabbitmq_delayed_message_exchange-v4.0.7.ez
          

          2️⃣ 复制插件到插件目录

          将插件拷贝到容器内plugins目录下

          docker cp /root/rabbitmq_delayed_message_exchange-v4.0.7.ez rabbitmq:/plugins

          .ez 文件复制进去:

          // 进入容器 我这里使用容器名字 也可以用容器id进入
          docker exec -it rabbitmq /bin/bash
          // 移动到plugins目录下
          cd plugins
          // 查看是否上传成功
          ls

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          3️⃣ 启用插件

          rabbitmq-plugins enable rabbitmq_delayed_message_exchange

          输出类似:

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          检查插件是否存在于镜像中

          容器内执行:

          rabbitmq-plugins list

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          这表明:

          ✅ 插件 rabbitmq_delayed_message_exchange 已经启用并正在运行。

          4️⃣ 重启 RabbitMQ 容器(建议)

          docker restart rabbitmq

          5️⃣ 验证插件是否启用成功

          容器启动成功之后,登录RabbitMQ的管理界面(ip:15672 访问web界面),找到ExchangesTab页。点击Add a new exchange,在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          6️⃣ 配置容器重启时自动加载插件

          为了确保 RabbitMQ 容器每次重启时自动启用指定插件,需要在容器内 /etc/rabbitmq/enabled_plugins 文件中配置插件列表。

          进入正在运行的容器:

          docker exec -it <容器名称或ID> /bin/bash

          编辑或创建插件配置文件 /etc/rabbitmq/enabled_plugins,写入如下内容:

          [rabbitmq_management, rabbitmq_delayed_message_exchange].

          RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

          保存文件并退出容器。

          重新启动容器,确认插件自动启用:

          docker restart <容器名称或ID>

          通过 RabbitMQ 管理界面或命令行确认插件状态:

          docker exec <容器名称或ID> rabbitmq-plugins list

          备注:

          • 确保挂载的配置目录(包含 enabled_plugins 文件)权限正确,容器内 RabbitMQ 进程能正常读取。
          • enabled_plugins 文件必须以 Erlang 列表格式并以句点 . 结尾。
          • 也可通过环境变量或启动命令方式启用插件,但写入 enabled_plugins 文件更持久、可靠。

          四、使用延时队列功能(Java 示例)

          延时消息的核心是:通过一个特殊的 Exchange 类型 x-delayed-message + 自定义 header 设置延迟时间。

          创建延时交换机与队列

          /**
               * 声明延迟队列(Queue 本身无 TTL,延迟由消息属性 x-delay 控制)
               * 注意:需要安装 RabbitMQ 的延迟消息插件(rabbitmq_delayed_message_exchange)
               */
              @Bean
              public Queue delayQueue() {
                  return QueueBuilder.durable(getRetry().getQueue()).build();
              }
              /**
               * 声明延迟交换机:必须使用插件类型 x-delayed-message,并声明延迟基础类型为 direct
               */
              @Bean
              public CustomExchange delayExchange() {
                  Map<String, Object> args = new HashMap<>();
                  args.put("x-delayed-type", "direct");
                  return new CustomExchange(
                          getRetry().getExchange(),      // 名称
                          "x-delayed-message",           // 类型(延迟消息插件)
                          true,                          // durable
                          false,                         // auto-delete
                          args                           // 参数:声明 delayed 类型为 direct
                  );
              }
              /**
               * 绑定延迟队列到延迟交换机
               */
              @Bean
              public Binding delayBinding() {
                  return BindingBuilder.bind(delayQueue())
                          .to(delayExchange())
                          .with(getRetry().getRoutingKey())
                          .noargs();
              }

          发送延时消息

          /**
               * 默认延迟 30 分钟(1800000 毫秒)
               */
              public void sendDelayedTask(String message) {
                  this.sendDelayedTask(message, 30 * 60 * 1000);
              }
              /**
               * 发送延迟消息
               */
              public void sendDelayedTask(String message, long delayMilli编程s) {
                  Mess编程客栈ageProperties properties = new MessageProperties();
                  properties.setContentType(MessageProperties.CONTENT_TYPE_jsON);
                  properties.setHeader("x-delay", delayMillis); // 设置延迟时间(单位:毫秒)
                  Message amqpMessage = new Message(message.getBytes(StandardCharsets.UTF_8), properties);
                  rabbitTemplate.send(
                          rabbitMqProperties.getConfig().getRetry().getExchange(),
                          rabbitMqProperties.getConfig().getRetry().getRoutingKey(),
                          amqpMessage
                  );
                  log.info("延迟任务已发送,延迟 {} s 后投递: {}", delayMillis/1000, message);
              }

          五、注意事项

          • 插件启用后,RabbitMQ 会新增一种交换机类型 x-delayevwIvHnhd-message
          • 不兼容 RabbitMQ 自带的 TTL + DLX 延迟实现方案
          • 插件仅支持设置 发送时延迟,不支持基于队列统一延迟
          • 插件不支持所有 AMQP 客户端(需显式支持 header 设置的客户端)

          六、Web 管理界面配置示例

          你也可以通过 RabbitMQ 的 Web 控制台:

          1. 创建交换机类型为 x-delayed-message
          2. 设置参数 x-delayed-type = direct
          3. 绑定队列
          4. 消息发布时设置 header:x-delay: 10000(单位 ms)

          七、延时队列 VS RabbitMQ 原生 TTL 实现

          方案支持粒度精准控制依赖 DLX灵活性复杂度
          原生 TTL + 死信队列队列级 / 消息级一般中等中等
          延时插件 rabbitmq_delayed_message_exchange消息级

          ✅ 八、常见问题 FAQ

          1. 插件安装后没有效果?

          • 检查是否重启 RabbitMQ
          • 检查交换机类型是否正确设置为 x-delayed-message
          • 检查消息是否设置了 header:x-delay

          2. 插件是否与集群兼容?

          支持,只需确保每个节点都安装启用了插件

          总结

          通过安装 rabbitmq_delayed_message_exchange 插件,RabbitMQ 拥有了真正意义上的“消息级”延迟投递功能,无需依赖死信队列和 TTL,极大简化了延时场景的开发与维护工作。

          到此这篇关于Rabbit编程客栈MQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)的文章就介绍到这了,更多相关RabbitMQ 延时队列插件内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

          0

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          关注公众号