Advanced and Expert Topics

System Architecture Patterns

System architecture patterns provide proven solutions for organizing complex Django applications. These patterns help create maintainable, scalable, and testable systems by defining clear boundaries between components and establishing consistent communication patterns. This comprehensive guide covers the most important architectural patterns for Django applications.

System Architecture Patterns

System architecture patterns provide proven solutions for organizing complex Django applications. These patterns help create maintainable, scalable, and testable systems by defining clear boundaries between components and establishing consistent communication patterns. This comprehensive guide covers the most important architectural patterns for Django applications.

Understanding Architecture Patterns

Architecture patterns are high-level design templates that provide structure for organizing application components. They define how different parts of your system interact, where business logic resides, and how to manage dependencies between components.

Why Architecture Patterns Matter

Maintainability

  • Clear separation of concerns
  • Predictable code organization
  • Easier to understand and modify
  • Reduced coupling between components

Scalability

  • Components can be scaled independently
  • Clear boundaries enable distributed systems
  • Performance bottlenecks are isolated
  • Team scaling becomes manageable

Testability

  • Business logic is isolated and testable
  • Dependencies can be mocked easily
  • Unit tests focus on specific concerns
  • Integration testing is more targeted

Flexibility

  • Easy to swap implementations
  • New features don't break existing code
  • Technology changes are contained
  • Business rules are protected from infrastructure changes

Layered Architecture Pattern

The layered architecture pattern organizes code into horizontal layers, each with specific responsibilities. This is the most common pattern in Django applications.

Traditional Django Layers

# Presentation Layer (Views)
from django.http import JsonResponse
from django.views import View
from .services import ProductService
from .serializers import ProductSerializer

class ProductView(View):
    """Presentation layer - handles HTTP concerns"""
    
    def __init__(self):
        self.product_service = ProductService()
        self.serializer = ProductSerializer()
    
    def get(self, request, product_id):
        """Handle GET request for product details"""
        try:
            product = self.product_service.get_product(product_id)
            data = self.serializer.serialize(product)
            return JsonResponse(data)
        except ProductNotFound:
            return JsonResponse({'error': 'Product not found'}, status=404)
    
    def post(self, request):
        """Handle POST request for product creation"""
        try:
            data = self.serializer.deserialize(request.body)
            product = self.product_service.create_product(data)
            response_data = self.serializer.serialize(product)
            return JsonResponse(response_data, status=201)
        except ValidationError as e:
            return JsonResponse({'errors': e.messages}, status=400)

# Business Logic Layer (Services)
from .models import Product, Category
from .repositories import ProductRepository
from django.db import transaction

class ProductService:
    """Business logic layer - contains use cases and business rules"""
    
    def __init__(self):
        self.product_repo = ProductRepository()
    
    def get_product(self, product_id: int) -> Product:
        """Get product with business logic"""
        product = self.product_repo.get_by_id(product_id)
        if not product:
            raise ProductNotFound(f"Product {product_id} not found")
        
        # Apply business rules
        if not product.is_active:
            raise ProductNotAvailable("Product is not available")
        
        return product
    
    @transaction.atomic
    def create_product(self, data: dict) -> Product:
        """Create product with business validation"""
        # Validate business rules
        if self.product_repo.exists_by_name(data['name']):
            raise BusinessRuleError("Product name must be unique")
        
        category = Category.objects.get(id=data['category_id'])
        if not category.allows_products:
            raise BusinessRuleError("Category does not allow products")
        
        # Create product
        product = Product(
            name=data['name'],
            description=data['description'],
            price=data['price'],
            category=category
        )
        
        # Apply business logic
        product.calculate_pricing()
        product.set_initial_status()
        
        return self.product_repo.save(product)
    
    def update_product_price(self, product_id: int, new_price: Decimal) -> Product:
        """Update product price with business rules"""
        product = self.get_product(product_id)
        
        # Business rule: price changes require approval for expensive items
        if product.price > 1000 and new_price != product.price:
            product.requires_approval = True
        
        # Business rule: cannot decrease price by more than 50%
        if new_price < product.price * Decimal('0.5'):
            raise BusinessRuleError("Cannot decrease price by more than 50%")
        
        product.price = new_price
        product.price_updated_at = timezone.now()
        
        return self.product_repo.save(product)

# Data Access Layer (Repository Pattern)
from typing import Optional, List
from django.db.models import Q

class ProductRepository:
    """Data access layer - abstracts database operations"""
    
    def get_by_id(self, product_id: int) -> Optional[Product]:
        """Get product by ID"""
        try:
            return Product.objects.select_related('category').get(id=product_id)
        except Product.DoesNotExist:
            return None
    
    def get_by_name(self, name: str) -> Optional[Product]:
        """Get product by name"""
        try:
            return Product.objects.get(name=name)
        except Product.DoesNotExist:
            return None
    
    def exists_by_name(self, name: str) -> bool:
        """Check if product exists by name"""
        return Product.objects.filter(name=name).exists()
    
    def find_by_category(self, category_id: int) -> List[Product]:
        """Find products by category"""
        return list(
            Product.objects.filter(category_id=category_id, is_active=True)
            .select_related('category')
            .order_by('name')
        )
    
    def search(self, query: str) -> List[Product]:
        """Search products by name or description"""
        return list(
            Product.objects.filter(
                Q(name__icontains=query) | Q(description__icontains=query),
                is_active=True
            ).select_related('category')
        )
    
    def save(self, product: Product) -> Product:
        """Save product to database"""
        product.save()
        return product
    
    def delete(self, product: Product) -> None:
        """Delete product from database"""
        product.delete()

# Domain Layer (Models with Business Logic)
from django.db import models
from decimal import Decimal

class Product(models.Model):
    """Product model with business logic"""
    
    name = models.CharField(max_length=200, unique=True)
    description = models.TextField()
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    is_active = models.BooleanField(default=True)
    requires_approval = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)
    price_updated_at = models.DateTimeField(auto_now=True)
    
    def calculate_pricing(self):
        """Business logic for pricing calculation"""
        # Apply category-based pricing rules
        if self.category.premium_category:
            self.price *= Decimal('1.2')  # 20% premium
        
        # Apply volume-based pricing
        if self.price > 100:
            self.price = self.price.quantize(Decimal('0.99'))  # Psychological pricing
    
    def set_initial_status(self):
        """Business logic for initial status"""
        # New products start as inactive for review
        self.is_active = False
        
        # Expensive products require approval
        if self.price > 1000:
            self.requires_approval = True
    
    def can_be_deleted(self) -> bool:
        """Business rule for deletion"""
        # Cannot delete products with orders
        return not self.orderitem_set.exists()
    
    def apply_discount(self, percentage: Decimal) -> Decimal:
        """Business logic for discount application"""
        if percentage < 0 or percentage > 50:
            raise ValueError("Discount must be between 0% and 50%")
        
        discount_amount = self.price * (percentage / 100)
        return self.price - discount_amount
    
    class Meta:
        indexes = [
            models.Index(fields=['category', 'is_active']),
            models.Index(fields=['name']),
        ]

Hexagonal Architecture (Ports and Adapters)

Hexagonal architecture, also known as Ports and Adapters, isolates the core business logic from external concerns. This pattern is excellent for complex Django applications with multiple integrations.

Core Concepts

# Domain Layer - Pure business logic
from abc import ABC, abstractmethod
from typing import List, Optional
from dataclasses import dataclass
from decimal import Decimal

@dataclass
class OrderItem:
    """Value object for order items"""
    product_id: int
    quantity: int
    price: Decimal
    
    def total(self) -> Decimal:
        return self.quantity * self.price

class Order:
    """Rich domain model with business logic"""
    
    def __init__(self, customer_id: int):
        self.customer_id = customer_id
        self.items: List[OrderItem] = []
        self.status = 'draft'
        self.total = Decimal('0.00')
        self.created_at = None
        self.id = None
    
    def add_item(self, product_id: int, quantity: int, price: Decimal):
        """Add item with business validation"""
        if quantity <= 0:
            raise ValueError("Quantity must be positive")
        
        if price <= 0:
            raise ValueError("Price must be positive")
        
        # Check if item already exists
        existing_item = self._find_item(product_id)
        if existing_item:
            existing_item.quantity += quantity
        else:
            self.items.append(OrderItem(product_id, quantity, price))
        
        self._recalculate_total()
    
    def remove_item(self, product_id: int):
        """Remove item from order"""
        self.items = [item for item in self.items if item.product_id != product_id]
        self._recalculate_total()
    
    def confirm(self):
        """Confirm order with business rules"""
        if not self.items:
            raise ValueError("Cannot confirm empty order")
        
        if self.status != 'draft':
            raise ValueError("Can only confirm draft orders")
        
        self.status = 'confirmed'
    
    def cancel(self):
        """Cancel order with business rules"""
        if self.status not in ['draft', 'confirmed']:
            raise ValueError("Cannot cancel order in current status")
        
        self.status = 'cancelled'
    
    def _find_item(self, product_id: int) -> Optional[OrderItem]:
        """Find item by product ID"""
        return next((item for item in self.items if item.product_id == product_id), None)
    
    def _recalculate_total(self):
        """Recalculate order total"""
        self.total = sum(item.total() for item in self.items)

# Ports (Interfaces) - Define contracts
class OrderRepository(ABC):
    """Port for order persistence"""
    
    @abstractmethod
    def save(self, order: Order) -> Order:
        pass
    
    @abstractmethod
    def get_by_id(self, order_id: int) -> Optional[Order]:
        pass
    
    @abstractmethod
    def get_by_customer(self, customer_id: int) -> List[Order]:
        pass

class PaymentService(ABC):
    """Port for payment processing"""
    
    @abstractmethod
    def process_payment(self, order: Order, payment_method: str) -> bool:
        pass

class InventoryService(ABC):
    """Port for inventory management"""
    
    @abstractmethod
    def reserve_items(self, items: List[OrderItem]) -> bool:
        pass
    
    @abstractmethod
    def release_items(self, items: List[OrderItem]) -> bool:
        pass

class NotificationService(ABC):
    """Port for notifications"""
    
    @abstractmethod
    def send_order_confirmation(self, order: Order) -> bool:
        pass

# Application Service - Orchestrates use cases
class OrderService:
    """Application service using dependency inversion"""
    
    def __init__(self, 
                 order_repo: OrderRepository,
                 payment_service: PaymentService,
                 inventory_service: InventoryService,
                 notification_service: NotificationService):
        self.order_repo = order_repo
        self.payment_service = payment_service
        self.inventory_service = inventory_service
        self.notification_service = notification_service
    
    def create_order(self, customer_id: int, items: List[dict]) -> Order:
        """Create new order use case"""
        order = Order(customer_id)
        
        # Add items to order
        for item_data in items:
            order.add_item(
                product_id=item_data['product_id'],
                quantity=item_data['quantity'],
                price=item_data['price']
            )
        
        # Save order
        return self.order_repo.save(order)
    
    def place_order(self, order_id: int, payment_method: str) -> bool:
        """Place order use case"""
        order = self.order_repo.get_by_id(order_id)
        if not order:
            raise ValueError("Order not found")
        
        # Reserve inventory
        if not self.inventory_service.reserve_items(order.items):
            raise ValueError("Insufficient inventory")
        
        try:
            # Process payment
            if not self.payment_service.process_payment(order, payment_method):
                # Release inventory if payment fails
                self.inventory_service.release_items(order.items)
                raise ValueError("Payment processing failed")
            
            # Confirm order
            order.confirm()
            self.order_repo.save(order)
            
            # Send confirmation
            self.notification_service.send_order_confirmation(order)
            
            return True
            
        except Exception:
            # Release inventory on any failure
            self.inventory_service.release_items(order.items)
            raise

# Adapters - Implement the ports
from django.db import models, transaction

class DjangoOrderRepository(OrderRepository):
    """Django ORM adapter for order repository"""
    
    def save(self, order: Order) -> Order:
        """Save order using Django ORM"""
        with transaction.atomic():
            if order.id:
                # Update existing order
                django_order = OrderModel.objects.get(id=order.id)
                django_order.status = order.status
                django_order.total = order.total
                django_order.save()
                
                # Update items
                django_order.items.all().delete()
            else:
                # Create new order
                django_order = OrderModel.objects.create(
                    customer_id=order.customer_id,
                    status=order.status,
                    total=order.total
                )
                order.id = django_order.id
                order.created_at = django_order.created_at
            
            # Save items
            for item in order.items:
                OrderItemModel.objects.create(
                    order=django_order,
                    product_id=item.product_id,
                    quantity=item.quantity,
                    price=item.price
                )
        
        return order
    
    def get_by_id(self, order_id: int) -> Optional[Order]:
        """Get order by ID"""
        try:
            django_order = OrderModel.objects.prefetch_related('items').get(id=order_id)
            return self._to_domain_object(django_order)
        except OrderModel.DoesNotExist:
            return None
    
    def get_by_customer(self, customer_id: int) -> List[Order]:
        """Get orders by customer"""
        django_orders = OrderModel.objects.filter(
            customer_id=customer_id
        ).prefetch_related('items').order_by('-created_at')
        
        return [self._to_domain_object(django_order) for django_order in django_orders]
    
    def _to_domain_object(self, django_order) -> Order:
        """Convert Django model to domain object"""
        order = Order(django_order.customer_id)
        order.id = django_order.id
        order.status = django_order.status
        order.total = django_order.total
        order.created_at = django_order.created_at
        
        # Add items
        for django_item in django_order.items.all():
            order.items.append(OrderItem(
                product_id=django_item.product_id,
                quantity=django_item.quantity,
                price=django_item.price
            ))
        
        return order

class StripePaymentService(PaymentService):
    """Stripe adapter for payment processing"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
    
    def process_payment(self, order: Order, payment_method: str) -> bool:
        """Process payment using Stripe"""
        try:
            # Stripe payment processing logic
            import stripe
            stripe.api_key = self.api_key
            
            charge = stripe.Charge.create(
                amount=int(order.total * 100),  # Convert to cents
                currency='usd',
                source=payment_method,
                description=f'Order {order.id}'
            )
            
            return charge.paid
        except Exception as e:
            logger.error(f"Payment processing failed: {e}")
            return False

class RedisInventoryService(InventoryService):
    """Redis adapter for inventory management"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def reserve_items(self, items: List[OrderItem]) -> bool:
        """Reserve items in Redis"""
        pipe = self.redis.pipeline()
        
        try:
            # Check availability first
            for item in items:
                available = self.redis.get(f"inventory:{item.product_id}")
                if not available or int(available) < item.quantity:
                    return False
            
            # Reserve items
            for item in items:
                pipe.decrby(f"inventory:{item.product_id}", item.quantity)
            
            pipe.execute()
            return True
            
        except Exception as e:
            logger.error(f"Inventory reservation failed: {e}")
            return False
    
    def release_items(self, items: List[OrderItem]) -> bool:
        """Release reserved items"""
        pipe = self.redis.pipeline()
        
        try:
            for item in items:
                pipe.incrby(f"inventory:{item.product_id}", item.quantity)
            
            pipe.execute()
            return True
            
        except Exception as e:
            logger.error(f"Inventory release failed: {e}")
            return False

# Django Models (Infrastructure)
class OrderModel(models.Model):
    """Django model for order persistence"""
    
    customer_id = models.IntegerField()
    status = models.CharField(max_length=20, default='draft')
    total = models.DecimalField(max_digits=10, decimal_places=2, default=0)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

class OrderItemModel(models.Model):
    """Django model for order items"""
    
    order = models.ForeignKey(OrderModel, related_name='items', on_delete=models.CASCADE)
    product_id = models.IntegerField()
    quantity = models.IntegerField()
    price = models.DecimalField(max_digits=10, decimal_places=2)

# Dependency Injection Configuration
class DIContainer:
    """Simple dependency injection container"""
    
    def __init__(self):
        self._services = {}
    
    def register(self, interface, implementation):
        """Register service implementation"""
        self._services[interface] = implementation
    
    def get(self, interface):
        """Get service implementation"""
        return self._services.get(interface)

# Configuration
def configure_dependencies():
    """Configure dependency injection"""
    container = DIContainer()
    
    # Register implementations
    container.register(OrderRepository, DjangoOrderRepository())
    container.register(PaymentService, StripePaymentService(settings.STRIPE_API_KEY))
    container.register(InventoryService, RedisInventoryService(redis_client))
    container.register(NotificationService, EmailNotificationService())
    
    return container

# Usage in views
def create_order_view(request):
    """View using hexagonal architecture"""
    container = configure_dependencies()
    
    order_service = OrderService(
        order_repo=container.get(OrderRepository),
        payment_service=container.get(PaymentService),
        inventory_service=container.get(InventoryService),
        notification_service=container.get(NotificationService)
    )
    
    # Extract data from request
    customer_id = request.user.id
    items = request.data.get('items', [])
    
    # Create order
    order = order_service.create_order(customer_id, items)
    
    return JsonResponse({
        'order_id': order.id,
        'total': str(order.total),
        'status': order.status
    })

Clean Architecture

Clean Architecture takes the separation of concerns further by organizing code into concentric circles, with business logic at the center and external concerns at the outer layers.

Clean Architecture Implementation

# Entities (Innermost layer) - Enterprise business rules
class User:
    """User entity with business rules"""
    
    def __init__(self, email: str, password: str):
        self.email = email
        self.password_hash = self._hash_password(password)
        self.is_active = True
        self.created_at = timezone.now()
        self.last_login = None
    
    def authenticate(self, password: str) -> bool:
        """Authenticate user with business rules"""
        if not self.is_active:
            return False
        
        return self._verify_password(password, self.password_hash)
    
    def change_password(self, old_password: str, new_password: str):
        """Change password with business validation"""
        if not self.authenticate(old_password):
            raise ValueError("Current password is incorrect")
        
        if len(new_password) < 8:
            raise ValueError("Password must be at least 8 characters")
        
        self.password_hash = self._hash_password(new_password)
    
    def deactivate(self):
        """Deactivate user account"""
        self.is_active = False
    
    def record_login(self):
        """Record successful login"""
        self.last_login = timezone.now()
    
    def _hash_password(self, password: str) -> str:
        """Hash password using secure algorithm"""
        import bcrypt
        return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
    
    def _verify_password(self, password: str, hash: str) -> bool:
        """Verify password against hash"""
        import bcrypt
        return bcrypt.checkpw(password.encode('utf-8'), hash.encode('utf-8'))

# Use Cases (Application layer) - Application business rules
from abc import ABC, abstractmethod

class UserRepository(ABC):
    """Abstract repository for user persistence"""
    
    @abstractmethod
    def save(self, user: User) -> User:
        pass
    
    @abstractmethod
    def get_by_email(self, email: str) -> Optional[User]:
        pass
    
    @abstractmethod
    def get_by_id(self, user_id: int) -> Optional[User]:
        pass

class EmailService(ABC):
    """Abstract service for email operations"""
    
    @abstractmethod
    def send_welcome_email(self, user: User) -> bool:
        pass
    
    @abstractmethod
    def send_password_reset_email(self, user: User, reset_token: str) -> bool:
        pass

class RegisterUserUseCase:
    """Use case for user registration"""
    
    def __init__(self, user_repo: UserRepository, email_service: EmailService):
        self.user_repo = user_repo
        self.email_service = email_service
    
    def execute(self, email: str, password: str) -> User:
        """Execute user registration use case"""
        # Validate input
        if not email or '@' not in email:
            raise ValueError("Invalid email address")
        
        # Check if user already exists
        existing_user = self.user_repo.get_by_email(email)
        if existing_user:
            raise ValueError("User already exists")
        
        # Create user entity
        user = User(email, password)
        
        # Save user
        saved_user = self.user_repo.save(user)
        
        # Send welcome email
        self.email_service.send_welcome_email(saved_user)
        
        return saved_user

class AuthenticateUserUseCase:
    """Use case for user authentication"""
    
    def __init__(self, user_repo: UserRepository):
        self.user_repo = user_repo
    
    def execute(self, email: str, password: str) -> Optional[User]:
        """Execute user authentication use case"""
        user = self.user_repo.get_by_email(email)
        if not user:
            return None
        
        if user.authenticate(password):
            user.record_login()
            self.user_repo.save(user)
            return user
        
        return None

# Interface Adapters (Controllers, Presenters, Gateways)
class UserController:
    """Controller for user-related HTTP requests"""
    
    def __init__(self, register_use_case: RegisterUserUseCase,
                 auth_use_case: AuthenticateUserUseCase):
        self.register_use_case = register_use_case
        self.auth_use_case = auth_use_case
    
    def register(self, request):
        """Handle user registration request"""
        try:
            email = request.data.get('email')
            password = request.data.get('password')
            
            user = self.register_use_case.execute(email, password)
            
            return JsonResponse({
                'id': user.id,
                'email': user.email,
                'created_at': user.created_at.isoformat()
            }, status=201)
            
        except ValueError as e:
            return JsonResponse({'error': str(e)}, status=400)
    
    def login(self, request):
        """Handle user login request"""
        email = request.data.get('email')
        password = request.data.get('password')
        
        user = self.auth_use_case.execute(email, password)
        
        if user:
            # Create session or JWT token
            request.session['user_id'] = user.id
            
            return JsonResponse({
                'id': user.id,
                'email': user.email,
                'last_login': user.last_login.isoformat() if user.last_login else None
            })
        else:
            return JsonResponse({'error': 'Invalid credentials'}, status=401)

# Frameworks and Drivers (Outermost layer)
class DjangoUserRepository(UserRepository):
    """Django implementation of user repository"""
    
    def save(self, user: User) -> User:
        """Save user using Django ORM"""
        if hasattr(user, 'id') and user.id:
            # Update existing user
            django_user = UserModel.objects.get(id=user.id)
            django_user.email = user.email
            django_user.password_hash = user.password_hash
            django_user.is_active = user.is_active
            django_user.last_login = user.last_login
            django_user.save()
        else:
            # Create new user
            django_user = UserModel.objects.create(
                email=user.email,
                password_hash=user.password_hash,
                is_active=user.is_active,
                last_login=user.last_login
            )
            user.id = django_user.id
        
        return user
    
    def get_by_email(self, email: str) -> Optional[User]:
        """Get user by email"""
        try:
            django_user = UserModel.objects.get(email=email)
            return self._to_entity(django_user)
        except UserModel.DoesNotExist:
            return None
    
    def get_by_id(self, user_id: int) -> Optional[User]:
        """Get user by ID"""
        try:
            django_user = UserModel.objects.get(id=user_id)
            return self._to_entity(django_user)
        except UserModel.DoesNotExist:
            return None
    
    def _to_entity(self, django_user) -> User:
        """Convert Django model to entity"""
        user = User.__new__(User)  # Create without calling __init__
        user.id = django_user.id
        user.email = django_user.email
        user.password_hash = django_user.password_hash
        user.is_active = django_user.is_active
        user.created_at = django_user.created_at
        user.last_login = django_user.last_login
        return user

class UserModel(models.Model):
    """Django model for user persistence"""
    
    email = models.EmailField(unique=True)
    password_hash = models.CharField(max_length=128)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    last_login = models.DateTimeField(null=True, blank=True)

# Dependency injection and configuration
def create_user_controller():
    """Factory function for user controller"""
    user_repo = DjangoUserRepository()
    email_service = DjangoEmailService()
    
    register_use_case = RegisterUserUseCase(user_repo, email_service)
    auth_use_case = AuthenticateUserUseCase(user_repo)
    
    return UserController(register_use_case, auth_use_case)

# Django views using clean architecture
def register_view(request):
    """Django view for user registration"""
    controller = create_user_controller()
    return controller.register(request)

def login_view(request):
    """Django view for user login"""
    controller = create_user_controller()
    return controller.login(request)

CQRS (Command Query Responsibility Segregation)

CQRS separates read and write operations, allowing for optimized data models and improved scalability. This pattern is particularly useful for complex Django applications with different read and write requirements.

CQRS Implementation

# Commands - Write operations
from dataclasses import dataclass
from typing import Optional
from abc import ABC, abstractmethod

@dataclass
class CreateProductCommand:
    """Command for creating a product"""
    name: str
    description: str
    price: Decimal
    category_id: int
    user_id: int

@dataclass
class UpdateProductPriceCommand:
    """Command for updating product price"""
    product_id: int
    new_price: Decimal
    user_id: int

@dataclass
class DeactivateProductCommand:
    """Command for deactivating a product"""
    product_id: int
    reason: str
    user_id: int

# Command Handlers
class CommandHandler(ABC):
    """Abstract base for command handlers"""
    
    @abstractmethod
    def handle(self, command) -> any:
        pass

class CreateProductCommandHandler(CommandHandler):
    """Handler for product creation commands"""
    
    def __init__(self, product_repo: ProductRepository, 
                 event_store: EventStore):
        self.product_repo = product_repo
        self.event_store = event_store
    
    def handle(self, command: CreateProductCommand) -> int:
        """Handle product creation command"""
        # Validate business rules
        if self.product_repo.exists_by_name(command.name):
            raise BusinessRuleError("Product name must be unique")
        
        # Create product
        product = Product(
            name=command.name,
            description=command.description,
            price=command.price,
            category_id=command.category_id
        )
        
        # Save product
        saved_product = self.product_repo.save(product)
        
        # Record event
        event = ProductCreatedEvent(
            product_id=saved_product.id,
            name=command.name,
            price=command.price,
            created_by=command.user_id,
            created_at=timezone.now()
        )
        self.event_store.append(event)
        
        return saved_product.id

class UpdateProductPriceCommandHandler(CommandHandler):
    """Handler for product price update commands"""
    
    def __init__(self, product_repo: ProductRepository,
                 event_store: EventStore):
        self.product_repo = product_repo
        self.event_store = event_store
    
    def handle(self, command: UpdateProductPriceCommand) -> None:
        """Handle product price update command"""
        product = self.product_repo.get_by_id(command.product_id)
        if not product:
            raise ValueError("Product not found")
        
        old_price = product.price
        
        # Apply business rules
        if command.new_price <= 0:
            raise BusinessRuleError("Price must be positive")
        
        if command.new_price < old_price * Decimal('0.5'):
            raise BusinessRuleError("Cannot decrease price by more than 50%")
        
        # Update price
        product.price = command.new_price
        self.product_repo.save(product)
        
        # Record event
        event = ProductPriceUpdatedEvent(
            product_id=product.id,
            old_price=old_price,
            new_price=command.new_price,
            updated_by=command.user_id,
            updated_at=timezone.now()
        )
        self.event_store.append(event)

# Queries - Read operations
@dataclass
class GetProductQuery:
    """Query for getting a single product"""
    product_id: int

@dataclass
class SearchProductsQuery:
    """Query for searching products"""
    search_term: str
    category_id: Optional[int] = None
    min_price: Optional[Decimal] = None
    max_price: Optional[Decimal] = None
    page: int = 1
    page_size: int = 20

@dataclass
class GetProductStatsQuery:
    """Query for product statistics"""
    product_id: int

# Query Handlers
class QueryHandler(ABC):
    """Abstract base for query handlers"""
    
    @abstractmethod
    def handle(self, query) -> any:
        pass

class GetProductQueryHandler(QueryHandler):
    """Handler for single product queries"""
    
    def __init__(self, read_model_repo: ProductReadModelRepository):
        self.read_model_repo = read_model_repo
    
    def handle(self, query: GetProductQuery) -> Optional[ProductReadModel]:
        """Handle get product query"""
        return self.read_model_repo.get_by_id(query.product_id)

class SearchProductsQueryHandler(QueryHandler):
    """Handler for product search queries"""
    
    def __init__(self, read_model_repo: ProductReadModelRepository):
        self.read_model_repo = read_model_repo
    
    def handle(self, query: SearchProductsQuery) -> dict:
        """Handle product search query"""
        filters = {}
        
        if query.search_term:
            filters['search_term'] = query.search_term
        
        if query.category_id:
            filters['category_id'] = query.category_id
        
        if query.min_price:
            filters['min_price'] = query.min_price
        
        if query.max_price:
            filters['max_price'] = query.max_price
        
        products = self.read_model_repo.search(
            filters=filters,
            page=query.page,
            page_size=query.page_size
        )
        
        total_count = self.read_model_repo.count(filters)
        
        return {
            'products': products,
            'total_count': total_count,
            'page': query.page,
            'page_size': query.page_size,
            'total_pages': (total_count + query.page_size - 1) // query.page_size
        }

# Read Models - Optimized for queries
class ProductReadModel:
    """Read model optimized for product queries"""
    
    def __init__(self, product_id: int, name: str, description: str,
                 price: Decimal, category_name: str, is_active: bool,
                 created_at: datetime, average_rating: Optional[Decimal] = None,
                 review_count: int = 0, view_count: int = 0):
        self.product_id = product_id
        self.name = name
        self.description = description
        self.price = price
        self.category_name = category_name
        self.is_active = is_active
        self.created_at = created_at
        self.average_rating = average_rating
        self.review_count = review_count
        self.view_count = view_count

class ProductReadModelRepository:
    """Repository for product read models"""
    
    def get_by_id(self, product_id: int) -> Optional[ProductReadModel]:
        """Get product read model by ID"""
        try:
            # Use optimized read model table
            data = ProductReadModelTable.objects.get(product_id=product_id)
            return self._to_read_model(data)
        except ProductReadModelTable.DoesNotExist:
            return None
    
    def search(self, filters: dict, page: int, page_size: int) -> List[ProductReadModel]:
        """Search products with filters"""
        queryset = ProductReadModelTable.objects.filter(is_active=True)
        
        if 'search_term' in filters:
            queryset = queryset.filter(
                Q(name__icontains=filters['search_term']) |
                Q(description__icontains=filters['search_term'])
            )
        
        if 'category_id' in filters:
            queryset = queryset.filter(category_id=filters['category_id'])
        
        if 'min_price' in filters:
            queryset = queryset.filter(price__gte=filters['min_price'])
        
        if 'max_price' in filters:
            queryset = queryset.filter(price__lte=filters['max_price'])
        
        # Pagination
        offset = (page - 1) * page_size
        queryset = queryset[offset:offset + page_size]
        
        return [self._to_read_model(data) for data in queryset]
    
    def count(self, filters: dict) -> int:
        """Count products matching filters"""
        # Similar filtering logic as search
        queryset = ProductReadModelTable.objects.filter(is_active=True)
        # Apply filters...
        return queryset.count()
    
    def _to_read_model(self, data) -> ProductReadModel:
        """Convert table data to read model"""
        return ProductReadModel(
            product_id=data.product_id,
            name=data.name,
            description=data.description,
            price=data.price,
            category_name=data.category_name,
            is_active=data.is_active,
            created_at=data.created_at,
            average_rating=data.average_rating,
            review_count=data.review_count,
            view_count=data.view_count
        )

# Read Model Table (Denormalized for performance)
class ProductReadModelTable(models.Model):
    """Denormalized table for product read operations"""
    
    product_id = models.IntegerField(unique=True, db_index=True)
    name = models.CharField(max_length=200, db_index=True)
    description = models.TextField()
    price = models.DecimalField(max_digits=10, decimal_places=2, db_index=True)
    category_id = models.IntegerField(db_index=True)
    category_name = models.CharField(max_length=100)
    is_active = models.BooleanField(default=True, db_index=True)
    created_at = models.DateTimeField(db_index=True)
    average_rating = models.DecimalField(max_digits=3, decimal_places=2, null=True)
    review_count = models.IntegerField(default=0)
    view_count = models.IntegerField(default=0)
    
    class Meta:
        indexes = [
            models.Index(fields=['category_id', 'is_active']),
            models.Index(fields=['price', 'is_active']),
            models.Index(fields=['name', 'is_active']),
        ]

# Event Store for CQRS
class Event:
    """Base class for domain events"""
    
    def __init__(self, event_type: str, aggregate_id: int, 
                 event_data: dict, occurred_at: datetime):
        self.event_type = event_type
        self.aggregate_id = aggregate_id
        self.event_data = event_data
        self.occurred_at = occurred_at

class EventStore:
    """Store for domain events"""
    
    def append(self, event: Event) -> None:
        """Append event to store"""
        EventStoreTable.objects.create(
            event_type=event.event_type,
            aggregate_id=event.aggregate_id,
            event_data=event.event_data,
            occurred_at=event.occurred_at
        )
    
    def get_events(self, aggregate_id: int) -> List[Event]:
        """Get all events for an aggregate"""
        events = EventStoreTable.objects.filter(
            aggregate_id=aggregate_id
        ).order_by('occurred_at')
        
        return [
            Event(
                event_type=e.event_type,
                aggregate_id=e.aggregate_id,
                event_data=e.event_data,
                occurred_at=e.occurred_at
            )
            for e in events
        ]

class EventStoreTable(models.Model):
    """Table for storing domain events"""
    
    event_type = models.CharField(max_length=100)
    aggregate_id = models.IntegerField(db_index=True)
    event_data = models.JSONField()
    occurred_at = models.DateTimeField(db_index=True)
    
    class Meta:
        indexes = [
            models.Index(fields=['aggregate_id', 'occurred_at']),
            models.Index(fields=['event_type', 'occurred_at']),
        ]

# Command and Query Bus
class CommandBus:
    """Bus for dispatching commands to handlers"""
    
    def __init__(self):
        self.handlers = {}
    
    def register_handler(self, command_type: type, handler: CommandHandler):
        """Register command handler"""
        self.handlers[command_type] = handler
    
    def dispatch(self, command) -> any:
        """Dispatch command to appropriate handler"""
        command_type = type(command)
        handler = self.handlers.get(command_type)
        
        if not handler:
            raise ValueError(f"No handler registered for {command_type}")
        
        return handler.handle(command)

class QueryBus:
    """Bus for dispatching queries to handlers"""
    
    def __init__(self):
        self.handlers = {}
    
    def register_handler(self, query_type: type, handler: QueryHandler):
        """Register query handler"""
        self.handlers[query_type] = handler
    
    def dispatch(self, query) -> any:
        """Dispatch query to appropriate handler"""
        query_type = type(query)
        handler = self.handlers.get(query_type)
        
        if not handler:
            raise ValueError(f"No handler registered for {query_type}")
        
        return handler.handle(query)

# Configuration
def configure_cqrs():
    """Configure CQRS command and query buses"""
    # Command bus setup
    command_bus = CommandBus()
    
    product_repo = DjangoProductRepository()
    event_store = EventStore()
    
    command_bus.register_handler(
        CreateProductCommand,
        CreateProductCommandHandler(product_repo, event_store)
    )
    command_bus.register_handler(
        UpdateProductPriceCommand,
        UpdateProductPriceCommandHandler(product_repo, event_store)
    )
    
    # Query bus setup
    query_bus = QueryBus()
    
    read_model_repo = ProductReadModelRepository()
    
    query_bus.register_handler(
        GetProductQuery,
        GetProductQueryHandler(read_model_repo)
    )
    query_bus.register_handler(
        SearchProductsQuery,
        SearchProductsQueryHandler(read_model_repo)
    )
    
    return command_bus, query_bus

# Usage in views
def create_product_view(request):
    """View for creating products using CQRS"""
    command_bus, _ = configure_cqrs()
    
    command = CreateProductCommand(
        name=request.data['name'],
        description=request.data['description'],
        price=Decimal(request.data['price']),
        category_id=request.data['category_id'],
        user_id=request.user.id
    )
    
    try:
        product_id = command_bus.dispatch(command)
        return JsonResponse({'product_id': product_id}, status=201)
    except BusinessRuleError as e:
        return JsonResponse({'error': str(e)}, status=400)

def search_products_view(request):
    """View for searching products using CQRS"""
    _, query_bus = configure_cqrs()
    
    query = SearchProductsQuery(
        search_term=request.GET.get('q', ''),
        category_id=request.GET.get('category_id'),
        min_price=request.GET.get('min_price'),
        max_price=request.GET.get('max_price'),
        page=int(request.GET.get('page', 1)),
        page_size=int(request.GET.get('page_size', 20))
    )
    
    result = query_bus.dispatch(query)
    
    return JsonResponse({
        'products': [
            {
                'id': p.product_id,
                'name': p.name,
                'price': str(p.price),
                'category': p.category_name,
                'rating': str(p.average_rating) if p.average_rating else None,
                'review_count': p.review_count
            }
            for p in result['products']
        ],
        'pagination': {
            'total_count': result['total_count'],
            'page': result['page'],
            'page_size': result['page_size'],
            'total_pages': result['total_pages']
        }
    })

Event Sourcing

Event Sourcing stores all changes to application state as a sequence of events. This pattern provides complete audit trails and enables powerful features like time travel and event replay.

Event Sourcing Implementation

# Domain Events
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List, Optional

@dataclass
class DomainEvent:
    """Base class for domain events"""
    aggregate_id: int
    event_id: str
    occurred_at: datetime
    version: int

@dataclass
class ProductCreatedEvent(DomainEvent):
    """Event for product creation"""
    name: str
    description: str
    price: Decimal
    category_id: int
    created_by: int

@dataclass
class ProductPriceChangedEvent(DomainEvent):
    """Event for product price changes"""
    old_price: Decimal
    new_price: Decimal
    changed_by: int

@dataclass
class ProductDeactivatedEvent(DomainEvent):
    """Event for product deactivation"""
    reason: str
    deactivated_by: int

# Event-Sourced Aggregate
class ProductAggregate:
    """Product aggregate using event sourcing"""
    
    def __init__(self, product_id: int):
        self.id = product_id
        self.name = None
        self.description = None
        self.price = None
        self.category_id = None
        self.is_active = True
        self.version = 0
        self.uncommitted_events = []
    
    @classmethod
    def create(cls, product_id: int, name: str, description: str,
               price: Decimal, category_id: int, created_by: int) -> 'ProductAggregate':
        """Create new product aggregate"""
        aggregate = cls(product_id)
        
        event = ProductCreatedEvent(
            aggregate_id=product_id,
            event_id=str(uuid.uuid4()),
            occurred_at=timezone.now(),
            version=1,
            name=name,
            description=description,
            price=price,
            category_id=category_id,
            created_by=created_by
        )
        
        aggregate._apply_event(event)
        aggregate.uncommitted_events.append(event)
        
        return aggregate
    
    def change_price(self, new_price: Decimal, changed_by: int):
        """Change product price"""
        if new_price <= 0:
            raise ValueError("Price must be positive")
        
        if new_price == self.price:
            return  # No change needed
        
        old_price = self.price
        
        event = ProductPriceChangedEvent(
            aggregate_id=self.id,
            event_id=str(uuid.uuid4()),
            occurred_at=timezone.now(),
            version=self.version + 1,
            old_price=old_price,
            new_price=new_price,
            changed_by=changed_by
        )
        
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def deactivate(self, reason: str, deactivated_by: int):
        """Deactivate product"""
        if not self.is_active:
            raise ValueError("Product is already deactivated")
        
        event = ProductDeactivatedEvent(
            aggregate_id=self.id,
            event_id=str(uuid.uuid4()),
            occurred_at=timezone.now(),
            version=self.version + 1,
            reason=reason,
            deactivated_by=deactivated_by
        )
        
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def load_from_history(self, events: List[DomainEvent]):
        """Rebuild aggregate from event history"""
        for event in events:
            self._apply_event(event)
    
    def mark_events_as_committed(self):
        """Mark uncommitted events as committed"""
        self.uncommitted_events.clear()
    
    def _apply_event(self, event: DomainEvent):
        """Apply event to aggregate state"""
        if isinstance(event, ProductCreatedEvent):
            self.name = event.name
            self.description = event.description
            self.price = event.price
            self.category_id = event.category_id
            self.is_active = True
        
        elif isinstance(event, ProductPriceChangedEvent):
            self.price = event.new_price
        
        elif isinstance(event, ProductDeactivatedEvent):
            self.is_active = False
        
        self.version = event.version

# Event Store
class EventStore:
    """Store for domain events with event sourcing"""
    
    def save_events(self, aggregate_id: int, events: List[DomainEvent],
                   expected_version: int) -> None:
        """Save events with optimistic concurrency control"""
        with transaction.atomic():
            # Check current version
            current_version = self._get_current_version(aggregate_id)
            
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )
            
            # Save events
            for event in events:
                EventStoreRecord.objects.create(
                    aggregate_id=event.aggregate_id,
                    event_id=event.event_id,
                    event_type=event.__class__.__name__,
                    event_data=self._serialize_event(event),
                    version=event.version,
                    occurred_at=event.occurred_at
                )
    
    def get_events(self, aggregate_id: int, from_version: int = 0) -> List[DomainEvent]:
        """Get events for aggregate from specific version"""
        records = EventStoreRecord.objects.filter(
            aggregate_id=aggregate_id,
            version__gt=from_version
        ).order_by('version')
        
        return [self._deserialize_event(record) for record in records]
    
    def get_all_events(self, from_timestamp: Optional[datetime] = None) -> List[DomainEvent]:
        """Get all events from specific timestamp"""
        queryset = EventStoreRecord.objects.all()
        
        if from_timestamp:
            queryset = queryset.filter(occurred_at__gte=from_timestamp)
        
        return [self._deserialize_event(record) for record in queryset.order_by('occurred_at')]
    
    def _get_current_version(self, aggregate_id: int) -> int:
        """Get current version of aggregate"""
        latest_event = EventStoreRecord.objects.filter(
            aggregate_id=aggregate_id
        ).order_by('-version').first()
        
        return latest_event.version if latest_event else 0
    
    def _serialize_event(self, event: DomainEvent) -> dict:
        """Serialize event to dictionary"""
        return {
            'aggregate_id': event.aggregate_id,
            'event_id': event.event_id,
            'occurred_at': event.occurred_at.isoformat(),
            'version': event.version,
            **{k: v for k, v in event.__dict__.items() 
               if k not in ['aggregate_id', 'event_id', 'occurred_at', 'version']}
        }
    
    def _deserialize_event(self, record) -> DomainEvent:
        """Deserialize event from record"""
        event_data = record.event_data
        event_class = globals()[record.event_type]  # Get event class by name
        
        return event_class(
            aggregate_id=record.aggregate_id,
            event_id=record.event_id,
            occurred_at=record.occurred_at,
            version=record.version,
            **{k: v for k, v in event_data.items() 
               if k not in ['aggregate_id', 'event_id', 'occurred_at', 'version']}
        )

class EventStoreRecord(models.Model):
    """Database table for event store"""
    
    aggregate_id = models.IntegerField(db_index=True)
    event_id = models.CharField(max_length=36, unique=True)
    event_type = models.CharField(max_length=100)
    event_data = models.JSONField()
    version = models.IntegerField()
    occurred_at = models.DateTimeField(db_index=True)
    
    class Meta:
        indexes = [
            models.Index(fields=['aggregate_id', 'version']),
            models.Index(fields=['event_type', 'occurred_at']),
        ]
        unique_together = [['aggregate_id', 'version']]

# Repository for Event-Sourced Aggregates
class ProductAggregateRepository:
    """Repository for product aggregates using event sourcing"""
    
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def get_by_id(self, product_id: int) -> Optional[ProductAggregate]:
        """Get product aggregate by ID"""
        events = self.event_store.get_events(product_id)
        
        if not events:
            return None
        
        aggregate = ProductAggregate(product_id)
        aggregate.load_from_history(events)
        
        return aggregate
    
    def save(self, aggregate: ProductAggregate) -> None:
        """Save aggregate by storing uncommitted events"""
        if not aggregate.uncommitted_events:
            return
        
        expected_version = aggregate.version - len(aggregate.uncommitted_events)
        
        self.event_store.save_events(
            aggregate.id,
            aggregate.uncommitted_events,
            expected_version
        )
        
        aggregate.mark_events_as_committed()

# Event Projections for Read Models
class ProductProjection:
    """Projection that builds read models from events"""
    
    def __init__(self):
        self.handlers = {
            ProductCreatedEvent: self._handle_product_created,
            ProductPriceChangedEvent: self._handle_price_changed,
            ProductDeactivatedEvent: self._handle_product_deactivated,
        }
    
    def project_event(self, event: DomainEvent):
        """Project event to read model"""
        handler = self.handlers.get(type(event))
        if handler:
            handler(event)
    
    def _handle_product_created(self, event: ProductCreatedEvent):
        """Handle product created event"""
        ProductReadModel.objects.create(
            product_id=event.aggregate_id,
            name=event.name,
            description=event.description,
            price=event.price,
            category_id=event.category_id,
            is_active=True,
            created_at=event.occurred_at
        )
    
    def _handle_price_changed(self, event: ProductPriceChangedEvent):
        """Handle price changed event"""
        ProductReadModel.objects.filter(
            product_id=event.aggregate_id
        ).update(price=event.new_price)
    
    def _handle_product_deactivated(self, event: ProductDeactivatedEvent):
        """Handle product deactivated event"""
        ProductReadModel.objects.filter(
            product_id=event.aggregate_id
        ).update(is_active=False)

# Event Processing
class EventProcessor:
    """Process events for projections and side effects"""
    
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.projections = []
        self.event_handlers = []
    
    def add_projection(self, projection):
        """Add projection to process events"""
        self.projections.append(projection)
    
    def add_event_handler(self, handler):
        """Add event handler for side effects"""
        self.event_handlers.append(handler)
    
    def process_new_events(self, from_timestamp: Optional[datetime] = None):
        """Process new events"""
        events = self.event_store.get_all_events(from_timestamp)
        
        for event in events:
            # Update projections
            for projection in self.projections:
                projection.project_event(event)
            
            # Handle side effects
            for handler in self.event_handlers:
                handler.handle(event)

# Usage Example
def create_product_with_event_sourcing(request):
    """Create product using event sourcing"""
    event_store = EventStore()
    repo = ProductAggregateRepository(event_store)
    
    # Create aggregate
    product_id = generate_product_id()
    aggregate = ProductAggregate.create(
        product_id=product_id,
        name=request.data['name'],
        description=request.data['description'],
        price=Decimal(request.data['price']),
        category_id=request.data['category_id'],
        created_by=request.user.id
    )
    
    # Save aggregate (stores events)
    repo.save(aggregate)
    
    # Process events for projections
    processor = EventProcessor(event_store)
    processor.add_projection(ProductProjection())
    processor.process_new_events()
    
    return JsonResponse({'product_id': product_id}, status=201)

Architecture Pattern Selection Guide

When to Use Each Pattern

Layered Architecture

  • Traditional web applications
  • Clear separation of concerns needed
  • Team familiar with MVC patterns
  • Moderate complexity applications

Hexagonal Architecture

  • Multiple external integrations
  • Need for high testability
  • Complex business logic
  • Long-term maintainability critical

Clean Architecture

  • Enterprise applications
  • Complex business rules
  • Multiple delivery mechanisms
  • Framework independence required

CQRS

  • Different read/write performance requirements
  • Complex reporting needs
  • High-scale applications
  • Event-driven architectures

Event Sourcing

  • Complete audit trail required
  • Time-travel functionality needed
  • Complex business processes
  • High-reliability systems

Combining Patterns

# Example: CQRS + Event Sourcing + Clean Architecture
class OrderApplicationService:
    """Application service combining multiple patterns"""
    
    def __init__(self, 
                 command_bus: CommandBus,
                 query_bus: QueryBus,
                 event_store: EventStore):
        self.command_bus = command_bus
        self.query_bus = query_bus
        self.event_store = event_store
    
    def place_order(self, command: PlaceOrderCommand) -> OrderId:
        """Place order using CQRS and event sourcing"""
        # Use command bus for write operations
        order_id = self.command_bus.dispatch(command)
        
        # Events are automatically stored in event store
        # Projections are updated asynchronously
        
        return order_id
    
    def get_order_details(self, order_id: int) -> OrderDetails:
        """Get order details using CQRS query side"""
        query = GetOrderDetailsQuery(order_id=order_id)
        return self.query_bus.dispatch(query)
    
    def get_order_history(self, order_id: int) -> List[OrderEvent]:
        """Get complete order history from event store"""
        return self.event_store.get_events(order_id)

System architecture patterns provide the foundation for building maintainable, scalable Django applications. Choose patterns based on your specific requirements, team expertise, and long-term goals. Start with simpler patterns like layered architecture and evolve to more sophisticated patterns as your application grows in complexity.

The key is understanding that these patterns are tools to solve specific problems. Use them judiciously, and don't over-engineer solutions for simple problems. The best architecture is the one that solves your current problems while providing a clear path for future evolution.