RabbitMQ is an open source message broker that implements a variety of messaging protocols. RabbitMQ was originally developed to implement AMQP (Advanced Message Queuing Protocol), a standard, cross-language protocol for messaging middleware.
As a message broker, RabbitMQ receives and sends messages. Think of it like putting mail into a postbox and knowing that the mail will get delivered. Instead of mail, RabbitMQ stores and sends messages.
Usually the producer, consumer, and broker are not on the same host.
The core idea in the messaging model in RabbitMQ is that a producer never sends any messages directly to a queue. Usually, the producer doesn’t even know if a message will be delivered to any queue at all. Instead, the producer sends messages to an exchange. When the exchange receives a message, it decides what to do with it: Should it be appended to a particular queue? Should it be appended to multiple queues? Should it get discarded? The rules for an exchange are defined by the exchange type.
producer: a program that sends messages.
exchange: the place that a producer sends messages to.
consumer: a program that waits for and receives messages.
queue: the name of the post box, basically a place to store your messages. It’s a large message buffer that is limited only by your machine’s memory and disk.
work queue (aka task queue): a queue used to avoid doing a resource-intensive task immediately, so the task can be done later instead.
ack (acknowledgement): a message sent back by the consumer to tell RabbitMQ that a message has been received and processed, and that RabbitMQ is free to delete it. If the consumer goes away without sending an ack, the message will be re-queued.
virtual host (aka vhost): a namespace for objects like exchanges, queues, and bindings.
The rules for how we send messages to consumers are determined by the exchange type. A few of the different exchanges include: direct, topic, headers, and fanout.
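To make the difference between exchange types a bit more concrete, here is a small in-memory sketch of direct and fanout routing. It does not talk to RabbitMQ at all; `ToyExchange`, `bind`, and `route` are hypothetical names made up for this illustration, and a real broker does a lot more (topic and headers matching, persistence, and so on).

```python
from collections import defaultdict


class ToyExchange:
    """In-memory stand-in for an exchange (illustration only, not the pika API)."""

    def __init__(self, exchange_type):
        if exchange_type not in ("direct", "fanout"):
            raise ValueError("this sketch only models direct and fanout")
        self.exchange_type = exchange_type
        self.bindings = []               # list of (queue_name, binding_key)
        self.queues = defaultdict(list)  # queue_name -> stored message bodies

    def bind(self, queue_name, binding_key=""):
        self.bindings.append((queue_name, binding_key))

    def route(self, routing_key, body):
        """Append body to every matching queue and return the queue names used."""
        delivered = []
        for queue_name, binding_key in self.bindings:
            # fanout ignores the routing key; direct needs an exact match
            if self.exchange_type == "fanout" or binding_key == routing_key:
                if queue_name not in delivered:
                    self.queues[queue_name].append(body)
                    delivered.append(queue_name)
        return delivered


direct = ToyExchange("direct")
direct.bind("errors", binding_key="error")
direct.bind("everything", binding_key="error")
direct.bind("everything", binding_key="info")

fanout = ToyExchange("fanout")
fanout.bind("audit")
fanout.bind("metrics")

print(direct.route("error", "disk full"))  # both queues are bound with "error"
print(direct.route("info", "started"))     # only "everything" is bound with "info"
print(direct.route("debug", "noise"))      # nothing matches, so nothing is stored
print(fanout.route("ignored", "hello"))    # fanout copies to every bound queue
```

The point of the sketch is that the producer only ever names an exchange and a routing key; which queues end up with a copy depends entirely on the exchange type and the bindings.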
With a topic exchange, the routing key is a list of words separated by dots, of the form word1.word2.word3. Queues binding to a topic exchange supply a matching pattern for the server to use when routing the message (e.g. * to match exactly one word in a specific position of the routing key, or # to match zero or more words).
The connection between an exchange and a queue is called the binding. The routing key is a message attribute. The exchange might look at this key when deciding how to route the messages to queues.
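The * and # rules are easier to see in code. Below is a minimal sketch of topic pattern matching written from the description above; `topic_matches` is a hypothetical helper for illustration, not how the broker implements it internally.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a dot-separated routing key."""
    return _match(pattern.split("."), routing_key.split("."))


def _match(pattern_words, key_words):
    if not pattern_words:
        # pattern used up: it matches only if the key is used up too
        return not key_words
    head, rest = pattern_words[0], pattern_words[1:]
    if head == "#":
        # '#' may swallow zero or more words, so try every possible split
        return any(_match(rest, key_words[i:]) for i in range(len(key_words) + 1))
    if not key_words:
        return False
    if head == "*" or head == key_words[0]:
        # '*' stands for exactly one word; a literal word must be equal
        return _match(rest, key_words[1:])
    return False


for pattern, key in [
    ("*.orange.*", "quick.orange.rabbit"),
    ("*.orange.*", "quick.orange.male.rabbit"),
    ("lazy.#", "lazy"),
    ("lazy.#", "lazy.brown.fox"),
]:
    print(pattern, key, topic_matches(pattern, key))
```

Note how `*.orange.*` fails on a four-word key because each * consumes exactly one word, while `lazy.#` accepts both the bare `lazy` and any longer key that starts with it.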
RabbitMQ is a general purpose message broker. So what kind of protocols are there? Some of these include:
Here we have the simplest program that can send and receive messages from a named queue.
import pika

# Connection settings for a local broker with the default guest account.
# Written for pika >= 1.0, where heartbeat_interval was renamed to heartbeat.
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(host="localhost",
                                       credentials=credentials,
                                       heartbeat=600)


def callback(ch, method, properties, body):
    """ Callback from Consumer """
    print(" [x] Received %r" % body)


def test_simple_producer_consumer():
    print("Creating Connection, testing simple producer and consumer")
    # Create Connection
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue='hello-queue')
    # Publish to Queue
    print("About to publish to queue")
    channel.basic_publish(exchange='', routing_key='hello-queue', body='test')  # publish to default exchange
    print("After Channel Publish")
    # Consume from Queue and run callback
    # auto_ack=True (no_ack=True before pika 1.0): messages count as acknowledged on delivery
    channel.basic_consume(queue='hello-queue', on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    connection.close()


if __name__ == '__main__':
    test_simple_producer_consumer()
Here we have a work queue that is used to distribute time-consuming tasks among multiple workers.
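A minimal sketch of such a work queue, assuming pika 1.0 or later and a broker on localhost. The queue name `task-queue`, the function names, and the convention that each dot in the message body stands for one second of work are all assumptions chosen for illustration.

```python
import time

QUEUE = "task-queue"  # hypothetical queue name for this sketch


def task_seconds(body):
    """Pretend cost of a task: one second per '.' in the (bytes) message body."""
    return body.count(b".")


def publish_task(message):
    import pika  # imported lazily so task_seconds() works without pika installed

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE, durable=True)  # queue survives a broker restart
    channel.basic_publish(
        exchange="",
        routing_key=QUEUE,
        body=message,
        properties=pika.BasicProperties(delivery_mode=2),  # mark the message persistent
    )
    connection.close()


def on_task(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(task_seconds(body))  # simulate the time-consuming work
    # manual ack, sent only after the work is done, so an unfinished task is re-queued
    ch.basic_ack(delivery_tag=method.delivery_tag)


def run_worker():
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE, durable=True)
    channel.basic_qos(prefetch_count=1)  # at most one unacknowledged task per worker
    channel.basic_consume(queue=QUEUE, on_message_callback=on_task)
    channel.start_consuming()
```

To try it, call `run_worker()` in two or more terminals and `publish_task("resize.image...")` from another. With `prefetch_count=1`, a busy worker is not handed a second task until it has acknowledged the first.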
RabbitMQ can be used either synchronously or asynchronously.
rabbitmqctl is a command line tool for managing a RabbitMQ broker. It performs all actions by connecting to one of the broker’s nodes.
Commands include:
rabbitmqctl -l to list commands
rabbitmqctl status to show memory
rabbitmqctl stop_app and rabbitmqctl start_app
rabbitmqctl reset and rabbitmqctl force_reset
rabbitmqctl rotate_logs
rabbitmqctl join_cluster to join a cluster as either a disc (default) or ram only
rabbitmqctl purge_queue <queue> to remove all messages in it
rabbitmqctl list_queues and rabbitmqctl list_exchanges
rabbitmqadmin is a command line tool that ships with the management plugin, allowing you to do some of the same actions as the web-based UI, which makes it handy for automation tasks. If you want to invoke rabbitmqadmin from your own program, then consider using an HTTP API client library instead.
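As a rough idea of what talking to the HTTP API directly looks like, here is a sketch that uses only the Python standard library. It assumes the management plugin is enabled and listening on its default port 15672 with the default guest account; `build_request` and `list_queue_names` are hypothetical helper names.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_request(path, host="localhost", port=15672, user="guest", password="guest"):
    """Build an authenticated GET request for the management HTTP API."""
    url = "http://%s:%d/api/%s" % (host, port, path.lstrip("/"))
    token = base64.b64encode(("%s:%s" % (user, password)).encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url)
    request.add_header("Authorization", "Basic " + token)  # HTTP basic auth
    return request


def list_queue_names(vhost="/"):
    """Return the names of the queues in a vhost (needs a running broker)."""
    # the vhost is part of the URL path, so "/" must be percent-encoded as %2F
    path = "queues/" + urllib.parse.quote(vhost, safe="")
    with urllib.request.urlopen(build_request(path)) as response:
        return [queue["name"] for queue in json.load(response)]
```

Calling `list_queue_names()` against a local broker should return roughly what rabbitmqctl list_queues shows for the default vhost, but as a Python list you can work with directly.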
rabbitmq-plugins is how you can enable plugins in RabbitMQ.
rabbitmq-plugins list # see what plugins are enabled
rabbitmq-plugins enable <plugin-name> # enable a plugin
rabbitmq-plugins disable <plugin-name> # disable a plugin
Plugins include:
RabbitMQ has a firehose feature that the administrator can enable on a per-node, per-vhost basis, basically making it so that an exchange’s publish and delivery-notifications can be CC’d.
To enable firehose:
1.) Decide which node (default is ‘rabbit@(hostname)’) and vhost (default is “/”) you want to enable firehose for.
2.) In your vhost, when creating your queues, bind them to the topic exchange amq.rabbitmq.trace and start consuming.
3.) Run rabbitmqctl trace_on
To disable firehose:
1.) Run rabbitmqctl trace_off
2.) Clean up the queues used by the firehose.
Firehose publishes messages to the topic exchange amq.rabbitmq.trace with the routing key publish.exchangename for messages entering the broker or to the routing key deliver.queuename for messages leaving the broker.
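A sketch of a firehose consumer along these lines, assuming pika 1.0 or later, a broker on localhost, and that rabbitmqctl trace_on has already been run. The helper names `describe_trace`, `on_trace`, and `consume_trace` are made up for this example.

```python
def describe_trace(routing_key):
    """Split a firehose routing key into (direction, exchange or queue name)."""
    kind, _, name = routing_key.partition(".")
    if kind == "publish":
        return ("entered broker via exchange", name)
    if kind == "deliver":
        return ("left broker via queue", name)
    raise ValueError("not a firehose routing key: %r" % routing_key)


def on_trace(ch, method, properties, body):
    direction, name = describe_trace(method.routing_key)
    print("%s %s: %r" % (direction, name, body))


def consume_trace():
    import pika  # lazy import: describe_trace() is usable without pika installed

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    # exclusive, server-named queue that is removed when this consumer disconnects
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    # '#' matches every routing key, so both publish.* and deliver.* arrive here
    channel.queue_bind(exchange="amq.rabbitmq.trace", queue=queue_name, routing_key="#")
    channel.basic_consume(queue=queue_name, on_message_callback=on_trace, auto_ack=True)
    channel.start_consuming()
```

Using an exclusive queue here takes care of the cleanup step for this one consumer; queues you declared by hand for tracing still need to be deleted yourself.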
So with a task queue, we can distribute work across threads or machines. You define a task (a unit of work) and add it to the task queue. Worker processes then keep checking the queue for new work to perform.
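The same idea can be sketched on a single machine with nothing but the standard library: a shared queue of tasks and a few worker threads pulling from it. This is only an illustration of the pattern, not how Celery or RabbitMQ work internally; `run_tasks` is a hypothetical name, and the tasks are assumed not to raise exceptions.

```python
import queue
import threading


def run_tasks(tasks, worker_count=3):
    """Run callables from a shared task queue on worker threads; return their results."""
    task_queue = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            task = task_queue.get()  # blocks until there is work to perform
            if task is None:         # sentinel: no more work for this worker
                return
            outcome = task()
            with results_lock:
                results.append(outcome)

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for task in tasks:               # add each unit of work to the task queue
        task_queue.put(task)
    for _ in threads:                # one sentinel per worker so they all stop
        task_queue.put(None)
    for thread in threads:
        thread.join()
    return results


squares = run_tasks([lambda n=n: n * n for n in range(5)])
print(sorted(squares))  # completion order varies between runs, so sort before printing
```

A broker-backed task queue replaces the in-process `queue.Queue` with a queue that lives on the broker, which is what lets the workers run on other machines.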
Celery communicates via messages. We add a message to the queue, then the broker delivers that message to a worker. Celery uses a message transport (e.g. RabbitMQ or Redis) to send and receive messages.