System architecture patterns provide proven solutions for organizing complex Django applications. These patterns help create maintainable, scalable, and testable systems by defining clear boundaries between components and establishing consistent communication patterns. This comprehensive guide covers the most important architectural patterns for Django applications.
Architecture patterns are high-level design templates that provide structure for organizing application components. They define how different parts of your system interact, where business logic resides, and how to manage dependencies between components.
Well-chosen architecture patterns improve four qualities of a system:
- Maintainability
- Scalability
- Testability
- Flexibility
The layered architecture pattern organizes code into horizontal layers, each with specific responsibilities. This is the most common pattern in Django applications.
# Presentation Layer (Views)
from django.http import JsonResponse
from django.views import View
from .services import ProductService
from .serializers import ProductSerializer
class ProductView(View):
    """Presentation layer: maps HTTP requests onto ProductService calls.

    The view only deals with HTTP concerns (status codes, JSON payloads);
    business rules live in ``ProductService``.
    """

    def __init__(self, **kwargs):
        # ``View.as_view(**initkwargs)`` instantiates the class with keyword
        # arguments, so they must be accepted and forwarded to ``View``.
        super().__init__(**kwargs)
        self.product_service = ProductService()
        self.serializer = ProductSerializer()

    def get(self, request, product_id):
        """Return the product as JSON, or a 404 payload if it cannot be served."""
        try:
            product = self.product_service.get_product(product_id)
            data = self.serializer.serialize(product)
            return JsonResponse(data)
        except ProductNotFound:
            return JsonResponse({'error': 'Product not found'}, status=404)
        except ProductNotAvailable:
            # The service raises this for inactive products; without a handler
            # the request would surface as an unhandled 500.
            return JsonResponse({'error': 'Product is not available'}, status=404)

    def post(self, request):
        """Create a product from the request body.

        Returns 201 with the serialized product, or 400 with the validation
        messages when ``ValidationError`` is raised.
        """
        try:
            data = self.serializer.deserialize(request.body)
            product = self.product_service.create_product(data)
            response_data = self.serializer.serialize(product)
            return JsonResponse(response_data, status=201)
        except ValidationError as e:
            return JsonResponse({'errors': e.messages}, status=400)
# Business Logic Layer (Services)
from .models import Product, Category
from .repositories import ProductRepository
from django.db import transaction
class ProductService:
    """Business logic layer: product use cases and the rules guarding them."""

    def __init__(self):
        self.product_repo = ProductRepository()

    def get_product(self, product_id: int) -> Product:
        """Return the product with ``product_id`` if it exists and is active.

        Raises:
            ProductNotFound: the repository returned no product.
            ProductNotAvailable: the product exists but is inactive.
        """
        found = self.product_repo.get_by_id(product_id)
        if not found:
            raise ProductNotFound(f"Product {product_id} not found")

        if found.is_active:
            return found
        raise ProductNotAvailable("Product is not available")

    @transaction.atomic
    def create_product(self, data: dict) -> Product:
        """Validate ``data`` against the business rules and persist a product.

        Reads the keys ``name``, ``category_id``, ``description`` and
        ``price`` from ``data``.

        Raises:
            BusinessRuleError: the name is taken or the category refuses
                products.
        """
        name = data['name']
        if self.product_repo.exists_by_name(name):
            raise BusinessRuleError("Product name must be unique")

        category = Category.objects.get(id=data['category_id'])
        if not category.allows_products:
            raise BusinessRuleError("Category does not allow products")

        new_product = Product(
            name=name,
            description=data['description'],
            price=data['price'],
            category=category,
        )

        # Let the model derive its final price and starting status.
        new_product.calculate_pricing()
        new_product.set_initial_status()

        return self.product_repo.save(new_product)

    def update_product_price(self, product_id: int, new_price: Decimal) -> Product:
        """Change a product's price, enforcing the pricing rules.

        Raises:
            BusinessRuleError: the new price is below half the current one.
        """
        product = self.get_product(product_id)
        current_price = product.price

        # Changing the price of an item above 1000 flags it for approval.
        if current_price > 1000 and new_price != current_price:
            product.requires_approval = True

        # A single change may not cut the price by more than half.
        lowest_allowed = current_price * Decimal('0.5')
        if new_price < lowest_allowed:
            raise BusinessRuleError("Cannot decrease price by more than 50%")

        product.price = new_price
        product.price_updated_at = timezone.now()

        return self.product_repo.save(product)
# Data Access Layer (Repository Pattern)
from typing import Optional, List
from django.db.models import Q
class ProductRepository:
    """Data access layer: hides the ORM queries used for products."""

    def get_by_id(self, product_id: int) -> Optional[Product]:
        """Return the product (category pre-joined) or ``None`` if absent."""
        with_category = Product.objects.select_related('category')
        try:
            return with_category.get(id=product_id)
        except Product.DoesNotExist:
            return None

    def get_by_name(self, name: str) -> Optional[Product]:
        """Return the product named ``name`` or ``None`` if absent."""
        try:
            match = Product.objects.get(name=name)
        except Product.DoesNotExist:
            return None
        return match

    def exists_by_name(self, name: str) -> bool:
        """Tell whether any product carries ``name``."""
        same_name = Product.objects.filter(name=name)
        return same_name.exists()

    def find_by_category(self, category_id: int) -> List[Product]:
        """Return the active products of a category, ordered by name."""
        in_category = Product.objects.filter(category_id=category_id, is_active=True)
        ordered = in_category.select_related('category').order_by('name')
        return list(ordered)

    def search(self, query: str) -> List[Product]:
        """Return active products whose name or description contains ``query``."""
        text_match = Q(name__icontains=query) | Q(description__icontains=query)
        hits = Product.objects.filter(text_match, is_active=True)
        return list(hits.select_related('category'))

    def save(self, product: Product) -> Product:
        """Persist ``product`` and hand the same instance back."""
        product.save()
        return product

    def delete(self, product: Product) -> None:
        """Remove ``product`` from the database."""
        product.delete()
# Domain Layer (Models with Business Logic)
from django.db import models
from decimal import Decimal
class Product(models.Model):
    """Product model that carries its own pricing and lifecycle rules."""

    name = models.CharField(max_length=200, unique=True)
    description = models.TextField()
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    is_active = models.BooleanField(default=True)
    requires_approval = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)
    # NOTE(review): auto_now=True refreshes this on every save(), not only
    # when the price changes - confirm that is the intended meaning.
    price_updated_at = models.DateTimeField(auto_now=True)

    def calculate_pricing(self):
        """Apply the category premium and round larger prices to cents."""
        # Premium categories carry a 20% surcharge.
        if self.category.premium_category:
            self.price *= Decimal('1.2')

        if self.price > 100:
            # quantize() uses only the exponent of its argument, so the former
            # Decimal('0.99') never produced ".99" prices; it rounded to two
            # decimal places. Decimal('0.01') states that intent directly and
            # yields the same result.
            self.price = self.price.quantize(Decimal('0.01'))

    def set_initial_status(self):
        """Put a new product into its starting state."""
        # New products start inactive so they can be reviewed.
        self.is_active = False

        # Expensive products additionally need approval.
        if self.price > 1000:
            self.requires_approval = True

    def can_be_deleted(self) -> bool:
        """Return True when no order items reference this product."""
        return not self.orderitem_set.exists()

    def apply_discount(self, percentage: Decimal) -> Decimal:
        """Return the price after a ``percentage`` discount (price is unchanged).

        Raises:
            ValueError: ``percentage`` is outside the 0-50 range.
        """
        if percentage < 0 or percentage > 50:
            raise ValueError("Discount must be between 0% and 50%")

        discount_amount = self.price * (percentage / 100)
        return self.price - discount_amount

    class Meta:
        indexes = [
            models.Index(fields=['category', 'is_active']),
            models.Index(fields=['name']),
        ]
Hexagonal architecture, also known as Ports and Adapters, isolates the core business logic from external concerns. This pattern is excellent for complex Django applications with multiple integrations.
# Domain Layer - Pure business logic
from abc import ABC, abstractmethod
from typing import List, Optional
from dataclasses import dataclass
from decimal import Decimal
@dataclass
class OrderItem:
    """Value object describing one line of an order.

    Attributes:
        product_id: identifier of the ordered product.
        quantity: number of units ordered.
        price: unit price.
    """

    product_id: int
    quantity: int
    price: Decimal

    def total(self) -> Decimal:
        """Return the line total: quantity multiplied by unit price."""
        line_total = self.quantity * self.price
        return line_total
class Order:
    """Rich domain model for an order and its state transitions.

    ``status`` starts as ``'draft'`` and moves to ``'confirmed'`` or
    ``'cancelled'``. ``total`` is kept in sync with ``items`` by
    ``add_item``/``remove_item``.
    """

    def __init__(self, customer_id: int):
        self.customer_id = customer_id
        self.items: List[OrderItem] = []
        self.status = 'draft'
        self.total = Decimal('0.00')
        self.created_at = None
        self.id = None

    def add_item(self, product_id: int, quantity: int, price: Decimal):
        """Add ``quantity`` units of a product, merging with an existing line.

        When the product is already on the order only its quantity grows; the
        ``price`` passed here is then not applied to the existing line.

        Raises:
            ValueError: ``quantity`` or ``price`` is not positive.
        """
        if quantity <= 0:
            raise ValueError("Quantity must be positive")

        if price <= 0:
            raise ValueError("Price must be positive")

        existing_item = self._find_item(product_id)
        if existing_item:
            existing_item.quantity += quantity
        else:
            self.items.append(OrderItem(product_id, quantity, price))

        self._recalculate_total()

    def remove_item(self, product_id: int):
        """Drop every line for ``product_id`` and refresh the total."""
        self.items = [item for item in self.items if item.product_id != product_id]
        self._recalculate_total()

    def confirm(self):
        """Move a non-empty draft order to ``'confirmed'``.

        Raises:
            ValueError: the order has no items or is not a draft.
        """
        if not self.items:
            raise ValueError("Cannot confirm empty order")

        if self.status != 'draft':
            raise ValueError("Can only confirm draft orders")

        self.status = 'confirmed'

    def cancel(self):
        """Move a draft or confirmed order to ``'cancelled'``.

        Raises:
            ValueError: the order is in any other status.
        """
        if self.status not in ['draft', 'confirmed']:
            raise ValueError("Cannot cancel order in current status")

        self.status = 'cancelled'

    def _find_item(self, product_id: int) -> Optional[OrderItem]:
        """Return the first line for ``product_id``, or ``None``."""
        return next((item for item in self.items if item.product_id == product_id), None)

    def _recalculate_total(self):
        """Recompute ``total`` from the current lines."""
        # Start from a Decimal so an empty order keeps a Decimal total;
        # a bare sum() over no items would yield the int 0.
        self.total = sum((item.total() for item in self.items), Decimal('0.00'))
# Ports (Interfaces) - Define contracts
class OrderRepository(ABC):
    """Port (interface) that order persistence adapters implement."""

    @abstractmethod
    def save(self, order: Order) -> Order:
        """Persist ``order`` and return it."""

    @abstractmethod
    def get_by_id(self, order_id: int) -> Optional[Order]:
        """Return the order with ``order_id``, or ``None``."""

    @abstractmethod
    def get_by_customer(self, customer_id: int) -> List[Order]:
        """Return the orders belonging to ``customer_id``."""
class PaymentService(ABC):
    """Port (interface) that payment adapters implement."""

    @abstractmethod
    def process_payment(self, order: Order, payment_method: str) -> bool:
        """Charge ``order`` using ``payment_method``; report success as a bool."""
class InventoryService(ABC):
    """Port (interface) that inventory adapters implement."""

    @abstractmethod
    def reserve_items(self, items: List[OrderItem]) -> bool:
        """Reserve stock for ``items``; report success as a bool."""

    @abstractmethod
    def release_items(self, items: List[OrderItem]) -> bool:
        """Give reserved stock for ``items`` back; report success as a bool."""
class NotificationService(ABC):
    """Port (interface) that notification adapters implement."""

    @abstractmethod
    def send_order_confirmation(self, order: Order) -> bool:
        """Notify the customer about ``order``; report success as a bool."""
# Application Service - Orchestrates use cases
class OrderService:
    """Application service that orchestrates order use cases through ports.

    All collaborators are injected, so the service depends only on the
    abstract ports and not on any concrete adapter.
    """

    def __init__(self,
                 order_repo: OrderRepository,
                 payment_service: PaymentService,
                 inventory_service: InventoryService,
                 notification_service: NotificationService):
        self.order_repo = order_repo
        self.payment_service = payment_service
        self.inventory_service = inventory_service
        self.notification_service = notification_service

    def create_order(self, customer_id: int, items: List[dict]) -> Order:
        """Build an order from ``items`` and persist it.

        Each entry of ``items`` must provide ``product_id``, ``quantity`` and
        ``price``.
        """
        order = Order(customer_id)

        for item_data in items:
            order.add_item(
                product_id=item_data['product_id'],
                quantity=item_data['quantity'],
                price=item_data['price']
            )

        return self.order_repo.save(order)

    def place_order(self, order_id: int, payment_method: str) -> bool:
        """Reserve stock, take payment, confirm and save the order.

        Returns:
            True once the order is confirmed and the confirmation was sent.

        Raises:
            ValueError: the order is unknown, stock is insufficient or the
                payment was declined. Errors from the collaborators propagate
                unchanged.
        """
        order = self.order_repo.get_by_id(order_id)
        if not order:
            raise ValueError("Order not found")

        if not self.inventory_service.reserve_items(order.items):
            raise ValueError("Insufficient inventory")

        try:
            if not self.payment_service.process_payment(order, payment_method):
                raise ValueError("Payment processing failed")

            order.confirm()
            self.order_repo.save(order)
        except Exception:
            # Single release point: releasing inside the payment branch as
            # well would hand the reserved stock back twice.
            self.inventory_service.release_items(order.items)
            raise

        # The order is paid and saved at this point, so a failing
        # notification must not return its stock to inventory.
        self.notification_service.send_order_confirmation(order)

        return True
# Adapters - Implement the ports
from django.db import models, transaction
class DjangoOrderRepository(OrderRepository):
    """Adapter implementing the order repository port with the Django ORM."""

    def save(self, order: Order) -> Order:
        """Insert or update ``order`` and rewrite its item rows in one transaction.

        A new order receives its ``id`` and ``created_at`` from the created
        row. For an existing order the stored items are deleted and written
        again from ``order.items``.
        """
        with transaction.atomic():
            if not order.id:
                record = OrderModel.objects.create(
                    customer_id=order.customer_id,
                    status=order.status,
                    total=order.total,
                )
                order.id = record.id
                order.created_at = record.created_at
            else:
                record = OrderModel.objects.get(id=order.id)
                record.status = order.status
                record.total = order.total
                record.save()

                # Old item rows are dropped; the loop below recreates them.
                record.items.all().delete()

            for line in order.items:
                OrderItemModel.objects.create(
                    order=record,
                    product_id=line.product_id,
                    quantity=line.quantity,
                    price=line.price,
                )

            return order

    def get_by_id(self, order_id: int) -> Optional[Order]:
        """Return the order with ``order_id`` as a domain object, or ``None``."""
        with_items = OrderModel.objects.prefetch_related('items')
        try:
            record = with_items.get(id=order_id)
            return self._to_domain_object(record)
        except OrderModel.DoesNotExist:
            return None

    def get_by_customer(self, customer_id: int) -> List[Order]:
        """Return a customer's orders, newest first, as domain objects."""
        records = (
            OrderModel.objects
            .filter(customer_id=customer_id)
            .prefetch_related('items')
            .order_by('-created_at')
        )
        return [self._to_domain_object(record) for record in records]

    def _to_domain_object(self, django_order) -> Order:
        """Copy an ORM row and its item rows into an ``Order``."""
        order = Order(django_order.customer_id)
        order.id = django_order.id
        order.status = django_order.status
        order.total = django_order.total
        order.created_at = django_order.created_at

        # Items are appended directly; the stored total is kept as-is.
        order.items.extend(
            OrderItem(
                product_id=row.product_id,
                quantity=row.quantity,
                price=row.price,
            )
            for row in django_order.items.all()
        )

        return order
class StripePaymentService(PaymentService):
    """Adapter implementing the payment port with Stripe."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    def process_payment(self, order: Order, payment_method: str) -> bool:
        """Charge ``order.total`` in USD through Stripe.

        Returns the charge's ``paid`` flag. Any exception is logged and
        reported as ``False``.
        """
        try:
            import stripe

            # Sets the module-level key of the stripe package.
            stripe.api_key = self.api_key

            amount_in_cents = int(order.total * 100)
            charge = stripe.Charge.create(
                amount=amount_in_cents,
                currency='usd',
                source=payment_method,
                description=f'Order {order.id}',
            )
            return charge.paid
        except Exception as e:
            logger.error(f"Payment processing failed: {e}")
            return False
class RedisInventoryService(InventoryService):
    """Adapter implementing the inventory port on top of a Redis client."""

    def __init__(self, redis_client):
        self.redis = redis_client

    @staticmethod
    def _key(product_id) -> str:
        """Return the Redis key holding the stock counter of a product."""
        return f"inventory:{product_id}"

    def reserve_items(self, items: List[OrderItem]) -> bool:
        """Decrement the stock counters when every item is available.

        Returns ``False`` when a counter is missing or too low, or when any
        exception occurs (the exception is logged).
        """
        # NOTE(review): the availability check and the decrement are separate
        # round trips, so concurrent reservations can both pass the check.
        pipe = self.redis.pipeline()

        try:
            for item in items:
                stock = self.redis.get(self._key(item.product_id))
                if not stock or int(stock) < item.quantity:
                    return False

            for item in items:
                pipe.decrby(self._key(item.product_id), item.quantity)

            pipe.execute()
            return True

        except Exception as e:
            logger.error(f"Inventory reservation failed: {e}")
            return False

    def release_items(self, items: List[OrderItem]) -> bool:
        """Increment the stock counters by the quantities in ``items``.

        Returns ``False`` when any exception occurs (the exception is logged).
        """
        pipe = self.redis.pipeline()

        try:
            for item in items:
                pipe.incrby(self._key(item.product_id), item.quantity)

            pipe.execute()
            return True

        except Exception as e:
            logger.error(f"Inventory release failed: {e}")
            return False
# Django Models (Infrastructure)
class OrderModel(models.Model):
    """ORM table backing ``Order`` domain objects."""

    # The customer is referenced by plain integer id, not by a foreign key.
    customer_id = models.IntegerField()
    status = models.CharField(
        max_length=20,
        default='draft',
    )
    total = models.DecimalField(
        max_digits=10,
        decimal_places=2,
        default=0,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
class OrderItemModel(models.Model):
    """ORM table backing the lines of an order."""

    # Reachable from an order row as ``order.items``; deleted with the order.
    order = models.ForeignKey(
        OrderModel,
        related_name='items',
        on_delete=models.CASCADE,
    )
    product_id = models.IntegerField()
    quantity = models.IntegerField()
    price = models.DecimalField(
        max_digits=10,
        decimal_places=2,
    )
# Dependency Injection Configuration
class DIContainer:
    """Minimal dependency injection container keyed by interface."""

    def __init__(self):
        self._services = {}

    def register(self, interface, implementation):
        """Bind ``implementation`` to ``interface``, replacing any earlier binding."""
        self._services[interface] = implementation

    def get(self, interface):
        """Return the implementation bound to ``interface``, or ``None`` if unbound."""
        bound = self._services.get(interface)
        return bound
# Configuration
def configure_dependencies():
    """Build a container with the concrete adapter bound to each port."""
    bindings = [
        (OrderRepository, DjangoOrderRepository()),
        (PaymentService, StripePaymentService(settings.STRIPE_API_KEY)),
        (InventoryService, RedisInventoryService(redis_client)),
        (NotificationService, EmailNotificationService()),
    ]

    container = DIContainer()
    for port, adapter in bindings:
        container.register(port, adapter)

    return container
# Usage in views
def create_order_view(request):
    """Create an order for the current user from the request payload.

    The payload is read from ``request.data`` when the request object offers
    it; otherwise the raw body is parsed as JSON, because a plain Django
    ``HttpRequest`` has no ``data`` attribute. Invalid JSON or a non-object
    payload is answered with a 400 response.
    """
    import json

    container = configure_dependencies()

    order_service = OrderService(
        order_repo=container.get(OrderRepository),
        payment_service=container.get(PaymentService),
        inventory_service=container.get(InventoryService),
        notification_service=container.get(NotificationService)
    )

    customer_id = request.user.id

    payload = getattr(request, 'data', None)
    if payload is None:
        try:
            payload = json.loads(request.body or b'{}')
        except (TypeError, ValueError):
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            return JsonResponse({'error': 'Invalid JSON payload'}, status=400)
        if not isinstance(payload, dict):
            return JsonResponse({'error': 'Invalid JSON payload'}, status=400)

    items = payload.get('items', [])

    order = order_service.create_order(customer_id, items)

    return JsonResponse({
        'order_id': order.id,
        'total': str(order.total),
        'status': order.status
    })
Clean Architecture takes the separation of concerns further by organizing code into concentric circles, with business logic at the center and external concerns at the outer layers.
# Entities (Innermost layer) - Enterprise business rules
class User:
    """User entity holding credentials and account state."""

    def __init__(self, email: str, password: str):
        # Unsaved entities carry ``id = None`` so code reading ``user.id``
        # does not hit an AttributeError before the first save.
        self.id = None
        self.email = email
        self.password_hash = self._hash_password(password)
        self.is_active = True
        self.created_at = timezone.now()
        self.last_login = None

    def authenticate(self, password: str) -> bool:
        """Return True when the account is active and ``password`` matches."""
        if not self.is_active:
            return False

        return self._verify_password(password, self.password_hash)

    def change_password(self, old_password: str, new_password: str):
        """Replace the password after verifying the current one.

        Raises:
            ValueError: ``old_password`` does not authenticate, or
                ``new_password`` is shorter than 8 characters.
        """
        if not self.authenticate(old_password):
            raise ValueError("Current password is incorrect")

        if len(new_password) < 8:
            raise ValueError("Password must be at least 8 characters")

        self.password_hash = self._hash_password(new_password)

    def deactivate(self):
        """Mark the account inactive; ``authenticate`` then always fails."""
        self.is_active = False

    def record_login(self):
        """Store the current time as the last login."""
        self.last_login = timezone.now()

    def _hash_password(self, password: str) -> str:
        """Return a bcrypt hash of ``password`` as text."""
        import bcrypt
        return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')

    def _verify_password(self, password: str, hash: str) -> bool:
        """Check ``password`` against a bcrypt ``hash``."""
        import bcrypt
        return bcrypt.checkpw(password.encode('utf-8'), hash.encode('utf-8'))
# Use Cases (Application layer) - Application business rules
from abc import ABC, abstractmethod
class UserRepository(ABC):
    """Abstract repository that user persistence adapters implement."""

    @abstractmethod
    def save(self, user: User) -> User:
        """Persist ``user`` and return it."""

    @abstractmethod
    def get_by_email(self, email: str) -> Optional[User]:
        """Return the user with ``email``, or ``None``."""

    @abstractmethod
    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with ``user_id``, or ``None``."""
class EmailService(ABC):
    """Abstract service that email adapters implement."""

    @abstractmethod
    def send_welcome_email(self, user: User) -> bool:
        """Send the welcome message to ``user``; report success as a bool."""

    @abstractmethod
    def send_password_reset_email(self, user: User, reset_token: str) -> bool:
        """Send ``reset_token`` to ``user``; report success as a bool."""
class RegisterUserUseCase:
    """Use case: register a new user and greet them by email."""

    def __init__(self, user_repo: UserRepository, email_service: EmailService):
        self.user_repo = user_repo
        self.email_service = email_service

    def execute(self, email: str, password: str) -> User:
        """Create, persist and welcome a user.

        Raises:
            ValueError: ``email`` is empty or lacks ``@``, or a user with that
                email already exists.
        """
        looks_like_email = email and '@' in email
        if not looks_like_email:
            raise ValueError("Invalid email address")

        if self.user_repo.get_by_email(email):
            raise ValueError("User already exists")

        saved_user = self.user_repo.save(User(email, password))

        # The welcome mail goes out only after the user has been stored.
        self.email_service.send_welcome_email(saved_user)

        return saved_user
class AuthenticateUserUseCase:
    """Use case: verify credentials and record the login."""

    def __init__(self, user_repo: UserRepository):
        self.user_repo = user_repo

    def execute(self, email: str, password: str) -> Optional[User]:
        """Return the authenticated user, or ``None`` for bad credentials.

        On success the login is recorded on the user and saved.
        """
        candidate = self.user_repo.get_by_email(email)
        if not candidate:
            return None

        if not candidate.authenticate(password):
            return None

        candidate.record_login()
        self.user_repo.save(candidate)
        return candidate
# Interface Adapters (Controllers, Presenters, Gateways)
class UserController:
    """Interface adapter turning HTTP requests into use case calls."""

    def __init__(self, register_use_case: RegisterUserUseCase,
                 auth_use_case: AuthenticateUserUseCase):
        self.register_use_case = register_use_case
        self.auth_use_case = auth_use_case

    def register(self, request):
        """Register a user; 201 with the user data, 400 on a ``ValueError``."""
        try:
            email = request.data.get('email')
            password = request.data.get('password')
            user = self.register_use_case.execute(email, password)

            body = {
                'id': user.id,
                'email': user.email,
                'created_at': user.created_at.isoformat()
            }
            return JsonResponse(body, status=201)

        except ValueError as e:
            return JsonResponse({'error': str(e)}, status=400)

    def login(self, request):
        """Log a user in; 401 when the credentials are rejected."""
        email = request.data.get('email')
        password = request.data.get('password')

        user = self.auth_use_case.execute(email, password)
        if not user:
            return JsonResponse({'error': 'Invalid credentials'}, status=401)

        # Remember the authenticated user in the session.
        request.session['user_id'] = user.id

        last_login = user.last_login.isoformat() if user.last_login else None
        return JsonResponse({
            'id': user.id,
            'email': user.email,
            'last_login': last_login
        })
# Frameworks and Drivers (Outermost layer)
class DjangoUserRepository(UserRepository):
    """User repository implemented with the Django ORM."""

    def save(self, user: User) -> User:
        """Insert or update ``user``; a new user receives its ``id``."""
        if getattr(user, 'id', None):
            row = UserModel.objects.get(id=user.id)
            row.email = user.email
            row.password_hash = user.password_hash
            row.is_active = user.is_active
            row.last_login = user.last_login
            row.save()
            return user

        row = UserModel.objects.create(
            email=user.email,
            password_hash=user.password_hash,
            is_active=user.is_active,
            last_login=user.last_login,
        )
        user.id = row.id
        return user

    def get_by_email(self, email: str) -> Optional[User]:
        """Return the user with ``email`` as an entity, or ``None``."""
        try:
            row = UserModel.objects.get(email=email)
            return self._to_entity(row)
        except UserModel.DoesNotExist:
            return None

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with ``user_id`` as an entity, or ``None``."""
        try:
            row = UserModel.objects.get(id=user_id)
            return self._to_entity(row)
        except UserModel.DoesNotExist:
            return None

    def _to_entity(self, django_user) -> User:
        """Copy an ORM row into a ``User`` entity."""
        # __new__ skips __init__, which would hash a password; the stored
        # hash is copied over instead.
        entity = User.__new__(User)
        for field in ('id', 'email', 'password_hash', 'is_active',
                      'created_at', 'last_login'):
            setattr(entity, field, getattr(django_user, field))
        return entity
class UserModel(models.Model):
    """ORM table backing ``User`` entities."""

    email = models.EmailField(unique=True)
    # Stores the password hash, never the plain password.
    password_hash = models.CharField(max_length=128)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    last_login = models.DateTimeField(
        null=True,
        blank=True,
    )
# Dependency injection and configuration
def create_user_controller():
    """Wire a ``UserController`` with its use cases and Django adapters."""
    repository = DjangoUserRepository()
    mailer = DjangoEmailService()

    # Both use cases share the same repository instance.
    return UserController(
        RegisterUserUseCase(repository, mailer),
        AuthenticateUserUseCase(repository),
    )
# Django views using clean architecture
def register_view(request):
    """Django view delegating user registration to the controller."""
    return create_user_controller().register(request)
def login_view(request):
    """Django view delegating user login to the controller."""
    return create_user_controller().login(request)
CQRS separates read and write operations, allowing for optimized data models and improved scalability. This pattern is particularly useful for complex Django applications with different read and write requirements.
# Commands - Write operations
from dataclasses import dataclass
from typing import Optional
from abc import ABC, abstractmethod
@dataclass
class CreateProductCommand:
    """Write-side request to create a product.

    Attributes:
        name: product name.
        description: product description.
        price: initial price.
        category_id: id of the category the product belongs to.
        user_id: id of the user issuing the command.
    """

    name: str
    description: str
    price: Decimal
    category_id: int
    user_id: int
@dataclass
class UpdateProductPriceCommand:
    """Write-side request to change a product's price.

    Attributes:
        product_id: id of the product to update.
        new_price: price to set.
        user_id: id of the user issuing the command.
    """

    product_id: int
    new_price: Decimal
    user_id: int
@dataclass
class DeactivateProductCommand:
    """Write-side request to deactivate a product.

    Attributes:
        product_id: id of the product to deactivate.
        reason: free-text reason for the deactivation.
        user_id: id of the user issuing the command.
    """

    product_id: int
    reason: str
    user_id: int
# Command Handlers
class CommandHandler(ABC):
    """Abstract base class for command handlers."""

    @abstractmethod
    def handle(self, command) -> object:
        """Execute ``command`` and return a handler-specific result.

        The return annotation is ``object`` because concrete handlers return
        different types; the former ``any`` named the builtin function, which
        is not a type.
        """
class CreateProductCommandHandler(CommandHandler):
    """Handles ``CreateProductCommand``: stores the product and its event."""

    # Annotations are quoted because ``EventStore`` is declared further down
    # in the module and would not resolve when this class is created.
    def __init__(self, product_repo: "ProductRepository",
                 event_store: "EventStore"):
        self.product_repo = product_repo
        self.event_store = event_store

    def handle(self, command: "CreateProductCommand") -> int:
        """Create the product and record a ``ProductCreatedEvent``.

        Returns:
            The id of the saved product.

        Raises:
            BusinessRuleError: a product with the same name already exists.
        """
        # The product row and its event are written together, so a failure
        # while appending the event does not leave a product without one.
        with transaction.atomic():
            if self.product_repo.exists_by_name(command.name):
                raise BusinessRuleError("Product name must be unique")

            product = Product(
                name=command.name,
                description=command.description,
                price=command.price,
                category_id=command.category_id
            )

            saved_product = self.product_repo.save(product)

            event = ProductCreatedEvent(
                product_id=saved_product.id,
                name=command.name,
                price=command.price,
                created_by=command.user_id,
                created_at=timezone.now()
            )
            self.event_store.append(event)

        return saved_product.id
class UpdateProductPriceCommandHandler(CommandHandler):
    """Handles ``UpdateProductPriceCommand``: updates the price and logs an event."""

    # Annotations are quoted because ``EventStore`` is declared further down
    # in the module and would not resolve when this class is created.
    def __init__(self, product_repo: "ProductRepository",
                 event_store: "EventStore"):
        self.product_repo = product_repo
        self.event_store = event_store

    def handle(self, command: "UpdateProductPriceCommand") -> None:
        """Apply the new price and record a ``ProductPriceUpdatedEvent``.

        Raises:
            ValueError: the product does not exist.
            BusinessRuleError: the new price is not positive or is below half
                the current price.
        """
        # The price change and its event are written together, so a failure
        # while appending the event does not leave an unrecorded change.
        with transaction.atomic():
            product = self.product_repo.get_by_id(command.product_id)
            if not product:
                raise ValueError("Product not found")

            old_price = product.price

            if command.new_price <= 0:
                raise BusinessRuleError("Price must be positive")

            if command.new_price < old_price * Decimal('0.5'):
                raise BusinessRuleError("Cannot decrease price by more than 50%")

            product.price = command.new_price
            self.product_repo.save(product)

            event = ProductPriceUpdatedEvent(
                product_id=product.id,
                old_price=old_price,
                new_price=command.new_price,
                updated_by=command.user_id,
                updated_at=timezone.now()
            )
            self.event_store.append(event)
# Queries - Read operations
@dataclass
class GetProductQuery:
    """Read-side request for a single product.

    Attributes:
        product_id: id of the product to fetch.
    """

    product_id: int
@dataclass
class SearchProductsQuery:
    """Read-side request for a filtered, paginated product search.

    Attributes:
        search_term: text to look for.
        category_id: optional category filter.
        min_price: optional lower price bound.
        max_price: optional upper price bound.
        page: page number, defaults to 1.
        page_size: results per page, defaults to 20.
    """

    search_term: str
    category_id: Optional[int] = None
    min_price: Optional[Decimal] = None
    max_price: Optional[Decimal] = None
    page: int = 1
    page_size: int = 20
@dataclass
class GetProductStatsQuery:
    """Read-side request for the statistics of one product.

    Attributes:
        product_id: id of the product whose statistics are wanted.
    """

    product_id: int
# Query Handlers
class QueryHandler(ABC):
    """Abstract base class for query handlers."""

    @abstractmethod
    def handle(self, query) -> object:
        """Answer ``query`` and return a handler-specific result.

        The return annotation is ``object`` because concrete handlers return
        different types; the former ``any`` named the builtin function, which
        is not a type.
        """
class GetProductQueryHandler(QueryHandler):
    """Handles ``GetProductQuery`` by reading from the read-model repository."""

    # Annotations are quoted because the read-model classes are declared
    # further down in the module and would not resolve at class creation.
    def __init__(self, read_model_repo: "ProductReadModelRepository"):
        self.read_model_repo = read_model_repo

    def handle(self, query: "GetProductQuery") -> "Optional[ProductReadModel]":
        """Return the read model for ``query.product_id``, or ``None``."""
        return self.read_model_repo.get_by_id(query.product_id)
class SearchProductsQueryHandler(QueryHandler):
    """Handles ``SearchProductsQuery`` with filtering and pagination."""

    # The annotation is quoted because ``ProductReadModelRepository`` is
    # declared further down in the module.
    def __init__(self, read_model_repo: "ProductReadModelRepository"):
        self.read_model_repo = read_model_repo

    def handle(self, query: "SearchProductsQuery") -> dict:
        """Run the search and return one page plus paging metadata.

        Returns:
            A dict with the keys ``products``, ``total_count``, ``page``,
            ``page_size`` and ``total_pages``.

        Raises:
            ValueError: ``page`` or ``page_size`` is smaller than 1.
        """
        # Reject paging values that would otherwise end in a division by
        # zero or a negative slice offset.
        if query.page < 1 or query.page_size < 1:
            raise ValueError("page and page_size must be at least 1")

        filters = {}

        if query.search_term:
            filters['search_term'] = query.search_term

        # ``is not None`` keeps legitimate zero values such as max_price=0,
        # which a truthiness test would silently drop.
        if query.category_id is not None:
            filters['category_id'] = query.category_id

        if query.min_price is not None:
            filters['min_price'] = query.min_price

        if query.max_price is not None:
            filters['max_price'] = query.max_price

        products = self.read_model_repo.search(
            filters=filters,
            page=query.page,
            page_size=query.page_size
        )

        total_count = self.read_model_repo.count(filters)

        return {
            'products': products,
            'total_count': total_count,
            'page': query.page,
            'page_size': query.page_size,
            # Ceiling division without floats.
            'total_pages': (total_count + query.page_size - 1) // query.page_size
        }
# Read Models - Optimized for queries
class ProductReadModel:
    """Plain data holder used to answer product queries.

    Every constructor argument is stored unchanged under the attribute of the
    same name.
    """

    def __init__(
        self,
        product_id: int,
        name: str,
        description: str,
        price: Decimal,
        category_name: str,
        is_active: bool,
        created_at: datetime,
        average_rating: Optional[Decimal] = None,
        review_count: int = 0,
        view_count: int = 0,
    ):
        # Identity and descriptive data.
        self.product_id = product_id
        self.name = name
        self.description = description
        self.price = price
        self.category_name = category_name
        self.is_active = is_active
        self.created_at = created_at

        # Aggregated figures, optional at construction time.
        self.average_rating = average_rating
        self.review_count = review_count
        self.view_count = view_count
class ProductReadModelRepository:
    """Repository serving ``ProductReadModel`` objects from the read table."""

    def get_by_id(self, product_id: int) -> Optional[ProductReadModel]:
        """Return the read model for ``product_id``, or ``None`` if absent."""
        try:
            data = ProductReadModelTable.objects.get(product_id=product_id)
            return self._to_read_model(data)
        except ProductReadModelTable.DoesNotExist:
            return None

    def search(self, filters: dict, page: int, page_size: int) -> List[ProductReadModel]:
        """Return one page of active products matching ``filters``.

        ``filters`` may contain ``search_term``, ``category_id``,
        ``min_price`` and ``max_price``; ``page`` is 1-based.
        """
        # A fixed ordering keeps the pages stable between requests; slicing
        # an unordered queryset can repeat or skip rows across pages.
        queryset = self._filtered_queryset(filters).order_by('product_id')

        offset = (page - 1) * page_size
        queryset = queryset[offset:offset + page_size]

        return [self._to_read_model(data) for data in queryset]

    def count(self, filters: dict) -> int:
        """Count the active products matching ``filters``.

        Uses the same filtering as ``search`` so the total agrees with the
        returned pages.
        """
        return self._filtered_queryset(filters).count()

    def _filtered_queryset(self, filters: dict):
        """Build the queryset of active products narrowed by ``filters``."""
        queryset = ProductReadModelTable.objects.filter(is_active=True)

        if 'search_term' in filters:
            queryset = queryset.filter(
                Q(name__icontains=filters['search_term']) |
                Q(description__icontains=filters['search_term'])
            )

        if 'category_id' in filters:
            queryset = queryset.filter(category_id=filters['category_id'])

        if 'min_price' in filters:
            queryset = queryset.filter(price__gte=filters['min_price'])

        if 'max_price' in filters:
            queryset = queryset.filter(price__lte=filters['max_price'])

        return queryset

    def _to_read_model(self, data) -> ProductReadModel:
        """Copy a table row into a ``ProductReadModel``."""
        return ProductReadModel(
            product_id=data.product_id,
            name=data.name,
            description=data.description,
            price=data.price,
            category_name=data.category_name,
            is_active=data.is_active,
            created_at=data.created_at,
            average_rating=data.average_rating,
            review_count=data.review_count,
            view_count=data.view_count
        )
# Read Model Table (Denormalized for performance)
class ProductReadModelTable(models.Model):
    """Denormalized ORM table that product read queries run against."""

    product_id = models.IntegerField(unique=True, db_index=True)
    name = models.CharField(max_length=200, db_index=True)
    description = models.TextField()
    price = models.DecimalField(
        max_digits=10,
        decimal_places=2,
        db_index=True,
    )
    category_id = models.IntegerField(db_index=True)
    # The category name is stored on the row itself.
    category_name = models.CharField(max_length=100)
    is_active = models.BooleanField(default=True, db_index=True)
    created_at = models.DateTimeField(db_index=True)
    average_rating = models.DecimalField(
        max_digits=3,
        decimal_places=2,
        null=True,
    )
    review_count = models.IntegerField(default=0)
    view_count = models.IntegerField(default=0)

    class Meta:
        # Composite indexes pairing each filter column with ``is_active``.
        indexes = [
            models.Index(fields=['category_id', 'is_active']),
            models.Index(fields=['price', 'is_active']),
            models.Index(fields=['name', 'is_active']),
        ]
# Event Store for CQRS
class Event:
    """Base class for domain events.

    Attributes:
        event_type: name identifying the kind of event.
        aggregate_id: id of the aggregate the event belongs to.
        event_data: payload describing the event.
        occurred_at: moment the event happened.
    """

    def __init__(
        self,
        event_type: str,
        aggregate_id: int,
        event_data: dict,
        occurred_at: datetime,
    ):
        self.event_type = event_type
        self.aggregate_id = aggregate_id
        self.event_data = event_data
        self.occurred_at = occurred_at
class EventStore:
    """Persists domain events and reads them back per aggregate."""

    def append(self, event: Event) -> None:
        """Write ``event`` as a new row of the event table."""
        row_values = {
            'event_type': event.event_type,
            'aggregate_id': event.aggregate_id,
            'event_data': event.event_data,
            'occurred_at': event.occurred_at,
        }
        EventStoreTable.objects.create(**row_values)

    def get_events(self, aggregate_id: int) -> List[Event]:
        """Return the events of ``aggregate_id`` ordered by ``occurred_at``."""
        rows = EventStoreTable.objects.filter(aggregate_id=aggregate_id)
        history = []
        for row in rows.order_by('occurred_at'):
            history.append(Event(
                event_type=row.event_type,
                aggregate_id=row.aggregate_id,
                event_data=row.event_data,
                occurred_at=row.occurred_at,
            ))
        return history
class EventStoreTable(models.Model):
    """Database table with one row per stored domain event."""

    event_type = models.CharField(max_length=100)
    aggregate_id = models.IntegerField(db_index=True)
    event_data = models.JSONField()
    occurred_at = models.DateTimeField(db_index=True)

    class Meta:
        # One composite index per lookup column, each paired with occurred_at.
        indexes = [
            models.Index(fields=[lookup_column, 'occurred_at'])
            for lookup_column in ('aggregate_id', 'event_type')
        ]
# Command and Query Bus
class CommandBus:
    """Bus for dispatching commands to handlers.

    Handlers are looked up by the exact class of the dispatched command, so a
    handler registered for a base class is not used for its subclasses.
    """

    def __init__(self):
        # Maps command class -> handler object.
        self.handlers = {}

    def register_handler(self, command_type: type, handler: 'CommandHandler'):
        """Register ``handler`` for ``command_type``, replacing any earlier one."""
        self.handlers[command_type] = handler

    def dispatch(self, command) -> object:
        """Pass ``command`` to its registered handler and return the result.

        Raises:
            ValueError: if no handler is registered for ``type(command)``.
        """
        command_type = type(command)
        handler = self.handlers.get(command_type)
        # Compare against None explicitly: a handler object that happens to
        # be falsy (e.g. defines __len__ or __bool__) is still a handler.
        if handler is None:
            raise ValueError(f"No handler registered for {command_type}")
        return handler.handle(command)
class QueryBus:
    """Bus for dispatching queries to handlers.

    Handlers are looked up by the exact class of the dispatched query, so a
    handler registered for a base class is not used for its subclasses.
    """

    def __init__(self):
        # Maps query class -> handler object.
        self.handlers = {}

    def register_handler(self, query_type: type, handler: 'QueryHandler'):
        """Register ``handler`` for ``query_type``, replacing any earlier one."""
        self.handlers[query_type] = handler

    def dispatch(self, query) -> object:
        """Pass ``query`` to its registered handler and return the result.

        Raises:
            ValueError: if no handler is registered for ``type(query)``.
        """
        query_type = type(query)
        handler = self.handlers.get(query_type)
        # Compare against None explicitly: a handler object that happens to
        # be falsy (e.g. defines __len__ or __bool__) is still a handler.
        if handler is None:
            raise ValueError(f"No handler registered for {query_type}")
        return handler.handle(query)
# Configuration
def configure_cqrs():
    """Build a command bus and a query bus with their handlers registered.

    Returns:
        A ``(command_bus, query_bus)`` tuple. Every call creates new buses,
        repositories and handlers.
    """
    # Write side: both command handlers share one repository and event store.
    command_bus = CommandBus()
    product_repo = DjangoProductRepository()
    event_store = EventStore()
    command_wiring = (
        (CreateProductCommand, CreateProductCommandHandler),
        (UpdateProductPriceCommand, UpdateProductPriceCommandHandler),
    )
    for command_type, handler_class in command_wiring:
        command_bus.register_handler(
            command_type, handler_class(product_repo, event_store)
        )

    # Read side: both query handlers share one read-model repository.
    query_bus = QueryBus()
    read_model_repo = ProductReadModelRepository()
    query_wiring = (
        (GetProductQuery, GetProductQueryHandler),
        (SearchProductsQuery, SearchProductsQueryHandler),
    )
    for query_type, handler_class in query_wiring:
        query_bus.register_handler(query_type, handler_class(read_model_repo))

    return command_bus, query_bus
# Usage in views
def create_product_view(request):
    """View for creating products using CQRS.

    Reads ``name``, ``description``, ``price`` and ``category_id`` from
    ``request.data``, dispatches a ``CreateProductCommand`` and returns the
    new product id.

    Returns:
        201 with ``{'product_id': ...}`` on success; 400 with
        ``{'error': ...}`` when a field is missing, the price cannot be
        parsed as a finite decimal, or a ``BusinessRuleError`` is raised.
    """
    # Local import: only this view needs the exception type.
    from decimal import InvalidOperation

    payload = request.data
    try:
        name = payload['name']
        description = payload['description']
        raw_price = payload['price']
        category_id = payload['category_id']
    except KeyError as exc:
        # A missing field is a client error, not a server fault.
        return JsonResponse(
            {'error': f'Missing required field: {exc.args[0]}'}, status=400
        )

    try:
        price = Decimal(raw_price)
    except (InvalidOperation, TypeError, ValueError):
        return JsonResponse({'error': 'Invalid price'}, status=400)
    if not price.is_finite():
        # Decimal() accepts 'NaN' and 'Infinity'; neither is a usable price.
        return JsonResponse({'error': 'Invalid price'}, status=400)

    command = CreateProductCommand(
        name=name,
        description=description,
        price=price,
        category_id=category_id,
        user_id=request.user.id
    )
    command_bus, _ = configure_cqrs()
    try:
        product_id = command_bus.dispatch(command)
    except BusinessRuleError as e:
        return JsonResponse({'error': str(e)}, status=400)
    return JsonResponse({'product_id': product_id}, status=201)
def search_products_view(request):
    """View for searching products using CQRS.

    Builds a ``SearchProductsQuery`` from the query string, dispatches it on
    the query bus and returns the matching products with pagination data.
    ``category_id``, ``min_price`` and ``max_price`` are forwarded exactly as
    received from the query string (or ``None`` when absent).

    Returns:
        200 with ``products`` and ``pagination``; 400 with ``{'error': ...}``
        when ``page`` or ``page_size`` is not a positive integer.
    """
    params = request.GET
    try:
        page = int(params.get('page', 1))
        page_size = int(params.get('page_size', 20))
    except ValueError:
        # Non-numeric paging input is a client error, not a server fault.
        return JsonResponse(
            {'error': 'page and page_size must be integers'}, status=400
        )
    if page < 1 or page_size < 1:
        return JsonResponse(
            {'error': 'page and page_size must be positive'}, status=400
        )

    _, query_bus = configure_cqrs()
    query = SearchProductsQuery(
        search_term=params.get('q', ''),
        category_id=params.get('category_id'),
        min_price=params.get('min_price'),
        max_price=params.get('max_price'),
        page=page,
        page_size=page_size
    )
    result = query_bus.dispatch(query)
    return JsonResponse({
        'products': [
            {
                'id': p.product_id,
                'name': p.name,
                'price': str(p.price),
                'category': p.category_name,
                # Test for None explicitly: a rating of zero is falsy but is
                # still a rating and must not be reported as missing.
                'rating': (
                    str(p.average_rating)
                    if p.average_rating is not None else None
                ),
                'review_count': p.review_count
            }
            for p in result['products']
        ],
        'pagination': {
            'total_count': result['total_count'],
            'page': result['page'],
            'page_size': result['page_size'],
            'total_pages': result['total_pages']
        }
    })
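Event Sourcing stores every change to application state as an append-only sequence of events instead of overwriting the current state. This pattern provides a complete audit trail and enables features such as temporal queries (rebuilding the state as it was at any past point) and event replay.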
# Domain Events
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List, Optional
@dataclass
class DomainEvent:
    """Envelope fields common to every domain event."""

    # Identifier of the aggregate the event belongs to.
    aggregate_id: int
    # Identifier of this individual event.
    event_id: str
    # Moment the event was raised.
    occurred_at: datetime
    # Version of the aggregate after this event has been applied.
    version: int
@dataclass
class ProductCreatedEvent(DomainEvent):
    """Records that a product was created, with its initial attributes."""

    # Initial product attributes.
    name: str
    description: str
    price: Decimal
    category_id: int
    # Identifier of the user who created the product.
    created_by: int
@dataclass
class ProductPriceChangedEvent(DomainEvent):
    """Records a price change, keeping both the previous and the new price."""

    # Price before the change.
    old_price: Decimal
    # Price after the change.
    new_price: Decimal
    # Identifier of the user who changed the price.
    changed_by: int
@dataclass
class ProductDeactivatedEvent(DomainEvent):
    """Records that a product was deactivated and why."""

    # Free-text reason supplied for the deactivation.
    reason: str
    # Identifier of the user who deactivated the product.
    deactivated_by: int
# Event-Sourced Aggregate
class ProductAggregate:
    """Product aggregate using event sourcing.

    State changes only by applying a domain event. Events raised by
    ``create``, ``change_price`` and ``deactivate`` are applied immediately
    and kept in ``uncommitted_events`` until ``mark_events_as_committed``
    empties that list.
    """

    def __init__(self, product_id: int):
        """Start a blank, active aggregate at version 0 for ``product_id``."""
        self.id = product_id
        self.name = self.description = self.price = self.category_id = None
        self.is_active = True
        self.version = 0
        self.uncommitted_events = []

    @classmethod
    def create(cls, product_id: int, name: str, description: str,
               price: Decimal, category_id: int, created_by: int) -> 'ProductAggregate':
        """Return a new aggregate carrying one pending version-1 created event."""
        product = cls(product_id)
        product._record(ProductCreatedEvent(
            aggregate_id=product_id,
            event_id=str(uuid.uuid4()),
            occurred_at=timezone.now(),
            version=1,
            name=name,
            description=description,
            price=price,
            category_id=category_id,
            created_by=created_by,
        ))
        return product

    def change_price(self, new_price: Decimal, changed_by: int):
        """Raise a price-changed event unless the price is already ``new_price``.

        Raises:
            ValueError: if ``new_price`` is zero or negative.
        """
        if new_price <= 0:
            raise ValueError("Price must be positive")
        if new_price == self.price:
            return  # Same price: no event is raised.
        self._record(ProductPriceChangedEvent(
            aggregate_id=self.id,
            event_id=str(uuid.uuid4()),
            occurred_at=timezone.now(),
            version=self.version + 1,
            old_price=self.price,
            new_price=new_price,
            changed_by=changed_by,
        ))

    def deactivate(self, reason: str, deactivated_by: int):
        """Raise a deactivated event.

        Raises:
            ValueError: if the product is already inactive.
        """
        if not self.is_active:
            raise ValueError("Product is already deactivated")
        self._record(ProductDeactivatedEvent(
            aggregate_id=self.id,
            event_id=str(uuid.uuid4()),
            occurred_at=timezone.now(),
            version=self.version + 1,
            reason=reason,
            deactivated_by=deactivated_by,
        ))

    def load_from_history(self, events: List[DomainEvent]):
        """Apply ``events`` in the given order without queuing them."""
        for past_event in events:
            self._apply_event(past_event)

    def mark_events_as_committed(self):
        """Empty the list of pending events in place."""
        self.uncommitted_events.clear()

    def _record(self, event):
        """Apply ``event`` to the state, then queue it as uncommitted."""
        self._apply_event(event)
        self.uncommitted_events.append(event)

    def _apply_event(self, event: DomainEvent):
        """Update state from ``event`` and adopt its version.

        Event types other than the three handled here change nothing but the
        version.
        """
        if isinstance(event, ProductCreatedEvent):
            self.name = event.name
            self.description = event.description
            self.price = event.price
            self.category_id = event.category_id
            self.is_active = True
        elif isinstance(event, ProductPriceChangedEvent):
            self.price = event.new_price
        elif isinstance(event, ProductDeactivatedEvent):
            self.is_active = False
        self.version = event.version
# Event Store
class EventStore:
    """Store for domain events with event sourcing.

    Events are written to ``EventStoreRecord``: the envelope attributes go
    into dedicated columns and the serialized event into ``event_data``.
    """

    # Attributes stored in their own columns; every other event attribute is
    # treated as payload.
    _ENVELOPE_FIELDS = ('aggregate_id', 'event_id', 'occurred_at', 'version')

    def save_events(self, aggregate_id: int, events: List['DomainEvent'],
                    expected_version: int) -> None:
        """Save events with optimistic concurrency control.

        Raises:
            ConcurrencyError: if the aggregate's stored version differs from
                ``expected_version``.
        """
        with transaction.atomic():
            current_version = self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )
            # NOTE(review): two writers can both pass the check above. The
            # uniqueness of (aggregate_id, version) declared on
            # EventStoreRecord is what rejects the second one, presumably as
            # a database integrity error rather than ConcurrencyError.
            for event in events:
                EventStoreRecord.objects.create(
                    aggregate_id=event.aggregate_id,
                    event_id=event.event_id,
                    event_type=event.__class__.__name__,
                    event_data=self._serialize_event(event),
                    version=event.version,
                    occurred_at=event.occurred_at
                )

    def get_events(self, aggregate_id: int, from_version: int = 0) -> List['DomainEvent']:
        """Get events for aggregate with a version above ``from_version``."""
        records = EventStoreRecord.objects.filter(
            aggregate_id=aggregate_id,
            version__gt=from_version
        ).order_by('version')
        return [self._deserialize_event(record) for record in records]

    def get_all_events(self, from_timestamp: Optional[datetime] = None) -> List['DomainEvent']:
        """Get all events, optionally only those at or after ``from_timestamp``.

        ``occurred_at`` is not unique, so the primary key is used as a
        tie-breaker to keep the order stable for events sharing a timestamp.
        """
        queryset = EventStoreRecord.objects.all()
        if from_timestamp:
            queryset = queryset.filter(occurred_at__gte=from_timestamp)
        return [
            self._deserialize_event(record)
            for record in queryset.order_by('occurred_at', 'pk')
        ]

    def _get_current_version(self, aggregate_id: int) -> int:
        """Return the highest stored version for the aggregate, or 0 if none."""
        latest_event = EventStoreRecord.objects.filter(
            aggregate_id=aggregate_id
        ).order_by('-version').first()
        return latest_event.version if latest_event else 0

    @staticmethod
    def _to_json_safe(value):
        """Return ``value`` in a form the standard JSON encoder accepts.

        ``Decimal`` becomes its exact string form; other values pass through.
        """
        if isinstance(value, Decimal):
            return str(value)
        return value

    def _serialize_event(self, event: 'DomainEvent') -> dict:
        """Serialize event to a JSON-compatible dictionary.

        The envelope keys come first, followed by the payload attributes.
        ``Decimal`` payload values are stored as strings because the standard
        JSON encoder cannot serialize ``Decimal``.
        """
        payload = {
            key: self._to_json_safe(value)
            for key, value in event.__dict__.items()
            if key not in self._ENVELOPE_FIELDS
        }
        return {
            'aggregate_id': event.aggregate_id,
            'event_id': event.event_id,
            'occurred_at': event.occurred_at.isoformat(),
            'version': event.version,
            **payload,
        }

    def _deserialize_event(self, record) -> 'DomainEvent':
        """Rebuild the event object described by ``record``.

        Raises:
            KeyError: if ``record.event_type`` names nothing in this module.
            TypeError: if that name is not a ``DomainEvent`` subclass.
        """
        # Local import: the file only imports ``dataclass`` from dataclasses.
        from dataclasses import fields

        event_class = globals()[record.event_type]  # Get event class by name
        # The name comes from stored data; refuse to call anything that is
        # not an event class.
        if not (isinstance(event_class, type)
                and issubclass(event_class, DomainEvent)):
            raise TypeError(f"{record.event_type} is not a domain event type")

        payload = {
            key: value for key, value in record.event_data.items()
            if key not in self._ENVELOPE_FIELDS
        }
        # Restore fields declared as Decimal. The string form of the
        # annotation covers modules using postponed annotations.
        for field in fields(event_class):
            if field.type in (Decimal, 'Decimal') and payload.get(field.name) is not None:
                payload[field.name] = Decimal(str(payload[field.name]))

        return event_class(
            aggregate_id=record.aggregate_id,
            event_id=record.event_id,
            occurred_at=record.occurred_at,
            version=record.version,
            **payload
        )
class EventStoreRecord(models.Model):
    """Database table for event store, one row per persisted domain event."""

    aggregate_id = models.IntegerField(db_index=True)
    # 36 characters is the length of a hyphenated UUID string.
    event_id = models.CharField(max_length=36, unique=True)
    event_type = models.CharField(max_length=100)
    event_data = models.JSONField()
    version = models.IntegerField()
    occurred_at = models.DateTimeField(db_index=True)

    class Meta:
        # No separate (aggregate_id, version) index: the unique_together
        # below is already backed by an index on the same columns, so a
        # second one would only add write cost.
        indexes = [
            models.Index(fields=['event_type', 'occurred_at']),
        ]
        # At most one event per version of an aggregate.
        unique_together = [['aggregate_id', 'version']]
# Repository for Event-Sourced Aggregates
class ProductAggregateRepository:
    """Loads and persists ``ProductAggregate`` objects through an event store."""

    def __init__(self, event_store: EventStore):
        """Keep the event store used for every load and save."""
        self.event_store = event_store

    def get_by_id(self, product_id: int) -> Optional[ProductAggregate]:
        """Rebuild the aggregate from its stored events.

        Returns ``None`` when the store holds no events for ``product_id``.
        """
        history = self.event_store.get_events(product_id)
        if history:
            product = ProductAggregate(product_id)
            product.load_from_history(history)
            return product
        return None

    def save(self, aggregate: ProductAggregate) -> None:
        """Persist the aggregate's pending events, then mark them committed.

        Does nothing when there are no pending events. The expected stored
        version is the aggregate's version minus the number of pending events.
        """
        pending = aggregate.uncommitted_events
        if pending:
            version_before_pending = aggregate.version - len(pending)
            self.event_store.save_events(
                aggregate.id, pending, version_before_pending
            )
            aggregate.mark_events_as_committed()
# Event Projections for Read Models
class ProductProjection:
    """Projection that builds read models from events.

    Events are routed by their exact class; event types without a registered
    handler are ignored.
    """

    def __init__(self):
        # Maps event class -> bound handler method.
        self.handlers = {
            ProductCreatedEvent: self._handle_product_created,
            ProductPriceChangedEvent: self._handle_price_changed,
            ProductDeactivatedEvent: self._handle_product_deactivated,
        }

    def project_event(self, event: DomainEvent):
        """Project event to read model, if a handler exists for its type."""
        handler = self.handlers.get(type(event))
        if handler is not None:
            handler(event)

    def _handle_product_created(self, event: ProductCreatedEvent):
        """Create the read-model row for a product, or reset it on replay.

        ``update_or_create`` keeps the handler idempotent: projecting the
        same created event again overwrites the existing row instead of
        inserting a second row for the product.
        """
        ProductReadModel.objects.update_or_create(
            product_id=event.aggregate_id,
            defaults={
                'name': event.name,
                'description': event.description,
                'price': event.price,
                'category_id': event.category_id,
                'is_active': True,
                'created_at': event.occurred_at,
            },
        )

    def _handle_price_changed(self, event: ProductPriceChangedEvent):
        """Set the read-model price to the event's new price."""
        ProductReadModel.objects.filter(
            product_id=event.aggregate_id
        ).update(price=event.new_price)

    def _handle_product_deactivated(self, event: ProductDeactivatedEvent):
        """Mark the read-model row as inactive."""
        ProductReadModel.objects.filter(
            product_id=event.aggregate_id
        ).update(is_active=False)
# Event Processing
class EventProcessor:
    """Feeds stored events to projections and side-effect handlers."""

    def __init__(self, event_store: EventStore):
        """Start with the given event store and no subscribers."""
        self.event_store = event_store
        self.projections, self.event_handlers = [], []

    def add_projection(self, projection):
        """Subscribe ``projection``; it receives events via ``project_event``."""
        self.projections.append(projection)

    def add_event_handler(self, handler):
        """Subscribe ``handler``; it receives events via ``handle``."""
        self.event_handlers.append(handler)

    def process_new_events(self, from_timestamp: Optional[datetime] = None):
        """Deliver every event returned by the store to all subscribers.

        ``from_timestamp`` is passed straight to the store's
        ``get_all_events``.
        """
        for stored_event in self.event_store.get_all_events(from_timestamp):
            self._deliver(stored_event)

    def _deliver(self, event):
        """Hand one event to all projections first, then to all handlers."""
        for read_model_projection in self.projections:
            read_model_projection.project_event(event)
        for side_effect_handler in self.event_handlers:
            side_effect_handler.handle(event)
# Usage Example
def create_product_with_event_sourcing(request):
    """Create product using event sourcing.

    Creates a ``ProductAggregate``, stores its events through the repository,
    projects the newly stored events into the read model and returns the new
    product id with status 201.
    """
    event_store = EventStore()
    repo = ProductAggregateRepository(event_store)

    # Create aggregate
    product_id = generate_product_id()
    aggregate = ProductAggregate.create(
        product_id=product_id,
        name=request.data['name'],
        description=request.data['description'],
        price=Decimal(request.data['price']),
        category_id=request.data['category_id'],
        created_by=request.user.id
    )

    # Remember when the first new event was raised. This must be read before
    # saving, because saving empties the aggregate's uncommitted_events.
    first_new_event_at = aggregate.uncommitted_events[0].occurred_at

    # Save aggregate (stores events)
    repo.save(aggregate)

    # Project only events from this request onwards. Calling
    # process_new_events() without a timestamp would re-project every event
    # in the store on each request.
    # NOTE(review): events other requests stored at or after this timestamp
    # are included too, so projections should tolerate repeated events.
    processor = EventProcessor(event_store)
    processor.add_projection(ProductProjection())
    processor.process_new_events(from_timestamp=first_new_event_at)

    return JsonResponse({'product_id': product_id}, status=201)
Layered Architecture
Hexagonal Architecture
Clean Architecture
CQRS
Event Sourcing
# Example: CQRS + Event Sourcing + Clean Architecture
class OrderApplicationService:
    """Application-level facade over a command bus, a query bus and an event store."""

    def __init__(self,
                 command_bus: CommandBus,
                 query_bus: QueryBus,
                 event_store: EventStore):
        """Keep the three collaborators for later use."""
        self.command_bus, self.query_bus = command_bus, query_bus
        self.event_store = event_store

    def place_order(self, command: PlaceOrderCommand) -> OrderId:
        """Dispatch ``command`` on the command bus and return its result.

        This method only dispatches; storing events and updating projections
        are left to whatever handles the command.
        """
        return self.command_bus.dispatch(command)

    def get_order_details(self, order_id: int) -> OrderDetails:
        """Dispatch a ``GetOrderDetailsQuery`` for ``order_id`` on the query bus."""
        details_query = GetOrderDetailsQuery(order_id=order_id)
        return self.query_bus.dispatch(details_query)

    def get_order_history(self, order_id: int) -> List[OrderEvent]:
        """Return the events the event store holds for ``order_id``."""
        return self.event_store.get_events(order_id)
System architecture patterns provide the foundation for building maintainable, scalable Django applications. Choose patterns based on your specific requirements, team expertise, and long-term goals. Start with simpler patterns like layered architecture and evolve to more sophisticated patterns as your application grows in complexity.
The key is understanding that these patterns are tools to solve specific problems. Use them judiciously, and don't over-engineer solutions for simple problems. The best architecture is the one that solves your current problems while providing a clear path for future evolution.
Advanced and Expert Topics
This comprehensive guide explores advanced Django concepts and expert-level techniques for building enterprise-scale applications. These topics represent the pinnacle of Django development, covering system architecture patterns, domain-driven design, large-scale project organization, and advanced customization techniques that separate expert developers from intermediate practitioners.
Domain Driven Design with Django
Domain Driven Design (DDD) is a software development approach that focuses on creating a rich model of the business domain. When applied to Django applications, DDD helps create maintainable, expressive code that closely reflects business requirements and enables effective communication between developers and domain experts.