Microservices with Django

Transforming a Monolithic Web App into a Microservice version

Migrating from a monolithic Django application to microservices is a complex but rewarding journey. This section provides a comprehensive guide for planning, executing, and managing this transformation while maintaining system stability and business continuity.


Assessment and Planning

1. Monolith Analysis

Before starting the migration, thoroughly analyze your existing monolithic application:

# analysis_tools.py
import ast
import os
from collections import defaultdict, Counter
import networkx as nx

class MonolithAnalyzer:
    """Statically analyze a monolithic Django project's structure.

    Walks the project tree, discovers apps, parses models and views with
    ``ast``, and summarizes complexity and inter-app dependencies to inform
    a microservice decomposition.
    """

    def __init__(self, project_path):
        self.project_path = project_path
        self.apps = []                            # discovered app names
        self.models = {}                          # app name -> list of model descriptors
        self.views = {}                           # app name -> list of view descriptors
        self.dependencies = defaultdict(set)      # app name -> imported top-level modules
        self.database_usage = defaultdict(set)    # app name -> model names (DB touch points)

    def analyze_project(self):
        """Run every analysis pass and return the summary report."""
        self.discover_apps()
        self.analyze_models()
        self.analyze_views()
        self.analyze_dependencies()
        self.analyze_database_usage()

        return self.generate_report()

    def discover_apps(self):
        """Discover Django apps (directories containing apps.py or models.py)."""
        for root, dirs, files in os.walk(self.project_path):
            if 'apps.py' in files or 'models.py' in files:
                app_name = os.path.basename(root)
                # Skip tooling directories, and dedupe in case nested
                # packages with the same basename also contain a models.py.
                if app_name not in ('migrations', '__pycache__') and app_name not in self.apps:
                    self.apps.append(app_name)

    def analyze_models(self):
        """Analyze model relationships and complexity."""
        for app in self.apps:
            models_file = os.path.join(self.project_path, app, 'models.py')
            if os.path.exists(models_file):
                self.models[app] = self._parse_models(models_file)

    def analyze_views(self):
        """Analyze view complexity and dependencies."""
        for app in self.apps:
            views_file = os.path.join(self.project_path, app, 'views.py')
            if os.path.exists(views_file):
                self.views[app] = self._parse_views(views_file)

    def analyze_dependencies(self):
        """Analyze inter-app dependencies from import statements."""
        for app in self.apps:
            app_path = os.path.join(self.project_path, app)
            for root, dirs, files in os.walk(app_path):
                for file in files:
                    if file.endswith('.py'):
                        file_path = os.path.join(root, file)
                        deps = self._extract_dependencies(file_path)
                        self.dependencies[app].update(deps)

    def analyze_database_usage(self):
        """Record which models (hence tables) each app defines.

        BUGFIX: analyze_project() always called this method, but it was
        never defined, so every run raised AttributeError. Must run after
        analyze_models().
        """
        for app, models in self.models.items():
            for model in models:
                self.database_usage[app].add(model['name'])

    def _parse_models(self, models_file):
        """Parse Django model classes from a models.py file."""
        try:
            with open(models_file, 'r', encoding='utf-8') as f:
                content = f.read()
            tree = ast.parse(content)
        except (SyntaxError, UnicodeDecodeError, OSError):
            return []

        models = []
        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                for base in node.bases:
                    # Match both `models.Model` (attribute access) and a
                    # bare `Model` imported directly (name).
                    is_model = (
                        (isinstance(base, ast.Attribute) and base.attr == 'Model')
                        or (isinstance(base, ast.Name) and base.id == 'Model')
                    )
                    if is_model:
                        models.append({
                            'name': node.name,
                            'fields': self._extract_model_fields(node),
                            'methods': [
                                n.name for n in node.body
                                if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
                            ],
                        })
                        break  # don't double-count classes with several matching bases
        return models

    def _extract_model_fields(self, class_node):
        """Extract field names (class-level assignments) from a model class."""
        fields = []
        for node in class_node.body:
            if isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        fields.append(target.id)
        return fields

    def _parse_views(self, views_file):
        """Parse view callables (functions, async functions, CBVs) from a file."""
        try:
            with open(views_file, 'r', encoding='utf-8') as f:
                content = f.read()
            tree = ast.parse(content)
        except (SyntaxError, UnicodeDecodeError, OSError):
            return []

        views = []
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                views.append({
                    'name': node.name,
                    'type': 'class' if isinstance(node, ast.ClassDef) else 'function',
                    'complexity': self._calculate_complexity(node),
                })
        return views

    def _calculate_complexity(self, node):
        """Approximate cyclomatic complexity: 1 + number of branching nodes."""
        complexity = 1
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.Try, ast.With)):
                complexity += 1
        return complexity

    def _extract_dependencies(self, file_path):
        """Extract top-level imported module names from a Python file."""
        dependencies = set()

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            tree = ast.parse(content)

            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        dependencies.add(alias.name.split('.')[0])
                elif isinstance(node, ast.ImportFrom):
                    if node.module:  # `from . import x` has module=None
                        dependencies.add(node.module.split('.')[0])

        except (SyntaxError, UnicodeDecodeError, OSError):
            # Unparseable files contribute no dependencies.
            pass

        return dependencies

    def generate_report(self):
        """Aggregate the analysis passes into a single report dict."""
        return {
            'apps': self.apps,
            'total_models': sum(len(models) for models in self.models.values()),
            'total_views': sum(len(views) for views in self.views.values()),
            'app_dependencies': dict(self.dependencies),
            'complexity_by_app': {
                app: sum(view['complexity'] for view in views)
                for app, views in self.views.items()
            },
            'models_by_app': {
                app: len(models) for app, models in self.models.items()
            },
            # New (additive) key: model names per app, from analyze_database_usage().
            'database_usage': {
                app: sorted(names) for app, names in self.database_usage.items()
            },
        }

# Usage
# Example: analyze a Django project and print a one-line summary.
analyzer = MonolithAnalyzer('/path/to/django/project')  # point at the project root
report = analyzer.analyze_project()  # `report` is reused by the boundary example below
print(f"Found {len(report['apps'])} apps with {report['total_models']} models")

2. Service Boundary Identification

Identify potential service boundaries using Domain-Driven Design:

# service_boundaries.py
from collections import defaultdict
import networkx as nx

class ServiceBoundaryIdentifier:
    """Identify candidate microservice boundaries from a MonolithAnalyzer report.

    Builds an inter-app dependency graph, scores cohesion/coupling, clusters
    related apps, and emits ranked service-boundary recommendations.
    """

    def __init__(self, analysis_report):
        self.report = analysis_report
        self.dependency_graph = nx.DiGraph()
        self.cohesion_scores = {}   # app -> heuristic cohesion score
        self.coupling_scores = {}   # app -> dependency count

    def identify_boundaries(self):
        """Identify service boundaries using various metrics."""
        self._build_dependency_graph()
        self._calculate_cohesion()
        self._calculate_coupling()
        # Clustering happens inside _generate_recommendations(); the
        # previous extra _identify_clusters() call here was redundant work.
        return self._generate_recommendations()

    def _build_dependency_graph(self):
        """Build the directed dependency graph between apps.

        BUGFIX: every app is registered as a node up front. Previously an
        app with no intra-project dependencies never entered the graph, and
        _get_connected_components() raised NetworkXError when asking for
        its neighbors.
        """
        self.dependency_graph.add_nodes_from(self.report['apps'])
        for app, dependencies in self.report['app_dependencies'].items():
            for dep in dependencies:
                if dep in self.report['apps']:
                    self.dependency_graph.add_edge(app, dep)

    def _calculate_cohesion(self):
        """Calculate a heuristic cohesion score for each app."""
        for app in self.report['apps']:
            model_count = self.report['models_by_app'].get(app, 0)
            complexity = self.report['complexity_by_app'].get(app, 0)

            # Higher model count and complexity indicate higher cohesion.
            self.cohesion_scores[app] = model_count * 0.6 + complexity * 0.4

    def _calculate_coupling(self):
        """Calculate coupling (dependency count) for each app."""
        for app in self.report['apps']:
            dependencies = self.report['app_dependencies'].get(app, set())
            # More dependencies indicate higher coupling.
            self.coupling_scores[app] = len(dependencies)

    def _identify_clusters(self):
        """Identify clusters of related apps.

        Uses Louvain community detection when python-louvain is installed,
        otherwise falls back to connected components.
        """
        try:
            import community
            partition = community.best_partition(self.dependency_graph.to_undirected())

            clusters = defaultdict(list)
            for app, cluster_id in partition.items():
                clusters[cluster_id].append(app)

            return dict(clusters)
        except ImportError:
            return self._simple_clustering()

    def _simple_clustering(self):
        """Cluster apps as weakly-connected components of the dependency graph."""
        clusters = defaultdict(list)
        visited = set()
        cluster_id = 0

        for app in self.report['apps']:
            if app not in visited:
                clusters[cluster_id] = self._get_connected_components(app, visited)
                cluster_id += 1

        return dict(clusters)

    def _get_connected_components(self, start_app, visited):
        """Depth-first collect the component containing start_app (both edge directions)."""
        component = []
        stack = [start_app]

        while stack:
            app = stack.pop()
            if app not in visited:
                visited.add(app)
                component.append(app)

                # Treat edges as undirected: successors plus predecessors.
                neighbors = list(self.dependency_graph.neighbors(app))
                neighbors.extend(list(self.dependency_graph.predecessors(app)))

                for neighbor in neighbors:
                    if neighbor not in visited:
                        stack.append(neighbor)

        return component

    def _generate_recommendations(self):
        """Generate ranked service-boundary recommendations from the clusters."""
        clusters = self._identify_clusters()

        recommendations = []

        for cluster_id, apps in clusters.items():
            total_models = sum(self.report['models_by_app'].get(app, 0) for app in apps)
            total_complexity = sum(self.report['complexity_by_app'].get(app, 0) for app in apps)
            avg_coupling = sum(self.coupling_scores.get(app, 0) for app in apps) / len(apps)

            # Prefer a recognizable name when the cluster maps onto a
            # well-known domain app.
            service_name = f"service_{cluster_id}"
            if len(apps) == 1:
                service_name = f"{apps[0]}_service"
            elif 'user' in apps:
                service_name = "user_service"
            elif 'product' in apps:
                service_name = "product_service"
            elif 'order' in apps:
                service_name = "order_service"

            recommendations.append({
                'service_name': service_name,
                'apps': apps,
                'total_models': total_models,
                'total_complexity': total_complexity,
                'average_coupling': avg_coupling,
                'recommendation_score': self._calculate_recommendation_score(
                    total_models, total_complexity, avg_coupling
                )
            })

        # Best candidates first.
        recommendations.sort(key=lambda x: x['recommendation_score'], reverse=True)

        return recommendations

    def _calculate_recommendation_score(self, models, complexity, coupling):
        """Score a boundary: more models/complexity good, more coupling bad."""
        return (models * 0.4 + complexity * 0.4) - (coupling * 0.2)

# Usage
# Example: rank service-boundary candidates from the analyzer's report.
boundary_identifier = ServiceBoundaryIdentifier(report)  # `report` from MonolithAnalyzer
recommendations = boundary_identifier.identify_boundaries()

# Recommendations come back sorted, highest score first.
for rec in recommendations:
    print(f"Service: {rec['service_name']}")
    print(f"Apps: {rec['apps']}")
    print(f"Score: {rec['recommendation_score']:.2f}")
    print("---")

Migration Strategies

1. Strangler Fig Pattern

Gradually replace monolith functionality:

# strangler_fig.py
from django.http import HttpResponse, HttpResponseRedirect
from django.urls import reverse
from django.conf import settings
import requests
import logging

logger = logging.getLogger(__name__)

class StranglerFigMiddleware:
    """Incrementally route a percentage of matching requests to microservices.

    Implements the strangler-fig pattern: paths configured in
    ``settings.MICROSERVICE_MIGRATION`` are proxied to their new service
    for a configurable percentage of traffic, falling back to the monolith
    on any proxy error.
    """

    # Upper bound (seconds) for any proxied call; without a timeout a
    # stalled microservice would hang monolith worker processes.
    PROXY_TIMEOUT = 10

    def __init__(self, get_response):
        self.get_response = get_response
        # Mapping of path prefix -> {'service_url': ..., 'percentage': ...}.
        self.migration_config = getattr(settings, 'MICROSERVICE_MIGRATION', {})

    def __call__(self, request):
        # Route to the microservice when the rollout dice roll selects it,
        # otherwise fall through to the monolith view stack.
        microservice_url = self._should_route_to_microservice(request)

        if microservice_url:
            return self._proxy_to_microservice(request, microservice_url)

        return self.get_response(request)

    def _should_route_to_microservice(self, request):
        """Return the target service URL, or None to stay in the monolith."""
        import random  # hoisted out of the per-pattern loop

        path = request.path

        for pattern, config in self.migration_config.items():
            if path.startswith(pattern):
                # Probabilistic rollout: `percentage` percent of matching
                # requests are diverted to the microservice.
                if random.randint(1, 100) <= config.get('percentage', 0):
                    return config.get('service_url')

        return None

    def _proxy_to_microservice(self, request, service_url):
        """Proxy the request to the microservice; fall back to the monolith on error."""
        try:
            url = f"{service_url.rstrip('/')}{request.path}"
            headers = self._prepare_headers(request)

            # BUGFIX: every outbound call now carries a timeout; the
            # original calls could block a worker forever.
            if request.method == 'GET':
                response = requests.get(url, params=request.GET, headers=headers,
                                        timeout=self.PROXY_TIMEOUT)
            elif request.method == 'POST':
                response = requests.post(url, data=request.POST, headers=headers,
                                         timeout=self.PROXY_TIMEOUT)
            elif request.method == 'PUT':
                response = requests.put(url, data=request.body, headers=headers,
                                        timeout=self.PROXY_TIMEOUT)
            elif request.method == 'DELETE':
                response = requests.delete(url, headers=headers,
                                           timeout=self.PROXY_TIMEOUT)
            else:
                # Fallback to monolith for unsupported methods.
                return self.get_response(request)

            # Translate the upstream response into a Django response.
            django_response = HttpResponse(
                response.content,
                status=response.status_code,
                content_type=response.headers.get('content-type', 'text/html')
            )

            # Copy headers, except ones Django computes itself.
            for header, value in response.headers.items():
                if header.lower() not in ['content-length', 'transfer-encoding']:
                    django_response[header] = value

            logger.info(f"Proxied {request.method} {request.path} to microservice")
            return django_response

        except requests.RequestException as e:
            logger.error(f"Error proxying to microservice: {e}")
            # Graceful degradation: serve from the monolith instead.
            return self.get_response(request)

    def _prepare_headers(self, request):
        """Build outgoing headers from the original request's HTTP_* META entries."""
        headers = {}

        for header, value in request.META.items():
            if header.startswith('HTTP_'):
                header_name = header[5:].replace('_', '-').title()
                headers[header_name] = value

        # Service-to-service auth plus the host the client originally hit.
        headers['X-Service-Token'] = settings.SERVICE_SECRET_TOKEN
        headers['X-Original-Host'] = request.get_host()

        return headers

# Migration configuration example
# settings.py
# Path prefix -> routing rule, consumed by StranglerFigMiddleware.
# `percentage` is the share of matching requests diverted to `service_url`.
MICROSERVICE_MIGRATION = {
    '/api/users/': {
        'service_url': 'http://user-service:8000',
        'percentage': 10  # Route 10% of traffic
    },
    '/api/products/': {
        'service_url': 'http://product-service:8000',
        'percentage': 25  # Route 25% of traffic
    }
}

2. Database Decomposition

Gradually separate shared databases:

# database_decomposition.py
from django.db import models, transaction
from django.conf import settings
import json
import logging

logger = logging.getLogger(__name__)

class DataMigrationManager:
    """Batch-copy monolith data into the owning microservice.

    Failures are logged per record and collected in ``migration_log`` so a
    partial migration can be audited and retried.
    """

    def __init__(self):
        # Chronological log of per-record successes and failures.
        self.migration_log = []

    def migrate_user_data(self, batch_size=1000):
        """Copy all monolith users to the user service in batches.

        Returns the number of users migrated successfully.
        """
        from django.contrib.auth.models import User

        # BUGFIX: stable ordering makes offset pagination deterministic;
        # slicing an unordered queryset may repeat or skip rows between
        # batches.
        queryset = User.objects.order_by('id')
        total_users = queryset.count()
        migrated = 0

        logger.info(f"Starting migration of {total_users} users")

        for offset in range(0, total_users, batch_size):
            for user in queryset[offset:offset + batch_size]:
                try:
                    self._migrate_single_user(user)
                    migrated += 1
                except Exception as e:
                    # Best-effort: record the failure and continue.
                    logger.error(f"Failed to migrate user {user.id}: {e}")
                    self.migration_log.append({
                        'type': 'error',
                        'user_id': user.id,
                        'error': str(e)
                    })

            logger.info(f"Migrated {migrated}/{total_users} users")

        return migrated

    def _migrate_single_user(self, user):
        """POST one user's data to the user service; raise on non-201 response."""
        # BUGFIX (local imports): this module never imported requests or
        # django.utils.timezone at the top level, so this method raised
        # NameError on first use.
        import requests
        from django.utils import timezone

        user_data = {
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'first_name': user.first_name,
            'last_name': user.last_name,
            'is_active': user.is_active,
            'is_staff': user.is_staff,
            'date_joined': user.date_joined.isoformat(),
            'last_login': user.last_login.isoformat() if user.last_login else None
        }

        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/migrate/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10,  # don't let one stuck request stall the whole migration
        )

        if response.status_code != 201:
            raise Exception(f"User service returned {response.status_code}")

        self.migration_log.append({
            'type': 'success',
            'user_id': user.id,
            'migrated_at': timezone.now().isoformat()
        })

class DualWriteManager:
    """Coordinate user writes across the monolith and the user microservice.

    While dual writes are enabled, the monolith remains the system of
    record: every change lands there first and is then replayed into the
    microservice asynchronously via Celery.
    """

    def __init__(self):
        self.enabled = getattr(settings, 'DUAL_WRITE_ENABLED', False)

    def create_user(self, user_data):
        """Create user in both monolith and microservice"""
        created = self._create_in_monolith(user_data)
        if self.enabled:
            self._replicate('migration.create_user_in_microservice',
                            [created.id, user_data])
        return created

    def update_user(self, user_id, user_data):
        """Update user in both systems"""
        updated = self._update_in_monolith(user_id, user_data)
        if self.enabled:
            self._replicate('migration.update_user_in_microservice',
                            [user_id, user_data])
        return updated

    def _replicate(self, task_name, task_args):
        # Fire-and-forget: replication happens out of band in a Celery worker.
        from celery import current_app
        current_app.send_task(task_name, args=task_args)

    def _create_in_monolith(self, user_data):
        # The monolith's auth User table is the system of record.
        from django.contrib.auth.models import User
        return User.objects.create_user(**user_data)

    def _update_in_monolith(self, user_id, user_data):
        from django.contrib.auth.models import User
        account = User.objects.get(id=user_id)
        for field, value in user_data.items():
            setattr(account, field, value)
        account.save()
        return account

# Celery tasks for async operations
from celery import shared_task

@shared_task
def create_user_in_microservice(user_id, user_data):
    """Create user in the microservice asynchronously.

    Best-effort: failures are logged, not retried, so the monolith write
    is never affected by a replication problem.
    """
    # BUGFIX (local import): `requests` was never imported in this module.
    import requests
    try:
        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10,  # never let a worker hang on a stuck service
        )

        if response.status_code == 201:
            logger.info(f"User {user_id} created in microservice")
        else:
            logger.error(f"Failed to create user {user_id} in microservice: {response.status_code}")

    except Exception as e:
        logger.error(f"Error creating user {user_id} in microservice: {e}")

@shared_task
def update_user_in_microservice(user_id, user_data):
    """Update user in the microservice asynchronously.

    Best-effort: failures are logged, not retried, so the monolith write
    is never affected by a replication problem.
    """
    # BUGFIX (local import): `requests` was never imported in this module.
    import requests
    try:
        response = requests.put(
            f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10,  # never let a worker hang on a stuck service
        )

        if response.status_code == 200:
            logger.info(f"User {user_id} updated in microservice")
        else:
            logger.error(f"Failed to update user {user_id} in microservice: {response.status_code}")

    except Exception as e:
        logger.error(f"Error updating user {user_id} in microservice: {e}")

3. Event-Driven Migration

Use events to keep systems synchronized:

# event_driven_migration.py
import json
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.contrib.auth.models import User
from celery import shared_task
import logging

logger = logging.getLogger(__name__)

class EventPublisher:
    """Publish monolith-side domain events during the migration.

    Disabled unless EVENT_PUBLISHING_ENABLED is set in settings, so event
    publishing can be rolled out independently of the code deploy.
    """

    def __init__(self):
        # BUGFIX (local import): django.conf.settings was never imported
        # at this module's top level, so this raised NameError.
        from django.conf import settings
        self.enabled = getattr(settings, 'EVENT_PUBLISHING_ENABLED', False)

    def publish_user_event(self, event_type, user_data):
        """Publish user-related events"""
        self._publish('user.events', event_type, user_data)

    def publish_order_event(self, event_type, order_data):
        """Publish order-related events"""
        self._publish('order.events', event_type, order_data)

    def _publish(self, topic, event_type, data):
        """Build the event envelope and hand it to the async publisher task."""
        if not self.enabled:
            return

        # BUGFIX (local import): timezone was never imported in this module.
        from django.utils import timezone

        event = {
            'event_type': event_type,
            'service': 'monolith',
            'timestamp': timezone.now().isoformat(),
            'data': data
        }

        # Fire-and-forget via Celery so publishing never blocks the request.
        publish_event_async.delay(topic, event)

# Module-level singleton used by the signal handlers below.
event_publisher = EventPublisher()

# Signal handlers for event publishing
@receiver(post_save, sender=User)
def publish_user_saved_event(sender, instance, created, **kwargs):
    """Relay Django's post_save signal for User as a migration event."""
    last_login = instance.last_login
    payload = {
        'id': instance.id,
        'username': instance.username,
        'email': instance.email,
        'first_name': instance.first_name,
        'last_name': instance.last_name,
        'is_active': instance.is_active,
        'date_joined': instance.date_joined.isoformat(),
        'last_login': last_login.isoformat() if last_login else None,
    }
    # Creation and update are distinguished only by the event type.
    event_publisher.publish_user_event(
        'user.created' if created else 'user.updated',
        payload,
    )

@receiver(post_delete, sender=User)
def publish_user_deleted_event(sender, instance, **kwargs):
    """Relay Django's post_delete signal for User as a migration event."""
    # Only identifying fields are sent; the row is already gone.
    event_publisher.publish_user_event(
        'user.deleted',
        {
            'id': instance.id,
            'username': instance.username,
            'email': instance.email,
        },
    )

@shared_task
def publish_event_async(topic, event_data):
    """Publish one event to the RabbitMQ topic exchange.

    Best-effort: failures are logged and swallowed so callers (signal
    handlers) never break a monolith write path.
    """
    try:
        import pika

        connection = pika.BlockingConnection(
            pika.URLParameters(settings.RABBITMQ_URL)
        )
        try:
            channel = connection.channel()

            # Idempotent declare: safe to repeat on every publish.
            channel.exchange_declare(
                exchange='migration.events',
                exchange_type='topic',
                durable=True
            )

            channel.basic_publish(
                exchange='migration.events',
                routing_key=topic,
                body=json.dumps(event_data),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # Make message persistent
                    content_type='application/json'
                )
            )
        finally:
            # BUGFIX: close the connection even when declare/publish raises;
            # previously an error leaked the AMQP connection.
            connection.close()

        logger.info(f"Published event to {topic}")

    except Exception as e:
        logger.error(f"Failed to publish event: {e}")

class EventConsumer:
    """Consume migration events from the monolith inside a microservice.

    Each service binds its own durable queue to the shared topic exchange
    and only receives the routing keys relevant to its domain.
    """

    def __init__(self, service_name):
        # e.g. 'user-service' or 'order-service'; controls queue binding.
        self.service_name = service_name

    def start_consuming(self):
        """Open the AMQP connection and block, consuming events forever."""
        import pika

        connection = pika.BlockingConnection(
            pika.URLParameters(settings.RABBITMQ_URL)
        )
        channel = connection.channel()

        # Declare exchange (idempotent; must match the publisher's declare).
        channel.exchange_declare(
            exchange='migration.events',
            exchange_type='topic',
            durable=True
        )

        # Durable per-service queue so events survive service restarts.
        queue_name = f'{self.service_name}.migration.events'
        channel.queue_declare(queue=queue_name, durable=True)

        # Bind the queue to only the routing keys this service cares about.
        if self.service_name == 'user-service':
            channel.queue_bind(
                exchange='migration.events',
                queue=queue_name,
                routing_key='user.events'
            )
        elif self.service_name == 'order-service':
            channel.queue_bind(
                exchange='migration.events',
                queue=queue_name,
                routing_key='order.events'
            )

        # Process one message at a time; manual acks below.
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=self.handle_event,
            auto_ack=False
        )

        logger.info(f"Started consuming events for {self.service_name}")
        channel.start_consuming()

    def handle_event(self, ch, method, properties, body):
        """Decode one message, dispatch by event-type prefix, then ack/nack."""
        try:
            event_data = json.loads(body)
            event_type = event_data['event_type']

            if event_type.startswith('user.'):
                self.handle_user_event(event_data)
            elif event_type.startswith('order.'):
                self.handle_order_event(event_data)

            # Acknowledge only after successful handling.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        except Exception as e:
            logger.error(f"Error handling event: {e}")
            # NOTE(review): requeue=True will redeliver a poison message
            # forever; consider a dead-letter queue in production.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    def handle_user_event(self, event_data):
        """Dispatch user-related events to the matching handler."""
        event_type = event_data['event_type']
        user_data = event_data['data']

        if event_type == 'user.created':
            self.create_user_from_event(user_data)
        elif event_type == 'user.updated':
            self.update_user_from_event(user_data)
        elif event_type == 'user.deleted':
            self.delete_user_from_event(user_data)

    def handle_order_event(self, event_data):
        """Handle order-related events.

        BUGFIX: handle_event() dispatched to this method but it was never
        defined, so any order event raised AttributeError and was requeued
        forever. Stub implementation; each service extends it.
        """
        logger.info(f"Received order event: {event_data['event_type']}")

    def create_user_from_event(self, user_data):
        """Create user from event data (implementation is service-specific)."""
        logger.info(f"Creating user from event: {user_data['id']}")

    def update_user_from_event(self, user_data):
        """Update user from event data (implementation is service-specific)."""
        logger.info(f"Updating user from event: {user_data['id']}")

    def delete_user_from_event(self, user_data):
        """Delete user from event data (implementation is service-specific)."""
        logger.info(f"Deleting user from event: {user_data['id']}")

Data Consistency Strategies

1. Saga Pattern Implementation

# saga_pattern.py
from enum import Enum
from django.db import models, transaction
import json
import uuid
import logging

logger = logging.getLogger(__name__)

class SagaStatus(models.TextChoices):
    """Lifecycle states for a saga as a whole; COMPENSATING means a failed saga is rolling back."""
    PENDING = 'pending', 'Pending'
    COMPLETED = 'completed', 'Completed'
    FAILED = 'failed', 'Failed'
    COMPENSATING = 'compensating', 'Compensating'

class SagaStepStatus(models.TextChoices):
    """Lifecycle states for an individual saga step; COMPENSATED means its effect was undone."""
    PENDING = 'pending', 'Pending'
    COMPLETED = 'completed', 'Completed'
    FAILED = 'failed', 'Failed'
    COMPENSATED = 'compensated', 'Compensated'

class Saga(models.Model):
    """Saga orchestration model"""

    # UUID primary key so ids can be generated without a DB round-trip.
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    # Discriminator for which business workflow this saga runs.
    saga_type = models.CharField(max_length=100)
    status = models.CharField(max_length=20, choices=SagaStatus.choices, default=SagaStatus.PENDING)
    # Workflow payload shared across all of this saga's steps.
    data = models.JSONField(default=dict)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'sagas'

class SagaStep(models.Model):
    """Individual saga step"""

    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    saga = models.ForeignKey(Saga, on_delete=models.CASCADE, related_name='steps')
    step_name = models.CharField(max_length=100)
    # Execution position within the saga; default ordering below relies on it.
    step_order = models.IntegerField()
    status = models.CharField(max_length=20, choices=SagaStepStatus.choices, default=SagaStepStatus.PENDING)
    # Which microservice performs this step.
    service_name = models.CharField(max_length=100)
    # Payload for the forward action.
    action_data = models.JSONField(default=dict)
    # Payload for undoing the action if a later step fails.
    compensation_data = models.JSONField(default=dict)
    # Output captured from executing the step.
    result_data = models.JSONField(default=dict)
    error_message = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'saga_steps'
        ordering = ['step_order']

class SagaOrchestrator:
    """Orchestrate saga execution.

    Steps run one at a time in ``step_order``; each successful step
    schedules the next one via a Celery task.  When a step fails, every
    previously completed step is compensated in reverse order and the
    saga is marked FAILED.
    """

    # Step names the orchestrator knows about.  Handlers are looked up by
    # naming convention (``handle_<name>`` / ``compensate_<name>``); names
    # without a matching method are simply not registered, so executing an
    # unimplemented step raises ValueError in execute_step instead of
    # crashing construction.
    KNOWN_STEPS = (
        'create_user',
        'create_profile',
        'send_welcome_email',
        'create_order',
        'reserve_inventory',
        'process_payment',
    )

    def __init__(self):
        # Bug fix: the original referenced handler methods directly
        # (self.handle_create_order, self.handle_reserve_inventory,
        # self.handle_process_payment and their compensations) that were
        # never defined, so instantiating the orchestrator raised
        # AttributeError.  Register only the handlers that actually exist.
        self.step_handlers = {
            name: getattr(self, f'handle_{name}')
            for name in self.KNOWN_STEPS
            if hasattr(self, f'handle_{name}')
        }
        self.compensation_handlers = {
            name: getattr(self, f'compensate_{name}')
            for name in self.KNOWN_STEPS
            if hasattr(self, f'compensate_{name}')
        }

    def start_saga(self, saga_type, saga_data, steps):
        """Create a Saga plus its SagaStep rows and kick off execution.

        ``steps`` is a list of dicts with keys 'name', 'service' and
        optional 'data' / 'compensation_data'.  Returns the Saga instance.
        """
        with transaction.atomic():
            saga = Saga.objects.create(
                saga_type=saga_type,
                data=saga_data
            )

            # Persist the steps in the order given by the caller.
            for i, step_config in enumerate(steps):
                SagaStep.objects.create(
                    saga=saga,
                    step_name=step_config['name'],
                    step_order=i,
                    service_name=step_config['service'],
                    action_data=step_config.get('data', {}),
                    compensation_data=step_config.get('compensation_data', {})
                )

        # Start executing saga (after the transaction, so steps are committed).
        self.execute_saga(saga.id)
        return saga

    def execute_saga(self, saga_id):
        """Execute the next pending step of the saga, if any."""
        saga = Saga.objects.get(id=saga_id)

        # Only drive sagas that are still in flight.
        if saga.status != SagaStatus.PENDING:
            return

        # Steps come back ordered by step_order (SagaStep.Meta.ordering).
        next_step = saga.steps.filter(status=SagaStepStatus.PENDING).first()

        if not next_step:
            # All steps completed
            saga.status = SagaStatus.COMPLETED
            saga.save()
            logger.info(f"Saga {saga_id} completed successfully")
            return

        # Execute the step; on failure, start compensating.
        try:
            self.execute_step(next_step)
        except Exception as e:
            logger.error(f"Saga {saga_id} step {next_step.step_name} failed: {e}")
            self.handle_saga_failure(saga)

    def execute_step(self, step):
        """Run a single step's handler and schedule the following step.

        Raises ValueError when no handler is registered for the step name;
        re-raises any handler exception after marking the step FAILED.
        """
        handler = self.step_handlers.get(step.step_name)
        if not handler:
            raise ValueError(f"No handler for step {step.step_name}")

        try:
            result = handler(step)

            step.status = SagaStepStatus.COMPLETED
            step.result_data = result
            step.save()

            logger.info(f"Step {step.step_name} completed successfully")

            # Continue asynchronously so one process doesn't run the whole
            # saga inline; 'saga.execute_saga' must be a registered Celery
            # task — TODO confirm the task name matches the worker config.
            from celery import current_app
            current_app.send_task(
                'saga.execute_saga',
                args=[str(step.saga.id)],
                countdown=1
            )

        except Exception as exc:
            step.status = SagaStepStatus.FAILED
            step.error_message = str(exc)
            step.save()

            raise  # bare raise preserves the original traceback

    def handle_saga_failure(self, saga):
        """Handle saga failure by running compensations in reverse order."""
        saga.status = SagaStatus.COMPENSATING
        saga.save()

        # Undo completed steps starting from the most recent one.
        completed_steps = saga.steps.filter(
            status=SagaStepStatus.COMPLETED
        ).order_by('-step_order')

        for step in completed_steps:
            try:
                self.compensate_step(step)
            except Exception as e:
                # Best-effort: keep compensating the remaining steps.
                logger.error(f"Compensation failed for step {step.step_name}: {e}")

        saga.status = SagaStatus.FAILED
        saga.save()

    def compensate_step(self, step):
        """Compensate a completed step; missing handlers are a warning only."""
        handler = self.compensation_handlers.get(step.step_name)
        if not handler:
            logger.warning(f"No compensation handler for step {step.step_name}")
            return

        try:
            handler(step)
            step.status = SagaStepStatus.COMPENSATED
            step.save()

            logger.info(f"Step {step.step_name} compensated successfully")

        except Exception as e:
            logger.error(f"Compensation failed for step {step.step_name}: {e}")
            raise  # bare raise preserves the original traceback

    # Step handlers
    def handle_create_user(self, step):
        """Create the user in the user service; returns the service's JSON."""
        user_data = step.action_data

        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
        )

        if response.status_code != 201:
            raise Exception(f"User creation failed: {response.status_code}")

        return response.json()

    def handle_create_profile(self, step):
        """Create the profile for saga.data['user_id'] in the user service."""
        profile_data = step.action_data
        user_id = step.saga.data.get('user_id')

        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/profile/",
            json=profile_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
        )

        if response.status_code != 201:
            raise Exception(f"Profile creation failed: {response.status_code}")

        return response.json()

    def handle_send_welcome_email(self, step):
        """Queue the welcome email (notification service answers 202)."""
        email_data = step.action_data

        response = requests.post(
            f"{settings.NOTIFICATION_SERVICE_URL}/api/v1/emails/",
            json=email_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
        )

        if response.status_code != 202:
            raise Exception(f"Email sending failed: {response.status_code}")

        return response.json()

    # Compensation handlers
    def compensate_create_user(self, step):
        """Delete the user created by handle_create_user (id from result_data)."""
        user_id = step.result_data.get('id')
        if user_id:
            requests.delete(
                f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/",
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
            )

    def compensate_create_profile(self, step):
        """Delete the profile created for saga.data['user_id']."""
        user_id = step.saga.data.get('user_id')
        if user_id:
            requests.delete(
                f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/profile/",
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
            )

    def compensate_send_welcome_email(self, step):
        """Compensate welcome email (no-op)."""
        # Email can't be unsent, but we could send a cancellation email
        pass

# Usage example
def create_user_with_profile_saga(user_data, profile_data):
    """Register a user via the saga pattern.

    Runs three steps in order — create the user, create the profile,
    queue the welcome email — and returns the started Saga.
    """
    welcome_payload = {
        'to': user_data['email'],
        'template': 'welcome',
        'data': {'username': user_data['username']}
    }

    registration_steps = [
        {'name': 'create_user', 'service': 'user-service', 'data': user_data},
        {'name': 'create_profile', 'service': 'user-service', 'data': profile_data},
        {
            'name': 'send_welcome_email',
            'service': 'notification-service',
            'data': welcome_payload
        },
    ]

    return SagaOrchestrator().start_saga('user_registration', user_data, registration_steps)

Monitoring Migration Progress

1. Migration Metrics

# migration_metrics.py
from prometheus_client import Counter, Gauge, Histogram
import time

# Migration metrics — module-level Prometheus collectors, created once at import.

# Requests routed during migration, labelled by source/target system and outcome.
MIGRATION_REQUESTS = Counter(
    'migration_requests_total',
    'Total migration requests',
    ['source', 'target', 'status']
)

# Writes mirrored to both monolith and microservice during the dual-write phase.
DUAL_WRITE_OPERATIONS = Counter(
    'dual_write_operations_total',
    'Total dual write operations',
    ['operation', 'status']
)

# Saga runs, labelled by saga type and outcome.
SAGA_EXECUTIONS = Counter(
    'saga_executions_total',
    'Total saga executions',
    ['saga_type', 'status']
)

# Latest known migration completion percentage per service (0-100).
MIGRATION_PROGRESS = Gauge(
    'migration_progress_percent',
    'Migration progress percentage',
    ['service']
)

class MigrationMetricsCollector:
    """Thin wrapper that records migration-related Prometheus metrics."""

    @staticmethod
    def record_migration_request(source, target, success=True):
        """Count one routed migration request, tagged by outcome."""
        MIGRATION_REQUESTS.labels(
            source=source,
            target=target,
            status='success' if success else 'failure',
        ).inc()

    @staticmethod
    def record_dual_write(operation, success=True):
        """Count one dual-write operation, tagged by outcome."""
        DUAL_WRITE_OPERATIONS.labels(
            operation=operation,
            status='success' if success else 'failure',
        ).inc()

    @staticmethod
    def record_saga_execution(saga_type, success=True):
        """Count one saga execution, tagged by outcome."""
        SAGA_EXECUTIONS.labels(
            saga_type=saga_type,
            status='success' if success else 'failure',
        ).inc()

    @staticmethod
    def update_migration_progress(service, percentage):
        """Publish the latest completion percentage for a service."""
        MIGRATION_PROGRESS.labels(service=service).set(percentage)

class MigrationDashboard:
    """Generate migration progress dashboard data for all services."""

    def __init__(self):
        self.metrics = MigrationMetricsCollector()

    def get_migration_status(self):
        """Get per-service migration progress plus an overall figure."""
        return {
            'user_service': self._get_user_service_progress(),
            'product_service': self._get_product_service_progress(),
            'order_service': self._get_order_service_progress(),
            'overall_progress': self._calculate_overall_progress()
        }

    def _get_user_service_progress(self):
        """User-service progress: migrated count vs. monolith User count."""
        total_users = User.objects.count()
        migrated_users = self._count_migrated_users()

        # Guard against an empty table to avoid ZeroDivisionError.
        progress = (migrated_users / total_users * 100) if total_users > 0 else 0
        self.metrics.update_migration_progress('user-service', progress)

        return self._progress_payload(total_users, migrated_users, progress)

    def _get_product_service_progress(self):
        """Product-service progress.

        Bug fix: this method was called by get_migration_status but never
        defined, so the dashboard raised AttributeError.  Product tracking
        is not wired up yet, so report zero progress for now.
        """
        return self._progress_payload(0, 0, 0)

    def _get_order_service_progress(self):
        """Order-service progress (same situation as the product service)."""
        return self._progress_payload(0, 0, 0)

    @staticmethod
    def _progress_payload(total, migrated, progress):
        """Build the common per-service progress dict."""
        return {
            'total_records': total,
            'migrated_records': migrated,
            'progress_percent': progress,
            'status': 'in_progress' if progress < 100 else 'completed'
        }

    def _count_migrated_users(self):
        """Count users already present in the user microservice.

        Best-effort: returns 0 when the service is unreachable.  Bug fix:
        the original bare ``except:`` swallowed every error, including
        NameError and KeyboardInterrupt; only request-level failures are
        ignored now.
        """
        migrated_count = 0

        try:
            response = requests.get(
                f"{settings.USER_SERVICE_URL}/api/v1/users/count/",
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
            )

            if response.status_code == 200:
                migrated_count = response.json().get('count', 0)
        except requests.RequestException:
            pass

        return migrated_count

    def _calculate_overall_progress(self):
        """Calculate overall migration progress.

        Placeholder: should aggregate the per-service figures; returns a
        fixed example percentage for now.
        """
        return 45.5  # Example percentage

Rollback Strategies

1. Feature Flags for Rollback

# feature_flags.py
from django.conf import settings
from django.core.cache import cache
import requests

class FeatureFlagManager:
    """Manage feature flags used to control (and roll back) the migration.

    Flags come from ``settings.FEATURE_FLAGS`` with conservative defaults
    (microservices off, dual write on); values are cached per flag.
    """

    # Seconds a flag value stays in the cache before config is re-read.
    CACHE_TIMEOUT = 300

    def __init__(self):
        self.flags = {}
        self.load_flags()

    def load_flags(self):
        """Load feature flags from Django settings, with safe defaults."""
        self.flags = getattr(settings, 'FEATURE_FLAGS', {
            'use_user_microservice': False,
            'use_product_microservice': False,
            'use_order_microservice': False,
            'enable_dual_write': True,
            'enable_event_publishing': True
        })

    def is_enabled(self, flag_name):
        """Return True if the flag is enabled (cache-first lookup)."""
        cache_key = f"feature_flag:{flag_name}"
        cached_value = cache.get(cache_key)

        if cached_value is not None:
            return cached_value

        # Unknown flags default to disabled.
        enabled = self.flags.get(flag_name, False)

        cache.set(cache_key, enabled, timeout=self.CACHE_TIMEOUT)

        return enabled

    def set_flag(self, flag_name, enabled):
        """Set a flag locally, refresh the cache, and best-effort persist."""
        self.flags[flag_name] = enabled

        cache_key = f"feature_flag:{flag_name}"
        cache.set(cache_key, enabled, timeout=self.CACHE_TIMEOUT)

        # Optionally persist to external configuration service
        self._persist_flag(flag_name, enabled)

    def _persist_flag(self, flag_name, enabled):
        """Persist the flag to the external configuration service.

        Failures are deliberately swallowed: rollback must keep working
        even when the config service is down.  Bug fix: the original bare
        ``except:`` also hid programming errors (NameError) and even
        KeyboardInterrupt; only request-level failures are ignored now.
        """
        try:
            requests.post(
                f"{settings.CONFIG_SERVICE_URL}/api/v1/flags/",
                json={
                    'name': flag_name,
                    'enabled': enabled,
                    'service': settings.SERVICE_NAME
                },
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
            )
        except requests.RequestException:
            pass

# Module-level singleton shared by the views below to gate the rollout.
feature_flags = FeatureFlagManager()

class UserViewSet(ModelViewSet):
    """User endpoint that routes creates by the `use_user_microservice` flag."""

    def create(self, request):
        """Create user with feature flag support"""
        if not feature_flags.is_enabled('use_user_microservice'):
            return self._create_in_monolith(request)
        return self._create_in_microservice(request)

    def _create_in_microservice(self, request):
        """Proxy the create to the user microservice; on any failure the
        request falls back to the monolith path."""
        try:
            resp = requests.post(
                f"{settings.USER_SERVICE_URL}/api/v1/users/",
                json=request.data,
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            )
            if resp.status_code != 201:
                # Fallback to monolith on error
                logger.warning("Microservice failed, falling back to monolith")
                return self._create_in_monolith(request)
            return Response(resp.json(), status=201)

        except Exception as e:
            logger.error(f"Microservice error: {e}")
            return self._create_in_monolith(request)

    def _create_in_monolith(self, request):
        """Create user in monolith"""
        return super().create(request)

# Emergency rollback command
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = 'Emergency rollback to monolith'

    def handle(self, *args, **options):
        """Disable every microservice routing flag and flush the cache."""
        manager = FeatureFlagManager()

        # Route all traffic back to the monolith.
        for flag in ('use_user_microservice',
                     'use_product_microservice',
                     'use_order_microservice'):
            manager.set_flag(flag, False)
            self.stdout.write(f"Disabled {flag}")

        # Drop cached flag values (and everything else) immediately.
        cache.clear()

        self.stdout.write(self.style.SUCCESS('Emergency rollback completed'))

Summary

Transforming a monolithic Django application to microservices requires careful planning and execution:

Assessment Phase:

  • Analyze existing codebase structure
  • Identify service boundaries using DDD
  • Plan migration strategy and timeline

Migration Strategies:

  • Strangler Fig pattern for gradual replacement
  • Database decomposition with dual writes
  • Event-driven synchronization

Data Consistency:

  • Saga pattern for distributed transactions
  • Event sourcing for audit trails
  • Eventual consistency patterns

Monitoring and Rollback:

  • Migration progress tracking
  • Feature flags for controlled rollout
  • Emergency rollback procedures

Key Success Factors:

  • Start with least coupled components
  • Maintain backward compatibility
  • Implement comprehensive monitoring
  • Plan for rollback scenarios
  • Test thoroughly at each stage

The migration from monolith to microservices is a journey, not a destination. Take it step by step, learn from each phase, and be prepared to adapt your strategy based on real-world feedback and operational experience.

This completes our comprehensive guide to Django microservices, from understanding the fundamentals to successfully migrating from monolithic architectures.