Advanced and Expert Topics

Extending Django's Core

Django's architecture allows deep customization and extension of its core components. This comprehensive guide covers advanced techniques for extending Django's ORM, admin interface, authentication system, and other core components to create powerful, customized functionality that integrates seamlessly with the framework.

Extending Django's Core

Django's architecture allows deep customization and extension of its core components. This comprehensive guide covers advanced techniques for extending Django's ORM, admin interface, authentication system, and other core components to create powerful, customized functionality that integrates seamlessly with the framework.

Custom Model Fields

Custom model fields allow you to store and manipulate specialized data types that aren't supported by Django's built-in fields.

# Custom field for storing encrypted data
import json
from cryptography.fernet import Fernet
from django.db import models
from django.core.exceptions import ValidationError
from django.conf import settings

class EncryptedField(models.TextField):
    """Field that automatically encrypts/decrypts data"""
    
    def __init__(self, *args, **kwargs):
        self.encryption_key = kwargs.pop('encryption_key', None)
        super().__init__(*args, **kwargs)
        
        if not self.encryption_key:
            self.encryption_key = getattr(settings, 'FIELD_ENCRYPTION_KEY', None)
        
        if not self.encryption_key:
            raise ValueError("Encryption key is required")
        
        self.cipher = Fernet(self.encryption_key.encode())
    
    def from_db_value(self, value, expression, connection):
        """Decrypt value when loading from database"""
        if value is None:
            return value
        
        try:
            return self.cipher.decrypt(value.encode()).decode()
        except Exception:
            # Return original value if decryption fails
            return value
    
    def to_python(self, value):
        """Convert value to Python type"""
        if isinstance(value, str) or value is None:
            return value
        return str(value)
    
    def get_prep_value(self, value):
        """Encrypt value before saving to database"""
        if value is None:
            return value
        
        try:
            return self.cipher.encrypt(str(value).encode()).decode()
        except Exception as e:
            raise ValidationError(f"Encryption failed: {e}")

# Custom field for storing JSON with validation
class ValidatedJSONField(models.JSONField):
    """JSON field with schema validation"""
    
    def __init__(self, schema=None, *args, **kwargs):
        self.schema = schema
        super().__init__(*args, **kwargs)
    
    def validate(self, value, model_instance):
        """Validate JSON against schema"""
        super().validate(value, model_instance)
        
        if self.schema and value is not None:
            try:
                import jsonschema
                jsonschema.validate(value, self.schema)
            except jsonschema.ValidationError as e:
                raise ValidationError(f"JSON validation failed: {e.message}")
            except ImportError:
                # jsonschema not installed, skip validation
                pass

# Custom field for storing money amounts
from decimal import Decimal

class MoneyField(models.DecimalField):
    """Field for storing monetary amounts with currency"""
    
    def __init__(self, currency_field=None, *args, **kwargs):
        self.currency_field = currency_field or 'currency'
        kwargs.setdefault('max_digits', 10)
        kwargs.setdefault('decimal_places', 2)
        super().__init__(*args, **kwargs)
    
    def contribute_to_class(self, cls, name, **kwargs):
        """Add currency field to model if it doesn't exist"""
        super().contribute_to_class(cls, name, **kwargs)
        
        # Add currency field if it doesn't exist
        if not hasattr(cls, self.currency_field):
            currency_field = models.CharField(
                max_length=3,
                default='USD',
                help_text='Currency code (ISO 4217)'
            )
            currency_field.contribute_to_class(cls, self.currency_field)
    
    def from_db_value(self, value, expression, connection):
        """Return Money object instead of Decimal"""
        if value is None:
            return None
        return Money(amount=value, currency='USD')  # Currency loaded separately

class Money:
    """Money value object"""
    
    def __init__(self, amount, currency='USD'):
        self.amount = Decimal(str(amount))
        self.currency = currency
    
    def __str__(self):
        return f"{self.amount} {self.currency}"
    
    def __add__(self, other):
        if isinstance(other, Money):
            if self.currency != other.currency:
                raise ValueError("Cannot add different currencies")
            return Money(self.amount + other.amount, self.currency)
        return Money(self.amount + Decimal(str(other)), self.currency)

# Usage example
class Product(models.Model):
    name = models.CharField(max_length=200)
    price = MoneyField()  # Automatically adds 'currency' field
    secret_data = EncryptedField()
    metadata = ValidatedJSONField(schema={
        'type': 'object',
        'properties': {
            'weight': {'type': 'number'},
            'dimensions': {
                'type': 'object',
                'properties': {
                    'length': {'type': 'number'},
                    'width': {'type': 'number'},
                    'height': {'type': 'number'}
                }
            }
        }
    })

Custom Database Backends

Custom database backends allow you to add support for new databases or modify existing database behavior.

# Custom database backend for read/write splitting
from django.db.backends.postgresql import base
from django.db.backends.postgresql.base import DatabaseWrapper as PostgreSQLDatabaseWrapper
import random

class ReadWriteSplitDatabaseWrapper(PostgreSQLDatabaseWrapper):
    """Database wrapper that splits reads and writes"""
    
    def __init__(self, settings_dict, alias=None):
        super().__init__(settings_dict, alias)
        
        # Configure read replicas
        self.read_replicas = settings_dict.get('READ_REPLICAS', [])
        self.write_db = settings_dict
    
    def _cursor(self, name=None):
        """Return cursor for read or write operations"""
        # Determine if this is a read or write operation
        if self._is_read_operation():
            # Use read replica
            if self.read_replicas:
                replica_config = random.choice(self.read_replicas)
                # Create connection to read replica
                return self._create_replica_cursor(replica_config)
        
        # Use write database
        return super()._cursor(name)
    
    def _is_read_operation(self):
        """Determine if current operation is read-only"""
        # This is a simplified check - in practice, you'd need
        # more sophisticated query analysis
        import inspect
        
        frame = inspect.currentframe()
        while frame:
            if 'select' in str(frame.f_code.co_name).lower():
                return True
            if any(op in str(frame.f_code.co_name).lower() 
                   for op in ['insert', 'update', 'delete', 'create']):
                return False
            frame = frame.f_back
        
        return False
    
    def _create_replica_cursor(self, replica_config):
        """Create cursor for read replica"""
        # Implementation would create connection to replica
        # This is simplified for demonstration
        pass

# Custom database operations
from django.db.backends.postgresql.operations import DatabaseOperations

class CustomDatabaseOperations(DatabaseOperations):
    """Custom database operations with additional functions"""
    
    def __init__(self, connection):
        super().__init__(connection)
        # Add custom SQL functions
        self.custom_functions = {
            'SIMILARITY': 'similarity(%s, %s)',
            'LEVENSHTEIN': 'levenshtein(%s, %s)',
            'METAPHONE': 'metaphone(%s)',
        }
    
    def sql_function(self, function_name, *args):
        """Generate SQL for custom functions"""
        if function_name in self.custom_functions:
            template = self.custom_functions[function_name]
            return template % args
        
        return super().sql_function(function_name, *args)

# Database backend configuration
# settings.py
DATABASES = {
    'default': {
        'ENGINE': 'myapp.db_backends.read_write_split',
        'NAME': 'myapp_write',
        'USER': 'myuser',
        'PASSWORD': 'mypass',
        'HOST': 'write-db.example.com',
        'PORT': '5432',
        'READ_REPLICAS': [
            {
                'NAME': 'myapp_read1',
                'HOST': 'read1-db.example.com',
                'PORT': '5432',
            },
            {
                'NAME': 'myapp_read2',
                'HOST': 'read2-db.example.com',
                'PORT': '5432',
            }
        ]
    }
}

Custom Authentication Backends

Custom authentication backends enable integration with external authentication systems and custom authentication logic.

# Multi-factor authentication backend
from django.contrib.auth.backends import BaseBackend
from django.contrib.auth.models import User
from django.core.cache import cache
import pyotp
import qrcode
from io import BytesIO
import base64

class MultiFactorAuthBackend(BaseBackend):
    """Authentication backend with MFA support"""
    
    def authenticate(self, request, username=None, password=None, mfa_token=None, **kwargs):
        """Authenticate user with MFA"""
        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return None
        
        # Check password
        if not user.check_password(password):
            return None
        
        # Check if MFA is required
        if self.requires_mfa(user):
            if not mfa_token:
                # Store partial authentication in cache
                cache.set(f'mfa_pending_{username}', True, timeout=300)
                return None
            
            # Verify MFA token
            if not self.verify_mfa_token(user, mfa_token):
                return None
            
            # Clear MFA pending status
            cache.delete(f'mfa_pending_{username}')
        
        return user
    
    def requires_mfa(self, user):
        """Check if user requires MFA"""
        return hasattr(user, 'mfa_profile') and user.mfa_profile.is_enabled
    
    def verify_mfa_token(self, user, token):
        """Verify MFA token"""
        if not hasattr(user, 'mfa_profile'):
            return False
        
        totp = pyotp.TOTP(user.mfa_profile.secret_key)
        return totp.verify(token, valid_window=1)
    
    def get_user(self, user_id):
        """Get user by ID"""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None

# MFA Profile model
class MFAProfile(models.Model):
    """Multi-factor authentication profile"""
    
    user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='mfa_profile')
    secret_key = models.CharField(max_length=32, blank=True)
    is_enabled = models.BooleanField(default=False)
    backup_codes = models.JSONField(default=list)
    created_at = models.DateTimeField(auto_now_add=True)
    
    def generate_secret_key(self):
        """Generate new secret key"""
        self.secret_key = pyotp.random_base32()
        self.save()
    
    def get_qr_code(self):
        """Generate QR code for MFA setup"""
        if not self.secret_key:
            self.generate_secret_key()
        
        totp = pyotp.TOTP(self.secret_key)
        provisioning_uri = totp.provisioning_uri(
            name=self.user.email,
            issuer_name="MyApp"
        )
        
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        
        return base64.b64encode(buffer.getvalue()).decode()
    
    def generate_backup_codes(self):
        """Generate backup codes"""
        import secrets
        codes = [secrets.token_hex(4).upper() for _ in range(10)]
        self.backup_codes = codes
        self.save()
        return codes

# LDAP authentication backend
import ldap
from django_auth_ldap.backend import LDAPBackend

class CustomLDAPBackend(LDAPBackend):
    """Custom LDAP backend with additional features"""
    
    def authenticate_ldap_user(self, ldap_user, password):
        """Authenticate LDAP user with custom logic"""
        user = super().authenticate_ldap_user(ldap_user, password)
        
        if user:
            # Sync additional LDAP attributes
            self.sync_user_attributes(user, ldap_user)
            
            # Check group membership
            if not self.check_group_membership(ldap_user):
                return None
        
        return user
    
    def sync_user_attributes(self, user, ldap_user):
        """Sync additional attributes from LDAP"""
        ldap_attrs = ldap_user.attrs
        
        # Sync department
        if 'department' in ldap_attrs:
            user.profile.department = ldap_attrs['department'][0]
        
        # Sync manager
        if 'manager' in ldap_attrs:
            manager_dn = ldap_attrs['manager'][0]
            # Look up manager user
            try:
                manager = User.objects.get(username=self.extract_username_from_dn(manager_dn))
                user.profile.manager = manager
            except User.DoesNotExist:
                pass
        
        user.profile.save()
    
    def check_group_membership(self, ldap_user):
        """Check if user is member of required groups"""
        required_groups = getattr(settings, 'LDAP_REQUIRED_GROUPS', [])
        
        if not required_groups:
            return True
        
        user_groups = ldap_user.group_names
        return any(group in user_groups for group in required_groups)

Custom Admin Interface Components

Extend Django's admin interface with custom functionality and improved user experience.

# Custom admin actions
from django.contrib import admin
from django.http import HttpResponse
from django.shortcuts import render
import csv

class ExportMixin:
    """Mixin for exporting model data"""
    
    def export_as_csv(self, request, queryset):
        """Export selected objects as CSV"""
        meta = self.model._meta
        field_names = [field.name for field in meta.fields]
        
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = f'attachment; filename={meta}.csv'
        
        writer = csv.writer(response)
        writer.writerow(field_names)
        
        for obj in queryset:
            writer.writerow([getattr(obj, field) for field in field_names])
        
        return response
    
    export_as_csv.short_description = "Export selected as CSV"

class BulkUpdateMixin:
    """Mixin for bulk updating objects"""
    
    def bulk_update_form(self, request, queryset):
        """Show form for bulk updating"""
        if request.method == 'POST':
            # Process bulk update
            update_fields = {}
            for key, value in request.POST.items():
                if key.startswith('bulk_') and value:
                    field_name = key[5:]  # Remove 'bulk_' prefix
                    update_fields[field_name] = value
            
            if update_fields:
                queryset.update(**update_fields)
                self.message_user(request, f"Updated {queryset.count()} objects")
            
            return None
        
        # Show form
        context = {
            'queryset': queryset,
            'model_fields': [f for f in self.model._meta.fields if f.editable],
        }
        return render(request, 'admin/bulk_update_form.html', context)
    
    bulk_update_form.short_description = "Bulk update selected"

# Custom admin widgets
from django import forms
from django.contrib.admin.widgets import AdminFileWidget

class ImagePreviewWidget(AdminFileWidget):
    """Widget that shows image preview"""
    
    def render(self, name, value, attrs=None, renderer=None):
        output = super().render(name, value, attrs, renderer)
        
        if value and hasattr(value, 'url'):
            output += f'''
            <div style="margin-top: 10px;">
                <img src="{value.url}" style="max-width: 200px; max-height: 200px;" />
            </div>
            '''
        
        return output

class ColorPickerWidget(forms.TextInput):
    """Color picker widget"""
    
    def __init__(self, attrs=None):
        default_attrs = {'type': 'color'}
        if attrs:
            default_attrs.update(attrs)
        super().__init__(default_attrs)

# Advanced admin class
@admin.register(Product)
class ProductAdmin(admin.ModelAdmin, ExportMixin, BulkUpdateMixin):
    """Advanced product admin"""
    
    list_display = ['name', 'price', 'category', 'is_active', 'created_at']
    list_filter = ['category', 'is_active', 'created_at']
    search_fields = ['name', 'description']
    list_editable = ['price', 'is_active']
    
    actions = ['export_as_csv', 'bulk_update_form', 'make_active', 'make_inactive']
    
    fieldsets = (
        ('Basic Information', {
            'fields': ('name', 'description', 'category')
        }),
        ('Pricing', {
            'fields': ('price', 'currency'),
            'classes': ('collapse',)
        }),
        ('Status', {
            'fields': ('is_active', 'featured'),
        }),
        ('Metadata', {
            'fields': ('created_at', 'updated_at'),
            'classes': ('collapse',),
        }),
    )
    
    readonly_fields = ['created_at', 'updated_at']
    
    def make_active(self, request, queryset):
        """Make selected products active"""
        updated = queryset.update(is_active=True)
        self.message_user(request, f"Made {updated} products active")
    
    def make_inactive(self, request, queryset):
        """Make selected products inactive"""
        updated = queryset.update(is_active=False)
        self.message_user(request, f"Made {updated} products inactive")
    
    def get_queryset(self, request):
        """Optimize queryset"""
        return super().get_queryset(request).select_related('category')
    
    def formfield_for_dbfield(self, db_field, request, **kwargs):
        """Customize form fields"""
        if db_field.name == 'image':
            kwargs['widget'] = ImagePreviewWidget
        elif db_field.name == 'color':
            kwargs['widget'] = ColorPickerWidget
        
        return super().formfield_for_dbfield(db_field, request, **kwargs)

# Custom admin site
class CustomAdminSite(admin.AdminSite):
    """Custom admin site with additional features"""
    
    site_header = "MyApp Administration"
    site_title = "MyApp Admin"
    index_title = "Welcome to MyApp Administration"
    
    def index(self, request, extra_context=None):
        """Custom admin index with dashboard"""
        extra_context = extra_context or {}
        
        # Add dashboard data
        extra_context.update({
            'total_users': User.objects.count(),
            'total_products': Product.objects.count(),
            'recent_orders': Order.objects.order_by('-created_at')[:5],
        })
        
        return super().index(request, extra_context)

# Register custom admin site
custom_admin_site = CustomAdminSite(name='custom_admin')
custom_admin_site.register(User, UserAdmin)
custom_admin_site.register(Product, ProductAdmin)

Extending Django's core components allows you to create highly customized applications that leverage Django's architecture while adding specialized functionality. The key is understanding Django's extension points and following established patterns to ensure your customizations integrate seamlessly with the framework.