Microservices with Django

Orchestrating Microservices with Celery and RabbitMQ

Microservices architecture often requires asynchronous task processing and inter-service communication. Celery, combined with RabbitMQ as a message broker, provides a robust solution for orchestrating tasks across your Django microservices ecosystem.

Orchestrating Microservices with Celery and RabbitMQ

Microservices architecture often requires asynchronous task processing and inter-service communication. Celery, combined with RabbitMQ as a message broker, provides a robust solution for orchestrating tasks across your Django microservices ecosystem.

Understanding Task Orchestration in Microservices

When you build microservices, you quickly realize that services need to work together to accomplish complex business processes. Imagine an e-commerce system where creating an order involves multiple steps: validating inventory, processing payment, sending confirmation emails, and updating analytics. In a monolithic application, these would all happen in sequence within a single process. But in microservices, each of these tasks might be handled by different services.

This is where task orchestration becomes crucial. Think of it like conducting an orchestra - you need a conductor (orchestrator) to coordinate when each musician (service) plays their part to create beautiful music (complete business process).

Task orchestration involves several key concepts:

  • Asynchronous Processing: Instead of making users wait while you send emails or generate reports, you can queue these tasks to run in the background. This keeps your web application responsive while heavy work happens behind the scenes.
  • Inter-service Communication: Services need to talk to each other reliably. Rather than direct HTTP calls (which can fail), message queues provide a robust way for services to communicate even when some services are temporarily unavailable.
  • Load Distribution: When you have multiple instances of a service running, you want to distribute work evenly among them. This prevents any single instance from becoming overwhelmed while others sit idle.
  • Fault Tolerance: In distributed systems, things will go wrong. Networks fail, services crash, and databases become unavailable. Good orchestration ensures your system can handle these failures gracefully and retry operations when appropriate.

Technical Requirements

Before we dive into implementing Celery and RabbitMQ orchestration, let's understand what each component does and why we need them:

Celery is a distributed task queue system for Python. Think of it as a sophisticated job scheduler that can:

  • Execute tasks asynchronously (in the background)
  • Distribute tasks across multiple worker processes
  • Handle task failures and retries automatically
  • Schedule tasks to run at specific times

RabbitMQ is a message broker - it's like a post office for your services. It:

  • Receives messages from one service and delivers them to another
  • Ensures messages aren't lost even if services are temporarily down
  • Can route messages to different queues based on rules you define
  • Provides reliability through message acknowledgments

Redis (optional but recommended) serves as a result backend where Celery stores the results of completed tasks. This is useful when you need to check if a task completed successfully or retrieve its return value.

Docker helps us package and deploy all these components consistently across different environments.

Here's how to install the required packages:

# Install required packages
pip install celery[redis]==5.2.7
pip install kombu==5.2.4
pip install django-celery-beat==2.4.0
pip install django-celery-results==2.4.0

Let's break down what each package does:

  • celery[redis]: The main Celery package with Redis support
  • kombu: Low-level messaging library that Celery uses to communicate with message brokers
  • django-celery-beat: Enables periodic task scheduling (like cron jobs)
  • django-celery-results: Stores task results in your Django database

Setting Up RabbitMQ

RabbitMQ acts as the message broker between your Django services and Celery workers. Think of it as a reliable postal service that ensures messages get delivered even if the recipient isn't immediately available.

Why RabbitMQ?

RabbitMQ is particularly well-suited for microservices because it:

  • Guarantees message delivery: Messages won't be lost even if services restart
  • Supports complex routing: You can route different types of messages to different queues
  • Provides monitoring: Built-in web interface to see what's happening
  • Handles high throughput: Can process thousands of messages per second
  • Offers clustering: Can run across multiple servers for high availability

Using Docker is the easiest way to get RabbitMQ running locally. The configuration below sets up both RabbitMQ and Redis in containers:

# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.11-management
    container_name: microservices_rabbitmq
    ports:
      - "5672:5672"    # AMQP port for applications
      - "15672:15672"  # Management web interface
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password123
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq  # Persist data between restarts
    networks:
      - microservices_network

  redis:
    image: redis:7-alpine
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks:
      - microservices_network

volumes:
  rabbitmq_data:  # Named volume for data persistence

networks:
  microservices_network:
    driver: bridge

What each part does:

  • rabbitmq:3.11-management: Uses RabbitMQ with the management plugin enabled
  • Port 5672: The main AMQP port where your applications connect
  • Port 15672: Web management interface (visit http://localhost:15672)
  • Environment variables: Set default username/password for security
  • Volume: Ensures your queues and messages survive container restarts
  • Network: Allows containers to communicate with each other

Manual Installation (Alternative to Docker)

If you prefer to install RabbitMQ directly on your system, here's how:

# Ubuntu/Debian
sudo apt-get update
sudo apt-get install rabbitmq-server

# macOS (using Homebrew)
brew install rabbitmq

# Start RabbitMQ service
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server  # Start automatically on boot

# Enable management plugin for web interface
sudo rabbitmq-plugins enable rabbitmq_management

After installation:

  1. RabbitMQ will be running on port 5672
  2. Management interface available at http://localhost:15672
  3. Default credentials: guest/guest (change these in production!)

Testing your installation:

# Check if RabbitMQ is running
sudo systemctl status rabbitmq-server

# Or check if the port is listening
netstat -tlnp | grep 5672

Configuring Celery in Django

Now let's set up Celery to work with your Django microservices. The key concept here is that each service can have its own Celery configuration while sharing the same message broker (RabbitMQ).

Understanding the Project Structure

In a microservices architecture, you'll typically have multiple Django projects (services) that need to communicate. Here's a recommended structure:

microservices_project/
├── user_service/              # Django project for user management
│   ├── __init__.py
│   ├── settings.py
│   ├── celery.py             # Celery configuration for this service
│   ├── tasks.py              # Tasks specific to user service
│   └── views.py
├── order_service/             # Django project for order management
│   ├── __init__.py
│   ├── settings.py
│   ├── celery.py             # Celery configuration for this service
│   ├── tasks.py              # Tasks specific to order service
│   └── views.py
└── shared/                    # Shared configuration and utilities
    ├── __init__.py
    └── celery_config.py       # Common Celery settings

Why this structure?

  • Each service maintains its own Celery instance and tasks
  • Shared configuration ensures consistency across services
  • Services can communicate through message queues
  • Easy to scale individual services independently

Shared Celery Configuration

The shared configuration file contains settings that all services should use. This ensures consistency and makes it easier to manage your infrastructure:

# shared/celery_config.py
from kombu import Queue, Exchange

# Broker settings - this is where Celery connects to RabbitMQ
CELERY_BROKER_URL = 'amqp://admin:password123@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Task serialization - how tasks are converted to messages
CELERY_TASK_SERIALIZER = 'json'        # Use JSON for task arguments
CELERY_RESULT_SERIALIZER = 'json'      # Use JSON for task results
CELERY_ACCEPT_CONTENT = ['json']       # Only accept JSON content

# Timezone settings
CELERY_TIMEZONE = 'UTC'                # Use UTC for consistency
CELERY_ENABLE_UTC = True

# Task routing - this is crucial for microservices!
# It determines which queue each task goes to
CELERY_TASK_ROUTES = {
    'user_service.tasks.*': {'queue': 'user_queue'},           # All user service tasks
    'order_service.tasks.*': {'queue': 'order_queue'},         # All order service tasks
    'notification_service.tasks.*': {'queue': 'notification_queue'},  # Notification tasks
}

# Queue configuration - define the actual queues
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('user_queue', Exchange('user'), routing_key='user.#'),
    Queue('order_queue', Exchange('order'), routing_key='order.#'),
    Queue('notification_queue', Exchange('notification'), routing_key='notification.#'),
)

# Worker configuration - these settings optimize performance
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # Process one task at a time (safer)
CELERY_TASK_ACKS_LATE = True          # Acknowledge tasks only after completion
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # Restart workers after 1000 tasks (prevents memory leaks)

# Task execution limits - prevent runaway tasks
CELERY_TASK_TIME_LIMIT = 300          # 5 minutes hard limit
CELERY_TASK_SOFT_TIME_LIMIT = 240     # 4 minutes soft limit (sends signal to task)

Key concepts explained:

  1. Task Routing: This is how Celery knows which queue to send each task to. By using patterns like user_service.tasks.*, all tasks in the user service automatically go to the user queue.
  2. Exchanges and Queues: Think of exchanges as post offices and queues as mailboxes. Messages go to exchanges first, then get routed to the appropriate queue based on routing keys.
  3. Worker Configuration: These settings help prevent common issues like memory leaks and stuck tasks.
  4. Time Limits: Prevent tasks from running forever, which could block workers.

Service-Specific Celery Setup

Each Django service needs its own Celery application instance. This allows services to run independently while still communicating through the shared message broker.

# user_service/celery.py
import os
from celery import Celery
from django.conf import settings

# Set the default Django settings module for the 'celery' program.
# This tells Celery which Django settings to use
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'user_service.settings')

# Create the Celery application instance
# The name 'user_service' will appear in logs and monitoring tools
app = Celery('user_service')

# Load configuration from the shared config file
# The namespace='CELERY' means it will look for settings starting with CELERY_
app.config_from_object('shared.celery_config', namespace='CELERY')

# Auto-discover tasks from all installed Django apps
# This automatically finds tasks.py files in your Django apps
app.autodiscover_tasks()

# Debug task - useful for testing your setup
@app.task(bind=True)
def debug_task(self):
    """A simple task to test if Celery is working"""
    print(f'Request: {self.request!r}')
    return f'Debug task executed successfully'

Important concepts:

  1. App Instance: Each service has its own Celery app instance. This allows them to run independently.
  2. Settings Module: We tell Celery which Django settings to use. This is crucial for database connections and other Django features.
  3. Auto-discovery: Celery automatically finds tasks in your Django apps, so you don't have to manually register them.
  4. Debug Task: Always include a simple test task to verify your setup is working.

Now, make sure Celery is imported when Django starts:

# user_service/__init__.py
from .celery import app as celery_app

# This ensures that the Celery app is loaded when Django starts
__all__ = ('celery_app',)

Why this import is necessary:

  • Django needs to know about your Celery app
  • This import happens when Django starts up
  • Without it, your tasks won't be registered properly

Testing your setup:

# Start a Celery worker for the user service
celery -A user_service worker --loglevel=info -Q user_queue

# In another terminal, test the debug task
python manage.py shell
>>> from user_service.celery import debug_task
>>> result = debug_task.delay()
>>> print(result.get())

Implementing Task-Based Communication

Now that we have Celery configured, let's create actual tasks that demonstrate how microservices can communicate and coordinate work. We'll start with the user service, which handles user-related operations.

Understanding Celery Tasks

A Celery task is simply a Python function decorated with @shared_task or @app.task. When you call a task, instead of executing immediately, it gets serialized and sent to a message queue. A Celery worker then picks up the task and executes it.

Key benefits of tasks:

  • Asynchronous execution: Your web request doesn't wait for the task to complete
  • Reliability: Tasks are persisted in the queue and won't be lost if workers restart
  • Scalability: You can run multiple workers to process tasks in parallel
  • Retry logic: Tasks can automatically retry if they fail

User Service Tasks

Let's create tasks for the user service that demonstrate common patterns:

# user_service/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth.models import User
from django.utils import timezone
import requests
import logging

# Set up logging to track task execution
logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
    """
    Send welcome email to new user
    
    Args:
        user_id: The ID of the user to send email to
        
    Returns:
        str: Success message or raises exception
        
    This task demonstrates:
    - Database access from tasks
    - Email sending (I/O operation)
    - Error handling and retries
    - Logging for debugging
    """
    try:
        # Get user from database
        # Note: We pass user_id instead of user object because
        # Celery can only serialize simple data types (JSON)
        user = User.objects.get(id=user_id)
        
        # Send the welcome email
        send_mail(
            subject='Welcome to Our Platform',
            message=f'Hello {user.first_name}, welcome to our microservices platform!',
            from_email='noreply@example.com',
            recipient_list=[user.email],
            fail_silently=False,  # Raise exception if email fails
        )
        
        # Log successful execution
        logger.info(f"Welcome email sent to user {user_id}")
        return f"Email sent to {user.email}"
        
    except User.DoesNotExist:
        # User not found - this is a permanent error, don't retry
        logger.error(f"User {user_id} not found")
        raise
        
    except Exception as exc:
        # Email sending failed - this might be temporary, so retry
        logger.error(f"Failed to send email to user {user_id}: {exc}")
        
        # Retry with exponential backoff
        # First retry: 60 seconds, second: 120 seconds, third: 240 seconds
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@shared_task
def update_user_profile(user_id, profile_data):
    """
    Update user profile asynchronously
    
    This task demonstrates:
    - Updating database records
    - Triggering other tasks (task chaining)
    - Error handling for data operations
    """
    try:
        user = User.objects.get(id=user_id)
        
        # Update user fields from profile_data
        for field, value in profile_data.items():
            if hasattr(user, field):
                setattr(user, field, value)
        
        user.save()
        
        # Trigger another task to notify other services
        # This is called "task chaining" - one task triggers another
        notify_profile_update.delay(user_id, profile_data)
        
        logger.info(f"Profile updated for user {user_id}")
        return f"Profile updated for user {user_id}"
        
    except User.DoesNotExist:
        logger.error(f"User {user_id} not found for profile update")
        raise
    except Exception as exc:
        logger.error(f"Failed to update profile for user {user_id}: {exc}")
        raise

@shared_task
def notify_profile_update(user_id, profile_data):
    """
    Notify other services about profile updates
    
    This task demonstrates:
    - Inter-service communication via HTTP
    - Handling multiple service calls
    - Graceful error handling for external services
    """
    # List of services that need to know about profile updates
    services = [
        'http://order-service:8001/api/user-updated/',
        'http://notification-service:8002/api/user-updated/',
    ]
    
    # Prepare the payload to send to other services
    payload = {
        'user_id': user_id,
        'profile_data': profile_data,
        'timestamp': timezone.now().isoformat(),
        'event_type': 'user_profile_updated'
    }
    
    # Notify each service
    for service_url in services:
        try:
            response = requests.post(
                service_url,
                json=payload,
                timeout=10,  # Don't wait more than 10 seconds
                headers={'Content-Type': 'application/json'}
            )
            response.raise_for_status()  # Raise exception for HTTP errors
            
            logger.info(f"Notified {service_url} about user {user_id} update")
            
        except requests.RequestException as exc:
            # Log the error but don't fail the entire task
            # In production, you might want to retry failed notifications
            logger.error(f"Failed to notify {service_url}: {exc}")

Key patterns demonstrated:

  1. Error Handling: Different types of errors are handled differently. Permanent errors (like user not found) don't retry, while temporary errors (like network issues) do.
  2. Task Chaining: One task can trigger another task, allowing you to build complex workflows.
  3. Inter-service Communication: Tasks can make HTTP calls to other services, enabling loose coupling between services.
  4. Logging: Comprehensive logging helps with debugging and monitoring in production.

Order Service Tasks

The order service demonstrates more complex orchestration patterns. Order processing typically involves multiple steps that must happen in sequence, and some steps can happen in parallel. Let's see how Celery handles these scenarios:

# order_service/tasks.py
from celery import shared_task, group, chain, chord
from django.utils import timezone
from .models import Order, OrderItem
import requests
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def process_order(self, order_id):
    """
    Main order processing task that orchestrates the entire workflow
    
    This task demonstrates:
    - Complex workflow orchestration
    - Using Celery's chain primitive for sequential tasks
    - Error handling for business processes
    - Database state management
    """
    try:
        # Get the order from database
        order = Order.objects.get(id=order_id)
        
        # Update order status to indicate processing has started
        order.status = 'processing'
        order.processing_started_at = timezone.now()
        order.save()
        
        # Create a workflow chain - these tasks will execute in sequence
        # Each task passes its result to the next task
        workflow = chain(
            validate_inventory.s(order_id),      # Step 1: Check if items are in stock
            calculate_shipping.s(),              # Step 2: Calculate shipping costs
            process_payment.s(),                 # Step 3: Process payment
            send_confirmation.s()                # Step 4: Send confirmation email
        )
        
        # Execute the workflow asynchronously
        result = workflow.apply_async()
        
        logger.info(f"Order {order_id} processing workflow initiated")
        return f"Order {order_id} processing initiated"
        
    except Order.DoesNotExist:
        logger.error(f"Order {order_id} not found")
        raise
    except Exception as exc:
        logger.error(f"Failed to process order {order_id}: {exc}")
        # Update order status to failed
        try:
            order = Order.objects.get(id=order_id)
            order.status = 'failed'
            order.error_message = str(exc)
            order.save()
        except:
            pass  # Don't fail if we can't update status
        
        raise self.retry(exc=exc, countdown=60)

@shared_task
def validate_inventory(order_id):
    """
    Validate that all items in the order are available in inventory
    
    This task demonstrates:
    - External service communication
    - Business logic validation
    - Error handling for business rules
    """
    order = Order.objects.get(id=order_id)
    
    logger.info(f"Validating inventory for order {order_id}")
    
    # Check inventory for each item in the order
    for item in order.items.all():
        try:
            # Call the inventory service to check stock
            response = requests.post(
                'http://inventory-service:8003/api/check-stock/',
                json={
                    'product_id': item.product_id,
                    'quantity': item.quantity
                },
                timeout=10
            )
            response.raise_for_status()
            
            stock_data = response.json()
            if not stock_data.get('available'):
                # Not enough stock - this is a business rule violation
                raise Exception(f"Insufficient stock for product {item.product_id}")
                
            logger.info(f"Stock validated for product {item.product_id}")
            
        except requests.RequestException as exc:
            logger.error(f"Failed to check stock for product {item.product_id}: {exc}")
            raise Exception(f"Inventory service unavailable: {exc}")
    
    logger.info(f"All items validated for order {order_id}")
    return order_id  # Pass order_id to next task in chain

@shared_task
def calculate_shipping(order_id):
    """
    Calculate shipping costs for the order
    
    This task demonstrates:
    - External service integration
    - Data enrichment (adding calculated fields)
    - Error handling for calculations
    """
    order = Order.objects.get(id=order_id)
    
    logger.info(f"Calculating shipping for order {order_id}")
    
    try:
        # Call shipping service to calculate costs
        response = requests.post(
            'http://shipping-service:8004/api/calculate/',
            json={
                'order_id': order_id,
                'destination': order.shipping_address,
                'weight': order.total_weight,
                'items': [
                    {
                        'product_id': item.product_id,
                        'quantity': item.quantity,
                        'weight': item.weight
                    }
                    for item in order.items.all()
                ]
            },
            timeout=15
        )
        response.raise_for_status()
        
        shipping_data = response.json()
        shipping_cost = shipping_data.get('cost', 0)
        estimated_delivery = shipping_data.get('estimated_delivery')
        
        # Update order with shipping information
        order.shipping_cost = shipping_cost
        order.estimated_delivery = estimated_delivery
        order.total_amount = order.subtotal + shipping_cost
        order.save()
        
        logger.info(f"Shipping calculated for order {order_id}: ${shipping_cost}")
        return order_id
        
    except requests.RequestException as exc:
        logger.error(f"Failed to calculate shipping for order {order_id}: {exc}")
        raise Exception(f"Shipping service unavailable: {exc}")

@shared_task
def process_payment(order_id):
    """
    Process payment for the order
    
    This task demonstrates:
    - Critical business operations
    - Transaction handling
    - External payment processing
    - Detailed error handling
    """
    order = Order.objects.get(id=order_id)
    
    logger.info(f"Processing payment for order {order_id}")
    
    try:
        # Call payment service to charge the customer
        response = requests.post(
            'http://payment-service:8005/api/charge/',
            json={
                'order_id': order_id,
                'amount': float(order.total_amount),
                'currency': 'USD',
                'payment_method': order.payment_method,
                'customer_id': order.user_id,
                'description': f'Order #{order_id}'
            },
            timeout=30  # Payment processing might take longer
        )
        response.raise_for_status()
        
        payment_data = response.json()
        
        if payment_data.get('success'):
            # Payment successful - update order
            order.status = 'paid'
            order.payment_id = payment_data.get('payment_id')
            order.paid_at = timezone.now()
            order.save()
            
            logger.info(f"Payment processed for order {order_id}: {payment_data.get('payment_id')}")
            return order_id
        else:
            # Payment failed
            error_message = payment_data.get('error', 'Payment processing failed')
            logger.error(f"Payment failed for order {order_id}: {error_message}")
            raise Exception(f"Payment failed: {error_message}")
            
    except requests.RequestException as exc:
        logger.error(f"Payment service error for order {order_id}: {exc}")
        raise Exception(f"Payment service unavailable: {exc}")

@shared_task
def send_confirmation(order_id):
    """
    Send order confirmation to customer
    
    This task demonstrates:
    - Final step in workflow
    - Customer communication
    - Order completion
    """
    order = Order.objects.get(id=order_id)
    
    logger.info(f"Sending confirmation for order {order_id}")
    
    # Send confirmation email to customer
    send_order_email.delay(
        order.user.email,
        'Order Confirmation',
        f'Your order #{order_id} has been confirmed and will be shipped soon!'
    )
    
    # Update final order status
    order.status = 'confirmed'
    order.confirmed_at = timezone.now()
    order.save()
    
    logger.info(f"Order {order_id} confirmed and customer notified")
    return f"Order {order_id} confirmed"

@shared_task
def send_order_email(email, subject, message):
    """
    Send order-related emails
    
    This is a utility task that can be reused for different types of order emails
    """
    try:
        from django.core.mail import send_mail
        
        send_mail(
            subject,
            message,
            'orders@example.com',
            [email],
            fail_silently=False,
        )
        
        logger.info(f"Order email sent to {email}")
        return f"Email sent to {email}"
        
    except Exception as exc:
        logger.error(f"Failed to send email to {email}: {exc}")
        raise

Key concepts in order processing:

  1. Sequential Workflows: Using chain() ensures tasks execute in the correct order. Each task receives the result of the previous task.
  2. Business Logic: Each task handles a specific business concern (inventory, shipping, payment), making the code modular and testable.
  3. Error Propagation: If any task in the chain fails, the entire workflow stops, preventing inconsistent states.
  4. State Management: The order status is updated at each step, providing visibility into the process.
  5. External Service Integration: Each task handles communication with external services, including proper error handling and timeouts.

Advanced Orchestration Patterns

Celery provides several powerful primitives for orchestrating complex workflows. Understanding these patterns is crucial for building robust microservices that can handle real-world business processes.

Understanding Celery Primitives

Before diving into examples, let's understand the key orchestration primitives:

  1. Chain: Execute tasks sequentially, where each task receives the result of the previous task
  2. Group: Execute tasks in parallel, all tasks run simultaneously
  3. Chord: Execute tasks in parallel, then execute a callback task with all results
  4. Map: Execute the same task with different arguments in parallel
  5. Starmap: Like map, but unpacks arguments

Workflow Orchestration with Chains and Groups

Let's see how to combine these primitives for complex business processes:

# Complex workflow orchestration
from celery import chain, group, chord

@shared_task
def orchestrate_user_onboarding(user_id):
    """
    Orchestrate complete user onboarding process
    
    This demonstrates a real-world scenario where some tasks must happen
    in sequence (user validation) while others can happen in parallel
    (sending emails, setting up preferences).
    """
    
    # Step 1: Sequential tasks that must happen in order
    # These tasks depend on each other's results
    sequential_tasks = chain(
        validate_user_data.s(user_id),      # First: validate the user data
        create_user_profile.s(),            # Then: create profile (needs validated data)
        assign_user_role.s(),               # Finally: assign role (needs profile)
    )
    
    # Step 2: Parallel tasks that can run simultaneously
    # These tasks are independent and can run at the same time
    parallel_tasks = group(
        send_welcome_email.s(user_id),      # Send welcome email
        create_user_preferences.s(user_id), # Set up default preferences
        setup_default_settings.s(user_id),  # Configure default settings
        notify_admin_new_user.s(user_id),   # Notify administrators
    )
    
    # Step 3: Combine sequential and parallel execution
    # First run sequential tasks, then run parallel tasks
    workflow = chain(
        sequential_tasks,                    # Must complete first
        parallel_tasks,                      # Then run in parallel
        finalize_onboarding.s(user_id)       # Finally, complete onboarding
    )
    
    # Execute the entire workflow
    result = workflow.apply_async()
    
    return {
        'workflow_id': result.id,
        'status': 'initiated',
        'user_id': user_id
    }

@shared_task
def validate_user_data(user_id):
    """Validate user data and return validation results"""
    user = User.objects.get(id=user_id)
    
    # Perform various validations
    validations = {
        'email_valid': '@' in user.email,
        'name_provided': bool(user.first_name and user.last_name),
        'age_appropriate': True,  # Add age validation logic
    }
    
    if not all(validations.values()):
        raise Exception(f"User validation failed: {validations}")
    
    logger.info(f"User {user_id} validation passed")
    return {'user_id': user_id, 'validations': validations}

@shared_task
def create_user_profile(validation_result):
    """Create user profile based on validation results"""
    user_id = validation_result['user_id']
    user = User.objects.get(id=user_id)
    
    # Create profile with validated data
    profile = UserProfile.objects.create(
        user=user,
        status='active',
        created_via='onboarding_workflow'
    )
    
    logger.info(f"Profile created for user {user_id}")
    return {'user_id': user_id, 'profile_id': profile.id}

@shared_task
def orchestrate_order_fulfillment(order_id):
    """
    Orchestrate order fulfillment across multiple services
    
    This demonstrates the chord pattern - multiple tasks run in parallel,
    and when they all complete, a callback task runs with all the results.
    """
    
    # Preparation tasks that can run in parallel
    # All of these need to complete before we can fulfill the order
    preparation_tasks = group(
        reserve_inventory.s(order_id),           # Reserve items in warehouse
        calculate_taxes.s(order_id),             # Calculate tax amounts
        validate_shipping_address.s(order_id),   # Verify shipping address
        check_fraud_score.s(order_id),          # Run fraud detection
    )
    
    # Use chord to run preparation tasks in parallel, then run callback
    # The callback receives a list of all the results
    fulfillment_workflow = chord(
        preparation_tasks,                       # Run these in parallel
        finalize_order_preparation.s(order_id)  # Then run this with all results
    )
    
    result = fulfillment_workflow.apply_async()
    
    return {
        'workflow_id': result.id,
        'order_id': order_id,
        'status': 'preparation_started'
    }

@shared_task
def reserve_inventory(order_id):
    """Reserve inventory items for the order"""
    order = Order.objects.get(id=order_id)
    
    reservations = []
    for item in order.items.all():
        # Call inventory service to reserve items
        response = requests.post(
            'http://inventory-service:8003/api/reserve/',
            json={
                'product_id': item.product_id,
                'quantity': item.quantity,
                'order_id': order_id
            }
        )
        
        if response.status_code == 200:
            reservation_id = response.json().get('reservation_id')
            reservations.append({
                'product_id': item.product_id,
                'reservation_id': reservation_id
            })
        else:
            raise Exception(f"Failed to reserve {item.product_id}")
    
    return {'order_id': order_id, 'reservations': reservations}

@shared_task
def calculate_taxes(order_id):
    """Calculate tax amounts for the order"""
    order = Order.objects.get(id=order_id)
    
    # Call tax service
    response = requests.post(
        'http://tax-service:8006/api/calculate/',
        json={
            'order_id': order_id,
            'shipping_address': order.shipping_address,
            'items': [
                {
                    'product_id': item.product_id,
                    'price': float(item.price),
                    'quantity': item.quantity
                }
                for item in order.items.all()
            ]
        }
    )
    
    tax_data = response.json()
    return {'order_id': order_id, 'tax_amount': tax_data.get('total_tax', 0)}

@shared_task
def finalize_order_preparation(preparation_results, order_id):
    """
    Finalize order preparation using results from all preparation tasks
    
    This callback task receives the results from all the parallel tasks
    and uses them to complete the order preparation.
    """
    order = Order.objects.get(id=order_id)
    
    # Process results from all preparation tasks
    total_tax = 0
    reservations = []
    
    for result in preparation_results:
        if 'tax_amount' in result:
            total_tax += result['tax_amount']
        elif 'reservations' in result:
            reservations.extend(result['reservations'])
    
    # Update order with calculated values
    order.tax_amount = total_tax
    order.total_amount = order.subtotal + order.shipping_cost + total_tax
    order.status = 'ready_to_ship'
    order.save()
    
    # Store reservation information
    for reservation in reservations:
        OrderReservation.objects.create(
            order=order,
            product_id=reservation['product_id'],
            reservation_id=reservation['reservation_id']
        )
    
    logger.info(f"Order {order_id} preparation finalized")
    return f"Order {order_id} ready for fulfillment"

Key benefits of these patterns:

  1. Parallel Execution: Tasks that don't depend on each other can run simultaneously, reducing total processing time.
  2. Result Aggregation: Chord pattern allows you to collect results from multiple parallel tasks and process them together.
  3. Fault Isolation: If one task in a group fails, others can still complete successfully.
  4. Scalability: Parallel tasks can run on different workers, distributing load across your infrastructure.

Event-Driven Communication

Event-driven architecture is a powerful pattern for microservices where services communicate by publishing and consuming events. Instead of direct service-to-service calls, services publish events when something interesting happens, and other services can subscribe to these events.

Benefits of event-driven communication:

  • Loose coupling: Services don't need to know about each other directly
  • Scalability: Events can be processed asynchronously
  • Resilience: If a service is down, events can be processed later
  • Extensibility: New services can easily subscribe to existing events
# Event publishing and handling
@shared_task
def publish_user_event(event_type, user_id, data=None):
    """
    Publish user-related events to interested services
    
    This is the publisher side of event-driven communication.
    When something happens to a user (created, updated, deleted),
    this task publishes an event that other services can react to.
    
    Args:
        event_type: Type of event (e.g., 'user.created', 'user.updated')
        user_id: ID of the user the event relates to
        data: Additional event data
    """
    
    # Create a standardized event payload
    event_payload = {
        'event_type': event_type,
        'user_id': user_id,
        'data': data or {},
        'timestamp': timezone.now().isoformat(),
        'service': 'user_service',
        'event_id': str(uuid.uuid4()),  # Unique identifier for this event
        'version': '1.0'  # Event schema version for compatibility
    }
    
    # Define which services are interested in which events
    # This routing map determines where each event type gets sent
    routing_map = {
        'user.created': ['notification_queue', 'analytics_queue', 'recommendation_queue'],
        'user.updated': ['order_queue', 'recommendation_queue', 'cache_invalidation_queue'],
        'user.deleted': ['cleanup_queue', 'analytics_queue', 'gdpr_compliance_queue'],
        'user.login': ['analytics_queue', 'security_queue'],
        'user.password_changed': ['security_queue', 'notification_queue'],
    }
    
    # Get the queues that should receive this event type
    target_queues = routing_map.get(event_type, [])
    
    if not target_queues:
        logger.warning(f"No routing defined for event type: {event_type}")
        return
    
    # Send the event to all interested queues
    for queue in target_queues:
        try:
            handle_user_event.apply_async(
                args=[event_payload],
                queue=queue,
                retry=True,
                retry_policy={
                    'max_retries': 3,
                    'interval_start': 0,
                    'interval_step': 0.2,
                    'interval_max': 0.2,
                }
            )
            logger.info(f"Event {event_type} sent to queue {queue}")
            
        except Exception as exc:
            logger.error(f"Failed to send event {event_type} to queue {queue}: {exc}")

@shared_task
def handle_user_event(event_payload):
    """
    Handle incoming user events
    
    This is the consumer side of event-driven communication.
    Different services can implement this task to react to user events
    in their own specific way.
    
    Args:
        event_payload: The event data published by another service
    """
    event_type = event_payload['event_type']
    user_id = event_payload['user_id']
    event_data = event_payload.get('data', {})
    
    logger.info(f"Handling event {event_type} for user {user_id}")
    
    # Define handlers for different event types
    # Each handler implements the business logic for reacting to that event
    handlers = {
        'user.created': handle_user_created,
        'user.updated': handle_user_updated,
        'user.deleted': handle_user_deleted,
        'user.login': handle_user_login,
        'user.password_changed': handle_password_changed,
    }
    
    # Get the appropriate handler for this event type
    handler = handlers.get(event_type)
    if handler:
        try:
            result = handler(event_payload)
            logger.info(f"Successfully handled event {event_type} for user {user_id}")
            return result
        except Exception as exc:
            logger.error(f"Error handling event {event_type} for user {user_id}: {exc}")
            raise
    else:
        logger.warning(f"No handler for event type: {event_type}")

def handle_user_created(event_payload):
    """
    Handle user creation events
    
    This might be implemented differently in different services:
    - Analytics service: Track new user registration
    - Notification service: Send welcome email
    - Recommendation service: Initialize user preferences
    """
    user_id = event_payload['user_id']
    user_data = event_payload.get('data', {})
    
    # Example: Analytics service implementation
    if get_current_service() == 'analytics_service':
        # Track user registration in analytics
        track_user_registration.delay(user_id, user_data)
        
        # Update user statistics
        update_registration_stats.delay()
    
    # Example: Notification service implementation
    elif get_current_service() == 'notification_service':
        # Send welcome email
        send_welcome_email.delay(user_id)
        
        # Create notification preferences
        create_notification_preferences.delay(user_id)
    
    return f"User creation event handled for user {user_id}"

def handle_user_updated(event_payload):
    """Handle user update events"""
    user_id = event_payload['user_id']
    updated_fields = event_payload.get('data', {}).get('updated_fields', [])
    
    # Different services react to different field updates
    if 'email' in updated_fields:
        # Email changed - update email verification status
        verify_new_email.delay(user_id)
    
    if 'preferences' in updated_fields:
        # Preferences changed - update recommendation engine
        update_user_recommendations.delay(user_id)
    
    # Invalidate caches that might contain user data
    invalidate_user_caches.delay(user_id)
    
    return f"User update event handled for user {user_id}"

def handle_user_deleted(event_payload):
    """Handle user deletion events"""
    user_id = event_payload['user_id']
    
    # GDPR compliance - remove user data from all systems
    cleanup_user_data.delay(user_id)
    
    # Update analytics
    track_user_deletion.delay(user_id)
    
    # Cancel any pending tasks for this user
    cancel_user_tasks.delay(user_id)
    
    return f"User deletion event handled for user {user_id}"

# Example of how to publish events from your Django views or services
def create_user_with_events(user_data):
    """
    Create a user and publish appropriate events
    
    This shows how to integrate event publishing into your business logic
    """
    # Create the user
    user = User.objects.create(**user_data)
    
    # Publish user creation event
    publish_user_event.delay(
        event_type='user.created',
        user_id=user.id,
        data={
            'username': user.username,
            'email': user.email,
            'registration_source': 'web',
            'user_agent': request.META.get('HTTP_USER_AGENT', ''),
        }
    )
    
    return user

# Example of event-driven cache invalidation
@shared_task
def invalidate_user_caches(user_id):
    """
    Invalidate all caches related to a user
    
    This task can be triggered by user update events to ensure
    cached data stays consistent across services
    """
    cache_keys = [
        f'user:{user_id}',
        f'user_profile:{user_id}',
        f'user_preferences:{user_id}',
        f'user_orders:{user_id}',
    ]
    
    for key in cache_keys:
        cache.delete(key)
        logger.debug(f"Invalidated cache key: {key}")
    
    # Also invalidate pattern-based caches
    cache_patterns = [
        f'user_list:*',
        f'user_stats:*',
    ]
    
    for pattern in cache_patterns:
        invalidate_cache_pattern.delay(pattern)

Key concepts in event-driven communication:

  1. Event Schema: Standardized event format ensures all services can understand events
  2. Event Routing: Different event types go to different queues based on which services care about them
  3. Event Handlers: Each service implements handlers for events it cares about
  4. Idempotency: Events should be safe to process multiple times
  5. Event Versioning: Include version information for backward compatibility

Monitoring and Error Handling

Task Monitoring

# Monitoring and logging
import structlog
from celery.signals import task_prerun, task_postrun, task_failure

logger = structlog.get_logger()

@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    logger.info("Task started", task_id=task_id, task_name=task.name)

@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    logger.info("Task completed", task_id=task_id, task_name=task.name, state=state)

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwds):
    logger.error("Task failed", task_id=task_id, task_name=sender.name, exception=str(exception))

# Custom task base class with enhanced error handling
from celery import Task

class CallbackTask(Task):
    """Custom task class with callbacks and enhanced error handling"""
    
    def on_success(self, retval, task_id, args, kwargs):
        """Called on task success"""
        logger.info(f"Task {task_id} succeeded with result: {retval}")
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Called on task failure"""
        logger.error(f"Task {task_id} failed: {exc}")
        
        # Send failure notification
        send_failure_notification.delay(task_id, str(exc))
    
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Called on task retry"""
        logger.warning(f"Task {task_id} retrying due to: {exc}")

@shared_task(base=CallbackTask, bind=True)
def robust_task(self, data):
    """Example task with robust error handling"""
    try:
        # Task logic here
        result = process_data(data)
        return result
    except RetryableException as exc:
        # Retry with exponential backoff
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,
            max_retries=5
        )
    except FatalException as exc:
        # Don't retry fatal exceptions
        logger.error(f"Fatal error in task: {exc}")
        raise

Health Checks and Circuit Breakers

# Circuit breaker pattern for external service calls
import time
from functools import wraps

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as exc:
            self.on_failure()
            raise exc
    
    def on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

# Usage in tasks
payment_circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

@shared_task(bind=True, max_retries=3)
def process_payment_with_circuit_breaker(self, order_id, amount):
    """Process payment with circuit breaker protection"""
    try:
        def make_payment_call():
            response = requests.post(
                'http://payment-service:8005/api/charge/',
                json={'order_id': order_id, 'amount': amount},
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        
        result = payment_circuit_breaker.call(make_payment_call)
        return result
        
    except Exception as exc:
        logger.error(f"Payment processing failed: {exc}")
        raise self.retry(exc=exc, countdown=60)

Deployment and Scaling

Docker Configuration

# Dockerfile for Celery worker
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Create celery user
RUN adduser --disabled-password --gecos '' celery

USER celery

CMD ["celery", "-A", "user_service", "worker", "--loglevel=info", "--concurrency=4"]

Docker Compose for Complete Setup

# docker-compose.yml for microservices with Celery
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.11-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password123
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  user-service:
    build: ./user_service
    ports:
      - "8000:8000"
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

  user-worker:
    build: ./user_service
    command: celery -A user_service worker --loglevel=info --concurrency=4 -Q user_queue
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

  order-service:
    build: ./order_service
    ports:
      - "8001:8000"
    depends_on:
      - rabbitmq
      - redis

  order-worker:
    build: ./order_service
    command: celery -A order_service worker --loglevel=info --concurrency=2 -Q order_queue
    depends_on:
      - rabbitmq
      - redis

  celery-beat:
    build: ./user_service
    command: celery -A user_service beat --loglevel=info
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//

  flower:
    build: ./user_service
    command: celery -A user_service flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - rabbitmq
      - redis
    environment:
      - CELERY_BROKER_URL=amqp://admin:password123@rabbitmq:5672//

volumes:
  rabbitmq_data:

Best Practices and Troubleshooting

Performance Optimization

# Optimized task configuration
@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    rate_limit='100/m',  # 100 tasks per minute
    time_limit=300,      # 5 minutes hard limit
    soft_time_limit=240, # 4 minutes soft limit
)
def optimized_task(self, data):
    """Example of well-configured task"""
    pass

# Batch processing for efficiency
@shared_task
def process_batch_emails(email_batch):
    """Process emails in batches for better performance"""
    from django.core.mail import send_mass_mail
    
    messages = []
    for email_data in email_batch:
        message = (
            email_data['subject'],
            email_data['body'],
            'noreply@example.com',
            [email_data['recipient']]
        )
        messages.append(message)
    
    send_mass_mail(messages, fail_silently=False)
    return f"Sent {len(messages)} emails"

# Chunking large datasets
from celery import group

@shared_task
def process_large_dataset(dataset_id, chunk_size=100):
    """Process large datasets in chunks"""
    dataset = get_dataset(dataset_id)
    total_items = dataset.count()
    
    # Create chunks
    chunks = []
    for i in range(0, total_items, chunk_size):
        chunk_data = list(dataset[i:i + chunk_size])
        chunks.append(process_chunk.s(chunk_data))
    
    # Process chunks in parallel
    job = group(chunks)
    result = job.apply_async()
    
    return f"Processing {len(chunks)} chunks"

@shared_task
def process_chunk(chunk_data):
    """Process a single chunk of data"""
    results = []
    for item in chunk_data:
        result = process_single_item(item)
        results.append(result)
    return results

Common Issues and Solutions

  1. Memory Leaks: Use CELERY_WORKER_MAX_TASKS_PER_CHILD to restart workers periodically
  2. Task Routing: Ensure proper queue configuration and routing
  3. Connection Issues: Implement connection pooling and retry logic
  4. Monitoring: Use Flower for real-time monitoring
  5. Logging: Implement structured logging for better debugging

This comprehensive guide provides the foundation for orchestrating microservices with Celery and RabbitMQ. The patterns and examples shown here can be adapted to your specific use cases and requirements.