How to use RabbitMQ

Chi Thuc Nguyen
5 min read · May 4, 2020

Concepts

Exchanges

Messages are not published directly to a queue; instead, the producer sends messages to an exchange. An exchange is responsible for routing the messages to different queues with the help of bindings and routing keys.

A binding is a link between a queue and an exchange.

The routing key is a message attribute the exchange looks at when deciding how to route the message to queues (depending on exchange type).

Exchanges and queues can be configured at creation time with properties such as durable, temporary, and auto-delete. Durable exchanges survive server restarts and last until they are explicitly deleted. Temporary exchanges exist until RabbitMQ is shut down. Auto-deleted exchanges are removed once the last bound object is unbound from the exchange.

In RabbitMQ, there are four types of exchanges, each routing messages differently according to its parameters and binding setup. Clients can create their own exchanges or use the predefined default exchanges, which are created when the server starts for the first time.

Types of Exchanges

  • Direct: The message is routed to the queues whose binding key exactly matches the routing key of the message. For example, if the queue is bound to the exchange with the binding key pdfprocess, a message published to the exchange with a routing key pdfprocess is routed to that queue.
  • Fanout: A fanout exchange routes messages to all of the queues bound to it.
  • Topic: The topic exchange does a wildcard match between the routing key and the routing pattern specified in the binding.
  • Headers: Headers exchanges use the message header attributes for routing.
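
The routing rules above can be sketched in plain Python, without a broker. This is only a model of the matching logic, not RabbitMQ's implementation: the function names, queue names, and binding keys are hypothetical, and headers exchanges are left out. For topic patterns it assumes the usual AMQP wildcards, where * stands for exactly one dot-separated word and # for zero or more words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either consumes nothing, or consumes one word and stays active
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key):
    """Return the sorted queue names that would receive the message.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        matched = [q for q, _ in bindings]  # keys are ignored
    elif exchange_type == "direct":
        matched = [q for q, key in bindings if key == routing_key]
    elif exchange_type == "topic":
        matched = [q for q, key in bindings if topic_matches(key, routing_key)]
    else:
        raise ValueError(f"unsupported exchange type: {exchange_type}")
    return sorted(set(matched))


# Hypothetical bindings, reusing the pdfprocess key from the Direct example
bindings = [("pdf_queue", "pdfprocess"), ("img_queue", "imgprocess")]
print(route("direct", bindings, "pdfprocess"))  # ['pdf_queue']
print(route("fanout", bindings, "anything"))    # ['img_queue', 'pdf_queue']
```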

Publish and subscribe messages

RabbitMQ uses a protocol called AMQP by default. To be able to communicate with RabbitMQ you need a library that understands the same protocol as RabbitMQ. Download the client library for the programming language that you intend to use for your applications.

Steps to follow when setting up a connection and publishing a message/consuming a message:

  1. Set up/create a connection object. The username, password, connection URL, port, etc., will need to be specified. A TCP connection will be set up between the application and RabbitMQ when the start method is called.
  2. Create a channel on the TCP connection. The channel is the interface through which messages are sent and received.
  3. Declare/create a queue. Declaring a queue will cause it to be created if it does not already exist. All queues need to be declared before they can be used.
  4. Set up exchanges and bind a queue to an exchange in subscriber/consumer. All exchanges must be declared before they can be used. An exchange accepts messages from a producer application and routes them to message queues. For messages to be routed to queues, queues must be bound to an exchange.
  5. In publisher: Publish a message to an exchange
    In subscriber/consumer: Consume a message from a queue.
  6. Close the channel and the connection.

Python publisher with pika

Install pika client

pip install pika

Sample code:

import json
from datetime import datetime

import pika


def rabbit_publish():
    print("rabbit publish")

    # Connection settings: default guest account on a local broker
    params = pika.URLParameters('amqp://guest:guest@localhost:5672')
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)

    channel = connection.channel()  # open a channel
    channel.queue_declare(queue='queue1')  # created if it does not exist yet

    data = {
        "id": int(datetime.now().timestamp()) % 100,
    }

    # Publish to the default exchange (''); the routing key is the queue name
    channel.basic_publish(exchange='', routing_key='queue1', body=json.dumps(data))
    print("[x] Message sent")
    connection.close()

Python subscriber with pika

import json
import time

import pika
from pika.adapters.blocking_connection import BlockingChannel


def rabbit_subscribe():
    def process(msg):
        print("Processing")
        data = json.loads(msg)
        print(" [x] Received:", data["id"])

        time.sleep(3)  # simulate slow work
        print(f"Processing finished: #{data['id']}")
        return True

    def callback(ch: BlockingChannel,
                 method: pika.spec.Basic.Deliver,
                 properties: pika.spec.BasicProperties,
                 body: bytes):
        # Acknowledge only after the message was processed successfully
        if process(body):
            ch.basic_ack(method.delivery_tag, multiple=False)

    print("rabbit subscribe")
    params = pika.URLParameters('amqp://guest:guest@localhost:5672')
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # Declare the queue here too, so the consumer can start before the publisher
    channel.queue_declare(queue='queue1')
    channel.basic_consume('queue1', callback, auto_ack=False)

    # start consuming (blocks)
    channel.start_consuming()
    connection.close()

  • auto_ack=False: the consumer must acknowledge each message manually, for example with channel.basic_ack.
  • multiple=False: When the multiple field is set to true, RabbitMQ will acknowledge all outstanding delivery tags up to and including the tag specified in the acknowledgement. Like everything else related to acknowledgements, this is scoped per channel. For example, given that there are delivery tags 5, 6, 7, and 8 unacknowledged on channel Ch, when an acknowledgement frame arrives on that channel with delivery_tag set to 8 and multiple set to true, all tags from 5 to 8 will be acknowledged. If multiple was set to false, deliveries 5, 6, and 7 would still be unacknowledged.
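
The delivery-tag example above can be checked with a small model of the per-channel bookkeeping. This is a sketch in plain Python, not pika or broker code, and the acknowledge helper is a hypothetical name introduced only for illustration.

```python
def acknowledge(unacked: set, delivery_tag: int, multiple: bool = False) -> set:
    """Return the delivery tags still unacknowledged after one ack frame."""
    if multiple:
        # Everything up to and including delivery_tag is acknowledged
        return {tag for tag in unacked if tag > delivery_tag}
    # Only the single tag is acknowledged
    return unacked - {delivery_tag}


outstanding = {5, 6, 7, 8}  # unacknowledged tags on channel Ch
print(sorted(acknowledge(outstanding, 8, multiple=True)))   # []
print(sorted(acknowledge(outstanding, 8, multiple=False)))  # [5, 6, 7]
```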

Types of Exchanges

Direct Exchange

A message goes to the queue(s) with the binding key that exactly matches the routing key of the message.

If the message routing key does not match any binding key, the message is discarded.

AMQP brokers must provide a pre-declared direct exchange named amq.direct.
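
As a minimal sketch of direct routing with pika, the helpers below bind a queue to amq.direct and publish with a matching routing key. They take an already opened channel (for example the one returned by connection.channel()), so no connection is made here. The helper names and the queue name pdf_queue are hypothetical; the pdfprocess key is the one used in the Direct example earlier.

```python
def bind_direct(channel, queue="pdf_queue", binding_key="pdfprocess",
                exchange="amq.direct"):
    """Declare a queue and bind it to a direct exchange with a binding key."""
    channel.queue_declare(queue=queue)
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key)


def publish_direct(channel, body, routing_key="pdfprocess",
                   exchange="amq.direct"):
    """Publish a message; it reaches queues whose binding key equals routing_key."""
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
```

With a live broker, calling bind_direct(channel) in the consumer and publish_direct(channel, "some body") in the producer should deliver the message to pdf_queue, while a message published with any other routing key would be dropped by the exchange.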

Default exchange

The default exchange is a pre-declared direct exchange with no name, usually referred to by an empty string. When you use the default exchange, your message is delivered to the queue whose name equals the routing key of the message. Every queue is automatically bound to the default exchange with a routing key that is the same as the queue name.

Fanout Exchange

A fanout exchange copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. The keys provided will simply be ignored.

Fanout exchanges can be useful when the same message needs to be sent to one or more queues with consumers who may process the same message in different ways.

AMQP brokers must provide a pre-declared fanout exchange named amq.fanout.
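
A fanout setup with pika might look like the sketch below. As with the other samples, it assumes an already opened channel is passed in. The helper names, the exchange name, and the queue names in the usage note are hypothetical.

```python
def setup_fanout(channel, exchange, queues):
    """Declare a fanout exchange and bind every given queue to it."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    for queue in queues:
        channel.queue_declare(queue=queue)
        # No binding key is passed: a fanout exchange ignores it anyway
        channel.queue_bind(queue=queue, exchange=exchange)


def broadcast(channel, exchange, body):
    """Publish one message; every bound queue gets its own copy."""
    channel.basic_publish(exchange=exchange, routing_key="", body=body)
```

For example, setup_fanout(channel, "events", ["email_queue", "audit_queue"]) followed by broadcast(channel, "events", "user signed up") would place one copy of the message in each of the two queues, where separate consumers can process it in their own way.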
