How to use RabbitMQ

Concepts

Exchanges

Messages are not published directly to a queue; instead, the producer sends messages to an exchange. An exchange is responsible for routing the messages to different queues with the help of bindings and routing keys.

A binding is a link between a queue and an exchange.

The routing key is a message attribute the exchange looks at when deciding how to route the message to queues (depending on exchange type).

Exchanges and queues can be configured on creation with parameters such as durable, temporary, and auto-delete. Durable exchanges survive server restarts and last until they are explicitly deleted. Temporary exchanges exist until RabbitMQ is shut down. Auto-deleted exchanges are removed once the last bound object is unbound from the exchange.
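As a rough illustration of these flags, the sketch below declares one durable and one auto-delete exchange with pika. It assumes an already-open pika channel is passed in, and the exchange names orders and scratch are hypothetical.

```python
def declare_exchanges(channel):
    """Declare a durable and an auto-delete exchange on an open pika channel.

    Assumption: `channel` is a pika BlockingChannel; the exchange names
    are made up for illustration.
    """
    # Survives broker restarts until it is explicitly deleted.
    channel.exchange_declare(exchange="orders",
                             exchange_type="direct",
                             durable=True)

    # Removed by the broker once its last binding is gone.
    channel.exchange_declare(exchange="scratch",
                             exchange_type="fanout",
                             auto_delete=True)
```

Declaring an exchange that already exists with the same settings is harmless, so a function like this can be called on every start-up.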

In RabbitMQ, there are four different types of exchanges that route the message differently using different parameters and bindings setups. Clients can create their own exchanges or use the predefined default exchanges which are created when the server starts for the first time.

Types of Exchanges

  • Direct: The message is routed to the queues whose binding key exactly matches the routing key of the message. For example, if the queue is bound to the exchange with the binding key pdfprocess, a message published to the exchange with a routing key pdfprocess is routed to that queue.
  • Fanout: A fanout exchange routes messages to all of the queues bound to it.
  • Topic: The topic exchange does a wildcard match between the routing key and the routing pattern specified in the binding.
  • Headers: Headers exchanges use the message header attributes for routing.
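The wildcard match used by topic exchanges can be sketched in plain Python. In a binding pattern, words are separated by dots, * stands for exactly one word and # for zero or more words. The function below is only a toy model for building intuition, not RabbitMQ's implementation, and the name topic_matches is made up for this example.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a dot-separated routing key matches a binding pattern."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: the key must be exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, j2) for j2 in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            # '*' or a literal word consumes exactly one word.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("pdf.*", "pdf.create"))      # True
print(topic_matches("pdf.*", "pdf.create.eu"))   # False
print(topic_matches("pdf.#", "pdf.create.eu"))   # True
```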

Publish and subscribe messages

RabbitMQ uses the AMQP protocol (AMQP 0-9-1) by default. To communicate with RabbitMQ you need a client library that speaks the same protocol. Install the client library for the programming language you intend to use for your applications.

Steps to follow when setting up a connection and publishing a message/consuming a message:

  1. Set up/create a connection object. The username, password, connection URL, port, etc., will need to be specified. A TCP connection is set up between the application and RabbitMQ when the connection is opened (some clients expose this as a start method).
  2. Create a channel on the TCP connection. Messages are sent and received through the channel, and one connection can carry several channels.
  3. Declare/create a queue. Declaring a queue will cause it to be created if it does not already exist. All queues need to be declared before they can be used.
  4. Set up exchanges and bind a queue to an exchange in subscriber/consumer. All exchanges must be declared before they can be used. An exchange accepts messages from a producer application and routes them to message queues. For messages to be routed to queues, queues must be bound to an exchange.
  5. In publisher: Publish a message to an exchange
    In subscriber/consumer: Consume a message from a queue.
  6. Close the channel and the connection.
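Steps 3 to 5 with a named exchange could look roughly like the sketch below. It assumes an open pika channel is passed in (steps 1 and 2 already done); the exchange name pdf_events, the queue name pdf_queue and both function names are hypothetical. The binding key pdfprocess reuses the example from the direct exchange description.

```python
import json

EXCHANGE = "pdf_events"     # hypothetical exchange name
QUEUE = "pdf_queue"         # hypothetical queue name
BINDING_KEY = "pdfprocess"


def prepare_consumer(channel):
    """Consumer side: declare the exchange and queue, then bind them."""
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="direct")
    channel.queue_declare(queue=QUEUE)
    channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key=BINDING_KEY)


def publish_job(channel, payload: dict):
    """Publisher side: declare the exchange, then publish to it."""
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="direct")
    channel.basic_publish(exchange=EXCHANGE,
                          routing_key=BINDING_KEY,
                          body=json.dumps(payload))
```

Both sides declare the exchange so that neither depends on the other having started first.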

Python publisher with pika

Install pika client

pip install pika

Sample code:

import json
from datetime import datetime

import pika


def rabbit_publish():
    print("rabbit publish")

    # connect to a local broker with the default guest account
    params = pika.URLParameters('amqp://guest:guest@localhost:5672')
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)

    channel = connection.channel()  # start a channel
    channel.queue_declare(queue='queue1')  # created if it does not exist yet

    data = {
        "id": int(datetime.now().timestamp()) % 100,
    }

    # send a message through the default exchange ('') to queue1
    channel.basic_publish(exchange='', routing_key='queue1', body=json.dumps(data))
    print("[x] Message sent")
    connection.close()

Python subscriber with pika

import json
import time

import pika
from pika.adapters.blocking_connection import BlockingChannel


def rabbit_subscribe():
    def process(msg):
        print("Processing")
        data = json.loads(msg)
        print(" [x] Received:", data["id"])

        time.sleep(3)  # simulate slow work
        print(f"Processing finished: #{data['id']}")
        return True

    def callback(ch: BlockingChannel,
                 method: pika.spec.Basic.Deliver,
                 properties: pika.spec.BasicProperties,
                 body: bytes):
        # acknowledge only after the message was processed successfully
        if process(body):
            ch.basic_ack(method.delivery_tag, multiple=False)

    print("rabbit subscribe")
    params = pika.URLParameters('amqp://guest:guest@localhost:5672')
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue='queue1')  # make sure the queue exists
    channel.basic_consume('queue1', callback, auto_ack=False)

    # start consuming (blocks)
    channel.start_consuming()
    connection.close()

  • auto_ack=False: messages must be acknowledged manually, for example with channel.basic_ack.
  • multiple=False: When the multiple field is set to true, RabbitMQ will acknowledge all outstanding delivery tags up to and including the tag specified in the acknowledgement. Like everything else related to acknowledgements, this is scoped per channel. For example, given that there are delivery tags 5, 6, 7, and 8 unacknowledged on channel Ch, when an acknowledgement frame arrives on that channel with delivery_tag set to 8 and multiple set to true, all tags from 5 to 8 will be acknowledged. If multiple was set to false, deliveries 5, 6, and 7 would still be unacknowledged.
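The effect of the multiple flag can be modelled with a few lines of plain Python. This is only a toy model of the broker's bookkeeping for one channel, and the function name acknowledge is made up for this example.

```python
def acknowledge(outstanding: set, delivery_tag: int, multiple: bool = False) -> set:
    """Return the delivery tags still unacknowledged after one ack frame."""
    if multiple:
        # Everything up to and including delivery_tag is acknowledged.
        return {tag for tag in outstanding if tag > delivery_tag}
    # Only the named delivery is acknowledged.
    return outstanding - {delivery_tag}


unacked = {5, 6, 7, 8}
print(sorted(acknowledge(unacked, 8, multiple=True)))    # []
print(sorted(acknowledge(unacked, 8, multiple=False)))   # [5, 6, 7]
```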

Exchange types in detail

Direct Exchange

A message goes to the queue(s) with the binding key that exactly matches the routing key of the message.

If the message routing key does not match any binding key, the message is discarded, unless the publisher set the mandatory flag or an alternate exchange is configured.

AMQP brokers must provide a pre-declared direct exchange named amq.direct.
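The exact-match rule can be illustrated with a small in-memory model. This is a sketch for intuition only, not how RabbitMQ is implemented. Queues are plain Python lists here, and the class name DirectExchange is made up for this example.

```python
from collections import defaultdict


class DirectExchange:
    """Toy direct exchange: routes on exact binding key match."""

    def __init__(self):
        self.bindings = defaultdict(list)  # binding key -> bound queues

    def bind(self, queue: list, binding_key: str) -> None:
        self.bindings[binding_key].append(queue)

    def publish(self, routing_key: str, message: str) -> int:
        """Deliver to every matching queue; return how many received it."""
        queues = self.bindings.get(routing_key, [])
        for queue in queues:
            queue.append(message)
        return len(queues)  # 0 means the message was dropped


pdf_queue, log_queue = [], []
exchange = DirectExchange()
exchange.bind(pdf_queue, "pdfprocess")
exchange.bind(log_queue, "log")

print(exchange.publish("pdfprocess", "report.pdf"))  # 1
print(exchange.publish("unknown", "lost"))           # 0
print(pdf_queue, log_queue)                          # ['report.pdf'] []
```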

Default exchange

The default exchange is a pre-declared direct exchange with no name, usually referred to by an empty string. When you use the default exchange, your message is delivered to the queue whose name equals the routing key of the message. Every queue is automatically bound to the default exchange with a routing key that is the same as the queue name.

Fanout Exchange

A fanout exchange copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. The keys provided will simply be ignored.

Fanout exchanges can be useful when the same message needs to be sent to one or more queues with consumers who may process the same message in different ways.
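A fanout setup with pika might look like the sketch below. It assumes an open pika channel is passed in; the exchange name notifications, the queue names in the usage note and the function name broadcast are hypothetical.

```python
def broadcast(channel, queue_names, body: str) -> None:
    """Bind every queue to one fanout exchange and publish a single message."""
    channel.exchange_declare(exchange="notifications", exchange_type="fanout")

    for name in queue_names:
        channel.queue_declare(queue=name)
        # No routing key is needed: a fanout exchange ignores it.
        channel.queue_bind(queue=name, exchange="notifications")

    # One publish; the broker copies the message to every bound queue.
    channel.basic_publish(exchange="notifications", routing_key="", body=body)
```

Calling broadcast(channel, ["email", "sms"], "order shipped") would leave one copy of the message in each of the two queues, where separate consumers can handle it in their own way.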

AMQP brokers must provide a pre-declared fanout exchange named amq.fanout.

Chi Thuc Nguyen
