Producer work consistently hashing to consumers via a message queue?

Developer https://www.devze.com 2023-01-14 06:17 Source: Internet
I have a producer that I want to distribute work consistently across consumers by consistent hashing. For example, with consumer nodes X and Y, tasks A, B, C should always go to consumer X, and D, E, F to consumer Y. But that may shift a little if Z joins the pool of consumers.

I didn't want to deal with writing my own logic to connect to the consumer nodes, and especially not with managing nodes joining and leaving the pool, so I've gone down the path of using RabbitMQ, and an exclusive queue per consumer node.

One problem I'm running into is listing these queues, since the producer needs to know all the available queues before work is distributed. AMQP doesn't even support listing queues, which makes me uncertain of my whole approach. RabbitMQ and Alice (brokenly at the moment) add that functionality though: Is there an API for listing queues and exchanges on RabbitMQ?

Is this a wise use of Rabbit? Should I be using a message queue at all? Is there a better design so the queue can consistently divide my work among consumers, instead of me needing to do it?


What you describe is doable in RabbitMQ.

Your setup would be something like:

  • a producer publishes messages to a topic exchange; let's name it consistent_divider;
  • when a consumer joins the pool, it connects to the broker and creates an exclusive queue with its name, but doesn't bind it to anything;
  • the producer periodically polls the broker (maybe using rabbitmqctl list_consumers) to check whether the set of consumers has changed; if it has, the producer removes all of the existing bindings and rebinds the queues as needed;
  • when the producer publishes, each message is assigned a routing key that matches its task type.

So, if you have 6 task types (A, B, C, D, E, F) and only two consumers, C1 and C2, your bindings would look like this: C1 bound 3 times to consistent_divider with routing keys A, B and C; C2 bound 3 times to consistent_divider with routing keys D, E and F.

When C3 joins the pool, the producer sees this and rebinds the queues accordingly.

When the producer publishes, it sends out the messages with routing keys A, B, C, D, E and/or F, and the broker routes each message to whichever queue is currently bound to that key.
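As a rough sketch of the producer-side bookkeeping described above, the snippet below computes which routing key should be bound to which queue for a given set of consumers, and which bindings change when the set changes. It is only an illustration: the function names plan_bindings and diff_bindings are hypothetical, the even contiguous split is just one way to reproduce the A/B/C and D/E/F example, and it talks to no broker at all. Diffing the old and new plans is also a small variation on "remove all bindings and rebind"; it only touches the bindings that actually moved.

```python
from typing import Dict, List, Tuple

Binding = Tuple[str, str]  # (routing_key, queue_name)


def plan_bindings(task_types: List[str], consumers: List[str]) -> Dict[str, str]:
    """Map each routing key to one queue, in contiguous near-equal chunks.

    Hypothetical helper: it only decides the mapping, it does not call the broker.
    """
    if not consumers:
        return {}
    ordered = sorted(consumers)
    keys = sorted(task_types)
    # Key i goes to chunk floor(i * n_consumers / n_keys).
    return {key: ordered[i * len(ordered) // len(keys)] for i, key in enumerate(keys)}


def diff_bindings(old: Dict[str, str], new: Dict[str, str]) -> Tuple[List[Binding], List[Binding]]:
    """Return (to_unbind, to_bind) so that only changed bindings are touched."""
    old_pairs = set(old.items())
    new_pairs = set(new.items())
    return sorted(old_pairs - new_pairs), sorted(new_pairs - old_pairs)


tasks = ["A", "B", "C", "D", "E", "F"]
before = plan_bindings(tasks, ["C1", "C2"])
after = plan_bindings(tasks, ["C1", "C2", "C3"])
to_unbind, to_bind = diff_bindings(before, after)

print(before)     # {'A': 'C1', 'B': 'C1', 'C': 'C1', 'D': 'C2', 'E': 'C2', 'F': 'C2'}
print(to_unbind)  # [('C', 'C1'), ('E', 'C2'), ('F', 'C2')]
print(to_bind)    # [('C', 'C2'), ('E', 'C3'), ('F', 'C3')]
```

Note that this even split moves three of the six keys when C3 joins, so it is not consistent hashing in the strict sense; a hash ring would typically move fewer.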

There would be two potential problems with this:

  1. There's a slight lag between a consumer joining the pool and messages being routed to it; also, if there are messages already in the queues, a consumer can still receive messages that are now meant for another consumer (e.g. C3 joins, the producer rebinds, but C2 still gets some E and F messages because they were already in its queue).
  2. If a consumer dies for whatever reason, the messages in its queue (and en route to its queue) will be lost; this can be solved by republishing and dead-lettering the messages, respectively.

To answer your last question: you probably do want to use queuing, and RabbitMQ is a great choice, but your requirements (more precisely the "divide the work consistently" bit) don't fit AMQP perfectly.


You could use the official consistent-hashing exchange plugin for RabbitMQ (rabbitmq_consistent_hash_exchange), as answered here.
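To give a feel for what a consistent-hashing exchange does conceptually, here is a minimal hash-ring sketch in plain Python. It is not the plugin's code and does not talk to RabbitMQ; the class name HashRing, the points_per_node parameter and the use of MD5 are all assumptions chosen for illustration. Each consumer is placed on the ring at many points, and a task goes to the first point at or after the task's own hash, so adding a consumer only takes over the keys that now land on its points.

```python
import bisect
import hashlib
from typing import Dict, List, Tuple


def _point(label: str) -> int:
    """Hash a string to a position on the ring (first 8 bytes of MD5)."""
    return int.from_bytes(hashlib.md5(label.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Illustrative consistent-hash ring; names and defaults are hypothetical."""

    def __init__(self, points_per_node: int = 100) -> None:
        self.points_per_node = points_per_node
        self._ring: List[Tuple[int, str]] = []  # sorted (position, node) pairs

    def add(self, node: str) -> None:
        # Several points per node smooth out the share each node receives.
        for i in range(self.points_per_node):
            bisect.insort(self._ring, (_point(f"{node}#{i}"), node))

    def remove(self, node: str) -> None:
        self._ring = [p for p in self._ring if p[1] != node]

    def lookup(self, key: str) -> str:
        if not self._ring:
            raise LookupError("ring is empty")
        # First ring point at or after the key's position, wrapping around.
        idx = bisect.bisect_left(self._ring, (_point(key), ""))
        return self._ring[idx % len(self._ring)][1]


ring = HashRing()
ring.add("X")
ring.add("Y")
tasks = [f"task-{n}" for n in range(1000)]
before: Dict[str, str] = {t: ring.lookup(t) for t in tasks}

ring.add("Z")  # a new consumer joins the pool
after: Dict[str, str] = {t: ring.lookup(t) for t in tasks}
moved = [t for t in tasks if before[t] != after[t]]
print(f"{len(moved)} of {len(tasks)} tasks moved, all of them to Z:",
      all(after[t] == "Z" for t in moved))
```

Tasks that do not move keep going to the same consumer as before, which is the "shift a little if Z joins" behaviour the question asks for.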
