Django's ORM provides powerful expression APIs that allow you to create custom database expressions, functions, and operations. This enables sophisticated database queries while maintaining the benefits of Django's ORM abstraction.
ORM expressions represent database operations that can be combined, reused, and optimized by Django's query compiler. They provide a way to generate complex SQL while staying within Django's ORM framework.
import json

from django.db.models import (
    BooleanField,
    Case,
    Exists,
    Expression,
    F,
    FloatField,
    IntegerField,
    OuterRef,
    Q,
    TextField,
    Value,
    When,
)
from django.db.models.functions import Coalesce, Greatest, Least
# Custom expression for calculating age
class Age(Expression):
    """Annotate a row with an age in whole years computed from a birth date.

    Args:
        birth_date_field: Expression yielding the birth date, for example
            ``F('birth_date')``.

    Raises:
        NotImplementedError: From ``as_sql`` when the database vendor is not
            PostgreSQL, MySQL or SQLite.
    """

    def __init__(self, birth_date_field):
        # A hand-written SQL expression has no source field Django could use
        # to infer the result type, so the output field is declared here.
        super().__init__(output_field=IntegerField())
        self.birth_date_field = birth_date_field

    def get_source_expressions(self):
        """Expose the wrapped expression so the ORM resolves it (F -> column)."""
        return [self.birth_date_field]

    def set_source_expressions(self, exprs):
        """Store the resolved expression handed back by the ORM."""
        (self.birth_date_field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` computing the age for the active vendor."""
        birth_sql, birth_params = compiler.compile(self.birth_date_field)
        vendor = connection.vendor
        if vendor == 'postgresql':
            # EXTRACT does not return an integer type; cast so the value
            # matches the declared IntegerField.
            sql = f"CAST(EXTRACT(YEAR FROM AGE({birth_sql})) AS INTEGER)"
            return sql, birth_params
        if vendor == 'mysql':
            return f"TIMESTAMPDIFF(YEAR, {birth_sql}, NOW())", birth_params
        if vendor == 'sqlite':
            # Year difference minus one when this year's birthday has not
            # happened yet.  Percent signs are doubled because the statement
            # is still passed through the cursor's parameter interpolation.
            sql = (
                f"(CAST(strftime('%%Y', 'now') AS INTEGER)"
                f" - CAST(strftime('%%Y', {birth_sql}) AS INTEGER)"
                f" - (strftime('%%m-%%d', 'now')"
                f" < strftime('%%m-%%d', {birth_sql})))"
            )
            # The birth-date SQL appears twice, so its params do as well.
            return sql, [*birth_params, *birth_params]
        raise NotImplementedError(f"Age calculation not supported for {connection.vendor}")
# Custom function for full-text search
class FullTextSearch(Expression):
    """Boolean PostgreSQL full-text match of a text expression against a query.

    Args:
        search_vector: Expression producing the text to search, for example
            ``F('bio')``.
        query: Expression producing the search terms, for example
            ``Value('django developer')``.
        config: Name of the text search configuration passed to
            ``to_tsvector`` and ``plainto_tsquery``.

    Raises:
        NotImplementedError: From ``as_sql`` on any vendor but PostgreSQL.
    """

    def __init__(self, search_vector, query, config='english'):
        # BooleanField makes the expression conditional, which is what allows
        # ``filter(search_match=True)`` on the annotation.
        super().__init__(output_field=BooleanField())
        self.search_vector = search_vector
        self.query = query
        self.config = config

    def get_source_expressions(self):
        """Expose both wrapped expressions so the ORM resolves them."""
        return [self.search_vector, self.query]

    def set_source_expressions(self, exprs):
        """Store the resolved expressions handed back by the ORM."""
        self.search_vector, self.query = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the ``@@`` match."""
        if connection.vendor != 'postgresql':
            raise NotImplementedError("Full-text search only supported on PostgreSQL")
        vector_sql, vector_params = compiler.compile(self.search_vector)
        query_sql, query_params = compiler.compile(self.query)
        # The configuration name is bound as a parameter instead of being
        # pasted into the statement, so it cannot alter the SQL.
        sql = (
            f"(to_tsvector(%s::regconfig, {vector_sql})"
            f" @@ plainto_tsquery(%s::regconfig, {query_sql}))"
        )
        # Parameter order follows placeholder order in the statement.
        params = [self.config, *vector_params, self.config, *query_params]
        return sql, params
# Custom aggregation function
from django.db.models import Aggregate
class StringAgg(Aggregate):
    """Concatenate the values of a group into one separator-joined string.

    Emits ``STRING_AGG`` on PostgreSQL and ``GROUP_CONCAT`` on MySQL and
    SQLite.

    Args:
        expression: Field name or expression to aggregate.
        separator: Text placed between the aggregated values.
        **extra: Forwarded to ``Aggregate`` (``filter``, ``output_field``...).

    Raises:
        NotImplementedError: From ``as_sql`` on any other vendor.
    """

    function = 'STRING_AGG'

    def __init__(self, expression, separator=',', **extra):
        # The separator travels as a second source expression so it is bound
        # as a query parameter.  Passing it through ``extra`` would format the
        # Value object's repr straight into the SQL template.
        extra.setdefault('output_field', TextField())
        super().__init__(expression, Value(str(separator)), **extra)

    def as_sql(self, compiler, connection, **extra_context):
        """Return ``(sql, params)`` using the vendor's aggregate function."""
        vendor = connection.vendor
        # Vendor differences are passed as per-call context; the instance is
        # left untouched so it can be compiled for another connection later.
        if vendor == 'mysql':
            extra_context.setdefault('function', 'GROUP_CONCAT')
            # MySQL spells the delimiter as ``expr SEPARATOR 'sep'``.
            extra_context.setdefault('arg_joiner', ' SEPARATOR ')
        elif vendor == 'sqlite':
            extra_context.setdefault('function', 'GROUP_CONCAT')
        elif vendor != 'postgresql':
            raise NotImplementedError(f"StringAgg not supported for {connection.vendor}")
        return super().as_sql(compiler, connection, **extra_context)
# Usage examples
from django.db import models
class Person(models.Model):
    """Example model used by the age and full-text search queries."""

    # Display name, limited to 100 characters.
    name = models.CharField(max_length=100)

    # Date the age annotation is derived from.
    birth_date = models.DateField()

    # Free text searched by the full-text example.
    bio = models.TextField()
# Annotate each person with an age and keep those aged 18 or more.
people_with_age = (
    Person.objects
    .annotate(age=Age(F('birth_date')))
    .filter(age__gte=18)
)

# Keep only the people whose bio matches the search terms.
search_results = (
    Person.objects
    .annotate(
        search_match=FullTextSearch(F('bio'), Value('django developer')),
    )
    .filter(search_match=True)
)

# Join the names of each department's employees into a single string.
departments = Department.objects.annotate(
    employee_names=StringAgg('employees__name', separator=', '),
)
from django.db.models import Window, F
from django.db.models.functions import RowNumber, Rank, DenseRank, Lag, Lead
class RunningTotal(Expression):
    """``SUM(expression) OVER (...)`` window expression.

    Args:
        expression: Expression to sum, for example ``F('amount')``.
        partition_by: Optional iterable of expressions for ``PARTITION BY``.
        order_by: Optional iterable of expressions for ``ORDER BY``.
    """

    # Tells the ORM this expression contains a window function.
    contains_over_clause = True

    def __init__(self, expression, partition_by=None, order_by=None):
        super().__init__()
        self.expression = expression
        # Copies keep the caller's lists from being shared with this object.
        self.partition_by = list(partition_by or [])
        self.order_by = list(order_by or [])

    def get_source_expressions(self):
        """Expose every wrapped expression so the ORM resolves them."""
        return [self.expression, *self.partition_by, *self.order_by]

    def set_source_expressions(self, exprs):
        """Split the flat resolved list back into its three roles."""
        boundary = 1 + len(self.partition_by)
        self.expression = exprs[0]
        self.partition_by = list(exprs[1:boundary])
        self.order_by = list(exprs[boundary:])

    def _resolve_output_field(self):
        # The sum has the type of the summed expression; the partition and
        # ordering expressions must not take part in type inference.
        return self.expression.output_field

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the windowed sum."""
        expr_sql, expr_params = compiler.compile(self.expression)
        params = list(expr_params)
        over_parts = []
        clauses = (
            ("PARTITION BY", self.partition_by),
            ("ORDER BY", self.order_by),
        )
        for keyword, expressions in clauses:
            if not expressions:
                continue
            fragments = []
            for expression in expressions:
                fragment_sql, fragment_params = compiler.compile(expression)
                fragments.append(fragment_sql)
                # Keep the params of every compiled fragment, in SQL order.
                params.extend(fragment_params)
            over_parts.append(f"{keyword} {', '.join(fragments)}")
        over_clause = ' '.join(over_parts)
        return f"SUM({expr_sql}) OVER ({over_clause})", params
class PercentileRank(Expression):
    """``PERCENT_RANK() OVER (...)`` ordered by the given expression.

    Args:
        expression: Expression the rows are ranked by.
        partition_by: Optional iterable of expressions for ``PARTITION BY``.
    """

    # Tells the ORM this expression contains a window function.
    contains_over_clause = True

    def __init__(self, expression, partition_by=None):
        # PERCENT_RANK yields a fraction, independent of the ranked column's
        # type, so the output field is fixed rather than inferred.
        super().__init__(output_field=FloatField())
        self.expression = expression
        self.partition_by = list(partition_by or [])

    def get_source_expressions(self):
        """Expose every wrapped expression so the ORM resolves them."""
        return [self.expression, *self.partition_by]

    def set_source_expressions(self, exprs):
        """Store the resolved expressions handed back by the ORM."""
        self.expression, *partitions = exprs
        self.partition_by = partitions

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the percentile rank."""
        params = []
        over_parts = []
        if self.partition_by:
            fragments = []
            for expression in self.partition_by:
                fragment_sql, fragment_params = compiler.compile(expression)
                fragments.append(fragment_sql)
                params.extend(fragment_params)
            over_parts.append(f"PARTITION BY {', '.join(fragments)}")
        # The ranked expression comes after the partition in the statement,
        # so its params are appended last.
        expr_sql, expr_params = compiler.compile(self.expression)
        over_parts.append(f"ORDER BY {expr_sql}")
        params.extend(expr_params)
        over_clause = ' '.join(over_parts)
        return f"PERCENT_RANK() OVER ({over_clause})", params
# Usage with window functions
class Sale(models.Model):
    """Example model used by the window-function queries."""

    # Referenced by name because Product is declared further down the file;
    # using the class object here would fail when this class body runs.
    product = models.ForeignKey('Product', on_delete=models.CASCADE)
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    sale_date = models.DateField()
    # NOTE(review): User is not imported in the visible part of this file -
    # confirm where it comes from.
    salesperson = models.ForeignKey(User, on_delete=models.CASCADE)
# Cumulative amount per salesperson, in sale-date order.
sales_with_running_total = (
    Sale.objects
    .annotate(
        running_total=RunningTotal(
            F('amount'),
            partition_by=[F('salesperson')],
            order_by=[F('sale_date')],
        ),
    )
    .order_by('salesperson', 'sale_date')
)

# Total sales per salesperson name, ranked from highest to lowest.
top_salespeople = (
    Sale.objects
    .values('salesperson__name')
    .annotate(
        total_sales=models.Sum('amount'),
        rank=Window(expression=Rank(), order_by=F('total_sales').desc()),
    )
    .order_by('rank')
)

# Percentile rank of every sale by its amount.
sales_with_percentile = Sale.objects.annotate(
    percentile_rank=PercentileRank(F('amount')),
)
class JSONExtract(Expression):
    """Extract one value from a JSON column as text.

    Args:
        json_field: Expression yielding the JSON document, for example
            ``F('attributes')``.
        path: Either a single key (``str``) or a sequence of keys and array
            indexes describing a nested location.

    Raises:
        NotImplementedError: From ``as_sql`` when the database vendor is not
            PostgreSQL, MySQL or SQLite.
    """

    def __init__(self, json_field, path):
        # Declared explicitly: there is no source field to infer it from, and
        # lookups such as ``filter(color='red')`` need an output field.
        super().__init__(output_field=TextField())
        self.json_field = json_field
        self.path = path

    def get_source_expressions(self):
        """Expose the wrapped expression so the ORM resolves it."""
        return [self.json_field]

    def set_source_expressions(self, exprs):
        """Store the resolved expression handed back by the ORM."""
        (self.json_field,) = exprs

    def _json_path(self):
        """Build the ``$...`` path string used by MySQL and SQLite."""
        if isinstance(self.path, str):
            return f'$.{self.path}'
        parts = ['$']
        for step in self.path:
            if isinstance(step, int) and not isinstance(step, bool):
                # Array positions use bracket syntax, not a dotted member.
                parts.append(f'[{step}]')
            else:
                # Quoting keeps keys with dots or spaces a single member.
                parts.append('.' + json.dumps(str(step)))
        return ''.join(parts)

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the vendor's JSON extraction."""
        field_sql, field_params = compiler.compile(self.json_field)
        vendor = connection.vendor
        if vendor == 'postgresql':
            if isinstance(self.path, str):
                sql = f"({field_sql} ->> %s)"
                params = [*field_params, self.path]
            else:
                # ``#>>`` takes the whole path as one bound text array, so no
                # path element is ever written into the statement itself.
                sql = f"({field_sql} #>> %s)"
                params = [*field_params, [str(step) for step in self.path]]
        elif vendor == 'mysql':
            sql = f"JSON_UNQUOTE(JSON_EXTRACT({field_sql}, %s))"
            params = [*field_params, self._json_path()]
        elif vendor == 'sqlite':
            sql = f"JSON_EXTRACT({field_sql}, %s)"
            params = [*field_params, self._json_path()]
        else:
            raise NotImplementedError(f"JSON operations not supported for {connection.vendor}")
        return sql, params
class JSONArrayLength(Expression):
    """Number of elements in a JSON array.

    Args:
        json_field: Expression yielding the JSON document.
        path: Optional top-level key whose value is the array to measure.
            When omitted, the document itself is measured.

    Raises:
        NotImplementedError: From ``as_sql`` when the database vendor is not
            PostgreSQL, MySQL or SQLite.
    """

    def __init__(self, json_field, path=None):
        # Declared explicitly so comparisons such as ``size_count__gt=2``
        # work on the annotation.
        super().__init__(output_field=IntegerField())
        self.json_field = json_field
        self.path = path

    def get_source_expressions(self):
        """Expose the wrapped expression so the ORM resolves it."""
        return [self.json_field]

    def set_source_expressions(self, exprs):
        """Store the resolved expression handed back by the ORM."""
        (self.json_field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the vendor's array-length function."""
        field_sql, field_params = compiler.compile(self.json_field)
        params = list(field_params)
        vendor = connection.vendor
        if vendor == 'postgresql':
            # Casting to jsonb lets one function serve both json and jsonb
            # columns; json_array_length alone rejects jsonb input.
            if self.path:
                sql = f"jsonb_array_length(CAST(({field_sql} -> %s) AS jsonb))"
                params.append(self.path)
            else:
                sql = f"jsonb_array_length(CAST({field_sql} AS jsonb))"
        elif vendor in ('mysql', 'sqlite'):
            function = 'JSON_LENGTH' if vendor == 'mysql' else 'JSON_ARRAY_LENGTH'
            if self.path:
                sql = f"{function}({field_sql}, %s)"
                params.append(f'$.{self.path}')
            else:
                sql = f"{function}({field_sql})"
        else:
            raise NotImplementedError(f"JSON array length not supported for {connection.vendor}")
        return sql, params
# Usage with JSON operations
class Product(models.Model):
    """Example model used by the JSON extraction queries."""

    # Product name, limited to 200 characters.
    name = models.CharField(max_length=200)

    # Free-form attributes, e.g. {"color": "red", "sizes": ["S", "M", "L"]}
    attributes = models.JSONField()
# Pull the "color" attribute out of the JSON and keep the red products.
products_with_color = (
    Product.objects
    .annotate(color=JSONExtract(F('attributes'), 'color'))
    .filter(color='red')
)

# Count the entries under "sizes" and keep products with more than two.
products_with_size_count = (
    Product.objects
    .annotate(size_count=JSONArrayLength(F('attributes'), 'sizes'))
    .filter(size_count__gt=2)
)
class ConditionalSum(Expression):
    """``SUM(CASE WHEN condition THEN expression ELSE 0 END)`` aggregate.

    Args:
        expression: Expression to add up, for example ``F('orders__total')``.
        condition: ``Q`` object or boolean expression selecting which rows
            contribute to the sum.
    """

    # Marks the expression as an aggregate so annotating with it groups rows.
    contains_aggregate = True

    def __init__(self, expression, condition):
        super().__init__()
        self.expression = expression
        self.condition = condition

    def get_source_expressions(self):
        """Expose both parts so the ORM resolves field references and joins."""
        return [self.expression, self.condition]

    def set_source_expressions(self, exprs):
        """Store the resolved parts handed back by the ORM."""
        self.expression, self.condition = exprs

    def _resolve_output_field(self):
        # The sum has the type of the summed expression; the condition must
        # not take part in type inference.
        return self.expression.output_field

    def get_group_by_cols(self, *args, **kwargs):
        # An aggregate contributes no columns to GROUP BY.  Extra arguments
        # are accepted because the signature differs between Django versions.
        return []

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the conditional sum."""
        expr_sql, expr_params = compiler.compile(self.expression)
        cond_sql, cond_params = compiler.compile(self.condition)
        sql = f"SUM(CASE WHEN {cond_sql} THEN {expr_sql} ELSE 0 END)"
        # The condition appears first in the statement, so its params lead.
        return sql, [*cond_params, *expr_params]
class MovingAverage(Expression):
    """Average over the current row and the ``window_size - 1`` rows before it.

    Args:
        expression: Expression to average, for example ``F('total')``.
        window_size: Number of rows in the window; a positive integer.
        order_by: Expression defining the row order inside the window.

    Raises:
        ValueError: If ``window_size`` is not a positive integer.
    """

    # Tells the ORM this expression contains a window function.
    contains_over_clause = True

    def __init__(self, expression, window_size, order_by):
        # The size is written into the SQL text, so only a real positive
        # integer is accepted (bool is an int subclass and is rejected too).
        if (
            isinstance(window_size, bool)
            or not isinstance(window_size, int)
            or window_size < 1
        ):
            raise ValueError("window_size must be a positive integer")
        super().__init__()
        self.expression = expression
        self.window_size = window_size
        self.order_by = order_by

    def get_source_expressions(self):
        """Expose both wrapped expressions so the ORM resolves them."""
        return [self.expression, self.order_by]

    def set_source_expressions(self, exprs):
        """Store the resolved expressions handed back by the ORM."""
        self.expression, self.order_by = exprs

    def _resolve_output_field(self):
        # Averaging integers produces fractions; other numeric types keep the
        # type of the averaged expression.
        source = self.expression.output_field
        if isinstance(source, IntegerField):
            return FloatField()
        return source

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the moving average."""
        expr_sql, expr_params = compiler.compile(self.expression)
        order_sql, order_params = compiler.compile(self.order_by)
        preceding = self.window_size - 1
        sql = (
            f"AVG({expr_sql}) OVER ("
            f"ORDER BY {order_sql} "
            f"ROWS BETWEEN {preceding} PRECEDING AND CURRENT ROW)"
        )
        return sql, [*expr_params, *order_params]
# Usage examples
class Order(models.Model):
    """Example model used by the conditional-sum and moving-average queries."""

    # Owning customer; deleting the customer deletes the order.
    customer = models.ForeignKey(to=Customer, on_delete=models.CASCADE)

    # Order value, up to 10 digits with 2 decimal places.
    total = models.DecimalField(max_digits=10, decimal_places=2)

    # Status label; the example queries compare it to 'completed'/'pending'.
    status = models.CharField(max_length=20)

    # Set automatically when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
# Per-customer order totals, split by order status.
customer_stats = Customer.objects.annotate(
    total_completed_orders=ConditionalSum(
        F('orders__total'), Q(orders__status='completed'),
    ),
    total_pending_orders=ConditionalSum(
        F('orders__total'), Q(orders__status='pending'),
    ),
)

# Seven-row moving average of order totals in creation order.
orders_with_moving_avg = (
    Order.objects
    .annotate(
        moving_avg=MovingAverage(
            F('total'), window_size=7, order_by=F('created_at'),
        ),
    )
    .order_by('created_at')
)
from django.contrib.gis.db.models import PointField
from django.contrib.gis.geos import Point
class DistanceFromPoint(Expression):
    """``ST_Distance`` between a geometry expression and a fixed point.

    Args:
        point_field: Expression yielding the stored geometry, for example
            ``F('location')``.
        reference_point: Geometry object exposing ``wkt`` and ``srid``.

    Raises:
        NotImplementedError: From ``as_sql`` when the database vendor is not
            PostgreSQL or MySQL.

    NOTE(review): the unit of the result is whatever ``ST_Distance`` reports
    for the column's type and SRID - confirm it is what callers expect.
    """

    def __init__(self, point_field, reference_point):
        # Declared explicitly: there is no source field to infer it from.
        super().__init__(output_field=FloatField())
        self.point_field = point_field
        self.reference_point = reference_point

    def get_source_expressions(self):
        """Expose the wrapped expression so the ORM resolves it."""
        return [self.point_field]

    def set_source_expressions(self, exprs):
        """Store the resolved expression handed back by the ORM."""
        (self.point_field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the distance calculation."""
        field_sql, field_params = compiler.compile(self.point_field)
        wkt = self.reference_point.wkt
        if connection.vendor == 'postgresql':
            # Honour the point's own SRID; 4326 remains the fallback for
            # points created without one.  int() keeps the literal numeric.
            srid = int(self.reference_point.srid or 4326)
            sql = f"ST_Distance({field_sql}, ST_GeomFromText(%s, {srid}))"
        elif connection.vendor == 'mysql':
            sql = f"ST_Distance({field_sql}, ST_GeomFromText(%s))"
        else:
            raise NotImplementedError(f"Geographic distance not supported for {connection.vendor}")
        return sql, [*field_params, wkt]
class WithinRadius(Expression):
    """Boolean test that a point lies within a radius, in meters, of a center.

    Args:
        point_field: Expression yielding the stored point, for example
            ``F('location')``.
        center_point: Geometry object exposing ``wkt`` and ``srid``.
        radius_meters: Maximum distance from the center, in meters.

    Raises:
        NotImplementedError: From ``as_sql`` on any vendor but PostgreSQL.

    NOTE(review): the column is cast to ``geography``, which assumes it holds
    longitude/latitude coordinates - confirm for non-4326 columns.
    """

    def __init__(self, point_field, center_point, radius_meters):
        # BooleanField makes the expression conditional, which is required to
        # pass it directly to ``filter()``.
        super().__init__(output_field=BooleanField())
        self.point_field = point_field
        self.center_point = center_point
        self.radius_meters = radius_meters

    def get_source_expressions(self):
        """Expose the wrapped expression so the ORM resolves it."""
        return [self.point_field]

    def set_source_expressions(self, exprs):
        """Store the resolved expression handed back by the ORM."""
        (self.point_field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the radius check."""
        if connection.vendor != 'postgresql':
            raise NotImplementedError(f"Geographic radius check not supported for {connection.vendor}")
        field_sql, field_params = compiler.compile(self.point_field)
        srid = int(self.center_point.srid or 4326)
        # ST_DWithin measures geometry arguments in the units of their
        # spatial reference (degrees for 4326).  Casting both sides to
        # geography makes the radius a distance in meters.
        sql = (
            f"ST_DWithin(CAST({field_sql} AS geography), "
            f"CAST(ST_Transform(ST_GeomFromText(%s, {srid}), 4326) AS geography), "
            f"%s)"
        )
        params = [*field_params, self.center_point.wkt, self.radius_meters]
        return sql, params
# Usage with geographic data
class Store(models.Model):
    """Example model used by the geographic queries."""

    # Store name, limited to 200 characters.
    name = models.CharField(max_length=200)

    # Position of the store, stored as a point geometry.
    location = PointField()
# Stores within 5 km of San Francisco, nearest first.
center = Point(-122.4194, 37.7749)
nearby_stores = (
    Store.objects
    .annotate(distance=DistanceFromPoint(F('location'), center))
    .filter(WithinRadius(F('location'), center, 5000))  # 5000 meters
    .order_by('distance')
)
class OptimizedExists(Exists):
    """``EXISTS (subquery)`` condition built from a queryset.

    Built on Django's ``Exists`` so that the subquery is resolved against
    the outer query (which is what makes ``OuterRef`` work) and so that the
    expression is conditional and can be passed straight to ``filter()``.

    Args:
        queryset: Queryset whose rows are tested for existence; it may
            reference the outer query through ``OuterRef``.
    """

    def __init__(self, queryset):
        super().__init__(queryset)
        # Kept so code reading ``.queryset`` on the expression still works.
        self.queryset = queryset
class BatchedIn(Expression):
    """Membership test that splits a long value list into several IN clauses.

    Args:
        field: Expression to test, for example ``F('id')``.
        values: Iterable of candidate values.
        batch_size: Maximum number of values per ``IN (...)`` clause.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.

    NOTE(review): batching limits the size of each IN list, not the total
    number of bound parameters in the statement.
    """

    def __init__(self, field, values, batch_size=1000):
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        # BooleanField makes the expression conditional, which is required to
        # pass it directly to ``filter()``.
        super().__init__(output_field=BooleanField())
        self.field = field
        self.values = list(values)
        self.batch_size = batch_size

    def get_source_expressions(self):
        """Expose the wrapped expression so the ORM resolves it."""
        return [self.field]

    def set_source_expressions(self, exprs):
        """Store the resolved expression handed back by the ORM."""
        (self.field,) = exprs

    def as_sql(self, compiler, connection):
        """Return ``(sql, params)`` for the batched membership test."""
        if not self.values:
            # ``IN ()`` is not valid SQL; nothing can match an empty list.
            return "(1 = 0)", []
        field_sql, field_params = compiler.compile(self.field)
        clauses = []
        params = []
        for start in range(0, len(self.values), self.batch_size):
            batch = self.values[start:start + self.batch_size]
            placeholders = ', '.join(['%s'] * len(batch))
            clauses.append(f"{field_sql} IN ({placeholders})")
            # The field SQL is repeated in every clause, so its params must
            # be repeated in front of each batch as well.
            params.extend(field_params)
            params.extend(batch)
        return f"({' OR '.join(clauses)})", params
# Usage for optimization

# Users that have at least one order pointing back at them.
users_with_orders = User.objects.filter(
    OptimizedExists(
        Order.objects.filter(customer=OuterRef('pk')),
    ),
)

# Look up products by a long list of ids (1 through 9999).
large_id_list = [*range(1, 10000)]
products = Product.objects.filter(BatchedIn(F('id'), large_id_list))
Custom ORM expressions provide powerful ways to leverage database capabilities while maintaining Django's ORM benefits. They enable complex queries, database-specific optimizations, and custom business logic at the database level, resulting in more efficient and expressive Django applications.
Extending Django's Core
Django's architecture allows deep customization and extension of its core components. This comprehensive guide covers advanced techniques for extending Django's ORM, admin interface, authentication system, and other core components to create powerful, customized functionality that integrates seamlessly with the framework.
Custom Management Commands
Django's management command system provides a powerful way to create command-line tools for administrative tasks, data processing, and automation. This guide covers creating sophisticated management commands with advanced features like progress tracking, parallel processing, and integration with external systems.