开发者

How do I make this AMQP single-message subscriber stable?

开发者 https://www.devze.com 2023-04-10 14:53 出处:网络
As a part of a larger application, I\'ve got to setup some basic rate-limiting of outgoing requests across multiple workers. The idea behind this is rather simple: by publishing a \"token\"-message wi

As a part of a larger application, I've got to setup some basic rate-limiting of outgoing requests across multiple workers. The idea behind this is rather simple: by publishing a "token"-message with the "immediate" flag, this message is automatically discarded if nobody is waiting for it. By having workers only subscribing to the token-queue just before sending an outgoing request, tokens are not "saved up", and each token is available for use only once. I thought this rather elegant.

Unfortunately, adding and removing subscribers is not entirely stable. I've setup a full example over at https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733. The code is below:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME   = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer
  exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "") 

  n = 0
  EM::PeriodicTimer.new(PRODUCE_RATE) do
    message = "msg #{n}"
    exchange.publish(message,
                     :immediate   => true, # IMPORTANT, messages are dropped if nobody listening now
                     :routing_key => QUEUE_NAME)
    puts "> PUT #{message}"
    n += 1
  end
end

def start_consumer

  EM::PeriodicTimer.new(CONSUME_RATE) do

    started = Time.now
    AMQP::Channel.new do |channel_consumer|
      channel_consumer.prefetch(1)
      tick_queue = channel_consumer.queue(QUEUE_NAME)

      consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
      consumer.on_delivery do |_, messag开发者_如何学编程e|

        took = Time.now - started
        puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

        consumer.cancel
        channel_consumer.close
      end
      consumer.consume
    end
  end
end

EM.run do
  EM.set_quantum(50)

  start_producer
  start_consumer
end

Running that example for a few minutes ends up dying with one of two errors:

  1. amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)

  2. amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)

The first error is due to the subscriber having been removed, but a message is still delivered to it, and the amq-client library never expects this to happen. The second error is from the publisher, which all of a sudden has a closed connection.

What am I missing to make this consistently work as expected?

Versions used:

  • OS X 10.7.1
  • ruby 1.9.2p312 (2011-08-11 revision 32926) [x86_64-darwin11.1.0]
  • RabbitMQ 2.6.1

Gemfile:

source 'http://rubygems.org'

gem 'amqp'

Gemfile.lock:

GEM
  remote: http://rubygems.org/
  specs:
    amq-client (0.8.3)
      amq-protocol (>= 0.8.0)
      eventmachine
    amq-protocol (0.8.1)
    amqp (0.8.0)
      amq-client (~> 0.8.3)
      amq-protocol (~> 0.8.0)
      eventmachine
    eventmachine (0.12.10)

PLATFORMS
  ruby

DEPENDENCIES
  amqp
  eventmachine


From the #rabbitmq channel (amqp author antares_): just use a single channel, and it'll work fine. Slightly changed, but stable version:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME   = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer channel
  exchange = AMQP::Exchange.new(channel, :direct, "") 

  n = 0
  EM::PeriodicTimer.new(PRODUCE_RATE) do
    message = "msg #{n}"
    exchange.publish(message,
                     :immediate   => true, # IMPORTANT, messages are dropped if nobody listening now
                     :routing_key => QUEUE_NAME)
    puts "> PUT #{message}"
    n += 1
  end
end

def start_consumer channel
  EM::PeriodicTimer.new(CONSUME_RATE) do

    started = Time.now
    tick_queue = channel.queue(QUEUE_NAME)

    consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true)
    consumer.on_delivery do |_, message|

      took = Time.now - started
      puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

      consumer.cancel do
        puts "< GET #{message} (CANCEL DONE)"
      end
    end
    consumer.consume
  end
end

EM.run do
  EM.set_quantum(50)

  AMQP::Channel.new do |channel|
    start_producer channel
  end

  AMQP::Channel.new do |channel|
    channel.prefetch(1)
    start_consumer channel
  end

end
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号