Microservices with Django

Best Practices

This section consolidates essential best practices for developing, deploying, and maintaining Django microservices. Following these practices helps ensure a scalable, maintainable, and reliable microservices architecture.

Overview

The practices below are grouped into three areas: design principles, development best practices, and deployment best practices. Each practice is illustrated with a short Django code example.

Design Principles

1. Single Responsibility Principle

Each microservice should have a single, well-defined responsibility:

# Good: User service focused on user management
class UserService:
    """Single-purpose service: every operation here concerns user accounts."""

    def create_user(self, user_data):
        """Register a new account from ``user_data`` (placeholder, returns None)."""

    def authenticate_user(self, credentials):
        """Check the supplied ``credentials`` (placeholder, returns None)."""

    def update_user_profile(self, user_id, profile_data):
        """Apply ``profile_data`` to the profile of ``user_id`` (placeholder, returns None)."""

    def deactivate_user(self, user_id):
        """Mark the account ``user_id`` as inactive (placeholder, returns None)."""

# Bad: Mixed responsibilities
class UserOrderPaymentService:
    """Counter-example: users, orders, and payments crammed into one service."""

    def create_user(self, user_data):
        """User-management concern (placeholder, returns None)."""

    def create_order(self, order_data):
        """Order-management concern (placeholder, returns None)."""

    def process_payment(self, payment_data):
        """Payment concern (placeholder, returns None)."""

2. Database per Service

Each service should own its data:

# User Service Database
# settings.py
# Connection settings for the PostgreSQL database used by the user service;
# only the 'default' alias is declared.
# NOTE(review): the password is an inline literal here for illustration --
# confirm real deployments load it from the environment or a secret store.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'user_service_db',
        'USER': 'user_service_user',
        'PASSWORD': 'user_service_password',
        'HOST': 'user-db.internal',
        'PORT': '5432',
    }
}

# Order Service Database
# Connection settings for the PostgreSQL database used by the order service.
# It points at a different host, database name, and user than the user
# service's settings, so the two services share no storage.
# NOTE(review): the password is an inline literal here for illustration --
# confirm real deployments load it from the environment or a secret store.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'order_service_db',
        'USER': 'order_service_user',
        'PASSWORD': 'order_service_password',
        'HOST': 'order-db.internal',
        'PORT': '5432',
    }
}

# Never access another service's database directly
# Bad: Order service accessing user database
def get_user_orders(user_id):
    """Anti-pattern: read the user row through the 'user_db' alias and return its orders."""
    # Don't do this - the order service is reaching into the user service's database
    foreign_users = User.objects.using('user_db')
    owner = foreign_users.get(id=user_id)
    return owner.orders.all()

# Good: Use API calls between services
def get_user_orders(user_id):
    """Return the orders of ``user_id`` after confirming the user via the user service API.

    Raises:
        ValueError: if the user service returns a falsy result for the user.
    """
    # Ask the owning service instead of touching its database
    lookup = ServiceClient('user-service').get(f'/api/v1/users/{user_id}/')

    if not lookup:
        raise ValueError("User not found")

    return Order.objects.filter(user_id=user_id)

3. API-First Design

Design APIs before implementation:

# api_specification.py
"""
User Service API Specification

Base URL: /api/v1/users/

Endpoints:
- GET    /                    # List users
- POST   /                    # Create user
- GET    /{id}/               # Get user details
- PUT    /{id}/               # Update user
- DELETE /{id}/               # Delete user
- POST   /{id}/activate/      # Activate user
- POST   /{id}/deactivate/    # Deactivate user
- GET    /{id}/profile/       # Get user profile
- PUT    /{id}/profile/       # Update user profile

Request/Response formats defined in OpenAPI specification
"""

from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, OpenApiExample


class UserViewSet(ModelViewSet):
    """User endpoints whose OpenAPI description is declared next to the code."""

    @extend_schema(
        summary="Create new user",
        description="Create a new user account with the provided information",
        request=UserCreateSerializer,
        responses={
            201: UserSerializer,
            # `responses` values must be serializers/types, not examples;
            # the sample 400 payload lives in `examples` below.
            400: OpenApiTypes.OBJECT,
        },
        examples=[
            OpenApiExample(
                'User Creation',
                value={
                    'username': 'johndoe',
                    'email': 'john@example.com',
                    'first_name': 'John',
                    'last_name': 'Doe'
                },
                request_only=True,
            ),
            OpenApiExample(
                'Validation Error',
                value={'email': ['This field must be unique.']},
                response_only=True,
                status_codes=['400'],
            ),
        ]
    )
    def create(self, request, *args, **kwargs):
        """Create a user; delegates to ``ModelViewSet.create``.

        Accepts and forwards ``*args``/``**kwargs`` so the override keeps the
        same call signature as the DRF method it replaces (URL kwargs from
        nested routes would otherwise raise ``TypeError``).
        """
        return super().create(request, *args, **kwargs)

Development Best Practices

1. Configuration Management

Use environment-based configuration:

# settings.py
import os

import dj_database_url  # required by the DATABASES block below
from decouple import config, Csv

# Environment-based settings
# DEBUG defaults to off; SECRET_KEY has no default, so it must be provided.
DEBUG = config('DEBUG', default=False, cast=bool)
SECRET_KEY = config('SECRET_KEY')
ALLOWED_HOSTS = config('ALLOWED_HOSTS', default='', cast=Csv())

# Database configuration
# DATABASE_URL has no default either; it is parsed into Django's settings dict.
DATABASE_URL = config('DATABASE_URL')
DATABASES = {
    'default': dj_database_url.parse(DATABASE_URL)
}

# External service URLs
USER_SERVICE_URL = config('USER_SERVICE_URL', default='http://user-service:8000')
PRODUCT_SERVICE_URL = config('PRODUCT_SERVICE_URL', default='http://product-service:8000')
ORDER_SERVICE_URL = config('ORDER_SERVICE_URL', default='http://order-service:8000')

# Feature flags
ENABLE_CACHING = config('ENABLE_CACHING', default=True, cast=bool)
ENABLE_ASYNC_PROCESSING = config('ENABLE_ASYNC_PROCESSING', default=True, cast=bool)
ENABLE_METRICS = config('ENABLE_METRICS', default=True, cast=bool)

# Service-specific configuration
SERVICE_NAME = config('SERVICE_NAME', default='user-service')
SERVICE_VERSION = config('SERVICE_VERSION', default='1.0.0')
SERVICE_ENVIRONMENT = config('SERVICE_ENVIRONMENT', default='development')

# Logging configuration
# dictConfig-style logging setup: one console handler on the root logger,
# formatted as a JSON-looking line that embeds SERVICE_NAME.
# NOTE(review): the format string is assembled by hand, so %(message)s is
# inserted without JSON escaping -- a message containing a double quote or a
# newline would presumably yield invalid JSON; confirm whether a real JSON
# formatter is needed.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            # SERVICE_NAME is concatenated once, when this settings module is evaluated.
            'format': '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "service": "' + SERVICE_NAME + '", "message": "%(message)s"}',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'json',
        },
    },
    'root': {
        'handlers': ['console'],
        # Root log level comes from the LOG_LEVEL setting, INFO when unset.
        'level': config('LOG_LEVEL', default='INFO'),
    },
}

2. Error Handling

Implement consistent error handling:

# exceptions.py
from rest_framework.views import exception_handler
from rest_framework.response import Response
from rest_framework import status
import logging
import uuid

logger = logging.getLogger(__name__)

class ServiceException(Exception):
    """Root of the service's error hierarchy.

    Attributes:
        message: human-readable description, also passed to ``Exception``.
        error_code: machine-readable code; 'SERVICE_ERROR' when none is given.
        details: extra structured context; an empty dict when none is given.
    """

    def __init__(self, message, error_code=None, details=None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code if error_code else 'SERVICE_ERROR'
        self.details = details if details else {}

class ValidationException(ServiceException):
    """Input failed validation; carries per-field errors under ``details['field_errors']``."""

    def __init__(self, message, field_errors=None):
        payload = {'field_errors': field_errors or {}}
        super().__init__(message, 'VALIDATION_ERROR', payload)

class BusinessLogicException(ServiceException):
    """A business rule was violated; the rule name is stored in ``details['business_rule']``."""

    def __init__(self, message, business_rule=None):
        payload = {'business_rule': business_rule}
        super().__init__(message, 'BUSINESS_LOGIC_ERROR', payload)

class ExternalServiceException(ServiceException):
    """A call to another service failed; records which service and its status code."""

    def __init__(self, message, service_name, status_code=None):
        payload = {'service_name': service_name, 'status_code': status_code}
        super().__init__(message, 'EXTERNAL_SERVICE_ERROR', payload)

def _error_envelope(code, message, details, correlation_id):
    """Build the error body shared by every branch of the exception handler."""
    # Imported here because the surrounding module does not import them.
    from django.conf import settings
    from django.utils import timezone

    return {
        'error': {
            'code': code,
            'message': message,
            'details': details,
            'correlation_id': correlation_id,
            'service': settings.SERVICE_NAME,
            'timestamp': timezone.now().isoformat()
        }
    }


def _status_for_error_code(error_code):
    """Map a ServiceException error code to an HTTP status (400 when unknown)."""
    by_code = {
        'EXTERNAL_SERVICE_ERROR': status.HTTP_502_BAD_GATEWAY,
        'BUSINESS_LOGIC_ERROR': status.HTTP_422_UNPROCESSABLE_ENTITY,
        # Unexpected server-side failures must not be reported as client errors.
        'INTERNAL_ERROR': status.HTTP_500_INTERNAL_SERVER_ERROR,
    }
    return by_code.get(error_code, status.HTTP_400_BAD_REQUEST)


def custom_exception_handler(exc, context):
    """Custom exception handler for consistent error responses.

    Args:
        exc: the exception raised while handling the request.
        context: the DRF handler context, forwarded to the default handler.

    Returns:
        A ``Response`` with an ``{'error': {...}}`` body for ``ServiceException``
        and for anything DRF's default handler recognises; ``None`` when the
        default handler returns ``None`` for the exception.
    """
    # Generate correlation ID for error tracking
    correlation_id = str(uuid.uuid4())

    # Handle custom service exceptions
    if isinstance(exc, ServiceException):
        error_response = _error_envelope(
            exc.error_code, exc.message, exc.details, correlation_id
        )

        logger.error(f"Service error [{correlation_id}]: {exc.message}", extra={
            'error_code': exc.error_code,
            'details': exc.details,
            'correlation_id': correlation_id
        })

        return Response(error_response, status=_status_for_error_code(exc.error_code))

    # Handle Django REST framework exceptions
    response = exception_handler(exc, context)
    if response is None:
        return None

    # Enhance DRF error responses: 400 is reported as a validation failure,
    # every other status as a generic API error.
    is_validation = response.status_code == 400
    custom_response_data = _error_envelope(
        'VALIDATION_ERROR' if is_validation else 'API_ERROR',
        'Request validation failed' if is_validation else 'API error occurred',
        response.data,
        correlation_id,
    )

    logger.warning(f"API error [{correlation_id}]: {response.status_code}", extra={
        'status_code': response.status_code,
        'details': response.data,
        'correlation_id': correlation_id
    })

    response.data = custom_response_data
    return response

# Usage in views
class UserViewSet(ModelViewSet):
    """User endpoints that report failures through the service exception types."""

    def create(self, request, *args, **kwargs):
        """Create user with proper error handling.

        Raises:
            BusinessLogicException: a user with the submitted email already exists.
            ExternalServiceException: the external validation reports ``valid`` falsy.
            ServiceException: any other unexpected failure, with code 'INTERNAL_ERROR'.
            APIException: DRF errors (validation, auth, ...) propagate unchanged.
        """
        # Local import: the surrounding module does not import it.
        from rest_framework.exceptions import APIException

        try:
            # Validate business rules
            if User.objects.filter(email=request.data.get('email')).exists():
                raise BusinessLogicException(
                    "User with this email already exists",
                    business_rule="unique_email"
                )

            # Validate with external service
            validation_result = self.validate_with_external_service(request.data)
            if not validation_result['valid']:
                raise ExternalServiceException(
                    "External validation failed",
                    service_name="validation-service",
                    status_code=validation_result.get('status_code')
                )

            # Forward *args/**kwargs so the override matches the DRF signature.
            return super().create(request, *args, **kwargs)

        except (ServiceException, APIException):
            # Already well-formed errors: service exceptions and DRF's own
            # exceptions must not be rewritten into INTERNAL_ERROR.
            raise
        except Exception as exc:
            # Log unexpected errors
            logger.exception("Unexpected error in user creation")
            raise ServiceException(
                "An unexpected error occurred",
                error_code="INTERNAL_ERROR"
            ) from exc

3. Logging and Observability

Implement structured logging:

# logging_utils.py
import logging
import json
import time
from django.utils.deprecation import MiddlewareMixin
from django.conf import settings

class StructuredLogger:
    """Structured logging utility.

    Wraps a standard logger and emits each record as one JSON object holding
    the service identity from settings, the message, and any keyword context.
    """

    def __init__(self, name):
        self.logger = logging.getLogger(name)

    def log(self, level, message, **kwargs):
        """Log structured message.

        Args:
            level: a ``logging`` level constant.
            message: the human-readable message.
            **kwargs: extra fields merged into the JSON payload; a key that
                matches a built-in field overrides it.
        """
        # Skip building and serialising the payload when nothing would be emitted.
        if not self.logger.isEnabledFor(level):
            return

        log_data = {
            'timestamp': time.time(),
            'service': settings.SERVICE_NAME,
            'version': settings.SERVICE_VERSION,
            'environment': settings.SERVICE_ENVIRONMENT,
            'message': message,
            **kwargs
        }

        # default=str is deliberate: a log call must never raise because a
        # context value (UUID, datetime, exception, ...) is not JSON-native.
        self.logger.log(level, json.dumps(log_data, default=str))

    def info(self, message, **kwargs):
        """Log ``message`` at INFO level."""
        self.log(logging.INFO, message, **kwargs)

    def warning(self, message, **kwargs):
        """Log ``message`` at WARNING level."""
        self.log(logging.WARNING, message, **kwargs)

    def error(self, message, **kwargs):
        """Log ``message`` at ERROR level."""
        self.log(logging.ERROR, message, **kwargs)

    def debug(self, message, **kwargs):
        """Log ``message`` at DEBUG level."""
        self.log(logging.DEBUG, message, **kwargs)

class RequestLoggingMiddleware(MiddlewareMixin):
    """Log all requests with structured data"""

    # __call__ below is synchronous; declaring that lets Django adapt the
    # middleware under ASGI instead of handing it a coroutine get_response.
    async_capable = False

    def __init__(self, get_response):
        # Let MiddlewareMixin store get_response and run its own setup.
        super().__init__(get_response)
        self.logger = StructuredLogger('requests')

    def __call__(self, request):
        """Log the request, call the next handler, then log status and duration."""
        start_time = time.time()
        correlation_id = getattr(request, 'correlation_id', None)

        # Log request
        self.logger.info("Request started",
            method=request.method,
            path=request.path,
            user_agent=request.META.get('HTTP_USER_AGENT'),
            ip_address=self.get_client_ip(request),
            correlation_id=correlation_id
        )

        response = self.get_response(request)

        # Log response
        duration = time.time() - start_time
        self.logger.info("Request completed",
            method=request.method,
            path=request.path,
            status_code=response.status_code,
            duration_ms=duration * 1000,
            # Re-read: a later middleware or the view may have set it meanwhile.
            correlation_id=getattr(request, 'correlation_id', None)
        )

        return response

    def get_client_ip(self, request):
        """Get client IP address.

        Returns the first entry of X-Forwarded-For when the header is present,
        otherwise REMOTE_ADDR.
        NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
        overwrites it -- confirm before using this value for anything but logs.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries are comma-separated and may carry surrounding spaces.
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

# Business logic logging
class UserService:
    """User operations that emit a structured log entry at each step."""

    def __init__(self):
        self.logger = StructuredLogger('user_service')

    def create_user(self, user_data):
        """Create a user from ``user_data``, logging the attempt and its outcome.

        Any exception from the creation is logged and then re-raised unchanged.
        """
        username = user_data.get('username')
        email = user_data.get('email')

        self.logger.info("Creating user", username=username, email=email)

        try:
            created = User.objects.create_user(**user_data)
            self.logger.info(
                "User created successfully",
                user_id=created.id,
                username=created.username,
            )
            return created
        except Exception as exc:
            self.logger.error(
                "User creation failed",
                error=str(exc),
                username=user_data.get('username'),
                email=user_data.get('email'),
            )
            raise

4. Testing Strategy

Implement comprehensive testing:

# test_strategy.py
import pytest
from django.test import TestCase, TransactionTestCase
from rest_framework.test import APITestCase
from unittest.mock import patch, Mock
import responses

class BaseTestCase(TestCase):
    """Shared fixture and assertion helpers for the service's tests."""

    def setUp(self):
        # Every test starts with one default user available as self.user.
        self.user = self.create_test_user()

    def create_test_user(self, **kwargs):
        """Create a user from built-in defaults; keyword arguments override them."""
        fields = {
            'username': 'testuser',
            'email': 'test@example.com',
            'first_name': 'Test',
            'last_name': 'User',
            **kwargs,
        }
        return User.objects.create_user(**fields)

    def assert_error_response(self, response, error_code, status_code):
        """Check the HTTP status and the ``error.code`` field of an error response."""
        self.assertEqual(response.status_code, status_code)
        payload = response.data
        self.assertIn('error', payload)
        self.assertEqual(payload['error']['code'], error_code)

class UserServiceTest(BaseTestCase):
    """Unit-level checks of UserService.create_user."""

    def test_create_user_success(self):
        """A fresh username/email pair yields a persisted user with those values."""
        payload = dict(
            username='newuser',
            email='new@example.com',
            first_name='New',
            last_name='User',
        )

        created = UserService().create_user(payload)

        self.assertEqual(created.username, 'newuser')
        self.assertEqual(created.email, 'new@example.com')
        self.assertTrue(User.objects.filter(username='newuser').exists())

    def test_create_user_duplicate_email(self):
        """Re-using the fixture user's email is expected to raise a business-rule error."""
        payload = dict(
            username='newuser',
            email='test@example.com',  # same email as the user created in setUp
            first_name='New',
            last_name='User',
        )
        service = UserService()

        with self.assertRaises(BusinessLogicException) as raised:
            service.create_user(payload)

        self.assertEqual(raised.exception.error_code, 'BUSINESS_LOGIC_ERROR')

class UserAPITest(APITestCase):
    """Integration tests for user API"""

    def setUp(self):
        super().setUp()
        # APITestCase has no create_test_user() helper, so the authenticated
        # fixture user is created directly here.
        self.user = User.objects.create_user(
            username='testuser',
            email='test@example.com',
            first_name='Test',
            last_name='User',
        )
        self.client.force_authenticate(user=self.user)

    def test_list_users(self):
        """Test user list endpoint"""
        response = self.client.get('/api/v1/users/')

        self.assertEqual(response.status_code, 200)
        self.assertIn('results', response.data)
        self.assertIn('pagination', response.data)

    @responses.activate
    def test_create_user_with_external_validation(self):
        """Test user creation with external service validation"""
        # Mock external service response
        responses.add(
            responses.POST,
            'http://validation-service:8000/api/v1/validate/',
            json={'valid': True, 'score': 0.95},
            status=200
        )

        user_data = {
            'username': 'validuser',
            'email': 'valid@example.com',
            'first_name': 'Valid',
            'last_name': 'User'
        }

        response = self.client.post('/api/v1/users/', user_data)

        self.assertEqual(response.status_code, 201)
        self.assertEqual(response.data['username'], 'validuser')

class UserIntegrationTest(TransactionTestCase):
    """Integration checks that run against real database transactions."""

    def test_user_creation_with_profile(self):
        """Creating a user through the service should leave it with a profile."""
        payload = dict(username='profileuser', email='profile@example.com')

        created = UserService().create_user(payload)

        # The profile is expected to exist without any explicit creation step
        self.assertTrue(hasattr(created, 'profile'))
        self.assertIsNotNone(created.profile)

# Performance tests
class UserPerformanceTest(TestCase):
    """Performance tests for user operations"""

    def test_user_list_performance(self):
        """Test user list performance with large dataset"""
        # Create 1000 test users
        users = [
            User(username=f'user{i}', email=f'user{i}@example.com')
            for i in range(1000)
        ]
        User.objects.bulk_create(users)

        # Test query performance.
        # perf_counter() is monotonic and high-resolution; time.time() can jump
        # with clock adjustments and make a duration assertion meaningless.
        import time
        start_time = time.perf_counter()

        # Explicit ordering keeps the sliced page deterministic.
        user_list = list(User.objects.order_by('pk')[:20])

        query_time = time.perf_counter() - start_time

        # Should complete within 100ms
        self.assertLess(query_time, 0.1)
        self.assertEqual(len(user_list), 20)

# Contract tests
class UserContractTest(BaseTestCase):
    """Contract tests for API compatibility"""

    def test_user_response_schema(self):
        """Test user response matches expected schema"""
        # BaseTestCase.setUp already created the default user; plain TestCase
        # has no create_test_user(), and creating a second default user would
        # reuse the same username.
        user = self.user

        from .serializers import UserSerializer
        serializer = UserSerializer(user)
        data = serializer.data

        # Verify required fields
        required_fields = ['id', 'username', 'email', 'first_name', 'last_name']
        for field in required_fields:
            self.assertIn(field, data)

        # Verify field types
        self.assertIsInstance(data['id'], int)
        self.assertIsInstance(data['username'], str)
        self.assertIsInstance(data['email'], str)

Deployment Best Practices

1. Health Checks

Implement comprehensive health checks:

# health_checks.py
from django.http import JsonResponse
from django.db import connection
from django.core.cache import cache
from django.conf import settings
import time
import requests

class HealthChecker:
    """Comprehensive health check implementation.

    Each check returns a dict with at least a boolean ``healthy`` key; failed
    checks add an ``error`` string instead of raising.
    """

    def __init__(self):
        # Name -> bound method; check_all() runs them in this order.
        self.checks = {
            'database': self.check_database,
            'cache': self.check_cache,
            'external_services': self.check_external_services,
            'disk_space': self.check_disk_space,
            'memory': self.check_memory
        }

    def check_all(self):
        """Run all health checks.

        Returns:
            dict with the overall ``healthy`` flag, service name and version
            from settings, a ``timestamp``, and the per-check ``checks`` results.
        """
        results = {}
        overall_healthy = True

        for check_name, check_func in self.checks.items():
            try:
                result = check_func()
            except Exception as e:
                # A check that raises counts as unhealthy rather than failing the report.
                result = {
                    'healthy': False,
                    'error': str(e)
                }

            results[check_name] = result
            if not result.get('healthy', False):
                overall_healthy = False

        return {
            'healthy': overall_healthy,
            'service': settings.SERVICE_NAME,
            'version': settings.SERVICE_VERSION,
            'timestamp': time.time(),
            'checks': results
        }

    def check_database(self):
        """Check database connectivity by running ``SELECT 1`` and timing it."""
        try:
            started = time.perf_counter()
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()
            elapsed_ms = (time.perf_counter() - started) * 1000

            return {
                'healthy': True,
                'response_time_ms': elapsed_ms
            }

        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }

    def check_cache(self):
        """Check cache connectivity with a timed set/get/delete round trip."""
        try:
            import uuid

            # A random key keeps concurrent probes (other workers, same second)
            # from deleting each other's value and reporting a false mismatch.
            test_key = f"health_check_{uuid.uuid4().hex}"
            test_value = "health_check_value"

            # Test set and get
            started = time.perf_counter()
            cache.set(test_key, test_value, timeout=60)
            retrieved_value = cache.get(test_key)
            cache.delete(test_key)
            elapsed_ms = (time.perf_counter() - started) * 1000

            if retrieved_value != test_value:
                return {
                    'healthy': False,
                    'error': 'Cache value mismatch'
                }

            return {
                'healthy': True,
                'response_time_ms': elapsed_ms
            }

        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }

    def check_external_services(self):
        """Check external service dependencies.

        Reads the ``EXTERNAL_SERVICES`` setting (name -> base URL, empty when
        unset) and GETs ``<url>/health/`` for each with a 5 second timeout.
        Healthy only when every service answers 200; with no services
        configured the check is healthy.
        """
        external_services = getattr(settings, 'EXTERNAL_SERVICES', {})
        service_results = {}

        for service_name, service_url in external_services.items():
            try:
                response = requests.get(
                    f"{service_url}/health/",
                    timeout=5
                )

                service_results[service_name] = {
                    'healthy': response.status_code == 200,
                    'status_code': response.status_code,
                    'response_time_ms': response.elapsed.total_seconds() * 1000
                }

            except requests.RequestException as e:
                service_results[service_name] = {
                    'healthy': False,
                    'error': str(e)
                }

        overall_healthy = all(
            result['healthy'] for result in service_results.values()
        )

        return {
            'healthy': overall_healthy,
            'services': service_results
        }

    def check_disk_space(self):
        """Check available disk space on '/'; unhealthy at 10% free or less."""
        try:
            import shutil

            usage = shutil.disk_usage('/')
            free_percentage = (usage.free / usage.total) * 100

            return {
                'healthy': free_percentage > 10,  # Alert if less than 10% free
                'free_percentage': free_percentage,
                'free_bytes': usage.free,
                'total_bytes': usage.total
            }

        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }

    def check_memory(self):
        """Check memory usage via psutil; unhealthy at 90% used or more.

        A missing psutil is reported as an unhealthy result, not raised.
        """
        try:
            import psutil

            memory = psutil.virtual_memory()

            return {
                'healthy': memory.percent < 90,  # Alert if more than 90% used
                'used_percentage': memory.percent,
                'available_bytes': memory.available,
                'total_bytes': memory.total
            }

        except Exception as e:
            return {
                'healthy': False,
                'error': str(e)
            }

# Health check views
health_checker = HealthChecker()


def health_check(request):
    """Full health report: HTTP 200 when every check passes, otherwise 503."""
    report = health_checker.check_all()
    if report['healthy']:
        return JsonResponse(report, status=200)
    return JsonResponse(report, status=503)

def readiness_check(request):
    """Readiness check for Kubernetes.

    The service is ready when the database and the cache are both healthy.
    Only those two checks are run: readiness never depended on the external
    service, disk, or memory checks, so probing them here only added latency
    (up to the HTTP timeout per external service) to every probe.

    Returns:
        JsonResponse with ``ready``, ``service`` and ``timestamp``; status 200
        when ready, 503 otherwise.
    """
    database_ok = bool(health_checker.check_database().get('healthy', False))
    cache_ok = bool(health_checker.check_cache().get('healthy', False))
    ready = database_ok and cache_ok

    status_code = 200 if ready else 503

    return JsonResponse({
        'ready': ready,
        'service': settings.SERVICE_NAME,
        'timestamp': time.time()
    }, status=status_code)

def liveness_check(request):
    """Liveness probe: answers unconditionally to show the process responds."""
    payload = {
        'alive': True,
        'service': settings.SERVICE_NAME,
        'timestamp': time.time()
    }
    return JsonResponse(payload)

2. Graceful Shutdown

Implement graceful shutdown handling:

# graceful_shutdown.py
import signal
import sys
import threading
import time
from django.core.management.base import BaseCommand
from django.conf import settings

class GracefulShutdownHandler:
    """Handle graceful shutdown of the service.

    Tracks in-flight requests and, on SIGTERM/SIGINT, waits for them to
    finish, releases connections, and exits.
    """

    def __init__(self):
        self.shutdown_event = threading.Event()
        self.active_requests = 0
        self.request_lock = threading.Lock()

        # Register signal handlers.
        # signal.signal() raises ValueError outside the main thread (e.g. when
        # this module is imported from a worker or autoreloader thread); in
        # that case request tracking still works, only the hooks are skipped.
        for signum in (signal.SIGTERM, signal.SIGINT):
            try:
                signal.signal(signum, self.signal_handler)
            except ValueError:
                print("Not in main thread, shutdown signal handlers not registered")
                break

    def signal_handler(self, signum, frame):
        """Handle shutdown signals: flag shutdown, drain requests, clean up, exit(0)."""
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.shutdown_event.set()

        # Wait for active requests to complete
        self.wait_for_requests()

        # Perform cleanup
        self.cleanup()

        print("Graceful shutdown completed")
        sys.exit(0)

    def request_started(self):
        """Called when a request starts"""
        with self.request_lock:
            self.active_requests += 1

    def request_finished(self):
        """Called when a request finishes"""
        with self.request_lock:
            self.active_requests -= 1

    def wait_for_requests(self, timeout=30):
        """Wait for active requests to complete.

        Polls once per second and gives up after ``timeout`` seconds.
        """
        # Monotonic clock: the timeout must not be affected by wall-clock changes.
        deadline = time.monotonic() + timeout

        while self.active_requests > 0:
            if time.monotonic() > deadline:
                print(f"Timeout waiting for {self.active_requests} active requests")
                break

            print(f"Waiting for {self.active_requests} active requests to complete...")
            time.sleep(1)

    def cleanup(self):
        """Perform cleanup tasks: close DB and cache connections, stop Celery workers."""
        # Close database connections
        from django.db import connections
        for conn in connections.all():
            conn.close()

        # Close cache connections
        from django.core.cache import caches
        for cache in caches.all():
            if hasattr(cache, 'close'):
                cache.close()

        # Stop Celery workers gracefully (best effort)
        try:
            from celery import current_app
        except ImportError:
            # Celery is optional; nothing to stop.
            return

        try:
            # NOTE(review): control.shutdown() with no destination presumably
            # broadcasts to every worker on the broker, not just this
            # service's -- confirm that is intended.
            current_app.control.shutdown()
        except Exception as exc:
            # Still best effort, but no longer silent, and KeyboardInterrupt /
            # SystemExit are no longer swallowed as the bare except did.
            print(f"Celery shutdown skipped: {exc}")

# Middleware to track active requests
from django.utils.deprecation import MiddlewareMixin

shutdown_handler = GracefulShutdownHandler()

class GracefulShutdownMiddleware(MiddlewareMixin):
    """Middleware to track active requests for graceful shutdown"""

    # __call__ is synchronous. Without this flag, an ASGI deployment would
    # hand the middleware a coroutine get_response: the in-flight counter
    # would be decremented before the request ran, and the plain 503
    # response below could not be awaited.
    async_capable = False

    def __call__(self, request):
        """Reject requests during shutdown; otherwise count the request while it runs."""
        if shutdown_handler.shutdown_event.is_set():
            # Service is shutting down, reject new requests
            from django.http import HttpResponse
            return HttpResponse("Service shutting down", status=503)

        shutdown_handler.request_started()

        try:
            return self.get_response(request)
        finally:
            # Runs on success and on exception, so the counter cannot leak.
            shutdown_handler.request_finished()

3. Resource Management

Implement proper resource management:

# resource_management.py
from django.conf import settings
import threading
import time
import psutil
import logging

logger = logging.getLogger(__name__)

class ResourceMonitor:
    """Monitor and manage system resources.

    Runs a daemon thread that periodically compares CPU, memory, and disk
    usage against ``thresholds`` and calls the matching handler when one is
    exceeded.
    """

    def __init__(self, check_interval=30):
        """
        Args:
            check_interval: seconds to wait between resource checks.
        """
        self.monitoring = False
        self.monitor_thread = None
        self.check_interval = check_interval
        # Set by stop_monitoring() to wake the loop immediately.
        self._stop_event = threading.Event()
        self.thresholds = {
            'cpu_percent': 80,
            'memory_percent': 85,
            'disk_percent': 90
        }

    def start_monitoring(self):
        """Start resource monitoring (no-op when already running)."""
        if not self.monitoring:
            self.monitoring = True
            self._stop_event.clear()
            self.monitor_thread = threading.Thread(target=self._monitor_loop)
            self.monitor_thread.daemon = True
            self.monitor_thread.start()
            logger.info("Resource monitoring started")

    def stop_monitoring(self):
        """Stop resource monitoring and wait for the monitor thread to exit."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join()
        logger.info("Resource monitoring stopped")

    def _monitor_loop(self):
        """Main monitoring loop"""
        while self.monitoring:
            try:
                self._check_resources()
            except Exception as e:
                logger.error(f"Error in resource monitoring: {e}")

            # Event.wait() is an interruptible sleep. It runs after failures
            # too, so a persistent error cannot turn the loop into a busy
            # spin, and stop_monitoring() returns without waiting out the
            # full interval.
            if self._stop_event.wait(self.check_interval):
                break

    def _check_resources(self):
        """Check system resources against the configured thresholds."""
        # CPU usage (sampled over one second)
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.thresholds['cpu_percent']:
            logger.warning(f"High CPU usage: {cpu_percent}%")
            self._handle_high_cpu()

        # Memory usage
        memory = psutil.virtual_memory()
        if memory.percent > self.thresholds['memory_percent']:
            logger.warning(f"High memory usage: {memory.percent}%")
            self._handle_high_memory()

        # Disk usage
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        if disk_percent > self.thresholds['disk_percent']:
            logger.warning(f"High disk usage: {disk_percent}%")
            self._handle_high_disk()

    def _handle_high_cpu(self):
        """Handle high CPU usage"""
        # Could implement CPU throttling or request limiting
        pass

    def _handle_high_memory(self):
        """Handle high memory usage"""
        # Clear caches to free memory
        # NOTE(review): this clears the default Django cache; if that cache is
        # a shared external store it would presumably not free this process's
        # memory -- confirm the backend before relying on this.
        from django.core.cache import cache
        cache.clear()
        logger.info("Cleared cache due to high memory usage")

    def _handle_high_disk(self):
        """Handle high disk usage"""
        # Could implement log rotation or cleanup
        pass

# Connection pooling
class DatabaseConnectionManager:
    """Build database settings that enable connection reuse.

    Two optional Django settings are read when an instance is created:

    * ``DB_MAX_CONNECTIONS`` (default 20), emitted as ``MAX_CONNS``.
    * ``DB_CONNECTION_TIMEOUT`` (default 300), emitted as ``CONN_MAX_AGE``.
    """

    def __init__(self):
        self.max_connections = getattr(settings, 'DB_MAX_CONNECTIONS', 20)
        self.connection_timeout = getattr(settings, 'DB_CONNECTION_TIMEOUT', 300)

    def configure_connection_pooling(self):
        """Return a database definition fragment with connection reuse set.

        Returns:
            dict: ``ENGINE``, ``CONN_MAX_AGE`` and ``OPTIONS`` entries meant
            to be merged into a ``DATABASES`` alias.
        """
        # This would typically be done in settings.py
        return {
            'ENGINE': 'django.db.backends.postgresql',
            # CONN_MAX_AGE belongs to the database definition itself. Django
            # forwards everything under OPTIONS to the database driver when
            # connecting, where this key is not a valid connection parameter.
            'CONN_MAX_AGE': self.connection_timeout,
            'OPTIONS': {
                # NOTE(review): MAX_CONNS does not appear to be an option of
                # the stock PostgreSQL backend; presumably it targets a
                # pooling backend -- confirm before using this with
                # 'django.db.backends.postgresql'.
                'MAX_CONNS': self.max_connections,
            },
        }

# Memory management
class MemoryManager:
    """Helpers that release memory held by Django and the interpreter."""

    @staticmethod
    def clear_query_cache():
        """Reset the queries Django has recorded via ``reset_queries()``."""
        from django.db import reset_queries
        reset_queries()

    @staticmethod
    def clear_template_cache():
        """Drop templates cached by the configured template loaders.

        ``django.template.loader.get_template`` is a plain function and has
        no ``cache_clear`` attribute, so the cache is cleared by resetting
        the loaders of every configured template engine instead.
        """
        from django.template import engines

        for backend in engines.all():
            # Backends without a Django ``engine`` attribute are skipped.
            engine = getattr(backend, 'engine', None)
            if engine is None:
                continue
            for loader in engine.template_loaders:
                loader.reset()

    @staticmethod
    def force_garbage_collection():
        """Run a full garbage collection pass.

        Returns:
            int: The value returned by ``gc.collect()``.
        """
        import gc
        collected = gc.collect()
        logger.info(f"Garbage collection freed {collected} objects")
        return collected

# Resource limits
class ResourceLimiter:
    """Enforce upper bounds on request and response body sizes.

    The limits are read from the optional ``MAX_REQUEST_SIZE`` (default
    10 MB) and ``MAX_RESPONSE_SIZE`` (default 50 MB) Django settings.
    """

    def __init__(self):
        self.max_request_size = getattr(settings, 'MAX_REQUEST_SIZE', 10 * 1024 * 1024)  # 10MB
        self.max_response_size = getattr(settings, 'MAX_RESPONSE_SIZE', 50 * 1024 * 1024)  # 50MB

    def check_request_size(self, request):
        """Reject a request whose declared body size exceeds the limit.

        Args:
            request: Object exposing a ``META`` mapping that may contain a
                ``CONTENT_LENGTH`` entry.

        Returns:
            An ``HttpResponseBadRequest`` when the declared length is
            greater than ``max_request_size``, otherwise ``None``. A
            missing, empty or non-numeric ``CONTENT_LENGTH`` is not
            rejected.
        """
        raw_length = request.META.get('CONTENT_LENGTH')
        if not raw_length:
            return None

        try:
            declared = int(raw_length)
        except (TypeError, ValueError):
            # The header is client-controlled; a malformed value must not
            # escape as an unhandled exception (HTTP 500).
            return None

        if declared > self.max_request_size:
            from django.http import HttpResponseBadRequest
            return HttpResponseBadRequest("Request too large")
        return None

    def check_response_size(self, response):
        """Warn when a response body exceeds the limit.

        Args:
            response: Response object; only inspected when it has a
                ``content`` attribute.

        Returns:
            The same ``response`` object, unmodified.
        """
        if hasattr(response, 'content') and len(response.content) > self.max_response_size:
            logger.warning("Response size exceeds limit")
            # Could truncate or compress response
        return response

# Initialize resource monitoring
# NOTE: these statements run at module import time, so importing this module
# creates a ResourceMonitor and calls start_monitoring() as a side effect.
resource_monitor = ResourceMonitor()
resource_monitor.start_monitoring()

Monitoring and Observability

1. Metrics Collection

# metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info
import time
import functools

# Application metrics

# Requests served, labelled by method, endpoint and status.
REQUEST_COUNT = Counter(
    name='http_requests_total',
    documentation='Total HTTP requests',
    labelnames=['method', 'endpoint', 'status'],
)

# Request latency in seconds, labelled by method and endpoint.
REQUEST_DURATION = Histogram(
    name='http_request_duration_seconds',
    documentation='HTTP request duration',
    labelnames=['method', 'endpoint'],
)

ACTIVE_CONNECTIONS = Gauge(
    name='active_connections',
    documentation='Number of active connections',
)

SERVICE_INFO = Info(
    name='service_info',
    documentation='Service information',
)

# Business metrics

USER_REGISTRATIONS = Counter(
    name='user_registrations_total',
    documentation='Total user registrations',
)

# Login attempts, labelled by status.
USER_LOGINS = Counter(
    name='user_logins_total',
    documentation='Total user logins',
    labelnames=['status'],
)

ACTIVE_USERS = Gauge(
    name='active_users',
    documentation='Number of active users',
)

class MetricsCollector:
    """Static helpers that update the module-level metrics."""

    @staticmethod
    def record_request(method, endpoint, status_code, duration):
        """Count one HTTP request and observe its duration.

        Args:
            method: HTTP method, used as the ``method`` label.
            endpoint: Value for the ``endpoint`` label.
            status_code: Response status, used as the ``status`` label.
            duration: Value observed on ``REQUEST_DURATION``.
        """
        REQUEST_COUNT.labels(
            method=method,
            endpoint=endpoint,
            status=status_code
        ).inc()

        REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)

    @staticmethod
    def record_user_registration():
        """Increment the user registration counter."""
        USER_REGISTRATIONS.inc()

    @staticmethod
    def record_user_login(success=True):
        """Count a login attempt under the ``success`` or ``failure`` status.

        Args:
            success: Whether the login attempt succeeded.
        """
        status = 'success' if success else 'failure'
        USER_LOGINS.labels(status=status).inc()

    @staticmethod
    def update_active_users(count):
        """Set the active users gauge to ``count``."""
        ACTIVE_USERS.set(count)

    @staticmethod
    def update_service_info():
        """Publish version, environment and service name on ``SERVICE_INFO``.

        Reads ``SERVICE_VERSION``, ``SERVICE_ENVIRONMENT`` and
        ``SERVICE_NAME`` from the Django settings.
        """
        # Imported here because this module's import block does not bring
        # ``settings`` into scope.
        from django.conf import settings

        SERVICE_INFO.info({
            'version': settings.SERVICE_VERSION,
            'environment': settings.SERVICE_ENVIRONMENT,
            'service': settings.SERVICE_NAME
        })

# Metrics middleware
class MetricsMiddleware:
    """Middleware that records a count and a duration for each request.

    Every request is reported to ``MetricsCollector.record_request`` with
    its HTTP method, path, response status code and elapsed seconds.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed or go negative when the system clock is adjusted while
        # the request is being processed.
        started = time.perf_counter()

        response = self.get_response(request)

        elapsed = time.perf_counter() - started

        # NOTE(review): request.path is used verbatim as a label value, so
        # paths that embed IDs produce one time series per distinct URL --
        # consider whether a bounded label is needed here.
        MetricsCollector.record_request(
            method=request.method,
            endpoint=request.path,
            status_code=response.status_code,
            duration=elapsed
        )

        return response

# Metrics decorator
def track_execution_time(metric_name):
    """Build a decorator that records how long each call takes.

    Args:
        metric_name: Prefix of the histogram name. The histogram is created
            as ``<metric_name>_duration_seconds`` when the decorator is
            applied to a function.

    Returns:
        A decorator whose wrapper observes the elapsed seconds of every
        call, including calls that raise.
    """
    def decorator(func):
        execution_time = Histogram(
            f'{metric_name}_duration_seconds',
            f'Execution time for {func.__name__}'
        )

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Monotonic clock: unaffected by system clock adjustments.
            start_time = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on both return and exception, so failures are timed.
                execution_time.observe(time.perf_counter() - start_time)

        return wrapper
    return decorator

# Usage example
@track_execution_time('user_creation')
def create_user(user_data):
    """Create a user from ``user_data`` and count the registration.

    The decorator times every call under the ``user_creation`` metric name.
    """
    # NOTE(review): ``User`` is not imported in this snippet; presumably it
    # is the service's user model -- confirm the import.
    new_user = User.objects.create_user(**user_data)
    MetricsCollector.record_user_registration()
    return new_user

Summary

Following these best practices ensures robust, scalable, and maintainable Django microservices:

Design Principles:

  • Single responsibility per service
  • Database per service
  • API-first design approach

Development Practices:

  • Environment-based configuration
  • Consistent error handling
  • Structured logging
  • Comprehensive testing

Deployment Practices:

  • Health checks and monitoring
  • Graceful shutdown handling
  • Resource management
  • Metrics collection

Key Takeaways:

  • Plan service boundaries carefully
  • Implement proper error handling and logging
  • Use comprehensive testing strategies
  • Monitor service health and performance
  • Follow security best practices
  • Maintain clear documentation

These practices form the foundation for a successful microservices architecture with Django. In the final section, we'll explore transforming monolithic applications into microservices.