Django, traditionally known for building monolithic web applications, can be effectively adapted for microservices architecture. In this section, we'll explore Django's native components that support microservices development and the external tools that complement Django in a distributed environment.
Before diving into Django microservices, ensure you have:
DRF is essential for building APIs in microservices:
# settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'rest_framework.authtoken',
'corsheaders',
]
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
'rest_framework.authentication.SessionAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20,
'DEFAULT_RENDERER_CLASSES': [
'rest_framework.renderers.JSONRenderer',
],
'DEFAULT_THROTTLE_CLASSES': [
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle'
],
'DEFAULT_THROTTLE_RATES': {
'anon': '100/hour',
'user': '1000/hour'
}
}
# serializers.py
from rest_framework import serializers
from .models import User, Product, Order
class UserSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ['id', 'username', 'email', 'first_name', 'last_name', 'date_joined']
read_only_fields = ['id', 'date_joined']
class ProductSerializer(serializers.ModelSerializer):
class Meta:
model = Product
fields = '__all__'
def validate_price(self, value):
if value <= 0:
raise serializers.ValidationError("Price must be positive")
return value
class OrderSerializer(serializers.ModelSerializer):
user = UserSerializer(read_only=True)
products = ProductSerializer(many=True, read_only=True)
total_amount = serializers.DecimalField(max_digits=10, decimal_places=2, read_only=True)
class Meta:
model = Order
fields = ['id', 'user', 'products', 'status', 'total_amount', 'created_at']
# views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.db import transaction
from .models import User, Product, Order
from .serializers import UserSerializer, ProductSerializer, OrderSerializer
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
@action(detail=True, methods=['get'])
def orders(self, request, pk=None):
"""Get user's orders"""
user = self.get_object()
orders = Order.objects.filter(user=user)
serializer = OrderSerializer(orders, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def change_password(self, request, pk=None):
"""Change user password"""
user = self.get_object()
old_password = request.data.get('old_password')
new_password = request.data.get('new_password')
if not user.check_password(old_password):
return Response({'error': 'Invalid old password'},
status=status.HTTP_400_BAD_REQUEST)
user.set_password(new_password)
user.save()
return Response({'message': 'Password changed successfully'})
class ProductViewSet(viewsets.ModelViewSet):
queryset = Product.objects.all()
serializer_class = ProductSerializer
@action(detail=False, methods=['get'])
def featured(self, request):
"""Get featured products"""
featured_products = Product.objects.filter(is_featured=True)
serializer = self.get_serializer(featured_products, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def update_inventory(self, request, pk=None):
"""Update product inventory"""
product = self.get_object()
quantity = request.data.get('quantity', 0)
with transaction.atomic():
product.inventory_count += quantity
product.save()
return Response({'inventory_count': product.inventory_count})
# urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views
router = DefaultRouter()
router.register(r'users', views.UserViewSet)
router.register(r'products', views.ProductViewSet)
router.register(r'orders', views.OrderViewSet)
urlpatterns = [
path('api/v1/', include(router.urls)),
path('api/auth/', include('rest_framework.urls')),
path('health/', views.health_check, name='health_check'),
]
# Health check endpoint
from django.http import JsonResponse
from django.db import connection
def health_check(request):
"""Health check endpoint for service discovery"""
try:
# Check database connection
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
return JsonResponse({
'status': 'healthy',
'service': 'user-service',
'version': '1.0.0',
'timestamp': timezone.now().isoformat()
})
except Exception as e:
return JsonResponse({
'status': 'unhealthy',
'error': str(e)
}, status=500)
# settings.py - Service-specific database configuration
import os
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.getenv('DB_NAME', 'user_service_db'),
'USER': os.getenv('DB_USER', 'postgres'),
'PASSWORD': os.getenv('DB_PASSWORD', 'password'),
'HOST': os.getenv('DB_HOST', 'localhost'),
'PORT': os.getenv('DB_PORT', '5432'),
'OPTIONS': {
'MAX_CONNS': 20,
'CONN_MAX_AGE': 600,
}
}
}
# Connection pooling for better performance
DATABASE_POOL_ARGS = {
'max_overflow': 10,
'pool_pre_ping': True,
'pool_recycle': 300,
}
# middleware.py
import uuid
import time
import logging
from django.utils.deprecation import MiddlewareMixin
logger = logging.getLogger(__name__)
class RequestTrackingMiddleware(MiddlewareMixin):
"""Track requests across microservices"""
def process_request(self, request):
# Generate or extract correlation ID
correlation_id = request.META.get('HTTP_X_CORRELATION_ID', str(uuid.uuid4()))
request.correlation_id = correlation_id
request.start_time = time.time()
logger.info(f"Request started: {request.method} {request.path} "
f"[{correlation_id}]")
def process_response(self, request, response):
if hasattr(request, 'correlation_id'):
response['X-Correlation-ID'] = request.correlation_id
# Log request completion
duration = time.time() - request.start_time
logger.info(f"Request completed: {request.method} {request.path} "
f"[{request.correlation_id}] - {response.status_code} "
f"({duration:.3f}s)")
return response
class ServiceAuthMiddleware(MiddlewareMixin):
"""Authenticate inter-service communication"""
def process_request(self, request):
# Check for service-to-service authentication
service_token = request.META.get('HTTP_X_SERVICE_TOKEN')
if service_token:
# Validate service token
if self.validate_service_token(service_token):
request.is_service_request = True
else:
from django.http import JsonResponse
return JsonResponse({'error': 'Invalid service token'}, status=401)
def validate_service_token(self, token):
# Implement your service token validation logic
from django.conf import settings
return token == settings.SERVICE_SECRET_TOKEN
# docker-compose.yml
version: '3.8'
services:
kong:
image: kong:latest
environment:
KONG_DATABASE: "off"
KONG_DECLARATIVE_CONFIG: /kong/declarative/kong.yml
KONG_PROXY_ACCESS_LOG: /dev/stdout
KONG_ADMIN_ACCESS_LOG: /dev/stdout
KONG_PROXY_ERROR_LOG: /dev/stderr
KONG_ADMIN_ERROR_LOG: /dev/stderr
KONG_ADMIN_LISTEN: 0.0.0.0:8001
volumes:
- ./kong.yml:/kong/declarative/kong.yml
ports:
- "8000:8000"
- "8001:8001"
# kong.yml
_format_version: "2.1"
_transform: true
services:
- name: user-service
url: http://user-service:8000
routes:
- name: user-route
paths:
- /api/users
plugins:
- name: rate-limiting
config:
minute: 100
hour: 1000
- name: product-service
url: http://product-service:8000
routes:
- name: product-route
paths:
- /api/products
plugins:
- name: cors
config:
origins:
- "*"
# service_discovery.py
import consul
import requests
from django.conf import settings
class ConsulServiceDiscovery:
def __init__(self):
self.consul = consul.Consul(host=settings.CONSUL_HOST, port=settings.CONSUL_PORT)
def register_service(self, name, address, port, health_check_url):
"""Register service with Consul"""
self.consul.agent.service.register(
name=name,
service_id=f"{name}-{address}-{port}",
address=address,
port=port,
check=consul.Check.http(health_check_url, interval="10s")
)
def discover_service(self, service_name):
"""Discover service instances"""
_, services = self.consul.health.service(service_name, passing=True)
if services:
service = services[0]['Service']
return f"http://{service['Address']}:{service['Port']}"
return None
def get_all_services(self):
"""Get all healthy service instances"""
services = {}
_, catalog = self.consul.catalog.services()
for service_name in catalog:
_, instances = self.consul.health.service(service_name, passing=True)
services[service_name] = [
f"http://{instance['Service']['Address']}:{instance['Service']['Port']}"
for instance in instances
]
return services
# Django app integration
from django.apps import AppConfig
class UserServiceConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'user_service'
def ready(self):
# Register service on startup
discovery = ConsulServiceDiscovery()
discovery.register_service(
name='user-service',
address='localhost',
port=8000,
health_check_url='http://localhost:8000/health/'
)
# messaging.py
import pika
import json
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
class MessagePublisher:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=settings.RABBITMQ_HOST,
port=settings.RABBITMQ_PORT,
credentials=pika.PlainCredentials(
settings.RABBITMQ_USER,
settings.RABBITMQ_PASSWORD
)
)
)
self.channel = self.connection.channel()
def publish_event(self, exchange, routing_key, message):
"""Publish event to message queue"""
self.channel.exchange_declare(exchange=exchange, exchange_type='topic')
self.channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
content_type='application/json'
)
)
logger.info(f"Published event: {routing_key} to {exchange}")
def close(self):
self.connection.close()
class MessageConsumer:
def __init__(self, queue_name, callback):
self.queue_name = queue_name
self.callback = callback
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=settings.RABBITMQ_HOST)
)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue_name, durable=True)
def start_consuming(self):
"""Start consuming messages"""
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self.callback,
auto_ack=False
)
logger.info(f"Started consuming from {self.queue_name}")
self.channel.start_consuming()
# Event handlers
def handle_user_created(ch, method, properties, body):
"""Handle user created event"""
try:
data = json.loads(body)
user_id = data['user_id']
# Process the event (e.g., send welcome email)
logger.info(f"Processing user created event for user {user_id}")
# Acknowledge message
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"Error processing user created event: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# caching.py
import redis
import json
import pickle
from django.conf import settings
from django.core.cache import cache
class DistributedCache:
def __init__(self):
self.redis_client = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB,
decode_responses=True
)
def set_json(self, key, value, timeout=300):
"""Set JSON serializable value"""
self.redis_client.setex(key, timeout, json.dumps(value))
def get_json(self, key):
"""Get JSON value"""
value = self.redis_client.get(key)
return json.loads(value) if value else None
def set_object(self, key, obj, timeout=300):
"""Set Python object using pickle"""
self.redis_client.setex(key, timeout, pickle.dumps(obj))
def get_object(self, key):
"""Get Python object"""
value = self.redis_client.get(key)
return pickle.loads(value) if value else None
def invalidate_pattern(self, pattern):
"""Invalidate keys matching pattern"""
keys = self.redis_client.keys(pattern)
if keys:
self.redis_client.delete(*keys)
# Cache decorators
from functools import wraps
def cache_result(timeout=300, key_prefix=''):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Generate cache key
cache_key = f"{key_prefix}:{func.__name__}:{hash(str(args) + str(kwargs))}"
# Try to get from cache
result = cache.get(cache_key)
if result is not None:
return result
# Execute function and cache result
result = func(*args, **kwargs)
cache.set(cache_key, result, timeout)
return result
return wrapper
return decorator
# Usage example
@cache_result(timeout=600, key_prefix='user')
def get_user_profile(user_id):
"""Get user profile with caching"""
# This would typically make a database query
return User.objects.get(id=user_id)
Let's create a complete user management microservice:
user-service/
├── user_service/
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── users/
│ ├── __init__.py
│ ├── models.py
│ ├── serializers.py
│ ├── views.py
│ ├── urls.py
│ └── apps.py
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── manage.py
# users/models.py
from django.contrib.auth.models import AbstractUser
from django.db import models
import uuid
class User(AbstractUser):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
email = models.EmailField(unique=True)
phone_number = models.CharField(max_length=20, blank=True)
date_of_birth = models.DateField(null=True, blank=True)
is_verified = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
class UserProfile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='profile')
bio = models.TextField(max_length=500, blank=True)
location = models.CharField(max_length=100, blank=True)
website = models.URLField(blank=True)
avatar = models.URLField(blank=True)
preferences = models.JSONField(default=dict)
# users/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, AllowAny
from django.contrib.auth import authenticate
from .models import User, UserProfile
from .serializers import UserSerializer, UserProfileSerializer, UserRegistrationSerializer
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
def get_permissions(self):
if self.action == 'create':
permission_classes = [AllowAny]
else:
permission_classes = [IsAuthenticated]
return [permission() for permission in permission_classes]
def create(self, request):
"""Register new user"""
serializer = UserRegistrationSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
# Publish user created event
from .messaging import publish_user_event
publish_user_event('user.created', {
'user_id': str(user.id),
'email': user.email,
'username': user.username
})
return Response(UserSerializer(user).data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'])
def authenticate(self, request):
"""Authenticate user"""
email = request.data.get('email')
password = request.data.get('password')
user = authenticate(username=email, password=password)
if user:
from rest_framework.authtoken.models import Token
token, created = Token.objects.get_or_create(user=user)
return Response({
'token': token.key,
'user': UserSerializer(user).data
})
return Response({'error': 'Invalid credentials'},
status=status.HTTP_401_UNAUTHORIZED)
@action(detail=True, methods=['get', 'put'])
def profile(self, request, pk=None):
"""Get or update user profile"""
user = self.get_object()
profile, created = UserProfile.objects.get_or_create(user=user)
if request.method == 'GET':
serializer = UserProfileSerializer(profile)
return Response(serializer.data)
elif request.method == 'PUT':
serializer = UserProfileSerializer(profile, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8000
# Run migrations and start server
CMD ["sh", "-c", "python manage.py migrate && python manage.py runserver 0.0.0.0:8000"]
# docker-compose.yml
version: '3.8'
services:
user-service:
build: .
ports:
- "8000:8000"
environment:
- DEBUG=1
- DB_HOST=postgres
- DB_NAME=user_service_db
- DB_USER=postgres
- DB_PASSWORD=password
- REDIS_HOST=redis
- RABBITMQ_HOST=rabbitmq
depends_on:
- postgres
- redis
- rabbitmq
volumes:
- .:/app
postgres:
image: postgres:15
environment:
POSTGRES_DB: user_service_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
rabbitmq:
image: rabbitmq:3-management
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password
ports:
- "5672:5672"
- "15672:15672"
volumes:
postgres_data:
# requirements.txt
Django==4.2.7
djangorestframework==3.14.0
django-cors-headers==4.3.1
psycopg2-binary==2.9.7
redis==5.0.1
pika==1.3.2
python-consul==1.1.0
celery==5.3.4
gunicorn==21.2.0
python-decouple==3.8
# inter_service_client.py
import requests
from django.conf import settings
from .service_discovery import ConsulServiceDiscovery
class ServiceClient:
def __init__(self):
self.discovery = ConsulServiceDiscovery()
def call_service(self, service_name, endpoint, method='GET', data=None, headers=None):
"""Make inter-service API call"""
service_url = self.discovery.discover_service(service_name)
if not service_url:
raise Exception(f"Service {service_name} not found")
url = f"{service_url}{endpoint}"
# Add service authentication headers
if headers is None:
headers = {}
headers['X-Service-Token'] = settings.SERVICE_SECRET_TOKEN
response = requests.request(method, url, json=data, headers=headers)
response.raise_for_status()
return response.json()
# Usage in views
class OrderViewSet(viewsets.ModelViewSet):
def create(self, request):
"""Create order with user validation"""
user_id = request.data.get('user_id')
# Validate user exists via user service
client = ServiceClient()
try:
user_data = client.call_service('user-service', f'/api/users/{user_id}/')
except requests.RequestException:
return Response({'error': 'User not found'},
status=status.HTTP_400_BAD_REQUEST)
# Create order
order_data = request.data.copy()
order_data['user_email'] = user_data['email']
serializer = self.get_serializer(data=order_data)
if serializer.is_valid():
order = serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
Django provides excellent foundation for microservices with its REST framework, ORM, and middleware system. Combined with external tools like service discovery, message queues, and API gateways, you can build robust microservices architecture.
Key components covered:
In the next section, we'll set up the complete development and runtime environment for Django microservices.
What Is a Microservice?
Microservices architecture has revolutionized how we build and deploy modern applications. In this section, we'll explore what microservices are, how they compare to monolithic applications, and why they've become so popular in today's development landscape.
Setting Up the Development and Runtime Environment
Setting up a proper development and runtime environment is crucial for successful microservices development with Django. This section covers everything from local development setup to production-ready deployment configurations.