RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It acts as an intermediary for messages, enabling reliable communication between different parts of distributed systems. RabbitMQ is widely used in microservices architectures, cloud-native applications, and any system that requires asynchronous, reliable message processing.
A direct exchange routes each message to the queues whose binding key exactly matches the message's routing key.
# Producer publishes with routing key "user.created"
channel.basic_publish(
    exchange='user_events',
    routing_key='user.created',
    body='{"user_id": 123, "email": "user@example.com"}',
)

# Consumer binds its queue with the same key, so the message above is
# delivered to 'user_notifications'
channel.queue_bind(
    exchange='user_events',
    queue='user_notifications',
    routing_key='user.created',
)
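A topic exchange routes messages by matching the routing key against wildcard binding patterns: `*` stands for exactly one dot-separated word and `#` for zero or more words.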
# Binding pattern "user.*.created" ("*" stands for exactly one word):
#   matches:        "user.email.created", "user.profile.created"
#   does not match: "user.created", "user.email.profile.created"
channel.queue_bind(
    queue='email_notifications',
    exchange='user_events',
    routing_key='user.*.created',
)
A fanout exchange broadcasts every message to all queues bound to it, ignoring routing keys.
# Publish to the fanout exchange with an empty routing key
channel.basic_publish(
    body='{"event": "system.maintenance", "time": "2024-01-01"}',
    routing_key='',  # a fanout exchange does not look at this
    exchange='system_events',
)

# Bind every queue that should get a copy of the message
channel.queue_bind(queue='logging_queue', exchange='system_events')
channel.queue_bind(queue='notification_queue', exchange='system_events')
A headers exchange routes messages based on message header values instead of routing keys.
# Producer attaches the headers the exchange will route on
properties = pika.BasicProperties(
    headers={'type': 'email', 'priority': 'high'},
)
channel.basic_publish(
    exchange='message_router',
    routing_key='',  # not used by a headers exchange
    body='Message content',
    properties=properties,
)

# Consumer binds with header matching
channel.queue_bind(
    exchange='message_router',
    queue='high_priority_emails',
    arguments={
        # Spell out the matching mode instead of relying on the default:
        # 'all' = every header listed below must match,
        # 'any' = at least one of them must match.
        'x-match': 'all',
        'type': 'email',
        'priority': 'high',
    },
)
RabbitMQ uses acknowledgments to ensure reliable message delivery:
# Consumer acknowledges message processing
def callback(ch, method, properties, body):
    """Process one delivery and settle it with the broker.

    On success the message is acknowledged. On failure it is negatively
    acknowledged: requeued the first time, but rejected without requeue if
    it had already been redelivered, so a message that always fails cannot
    be redelivered forever. A message rejected without requeue is
    dead-lettered when the queue has a dead letter exchange configured.
    """
    try:
        # Only the processing step is guarded; a failing ack must not be
        # followed by a nack for the same delivery tag.
        process_message(body)
    except Exception as e:
        print(f"Message processing failed: {e}")
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=not method.redelivered,
        )
    else:
        # Acknowledge successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
# Register the callback with auto_ack=False so that deliveries are settled
# explicitly by the callback rather than automatically
channel.basic_consume(
    auto_ack=False,
    on_message_callback=callback,
    queue='my_queue',
)
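Durable queues survive broker restarts but consume more resources. Note that only the queue definition is durable; messages survive a restart only if they are also published as persistent (`delivery_mode=2`).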
# Declare 'durable_queue' with the durable flag set
channel.queue_declare(durable=True, queue='durable_queue')
Exclusive queues are only accessible by the connection that declared them, and are deleted when that connection closes.
# Declare 'exclusive_queue' with the exclusive flag set
channel.queue_declare(exclusive=True, queue='exclusive_queue')
Auto-delete queues are deleted automatically once their last consumer unsubscribes; a queue that has never had a consumer is not deleted.
# Declare 'auto_delete_queue' with the auto_delete flag set
channel.queue_declare(auto_delete=True, queue='auto_delete_queue')
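A dead letter exchange (DLX) receives messages that cannot be processed normally, i.e. messages that are rejected or negatively acknowledged without requeue, that expire, or that are dropped because the queue is over its length limit: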
# Exchange that dead-lettered messages are published to
channel.exchange_declare(exchange_type='direct', exchange='dlx')

# Main queue, configured to dead-letter into 'dlx' using routing key 'failed'
channel.queue_declare(
    arguments={
        'x-dead-letter-routing-key': 'failed',
        'x-dead-letter-exchange': 'dlx',
    },
    queue='main_queue',
)

# Queue that collects the dead letters, bound to 'dlx' under the same key
channel.queue_declare(queue='dlq')
channel.queue_bind(queue='dlq', routing_key='failed', exchange='dlx')
| Command | Description | Usage |
|---|---|---|
| `rabbitmqctl start_app` | Starts the RabbitMQ application on an already running node (the node itself is started with `rabbitmq-server` or the system service manager) | `sudo rabbitmqctl start_app` |
| `rabbitmqctl stop` | Stops the RabbitMQ node | `sudo rabbitmqctl stop` |
| `rabbitmqctl status` | Shows the status of RabbitMQ | `sudo rabbitmqctl status` |
| `rabbitmqctl list_queues` | Lists all queues with their properties | `sudo rabbitmqctl list_queues name messages consumers` |
| `rabbitmqctl list_exchanges` | Lists all exchanges | `sudo rabbitmqctl list_exchanges name type durable` |
| `rabbitmqctl list_bindings` | Lists all bindings between exchanges and queues | `sudo rabbitmqctl list_bindings` |
| `rabbitmqctl list_connections` | Lists all client connections | `sudo rabbitmqctl list_connections` |
| `rabbitmqctl list_channels` | Lists all channels | `sudo rabbitmqctl list_channels` |
| `rabbitmqctl purge_queue` | Removes all messages from a queue | `sudo rabbitmqctl purge_queue queue_name` |
| `rabbitmqctl delete_queue` | Deletes a queue | `sudo rabbitmqctl delete_queue queue_name` |
| `rabbitmqadmin delete exchange` | Deletes an exchange (`rabbitmqctl` has no exchange-deletion command; use `rabbitmqadmin` or the management UI) | `rabbitmqadmin delete exchange name=exchange_name` |
import pika
import json
# Establish connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
try:
    channel = connection.channel()

    # Declare exchange
    channel.exchange_declare(
        exchange='user_events',
        exchange_type='topic',
    )

    # Declare queue
    channel.queue_declare(queue='user_notifications')

    # Bind queue to exchange
    channel.queue_bind(
        exchange='user_events',
        queue='user_notifications',
        routing_key='user.*.created',
    )

    # Send message; the routing key below matches the binding pattern above
    message = {
        'user_id': 123,
        'email': 'user@example.com',
        'action': 'created',
    }
    channel.basic_publish(
        exchange='user_events',
        routing_key='user.email.created',
        body=json.dumps(message),
    )
    print(f"Sent message: {message}")
finally:
    # Release the connection even when a declare or publish call fails.
    # Skip close() if the connection is already gone so the original
    # error is not masked by a second one.
    if connection.is_open:
        connection.close()
import pika
import json
import time
def callback(ch, method, properties, body):
    """Handle one delivery: print it, simulate work, then acknowledge it.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; supplies the delivery tag to ack.
        properties: Message properties (unused here).
        body: Raw message payload as bytes.
    """
    print(f"Received message: {body.decode()}")

    # Simulate processing time
    time.sleep(1)

    # Acknowledge only after the work is done
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("Message processed successfully")
# Establish connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
try:
    channel = connection.channel()

    # Declare queue
    channel.queue_declare(queue='user_notifications')

    # Set QoS to process one message at a time
    channel.basic_qos(prefetch_count=1)

    # Start consuming; auto_ack=False because the callback acks explicitly
    channel.basic_consume(
        queue='user_notifications',
        on_message_callback=callback,
        auto_ack=False,
    )

    print("Waiting for messages. To exit press CTRL+C")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # CTRL+C: leave the consume loop cleanly
        channel.stop_consuming()
finally:
    # Close the connection on every exit path, not only after CTRL+C.
    # Skip close() if the connection is already gone so the original
    # error is not masked by a second one.
    if connection.is_open:
        connection.close()
# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # Management UI port
    environment:
      # Example credentials only; change them before any real deployment
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin123
    volumes:
      # Named volume mounted at /var/lib/rabbitmq
      - rabbitmq_data:/var/lib/rabbitmq
    restart: unless-stopped

volumes:
  rabbitmq_data:
RabbitMQ is a powerful and flexible message broker that provides reliable, asynchronous communication for distributed systems. Its support for multiple exchange types, message acknowledgments, and clustering makes it suitable for both simple and complex messaging scenarios. By understanding its core concepts and following best practices, you can build robust, scalable applications that handle message processing efficiently.
Whether you're building microservices, implementing event-driven architectures, or simply need reliable message queuing, RabbitMQ offers the features and reliability needed for production environments. Its active community, extensive documentation, and support for multiple protocols make it a popular choice for message broker solutions.