Message Queues with RabbitMQ

Dipto Chakrabarty
7 min read · May 20, 2024

In today’s fast-paced digital landscape, efficient communication between different parts of an application is crucial. Whether it’s microservices communicating with each other, a web server sending tasks to a background job processor, or a system needing reliable message delivery, a robust message broker is essential. This is where RabbitMQ comes into play.

RabbitMQ is an open-source message broker that speaks AMQP 0-9-1 natively and supports other protocols such as MQTT and STOMP through plugins. Producers hand messages to the broker, which routes and stores them until consumers are ready, so the two sides do not need to be running, or equally fast, at the same time.

In this blog, we’ll dive into the world of RabbitMQ, demonstrating how to set up a RabbitMQ server using Docker. Docker simplifies the deployment process by providing a consistent environment, making it easier to get RabbitMQ up and running without the complexities of manual installation.

Once our RabbitMQ server is up, we’ll explore how to interact with it using a Python client. We’ll cover the basics of connecting to the RabbitMQ server, sending messages, and consuming messages.

Setting up a RabbitMQ server using Docker

The code can be found here -> SystemMonitoring Devops project.

To run a RabbitMQ server we use Docker, specifically Docker Compose, which starts the server for us.

The Docker Compose file looks like this:

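version: "3"
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: rabbitmq
    restart: always
    ports:
      - '5672:5672'    # AMQP, the port clients connect to
      - '8081:15692'   # Prometheus metrics, published on host port 8081
      - '15672:15672'  # management UI
    volumes:
      # bind mount: broker data is kept in ./rabbitmq_data on the host
      - ./rabbitmq_data:/var/lib/rabbitmq
    networks:
      - microservice
volumes:
  rabbitmq_data:
networks:
  microservice: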

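We use an Alpine-based image with the management plugin and publish three container ports: 5672 (AMQP, used by clients), 15672 (management UI), and 15692 (Prometheus metrics, mapped to host port 8081). The ./rabbitmq_data directory is mounted into the container so that RabbitMQ's data persists when the container is recreated.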

Run the following command to start the container.

docker-compose up

Once the container starts, head over to localhost:15672 and log in to the management UI with the default credentials: username guest, password guest.
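The container takes a few seconds to boot, so it can help to confirm that the broker is accepting connections before opening the UI or running a client. The following is a minimal sketch using only the Python standard library. The helper names port_open and wait_for_ports are made up for this example, and the ports are the host ports published in the compose file above.

```python
import socket
import time


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_ports(host: str, ports, attempts: int = 30, delay: float = 1.0) -> bool:
    """Poll until every port accepts connections, or give up after `attempts` tries."""
    for _ in range(attempts):
        if all(port_open(host, port) for port in ports):
            return True
        time.sleep(delay)
    return False
```

Calling wait_for_ports("localhost", [5672, 15672]) returns True once both the AMQP port and the management UI port are listening. An open port only shows that something is listening; it does not prove the broker has finished starting up.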

Concepts to know first

RabbitMQ routes messages using queues and routing keys. Understanding the difference between the two is fundamental to using RabbitMQ effectively, so here is a detailed explanation of each term and its role:

Queue

Definition

A queue in RabbitMQ is a buffer that stores messages until a consumer processes them. A queue declared as durable survives a broker restart; for the messages in it to survive as well, they must also be published as persistent.

Characteristics

  • Storage: Holds messages until a consumer retrieves them.
  • Consumers: Can have one or more consumers retrieving messages from it.
  • Durability: Can be configured to be durable (persistent across broker restarts) or transient (lost if the broker restarts).
  • Order: Typically follows a First-In-First-Out (FIFO) order unless configured otherwise.

Role in Messaging

  • Endpoint: Queues are the endpoints where messages are delivered and stored.
  • Message Handling: Messages wait in the queue until they are consumed by a consumer.
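Durability is opt-in at two levels: the queue has to be declared durable, and each message has to be marked persistent. Below is a minimal sketch with pika, assuming a channel created as in the classes later in this post. The function names are illustrative, and pika is imported lazily only so that the helpers can be tried with a stand-in channel.

```python
import json


def declare_durable_queue(channel, queue_name: str) -> None:
    """Declare a queue whose definition survives a broker restart."""
    channel.queue_declare(queue=queue_name, durable=True)


def publish_persistent(channel, queue_name: str, payload, properties=None) -> None:
    """Publish a JSON message marked persistent via the default exchange."""
    if properties is None:
        import pika  # imported here so the module loads even without pika installed

        # delivery_mode=2 asks the broker to write the message to disk
        properties = pika.BasicProperties(delivery_mode=2)
    channel.basic_publish(
        exchange="",
        routing_key=queue_name,
        body=json.dumps(payload),
        properties=properties,
    )
```

A queue that already exists cannot be redeclared with a different durable flag: the broker rejects the declaration with a PRECONDITION_FAILED error, so the queue has to be deleted first or given a new name.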

Routing Key

Definition

A routing key is a string that the exchange uses to determine how to route a message to the appropriate queue(s). It is used in conjunction with exchanges and bindings to direct messages to the correct destination.

Characteristics

  • String Identifier: A simple string used to route messages.
  • Flexible: Can use patterns for more complex routing logic, especially with topic exchanges.
  • Binding Keys: Queues bind to exchanges with binding keys, which are compared to routing keys to determine message routing.

Role in Messaging

  • Message Direction: Helps the exchange determine which queue(s) a message should be delivered to.
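  • Routing Logic: Each exchange type treats the routing key differently. Direct exchanges require an exact match with the binding key, topic exchanges match it against a pattern, and fanout and headers exchanges ignore it (headers exchanges route on message header attributes instead).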
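To make the pattern matching of topic exchanges concrete, here is a small pure-Python sketch of the matching rule. It is a simulation for illustration, not RabbitMQ's implementation, and the function name topic_matches is made up. Keys are split into dot-separated words, "*" stands for exactly one word, and "#" stands for zero or more words.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches a topic-exchange binding key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: it matches only if the words are exhausted too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero words (skip it) or one more word (stay on it).
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)
```

With this rule, a queue bound with sys.* receives sys.cpu but not sys.cpu.load, while a queue bound with sys.# receives both.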

Key Differences

  1. Purpose:
  • Queue: A storage and delivery mechanism for messages.
  • Routing Key: A routing mechanism used by the exchange to direct messages to queues.

  2. Functionality:
  • Queue: Holds and manages the delivery of messages to consumers.
  • Routing Key: Determines how and where messages are routed within RabbitMQ.

  3. Interaction:
  • Queue: Interacts with consumers directly, providing them with messages.
  • Routing Key: Interacts with exchanges, influencing the routing of messages to queues.

How They Work Together

  1. Producer Sends Message: The producer sends a message to an exchange and specifies a routing key.
  2. Exchange Routes Message: The exchange uses the routing key to determine which queue(s) should receive the message.
  3. Message Stored in Queue: The message is routed to the appropriate queue(s) where it is stored until a consumer retrieves it.
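The three steps above can be sketched with a toy in-memory model of a direct exchange. This is only an illustration of the flow, not how the broker is implemented; the DirectExchange class and the queue names are invented for the example.

```python
from collections import defaultdict, deque


class DirectExchange:
    """Toy model of a direct exchange: routing key must equal the binding key."""

    def __init__(self) -> None:
        self.bindings = defaultdict(list)  # binding key -> list of bound queues

    def bind(self, queue: deque, binding_key: str) -> None:
        self.bindings[binding_key].append(queue)

    def publish(self, routing_key: str, message: str) -> int:
        """Copy the message into every queue bound with this key; return the count."""
        queues = self.bindings.get(routing_key, [])
        for queue in queues:
            queue.append(message)
        return len(queues)


cpu_queue, audit_queue = deque(), deque()
exchange = DirectExchange()
exchange.bind(cpu_queue, "sys.cpu")
exchange.bind(audit_queue, "sys.cpu")
exchange.bind(audit_queue, "sys.ram")

exchange.publish("sys.cpu", "cpu=42%")    # steps 1 and 2: routed to both queues
exchange.publish("sys.ram", "ram=63%")    # routed to audit_queue only
exchange.publish("sys.disk", "disk=80%")  # no matching binding: nothing is stored

first = audit_queue.popleft()             # step 3: a consumer takes the oldest message
```

A real broker also discards messages that match no binding by default, which is why a typo in a routing key tends to show up as messages silently going missing.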

In short

  • Queue: A storage mechanism for messages, serving as the endpoint where messages are delivered and held until consumed.
  • Routing Key: A string used to guide the exchange in routing messages to the appropriate queue(s).

You can effectively design and implement RabbitMQ-based messaging systems that suit your application’s needs by understanding these details.

Pushing messages to the queue

We are going to build a class that handles pushing messages to a RabbitMQ queue.

This requires pika, a RabbitMQ client library for Python.

pip3 install pika

Next, we write the classes that push messages to a queue.

from datetime import datetime
from datetime import timezone
import json

import pika


class RabbitMqConfig:
    """Connection settings shared by the publisher and the consumer."""

    def __init__(self, queue: str = "sys", host: str = "localhost", routing_key: str = "sys") -> None:
        self.queue = queue
        self.host = host
        self.routing_key = routing_key

We first write the RabbitMqConfig class, which holds the configuration details for connecting to a RabbitMQ server.

Important Components

  • queue (str): The name of the queue to which messages will be sent or from which messages will be received.
  • host (str): The hostname of the RabbitMQ server.
  • routing_key (str): The routing key used for routing the messages.

class ServerMq:
    def __init__(self, config: RabbitMqConfig) -> None:
        self.config = config
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(self.config.host))
        self.channel = self.connection.channel()
        # Creates the queue only if it does not exist yet.
        self.channel.queue_declare(queue=self.config.queue)

    def publish(self, objectbody):
        # exchange='' is the default exchange, which delivers to the queue
        # whose name equals the routing key.
        self.channel.basic_publish(exchange='',
                                   routing_key=self.config.routing_key,
                                   body=json.dumps(objectbody))
        print(f"Message Sent to RabbitMq with routing key {self.config.routing_key}")

    def __del__(self):
        # Best-effort cleanup; the connection may be missing or already closed.
        connection = getattr(self, "connection", None)
        if connection is not None and connection.is_open:
            connection.close()

This class is responsible for sending (publishing) messages to a RabbitMQ queue.

Important Components

  • config (RabbitMqConfig): The configuration object that provides details about the queue, host, and routing key.
  • connection (pika.BlockingConnection): Establishes a connection to the RabbitMQ server.
  • channel (pika.adapters.blocking_connection.BlockingChannel): A channel through which you can interact with the RabbitMQ server.

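The class is initialised with the configuration, and its main method is publish, which serialises the message to JSON and sends it to the default exchange (exchange=''). The default exchange delivers a message to the queue whose name equals the routing key, which is why the queue and the routing key share the same default value, sys.

The __del__ method closes the connection when the object is garbage-collected, which normally happens by the time the script exits. Python does not guarantee exactly when __del__ runs, so long-running programs should close the connection explicitly.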
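When the timing of the cleanup matters, an explicit scope is easier to reason about than a finaliser. The following is a sketch of one alternative, not the approach used in this project: a context manager that always closes the connection. The names open_channel and publish_once, and the connect parameter, are assumptions introduced here so the helper can be tried with a stand-in connection.

```python
import json
from contextlib import contextmanager


@contextmanager
def open_channel(host: str = "localhost", connect=None):
    """Yield a channel and close the underlying connection on exit, even on error."""
    if connect is None:
        import pika  # imported lazily; only needed when talking to a real broker

        def connect():
            return pika.BlockingConnection(pika.ConnectionParameters(host))

    connection = connect()
    try:
        yield connection.channel()
    finally:
        connection.close()


def publish_once(payload, queue: str = "sys", host: str = "localhost", connect=None) -> None:
    """Open a connection, publish one JSON message to `queue`, and clean up."""
    with open_channel(host, connect) as channel:
        channel.queue_declare(queue=queue)
        channel.basic_publish(exchange="", routing_key=queue, body=json.dumps(payload))
```

Opening a connection per message is slow, so this shape suits occasional publishers; a process that publishes continuously would keep the with block open around its whole loop instead.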

To demonstrate sending messages, we run a script that collects server details such as CPU and RAM usage and pushes them to a queue.
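The server.py script itself is not reproduced in this post, so the following is only a stand-in to show the shape of such a payload. It sticks to the Python standard library, which means it reports load average and disk usage rather than the exact CPU and RAM figures; the collect_metrics name and the field names are assumptions made for this sketch.

```python
import json
import os
import shutil
import time


def collect_metrics(path: str = "/") -> dict:
    """Gather a few host metrics as a JSON-serialisable dict."""
    disk = shutil.disk_usage(path)
    used_percent = round(disk.used / disk.total * 100, 2) if disk.total else 0.0
    metrics = {
        "timestamp": time.time(),
        "cpu_count": os.cpu_count(),
        "disk_used_percent": used_percent,
    }
    try:
        metrics["load_1m"] = os.getloadavg()[0]
    except (AttributeError, OSError):
        pass  # os.getloadavg is unavailable on some platforms, e.g. Windows
    return metrics
```

With the classes above, a single sample could then be sent with ServerMq(RabbitMqConfig()).publish(collect_metrics()), typically inside a loop with a short sleep between samples.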

Run the server.py code

python3 server.py

On running the code with the default configuration, the terminal prints a line like "Message Sent to RabbitMq with routing key sys" for each message published.

Head over to the queues section of the RabbitMQ management UI and you will find your queue with messages.

Click on the desired queue to view its details.

The messages can be viewed in the Get messages section.

The messages are now stored in the queue, and it is time to retrieve them one by one.

Receiving messages from the queue

We build another class which takes care of receiving messages already present in a queue.

class ReceiveMq:
    def __init__(self, config: RabbitMqConfig) -> None:
        self.config = config
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=self.config.host
        ))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.config.queue)

    def consume(self):
        # auto_ack=True: the broker treats a message as handled as soon as it is delivered.
        self.channel.basic_consume(queue=self.config.queue,
                                   on_message_callback=self.callback,
                                   auto_ack=True)
        print("Waiting for messages\n")
        # Blocks and dispatches each incoming message to the callback.
        self.channel.start_consuming()

    @staticmethod
    def callback(ch, method, properties, body):
        objectbody = json.loads(body)
        # Timezone-aware UTC timestamp, so the record is unambiguous.
        now = datetime.now(timezone.utc).isoformat()
        dbdata = [
            {
                "measurement": "status",
                "time": now,
                "fields": objectbody,
                "tags": {
                    "monitor": "system"
                }
            }
        ]
        print(now)
        print(dbdata)
        print("Data Received from Queue")

This class is responsible for receiving (consuming) messages from a RabbitMQ queue.

Important Components

  • config (RabbitMqConfig): The configuration object that provides details about the queue, host, and routing key.
  • connection (pika.BlockingConnection): Establishes a connection to the RabbitMQ server.
  • channel (pika.adapters.blocking_connection.BlockingChannel): A channel through which you can interact with the RabbitMQ server.

The important methods of this class are consume and callback.

  • consume(self): Starts consuming messages from the queue. It waits for messages and invokes the callback method when a message is received.
  • callback(ch, method, properties, body): A static method that processes the received message. It converts the message from JSON format, prepares data to be written to a database, and prints the data.
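With auto_ack=True a message is gone from the queue as soon as it is delivered, even if the callback then fails. If that matters, a common alternative is to acknowledge manually after the work succeeds. Below is a minimal sketch of that pattern, not part of the original project: make_callback and handle are names invented here, while basic_ack and basic_nack are the pika channel methods for acknowledging and rejecting a delivery.

```python
import json


def make_callback(handle):
    """Build a pika-style callback that acks only after `handle` succeeds."""

    def callback(ch, method, properties, body):
        try:
            handle(json.loads(body))
        except Exception:
            # Reject without requeueing so a bad message is not retried forever.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

It would be registered with channel.basic_consume(queue="sys", on_message_callback=make_callback(print), auto_ack=False). Whether a failed message should be dropped, requeued, or sent to a dead-letter exchange is a design decision that depends on the application.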

We use a client script to receive messages from the queue:

python3 client.py

Once run, it prints the messages in the terminal.

Head over to the queue in the management UI and it is now empty, because with auto_ack=True each message is removed as soon as it is delivered.

The callback can be modified to send the data to a database, a monitoring service, and much more if needed.

If you like my post please follow me for more such content and your one or two claps will go a long way.

Written by Dipto Chakrabarty

MS @CMU , Site Reliability Engineer , I talk about Cloud Distributed Systems. Tech Doctor making sure to diagnose and make your apps run smoothly in production.
