Advanced and Expert Topics

Custom ORM Expressions

Django's ORM provides powerful expression APIs that allow you to create custom database expressions, functions, and operations. This enables sophisticated database queries while maintaining the benefits of Django's ORM abstraction.

Custom ORM Expressions

Django's ORM provides powerful expression APIs that allow you to create custom database expressions, functions, and operations. This enables sophisticated database queries while maintaining the benefits of Django's ORM abstraction.

Understanding ORM Expressions

ORM expressions represent database operations that can be combined, reused, and optimized by Django's query compiler. They provide a way to generate complex SQL while staying within Django's ORM framework.

Basic Expression Types

import json

from django.db.models import (
    Avg,
    BooleanField,
    Case,
    Exists,
    Expression,
    F,
    FloatField,
    IntegerField,
    OuterRef,
    Q,
    RowRange,
    Sum,
    TextField,
    Value,
    When,
    Window,
)
from django.db.models.functions import Coalesce, Greatest, Least, PercentRank

# Custom expression for calculating age
class Age(Expression):
    """Whole-year age computed in the database from a birth-date expression.

    Args:
        birth_date_field: Expression yielding the birth date (for example
            ``F('birth_date')``). A plain string is treated as a field name.
        output_field: Optional model field describing the result; defaults
            to ``IntegerField()`` so lookups such as ``age__gte`` resolve.

    Raises:
        NotImplementedError: From ``as_sql`` when the database vendor is not
            PostgreSQL, MySQL or SQLite.
    """

    def __init__(self, birth_date_field, output_field=None):
        if output_field is None:
            output_field = IntegerField()
        super().__init__(output_field=output_field)
        if isinstance(birth_date_field, str):
            birth_date_field = F(birth_date_field)
        self.birth_date_field = birth_date_field

    def get_source_expressions(self):
        """Expose the operand so the ORM resolves ``F()`` references in it."""
        return [self.birth_date_field]

    def set_source_expressions(self, exprs):
        """Store the operand after the ORM has resolved it."""
        (self.birth_date_field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` computing the age for the active backend."""
        birth_date_sql, birth_date_params = compiler.compile(self.birth_date_field)
        params = list(birth_date_params)

        if connection.vendor == 'postgresql':
            # EXTRACT returns a numeric/double value; cast it so the result
            # matches the integer output field.
            sql = f"CAST(EXTRACT(YEAR FROM AGE({birth_date_sql})) AS integer)"
        elif connection.vendor == 'mysql':
            sql = f"TIMESTAMPDIFF(YEAR, {birth_date_sql}, NOW())"
        elif connection.vendor == 'sqlite':
            # Subtract one year when this year's birthday has not happened
            # yet (the comparison evaluates to 0 or 1). Literal percent signs
            # are doubled because the final SQL is %-interpolated with the
            # query parameters.
            sql = (
                f"((strftime('%%Y', 'now') - strftime('%%Y', {birth_date_sql}))"
                f" - (strftime('%%m-%%d', 'now') < strftime('%%m-%%d', {birth_date_sql})))"
            )
            # The operand appears twice in the SQL, so its params do as well.
            params = params * 2
        else:
            raise NotImplementedError(f"Age calculation not supported for {connection.vendor}")

        return sql, params

# Custom function for full-text search
class FullTextSearch(Expression):
    """PostgreSQL full-text match: ``to_tsvector(...) @@ plainto_tsquery(...)``.

    Args:
        search_vector: Expression producing the text to search (for example
            ``F('bio')``). A plain string is treated as a field name.
        query: Expression producing the search terms. A plain Python value is
            wrapped in ``Value`` so it is sent as a query parameter.
        config: Name of the text search configuration.

    The expression has a boolean output field, so it can be annotated and
    then filtered with ``=True``.

    Raises:
        NotImplementedError: From ``as_sql`` on any backend but PostgreSQL.
    """

    def __init__(self, search_vector, query, config='english'):
        super().__init__(output_field=BooleanField())
        if not hasattr(search_vector, 'resolve_expression'):
            search_vector = F(search_vector)
        if not hasattr(query, 'resolve_expression'):
            query = Value(query)
        self.search_vector = search_vector
        self.query = query
        self.config = config

    def get_source_expressions(self):
        """Expose both operands so the ORM resolves ``F()`` references."""
        return [self.search_vector, self.query]

    def set_source_expressions(self, exprs):
        """Store the operands after the ORM has resolved them."""
        self.search_vector, self.query = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the full-text match."""
        if connection.vendor != 'postgresql':
            raise NotImplementedError("Full-text search only supported on PostgreSQL")

        vector_sql, vector_params = compiler.compile(self.search_vector)
        query_sql, query_params = compiler.compile(self.query)

        # The configuration name is bound as a parameter, never interpolated
        # into the SQL text, so it cannot be used for SQL injection.
        sql = (
            f"to_tsvector(%s::regconfig, {vector_sql})"
            f" @@ plainto_tsquery(%s::regconfig, {query_sql})"
        )
        # Parameter order follows placeholder order in the SQL above.
        params = [self.config, *vector_params, self.config, *query_params]

        return sql, params

# Custom aggregation function
from django.db.models import Aggregate

class StringAgg(Aggregate):
    """Concatenate the values of a group into one delimited string.

    Renders ``STRING_AGG(expr, sep)`` on PostgreSQL,
    ``GROUP_CONCAT(expr SEPARATOR sep)`` on MySQL and
    ``GROUP_CONCAT(expr, sep)`` on SQLite.

    Args:
        expression: Field name or expression to aggregate.
        separator: String placed between the aggregated values.
        **extra: Forwarded unchanged to ``Aggregate``.

    Raises:
        NotImplementedError: From ``as_sql`` for any other database vendor.
    """

    function = 'STRING_AGG'
    # The result is text regardless of the aggregated column's type.
    output_field = TextField()

    def __init__(self, expression, separator=',', **extra):
        # The separator is a real source expression: it gets compiled and
        # bound as a query parameter instead of being formatted into the
        # template as the repr of a Value object.
        super().__init__(expression, Value(separator), **extra)

    def as_sql(self, compiler, connection, **extra_context):
        """Return ``(sql, params)`` using the backend's aggregation function."""
        vendor = connection.vendor
        # Per-backend overrides are passed as rendering context; the instance
        # itself is never mutated, so it stays reusable across connections.
        if vendor == 'postgresql':
            pass
        elif vendor == 'mysql':
            extra_context.setdefault('function', 'GROUP_CONCAT')
            extra_context.setdefault('arg_joiner', ' SEPARATOR ')
        elif vendor == 'sqlite':
            extra_context.setdefault('function', 'GROUP_CONCAT')
        else:
            raise NotImplementedError(f"StringAgg not supported for {connection.vendor}")

        return super().as_sql(compiler, connection, **extra_context)

# Usage examples
from django.db import models

class Person(models.Model):
    """Example model queried by the expression usage snippets."""

    # Display name, limited to 100 characters.
    name = models.CharField(max_length=100)
    # Date of birth.
    birth_date = models.DateField()
    # Free-form biography text.
    bio = models.TextField()

# Annotate each person with a database-computed age and keep those 18 or over.
people_with_age = (
    Person.objects
    .annotate(age=Age(F('birth_date')))
    .filter(age__gte=18)
)

# Annotate a full-text match flag on the biography and keep matching rows.
search_results = (
    Person.objects
    .annotate(
        search_match=FullTextSearch(F('bio'), Value('django developer')),
    )
    .filter(search_match=True)
)

# Collect each department's employee names into one comma-separated string.
# NOTE(review): Department is not defined in this file.
departments = Department.objects.annotate(
    employee_names=StringAgg('employees__name', separator=', '),
)

Advanced Database Functions

Window Functions

from django.db.models import Window, F
from django.db.models.functions import RowNumber, Rank, DenseRank, Lag, Lead

class RunningTotal(Window):
    """Running total: ``SUM(expression) OVER (PARTITION BY ... ORDER BY ...)``.

    Built on Django's ``Window`` and ``Sum`` so that ``F()`` references in
    every clause are resolved and all clause parameters are kept.

    Args:
        expression: Expression to accumulate (for example ``F('amount')``).
        partition_by: Optional expression, or list/tuple of expressions, for
            the PARTITION BY clause. ``None`` or an empty sequence omits it.
        order_by: Optional expression, or list/tuple of expressions, for the
            ORDER BY clause. ``None`` or an empty sequence omits it.
    """

    def __init__(self, expression, partition_by=None, order_by=None):
        super().__init__(
            Sum(expression),
            partition_by=self._clause(partition_by),
            order_by=self._clause(order_by),
        )

    @staticmethod
    def _clause(value):
        """Normalise a clause argument to a non-empty list, or ``None``."""
        if value is None:
            return None
        if isinstance(value, (list, tuple)):
            # An empty sequence means the clause is left out entirely.
            return list(value) or None
        return [value]

class PercentileRank(Window):
    """Percentile rank: ``PERCENT_RANK() OVER (PARTITION BY ... ORDER BY expression)``.

    Built on Django's ``Window`` and ``PercentRank`` so that ``F()``
    references are resolved and clause parameters are kept.

    Args:
        expression: Expression the rows are ranked by.
        partition_by: Optional expression, or list/tuple of expressions, for
            the PARTITION BY clause. ``None`` or an empty sequence omits it.
    """

    def __init__(self, expression, partition_by=None):
        if isinstance(partition_by, (list, tuple)):
            # An empty sequence means "no partitioning".
            partition_by = list(partition_by) or None
        elif partition_by is not None:
            partition_by = [partition_by]

        super().__init__(
            PercentRank(),
            partition_by=partition_by,
            # Passed as a list so a bare F() object is accepted as ordering.
            order_by=[expression],
        )

# Usage with window functions
class Sale(models.Model):
    """Example model: one sale of a product by a salesperson."""

    # Referenced lazily by name because Product is declared later in this
    # file; a direct class reference here would raise NameError at import.
    product = models.ForeignKey('Product', on_delete=models.CASCADE)
    # Sale value with two decimal places.
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    # Calendar date of the sale.
    sale_date = models.DateField()
    # NOTE(review): User is neither imported nor defined in this file; the
    # usage below reads salesperson__name, so it must expose a `name` field.
    salesperson = models.ForeignKey(User, on_delete=models.CASCADE)

# Cumulative sale amount per salesperson, accumulated in sale-date order.
sales_with_running_total = (
    Sale.objects
    .annotate(
        running_total=RunningTotal(
            F('amount'),
            partition_by=[F('salesperson')],
            order_by=[F('sale_date')],
        ),
    )
    .order_by('salesperson', 'sale_date')
)

# Total sales per salesperson, ranked from highest to lowest total.
top_salespeople = (
    Sale.objects
    .values('salesperson__name')
    .annotate(
        total_sales=models.Sum('amount'),
        rank=Window(
            expression=Rank(),
            order_by=F('total_sales').desc(),
        ),
    )
    .order_by('rank')
)

# Percentile position of every sale by amount.
sales_with_percentile = Sale.objects.annotate(
    percentile_rank=PercentileRank(F('amount')),
)

JSON Operations

class JSONExtract(Expression):
    """Extract one value from a JSON column as text.

    Args:
        json_field: Expression producing the JSON value (for example
            ``F('attributes')``). A plain string is treated as a field name.
        path: A single key or array index, or a list/tuple of keys and
            indexes for a nested lookup. A string is always one key, dots
            included; pass a sequence for nested paths.
        output_field: Optional model field describing the result; defaults
            to ``TextField()``.

    Raises:
        ValueError: From ``as_sql`` when ``path`` is an empty sequence.
        NotImplementedError: From ``as_sql`` when the database vendor is not
            PostgreSQL, MySQL or SQLite.
    """

    def __init__(self, json_field, path, output_field=None):
        if output_field is None:
            output_field = TextField()
        super().__init__(output_field=output_field)
        if isinstance(json_field, str):
            json_field = F(json_field)
        self.json_field = json_field
        self.path = path

    def get_source_expressions(self):
        """Expose the operand so the ORM resolves ``F()`` references in it."""
        return [self.json_field]

    def set_source_expressions(self, exprs):
        """Store the operand after the ORM has resolved it."""
        (self.json_field,) = exprs

    def _path_parts(self):
        """Return the path as a non-empty list of keys and indexes."""
        if isinstance(self.path, (list, tuple)):
            parts = list(self.path)
        else:
            parts = [self.path]
        if not parts:
            raise ValueError("JSON path must contain at least one key or index")
        return parts

    @staticmethod
    def _compile_json_path(parts):
        """Build a ``$``-rooted JSON path as used by MySQL and SQLite."""
        compiled = ['$']
        for part in parts:
            if isinstance(part, int):
                compiled.append(f'[{part}]')
            else:
                # json.dumps double-quotes the key, so keys containing dots,
                # spaces or quotes stay a single path step.
                compiled.append('.' + json.dumps(str(part)))
        return ''.join(compiled)

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` extracting the value for the active backend."""
        field_sql, field_params = compiler.compile(self.json_field)
        parts = self._path_parts()
        params = list(field_params)

        if connection.vendor == 'postgresql':
            # #>> walks a text[] path and returns text. The whole path is one
            # bound array parameter, so no key is interpolated into the SQL.
            sql = f"({field_sql} #>> %s)"
            params.append([str(part) for part in parts])
        elif connection.vendor == 'mysql':
            sql = f"JSON_UNQUOTE(JSON_EXTRACT({field_sql}, %s))"
            params.append(self._compile_json_path(parts))
        elif connection.vendor == 'sqlite':
            sql = f"JSON_EXTRACT({field_sql}, %s)"
            params.append(self._compile_json_path(parts))
        else:
            raise NotImplementedError(f"JSON operations not supported for {connection.vendor}")

        return sql, params

class JSONArrayLength(Expression):
    """Length of a JSON array stored in, or nested inside, a JSON column.

    Args:
        json_field: Expression producing the JSON value (for example
            ``F('attributes')``). A plain string is treated as a field name.
        path: Optional key or index, or list/tuple of keys and indexes,
            locating the array inside the document. ``None``, ``''`` or an
            empty sequence measures the column value itself.

    Raises:
        NotImplementedError: From ``as_sql`` when the database vendor is not
            PostgreSQL, MySQL or SQLite.
    """

    def __init__(self, json_field, path=None):
        super().__init__(output_field=IntegerField())
        if isinstance(json_field, str):
            json_field = F(json_field)
        self.json_field = json_field
        self.path = path

    def get_source_expressions(self):
        """Expose the operand so the ORM resolves ``F()`` references in it."""
        return [self.json_field]

    def set_source_expressions(self, exprs):
        """Store the operand after the ORM has resolved it."""
        (self.json_field,) = exprs

    def _path_parts(self):
        """Return the path as a list of keys/indexes (empty when no path)."""
        if self.path is None or self.path == '':
            return []
        if isinstance(self.path, (list, tuple)):
            return list(self.path)
        return [self.path]

    @staticmethod
    def _compile_json_path(parts):
        """Build a ``$``-rooted JSON path as used by MySQL and SQLite."""
        compiled = ['$']
        for part in parts:
            if isinstance(part, int):
                compiled.append(f'[{part}]')
            else:
                # Quoted so keys with dots or spaces stay a single path step.
                compiled.append('.' + json.dumps(str(part)))
        return ''.join(compiled)

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` computing the array length."""
        field_sql, field_params = compiler.compile(self.json_field)
        parts = self._path_parts()
        params = list(field_params)

        if connection.vendor == 'postgresql':
            # jsonb_array_length plus a cast to jsonb accepts both json and
            # jsonb inputs; json_array_length rejects jsonb values.
            if parts:
                sql = f"jsonb_array_length(CAST({field_sql} #> %s AS jsonb))"
                params.append([str(part) for part in parts])
            else:
                sql = f"jsonb_array_length(CAST({field_sql} AS jsonb))"
        elif connection.vendor in ('mysql', 'sqlite'):
            function = 'JSON_LENGTH' if connection.vendor == 'mysql' else 'JSON_ARRAY_LENGTH'
            if parts:
                sql = f"{function}({field_sql}, %s)"
                params.append(self._compile_json_path(parts))
            else:
                sql = f"{function}({field_sql})"
        else:
            raise NotImplementedError(f"JSON array length not supported for {connection.vendor}")

        return sql, params

# Usage with JSON operations
class Product(models.Model):
    """Example model with a JSON attributes column."""

    # Product name, limited to 200 characters.
    name = models.CharField(max_length=200)
    # Arbitrary JSON document describing the product.
    attributes = models.JSONField()  # e.g., {"color": "red", "sizes": ["S", "M", "L"]}

# Pull the "color" key out of the JSON attributes and keep the red products.
products_with_color = (
    Product.objects
    .annotate(color=JSONExtract(F('attributes'), 'color'))
    .filter(color='red')
)

# Count the entries of the "sizes" array and keep products with more than two.
products_with_size_count = (
    Product.objects
    .annotate(size_count=JSONArrayLength(F('attributes'), 'sizes'))
    .filter(size_count__gt=2)
)

Complex Query Expressions

Conditional Expressions

class ConditionalSum(Expression):
    """Aggregate: ``SUM(CASE WHEN condition THEN expression ELSE 0 END)``.

    Args:
        expression: Expression to sum (for example ``F('orders__total')``).
        condition: ``Q`` object or other conditional expression choosing the
            rows that contribute to the sum.

    The output field is taken from ``expression``.
    """

    # Flag the expression as an aggregate so the ORM groups the query rather
    # than treating the value as a per-row column.
    contains_aggregate = True

    def __init__(self, expression, condition):
        super().__init__()
        self.expression = expression
        self.condition = condition

    def get_source_expressions(self):
        """Expose both operands so ``F()`` and ``Q`` objects get resolved."""
        return [self.expression, self.condition]

    def set_source_expressions(self, exprs):
        """Store the operands after the ORM has resolved them."""
        self.expression, self.condition = exprs

    def _resolve_output_field(self):
        """Use the summed expression's field; the condition is boolean."""
        return self.expression.output_field

    def get_group_by_cols(self, alias=None):
        """Contribute no GROUP BY columns, as befits an aggregate."""
        return []

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the conditional sum."""
        expr_sql, expr_params = compiler.compile(self.expression)
        cond_sql, cond_params = compiler.compile(self.condition)

        sql = f"SUM(CASE WHEN {cond_sql} THEN {expr_sql} ELSE 0 END)"
        # The condition's placeholders come first in the SQL text.
        params = [*cond_params, *expr_params]

        return sql, params

class MovingAverage(Window):
    """Moving average over the current row and the ``window_size - 1`` rows before it.

    Renders ``AVG(expression) OVER (ORDER BY ... ROWS BETWEEN n PRECEDING AND
    CURRENT ROW)`` through Django's ``Window``, ``Avg`` and ``RowRange``, so
    ``F()`` references are resolved by the ORM.

    Args:
        expression: Expression to average (for example ``F('total')``).
        window_size: Number of rows in the window, including the current
            row. Must be an integer of at least 1.
        order_by: Expression, or list/tuple of expressions, that orders the
            rows of the window.

    Raises:
        ValueError: If ``window_size`` is not a positive integer.
    """

    def __init__(self, expression, window_size, order_by):
        # Reject bad sizes up front: a non-positive size cannot form a valid
        # frame, and the size ends up in the generated frame clause.
        if isinstance(window_size, bool) or not isinstance(window_size, int) or window_size < 1:
            raise ValueError("window_size must be a positive integer")
        self.window_size = window_size

        if isinstance(order_by, (list, tuple)):
            ordering = list(order_by)
        else:
            # Passed as a list so a bare F() object is accepted as ordering.
            ordering = [order_by]

        super().__init__(
            Avg(expression),
            order_by=ordering,
            # A negative start means "N PRECEDING"; an end of 0 is CURRENT ROW.
            frame=RowRange(start=1 - window_size, end=0),
        )

# Usage examples
class Order(models.Model):
    """Example model: one order placed by a customer."""

    # related_name='orders' provides the reverse lookup used by queries such
    # as Customer.objects.annotate(... F('orders__total') ...); without it
    # the reverse query name would be 'order'.
    # NOTE(review): Customer is not defined in this file.
    customer = models.ForeignKey(
        Customer,
        on_delete=models.CASCADE,
        related_name='orders',
    )
    # Order value with two decimal places.
    total = models.DecimalField(max_digits=10, decimal_places=2)
    # Workflow status; the usage examples compare it to 'completed' and 'pending'.
    status = models.CharField(max_length=20)
    # Set automatically when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

# Per-customer order totals, split by order status.
customer_stats = Customer.objects.annotate(
    total_completed_orders=ConditionalSum(
        F('orders__total'), Q(orders__status='completed'),
    ),
    total_pending_orders=ConditionalSum(
        F('orders__total'), Q(orders__status='pending'),
    ),
)

# Seven-row moving average of order totals, in creation order.
orders_with_moving_avg = (
    Order.objects
    .annotate(
        moving_avg=MovingAverage(
            F('total'),
            window_size=7,
            order_by=F('created_at'),
        ),
    )
    .order_by('created_at')
)

Geographic Expressions

from django.contrib.gis.db.models import PointField
from django.contrib.gis.geos import Point

class DistanceFromPoint(Expression):
    """Distance between a spatial column and a fixed reference point.

    Args:
        point_field: Expression producing the geometry (for example
            ``F('location')``). A plain string is treated as a field name.
        reference_point: Geometry object exposing ``wkt`` and ``srid``.

    The value is whatever the backend's ``ST_Distance`` returns.
    NOTE(review): for a geometry column that is presumably in the units of
    its spatial reference system rather than metres - confirm.

    Raises:
        NotImplementedError: From ``as_sql`` when the database vendor is
            neither PostgreSQL nor MySQL.
    """

    def __init__(self, point_field, reference_point):
        super().__init__(output_field=FloatField())
        if isinstance(point_field, str):
            point_field = F(point_field)
        self.point_field = point_field
        self.reference_point = reference_point

    def get_source_expressions(self):
        """Expose the operand so the ORM resolves ``F()`` references in it."""
        return [self.point_field]

    def set_source_expressions(self, exprs):
        """Store the operand after the ORM has resolved it."""
        (self.point_field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the distance calculation."""
        field_sql, field_params = compiler.compile(self.point_field)
        params = list(field_params)

        if connection.vendor == 'postgresql':
            # Use the reference point's own SRID; fall back to 4326 only when
            # the point carries none.
            srid = self.reference_point.srid or 4326
            sql = f"ST_Distance({field_sql}, ST_GeomFromText(%s, %s))"
            params += [self.reference_point.wkt, srid]
        elif connection.vendor == 'mysql':
            sql = f"ST_Distance({field_sql}, ST_GeomFromText(%s))"
            params.append(self.reference_point.wkt)
        else:
            raise NotImplementedError(f"Geographic distance not supported for {connection.vendor}")

        return sql, params

class WithinRadius(Expression):
    """Boolean test: is a spatial column within ``radius_meters`` of a point?

    Args:
        point_field: Expression producing the geometry (for example
            ``F('location')``). A plain string is treated as a field name.
        center_point: Geometry object exposing ``wkt`` and ``srid``.
        radius_meters: Search radius in metres.

    The expression has a boolean output field, so it can be passed straight
    to ``filter()``.

    Raises:
        NotImplementedError: From ``as_sql`` on any backend but PostgreSQL.
    """

    def __init__(self, point_field, center_point, radius_meters):
        super().__init__(output_field=BooleanField())
        if isinstance(point_field, str):
            point_field = F(point_field)
        self.point_field = point_field
        self.center_point = center_point
        self.radius_meters = radius_meters

    def get_source_expressions(self):
        """Expose the operand so the ORM resolves ``F()`` references in it."""
        return [self.point_field]

    def set_source_expressions(self, exprs):
        """Store the operand after the ORM has resolved it."""
        (self.point_field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the radius check."""
        field_sql, field_params = compiler.compile(self.point_field)

        if connection.vendor == 'postgresql':
            # Both operands are cast to geography so ST_DWithin measures the
            # radius in metres; on plain geometries it would be read in SRID
            # units (degrees for SRID 4326).
            # NOTE(review): assumes the column uses a lon/lat SRID, which the
            # geography cast requires - confirm.
            sql = (
                f"ST_DWithin(CAST({field_sql} AS geography), "
                "CAST(ST_Transform(ST_GeomFromText(%s, %s), 4326) AS geography), %s)"
            )
            srid = self.center_point.srid or 4326
            params = [*field_params, self.center_point.wkt, srid, self.radius_meters]
        else:
            raise NotImplementedError(f"Geographic radius check not supported for {connection.vendor}")

        return sql, params

# Usage with geographic data
class Store(models.Model):
    """Example model: a named store with a point location."""

    # Store name, limited to 200 characters.
    name = models.CharField(max_length=200)
    # Spatial point column (GeoDjango PointField with default options).
    location = PointField()

# Stores within 5 km of central San Francisco, nearest first.
center = Point(-122.4194, 37.7749)  # x = longitude, y = latitude
nearby_stores = (
    Store.objects
    .annotate(distance=DistanceFromPoint(F('location'), center))
    .filter(WithinRadius(F('location'), center, 5000))  # radius of 5000 meters
    .order_by('distance')
)

Performance Optimization

Query Optimization Expressions

class OptimizedExists(Exists):
    """``EXISTS (subquery)`` check that selects only the primary key.

    Delegates to Django's built-in ``Exists`` so that ``OuterRef``
    references inside ``queryset`` are resolved against the outer query and
    the expression has a boolean output field, which lets it be passed
    directly to ``filter()``.

    Args:
        queryset: QuerySet whose rows are tested for existence; it may
            contain ``OuterRef`` references.
    """

    def __init__(self, queryset):
        # Only existence matters, so narrow the select list to the primary
        # key before handing the queryset to Exists.
        super().__init__(queryset.values('pk'))

class BatchedIn(Expression):
    """Membership test that splits a long value list into several ``IN`` clauses.

    Renders ``(field IN (...) OR field IN (...) ...)`` with at most
    ``batch_size`` placeholders per clause. An empty value list renders an
    always-false condition instead of the invalid ``IN ()``.

    Args:
        field: Expression to test (for example ``F('id')``). A plain string
            is treated as a field name.
        values: Iterable of candidate values; it is copied into a list.
        batch_size: Maximum number of values per ``IN`` clause.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """

    def __init__(self, field, values, batch_size=1000):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        # A boolean output field makes the expression usable in filter().
        super().__init__(output_field=BooleanField())
        if isinstance(field, str):
            field = F(field)
        self.field = field
        self.values = list(values)
        self.batch_size = batch_size

    def get_source_expressions(self):
        """Expose the operand so the ORM resolves ``F()`` references in it."""
        return [self.field]

    def set_source_expressions(self, exprs):
        """Store the operand after the ORM has resolved it."""
        (self.field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the batched membership test."""
        field_sql, field_params = compiler.compile(self.field)
        field_params = list(field_params)

        if not self.values:
            # Nothing can match an empty list.
            return "(1 = 0)", []

        clauses = []
        params = []
        for start in range(0, len(self.values), self.batch_size):
            batch = self.values[start:start + self.batch_size]
            placeholders = ', '.join(['%s'] * len(batch))
            clauses.append(f"{field_sql} IN ({placeholders})")
            # The field SQL is repeated in every clause, so its own
            # parameters must be repeated in the same positions.
            params.extend(field_params)
            params.extend(batch)

        # Always parenthesised so the OR chain cannot bind to neighbouring
        # conditions.
        sql = f"({' OR '.join(clauses)})"

        return sql, params

# Usage for optimization
# Users that have at least one order, checked with a correlated EXISTS.
users_with_orders = User.objects.filter(
    OptimizedExists(
        Order.objects.filter(customer=OuterRef('pk')),
    ),
)

# Match a long list of ids with IN clauses of bounded size.
large_id_list = list(range(1, 10000))
products = Product.objects.filter(BatchedIn(F('id'), large_id_list))

Custom ORM expressions provide powerful ways to leverage database capabilities while maintaining Django's ORM benefits. They enable complex queries, database-specific optimizations, and custom business logic at the database level, resulting in more efficient and expressive Django applications.