Migrating from a monolithic Django application to microservices is a complex but rewarding journey. This section provides a comprehensive guide for planning, executing, and managing this transformation while maintaining system stability and business continuity.
Before starting the migration, thoroughly analyze your existing monolithic application:
# analysis_tools.py
import ast
import os
from collections import defaultdict
class MonolithAnalyzer:
"""Analyze monolithic Django application structure"""
def __init__(self, project_path):
self.project_path = project_path
self.apps = []
self.models = {}
self.views = {}
self.dependencies = defaultdict(set)
def analyze_project(self):
"""Perform comprehensive project analysis"""
self.discover_apps()
self.analyze_models()
self.analyze_views()
self.analyze_dependencies()
return self.generate_report()
def discover_apps(self):
"""Discover Django apps in the project"""
for root, dirs, files in os.walk(self.project_path):
if 'apps.py' in files or 'models.py' in files:
app_name = os.path.basename(root)
                if app_name not in ('migrations', '__pycache__') and app_name not in self.apps:
                    self.apps.append(app_name)
def analyze_models(self):
"""Analyze model relationships and complexity"""
for app in self.apps:
models_file = os.path.join(self.project_path, app, 'models.py')
if os.path.exists(models_file):
self.models[app] = self._parse_models(models_file)
def analyze_views(self):
"""Analyze view complexity and dependencies"""
for app in self.apps:
views_file = os.path.join(self.project_path, app, 'views.py')
if os.path.exists(views_file):
self.views[app] = self._parse_views(views_file)
def analyze_dependencies(self):
"""Analyze inter-app dependencies"""
for app in self.apps:
app_path = os.path.join(self.project_path, app)
for root, dirs, files in os.walk(app_path):
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
deps = self._extract_dependencies(file_path)
self.dependencies[app].update(deps)
def _parse_models(self, models_file):
"""Parse models from file"""
with open(models_file, 'r') as f:
content = f.read()
try:
tree = ast.parse(content)
models = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
# Check if it's a Django model
                    for base in node.bases:
                        # Match both `models.Model` and a directly imported `Model`
                        is_model = (isinstance(base, ast.Attribute) and base.attr == 'Model') or \
                                   (isinstance(base, ast.Name) and base.id == 'Model')
                        if is_model:
models.append({
'name': node.name,
'fields': self._extract_model_fields(node),
'methods': [n.name for n in node.body if isinstance(n, ast.FunctionDef)]
})
return models
except SyntaxError:
return []
def _extract_model_fields(self, class_node):
"""Extract model fields from class definition"""
fields = []
for node in class_node.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name):
fields.append(target.id)
return fields
def _parse_views(self, views_file):
"""Parse views from file"""
with open(views_file, 'r') as f:
content = f.read()
try:
tree = ast.parse(content)
views = []
for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
views.append({
'name': node.name,
'type': 'function' if isinstance(node, ast.FunctionDef) else 'class',
'complexity': self._calculate_complexity(node)
})
return views
except SyntaxError:
return []
def _calculate_complexity(self, node):
"""Calculate cyclomatic complexity"""
complexity = 1
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For, ast.Try, ast.With)):
complexity += 1
return complexity
def _extract_dependencies(self, file_path):
"""Extract import dependencies from file"""
dependencies = set()
try:
with open(file_path, 'r') as f:
content = f.read()
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
dependencies.add(alias.name.split('.')[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
dependencies.add(node.module.split('.')[0])
except (SyntaxError, UnicodeDecodeError):
pass
return dependencies
def generate_report(self):
"""Generate analysis report"""
return {
'apps': self.apps,
'total_models': sum(len(models) for models in self.models.values()),
'total_views': sum(len(views) for views in self.views.values()),
'app_dependencies': dict(self.dependencies),
'complexity_by_app': {
app: sum(view['complexity'] for view in views)
for app, views in self.views.items()
},
'models_by_app': {
app: len(models) for app, models in self.models.items()
}
}
# Usage
analyzer = MonolithAnalyzer('/path/to/django/project')
report = analyzer.analyze_project()
print(f"Found {len(report['apps'])} apps with {report['total_models']} models")
Identify potential service boundaries using Domain-Driven Design:
# service_boundaries.py
from collections import defaultdict
import networkx as nx
class ServiceBoundaryIdentifier:
"""Identify potential microservice boundaries"""
def __init__(self, analysis_report):
self.report = analysis_report
self.dependency_graph = nx.DiGraph()
self.cohesion_scores = {}
self.coupling_scores = {}
def identify_boundaries(self):
"""Identify service boundaries using various metrics"""
self._build_dependency_graph()
self._calculate_cohesion()
self._calculate_coupling()
self._identify_clusters()
return self._generate_recommendations()
def _build_dependency_graph(self):
"""Build dependency graph between apps"""
for app, dependencies in self.report['app_dependencies'].items():
for dep in dependencies:
if dep in self.report['apps']:
self.dependency_graph.add_edge(app, dep)
def _calculate_cohesion(self):
"""Calculate cohesion within each app"""
for app in self.report['apps']:
model_count = self.report['models_by_app'].get(app, 0)
complexity = self.report['complexity_by_app'].get(app, 0)
# Higher model count and complexity indicate higher cohesion
self.cohesion_scores[app] = model_count * 0.6 + complexity * 0.4
def _calculate_coupling(self):
"""Calculate coupling between apps"""
for app in self.report['apps']:
dependencies = self.report['app_dependencies'].get(app, set())
# More dependencies indicate higher coupling
self.coupling_scores[app] = len(dependencies)
def _identify_clusters(self):
"""Identify clusters of related apps"""
        # Use community detection (the python-louvain package exposes itself as `community`)
        try:
            import community
partition = community.best_partition(self.dependency_graph.to_undirected())
clusters = defaultdict(list)
for app, cluster_id in partition.items():
clusters[cluster_id].append(app)
return dict(clusters)
except ImportError:
# Fallback to simple clustering
return self._simple_clustering()
def _simple_clustering(self):
"""Simple clustering based on dependencies"""
clusters = defaultdict(list)
visited = set()
cluster_id = 0
for app in self.report['apps']:
if app not in visited:
cluster = self._get_connected_components(app, visited)
clusters[cluster_id] = cluster
cluster_id += 1
return dict(clusters)
def _get_connected_components(self, start_app, visited):
"""Get connected components starting from an app"""
component = []
stack = [start_app]
while stack:
app = stack.pop()
if app not in visited:
visited.add(app)
component.append(app)
# Add neighbors
neighbors = list(self.dependency_graph.neighbors(app))
neighbors.extend(list(self.dependency_graph.predecessors(app)))
for neighbor in neighbors:
if neighbor not in visited:
stack.append(neighbor)
return component
def _generate_recommendations(self):
"""Generate service boundary recommendations"""
clusters = self._identify_clusters()
recommendations = []
for cluster_id, apps in clusters.items():
total_models = sum(self.report['models_by_app'].get(app, 0) for app in apps)
total_complexity = sum(self.report['complexity_by_app'].get(app, 0) for app in apps)
avg_coupling = sum(self.coupling_scores.get(app, 0) for app in apps) / len(apps)
service_name = f"service_{cluster_id}"
if len(apps) == 1:
service_name = f"{apps[0]}_service"
elif 'user' in apps:
service_name = "user_service"
elif 'product' in apps:
service_name = "product_service"
elif 'order' in apps:
service_name = "order_service"
recommendations.append({
'service_name': service_name,
'apps': apps,
'total_models': total_models,
'total_complexity': total_complexity,
'average_coupling': avg_coupling,
'recommendation_score': self._calculate_recommendation_score(
total_models, total_complexity, avg_coupling
)
})
# Sort by recommendation score
recommendations.sort(key=lambda x: x['recommendation_score'], reverse=True)
return recommendations
def _calculate_recommendation_score(self, models, complexity, coupling):
"""Calculate recommendation score for service boundary"""
# Higher models and complexity, lower coupling = better service boundary
return (models * 0.4 + complexity * 0.4) - (coupling * 0.2)
# Usage
boundary_identifier = ServiceBoundaryIdentifier(report)
recommendations = boundary_identifier.identify_boundaries()
for rec in recommendations:
print(f"Service: {rec['service_name']}")
print(f"Apps: {rec['apps']}")
print(f"Score: {rec['recommendation_score']:.2f}")
print("---")
Gradually replace monolith functionality using the Strangler Fig pattern, routing a growing share of requests to new services while the monolith continues to serve the rest:
# strangler_fig.py
from django.http import HttpResponse
from django.conf import settings
import random
import requests
import logging
logger = logging.getLogger(__name__)
class StranglerFigMiddleware:
"""Middleware to gradually route requests to microservices"""
def __init__(self, get_response):
self.get_response = get_response
self.migration_config = getattr(settings, 'MICROSERVICE_MIGRATION', {})
def __call__(self, request):
# Check if this request should be routed to a microservice
microservice_url = self._should_route_to_microservice(request)
if microservice_url:
return self._proxy_to_microservice(request, microservice_url)
# Continue with monolith
return self.get_response(request)
def _should_route_to_microservice(self, request):
"""Determine if request should be routed to microservice"""
path = request.path
for pattern, config in self.migration_config.items():
if path.startswith(pattern):
                # Check migration percentage (random per request; see the sketch
                # after the settings example below for deterministic bucketing)
                if random.randint(1, 100) <= config.get('percentage', 0):
return config.get('service_url')
return None
def _proxy_to_microservice(self, request, service_url):
"""Proxy request to microservice"""
try:
# Prepare request
url = f"{service_url.rstrip('/')}{request.path}"
headers = self._prepare_headers(request)
            # Make request to the microservice; always set a timeout so a hung
            # service cannot hang monolith workers
            if request.method == 'GET':
                response = requests.get(url, params=request.GET, headers=headers, timeout=5)
            elif request.method == 'POST':
                response = requests.post(url, data=request.POST, headers=headers, timeout=5)
            elif request.method == 'PUT':
                response = requests.put(url, data=request.body, headers=headers, timeout=5)
            elif request.method == 'DELETE':
                response = requests.delete(url, headers=headers, timeout=5)
else:
# Fallback to monolith for unsupported methods
return self.get_response(request)
# Return microservice response
django_response = HttpResponse(
response.content,
status=response.status_code,
content_type=response.headers.get('content-type', 'text/html')
)
# Copy relevant headers
            # Skip hop-by-hop headers; requests has already decompressed the body,
            # so copying Content-Encoding would corrupt the response
            for header, value in response.headers.items():
                if header.lower() not in ['content-length', 'transfer-encoding', 'content-encoding', 'connection']:
django_response[header] = value
logger.info(f"Proxied {request.method} {request.path} to microservice")
return django_response
except requests.RequestException as e:
logger.error(f"Error proxying to microservice: {e}")
# Fallback to monolith
return self.get_response(request)
def _prepare_headers(self, request):
"""Prepare headers for microservice request"""
headers = {}
# Copy relevant headers
for header, value in request.META.items():
if header.startswith('HTTP_'):
header_name = header[5:].replace('_', '-').title()
headers[header_name] = value
# Add service authentication
headers['X-Service-Token'] = settings.SERVICE_SECRET_TOKEN
headers['X-Original-Host'] = request.get_host()
return headers
# Migration configuration example
# settings.py
MICROSERVICE_MIGRATION = {
'/api/users/': {
'service_url': 'http://user-service:8000',
'percentage': 10 # Route 10% of traffic
},
'/api/products/': {
'service_url': 'http://product-service:8000',
'percentage': 25 # Route 25% of traffic
}
}
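One caveat with the configuration above: random.randint draws a fresh number per request, so a user can bounce between monolith and microservice mid-session. If sticky routing matters, derive the bucket deterministically from a stable identifier instead. A minimal sketch (the identifier fallback chain is an assumption):
# sticky_routing.py
import zlib

def in_migration_bucket(request, percentage):
    """Deterministic replacement for the random percentage check: the same
    caller always lands in the same 0-99 bucket for a given identifier."""
    identity = (
        request.session.session_key
        or str(getattr(request.user, 'id', ''))
        or request.META.get('REMOTE_ADDR', '')
    )
    return zlib.crc32(identity.encode()) % 100 < percentage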
Gradually separate shared databases, migrating data in batches and dual-writing during the transition period:
# database_decomposition.py
from django.conf import settings
from django.utils import timezone
import requests
import logging
logger = logging.getLogger(__name__)
class DataMigrationManager:
"""Manage data migration between monolith and microservices"""
def __init__(self):
self.migration_log = []
def migrate_user_data(self, batch_size=1000):
"""Migrate user data to user service"""
from django.contrib.auth.models import User
total_users = User.objects.count()
migrated = 0
logger.info(f"Starting migration of {total_users} users")
        # Order by id so batches stay stable even if rows are inserted mid-run
        for offset in range(0, total_users, batch_size):
            users = User.objects.order_by('id')[offset:offset + batch_size]
for user in users:
try:
self._migrate_single_user(user)
migrated += 1
except Exception as e:
logger.error(f"Failed to migrate user {user.id}: {e}")
self.migration_log.append({
'type': 'error',
'user_id': user.id,
'error': str(e)
})
logger.info(f"Migrated {migrated}/{total_users} users")
return migrated
def _migrate_single_user(self, user):
"""Migrate a single user to user service"""
user_data = {
'id': user.id,
'username': user.username,
'email': user.email,
'first_name': user.first_name,
'last_name': user.last_name,
'is_active': user.is_active,
'is_staff': user.is_staff,
'date_joined': user.date_joined.isoformat(),
'last_login': user.last_login.isoformat() if user.last_login else None
}
        # Send to user service (the timeout keeps a stuck service from stalling the batch)
        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/migrate/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10
        )
if response.status_code != 201:
raise Exception(f"User service returned {response.status_code}")
self.migration_log.append({
'type': 'success',
'user_id': user.id,
'migrated_at': timezone.now().isoformat()
})
class DualWriteManager:
"""Manage dual writes during migration period"""
def __init__(self):
self.enabled = getattr(settings, 'DUAL_WRITE_ENABLED', False)
def create_user(self, user_data):
"""Create user in both monolith and microservice"""
if not self.enabled:
return self._create_in_monolith(user_data)
# Create in monolith first
user = self._create_in_monolith(user_data)
# Async create in microservice
from celery import current_app
current_app.send_task(
'migration.create_user_in_microservice',
args=[user.id, user_data]
)
return user
def update_user(self, user_id, user_data):
"""Update user in both systems"""
if not self.enabled:
return self._update_in_monolith(user_id, user_data)
# Update in monolith first
user = self._update_in_monolith(user_id, user_data)
# Async update in microservice
from celery import current_app
current_app.send_task(
'migration.update_user_in_microservice',
args=[user_id, user_data]
)
return user
def _create_in_monolith(self, user_data):
"""Create user in monolith"""
from django.contrib.auth.models import User
return User.objects.create_user(**user_data)
def _update_in_monolith(self, user_id, user_data):
"""Update user in monolith"""
from django.contrib.auth.models import User
user = User.objects.get(id=user_id)
for key, value in user_data.items():
setattr(user, key, value)
user.save()
return user
# Celery tasks for async operations
from celery import shared_task
@shared_task
def create_user_in_microservice(user_id, user_data):
"""Create user in microservice asynchronously"""
try:
        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10
        )
if response.status_code == 201:
logger.info(f"User {user_id} created in microservice")
else:
logger.error(f"Failed to create user {user_id} in microservice: {response.status_code}")
except Exception as e:
logger.error(f"Error creating user {user_id} in microservice: {e}")
@shared_task
def update_user_in_microservice(user_id, user_data):
"""Update user in microservice asynchronously"""
try:
        response = requests.put(
            f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10
        )
if response.status_code == 200:
logger.info(f"User {user_id} updated in microservice")
else:
logger.error(f"Failed to update user {user_id} in microservice: {response.status_code}")
except Exception as e:
logger.error(f"Error updating user {user_id} in microservice: {e}")
Use events to keep systems synchronized:
# event_driven_migration.py
import json
import logging
from django.conf import settings
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.contrib.auth.models import User
from django.utils import timezone
from celery import shared_task
logger = logging.getLogger(__name__)
class EventPublisher:
"""Publish events during migration"""
def __init__(self):
self.enabled = getattr(settings, 'EVENT_PUBLISHING_ENABLED', False)
def publish_user_event(self, event_type, user_data):
"""Publish user-related events"""
if not self.enabled:
return
event = {
'event_type': event_type,
'service': 'monolith',
'timestamp': timezone.now().isoformat(),
'data': user_data
}
# Send to message queue
publish_event_async.delay('user.events', event)
def publish_order_event(self, event_type, order_data):
"""Publish order-related events"""
if not self.enabled:
return
event = {
'event_type': event_type,
'service': 'monolith',
'timestamp': timezone.now().isoformat(),
'data': order_data
}
publish_event_async.delay('order.events', event)
event_publisher = EventPublisher()
# Signal handlers for event publishing
@receiver(post_save, sender=User)
def publish_user_saved_event(sender, instance, created, **kwargs):
"""Publish user saved event"""
event_type = 'user.created' if created else 'user.updated'
user_data = {
'id': instance.id,
'username': instance.username,
'email': instance.email,
'first_name': instance.first_name,
'last_name': instance.last_name,
'is_active': instance.is_active,
'date_joined': instance.date_joined.isoformat(),
'last_login': instance.last_login.isoformat() if instance.last_login else None
}
event_publisher.publish_user_event(event_type, user_data)
@receiver(post_delete, sender=User)
def publish_user_deleted_event(sender, instance, **kwargs):
"""Publish user deleted event"""
user_data = {
'id': instance.id,
'username': instance.username,
'email': instance.email
}
event_publisher.publish_user_event('user.deleted', user_data)
@shared_task
def publish_event_async(topic, event_data):
"""Publish event to message queue asynchronously"""
try:
import pika
connection = pika.BlockingConnection(
pika.URLParameters(settings.RABBITMQ_URL)
)
channel = connection.channel()
# Declare exchange
channel.exchange_declare(
exchange='migration.events',
exchange_type='topic',
durable=True
)
# Publish event
channel.basic_publish(
exchange='migration.events',
routing_key=topic,
body=json.dumps(event_data),
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
content_type='application/json'
)
)
connection.close()
logger.info(f"Published event to {topic}")
except Exception as e:
logger.error(f"Failed to publish event: {e}")
class EventConsumer:
"""Consume events from monolith in microservices"""
def __init__(self, service_name):
self.service_name = service_name
def start_consuming(self):
"""Start consuming events"""
import pika
connection = pika.BlockingConnection(
pika.URLParameters(settings.RABBITMQ_URL)
)
channel = connection.channel()
# Declare exchange
channel.exchange_declare(
exchange='migration.events',
exchange_type='topic',
durable=True
)
# Declare queue
queue_name = f'{self.service_name}.migration.events'
channel.queue_declare(queue=queue_name, durable=True)
# Bind queue to exchange
if self.service_name == 'user-service':
channel.queue_bind(
exchange='migration.events',
queue=queue_name,
routing_key='user.events'
)
elif self.service_name == 'order-service':
channel.queue_bind(
exchange='migration.events',
queue=queue_name,
routing_key='order.events'
)
# Start consuming
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue=queue_name,
on_message_callback=self.handle_event,
auto_ack=False
)
logger.info(f"Started consuming events for {self.service_name}")
channel.start_consuming()
def handle_event(self, ch, method, properties, body):
"""Handle incoming event"""
try:
event_data = json.loads(body)
event_type = event_data['event_type']
if event_type.startswith('user.'):
self.handle_user_event(event_data)
elif event_type.startswith('order.'):
self.handle_order_event(event_data)
# Acknowledge message
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"Error handling event: {e}")
# Reject and requeue message
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def handle_user_event(self, event_data):
"""Handle user-related events"""
event_type = event_data['event_type']
user_data = event_data['data']
if event_type == 'user.created':
self.create_user_from_event(user_data)
elif event_type == 'user.updated':
self.update_user_from_event(user_data)
elif event_type == 'user.deleted':
self.delete_user_from_event(user_data)
def create_user_from_event(self, user_data):
"""Create user from event data"""
# Implementation depends on microservice
logger.info(f"Creating user from event: {user_data['id']}")
def update_user_from_event(self, user_data):
"""Update user from event data"""
logger.info(f"Updating user from event: {user_data['id']}")
    def delete_user_from_event(self, user_data):
        """Delete user from event data"""
        logger.info(f"Deleting user from event: {user_data['id']}")

    def handle_order_event(self, event_data):
        """Handle order-related events (referenced above; implementation is service-specific)"""
        logger.info(f"Received order event: {event_data['event_type']}")
For workflows that span multiple services, such as a registration that touches the user and notification services, use the saga pattern: a sequence of local transactions with compensating actions instead of a distributed transaction.
# saga_pattern.py
import uuid
import logging
import requests
from django.conf import settings
from django.db import models, transaction
logger = logging.getLogger(__name__)
class SagaStatus(models.TextChoices):
PENDING = 'pending', 'Pending'
COMPLETED = 'completed', 'Completed'
FAILED = 'failed', 'Failed'
COMPENSATING = 'compensating', 'Compensating'
class SagaStepStatus(models.TextChoices):
PENDING = 'pending', 'Pending'
COMPLETED = 'completed', 'Completed'
FAILED = 'failed', 'Failed'
COMPENSATED = 'compensated', 'Compensated'
class Saga(models.Model):
"""Saga orchestration model"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
saga_type = models.CharField(max_length=100)
status = models.CharField(max_length=20, choices=SagaStatus.choices, default=SagaStatus.PENDING)
data = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'sagas'
class SagaStep(models.Model):
"""Individual saga step"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
saga = models.ForeignKey(Saga, on_delete=models.CASCADE, related_name='steps')
step_name = models.CharField(max_length=100)
step_order = models.IntegerField()
status = models.CharField(max_length=20, choices=SagaStepStatus.choices, default=SagaStepStatus.PENDING)
service_name = models.CharField(max_length=100)
action_data = models.JSONField(default=dict)
compensation_data = models.JSONField(default=dict)
result_data = models.JSONField(default=dict)
error_message = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'saga_steps'
ordering = ['step_order']
class SagaOrchestrator:
"""Orchestrate saga execution"""
    def __init__(self):
        # Only handlers implemented below are registered; order-related steps
        # (create_order, reserve_inventory, process_payment) would be wired up
        # the same way once their services exist
        self.step_handlers = {
            'create_user': self.handle_create_user,
            'create_profile': self.handle_create_profile,
            'send_welcome_email': self.handle_send_welcome_email,
        }
        self.compensation_handlers = {
            'create_user': self.compensate_create_user,
            'create_profile': self.compensate_create_profile,
            'send_welcome_email': self.compensate_send_welcome_email,
        }
def start_saga(self, saga_type, saga_data, steps):
"""Start a new saga"""
with transaction.atomic():
saga = Saga.objects.create(
saga_type=saga_type,
data=saga_data
)
# Create saga steps
for i, step_config in enumerate(steps):
SagaStep.objects.create(
saga=saga,
step_name=step_config['name'],
step_order=i,
service_name=step_config['service'],
action_data=step_config.get('data', {}),
compensation_data=step_config.get('compensation_data', {})
)
# Start executing saga
self.execute_saga(saga.id)
return saga
def execute_saga(self, saga_id):
"""Execute saga steps"""
saga = Saga.objects.get(id=saga_id)
if saga.status != SagaStatus.PENDING:
return
# Get next pending step
next_step = saga.steps.filter(status=SagaStepStatus.PENDING).first()
if not next_step:
# All steps completed
saga.status = SagaStatus.COMPLETED
saga.save()
logger.info(f"Saga {saga_id} completed successfully")
return
# Execute the step
try:
self.execute_step(next_step)
except Exception as e:
logger.error(f"Saga {saga_id} step {next_step.step_name} failed: {e}")
self.handle_saga_failure(saga)
def execute_step(self, step):
"""Execute a single saga step"""
handler = self.step_handlers.get(step.step_name)
if not handler:
raise ValueError(f"No handler for step {step.step_name}")
try:
result = handler(step)
step.status = SagaStepStatus.COMPLETED
step.result_data = result
step.save()
logger.info(f"Step {step.step_name} completed successfully")
# Continue with next step
from celery import current_app
current_app.send_task(
'saga.execute_saga',
args=[str(step.saga.id)],
countdown=1
)
except Exception as e:
step.status = SagaStepStatus.FAILED
step.error_message = str(e)
step.save()
raise e
def handle_saga_failure(self, saga):
"""Handle saga failure by running compensations"""
saga.status = SagaStatus.COMPENSATING
saga.save()
# Run compensations in reverse order
completed_steps = saga.steps.filter(
status=SagaStepStatus.COMPLETED
).order_by('-step_order')
for step in completed_steps:
try:
self.compensate_step(step)
except Exception as e:
logger.error(f"Compensation failed for step {step.step_name}: {e}")
saga.status = SagaStatus.FAILED
saga.save()
def compensate_step(self, step):
"""Compensate a completed step"""
handler = self.compensation_handlers.get(step.step_name)
if not handler:
logger.warning(f"No compensation handler for step {step.step_name}")
return
try:
handler(step)
step.status = SagaStepStatus.COMPENSATED
step.save()
logger.info(f"Step {step.step_name} compensated successfully")
except Exception as e:
logger.error(f"Compensation failed for step {step.step_name}: {e}")
raise e
# Step handlers
def handle_create_user(self, step):
"""Handle user creation step"""
user_data = step.action_data
        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/",
            json=user_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10
        )
if response.status_code != 201:
raise Exception(f"User creation failed: {response.status_code}")
return response.json()
def handle_create_profile(self, step):
"""Handle profile creation step"""
profile_data = step.action_data
user_id = step.saga.data.get('user_id')
        response = requests.post(
            f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/profile/",
            json=profile_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10
        )
if response.status_code != 201:
raise Exception(f"Profile creation failed: {response.status_code}")
return response.json()
def handle_send_welcome_email(self, step):
"""Handle welcome email step"""
email_data = step.action_data
        response = requests.post(
            f"{settings.NOTIFICATION_SERVICE_URL}/api/v1/emails/",
            json=email_data,
            headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
            timeout=10
        )
if response.status_code != 202:
raise Exception(f"Email sending failed: {response.status_code}")
return response.json()
# Compensation handlers
def compensate_create_user(self, step):
"""Compensate user creation"""
user_id = step.result_data.get('id')
if user_id:
requests.delete(
f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/",
headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
)
def compensate_create_profile(self, step):
"""Compensate profile creation"""
user_id = step.saga.data.get('user_id')
if user_id:
requests.delete(
f"{settings.USER_SERVICE_URL}/api/v1/users/{user_id}/profile/",
headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN}
)
def compensate_send_welcome_email(self, step):
"""Compensate welcome email (no-op)"""
# Email can't be unsent, but we could send a cancellation email
pass
# Usage example
def create_user_with_profile_saga(user_data, profile_data):
"""Create user with profile using saga pattern"""
orchestrator = SagaOrchestrator()
steps = [
{
'name': 'create_user',
'service': 'user-service',
'data': user_data
},
{
'name': 'create_profile',
'service': 'user-service',
'data': profile_data
},
{
'name': 'send_welcome_email',
'service': 'notification-service',
'data': {
'to': user_data['email'],
'template': 'welcome',
'data': {'username': user_data['username']}
}
}
]
saga = orchestrator.start_saga('user_registration', user_data, steps)
return saga
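Note that execute_step re-enters the orchestrator through a Celery task named saga.execute_saga, which the listing above never defines. A minimal wrapper consistent with that call (the retry policy is an assumption, not part of the original design):
from celery import shared_task

@shared_task(name='saga.execute_saga', bind=True, max_retries=3)
def execute_saga_task(self, saga_id):
    """Resume saga execution asynchronously; retry transient failures so a
    single worker hiccup does not strand the saga mid-flight."""
    try:
        SagaOrchestrator().execute_saga(saga_id)
    except Exception as exc:
        # Exponential backoff between retries
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)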
Instrument the migration itself so progress and failure rates are visible on a dashboard:
# migration_metrics.py
from prometheus_client import Counter, Gauge
from django.conf import settings
from django.contrib.auth.models import User
import requests
import logging

logger = logging.getLogger(__name__)
# Migration metrics
MIGRATION_REQUESTS = Counter(
'migration_requests_total',
'Total migration requests',
['source', 'target', 'status']
)
DUAL_WRITE_OPERATIONS = Counter(
'dual_write_operations_total',
'Total dual write operations',
['operation', 'status']
)
SAGA_EXECUTIONS = Counter(
'saga_executions_total',
'Total saga executions',
['saga_type', 'status']
)
MIGRATION_PROGRESS = Gauge(
'migration_progress_percent',
'Migration progress percentage',
['service']
)
class MigrationMetricsCollector:
"""Collect migration-specific metrics"""
@staticmethod
def record_migration_request(source, target, success=True):
"""Record migration request"""
status = 'success' if success else 'failure'
MIGRATION_REQUESTS.labels(
source=source,
target=target,
status=status
).inc()
@staticmethod
def record_dual_write(operation, success=True):
"""Record dual write operation"""
status = 'success' if success else 'failure'
DUAL_WRITE_OPERATIONS.labels(
operation=operation,
status=status
).inc()
@staticmethod
def record_saga_execution(saga_type, success=True):
"""Record saga execution"""
status = 'success' if success else 'failure'
SAGA_EXECUTIONS.labels(
saga_type=saga_type,
status=status
).inc()
@staticmethod
def update_migration_progress(service, percentage):
"""Update migration progress"""
MIGRATION_PROGRESS.labels(service=service).set(percentage)
class MigrationDashboard:
"""Generate migration progress dashboard"""
def __init__(self):
self.metrics = MigrationMetricsCollector()
def get_migration_status(self):
"""Get overall migration status"""
return {
'user_service': self._get_user_service_progress(),
'product_service': self._get_product_service_progress(),
'order_service': self._get_order_service_progress(),
'overall_progress': self._calculate_overall_progress()
}
def _get_user_service_progress(self):
"""Get user service migration progress"""
total_users = User.objects.count()
migrated_users = self._count_migrated_users()
progress = (migrated_users / total_users * 100) if total_users > 0 else 0
self.metrics.update_migration_progress('user-service', progress)
return {
'total_records': total_users,
'migrated_records': migrated_users,
'progress_percent': progress,
'status': 'in_progress' if progress < 100 else 'completed'
}
def _count_migrated_users(self):
"""Count users that have been migrated"""
# This would check migration status in your tracking system
# For example, checking if user exists in microservice
        migrated_count = 0
        try:
            response = requests.get(
                f"{settings.USER_SERVICE_URL}/api/v1/users/count/",
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
                timeout=5
            )
            if response.status_code == 200:
                migrated_count = response.json().get('count', 0)
        except requests.RequestException as e:
            logger.warning(f"Could not fetch migrated user count: {e}")
        return migrated_count
    def _get_product_service_progress(self):
        """Product service progress (placeholder; same pattern as the user service)"""
        return {'status': 'not_started', 'progress_percent': 0}

    def _get_order_service_progress(self):
        """Order service progress (placeholder; same pattern as the user service)"""
        return {'status': 'not_started', 'progress_percent': 0}

    def _calculate_overall_progress(self):
        """Calculate overall migration progress"""
        # In practice, aggregate the per-service figures above;
        # a fixed value stands in for this example
        return 45.5
Guard every cutover behind feature flags so any step can be rolled back instantly:
# feature_flags.py
from django.conf import settings
from django.core.cache import cache
from rest_framework.viewsets import ModelViewSet
from rest_framework.response import Response
import requests
import logging

logger = logging.getLogger(__name__)
class FeatureFlagManager:
"""Manage feature flags for migration rollback"""
def __init__(self):
self.flags = {}
self.load_flags()
def load_flags(self):
"""Load feature flags from configuration"""
self.flags = getattr(settings, 'FEATURE_FLAGS', {
'use_user_microservice': False,
'use_product_microservice': False,
'use_order_microservice': False,
'enable_dual_write': True,
'enable_event_publishing': True
})
def is_enabled(self, flag_name):
"""Check if feature flag is enabled"""
# Check cache first
cache_key = f"feature_flag:{flag_name}"
cached_value = cache.get(cache_key)
if cached_value is not None:
return cached_value
# Get from configuration
enabled = self.flags.get(flag_name, False)
# Cache for 5 minutes
cache.set(cache_key, enabled, timeout=300)
return enabled
def set_flag(self, flag_name, enabled):
"""Set feature flag value"""
self.flags[flag_name] = enabled
# Update cache
cache_key = f"feature_flag:{flag_name}"
cache.set(cache_key, enabled, timeout=300)
# Optionally persist to external configuration service
self._persist_flag(flag_name, enabled)
def _persist_flag(self, flag_name, enabled):
"""Persist flag to external service"""
        try:
            requests.post(
                f"{settings.CONFIG_SERVICE_URL}/api/v1/flags/",
                json={
                    'name': flag_name,
                    'enabled': enabled,
                    'service': settings.SERVICE_NAME
                },
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
                timeout=5
            )
        except requests.RequestException as e:
            logger.warning(f"Could not persist flag {flag_name}: {e}")
# Usage in views
feature_flags = FeatureFlagManager()
class UserViewSet(ModelViewSet):
def create(self, request):
"""Create user with feature flag support"""
if feature_flags.is_enabled('use_user_microservice'):
return self._create_in_microservice(request)
else:
return self._create_in_monolith(request)
def _create_in_microservice(self, request):
"""Create user in microservice"""
try:
            response = requests.post(
                f"{settings.USER_SERVICE_URL}/api/v1/users/",
                json=request.data,
                headers={'X-Service-Token': settings.SERVICE_SECRET_TOKEN},
                timeout=5
            )
if response.status_code == 201:
return Response(response.json(), status=201)
else:
# Fallback to monolith on error
logger.warning("Microservice failed, falling back to monolith")
return self._create_in_monolith(request)
except Exception as e:
logger.error(f"Microservice error: {e}")
return self._create_in_monolith(request)
def _create_in_monolith(self, request):
"""Create user in monolith"""
return super().create(request)
# Emergency rollback command
from django.core.management.base import BaseCommand
from django.core.cache import cache
class Command(BaseCommand):
help = 'Emergency rollback to monolith'
def handle(self, *args, **options):
"""Perform emergency rollback"""
feature_flags = FeatureFlagManager()
# Disable all microservice flags
flags_to_disable = [
'use_user_microservice',
'use_product_microservice',
'use_order_microservice'
]
for flag in flags_to_disable:
feature_flags.set_flag(flag, False)
self.stdout.write(f"Disabled {flag}")
# Clear relevant caches
cache.clear()
self.stdout.write(
self.style.SUCCESS('Emergency rollback completed')
)
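Assuming the command above is saved as management/commands/emergency_rollback.py inside an installed app, it can be invoked with python manage.py emergency_rollback. Rehearse it before you need it: a rollback path that has never been exercised is not a rollback path.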
Transforming a monolithic Django application to microservices requires careful planning and execution:
Assessment Phase: analyze the monolith's apps, models, views, and dependencies, then use cohesion and coupling metrics to identify candidate service boundaries.
Migration Strategies: apply the Strangler Fig pattern to route a growing percentage of traffic to new services, with automatic fallback to the monolith on errors.
Data Consistency: migrate data in batches, dual-write during the transition, publish domain events to keep systems synchronized, and use sagas for workflows that span services.
Monitoring and Rollback: instrument migration progress and failure rates, gate every cutover behind feature flags, and keep an emergency rollback path ready.
Key Success Factors: move incrementally, keep the monolith as a working fallback at every stage, and validate each service in production before expanding its traffic share.
The migration from monolith to microservices is a journey, not a destination. Take it step by step, learn from each phase, and be prepared to adapt your strategy based on real-world feedback and operational experience.
This completes our comprehensive guide to Django microservices, from understanding the fundamentals to successfully migrating from monolithic architectures.
Best Practices
This section consolidates essential best practices for developing, deploying, and maintaining Django microservices. Following them helps keep a microservices architecture scalable, maintainable, and reliable.