Microservices architecture often requires asynchronous task processing and inter-service communication. Celery, combined with RabbitMQ as a message broker, provides a robust solution for orchestrating tasks across your Django microservices ecosystem.
When you build microservices, you quickly realize that services need to work together to accomplish complex business processes. Imagine an e-commerce system where creating an order involves multiple steps: validating inventory, processing payment, sending confirmation emails, and updating analytics. In a monolithic application, these would all happen in sequence within a single process. But in microservices, each of these tasks might be handled by different services.
This is where task orchestration becomes crucial. Think of it like conducting an orchestra - you need a conductor (orchestrator) to coordinate when each musician (service) plays their part to create beautiful music (complete business process).
Task orchestration involves several key moving parts. Before we dive into implementing Celery and RabbitMQ orchestration, let's understand what each component does and why we need it:
Celery is a distributed task queue system for Python. Think of it as a sophisticated job scheduler that can run work in the background, retry failed jobs automatically, and distribute tasks across many worker processes and machines.
RabbitMQ is a message broker - it's like a post office for your services. It accepts messages from producers, queues them durably, and delivers them to consumers, even when the sender and receiver aren't online at the same time.
Redis (optional but recommended) serves as a result backend where Celery stores the results of completed tasks. This is useful when you need to check if a task completed successfully or retrieve its return value.
Docker helps us package and deploy all these components consistently across different environments.
Here's how to install the required packages:
# Install required packages
pip install celery[redis]==5.2.7
pip install kombu==5.2.4
pip install django-celery-beat==2.4.0
pip install django-celery-results==2.4.0
Let's break down what each package does:
- celery[redis]: The main Celery package with Redis support
- kombu: Low-level messaging library that Celery uses to communicate with message brokers
- django-celery-beat: Enables periodic task scheduling (like cron jobs)
- django-celery-results: Stores task results in your Django database

RabbitMQ acts as the message broker between your Django services and Celery workers. Think of it as a reliable postal service that ensures messages get delivered even if the recipient isn't immediately available.
RabbitMQ is particularly well-suited for microservices because it:
Using Docker is the easiest way to get RabbitMQ running locally. The configuration below sets up both RabbitMQ and Redis in containers:
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.11-management
container_name: microservices_rabbitmq
ports:
- "5672:5672" # AMQP port for applications
- "15672:15672" # Management web interface
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password123
volumes:
- rabbitmq_data:/var/lib/rabbitmq # Persist data between restarts
networks:
- microservices_network
redis:
image: redis:7-alpine
container_name: microservices_redis
ports:
- "6379:6379"
networks:
- microservices_network
volumes:
rabbitmq_data: # Named volume for data persistence
networks:
microservices_network:
driver: bridge
What each part does:
- rabbitmq:3.11-management: Uses RabbitMQ with the management plugin enabled
- 5672: The main AMQP port where your applications connect
- 15672: Web management interface (visit http://localhost:15672)

If you prefer to install RabbitMQ directly on your system, here's how:
# Ubuntu/Debian
sudo apt-get update
sudo apt-get install rabbitmq-server
# macOS (using Homebrew)
brew install rabbitmq
# Start RabbitMQ service (Linux)
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server # Start automatically on boot
# Start RabbitMQ service (macOS)
brew services start rabbitmq
# Enable management plugin for web interface
sudo rabbitmq-plugins enable rabbitmq_management
After installation, the management interface is available at http://localhost:15672 (the default local login is guest / guest).
Testing your installation:
# Check if RabbitMQ is running
sudo systemctl status rabbitmq-server
# Or check if the port is listening
netstat -tlnp | grep 5672
Now let's set up Celery to work with your Django microservices. The key concept here is that each service can have its own Celery configuration while sharing the same message broker (RabbitMQ).
In a microservices architecture, you'll typically have multiple Django projects (services) that need to communicate. Here's a recommended structure:
microservices_project/
├── user_service/ # Django project for user management
│ ├── __init__.py
│ ├── settings.py
│ ├── celery.py # Celery configuration for this service
│ ├── tasks.py # Tasks specific to user service
│ └── views.py
├── order_service/ # Django project for order management
│ ├── __init__.py
│ ├── settings.py
│ ├── celery.py # Celery configuration for this service
│ ├── tasks.py # Tasks specific to order service
│ └── views.py
└── shared/ # Shared configuration and utilities
├── __init__.py
└── celery_config.py # Common Celery settings
Why this structure? Each service owns its own Celery app and task modules, so it can be developed, deployed, and scaled independently, while the broker and queue settings live in one shared place instead of being duplicated across services.
The shared configuration file contains settings that all services should use. This ensures consistency and makes it easier to manage your infrastructure:
# shared/celery_config.py
from kombu import Queue, Exchange
# Broker settings - this is where Celery connects to RabbitMQ
CELERY_BROKER_URL = 'amqp://admin:password123@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# Task serialization - how tasks are converted to messages
CELERY_TASK_SERIALIZER = 'json' # Use JSON for task arguments
CELERY_RESULT_SERIALIZER = 'json' # Use JSON for task results
CELERY_ACCEPT_CONTENT = ['json'] # Only accept JSON content
# Timezone settings
CELERY_TIMEZONE = 'UTC' # Use UTC for consistency
CELERY_ENABLE_UTC = True
# Task routing - this is crucial for microservices!
# It determines which queue each task goes to
CELERY_TASK_ROUTES = {
'user_service.tasks.*': {'queue': 'user_queue'}, # All user service tasks
'order_service.tasks.*': {'queue': 'order_queue'}, # All order service tasks
'notification_service.tasks.*': {'queue': 'notification_queue'}, # Notification tasks
}
# Queue configuration - define the actual queues
CELERY_TASK_DEFAULT_QUEUE = 'default'
# Note: '#' wildcard routing keys only match on topic exchanges
CELERY_TASK_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('user_queue', Exchange('user', type='topic'), routing_key='user.#'),
    Queue('order_queue', Exchange('order', type='topic'), routing_key='order.#'),
    Queue('notification_queue', Exchange('notification', type='topic'), routing_key='notification.#'),
)
# Worker configuration - these settings optimize performance
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Process one task at a time (safer)
CELERY_TASK_ACKS_LATE = True # Acknowledge tasks only after completion
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000 # Restart workers after 1000 tasks (prevents memory leaks)
# Task execution limits - prevent runaway tasks
CELERY_TASK_TIME_LIMIT = 300 # 5 minutes hard limit
CELERY_TASK_SOFT_TIME_LIMIT = 240 # 4 minutes soft limit (sends signal to task)
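To build intuition for how the wildcard routes above resolve, here's an illustrative pure-Python sketch. This is not Celery's internal implementation - Celery performs this matching for you - it just mirrors the glob-against-dotted-name idea:

```python
from fnmatch import fnmatch

# Mirrors CELERY_TASK_ROUTES above: glob patterns are matched against
# the task's dotted name to decide which queue it lands in.
TASK_ROUTES = {
    'user_service.tasks.*': 'user_queue',
    'order_service.tasks.*': 'order_queue',
    'notification_service.tasks.*': 'notification_queue',
}

def resolve_queue(task_name, default='default'):
    for pattern, queue in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return queue
    return default

print(resolve_queue('user_service.tasks.send_welcome_email'))  # user_queue
print(resolve_queue('billing.tasks.charge'))                   # default
```

Anything that doesn't match a route falls through to the default queue, which is why CELERY_TASK_DEFAULT_QUEUE is set explicitly.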
Key concepts explained:
Thanks to the wildcard route user_service.tasks.*, all tasks in the user service automatically go to the user queue, so you can run dedicated workers per service.

Each Django service needs its own Celery application instance. This allows services to run independently while still communicating through the shared message broker.
# user_service/celery.py
import os
from celery import Celery
from django.conf import settings
# Set the default Django settings module for the 'celery' program.
# This tells Celery which Django settings to use
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'user_service.settings')
# Create the Celery application instance
# The name 'user_service' will appear in logs and monitoring tools
app = Celery('user_service')
# Load configuration from the shared config file
# The namespace='CELERY' means it will look for settings starting with CELERY_
app.config_from_object('shared.celery_config', namespace='CELERY')
# Auto-discover tasks from all installed Django apps
# This automatically finds tasks.py files in your Django apps
app.autodiscover_tasks()
# Debug task - useful for testing your setup
@app.task(bind=True)
def debug_task(self):
"""A simple task to test if Celery is working"""
print(f'Request: {self.request!r}')
return 'Debug task executed successfully'
Important concepts: Celery('user_service') names the app for logs and monitoring tools; config_from_object(..., namespace='CELERY') loads only settings prefixed with CELERY_; and autodiscover_tasks() finds the tasks.py module in every installed Django app.
Now, make sure Celery is imported when Django starts:
# user_service/__init__.py
from .celery import app as celery_app
# This ensures that the Celery app is loaded when Django starts
__all__ = ('celery_app',)
Why this import is necessary: loading the Celery app in __init__.py guarantees it is created when Django starts, so the @shared_task decorator can bind your tasks to it.
Testing your setup:
# Start a Celery worker for the user service
celery -A user_service worker --loglevel=info -Q user_queue
# In another terminal, test the debug task
python manage.py shell
>>> from user_service.celery import debug_task
>>> result = debug_task.delay()
>>> print(result.get())
Now that we have Celery configured, let's create actual tasks that demonstrate how microservices can communicate and coordinate work. We'll start with the user service, which handles user-related operations.
A Celery task is simply a Python function decorated with @shared_task or @app.task. When you call a task, instead of executing immediately, it gets serialized and sent to a message queue. A Celery worker then picks up the task and executes it.
Key benefits of tasks: the caller returns immediately instead of blocking on slow work, failed tasks can be retried automatically, and the workload can be spread across as many workers as you need.
Let's create tasks for the user service that demonstrate common patterns:
# user_service/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth.models import User
from django.utils import timezone
import requests
import logging
# Set up logging to track task execution
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
"""
Send welcome email to new user
Args:
user_id: The ID of the user to send email to
Returns:
str: Success message or raises exception
This task demonstrates:
- Database access from tasks
- Email sending (I/O operation)
- Error handling and retries
- Logging for debugging
"""
try:
# Get user from database
# Note: We pass user_id instead of user object because
# Celery can only serialize simple data types (JSON)
user = User.objects.get(id=user_id)
# Send the welcome email
send_mail(
subject='Welcome to Our Platform',
message=f'Hello {user.first_name}, welcome to our microservices platform!',
from_email='noreply@example.com',
recipient_list=[user.email],
fail_silently=False, # Raise exception if email fails
)
# Log successful execution
logger.info(f"Welcome email sent to user {user_id}")
return f"Email sent to {user.email}"
except User.DoesNotExist:
# User not found - this is a permanent error, don't retry
logger.error(f"User {user_id} not found")
raise
except Exception as exc:
# Email sending failed - this might be temporary, so retry
logger.error(f"Failed to send email to user {user_id}: {exc}")
# Retry with exponential backoff
# First retry: 60 seconds, second: 120 seconds, third: 240 seconds
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
@shared_task
def update_user_profile(user_id, profile_data):
"""
Update user profile asynchronously
This task demonstrates:
- Updating database records
- Triggering other tasks (task chaining)
- Error handling for data operations
"""
try:
user = User.objects.get(id=user_id)
# Update user fields from profile_data
for field, value in profile_data.items():
if hasattr(user, field):
setattr(user, field, value)
user.save()
# Trigger another task to notify other services
# This is called "task chaining" - one task triggers another
notify_profile_update.delay(user_id, profile_data)
logger.info(f"Profile updated for user {user_id}")
return f"Profile updated for user {user_id}"
except User.DoesNotExist:
logger.error(f"User {user_id} not found for profile update")
raise
except Exception as exc:
logger.error(f"Failed to update profile for user {user_id}: {exc}")
raise
@shared_task
def notify_profile_update(user_id, profile_data):
"""
Notify other services about profile updates
This task demonstrates:
- Inter-service communication via HTTP
- Handling multiple service calls
- Graceful error handling for external services
"""
# List of services that need to know about profile updates
services = [
'http://order-service:8001/api/user-updated/',
'http://notification-service:8002/api/user-updated/',
]
# Prepare the payload to send to other services
payload = {
'user_id': user_id,
'profile_data': profile_data,
'timestamp': timezone.now().isoformat(),
'event_type': 'user_profile_updated'
}
# Notify each service
for service_url in services:
try:
response = requests.post(
service_url,
json=payload,
timeout=10, # Don't wait more than 10 seconds
headers={'Content-Type': 'application/json'}
)
response.raise_for_status() # Raise exception for HTTP errors
logger.info(f"Notified {service_url} about user {user_id} update")
except requests.RequestException as exc:
# Log the error but don't fail the entire task
# In production, you might want to retry failed notifications
logger.error(f"Failed to notify {service_url}: {exc}")
Key patterns demonstrated: passing IDs instead of objects (task arguments must be JSON-serializable), retries with exponential backoff for transient failures, task chaining to trigger follow-up work, and graceful degradation when a call to an external service fails.
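The exponential backoff used in send_welcome_email above follows the formula countdown = 60 * (2 ** retries). A tiny helper makes the resulting schedule explicit:

```python
def backoff_schedule(base=60, max_retries=3):
    """Countdown (in seconds) before each retry: base * (2 ** retry_number)."""
    return [base * (2 ** n) for n in range(max_retries)]

print(backoff_schedule())  # [60, 120, 240]
```

So a task with max_retries=3 waits 60s, then 120s, then 240s before giving up, which gives a flaky downstream service time to recover.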
The order service demonstrates more complex orchestration patterns. Order processing typically involves multiple steps that must happen in sequence, and some steps can happen in parallel. Let's see how Celery handles these scenarios:
# order_service/tasks.py
from celery import shared_task, group, chain, chord
from django.utils import timezone
from .models import Order, OrderItem
import requests
import logging
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def process_order(self, order_id):
"""
Main order processing task that orchestrates the entire workflow
This task demonstrates:
- Complex workflow orchestration
- Using Celery's chain primitive for sequential tasks
- Error handling for business processes
- Database state management
"""
try:
# Get the order from database
order = Order.objects.get(id=order_id)
# Update order status to indicate processing has started
order.status = 'processing'
order.processing_started_at = timezone.now()
order.save()
# Create a workflow chain - these tasks will execute in sequence
# Each task passes its result to the next task
workflow = chain(
validate_inventory.s(order_id), # Step 1: Check if items are in stock
calculate_shipping.s(), # Step 2: Calculate shipping costs
process_payment.s(), # Step 3: Process payment
send_confirmation.s() # Step 4: Send confirmation email
)
# Execute the workflow asynchronously
result = workflow.apply_async()
logger.info(f"Order {order_id} processing workflow initiated")
return f"Order {order_id} processing initiated"
except Order.DoesNotExist:
logger.error(f"Order {order_id} not found")
raise
except Exception as exc:
logger.error(f"Failed to process order {order_id}: {exc}")
# Update order status to failed
try:
order = Order.objects.get(id=order_id)
order.status = 'failed'
order.error_message = str(exc)
order.save()
except Exception:
    pass # Don't fail the retry path just because the status update failed
raise self.retry(exc=exc, countdown=60)
@shared_task
def validate_inventory(order_id):
"""
Validate that all items in the order are available in inventory
This task demonstrates:
- External service communication
- Business logic validation
- Error handling for business rules
"""
order = Order.objects.get(id=order_id)
logger.info(f"Validating inventory for order {order_id}")
# Check inventory for each item in the order
for item in order.items.all():
try:
# Call the inventory service to check stock
response = requests.post(
'http://inventory-service:8003/api/check-stock/',
json={
'product_id': item.product_id,
'quantity': item.quantity
},
timeout=10
)
response.raise_for_status()
stock_data = response.json()
if not stock_data.get('available'):
# Not enough stock - this is a business rule violation
raise Exception(f"Insufficient stock for product {item.product_id}")
logger.info(f"Stock validated for product {item.product_id}")
except requests.RequestException as exc:
logger.error(f"Failed to check stock for product {item.product_id}: {exc}")
raise Exception(f"Inventory service unavailable: {exc}")
logger.info(f"All items validated for order {order_id}")
return order_id # Pass order_id to next task in chain
@shared_task
def calculate_shipping(order_id):
"""
Calculate shipping costs for the order
This task demonstrates:
- External service integration
- Data enrichment (adding calculated fields)
- Error handling for calculations
"""
order = Order.objects.get(id=order_id)
logger.info(f"Calculating shipping for order {order_id}")
try:
# Call shipping service to calculate costs
response = requests.post(
'http://shipping-service:8004/api/calculate/',
json={
'order_id': order_id,
'destination': order.shipping_address,
'weight': order.total_weight,
'items': [
{
'product_id': item.product_id,
'quantity': item.quantity,
'weight': item.weight
}
for item in order.items.all()
]
},
timeout=15
)
response.raise_for_status()
shipping_data = response.json()
shipping_cost = shipping_data.get('cost', 0)
estimated_delivery = shipping_data.get('estimated_delivery')
# Update order with shipping information
order.shipping_cost = shipping_cost
order.estimated_delivery = estimated_delivery
order.total_amount = order.subtotal + shipping_cost
order.save()
logger.info(f"Shipping calculated for order {order_id}: ${shipping_cost}")
return order_id
except requests.RequestException as exc:
logger.error(f"Failed to calculate shipping for order {order_id}: {exc}")
raise Exception(f"Shipping service unavailable: {exc}")
@shared_task
def process_payment(order_id):
"""
Process payment for the order
This task demonstrates:
- Critical business operations
- Transaction handling
- External payment processing
- Detailed error handling
"""
order = Order.objects.get(id=order_id)
logger.info(f"Processing payment for order {order_id}")
try:
# Call payment service to charge the customer
response = requests.post(
'http://payment-service:8005/api/charge/',
json={
'order_id': order_id,
'amount': float(order.total_amount),
'currency': 'USD',
'payment_method': order.payment_method,
'customer_id': order.user_id,
'description': f'Order #{order_id}'
},
timeout=30 # Payment processing might take longer
)
response.raise_for_status()
payment_data = response.json()
if payment_data.get('success'):
# Payment successful - update order
order.status = 'paid'
order.payment_id = payment_data.get('payment_id')
order.paid_at = timezone.now()
order.save()
logger.info(f"Payment processed for order {order_id}: {payment_data.get('payment_id')}")
return order_id
else:
# Payment failed
error_message = payment_data.get('error', 'Payment processing failed')
logger.error(f"Payment failed for order {order_id}: {error_message}")
raise Exception(f"Payment failed: {error_message}")
except requests.RequestException as exc:
logger.error(f"Payment service error for order {order_id}: {exc}")
raise Exception(f"Payment service unavailable: {exc}")
@shared_task
def send_confirmation(order_id):
"""
Send order confirmation to customer
This task demonstrates:
- Final step in workflow
- Customer communication
- Order completion
"""
order = Order.objects.get(id=order_id)
logger.info(f"Sending confirmation for order {order_id}")
# Send confirmation email to customer
send_order_email.delay(
order.user.email,
'Order Confirmation',
f'Your order #{order_id} has been confirmed and will be shipped soon!'
)
# Update final order status
order.status = 'confirmed'
order.confirmed_at = timezone.now()
order.save()
logger.info(f"Order {order_id} confirmed and customer notified")
return f"Order {order_id} confirmed"
@shared_task
def send_order_email(email, subject, message):
"""
Send order-related emails
This is a utility task that can be reused for different types of order emails
"""
try:
from django.core.mail import send_mail
send_mail(
subject,
message,
'orders@example.com',
[email],
fail_silently=False,
)
logger.info(f"Order email sent to {email}")
return f"Email sent to {email}"
except Exception as exc:
logger.error(f"Failed to send email to {email}: {exc}")
raise
Key concepts in order processing:
chain() ensures tasks execute in the correct order; each task receives the result of the previous one.

Celery provides several powerful primitives for orchestrating complex workflows. Understanding these patterns is crucial for building robust microservices that can handle real-world business processes.
Before diving into examples, let's understand the key orchestration primitives:
- chain: runs tasks one after another, passing each task's result to the next
- group: runs a set of independent tasks in parallel
- chord: runs a group in parallel, then calls a callback task with all of the results
Let's see how to combine these primitives for complex business processes:
# Complex workflow orchestration
from celery import chain, group, chord
@shared_task
def orchestrate_user_onboarding(user_id):
"""
Orchestrate complete user onboarding process
This demonstrates a real-world scenario where some tasks must happen
in sequence (user validation) while others can happen in parallel
(sending emails, setting up preferences).
"""
# Step 1: Sequential tasks that must happen in order
# These tasks depend on each other's results
sequential_tasks = chain(
validate_user_data.s(user_id), # First: validate the user data
create_user_profile.s(), # Then: create profile (needs validated data)
assign_user_role.s(), # Finally: assign role (needs profile)
)
# Step 2: Parallel tasks that can run simultaneously
# These tasks are independent and can run at the same time
parallel_tasks = group(
send_welcome_email.s(user_id), # Send welcome email
create_user_preferences.s(user_id), # Set up default preferences
setup_default_settings.s(user_id), # Configure default settings
notify_admin_new_user.s(user_id), # Notify administrators
)
# Step 3: Combine sequential and parallel execution
# First run sequential tasks, then run parallel tasks
workflow = chain(
sequential_tasks, # Must complete first
parallel_tasks, # Then run in parallel
finalize_onboarding.s(user_id) # Finally, complete onboarding
)
# Execute the entire workflow
result = workflow.apply_async()
return {
'workflow_id': result.id,
'status': 'initiated',
'user_id': user_id
}
@shared_task
def validate_user_data(user_id):
"""Validate user data and return validation results"""
user = User.objects.get(id=user_id)
# Perform various validations
validations = {
'email_valid': '@' in user.email,
'name_provided': bool(user.first_name and user.last_name),
'age_appropriate': True, # Add age validation logic
}
if not all(validations.values()):
raise Exception(f"User validation failed: {validations}")
logger.info(f"User {user_id} validation passed")
return {'user_id': user_id, 'validations': validations}
@shared_task
def create_user_profile(validation_result):
"""Create user profile based on validation results"""
user_id = validation_result['user_id']
user = User.objects.get(id=user_id)
# Create profile with validated data
profile = UserProfile.objects.create(
user=user,
status='active',
created_via='onboarding_workflow'
)
logger.info(f"Profile created for user {user_id}")
return {'user_id': user_id, 'profile_id': profile.id}
@shared_task
def orchestrate_order_fulfillment(order_id):
"""
Orchestrate order fulfillment across multiple services
This demonstrates the chord pattern - multiple tasks run in parallel,
and when they all complete, a callback task runs with all the results.
"""
# Preparation tasks that can run in parallel
# All of these need to complete before we can fulfill the order
preparation_tasks = group(
reserve_inventory.s(order_id), # Reserve items in warehouse
calculate_taxes.s(order_id), # Calculate tax amounts
validate_shipping_address.s(order_id), # Verify shipping address
check_fraud_score.s(order_id), # Run fraud detection
)
# Use chord to run preparation tasks in parallel, then run callback
# The callback receives a list of all the results
fulfillment_workflow = chord(
preparation_tasks, # Run these in parallel
finalize_order_preparation.s(order_id) # Then run this with all results
)
result = fulfillment_workflow.apply_async()
return {
'workflow_id': result.id,
'order_id': order_id,
'status': 'preparation_started'
}
@shared_task
def reserve_inventory(order_id):
"""Reserve inventory items for the order"""
order = Order.objects.get(id=order_id)
reservations = []
for item in order.items.all():
# Call inventory service to reserve items
response = requests.post(
'http://inventory-service:8003/api/reserve/',
json={
'product_id': item.product_id,
'quantity': item.quantity,
'order_id': order_id
}
)
if response.status_code == 200:
reservation_id = response.json().get('reservation_id')
reservations.append({
'product_id': item.product_id,
'reservation_id': reservation_id
})
else:
raise Exception(f"Failed to reserve {item.product_id}")
return {'order_id': order_id, 'reservations': reservations}
@shared_task
def calculate_taxes(order_id):
"""Calculate tax amounts for the order"""
order = Order.objects.get(id=order_id)
# Call tax service
response = requests.post(
'http://tax-service:8006/api/calculate/',
json={
'order_id': order_id,
'shipping_address': order.shipping_address,
'items': [
{
'product_id': item.product_id,
'price': float(item.price),
'quantity': item.quantity
}
for item in order.items.all()
]
}
)
tax_data = response.json()
return {'order_id': order_id, 'tax_amount': tax_data.get('total_tax', 0)}
@shared_task
def finalize_order_preparation(preparation_results, order_id):
"""
Finalize order preparation using results from all preparation tasks
This callback task receives the results from all the parallel tasks
and uses them to complete the order preparation.
"""
order = Order.objects.get(id=order_id)
# Process results from all preparation tasks
total_tax = 0
reservations = []
for result in preparation_results:
if 'tax_amount' in result:
total_tax += result['tax_amount']
elif 'reservations' in result:
reservations.extend(result['reservations'])
# Update order with calculated values
order.tax_amount = total_tax
order.total_amount = order.subtotal + order.shipping_cost + total_tax
order.status = 'ready_to_ship'
order.save()
# Store reservation information
for reservation in reservations:
OrderReservation.objects.create(
order=order,
product_id=reservation['product_id'],
reservation_id=reservation['reservation_id']
)
logger.info(f"Order {order_id} preparation finalized")
return f"Order {order_id} ready for fulfillment"
Key benefits of these patterns: the workflow is declared in one place instead of being scattered across services, independent steps run concurrently to reduce total latency, and dependent steps are still guaranteed to execute in order, each receiving the results it needs.
Event-driven architecture is a powerful pattern for microservices where services communicate by publishing and consuming events. Instead of direct service-to-service calls, services publish events when something interesting happens, and other services can subscribe to these events.
Benefits of event-driven communication: publishers and consumers stay loosely coupled, new consumers can subscribe to existing events without changing the publisher, and a failed or slow consumer doesn't block the service that emitted the event.
# Event publishing and handling
import uuid
import logging
from celery import shared_task
from django.core.cache import cache
from django.utils import timezone
logger = logging.getLogger(__name__)
@shared_task
def publish_user_event(event_type, user_id, data=None):
"""
Publish user-related events to interested services
This is the publisher side of event-driven communication.
When something happens to a user (created, updated, deleted),
this task publishes an event that other services can react to.
Args:
event_type: Type of event (e.g., 'user.created', 'user.updated')
user_id: ID of the user the event relates to
data: Additional event data
"""
# Create a standardized event payload
event_payload = {
'event_type': event_type,
'user_id': user_id,
'data': data or {},
'timestamp': timezone.now().isoformat(),
'service': 'user_service',
'event_id': str(uuid.uuid4()), # Unique identifier for this event
'version': '1.0' # Event schema version for compatibility
}
# Define which services are interested in which events
# This routing map determines where each event type gets sent
routing_map = {
'user.created': ['notification_queue', 'analytics_queue', 'recommendation_queue'],
'user.updated': ['order_queue', 'recommendation_queue', 'cache_invalidation_queue'],
'user.deleted': ['cleanup_queue', 'analytics_queue', 'gdpr_compliance_queue'],
'user.login': ['analytics_queue', 'security_queue'],
'user.password_changed': ['security_queue', 'notification_queue'],
}
# Get the queues that should receive this event type
target_queues = routing_map.get(event_type, [])
if not target_queues:
logger.warning(f"No routing defined for event type: {event_type}")
return
# Send the event to all interested queues
for queue in target_queues:
try:
handle_user_event.apply_async(
args=[event_payload],
queue=queue,
retry=True,
retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
}
)
logger.info(f"Event {event_type} sent to queue {queue}")
except Exception as exc:
logger.error(f"Failed to send event {event_type} to queue {queue}: {exc}")
@shared_task
def handle_user_event(event_payload):
"""
Handle incoming user events
This is the consumer side of event-driven communication.
Different services can implement this task to react to user events
in their own specific way.
Args:
event_payload: The event data published by another service
"""
event_type = event_payload['event_type']
user_id = event_payload['user_id']
event_data = event_payload.get('data', {})
logger.info(f"Handling event {event_type} for user {user_id}")
# Define handlers for different event types
# Each handler implements the business logic for reacting to that event
handlers = {
'user.created': handle_user_created,
'user.updated': handle_user_updated,
'user.deleted': handle_user_deleted,
'user.login': handle_user_login,
'user.password_changed': handle_password_changed,
}
# Get the appropriate handler for this event type
handler = handlers.get(event_type)
if handler:
try:
result = handler(event_payload)
logger.info(f"Successfully handled event {event_type} for user {user_id}")
return result
except Exception as exc:
logger.error(f"Error handling event {event_type} for user {user_id}: {exc}")
raise
else:
logger.warning(f"No handler for event type: {event_type}")
def handle_user_created(event_payload):
"""
Handle user creation events
This might be implemented differently in different services:
- Analytics service: Track new user registration
- Notification service: Send welcome email
- Recommendation service: Initialize user preferences
"""
user_id = event_payload['user_id']
user_data = event_payload.get('data', {})
# Example: Analytics service implementation
if get_current_service() == 'analytics_service':
# Track user registration in analytics
track_user_registration.delay(user_id, user_data)
# Update user statistics
update_registration_stats.delay()
# Example: Notification service implementation
elif get_current_service() == 'notification_service':
# Send welcome email
send_welcome_email.delay(user_id)
# Create notification preferences
create_notification_preferences.delay(user_id)
return f"User creation event handled for user {user_id}"
def handle_user_updated(event_payload):
"""Handle user update events"""
user_id = event_payload['user_id']
updated_fields = event_payload.get('data', {}).get('updated_fields', [])
# Different services react to different field updates
if 'email' in updated_fields:
# Email changed - update email verification status
verify_new_email.delay(user_id)
if 'preferences' in updated_fields:
# Preferences changed - update recommendation engine
update_user_recommendations.delay(user_id)
# Invalidate caches that might contain user data
invalidate_user_caches.delay(user_id)
return f"User update event handled for user {user_id}"
def handle_user_deleted(event_payload):
"""Handle user deletion events"""
user_id = event_payload['user_id']
# GDPR compliance - remove user data from all systems
cleanup_user_data.delay(user_id)
# Update analytics
track_user_deletion.delay(user_id)
# Cancel any pending tasks for this user
cancel_user_tasks.delay(user_id)
return f"User deletion event handled for user {user_id}"
# Example of how to publish events from your Django views or services
def create_user_with_events(request, user_data):
"""
Create a user and publish appropriate events
This shows how to integrate event publishing into your business logic
"""
# Create the user
user = User.objects.create(**user_data)
# Publish user creation event
publish_user_event.delay(
event_type='user.created',
user_id=user.id,
data={
'username': user.username,
'email': user.email,
'registration_source': 'web',
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
}
)
return user
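Because events cross service boundaries, it pays to validate the payload shape before publishing or handling. A small sketch of the contract implied by the handlers in this section (the required keys are an assumption drawn from how `event_payload` is accessed above; the function name is mine):

```python
# Keys every event in this section relies on.
REQUIRED_EVENT_KEYS = {'event_type', 'user_id'}

def validate_event_payload(payload):
    """Return a list of problems with an event payload (empty list = valid)."""
    problems = []
    missing = REQUIRED_EVENT_KEYS - set(payload)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    # 'data' is optional, but when present it must be a dict,
    # since handlers call payload.get('data', {}).get(...)
    if 'data' in payload and not isinstance(payload['data'], dict):
        problems.append("'data' must be a dict")
    return problems
```

Running this check in the publisher catches contract violations before they fan out to every subscriber.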
# Example of event-driven cache invalidation
@shared_task
def invalidate_user_caches(user_id):
"""
Invalidate all caches related to a user
This task can be triggered by user update events to ensure
cached data stays consistent across services
"""
cache_keys = [
f'user:{user_id}',
f'user_profile:{user_id}',
f'user_preferences:{user_id}',
f'user_orders:{user_id}',
]
for key in cache_keys:
cache.delete(key)
logger.debug(f"Invalidated cache key: {key}")
    # Also invalidate pattern-based caches
    cache_patterns = [
        'user_list:*',
        'user_stats:*',
    ]
    for pattern in cache_patterns:
        invalidate_cache_pattern.delay(pattern)
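`invalidate_cache_pattern` is referenced above but not shown. With a Redis cache backend it can be built on `SCAN` iteration rather than `KEYS`, which blocks the server on large keyspaces. A sketch written against any client exposing `scan_iter` and `delete`, with a tiny in-memory stand-in so it runs without a live Redis (in production you would pass `redis.Redis()` and wrap the function in a `@shared_task`):

```python
import fnmatch

def invalidate_cache_pattern(client, pattern):
    """Delete every key matching pattern using non-blocking SCAN iteration."""
    deleted = 0
    for key in client.scan_iter(match=pattern):
        client.delete(key)
        deleted += 1
    return deleted

# In-memory stand-in for a Redis client, for illustration only.
class FakeRedis:
    def __init__(self, keys):
        self.keys = set(keys)

    def scan_iter(self, match='*'):
        # iterate over a snapshot so deleting during iteration is safe
        return iter([k for k in self.keys if fnmatch.fnmatch(k, match)])

    def delete(self, key):
        self.keys.discard(key)
```

redis-py's `scan_iter(match=...)` has the same shape, so swapping in a real client is a one-line change.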
Key concepts in event-driven communication: events are immutable facts about something that has already happened (user.created, user.updated, user.deleted); publishers fire events without knowing which services will consume them; each consuming service decides independently how to react; and handlers should be idempotent, because message brokers can deliver the same event more than once.
# Monitoring and logging
import structlog
from celery.signals import task_prerun, task_postrun, task_failure
logger = structlog.get_logger()
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
logger.info("Task started", task_id=task_id, task_name=task.name)
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
logger.info("Task completed", task_id=task_id, task_name=task.name, state=state)
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwds):
logger.error("Task failed", task_id=task_id, task_name=sender.name, exception=str(exception))
# Custom task base class with enhanced error handling
from celery import Task
class CallbackTask(Task):
"""Custom task class with callbacks and enhanced error handling"""
def on_success(self, retval, task_id, args, kwargs):
"""Called on task success"""
logger.info(f"Task {task_id} succeeded with result: {retval}")
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Called on task failure"""
logger.error(f"Task {task_id} failed: {exc}")
# Send failure notification
send_failure_notification.delay(task_id, str(exc))
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""Called on task retry"""
logger.warning(f"Task {task_id} retrying due to: {exc}")
@shared_task(base=CallbackTask, bind=True)
def robust_task(self, data):
    """Example task with robust error handling"""
    try:
        # Task logic here; process_data, RetryableException and
        # FatalException are placeholders for your own code
        result = process_data(data)
        return result
    except RetryableException as exc:
        # Retry with exponential backoff: 1s, 2s, 4s, 8s, 16s
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,
            max_retries=5
        )
    except FatalException as exc:
        # Don't retry fatal exceptions
        logger.error(f"Fatal error in task: {exc}")
        raise
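The `2 ** self.request.retries` countdown in `robust_task` grows without bound, and when many tasks fail at once they all retry in lockstep. In practice you usually cap the delay and add jitter. A hedged helper (the function name and defaults are mine, not Celery's):

```python
import random

def retry_countdown(retries, base=1, cap=300, jitter=True):
    """Exponential backoff: base * 2**retries seconds, capped, with optional
    full jitter (a random delay in [0, computed delay]) to spread retries out."""
    delay = min(cap, base * (2 ** retries))
    if jitter:
        delay = random.uniform(0, delay)
    return delay

# Without jitter the schedule is deterministic: 1, 2, 4, 8, ... up to the cap.
```

Inside a bound task you would call it as `raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))`.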
# Circuit breaker pattern for external service calls
import time
from functools import wraps
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as exc:
self.on_failure()
raise exc
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
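The `functools.wraps` import above hints at a decorator form of the same pattern. Here is a sketch with the state kept in a closure; like the class version it is single-process only, so for workers spread across machines you would keep the counters in Redis instead:

```python
import time
from functools import wraps

def circuit_breaker(failure_threshold=5, recovery_timeout=60):
    """Decorator variant of the CircuitBreaker class: trips open after
    failure_threshold consecutive failures, half-opens after recovery_timeout."""
    state = {'failures': 0, 'opened_at': None}

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if state['opened_at'] is not None:
                if time.time() - state['opened_at'] < recovery_timeout:
                    raise RuntimeError("Circuit breaker is OPEN")
                # timeout elapsed: fall through and allow one trial call
            try:
                result = func(*args, **kwargs)
            except Exception:
                state['failures'] += 1
                if state['failures'] >= failure_threshold:
                    state['opened_at'] = time.time()
                raise
            # a success closes the breaker and resets the failure count
            state['failures'] = 0
            state['opened_at'] = None
            return result
        return wrapper
    return decorator
```

Applied as `@circuit_breaker(failure_threshold=3)` above a plain function, it fails fast with `RuntimeError` once the threshold is hit instead of hammering a struggling downstream service.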
# Usage in tasks
import requests

payment_circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

@shared_task(bind=True, max_retries=3)
def process_payment_with_circuit_breaker(self, order_id, amount):
    """Process payment with circuit breaker protection"""
    try:
        def make_payment_call():
            response = requests.post(
                'http://payment-service:8005/api/charge/',
                json={'order_id': order_id, 'amount': amount},
                timeout=10
            )
            response.raise_for_status()
            return response.json()

        result = payment_circuit_breaker.call(make_payment_call)
        return result
    except Exception as exc:
        logger.error(f"Payment processing failed: {exc}")
        raise self.retry(exc=exc, countdown=60)
# Dockerfile for Celery worker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Create celery user
RUN adduser --disabled-password --gecos '' celery
USER celery
CMD ["celery", "-A", "user_service", "worker", "--loglevel=info", "--concurrency=4"]
# docker-compose.yml for microservices with Celery
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.11-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password123  # use a secret in production
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # management UI
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  user-service:
    build: ./user_service
    ports:
      - "8000:8000"
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

  user-worker:
    build: ./user_service
    command: celery -A user_service worker --loglevel=info --concurrency=4 -Q user_queue
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

  order-service:
    build: ./order_service
    ports:
      - "8001:8000"
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

  order-worker:
    build: ./order_service
    command: celery -A order_service worker --loglevel=info --concurrency=2 -Q order_queue
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

  celery-beat:
    build: ./user_service
    command: celery -A user_service beat --loglevel=info
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//

  flower:
    build: ./user_service
    command: celery -A user_service flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//

volumes:
  rabbitmq_data:
# Optimized task configuration
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60,
rate_limit='100/m', # 100 tasks per minute
time_limit=300, # 5 minutes hard limit
soft_time_limit=240, # 4 minutes soft limit
)
def optimized_task(self, data):
"""Example of well-configured task"""
pass
# Batch processing for efficiency
@shared_task
def process_batch_emails(email_batch):
"""Process emails in batches for better performance"""
from django.core.mail import send_mass_mail
messages = []
for email_data in email_batch:
message = (
email_data['subject'],
email_data['body'],
'noreply@example.com',
[email_data['recipient']]
)
messages.append(message)
send_mass_mail(messages, fail_silently=False)
return f"Sent {len(messages)} emails"
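To feed `process_batch_emails`, split the full recipient list into fixed-size batches and dispatch one task per batch. A sketch of the splitting step (the helper name and batch size are arbitrary; in a real view you would call `process_batch_emails.delay(batch)` for each batch):

```python
def make_batches(items, batch_size=50):
    """Split a list into consecutive batches of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

# Dispatch sketch:
# for batch in make_batches(pending_emails, batch_size=50):
#     process_batch_emails.delay(batch)
```

Smaller batches give the workers more parallelism; larger batches amortize the per-task broker overhead, so tune the size to your mail volume.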
# Chunking large datasets
from celery import group
@shared_task
def process_large_dataset(dataset_id, chunk_size=100):
"""Process large datasets in chunks"""
dataset = get_dataset(dataset_id)
total_items = dataset.count()
# Create chunks
chunks = []
for i in range(0, total_items, chunk_size):
chunk_data = list(dataset[i:i + chunk_size])
chunks.append(process_chunk.s(chunk_data))
    # Process chunks in parallel; keep the GroupResult so callers can track it
    job = group(chunks)
    result = job.apply_async()
    return f"Processing {len(chunks)} chunks (group id: {result.id})"
@shared_task
def process_chunk(chunk_data):
"""Process a single chunk of data"""
results = []
for item in chunk_data:
result = process_single_item(item)
results.append(result)
return results
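`process_large_dataset` fires the chunks off with `group` but never collects their outputs. Since each `process_chunk` returns a list, the per-chunk lists can be flattened by a final callback; with Celery you would register it via `chord(chunks)(merge_chunk_results.s())`. The merge step itself is plain Python (the function name is mine):

```python
from itertools import chain

def merge_chunk_results(chunk_results):
    """Flatten the per-chunk result lists produced by process_chunk."""
    return list(chain.from_iterable(chunk_results))
```

A chord only runs the callback once every task in the group has finished, which is exactly the join point this fan-out pattern needs.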
CELERY_WORKER_MAX_TASKS_PER_CHILD to restart worker child processes periodically and reclaim leaked memory

This comprehensive guide provides the foundation for orchestrating microservices with Celery and RabbitMQ. The patterns and examples shown here can be adapted to your specific use cases and requirements.
Creating RESTful APIs for Microservices
RESTful APIs are the backbone of microservices communication. This section covers designing, implementing, and optimizing REST APIs using Django REST Framework for microservices architecture.
Testing Microservices
Testing microservices presents unique challenges compared to monolithic applications. This chapter covers comprehensive testing strategies, tools, and best practices for ensuring reliability and quality in your Django microservices architecture.