RabbitMQ/AMQP: ¿cola única, múltiples consumidores para el mismo mensaje?
Recién estoy empezando a usar RabbitMQ y AMQP en general.
- tengo una cola de mensajes
- Tengo varios consumidores a los que me gustaría hacer diferentes cosas con el mismo mensaje .
La mayor parte de la documentación de RabbitMQ parece centrarse en el round robin, es decir, donde un único consumidor consume un único mensaje y la carga se distribuye entre cada consumidor. De hecho, este es el comportamiento del que soy testigo.
Un ejemplo: el productor tiene una única cola y envía mensajes cada 2 segundos:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
Y aquí hay un consumidor:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
Si inicio al consumidor dos veces, puedo ver que cada consumidor consume mensajes alternativos en un comportamiento de operación por turnos. Por ejemplo, veré los mensajes 1, 3, 5 en un terminal, 2, 4, 6 en el otro .
Mi pregunta es:
¿Puedo hacer que cada consumidor reciba los mismos mensajes? Es decir, ¿ambos consumidores reciben el mensaje 1, 2, 3, 4, 5, 6? ¿Cómo se llama esto en el lenguaje AMQP/RabbitMQ? ¿Cómo se configura normalmente?
¿Se hace esto comúnmente? ¿Debería simplemente hacer que el intercambio enrute el mensaje en dos colas separadas, con un solo consumidor?
¿Puedo hacer que cada consumidor reciba los mismos mensajes? Es decir, ¿ambos consumidores reciben el mensaje 1, 2, 3, 4, 5, 6? ¿Cómo se llama esto en el lenguaje AMQP/RabbitMQ? ¿Cómo se configura normalmente?
No, no si los consumidores están en la misma cola. De la guía de conceptos AMQP de RabbitMQ :
Es importante comprender que, en AMQP 0-9-1, la carga de los mensajes se equilibra entre los consumidores.
Esto parece implicar que el comportamiento por turnos dentro de una cola es un hecho y no configurable. Es decir, se requieren colas separadas para que varios consumidores manejen el mismo ID de mensaje.
¿Se hace esto comúnmente? ¿Debería simplemente hacer que el intercambio enrute el mensaje en dos colas separadas, con un solo consumidor?
No, no lo es, no es posible una sola cola/varios consumidores en los que cada consumidor maneje el mismo ID de mensaje. De hecho, es mejor que el intercambio enrute el mensaje en dos colas separadas.
Como no necesito un enrutamiento demasiado complejo, un intercambio de distribución lo manejará muy bien. No me centré demasiado en los intercambios anteriormente, ya que node-amqp tiene el concepto de un 'intercambio predeterminado' que le permite publicar mensajes en una conexión directamente; sin embargo, la mayoría de los mensajes AMQP se publican en un intercambio específico.
Aquí está mi intercambio fanout, tanto de envío como de recepción:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {
var sendMessage = function(exchange, payload) {
console.log('about to publish')
var encoded_payload = JSON.stringify(payload);
exchange.publish('', encoded_payload, {})
}
// Recieve messages
connection.queue("my_queue_name", function(queue){
console.log('Created queue')
queue.bind(exchange, '');
queue.subscribe(function (message) {
console.log('subscribed to queue')
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(exchange, test_message)
count += 1;
}, 2000)
})
})
Las últimas dos respuestas son casi correctas: tengo toneladas de aplicaciones que generan mensajes que deben llegar a diferentes consumidores, por lo que el proceso es muy simple.
Si desea que varios consumidores reciban el mismo mensaje, realice el siguiente procedimiento.
Cree varias colas, una para cada aplicación que recibirá el mensaje; en las propiedades de cada cola, "vincule" una etiqueta de enrutamiento con el intercambio amq.direct. Cambie su aplicación de publicación para enviar a amq.direct y use la etiqueta de enrutamiento (no una cola). AMQP luego copiará el mensaje en cada cola con el mismo enlace. Funciona de maravilla :)
Ejemplo: digamos que tengo una cadena JSON que genero, la publico en el intercambio "amq.direct" usando la etiqueta de enrutamiento "new-sales-order", tengo una cola para mi aplicación order_printer que imprime el pedido, tengo una cola para mi sistema de facturación que enviará una copia del pedido y facturará al cliente y tengo un sistema de archivo web donde archivo pedidos por razones históricas/de cumplimiento y tengo una interfaz web para el cliente donde se rastrean los pedidos a medida que llega otra información sobre una orden.
Entonces, mis colas son: order_printer, order_billing, order_archive y order_tracking. Todas tienen la etiqueta vinculante "new-sales-order" vinculada, las 4 obtendrán los datos JSON.
Esta es una forma ideal de enviar datos sin que la aplicación de publicación sepa o se preocupe por las aplicaciones receptoras.