Database optimization is crucial for building scalable Django applications. Understanding how to identify performance bottlenecks, optimize queries, and implement efficient database patterns ensures your application can handle growth and maintain responsiveness.
# Development-only logging config (settings.py): route Django's SQL logger
# straight to the console so each query is printed with its timing.
_console_handler = {
    'class': 'logging.StreamHandler',
}
_sql_logger = {
    'handlers': ['console'],
    'level': 'DEBUG',
    'propagate': False,
}
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {'console': _console_handler},
    'loggers': {'django.db.backends': _sql_logger},
}
# Query analysis tools
import time
from functools import wraps

from django.conf import settings
from django.db import connection
class QueryAnalyzer:
    """Analyze and optimize database queries.

    NOTE: these helpers rely on ``connection.queries``, which Django only
    records when ``DEBUG = True`` — use them in development only.
    """

    @staticmethod
    def analyze_queries(func):
        """Decorator that prints query statistics for each call to *func*.

        Reports total query count and time, queries slower than 10ms, and
        repeated query shapes that suggest N+1 access patterns. Returns
        *func*'s result unchanged.
        """
        @wraps(func)  # FIX: preserve func.__name__/docstring for introspection
        def wrapper(*args, **kwargs):
            # Reset the per-connection query log so only this call is measured.
            connection.queries_log.clear()
            start_time = time.time()
            result = func(*args, **kwargs)
            end_time = time.time()

            # Snapshot once: ``connection.queries`` is rebuilt on each access.
            queries = connection.queries
            total_queries = len(queries)
            total_time = sum(float(q['time']) for q in queries)
            print(f"\n=== Query Analysis for {func.__name__} ===")
            print(f"Total queries: {total_queries}")
            print(f"Total time: {total_time:.4f}s")
            print(f"Function time: {end_time - start_time:.4f}s")

            # Show queries slower than 10ms.
            slow_queries = [q for q in queries if float(q['time']) > 0.01]
            if slow_queries:
                print(f"\nSlow queries ({len(slow_queries)}):")
                for i, query in enumerate(slow_queries, 1):
                    print(f"{i}. Time: {query['time']}s")
                    print(f"   SQL: {query['sql'][:100]}...")

            # Detect potential N+1 patterns: the same SQL shape (everything
            # before the WHERE clause) executed more than 5 times.
            similar_queries = {}
            for query in queries:
                sql = query['sql']
                sql_pattern = sql.split('WHERE')[0] if 'WHERE' in sql else sql
                similar_queries[sql_pattern] = similar_queries.get(sql_pattern, 0) + 1
            n_plus_one = {pattern: count for pattern, count in similar_queries.items() if count > 5}
            if n_plus_one:
                print("\nPotential N+1 queries:")
                for pattern, count in n_plus_one.items():
                    print(f"  {count}x: {pattern[:80]}...")
            return result
        return wrapper

    @staticmethod
    def explain_query(queryset):
        """Return the database's execution plan for *queryset*.

        NOTE: ``str(queryset.query)`` renders parameters inline and may not
        be valid SQL for every value type; on Django 2.1+ prefer the
        built-in ``queryset.explain()``.
        """
        if connection.vendor == 'postgresql':
            with connection.cursor() as cursor:
                cursor.execute(f"EXPLAIN ANALYZE {queryset.query}")
                return cursor.fetchall()
        elif connection.vendor == 'mysql':
            with connection.cursor() as cursor:
                cursor.execute(f"EXPLAIN {queryset.query}")
                return cursor.fetchall()
        else:
            return str(queryset.query)
# Example usage
@QueryAnalyzer.analyze_queries
def get_user_posts_inefficient(user_id):
    """Demonstrates the N+1 problem: each post triggers extra queries for
    its author, its category, and its comment count."""
    user = User.objects.get(id=user_id)
    posts = Post.objects.filter(author=user)
    return [
        {
            'title': post.title,
            'author_name': post.author.username,    # extra query per post
            'category': post.category.name,         # extra query per post
            'comment_count': post.comments.count()  # extra query per post
        }
        for post in posts
    ]
@QueryAnalyzer.analyze_queries
def get_user_posts_optimized(user_id):
    """Same result as the inefficient version: author/category are JOINed
    in via select_related and comments arrive in one extra query via
    prefetch_related."""
    posts = (
        Post.objects.filter(author_id=user_id)
        .select_related('author', 'category')
        .prefetch_related('comments')
    )
    return [
        {
            'title': post.title,
            'author_name': post.author.username,
            'category': post.category.name,
            'comment_count': post.comments.count()
        }
        for post in posts
    ]
from django.db.models import Prefetch, Count
class OptimizedQueryPatterns:
    """Patterns for optimized database queries.

    Each method contrasts a naive query shape with its optimized
    equivalent; bodies are illustrative rather than reusable APIs.
    """

    @staticmethod
    def basic_select_related():
        """Basic select_related usage for foreign keys"""
        # Bad: N+1 queries
        posts = Post.objects.all()
        for post in posts:
            print(f"{post.title} by {post.author.username}")  # Query per post
        # Good: Single query with JOIN
        posts = Post.objects.select_related('author').all()
        for post in posts:
            print(f"{post.title} by {post.author.username}")  # No additional queries

    @staticmethod
    def multi_level_select_related():
        """Multi-level select_related for nested relationships"""
        # Select related across multiple levels (author -> profile,
        # category -> parent) so the loop below issues no extra queries.
        posts = Post.objects.select_related(
            'author',
            'author__profile',
            'category',
            'category__parent'
        ).all()
        for post in posts:
            print(f"{post.title} by {post.author.profile.full_name}")
            print(f"Category: {post.category.parent.name} > {post.category.name}")

    @staticmethod
    def basic_prefetch_related():
        """Basic prefetch_related for many-to-many and reverse foreign keys"""
        # Bad: N+1 queries for tags
        posts = Post.objects.all()
        for post in posts:
            tags = list(post.tags.all())  # Query per post
        # Good: Prefetch tags in separate query
        posts = Post.objects.prefetch_related('tags').all()
        for post in posts:
            tags = list(post.tags.all())  # No additional queries

    @staticmethod
    def custom_prefetch_related():
        """Custom prefetch with filtering and ordering"""
        # Prefetch only approved comments, ordered by date
        posts = Post.objects.prefetch_related(
            Prefetch(
                'comments',
                queryset=Comment.objects.filter(is_approved=True)
                .select_related('author')
                .order_by('-created_at')
            )
        ).all()
        for post in posts:
            approved_comments = post.comments.all()  # Already filtered and ordered

    @staticmethod
    def complex_prefetch_patterns():
        """Complex prefetch patterns for nested relationships"""
        # Prefetch posts with their comments and comment authors; nested
        # Prefetch objects let each level carry its own select_related.
        authors = Author.objects.prefetch_related(
            Prefetch(
                'posts',
                queryset=Post.objects.select_related('category')
                .prefetch_related(
                    Prefetch(
                        'comments',
                        queryset=Comment.objects.select_related('author')
                        .filter(is_approved=True)
                    )
                )
            )
        ).all()
        for author in authors:
            for post in author.posts.all():
                print(f"Post: {post.title} in {post.category.name}")
                for comment in post.comments.all():
                    print(f"  Comment by {comment.author.username}")

    @staticmethod
    def conditional_prefetch():
        """Conditional prefetching based on user permissions"""
        # NOTE(review): the inner helper is defined but never called or
        # returned, so this method itself returns None — presumably the
        # helper body is what readers are meant to copy.
        def get_posts_for_user(user):
            """Get posts with appropriate prefetching based on user role"""
            base_queryset = Post.objects.select_related('author', 'category')
            if user.is_staff:
                # Staff can see all comments
                return base_queryset.prefetch_related('comments__author')
            else:
                # Regular users only see approved comments
                return base_queryset.prefetch_related(
                    Prefetch(
                        'comments',
                        queryset=Comment.objects.filter(is_approved=True)
                        .select_related('author')
                    )
                )

    @staticmethod
    def optimized_aggregation_queries():
        """Optimize queries with aggregations"""
        # Bad: Multiple queries for counts
        posts = Post.objects.all()
        for post in posts:
            comment_count = post.comments.count()  # Query per post
            like_count = post.likes.count()  # Query per post
        # Good: Annotate with counts computed by the database in one query
        posts = Post.objects.annotate(
            comment_count=Count('comments'),
            like_count=Count('likes')
        ).select_related('author', 'category')
        for post in posts:
            print(f"{post.title}: {post.comment_count} comments, {post.like_count} likes")
# models.py with strategic indexing
from django.db import models
class OptimizedPost(models.Model):
    """Post model with optimized indexing.

    Composite indexes below are chosen to match the filter/order
    combinations listed on each entry.
    """
    title = models.CharField(max_length=200)
    slug = models.SlugField(max_length=200, unique=True)  # Automatic index
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)  # Automatic index
    category = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True)
    status = models.CharField(max_length=20, default='draft')
    is_featured = models.BooleanField(default=False)
    view_count = models.PositiveIntegerField(default=0)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    published_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        # Composite indexes for common query patterns
        indexes = [
            # For filtering published posts by date
            models.Index(fields=['status', 'published_at']),
            # For author's posts ordered by date
            models.Index(fields=['author', 'created_at']),
            # For category posts with status
            models.Index(fields=['category', 'status', 'created_at']),
            # For featured posts
            models.Index(fields=['is_featured', 'published_at']),
            # For popular posts
            models.Index(fields=['view_count']),
            # Partial index for published posts only (PostgreSQL)
            # NOTE(review): ``condition`` needs Django 2.2+ and a backend
            # with partial-index support — confirm target versions.
            models.Index(
                fields=['created_at'],
                name='idx_published_posts_date',
                condition=models.Q(status='published')
            ),
        ]
        # Default ordering uses indexed fields
        ordering = ['-created_at']
class IndexOptimizationManager:
    """Manage database indexes for optimal performance (PostgreSQL only)."""

    @staticmethod
    def analyze_missing_indexes():
        """Identify tables doing large sequential scans (missing-index candidates).

        Returns raw rows: (schema, table, seq_scan, seq_tup_read, idx_scan,
        idx_tup_fetch, avg rows read per sequential scan).
        """
        # FIX: pg_stat_user_tables exposes the table name as ``relname``;
        # there is no ``tablename`` column, so the original query errored.
        missing_indexes_query = """
            SELECT
                schemaname,
                relname AS tablename,
                seq_scan,
                seq_tup_read,
                idx_scan,
                idx_tup_fetch,
                seq_tup_read / seq_scan AS avg_seq_read
            FROM pg_stat_user_tables
            WHERE seq_scan > 0
              AND seq_tup_read / seq_scan > 1000
            ORDER BY seq_tup_read DESC
        """
        with connection.cursor() as cursor:
            cursor.execute(missing_indexes_query)
            return cursor.fetchall()

    @staticmethod
    def analyze_unused_indexes():
        """Find never-scanned non-PK indexes (candidates for dropping)."""
        # FIX: pg_stat_user_indexes uses ``relname``/``indexrelname`` —
        # ``tablename``/``indexname`` do not exist in this view.
        unused_indexes_query = """
            SELECT
                schemaname,
                relname AS tablename,
                indexrelname AS indexname,
                idx_scan,
                pg_size_pretty(pg_relation_size(indexrelid)) AS size
            FROM pg_stat_user_indexes
            WHERE idx_scan = 0
              AND indexrelname NOT LIKE '%_pkey'
            ORDER BY pg_relation_size(indexrelid) DESC
        """
        with connection.cursor() as cursor:
            cursor.execute(unused_indexes_query)
            return cursor.fetchall()

    @staticmethod
    def create_conditional_indexes():
        """Create partial indexes for hot subsets of data.

        NOTE: CREATE INDEX CONCURRENTLY cannot run inside a transaction
        block — Django's default autocommit mode is fine, but never call
        this inside ``transaction.atomic()``.
        """
        conditional_indexes = [
            # Index only for published posts
            """
            CREATE INDEX CONCURRENTLY idx_published_posts_category
            ON blog_post (category_id, created_at)
            WHERE status = 'published'
            """,
            # Index for active users only
            """
            CREATE INDEX CONCURRENTLY idx_active_users_login
            ON auth_user (last_login)
            WHERE is_active = true
            """,
            # FIX: partial-index predicates must use IMMUTABLE expressions;
            # PostgreSQL rejects NOW()/INTERVAL arithmetic here. Use a fixed
            # cutoff and recreate the index periodically instead.
            """
            CREATE INDEX CONCURRENTLY idx_recent_analytics
            ON analytics_event (event_type, timestamp)
            WHERE timestamp > '2024-01-01'
            """,
        ]
        with connection.cursor() as cursor:
            for index_sql in conditional_indexes:
                try:
                    cursor.execute(index_sql)
                    # FIX: the index name is token 3 of
                    # "CREATE INDEX CONCURRENTLY <name> ..."; token 4 is "ON".
                    print(f"Created index: {index_sql.split()[3]}")
                except Exception as e:
                    print(f"Error creating index: {e}")

    @staticmethod
    def optimize_text_search_indexes():
        """Create GIN indexes for full-text, trigram, and JSON search."""
        # FIX: the extension is created as its own statement — bundling
        # "CREATE EXTENSION ...; CREATE INDEX CONCURRENTLY ..." into one
        # string runs them in an implicit transaction, which CONCURRENTLY
        # forbids.
        text_search_indexes = [
            "CREATE EXTENSION IF NOT EXISTS pg_trgm",
            # Full-text search on title and content
            """
            CREATE INDEX CONCURRENTLY idx_post_fulltext
            ON blog_post USING GIN (to_tsvector('english', title || ' ' || content))
            """,
            # Trigram index for fuzzy matching
            """
            CREATE INDEX CONCURRENTLY idx_post_title_trgm
            ON blog_post USING GIN (title gin_trgm_ops)
            """,
            # JSON field indexing
            """
            CREATE INDEX CONCURRENTLY idx_post_metadata_gin
            ON blog_post USING GIN (metadata)
            """,
        ]
        with connection.cursor() as cursor:
            for index_sql in text_search_indexes:
                try:
                    cursor.execute(index_sql)
                    print(f"Created text search index")
                except Exception as e:
                    print(f"Error creating text search index: {e}")
# Index monitoring and maintenance
class IndexMaintenanceManager:
    """Monitor and maintain database indexes (PostgreSQL only)."""

    @staticmethod
    def get_index_usage_stats():
        """Return per-index usage statistics as a list of dicts.

        Each dict carries scan counts, on-disk size, and a coarse
        'Unused'/'Low'/'Medium'/'High Usage' bucket.
        """
        # FIX: the stats views expose ``relname``/``indexrelname`` — the
        # original ``t.tablename``/``i.indexname`` columns do not exist.
        index_stats_query = """
            SELECT
                t.schemaname,
                t.relname AS tablename,
                i.indexrelname AS indexname,
                i.idx_scan,
                i.idx_tup_read,
                i.idx_tup_fetch,
                pg_size_pretty(pg_relation_size(i.indexrelid)) AS size,
                pg_relation_size(i.indexrelid) AS size_bytes,
                CASE
                    WHEN i.idx_scan = 0 THEN 'Unused'
                    WHEN i.idx_scan < 100 THEN 'Low Usage'
                    WHEN i.idx_scan < 1000 THEN 'Medium Usage'
                    ELSE 'High Usage'
                END AS usage_category
            FROM pg_stat_user_indexes i
            JOIN pg_stat_user_tables t ON i.relid = t.relid
            ORDER BY i.idx_scan DESC, pg_relation_size(i.indexrelid) DESC
        """
        with connection.cursor() as cursor:
            cursor.execute(index_stats_query)
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]

    @staticmethod
    def reindex_fragmented_indexes():
        """Rebuild heavily-used indexes in place.

        NOTE: REINDEX ... CONCURRENTLY requires PostgreSQL 12+ and cannot
        run inside a transaction block.
        """
        # FIX: ``relname``/``indexrelname`` instead of nonexistent columns.
        fragmentation_query = """
            SELECT
                schemaname,
                relname AS tablename,
                indexrelname AS indexname,
                pg_size_pretty(pg_relation_size(indexrelid)) AS size
            FROM pg_stat_user_indexes
            WHERE idx_scan > 1000 -- Only reindex used indexes
            ORDER BY pg_relation_size(indexrelid) DESC
        """
        with connection.cursor() as cursor:
            cursor.execute(fragmentation_query)
            indexes_to_reindex = cursor.fetchall()
            for schema, table, index, size in indexes_to_reindex:
                try:
                    # Schema-qualify and quote so mixed-case names survive.
                    cursor.execute(f'REINDEX INDEX CONCURRENTLY "{schema}"."{index}"')
                    print(f"Reindexed {index} ({size})")
                except Exception as e:
                    print(f"Error reindexing {index}: {e}")

    @staticmethod
    def analyze_table_statistics():
        """Refresh planner statistics, then return per-table row/vacuum stats."""
        with connection.cursor() as cursor:
            # PostgreSQL: Update statistics for the query planner.
            cursor.execute("ANALYZE")
            # FIX: ``relname`` (aliased) instead of nonexistent ``tablename``.
            stats_query = """
                SELECT
                    schemaname,
                    relname AS tablename,
                    n_live_tup,
                    n_dead_tup,
                    last_vacuum,
                    last_autovacuum,
                    last_analyze,
                    last_autoanalyze
                FROM pg_stat_user_tables
                ORDER BY n_live_tup DESC
            """
            cursor.execute(stats_query)
            return cursor.fetchall()
from django.db.models import Q, F, Case, When, Value, Exists, OuterRef, Subquery
class AdvancedQueryOptimization:
    """Advanced query optimization techniques"""

    @staticmethod
    def optimize_exists_queries():
        """Use EXISTS instead of IN for better performance"""
        # Bad: Using IN with subquery (can be slow for large datasets)
        popular_categories = Category.objects.filter(
            id__in=Post.objects.filter(view_count__gt=1000).values('category_id')
        )
        # Good: Using EXISTS (often faster) — correlated on the outer PK
        popular_categories = Category.objects.filter(
            Exists(Post.objects.filter(category=OuterRef('pk'), view_count__gt=1000))
        )
        return popular_categories

    @staticmethod
    def optimize_count_queries():
        """Optimize counting queries"""
        # Bad: count() on large tables
        total_posts = Post.objects.count()  # Can be slow on large tables
        # Good: Use aggregation or estimation for large datasets
        from django.db.models import Count
        # For approximate counts (PostgreSQL): reltuples is the planner's
        # row estimate, refreshed by ANALYZE/autovacuum — not exact.
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT reltuples::BIGINT AS estimate
                FROM pg_class
                WHERE relname = 'blog_post'
            """)
            approximate_count = cursor.fetchone()[0]
        # For conditional counts, use aggregation
        category_stats = Category.objects.annotate(
            post_count=Count('posts'),
            published_count=Count('posts', filter=Q(posts__status='published'))
        )
        return category_stats

    @staticmethod
    def optimize_pagination():
        """Optimize pagination for large datasets"""
        # Bad: OFFSET pagination (slow for large offsets)
        def bad_pagination(page, per_page=20):
            offset = (page - 1) * per_page
            return Post.objects.all()[offset:offset + per_page]

        # Good: Cursor-based pagination — filter on the indexed id instead
        # of skipping rows.
        def cursor_pagination(cursor_id=None, per_page=20):
            queryset = Post.objects.order_by('-id')
            if cursor_id:
                queryset = queryset.filter(id__lt=cursor_id)
            # Fetch one extra row to detect whether another page exists.
            posts = list(queryset[:per_page + 1])
            has_next = len(posts) > per_page
            if has_next:
                posts = posts[:-1]
            next_cursor = posts[-1].id if posts and has_next else None
            return {
                'posts': posts,
                'next_cursor': next_cursor,
                'has_next': has_next
            }

    @staticmethod
    def optimize_complex_filters():
        """Optimize complex filtering operations"""
        # Use database functions instead of Python processing
        from django.db.models.functions import Extract, TruncDate

        # Bad: Filter in Python (loads every row, filters client-side)
        def bad_date_filtering():
            posts = Post.objects.all()
            recent_posts = [p for p in posts if p.created_at.year == 2023]
            return recent_posts

        # Good: Filter in database
        def good_date_filtering():
            return Post.objects.filter(created_at__year=2023)

        # Use annotations for complex calculations
        posts_with_metrics = Post.objects.annotate(
            # Calculate engagement score in database
            engagement_score=F('view_count') + F('like_count') * 2,
            # Extract date parts
            created_year=Extract('created_at', 'year'),
            created_month=Extract('created_at', 'month'),
            # Conditional values
            popularity_tier=Case(
                When(view_count__gte=10000, then=Value('high')),
                When(view_count__gte=1000, then=Value('medium')),
                default=Value('low')
            )
        ).filter(engagement_score__gte=100)
        return posts_with_metrics

    @staticmethod
    def optimize_bulk_operations():
        """Optimize bulk create, update, and delete operations"""
        # Bulk create: one INSERT per batch instead of one per object
        def bulk_create_posts(posts_data):
            posts = [
                Post(
                    title=data['title'],
                    content=data['content'],
                    author_id=data['author_id']
                )
                for data in posts_data
            ]
            return Post.objects.bulk_create(posts, batch_size=1000)

        # Bulk update: unsaved Post shells carry F() increments
        # NOTE(review): F() expressions inside bulk_update need a recent
        # Django version — confirm against the project's Django release.
        def bulk_update_view_counts(post_updates):
            posts_to_update = []
            for post_id, increment in post_updates.items():
                post = Post(id=post_id)
                post.view_count = F('view_count') + increment
                posts_to_update.append(post)
            return Post.objects.bulk_update(
                posts_to_update,
                ['view_count'],
                batch_size=1000
            )

        # Efficient bulk delete with chunking (bounds transaction size)
        def bulk_delete_old_posts(cutoff_date, chunk_size=1000):
            total_deleted = 0
            while True:
                # Get IDs to delete in chunks
                ids_to_delete = list(
                    Post.objects.filter(created_at__lt=cutoff_date)
                    .values_list('id', flat=True)[:chunk_size]
                )
                if not ids_to_delete:
                    break
                # Delete chunk; delete() returns (total, per-model dict)
                deleted_count = Post.objects.filter(id__in=ids_to_delete).delete()[0]
                total_deleted += deleted_count
                if len(ids_to_delete) < chunk_size:
                    break
            return total_deleted
# Query caching strategies
class QueryCachingOptimization:
    """Implement query caching for performance."""

    @staticmethod
    def cache_expensive_queries():
        """Cache results of expensive queries."""
        from django.core.cache import cache

        def get_popular_posts(cache_timeout=300):
            """Return the top-20 posts by engagement, cached for *cache_timeout* seconds."""
            cache_key = 'popular_posts'
            cached_posts = cache.get(cache_key)
            if cached_posts is None:
                # Expensive query: only runs on a cache miss.
                cached_posts = list(
                    Post.objects.select_related('author', 'category')
                    .annotate(
                        comment_count=Count('comments'),
                        engagement_score=F('view_count') + Count('comments') * 5
                    )
                    .filter(engagement_score__gte=100)
                    .order_by('-engagement_score')[:20]
                )
                cache.set(cache_key, cached_posts, cache_timeout)
            return cached_posts

    @staticmethod
    def implement_query_result_caching():
        """Implement automatic query result caching."""
        # BUG FIX: ``cache`` was referenced below without being in scope —
        # the import in cache_expensive_queries is local to that method, so
        # CachedQuerySet.cached() raised NameError at call time.
        from django.core.cache import cache

        class CachedQuerySet(models.QuerySet):
            def cache_key(self):
                """Generate cache key from query"""
                import hashlib
                query_str = str(self.query)
                return f"queryset_{hashlib.md5(query_str.encode()).hexdigest()}"

            def cached(self, timeout=300):
                """Return cached results if available"""
                cache_key = self.cache_key()
                cached_result = cache.get(cache_key)
                if cached_result is None:
                    cached_result = list(self)
                    cache.set(cache_key, cached_result, timeout)
                return cached_result

        class CachedManager(models.Manager):
            def get_queryset(self):
                return CachedQuerySet(self.model, using=self._db)

        # Usage in model: proxy keeps the same table, adds the manager.
        class CachedPost(Post):
            cached_objects = CachedManager()

            class Meta:
                proxy = True

        # Usage
        # popular_posts = CachedPost.cached_objects.filter(view_count__gte=1000).cached()

    @staticmethod
    def invalidate_related_caches():
        """Invalidate caches when data changes."""
        # BUG FIX: same missing ``cache`` import as above.
        from django.core.cache import cache
        from django.db.models.signals import post_save, post_delete
        from django.dispatch import receiver

        @receiver([post_save, post_delete], sender=Post)
        def invalidate_post_caches(sender, **kwargs):
            """Invalidate post-related caches"""
            cache_keys_to_invalidate = [
                'popular_posts',
                'recent_posts',
                'featured_posts',
            ]
            for key in cache_keys_to_invalidate:
                cache.delete(key)
            # Invalidate category-specific caches
            instance = kwargs.get('instance')
            if instance and instance.category:
                cache.delete(f'category_posts_{instance.category.id}')
# Database connection optimization
class ConnectionOptimization:
    """Optimize database connections."""

    @staticmethod
    def configure_connection_pooling():
        """Return an example DATABASES dict using django-db-pool's backend."""
        pool_options = {
            'POOL_SIZE': 20,
            'MAX_OVERFLOW': 30,
            'RECYCLE': 3600,  # Recycle connections after 1 hour
            'PRE_PING': True,  # Validate connections before use
        }
        return {
            'default': {
                'ENGINE': 'django_db_pool.backends.postgresql',
                'NAME': 'myapp_db',
                'USER': 'postgres',
                'PASSWORD': 'password',
                'HOST': 'localhost',
                'PORT': '5432',
                'POOL_OPTIONS': pool_options,
            }
        }

    @staticmethod
    def optimize_connection_settings():
        """Return an example DATABASES dict with tuned connection options.

        NOTE(review): 'work_mem'/'shared_buffers'/'effective_cache_size'
        are PostgreSQL server parameters rather than client connect
        options, and shared_preload_libraries can only be set at server
        start — verify these keys against the driver before adopting.
        """
        db_options = {
            # Connection settings
            'MAX_CONNS': 20,
            'connect_timeout': 10,
            # Performance settings
            'init_command': """
                SET default_transaction_isolation TO 'read committed';
                SET timezone TO 'UTC';
                SET shared_preload_libraries TO 'pg_stat_statements';
            """,
            # Memory settings
            'work_mem': '256MB',
            'shared_buffers': '1GB',
            'effective_cache_size': '4GB',
        }
        return {
            'default': {
                'ENGINE': 'django.db.backends.postgresql',
                'OPTIONS': db_options,
            }
        }
Database optimization is an ongoing process that requires monitoring, analysis, and iterative improvements. By understanding query patterns, implementing proper indexing strategies, and using Django's ORM efficiently, you can build applications that scale effectively with your data growth.
Database Instrumentation
Database instrumentation provides visibility into your application's database performance, helping you identify bottlenecks, monitor query patterns, and optimize database operations. Understanding how to implement comprehensive monitoring enables proactive performance management.
Fixtures
Fixtures provide a way to pre-populate your database with data for testing, development, and initial application setup. Understanding how to create, manage, and use fixtures effectively enables consistent data management across different environments.