This section consolidates essential best practices for developing, deploying, and maintaining Django microservices. Following these practices ensures scalable, maintainable, and reliable microservices architecture.
Each microservice should have a single, well-defined responsibility:
# Good: User service focused on user management
class UserService:
    """Single-responsibility service: owns user lifecycle operations only."""

    def create_user(self, user_data):
        """Create a new user account from the supplied attributes."""
        pass

    def authenticate_user(self, credentials):
        """Check the supplied credentials against stored users."""
        pass

    def update_user_profile(self, user_id, profile_data):
        """Apply profile changes for the given user."""
        pass

    def deactivate_user(self, user_id):
        """Mark the user account as inactive."""
        pass
# Bad: Mixed responsibilities
class UserOrderPaymentService:
    """Anti-example: one class spanning users, orders, AND payments."""

    def create_user(self, user_data):
        pass

    def create_order(self, order_data):
        pass

    def process_payment(self, payment_data):
        pass
Each service should own its data:
# User Service Database
# settings.py
# Each service owns its database: separate DB name, separate credentials,
# and a host only reachable on the internal network.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'user_service_db',
        'USER': 'user_service_user',
        # Illustrative only — in a real deployment load secrets from the
        # environment, never hard-code them in settings.
        'PASSWORD': 'user_service_password',
        'HOST': 'user-db.internal',  # internal DNS; other services cannot reach it
        'PORT': '5432',
    }
}
# Order Service Database
# Mirrors the user service settings but with its own database, user, and
# host — no credentials or schemas are shared between services.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'order_service_db',
        'USER': 'order_service_user',
        # Illustrative only — load real secrets from the environment.
        'PASSWORD': 'order_service_password',
        'HOST': 'order-db.internal',  # internal DNS; private to this service
        'PORT': '5432',
    }
}
# Never access another service's database directly
# Bad: Order service accessing user database
def get_user_orders(user_id):
    # Anti-example: the order service reaches across the service boundary
    # straight into the user service's database.
    owner = User.objects.using('user_db').get(id=user_id)
    return owner.orders.all()
# Good: Use API calls between services
def get_user_orders(user_id):
    # Confirm the user exists by asking the user service over HTTP,
    # then answer from this service's own data.
    client = ServiceClient('user-service')
    if not client.get(f'/api/v1/users/{user_id}/'):
        raise ValueError("User not found")
    return Order.objects.filter(user_id=user_id)
Design APIs before implementation:
# api_specification.py
"""
User Service API Specification
Base URL: /api/v1/users/
Endpoints:
- GET / # List users
- POST / # Create user
- GET /{id}/ # Get user details
- PUT /{id}/ # Update user
- DELETE /{id}/ # Delete user
- POST /{id}/activate/ # Activate user
- POST /{id}/deactivate/ # Deactivate user
- GET /{id}/profile/ # Get user profile
- PUT /{id}/profile/ # Update user profile
Request/Response formats defined in OpenAPI specification
"""
from drf_spectacular.utils import extend_schema, OpenApiExample
class UserViewSet(ModelViewSet):
    """User endpoints documented with drf-spectacular schema annotations."""

    @extend_schema(
        summary="Create new user",
        description="Create a new user account with the provided information",
        request=UserCreateSerializer,
        responses={
            201: UserSerializer,
            # NOTE(review): drf-spectacular expects a serializer or an
            # OpenApiResponse as a `responses` value; an OpenApiExample here
            # may not render in the generated schema — confirm against the
            # drf-spectacular documentation.
            400: OpenApiExample(
                'Validation Error',
                value={'email': ['This field must be unique.']}
            )
        },
        examples=[
            OpenApiExample(
                'User Creation',
                value={
                    'username': 'johndoe',
                    'email': 'john@example.com',
                    'first_name': 'John',
                    'last_name': 'Doe'
                }
            )
        ]
    )
    def create(self, request):
        # Delegate to ModelViewSet.create; the decorator only shapes the schema.
        return super().create(request)
Use environment-based configuration:
# settings.py
import os
import dj_database_url  # Fix: used below (DATABASES) but was never imported
from decouple import config, Csv

# Environment-based settings
DEBUG = config('DEBUG', default=False, cast=bool)
SECRET_KEY = config('SECRET_KEY')
ALLOWED_HOSTS = config('ALLOWED_HOSTS', default='', cast=Csv())

# Database configuration: a single DATABASE_URL env var parsed into the
# Django settings structure.
DATABASE_URL = config('DATABASE_URL')
DATABASES = {
    'default': dj_database_url.parse(DATABASE_URL)
}

# External service URLs (defaults match docker-compose service names)
USER_SERVICE_URL = config('USER_SERVICE_URL', default='http://user-service:8000')
PRODUCT_SERVICE_URL = config('PRODUCT_SERVICE_URL', default='http://product-service:8000')
ORDER_SERVICE_URL = config('ORDER_SERVICE_URL', default='http://order-service:8000')

# Feature flags
ENABLE_CACHING = config('ENABLE_CACHING', default=True, cast=bool)
ENABLE_ASYNC_PROCESSING = config('ENABLE_ASYNC_PROCESSING', default=True, cast=bool)
ENABLE_METRICS = config('ENABLE_METRICS', default=True, cast=bool)

# Service-specific configuration (surfaced in logs, health checks, metrics)
SERVICE_NAME = config('SERVICE_NAME', default='user-service')
SERVICE_VERSION = config('SERVICE_VERSION', default='1.0.0')
SERVICE_ENVIRONMENT = config('SERVICE_ENVIRONMENT', default='development')

# Logging configuration: one JSON document per record, tagged with the
# service name so aggregated logs can be filtered per service.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            'format': '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "service": "' + SERVICE_NAME + '", "message": "%(message)s"}',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'json',
        },
    },
    'root': {
        'handlers': ['console'],
        'level': config('LOG_LEVEL', default='INFO'),
    },
}
Implement consistent error handling:
# exceptions.py
from rest_framework.views import exception_handler
from rest_framework.response import Response
from rest_framework import status
import logging
import uuid
logger = logging.getLogger(__name__)
class ServiceException(Exception):
    """Root of the service error hierarchy.

    Carries a machine-readable ``error_code`` and a ``details`` dict in
    addition to the human-readable message.
    """

    def __init__(self, message, error_code=None, details=None):
        self.message = message
        self.error_code = error_code or 'SERVICE_ERROR'
        self.details = details or {}
        super().__init__(self.message)


class ValidationException(ServiceException):
    """Raised when request payload validation fails."""

    def __init__(self, message, field_errors=None):
        wrapped = {'field_errors': field_errors or {}}
        super().__init__(message, 'VALIDATION_ERROR', wrapped)


class BusinessLogicException(ServiceException):
    """Raised when a domain rule is violated."""

    def __init__(self, message, business_rule=None):
        super().__init__(
            message,
            'BUSINESS_LOGIC_ERROR',
            {'business_rule': business_rule},
        )


class ExternalServiceException(ServiceException):
    """Raised when a call to a dependent service fails."""

    def __init__(self, message, service_name, status_code=None):
        detail = {'service_name': service_name, 'status_code': status_code}
        super().__init__(message, 'EXTERNAL_SERVICE_ERROR', detail)
def custom_exception_handler(exc, context):
    """Custom exception handler for consistent error responses.

    Wraps both ServiceException subclasses and standard DRF exceptions into
    a single envelope: {"error": {code, message, details, correlation_id,
    service, timestamp}}.
    """
    # Fix: this module never imported these names, so every call raised
    # NameError; import them locally here.
    from django.conf import settings
    from django.utils import timezone

    # Correlation ID ties this response to the server-side log lines below.
    correlation_id = str(uuid.uuid4())

    # Handle custom service exceptions
    if isinstance(exc, ServiceException):
        error_response = {
            'error': {
                'code': exc.error_code,
                'message': exc.message,
                'details': exc.details,
                'correlation_id': correlation_id,
                'service': settings.SERVICE_NAME,
                'timestamp': timezone.now().isoformat()
            }
        }
        # Log the error
        logger.error(f"Service error [{correlation_id}]: {exc.message}", extra={
            'error_code': exc.error_code,
            'details': exc.details,
            'correlation_id': correlation_id
        })
        # Map error codes onto HTTP status codes.
        status_code = status.HTTP_400_BAD_REQUEST
        if exc.error_code == 'EXTERNAL_SERVICE_ERROR':
            status_code = status.HTTP_502_BAD_GATEWAY
        elif exc.error_code == 'BUSINESS_LOGIC_ERROR':
            status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
        return Response(error_response, status=status_code)

    # Handle Django REST framework exceptions via the default handler,
    # then re-wrap its payload in the shared envelope.
    response = exception_handler(exc, context)
    if response is not None:
        custom_response_data = {
            'error': {
                'code': 'VALIDATION_ERROR' if response.status_code == 400 else 'API_ERROR',
                'message': 'Request validation failed' if response.status_code == 400 else 'API error occurred',
                'details': response.data,
                'correlation_id': correlation_id,
                'service': settings.SERVICE_NAME,
                'timestamp': timezone.now().isoformat()
            }
        }
        # Log the error
        logger.warning(f"API error [{correlation_id}]: {response.status_code}", extra={
            'status_code': response.status_code,
            'details': response.data,
            'correlation_id': correlation_id
        })
        response.data = custom_response_data
    return response
# Usage in views
class UserViewSet(ModelViewSet):
    def create(self, request):
        """Create user with proper error handling.

        Raises BusinessLogicException for duplicate emails,
        ExternalServiceException when the validation service rejects the
        payload, and wraps anything unexpected in a generic
        ServiceException so the exception handler can build a uniform
        error envelope.
        """
        try:
            # Business rule: email must be unique across users.
            if User.objects.filter(email=request.data.get('email')).exists():
                raise BusinessLogicException(
                    "User with this email already exists",
                    business_rule="unique_email"
                )
            # Defer to the external validation service before creating.
            validation_result = self.validate_with_external_service(request.data)
            if not validation_result['valid']:
                raise ExternalServiceException(
                    "External validation failed",
                    service_name="validation-service",
                    status_code=validation_result.get('status_code')
                )
            return super().create(request)
        except ServiceException:
            raise  # Already well-formed; let the handler translate it.
        except Exception as exc:
            # Log the stack trace, then surface a generic error.
            # Fix: chain the original exception (`from exc`) so the real
            # cause is preserved in tracebacks instead of being discarded.
            logger.exception("Unexpected error in user creation")
            raise ServiceException(
                "An unexpected error occurred",
                error_code="INTERNAL_ERROR"
            ) from exc
Implement structured logging:
# logging_utils.py
import logging
import json
import time
from django.utils.deprecation import MiddlewareMixin
from django.conf import settings
class StructuredLogger:
    """Thin wrapper that emits one JSON document per log record."""

    def __init__(self, name):
        self.logger = logging.getLogger(name)

    def log(self, level, message, **kwargs):
        """Serialize the message plus service metadata and any extra fields."""
        payload = {
            'timestamp': time.time(),
            'service': settings.SERVICE_NAME,
            'version': settings.SERVICE_VERSION,
            'environment': settings.SERVICE_ENVIRONMENT,
            'message': message,
        }
        payload.update(kwargs)  # caller-supplied fields win on key clashes
        self.logger.log(level, json.dumps(payload))

    def info(self, message, **kwargs):
        self.log(logging.INFO, message, **kwargs)

    def warning(self, message, **kwargs):
        self.log(logging.WARNING, message, **kwargs)

    def error(self, message, **kwargs):
        self.log(logging.ERROR, message, **kwargs)

    def debug(self, message, **kwargs):
        self.log(logging.DEBUG, message, **kwargs)
class RequestLoggingMiddleware(MiddlewareMixin):
    """Log every request and its response as structured JSON records."""

    def __init__(self, get_response):
        self.get_response = get_response
        self.logger = StructuredLogger('requests')

    def __call__(self, request):
        start_time = time.time()
        # Log request
        self.logger.info("Request started",
            method=request.method,
            path=request.path,
            user_agent=request.META.get('HTTP_USER_AGENT'),
            ip_address=self.get_client_ip(request),
            correlation_id=getattr(request, 'correlation_id', None)
        )
        response = self.get_response(request)
        # Log response with wall-clock duration in milliseconds.
        duration = time.time() - start_time
        self.logger.info("Request completed",
            method=request.method,
            path=request.path,
            status_code=response.status_code,
            duration_ms=duration * 1000,
            correlation_id=getattr(request, 'correlation_id', None)
        )
        return response

    def get_client_ip(self, request):
        """Return the originating client IP.

        X-Forwarded-For holds a comma-separated proxy chain; the first
        entry is the original client.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Fix: proxies commonly join entries with ", ", so the raw
            # split left a leading space in the IP; strip it.
            ip = x_forwarded_for.split(',')[0].strip()
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip
# Business logic logging
class UserService:
    """User service with structured logging"""

    def __init__(self):
        self.logger = StructuredLogger('user_service')

    def create_user(self, user_data):
        """Create a user record, logging the attempt, success, or failure."""
        username = user_data.get('username')
        email = user_data.get('email')
        self.logger.info("Creating user", username=username, email=email)
        try:
            user = User.objects.create_user(**user_data)
            self.logger.info("User created successfully",
                user_id=user.id,
                username=user.username,
            )
            return user
        except Exception as e:
            self.logger.error("User creation failed",
                error=str(e),
                username=username,
                email=email,
            )
            raise
Implement comprehensive testing:
# test_strategy.py
import pytest
from django.test import TestCase, TransactionTestCase
from rest_framework.test import APITestCase
from unittest.mock import patch, Mock
import responses
class BaseTestCase(TestCase):
    """Base test case with common utilities"""

    # Default fixture attributes; individual tests override via kwargs.
    DEFAULT_USER = {
        'username': 'testuser',
        'email': 'test@example.com',
        'first_name': 'Test',
        'last_name': 'User',
    }

    def setUp(self):
        self.user = self.create_test_user()

    def create_test_user(self, **kwargs):
        """Create a test user, merging overrides into the defaults."""
        params = dict(self.DEFAULT_USER, **kwargs)
        return User.objects.create_user(**params)

    def assert_error_response(self, response, error_code, status_code):
        """Assert the response uses the standard error envelope."""
        self.assertEqual(response.status_code, status_code)
        self.assertIn('error', response.data)
        self.assertEqual(response.data['error']['code'], error_code)
class UserServiceTest(BaseTestCase):
    """Unit tests for user service"""

    def test_create_user_success(self):
        """A valid payload creates and persists a user."""
        payload = {
            'username': 'newuser',
            'email': 'new@example.com',
            'first_name': 'New',
            'last_name': 'User'
        }
        user = UserService().create_user(payload)
        self.assertEqual(user.username, 'newuser')
        self.assertEqual(user.email, 'new@example.com')
        self.assertTrue(User.objects.filter(username='newuser').exists())

    def test_create_user_duplicate_email(self):
        """Reusing an existing user's email raises a business-logic error."""
        payload = {
            'username': 'newuser',
            'email': 'test@example.com',  # Duplicate email
            'first_name': 'New',
            'last_name': 'User'
        }
        with self.assertRaises(BusinessLogicException) as context:
            UserService().create_user(payload)
        self.assertEqual(context.exception.error_code, 'BUSINESS_LOGIC_ERROR')
class UserAPITest(APITestCase):
    """Integration tests for user API"""

    def create_test_user(self, **kwargs):
        """Create a test user with sensible defaults.

        Fix: the original setUp called self.create_test_user(), but
        APITestCase does not define that helper (it lives on BaseTestCase,
        which this class does not inherit from), so every test failed with
        AttributeError before running.
        """
        defaults = {
            'username': 'testuser',
            'email': 'test@example.com',
            'first_name': 'Test',
            'last_name': 'User'
        }
        defaults.update(kwargs)
        return User.objects.create_user(**defaults)

    def setUp(self):
        super().setUp()
        self.user = self.create_test_user()
        self.client.force_authenticate(user=self.user)

    def test_list_users(self):
        """Test user list endpoint"""
        response = self.client.get('/api/v1/users/')
        self.assertEqual(response.status_code, 200)
        self.assertIn('results', response.data)
        self.assertIn('pagination', response.data)

    @responses.activate
    def test_create_user_with_external_validation(self):
        """Test user creation with external service validation"""
        # Mock external service response
        responses.add(
            responses.POST,
            'http://validation-service:8000/api/v1/validate/',
            json={'valid': True, 'score': 0.95},
            status=200
        )
        user_data = {
            'username': 'validuser',
            'email': 'valid@example.com',
            'first_name': 'Valid',
            'last_name': 'User'
        }
        response = self.client.post('/api/v1/users/', user_data)
        self.assertEqual(response.status_code, 201)
        self.assertEqual(response.data['username'], 'validuser')
class UserIntegrationTest(TransactionTestCase):
    """Integration tests with database transactions"""

    def test_user_creation_with_profile(self):
        """Creating a user must atomically create its profile as well."""
        new_user = UserService().create_user({
            'username': 'profileuser',
            'email': 'profile@example.com',
        })
        # The profile is expected to exist alongside the new user.
        self.assertTrue(hasattr(new_user, 'profile'))
        self.assertIsNotNone(new_user.profile)
# Performance tests
class UserPerformanceTest(TestCase):
    """Performance tests for user operations"""

    def test_user_list_performance(self):
        """Listing the first page must stay fast with 1000 users present."""
        User.objects.bulk_create([
            User(username=f'user{i}', email=f'user{i}@example.com')
            for i in range(1000)
        ])
        # Time a single first-page query.
        import time
        started = time.time()
        page = list(User.objects.all()[:20])
        elapsed = time.time() - started
        # Should complete within 100ms
        self.assertLess(elapsed, 0.1)
        self.assertEqual(len(page), 20)
# Contract tests
class UserContractTest(TestCase):
    """Contract tests for API compatibility"""

    def test_user_response_schema(self):
        """Test user response matches expected schema.

        Fix: the original called self.create_test_user(), which plain
        TestCase does not provide (the helper lives on BaseTestCase, not an
        ancestor of this class); create the fixture user directly instead.
        """
        user = User.objects.create_user(
            username='testuser',
            email='test@example.com',
            first_name='Test',
            last_name='User'
        )
        from .serializers import UserSerializer
        serializer = UserSerializer(user)
        data = serializer.data
        # Verify required fields
        required_fields = ['id', 'username', 'email', 'first_name', 'last_name']
        for field in required_fields:
            self.assertIn(field, data)
        # Verify field types
        self.assertIsInstance(data['id'], int)
        self.assertIsInstance(data['username'], str)
        self.assertIsInstance(data['email'], str)
Implement comprehensive health checks:
# health_checks.py
from django.http import JsonResponse
from django.db import connection
from django.core.cache import cache
from django.conf import settings
import time
import requests
class HealthChecker:
    """Comprehensive health check implementation.

    Runs a set of named probes (database, cache, external services, disk,
    memory) and aggregates them into one healthy/unhealthy verdict.
    """

    def __init__(self):
        # Map of check name -> zero-argument callable returning a dict
        # with at least a 'healthy' bool.
        self.checks = {
            'database': self.check_database,
            'cache': self.check_cache,
            'external_services': self.check_external_services,
            'disk_space': self.check_disk_space,
            'memory': self.check_memory
        }

    def check_all(self):
        """Run all health checks; any failing probe marks the service unhealthy."""
        results = {}
        overall_healthy = True
        for check_name, check_func in self.checks.items():
            try:
                result = check_func()
                results[check_name] = result
                if not result.get('healthy', False):
                    overall_healthy = False
            except Exception as e:
                # A probe that crashes counts as a failed check, not a crash
                # of the whole endpoint.
                results[check_name] = {
                    'healthy': False,
                    'error': str(e)
                }
                overall_healthy = False
        return {
            'healthy': overall_healthy,
            'service': settings.SERVICE_NAME,
            'version': settings.SERVICE_VERSION,
            'timestamp': time.time(),
            'checks': results
        }

    def check_database(self):
        """Check database connectivity with a trivial round-trip query."""
        try:
            started = time.time()
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()
            return {
                'healthy': True,
                # Fix: report the measured round-trip time instead of a
                # hard-coded 0.
                'response_time_ms': (time.time() - started) * 1000
            }
        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }

    def check_cache(self):
        """Check cache connectivity via a set/get/delete round trip."""
        try:
            test_key = f"health_check_{int(time.time())}"
            test_value = "health_check_value"
            started = time.time()
            # Test set and get
            cache.set(test_key, test_value, timeout=60)
            retrieved_value = cache.get(test_key)
            cache.delete(test_key)
            elapsed_ms = (time.time() - started) * 1000
            if retrieved_value != test_value:
                return {
                    'healthy': False,
                    'error': 'Cache value mismatch'
                }
            return {
                'healthy': True,
                # Fix: report the measured round-trip time instead of a
                # hard-coded 0.
                'response_time_ms': elapsed_ms
            }
        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }

    def check_external_services(self):
        """Probe each configured downstream service's /health/ endpoint."""
        external_services = getattr(settings, 'EXTERNAL_SERVICES', {})
        service_results = {}
        for service_name, service_url in external_services.items():
            try:
                response = requests.get(
                    f"{service_url}/health/",
                    timeout=5
                )
                service_results[service_name] = {
                    'healthy': response.status_code == 200,
                    'status_code': response.status_code,
                    'response_time_ms': response.elapsed.total_seconds() * 1000
                }
            except requests.RequestException as e:
                service_results[service_name] = {
                    'healthy': False,
                    'error': str(e)
                }
        # Vacuously healthy when no external services are configured.
        overall_healthy = all(
            result['healthy'] for result in service_results.values()
        )
        return {
            'healthy': overall_healthy,
            'services': service_results
        }

    def check_disk_space(self):
        """Check available disk space on the root filesystem."""
        try:
            import shutil
            total, used, free = shutil.disk_usage('/')
            free_percentage = (free / total) * 100
            return {
                'healthy': free_percentage > 10,  # Alert if less than 10% free
                'free_percentage': free_percentage,
                'free_bytes': free,
                'total_bytes': total
            }
        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }

    def check_memory(self):
        """Check memory usage (reported unhealthy if psutil is unavailable)."""
        try:
            import psutil
            memory = psutil.virtual_memory()
            return {
                'healthy': memory.percent < 90,  # Alert if more than 90% used
                'used_percentage': memory.percent,
                'available_bytes': memory.available,
                'total_bytes': memory.total
            }
        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }
# Health check views
health_checker = HealthChecker()


def health_check(request):
    """Full health report; answers 503 when any probe fails."""
    report = health_checker.check_all()
    return JsonResponse(report, status=200 if report['healthy'] else 503)
def readiness_check(request):
    """Readiness check for Kubernetes.

    The service is ready to receive traffic once its database and cache
    probes both pass; the remaining probes do not gate readiness.
    """
    report = health_checker.check_all()
    checks = report['checks']
    ready = (
        checks.get('database', {}).get('healthy', False)
        and checks.get('cache', {}).get('healthy', False)
    )
    payload = {
        'ready': ready,
        'service': settings.SERVICE_NAME,
        'timestamp': time.time()
    }
    return JsonResponse(payload, status=200 if ready else 503)
def liveness_check(request):
    """Liveness check for Kubernetes: answers 200 whenever the process runs."""
    payload = {
        'alive': True,
        'service': settings.SERVICE_NAME,
        'timestamp': time.time(),
    }
    return JsonResponse(payload)
Implement graceful shutdown handling:
# graceful_shutdown.py
import signal
import sys
import threading
import time
from django.core.management.base import BaseCommand
from django.conf import settings
class GracefulShutdownHandler:
    """Handle graceful shutdown of the service.

    On SIGTERM/SIGINT: stop accepting work, wait (bounded) for in-flight
    requests, release external resources, then exit.
    """

    def __init__(self):
        self.shutdown_event = threading.Event()
        self.active_requests = 0
        self.request_lock = threading.Lock()
        # Register signal handlers (must run in the main thread).
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Handle shutdown signals"""
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.shutdown_event.set()
        # Wait for active requests to complete
        self.wait_for_requests()
        # Perform cleanup
        self.cleanup()
        print("Graceful shutdown completed")
        sys.exit(0)

    def request_started(self):
        """Called when a request starts"""
        with self.request_lock:
            self.active_requests += 1

    def request_finished(self):
        """Called when a request finishes"""
        with self.request_lock:
            self.active_requests -= 1

    def _pending(self):
        """Read the in-flight request count under the lock."""
        with self.request_lock:
            return self.active_requests

    def wait_for_requests(self, timeout=30):
        """Wait up to ``timeout`` seconds for active requests to drain.

        Fix: read the counter under the lock instead of racing the request
        threads on an unsynchronized attribute.
        """
        start_time = time.time()
        while (pending := self._pending()) > 0:
            if time.time() - start_time > timeout:
                print(f"Timeout waiting for {pending} active requests")
                break
            print(f"Waiting for {pending} active requests to complete...")
            time.sleep(1)

    def cleanup(self):
        """Release database, cache, and Celery resources before exit."""
        # Close database connections
        from django.db import connections
        for conn in connections.all():
            conn.close()
        # Close cache connections
        from django.core.cache import caches
        for cache in caches.all():
            if hasattr(cache, 'close'):
                cache.close()
        # Stop Celery workers gracefully — best effort only.
        # Fix: the original bare `except:` also swallowed SystemExit and
        # KeyboardInterrupt; catch Exception instead.
        try:
            from celery import current_app
            current_app.control.shutdown()
        except Exception:
            pass
# Middleware to track active requests
from django.utils.deprecation import MiddlewareMixin
shutdown_handler = GracefulShutdownHandler()


class GracefulShutdownMiddleware(MiddlewareMixin):
    """Middleware to track active requests for graceful shutdown"""

    def __call__(self, request):
        # Once shutdown has begun, refuse new work immediately.
        if shutdown_handler.shutdown_event.is_set():
            from django.http import HttpResponse
            return HttpResponse("Service shutting down", status=503)
        # Count the request in, and always count it out — even if the
        # downstream view raises.
        shutdown_handler.request_started()
        try:
            return self.get_response(request)
        finally:
            shutdown_handler.request_finished()
Implement proper resource management:
# resource_management.py
from django.conf import settings
import threading
import time
import psutil
import logging
logger = logging.getLogger(__name__)
class ResourceMonitor:
    """Monitor system resources in a background thread.

    Fixes over the original:
    - a threading.Event drives the loop, so stop_monitoring() takes effect
      immediately instead of waiting out a 30-second time.sleep;
    - the thread join is bounded so shutdown cannot hang;
    - an exception in a check no longer skips the pause (the old code's
      sleep was inside the try, so errors caused a busy loop);
    - the logger is resolved per-instance rather than via a module global.
    """

    # Seconds between resource samples.
    CHECK_INTERVAL = 30

    def __init__(self):
        self.monitoring = False
        self.monitor_thread = None
        self._stop_event = threading.Event()
        self._logger = logging.getLogger(__name__)
        self.thresholds = {
            'cpu_percent': 80,
            'memory_percent': 85,
            'disk_percent': 90
        }

    def start_monitoring(self):
        """Start the background monitoring thread (idempotent)."""
        if not self.monitoring:
            self.monitoring = True
            self._stop_event.clear()
            self.monitor_thread = threading.Thread(target=self._monitor_loop)
            self.monitor_thread.daemon = True
            self.monitor_thread.start()
            self._logger.info("Resource monitoring started")

    def stop_monitoring(self):
        """Stop resource monitoring promptly."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            # Bounded join: never hang shutdown on the monitor thread.
            self.monitor_thread.join(timeout=self.CHECK_INTERVAL)
        self._logger.info("Resource monitoring stopped")

    def _monitor_loop(self):
        """Sample resources until asked to stop."""
        while not self._stop_event.is_set():
            try:
                self._check_resources()
            except Exception as e:
                self._logger.error(f"Error in resource monitoring: {e}")
            # Event.wait returns early the moment stop_monitoring() fires.
            self._stop_event.wait(self.CHECK_INTERVAL)

    def _check_resources(self):
        """Check CPU, memory, and disk against the configured thresholds."""
        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.thresholds['cpu_percent']:
            self._logger.warning(f"High CPU usage: {cpu_percent}%")
            self._handle_high_cpu()
        # Memory usage
        memory = psutil.virtual_memory()
        if memory.percent > self.thresholds['memory_percent']:
            self._logger.warning(f"High memory usage: {memory.percent}%")
            self._handle_high_memory()
        # Disk usage
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        if disk_percent > self.thresholds['disk_percent']:
            self._logger.warning(f"High disk usage: {disk_percent}%")
            self._handle_high_disk()

    def _handle_high_cpu(self):
        """Hook for high-CPU mitigation (e.g. request throttling)."""
        pass

    def _handle_high_memory(self):
        """Free memory by dropping the Django cache."""
        from django.core.cache import cache
        cache.clear()
        self._logger.info("Cleared cache due to high memory usage")

    def _handle_high_disk(self):
        """Hook for high-disk mitigation (e.g. log rotation or cleanup)."""
        pass
# Connection pooling
class DatabaseConnectionManager:
    """Manage database connections efficiently"""

    def __init__(self):
        # Both limits are overridable via settings.
        self.max_connections = getattr(settings, 'DB_MAX_CONNECTIONS', 20)
        self.connection_timeout = getattr(settings, 'DB_CONNECTION_TIMEOUT', 300)

    def configure_connection_pooling(self):
        """Build a pooled database settings fragment (normally lives in settings.py)."""
        return {
            'ENGINE': 'django.db.backends.postgresql',
            'OPTIONS': {
                'MAX_CONNS': self.max_connections,
                'CONN_MAX_AGE': self.connection_timeout,
            },
        }
# Memory management
class MemoryManager:
    """Manage memory usage"""
    @staticmethod
    def clear_query_cache():
        """Clear Django query cache"""
        # reset_queries drops the per-connection list of logged queries
        # that accumulates while DEBUG is enabled.
        from django.db import reset_queries
        reset_queries()
    @staticmethod
    def clear_template_cache():
        """Clear template cache"""
        # NOTE(review): django.template.loader.get_template is a plain
        # function in modern Django and has no cache_clear attribute — this
        # call likely raises AttributeError; verify against the Django
        # version in use before relying on it.
        from django.template.loader import get_template
        get_template.cache_clear()
    @staticmethod
    def force_garbage_collection():
        """Force garbage collection"""
        import gc
        collected = gc.collect()
        # `logger` is the module-level logger defined at the top of this file.
        logger.info(f"Garbage collection freed {collected} objects")
        return collected
# Resource limits
class ResourceLimiter:
    """Enforce simple request/response size limits."""

    def __init__(self):
        # Defaults: 10 MB requests, 50 MB responses; both overridable via settings.
        self.max_request_size = getattr(settings, 'MAX_REQUEST_SIZE', 10 * 1024 * 1024)  # 10MB
        self.max_response_size = getattr(settings, 'MAX_RESPONSE_SIZE', 50 * 1024 * 1024)  # 50MB

    def check_request_size(self, request):
        """Return an HTTP 400 response if the declared body is too large, else None.

        Fix: Content-Length is client-supplied and may be non-numeric; the
        original int() call raised ValueError on a malformed header. An
        unparsable header is now treated as "size unknown" (no rejection).
        """
        content_length = request.META.get('CONTENT_LENGTH')
        if content_length:
            try:
                declared = int(content_length)
            except (TypeError, ValueError):
                return None
            if declared > self.max_request_size:
                from django.http import HttpResponseBadRequest
                return HttpResponseBadRequest("Request too large")
        return None

    def check_response_size(self, response):
        """Log (but do not block) responses that exceed the size limit."""
        if hasattr(response, 'content') and len(response.content) > self.max_response_size:
            logger.warning("Response size exceeds limit")
            # Could truncate or compress response
        return response
# Initialize resource monitoring
# NOTE: this starts a background thread as a side effect of importing the
# module; import it exactly once, ideally from a process-startup hook.
resource_monitor = ResourceMonitor()
resource_monitor.start_monitoring()
# metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info
import time
import functools
# Application metrics
# Prometheus collectors are module-level singletons: created once at import
# time and shared by every request handler in the process.
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)
ACTIVE_CONNECTIONS = Gauge(
    'active_connections',
    'Number of active connections'
)
SERVICE_INFO = Info(
    'service_info',
    'Service information'
)
# Business metrics
USER_REGISTRATIONS = Counter(
    'user_registrations_total',
    'Total user registrations'
)
USER_LOGINS = Counter(
    'user_logins_total',
    'Total user logins',
    ['status']
)
ACTIVE_USERS = Gauge(
    'active_users',
    'Number of active users'
)
class MetricsCollector:
    """Collect application metrics"""

    @staticmethod
    def record_request(method, endpoint, status_code, duration):
        """Record HTTP request metrics (count + latency histogram)."""
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status=status_code
        ).inc()
        REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)

    @staticmethod
    def record_user_registration():
        """Record user registration"""
        USER_REGISTRATIONS.inc()

    @staticmethod
    def record_user_login(success=True):
        """Record user login attempt"""
        status = 'success' if success else 'failure'
        USER_LOGINS.labels(status=status).inc()

    @staticmethod
    def update_active_users(count):
        """Update active users count"""
        ACTIVE_USERS.set(count)

    @staticmethod
    def update_service_info():
        """Update service information.

        Fix: this module never imported Django settings, so the original
        raised NameError when called; import it locally here.
        """
        from django.conf import settings
        SERVICE_INFO.info({
            'version': settings.SERVICE_VERSION,
            'environment': settings.SERVICE_ENVIRONMENT,
            'service': settings.SERVICE_NAME
        })
# Metrics middleware
class MetricsMiddleware:
    """Middleware to collect request metrics"""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Time the whole downstream chain, then record one sample.
        started = time.time()
        response = self.get_response(request)
        MetricsCollector.record_request(
            method=request.method,
            endpoint=request.path,
            status_code=response.status_code,
            duration=time.time() - started,
        )
        return response
# Metrics decorator
def track_execution_time(metric_name):
    """Decorator to track function execution time"""
    def decorator(func):
        # One histogram per decorated function, created at decoration time.
        timer = Histogram(
            f'{metric_name}_duration_seconds',
            f'Execution time for {func.__name__}'
        )

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started = time.time()
            try:
                return func(*args, **kwargs)
            finally:
                # Record the duration whether the call succeeded or raised.
                timer.observe(time.time() - started)
        return wrapper
    return decorator
# Usage example
@track_execution_time('user_creation')
def create_user(user_data):
    """Create user with execution time tracking"""
    new_user = User.objects.create_user(**user_data)
    MetricsCollector.record_user_registration()
    return new_user
Following these best practices ensures robust, scalable, and maintainable Django microservices:
Design Principles:
Development Practices:
Deployment Practices:
Key Takeaways:
These practices form the foundation for successful microservices architecture with Django. In the final section, we'll explore transforming monolithic applications into microservices.
Improving Microservices Performance with Caching
Performance optimization in microservices requires a multi-layered approach to caching, from application-level caching to distributed caching strategies. This chapter covers comprehensive caching techniques, performance monitoring, and optimization strategies for Django microservices.
Transforming a Monolithic Web App into Microservices
Migrating from a monolithic Django application to microservices is a complex but rewarding journey. This section provides a comprehensive guide for planning, executing, and managing this transformation while maintaining system stability and business continuity.