Microservices with Django

Introducing the Django Microservices Architecture

Django is best known for building monolithic web applications, but it adapts well to a microservices architecture. In this section, we'll explore the Django components (and closely related packages such as Django REST Framework) that support microservices development, along with the external tools that complement Django in a distributed environment.

Technical Requirements

Before diving into Django microservices, ensure you have:

Software Requirements

  • Python 3.8+
  • Django 4.2+
  • Docker and Docker Compose
  • PostgreSQL or MongoDB
  • Redis for caching and message brokering
  • Git for version control

Development Tools

  • IDE with Python support (VS Code, PyCharm)
  • API testing tools (Postman, HTTPie)
  • Container orchestration knowledge (Kubernetes basics)
  • Message queue understanding (RabbitMQ, Apache Kafka)

Infrastructure Knowledge

  • RESTful API design principles
  • Database design and optimization
  • Caching strategies
  • Security best practices
  • Monitoring and logging

Django's Native Components for Microservices

1. Django REST Framework (DRF)

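DRF is a separate package (djangorestframework) rather than part of Django itself, but it is the usual way to expose a service's API. The settings below enable token authentication, pagination, JSON-only rendering, and rate limiting: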

# settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'rest_framework.authtoken',
    'corsheaders',
]

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.TokenAuthentication',
        'rest_framework.authentication.SessionAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 20,
    'DEFAULT_RENDERER_CLASSES': [
        'rest_framework.renderers.JSONRenderer',
    ],
    'DEFAULT_THROTTLE_CLASSES': [
        'rest_framework.throttling.AnonRateThrottle',
        'rest_framework.throttling.UserRateThrottle'
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '100/hour',
        'user': '1000/hour'
    }
}
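
The throttle rates above are written as "number/period" strings. To make it concrete what a value such as '100/hour' allows, the stand-alone sketch below converts a rate string into a request count and a window in seconds. Note that parse_rate here is a hypothetical helper written for this example; it only approximates how DRF reads the first letter of the period and is not DRF's own code.

```python
# Hypothetical helper: approximates how a DRF-style rate string is interpreted.
PERIOD_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_rate(rate):
    """Return (max_requests, window_seconds) for a string like '100/hour'."""
    count, period = rate.split("/")
    # Only the first letter of the period matters: 'hour' and 'h' are equivalent
    return int(count), PERIOD_SECONDS[period[0].lower()]


def seconds_between_requests(rate):
    """Average spacing a client must keep to stay under the limit."""
    max_requests, window = parse_rate(rate)
    return window / max_requests


anon_limit = parse_rate("100/hour")
user_limit = parse_rate("1000/hour")
```

With the rates configured above, an anonymous client averages one request every 36 seconds before being throttled, while an authenticated user gets ten times that budget.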

2. Serializers for Data Validation

# serializers.py
from rest_framework import serializers
from .models import User, Product, Order

class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'first_name', 'last_name', 'date_joined']
        read_only_fields = ['id', 'date_joined']

class ProductSerializer(serializers.ModelSerializer):
    class Meta:
        model = Product
        fields = '__all__'
    
    def validate_price(self, value):
        if value <= 0:
            raise serializers.ValidationError("Price must be positive")
        return value

class OrderSerializer(serializers.ModelSerializer):
    user = UserSerializer(read_only=True)
    products = ProductSerializer(many=True, read_only=True)
    total_amount = serializers.DecimalField(max_digits=10, decimal_places=2, read_only=True)
    
    class Meta:
        model = Order
        fields = ['id', 'user', 'products', 'status', 'total_amount', 'created_at']

3. ViewSets for CRUD Operations

# views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.db import transaction
from .models import User, Product, Order
from .serializers import UserSerializer, ProductSerializer, OrderSerializer

class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    
    @action(detail=True, methods=['get'])
    def orders(self, request, pk=None):
        """Get user's orders"""
        user = self.get_object()
        orders = Order.objects.filter(user=user)
        serializer = OrderSerializer(orders, many=True)
        return Response(serializer.data)
    
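    @action(detail=True, methods=['post'])
    def change_password(self, request, pk=None):
        """Change the password of the authenticated user"""
        user = self.get_object()
        # Users may only change their own password
        if request.user != user:
            return Response({'error': 'Not allowed'},
                          status=status.HTTP_403_FORBIDDEN)

        old_password = request.data.get('old_password')
        new_password = request.data.get('new_password')
        # set_password(None) would silently make the account unusable
        if not old_password or not new_password:
            return Response({'error': 'old_password and new_password are required'},
                          status=status.HTTP_400_BAD_REQUEST)

        if not user.check_password(old_password):
            return Response({'error': 'Invalid old password'},
                          status=status.HTTP_400_BAD_REQUEST)

        # Consider running Django's password validators on new_password here
        user.set_password(new_password)
        user.save()
        return Response({'message': 'Password changed successfully'})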

class ProductViewSet(viewsets.ModelViewSet):
    queryset = Product.objects.all()
    serializer_class = ProductSerializer
    
    @action(detail=False, methods=['get'])
    def featured(self, request):
        """Get featured products"""
        featured_products = Product.objects.filter(is_featured=True)
        serializer = self.get_serializer(featured_products, many=True)
        return Response(serializer.data)
    
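    @action(detail=True, methods=['post'])
    def update_inventory(self, request, pk=None):
        """Adjust product inventory by a signed integer quantity"""
        try:
            # request.data values may arrive as strings, so convert explicitly
            quantity = int(request.data.get('quantity', 0))
        except (TypeError, ValueError):
            return Response({'error': 'quantity must be an integer'},
                          status=status.HTTP_400_BAD_REQUEST)

        with transaction.atomic():
            # Lock the row so concurrent updates cannot overwrite each other
            product = Product.objects.select_for_update().get(pk=self.get_object().pk)
            product.inventory_count += quantity
            product.save(update_fields=['inventory_count'])

        return Response({'inventory_count': product.inventory_count})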

4. URL Routing

# urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views

router = DefaultRouter()
router.register(r'users', views.UserViewSet)
router.register(r'products', views.ProductViewSet)
# basename is required when a viewset does not define a queryset attribute
router.register(r'orders', views.OrderViewSet, basename='order')

urlpatterns = [
    path('api/v1/', include(router.urls)),
    path('api/auth/', include('rest_framework.urls')),
    path('health/', views.health_check, name='health_check'),
]

# views.py - health check endpoint
from django.db import connection
from django.http import JsonResponse
from django.utils import timezone

def health_check(request):
    """Health check endpoint for service discovery"""
    try:
        # Check database connection
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
        
        return JsonResponse({
            'status': 'healthy',
            'service': 'user-service',
            'version': '1.0.0',
            'timestamp': timezone.now().isoformat()
        })
    except Exception as e:
        # 503 signals a temporarily unavailable instance to gateways and Consul
        return JsonResponse({
            'status': 'unhealthy',
            'error': str(e)
        }, status=503)
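
The health check above only probes the database. A service that also depends on Redis or RabbitMQ usually wants to report each dependency separately. The following is a minimal, framework-free sketch of that aggregation; run_checks and build_health_response are hypothetical names, and in a Django view the returned body and status code would be passed to JsonResponse.

```python
def run_checks(checks):
    """Run each named check callable; a check passes unless it raises."""
    results = {}
    for name, check in checks.items():
        try:
            check()
            results[name] = "ok"
        except Exception as exc:
            # Report only the exception type to avoid leaking internals
            results[name] = f"failed: {type(exc).__name__}"
    return results


def build_health_response(checks):
    """Return (body, http_status) summarising all dependency checks."""
    results = run_checks(checks)
    healthy = all(value == "ok" for value in results.values())
    body = {
        "status": "healthy" if healthy else "unhealthy",
        "checks": results,
    }
    return body, (200 if healthy else 503)


# Stand-in checks for illustration; real ones would ping the database, cache, etc.
def database_ok():
    return None


def cache_down():
    raise ConnectionError("cache unreachable")


example_body, example_status = build_health_response(
    {"database": database_ok, "cache": cache_down}
)
```

In this example the overall status is unhealthy with a 503, and the body shows that only the cache check failed.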

5. Database Configuration

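# settings.py - Service-specific database configuration
import os

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.getenv('DB_NAME', 'user_service_db'),
        'USER': os.getenv('DB_USER', 'postgres'),
        'PASSWORD': os.getenv('DB_PASSWORD', 'password'),  # development default only
        'HOST': os.getenv('DB_HOST', 'localhost'),
        'PORT': os.getenv('DB_PORT', '5432'),
        # Persistent connections: reuse each connection for up to 10 minutes.
        # CONN_MAX_AGE is a top-level key; OPTIONS is passed straight to the
        # database driver, which rejects keys it does not recognise.
        'CONN_MAX_AGE': 600,
        # Verify a reused connection is still alive before each request (Django 4.1+)
        'CONN_HEALTH_CHECKS': True,
    }
}

# Django 4.2 has no built-in connection pool. For real pooling, place a pooler
# such as PgBouncer in front of PostgreSQL or install a third-party pooling
# backend. Django itself ignores the setting below; it only takes effect if
# such a backend reads it.
DATABASE_POOL_ARGS = {
    'max_overflow': 10,
    'pool_pre_ping': True,
    'pool_recycle': 300,
}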
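
The database settings above fall back to 'password' when DB_PASSWORD is unset, which is convenient locally but risky once the service is deployed. One option is to fail fast at startup when a required variable is missing. The sketch below shows the idea with the standard library only; require_env, env_int, and MissingSettingError are hypothetical names, not part of Django.

```python
import os


class MissingSettingError(RuntimeError):
    """Raised when a required environment variable is absent or empty."""


def require_env(name, environ=None):
    """Return the value of a required variable or stop startup with a clear error."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise MissingSettingError(f"Environment variable {name} must be set")
    return value


def env_int(name, default, environ=None):
    """Read an optional integer variable, falling back to a default."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    return default if raw in (None, "") else int(raw)


# Example with an explicit mapping instead of the real process environment
example_env = {"DB_PASSWORD": "s3cret", "DB_PORT": "5433"}
db_password = require_env("DB_PASSWORD", example_env)
db_port = env_int("DB_PORT", 5432, example_env)
```

In settings.py the same helpers would be called without the mapping argument, so a misconfigured container stops immediately instead of connecting with a default password.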

6. Middleware for Cross-Service Communication

# middleware.py
import uuid
import time
import logging
from django.utils.deprecation import MiddlewareMixin

logger = logging.getLogger(__name__)

class RequestTrackingMiddleware(MiddlewareMixin):
    """Track requests across microservices"""
    
    def process_request(self, request):
        # Generate or extract correlation ID
        correlation_id = request.META.get('HTTP_X_CORRELATION_ID', str(uuid.uuid4()))
        request.correlation_id = correlation_id
        request.start_time = time.time()
        
        logger.info(f"Request started: {request.method} {request.path} "
                   f"[{correlation_id}]")
    
    def process_response(self, request, response):
        if hasattr(request, 'correlation_id'):
            response['X-Correlation-ID'] = request.correlation_id
            
            # Log request completion
            duration = time.time() - request.start_time
            logger.info(f"Request completed: {request.method} {request.path} "
                       f"[{request.correlation_id}] - {response.status_code} "
                       f"({duration:.3f}s)")
        
        return response

class ServiceAuthMiddleware(MiddlewareMixin):
    """Authenticate inter-service communication"""
    
    def process_request(self, request):
        # Check for service-to-service authentication
        service_token = request.META.get('HTTP_X_SERVICE_TOKEN')
        if service_token:
            # Validate service token
            if self.validate_service_token(service_token):
                request.is_service_request = True
            else:
                from django.http import JsonResponse
                return JsonResponse({'error': 'Invalid service token'}, status=401)
    
    def validate_service_token(self, token):
        # Compare in constant time so response timing does not leak the secret
        from django.conf import settings
        from django.utils.crypto import constant_time_compare
        return constant_time_compare(token, settings.SERVICE_SECRET_TOKEN)
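
The request-tracking middleware above attaches a correlation ID to the incoming request, but the ID is only useful if it also travels with outbound calls to other services. One way to do that without threading the request object through every function is a context variable. This is a minimal sketch using only the standard library; set_correlation_id, get_correlation_id, and outbound_headers are hypothetical helpers, and the middleware would call set_correlation_id in process_request.

```python
import contextvars
import uuid

# Holds the correlation ID for the request currently being handled
_correlation_id = contextvars.ContextVar("correlation_id", default=None)


def set_correlation_id(value=None):
    """Store the incoming ID, or generate one if the caller sent none."""
    correlation_id = value or str(uuid.uuid4())
    _correlation_id.set(correlation_id)
    return correlation_id


def get_correlation_id():
    return _correlation_id.get()


def outbound_headers(extra=None):
    """Build headers for a downstream call, copying the current correlation ID."""
    headers = dict(extra or {})
    correlation_id = get_correlation_id()
    if correlation_id:
        headers["X-Correlation-ID"] = correlation_id
    return headers


# Simulate a request that arrived with an ID, then a call to another service
set_correlation_id("abc-123")
example_headers = outbound_headers({"Accept": "application/json"})
```

Any HTTP client used for inter-service calls can then merge outbound_headers() into its request, so logs from every service involved share the same ID.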

External Components for Django Microservices

1. API Gateway with Kong

# docker-compose.yml
version: '3.8'
services:
  kong:
    image: kong:latest
    environment:
      KONG_DATABASE: "off"
      KONG_DECLARATIVE_CONFIG: /kong/declarative/kong.yml
      KONG_PROXY_ACCESS_LOG: /dev/stdout
      KONG_ADMIN_ACCESS_LOG: /dev/stdout
      KONG_PROXY_ERROR_LOG: /dev/stderr
      KONG_ADMIN_ERROR_LOG: /dev/stderr
      KONG_ADMIN_LISTEN: 0.0.0.0:8001
    volumes:
      - ./kong.yml:/kong/declarative/kong.yml
    ports:
      - "8000:8000"
      - "8001:8001"

# kong.yml
_format_version: "2.1"
_transform: true

services:
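  - name: user-service
    url: http://user-service:8000
    routes:
      - name: user-route
        # Match the Django router prefix and forward the path unchanged
        # (Kong strips the matched prefix by default)
        strip_path: false
        paths:
          - /api/v1/users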
    plugins:
      - name: rate-limiting
        config:
          minute: 100
          hour: 1000

  - name: product-service
    url: http://product-service:8000
    routes:
      - name: product-route
        # Match the Django router prefix and forward the path unchanged
        strip_path: false
        paths:
          - /api/v1/products
    plugins:
      - name: cors
        config:
          origins:
            - "*"

2. Service Discovery with Consul

# service_discovery.py
import consul
import requests
from django.conf import settings

class ConsulServiceDiscovery:
    def __init__(self):
        self.consul = consul.Consul(host=settings.CONSUL_HOST, port=settings.CONSUL_PORT)
    
    def register_service(self, name, address, port, health_check_url):
        """Register service with Consul"""
        self.consul.agent.service.register(
            name=name,
            service_id=f"{name}-{address}-{port}",
            address=address,
            port=port,
            check=consul.Check.http(health_check_url, interval="10s")
        )
    
    def discover_service(self, service_name):
        """Discover service instances"""
        _, services = self.consul.health.service(service_name, passing=True)
        if services:
            service = services[0]['Service']
            return f"http://{service['Address']}:{service['Port']}"
        return None
    
    def get_all_services(self):
        """Get all healthy service instances"""
        services = {}
        _, catalog = self.consul.catalog.services()
        
        for service_name in catalog:
            _, instances = self.consul.health.service(service_name, passing=True)
            services[service_name] = [
                f"http://{instance['Service']['Address']}:{instance['Service']['Port']}"
                for instance in instances
            ]
        
        return services

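# apps.py - Django app integration
import logging
import os

from django.apps import AppConfig

logger = logging.getLogger(__name__)

class UserServiceConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'user_service'
    
    def ready(self):
        # Note: ready() also runs for management commands such as migrate,
        # so production setups often register from the process entry point.
        from .service_discovery import ConsulServiceDiscovery

        # Inside a container 'localhost' is not reachable by Consul, so the
        # advertised address comes from the environment (hypothetical names).
        address = os.getenv('SERVICE_ADDRESS', 'localhost')
        port = int(os.getenv('SERVICE_PORT', '8000'))
        try:
            ConsulServiceDiscovery().register_service(
                name='user-service',
                address=address,
                port=port,
                health_check_url=f'http://{address}:{port}/health/'
            )
        except Exception:
            # An unreachable Consul agent should not stop the service from starting
            logger.exception("Could not register with Consul")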
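
The discover_service method above always returns the first healthy instance, so every call lands on the same host even when several are registered. A simple client-side alternative is to rotate through the instances. The sketch below works on the same list-of-dicts shape that the discovery class reads ('Service' with 'Address' and 'Port'); RoundRobinBalancer and instance_urls are hypothetical names, and the sample data is made up.

```python
import itertools
import threading


def instance_urls(health_entries):
    """Turn Consul-style health entries into base URLs."""
    return [
        f"http://{entry['Service']['Address']}:{entry['Service']['Port']}"
        for entry in health_entries
    ]


class RoundRobinBalancer:
    """Hand out instances in rotation; safe to share between threads."""

    def __init__(self):
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def pick(self, urls):
        if not urls:
            return None
        with self._lock:
            index = next(self._counter)
        return urls[index % len(urls)]


# Made-up entries standing in for a health query result
entries = [
    {"Service": {"Address": "10.0.0.1", "Port": 8000}},
    {"Service": {"Address": "10.0.0.2", "Port": 8000}},
]
balancer = RoundRobinBalancer()
urls = instance_urls(entries)
picks = [balancer.pick(urls) for _ in range(3)]
```

Three consecutive picks alternate between the two instances and wrap around to the first. The instance list should be refreshed from Consul periodically, since the balancer itself does not notice instances that become unhealthy.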

3. Message Queue with RabbitMQ

# messaging.py
import pika
import json
import logging
from django.conf import settings

logger = logging.getLogger(__name__)

class MessagePublisher:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=settings.RABBITMQ_HOST,
                port=settings.RABBITMQ_PORT,
                credentials=pika.PlainCredentials(
                    settings.RABBITMQ_USER,
                    settings.RABBITMQ_PASSWORD
                )
            )
        )
        self.channel = self.connection.channel()
    
    def publish_event(self, exchange, routing_key, message):
        """Publish event to message queue"""
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic')
        
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Make message persistent
                content_type='application/json'
            )
        )
        
        logger.info(f"Published event: {routing_key} to {exchange}")
    
    def close(self):
        self.connection.close()

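class MessageConsumer:
    def __init__(self, queue_name, callback, exchange, routing_key):
        self.queue_name = queue_name
        self.callback = callback
        # Use the same credentials as the publisher; a broker configured with
        # a custom default user does not accept anonymous guest logins
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=settings.RABBITMQ_HOST,
                port=settings.RABBITMQ_PORT,
                credentials=pika.PlainCredentials(
                    settings.RABBITMQ_USER,
                    settings.RABBITMQ_PASSWORD
                )
            )
        )
        self.channel = self.connection.channel()
        # Declare the publisher's topic exchange and bind the queue to it;
        # an unbound queue never receives the published events
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic')
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.queue_bind(queue=queue_name, exchange=exchange,
                                routing_key=routing_key)
    
    def start_consuming(self):
        """Start consuming messages"""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self.callback,
            auto_ack=False
        )
        
        logger.info(f"Started consuming from {self.queue_name}")
        self.channel.start_consuming()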

# Event handlers
def handle_user_created(ch, method, properties, body):
    """Handle user created event"""
    try:
        data = json.loads(body)
        user_id = data['user_id']
        
        # Process the event (e.g., send welcome email)
        logger.info(f"Processing user created event for user {user_id}")
        
        # Acknowledge message
        ch.basic_ack(delivery_tag=method.delivery_tag)
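    except (ValueError, KeyError) as e:
        # Malformed messages can never succeed, so reject them without
        # requeueing; otherwise they would be redelivered forever
        logger.error(f"Discarding malformed user created event: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception as e:
        logger.error(f"Error processing user created event: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)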
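
The registration view later in this section calls publish_user_event(event_type, payload), which is never defined. One possible shape for it is sketched below. It adds an event ID and timestamp so consumers can detect duplicates, and keeps the payload fields at the top level so that handle_user_created can still read data['user_id']. The exchange name 'user_events' and the explicit publisher argument are assumptions made for this sketch; in the service, the publisher would be a MessagePublisher instance.

```python
import uuid
from datetime import datetime, timezone

USER_EVENTS_EXCHANGE = "user_events"  # assumed exchange name


def build_event(event_type, payload):
    """Wrap a payload with metadata that consumers can use for deduplication."""
    event = dict(payload)
    event.update({
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "occurred_at": datetime.now(timezone.utc).isoformat(),
    })
    return event


def publish_user_event(event_type, payload, publisher):
    """Publish one event; the event type doubles as the routing key.

    publisher is any object with publish_event(exchange, routing_key, message)
    and close(), such as the MessagePublisher class above.
    """
    event = build_event(event_type, payload)
    try:
        publisher.publish_event(USER_EVENTS_EXCHANGE, event_type, event)
    finally:
        # Always release the connection, even if publishing fails
        publisher.close()
    return event


class RecordingPublisher:
    """In-memory stand-in so the sketch runs without a broker."""

    def __init__(self):
        self.published = []
        self.closed = False

    def publish_event(self, exchange, routing_key, message):
        self.published.append((exchange, routing_key, message))

    def close(self):
        self.closed = True


fake = RecordingPublisher()
sent = publish_user_event("user.created", {"user_id": "42"}, fake)
```

Opening a connection per event keeps the sketch simple; a busy service would more likely reuse a long-lived publisher.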

4. Distributed Caching with Redis

# caching.py
import redis
import json
import pickle
from django.conf import settings
from django.core.cache import cache

class DistributedCache:
    def __init__(self):
        # Text client for JSON values
        self.redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB,
            decode_responses=True
        )
        # Pickled data is binary, so it needs a client that returns raw bytes;
        # decode_responses=True would corrupt it on read
        self.binary_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB
        )
    
    def set_json(self, key, value, timeout=300):
        """Set JSON serializable value"""
        self.redis_client.setex(key, timeout, json.dumps(value))
    
    def get_json(self, key):
        """Get JSON value"""
        value = self.redis_client.get(key)
        return json.loads(value) if value is not None else None
    
    def set_object(self, key, obj, timeout=300):
        """Set Python object using pickle"""
        self.binary_client.setex(key, timeout, pickle.dumps(obj))
    
    def get_object(self, key):
        """Get Python object; only unpickle data written by trusted services"""
        value = self.binary_client.get(key)
        return pickle.loads(value) if value is not None else None
    
    def invalidate_pattern(self, pattern):
        """Invalidate keys matching pattern"""
        # SCAN iterates incrementally; KEYS blocks Redis on large keyspaces
        keys = list(self.redis_client.scan_iter(match=pattern))
        if keys:
            self.redis_client.delete(*keys)

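# Cache decorators
import hashlib
from functools import wraps

def cache_result(timeout=300, key_prefix=''):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key with hashlib: the built-in hash() of a string
            # is randomized per process, so workers would never share entries
            raw_key = f"{args!r}:{sorted(kwargs.items())!r}"
            digest = hashlib.sha256(raw_key.encode()).hexdigest()
            cache_key = f"{key_prefix}:{func.__name__}:{digest}"
            
            # Try to get from cache (note: a result of None is never cached)
            result = cache.get(cache_key)
            if result is not None:
                return result
            
            # Execute function and cache result
            result = func(*args, **kwargs)
            cache.set(cache_key, result, timeout)
            return result
        return wrapper
    return decorator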

# Usage example
@cache_result(timeout=600, key_prefix='user')
def get_user_profile(user_id):
    """Get user profile with caching"""
    # This would typically make a database query
    return User.objects.get(id=user_id)

Creating a Sample Microservice

Let's create a complete user management microservice:

1. Project Structure

user-service/
├── user_service/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── users/
│   ├── __init__.py
│   ├── models.py
│   ├── serializers.py
│   ├── views.py
│   ├── urls.py
│   └── apps.py
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── manage.py

2. Models

# users/models.py
from django.contrib.auth.models import AbstractUser
from django.db import models
import uuid

# A custom user model requires AUTH_USER_MODEL = 'users.User' in settings.py
class User(AbstractUser):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    email = models.EmailField(unique=True)
    phone_number = models.CharField(max_length=20, blank=True)
    date_of_birth = models.DateField(null=True, blank=True)
    is_verified = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['username']

class UserProfile(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='profile')
    bio = models.TextField(max_length=500, blank=True)
    location = models.CharField(max_length=100, blank=True)
    website = models.URLField(blank=True)
    avatar = models.URLField(blank=True)
    preferences = models.JSONField(default=dict)

3. API Views

# users/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, AllowAny
from django.contrib.auth import authenticate
from .models import User, UserProfile
from .serializers import UserSerializer, UserProfileSerializer, UserRegistrationSerializer

class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    
    def get_permissions(self):
        # Registration and login must be reachable without credentials
        if self.action in ('create', 'authenticate'):
            permission_classes = [AllowAny]
        else:
            permission_classes = [IsAuthenticated]
        return [permission() for permission in permission_classes]
    
    def create(self, request):
        """Register new user"""
        serializer = UserRegistrationSerializer(data=request.data)
        if serializer.is_valid():
            user = serializer.save()
            
            # Publish user created event
            from .messaging import publish_user_event
            publish_user_event('user.created', {
                'user_id': str(user.id),
                'email': user.email,
                'username': user.username
            })
            
            return Response(UserSerializer(user).data, status=status.HTTP_201_CREATED)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
    
    @action(detail=False, methods=['post'])
    def authenticate(self, request):
        """Authenticate user"""
        email = request.data.get('email')
        password = request.data.get('password')
        
        user = authenticate(username=email, password=password)
        if user:
            from rest_framework.authtoken.models import Token
            token, created = Token.objects.get_or_create(user=user)
            return Response({
                'token': token.key,
                'user': UserSerializer(user).data
            })
        
        return Response({'error': 'Invalid credentials'}, 
                       status=status.HTTP_401_UNAUTHORIZED)
    
    @action(detail=True, methods=['get', 'put'])
    def profile(self, request, pk=None):
        """Get or update user profile"""
        user = self.get_object()
        profile, created = UserProfile.objects.get_or_create(user=user)
        
        if request.method == 'GET':
            serializer = UserProfileSerializer(profile)
            return Response(serializer.data)
        
        elif request.method == 'PUT':
            serializer = UserProfileSerializer(profile, data=request.data, partial=True)
            if serializer.is_valid():
                serializer.save()
                return Response(serializer.data)
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

4. Docker Configuration

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose port
EXPOSE 8000

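# Run migrations and start the development server.
# runserver is not meant for production; there, use a WSGI server such as
# gunicorn (already in requirements.txt):
#   gunicorn user_service.wsgi:application --bind 0.0.0.0:8000
CMD ["sh", "-c", "python manage.py migrate && python manage.py runserver 0.0.0.0:8000"]

# docker-compose.yml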
version: '3.8'

services:
  user-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DEBUG=1
      - DB_HOST=postgres
      - DB_NAME=user_service_db
      - DB_USER=postgres
      - DB_PASSWORD=password
      - REDIS_HOST=redis
      - RABBITMQ_HOST=rabbitmq
      - RABBITMQ_USER=admin
      - RABBITMQ_PASSWORD=password
    depends_on:
      - postgres
      - redis
      - rabbitmq
    volumes:
      - .:/app

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: user_service_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    ports:
      - "5672:5672"
      - "15672:15672"

volumes:
  postgres_data:
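
In the Compose file above, depends_on only controls start order; it does not wait until PostgreSQL accepts connections, so the migrate step in the container command can fail on a cold start. A small wait script run before migrate is one common workaround. The following is a minimal sketch with the standard library; wait_for_port is a hypothetical helper, and an open TCP port is only an approximation of "the database is ready".

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection succeeds, False if the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Opening and immediately closing a connection is enough as a probe
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Demonstration against a throwaway local listener instead of a real database
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS choose a free port
listener.listen(1)
demo_port = listener.getsockname()[1]
reachable = wait_for_port("127.0.0.1", demo_port, timeout=2.0, interval=0.1)
listener.close()
```

In the container, the script would call wait_for_port with the DB_HOST and DB_PORT values and exit with a non-zero status if it returns False.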

5. Requirements

# requirements.txt
Django==4.2.7
djangorestframework==3.14.0
django-cors-headers==4.3.1
psycopg2-binary==2.9.7
redis==5.0.1
pika==1.3.2
python-consul==1.1.0
celery==5.3.4
gunicorn==21.2.0
python-decouple==3.8
requests==2.31.0

Service Communication Example

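# inter_service_client.py
import requests
from django.conf import settings
from .service_discovery import ConsulServiceDiscovery

class ServiceUnavailableError(Exception):
    """Raised when no healthy instance of a service is registered"""

class ServiceClient:
    def __init__(self, timeout=5):
        self.discovery = ConsulServiceDiscovery()
        self.timeout = timeout
    
    def call_service(self, service_name, endpoint, method='GET', data=None, headers=None):
        """Make inter-service API call"""
        service_url = self.discovery.discover_service(service_name)
        if not service_url:
            raise ServiceUnavailableError(f"Service {service_name} not found")
        
        url = f"{service_url}{endpoint}"
        
        # Copy the caller's headers, then add service authentication
        headers = dict(headers or {})
        headers['X-Service-Token'] = settings.SERVICE_SECRET_TOKEN
        
        # Without a timeout, a hung service would block this worker indefinitely
        response = requests.request(method, url, json=data, headers=headers,
                                    timeout=self.timeout)
        response.raise_for_status()
        return response.json()

# Usage in views
from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import Order
from .serializers import OrderSerializer

class OrderViewSet(viewsets.ModelViewSet):
    queryset = Order.objects.all()
    serializer_class = OrderSerializer
    
    def create(self, request):
        """Create order with user validation"""
        user_id = request.data.get('user_id')
        
        # Validate user exists via user service (path matches its api/v1/ router)
        client = ServiceClient()
        try:
            user_data = client.call_service('user-service', f'/api/v1/users/{user_id}/')
        except requests.HTTPError as exc:
            # Only a 404 means the user is missing; other errors are upstream failures
            if exc.response is not None and exc.response.status_code == 404:
                return Response({'error': 'User not found'}, 
                              status=status.HTTP_400_BAD_REQUEST)
            return Response({'error': 'User service error'}, 
                          status=status.HTTP_502_BAD_GATEWAY)
        except (requests.RequestException, ServiceUnavailableError):
            return Response({'error': 'User service unavailable'}, 
                          status=status.HTTP_503_SERVICE_UNAVAILABLE)
        
        # Create order
        order_data = request.data.copy()
        order_data['user_email'] = user_data['email']
        
        serializer = self.get_serializer(data=order_data)
        if serializer.is_valid():
            order = serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)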
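
Calls between services fail transiently more often than in-process calls do, for example while an instance restarts. A common mitigation is to retry with exponential backoff. Below is a minimal, library-free sketch; call_with_retry is a hypothetical helper, the delays are illustrative, and retries are only safe for idempotent requests such as the GET used above.

```python
import time


def call_with_retry(func, retries=3, base_delay=0.2,
                    retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on a listed exception wait and retry, doubling the delay."""
    attempt = 0
    while True:
        try:
            return func()
        except retry_on:
            attempt += 1
            if attempt > retries:
                raise  # give up and surface the last error
            sleep(base_delay * (2 ** (attempt - 1)))


# Demonstration with a function that fails twice before succeeding
attempts = {"count": 0}


def flaky_call():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("service restarting")
    return {"email": "user@example.com"}


recorded_delays = []
result = call_with_retry(
    flaky_call,
    retries=3,
    base_delay=0.2,
    retry_on=(ConnectionError,),
    sleep=recorded_delays.append,  # record delays instead of really sleeping
)
```

With the client from this section, the wrapped call would be something like lambda: client.call_service('user-service', endpoint), with retry_on limited to connection and timeout errors so that a genuine 404 is not retried.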

Summary

Django provides a solid foundation for microservices through its ORM and middleware system, together with Django REST Framework. Combined with external tools for service discovery, message queuing, and API gateways, it lets you build a robust microservices architecture.

Key components covered:

  • Django REST Framework for API development
  • Service discovery with Consul
  • Message queuing with RabbitMQ
  • Distributed caching with Redis
  • API Gateway with Kong
  • Inter-service communication patterns

In the next section, we'll set up the complete development and runtime environment for Django microservices.