Models and Databases

Database Optimization

Database optimization is crucial for building scalable Django applications. Understanding how to identify performance bottlenecks, optimize queries, and implement efficient database patterns ensures your application can handle growth and maintain responsiveness.

What Follows

This chapter works through the optimization toolkit step by step: analyzing and profiling queries, eliminating N+1 patterns with select_related and prefetch_related, designing indexes, applying advanced ORM techniques, caching query results, and tuning database connections.

Query Optimization Fundamentals

Understanding Query Performance

# Enable query logging for development
# settings.py
# Routes every SQL statement Django executes to the console by turning the
# 'django.db.backends' logger up to DEBUG. SQL is only logged while
# settings.DEBUG is True, so this is a development-only aid.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,  # keep other configured loggers active
    'handlers': {
        'console': {
            # Plain stream handler; SQL lines go to stderr by default.
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        # Logger used internally by Django's database backends.
        'django.db.backends': {
            'handlers': ['console'],
            'level': 'DEBUG',
            'propagate': False,  # avoid duplicate output via the root logger
        },
    },
}

# Query analysis tools
from django.db import connection
from django.conf import settings
import time

class QueryAnalyzer:
    """Analyze and optimize database queries.

    NOTE: ``connection.queries`` is only populated when ``DEBUG = True``,
    so the analysis decorator is a development-time tool.
    """

    @staticmethod
    def analyze_queries(func):
        """Decorator that reports query count/time, slow queries, and
        likely N+1 patterns for each call to *func*.

        Fix over the original: the wrapper is now decorated with
        ``functools.wraps`` so the wrapped function keeps its ``__name__``
        and docstring (without it, stacked decorators and the report
        header would all see ``"wrapper"``).
        """
        import functools

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Reset the query log so only this call is measured.
            connection.queries_log.clear()

            start_time = time.time()
            result = func(*args, **kwargs)
            end_time = time.time()

            # Aggregate totals; each entry's 'time' is a string in seconds.
            total_queries = len(connection.queries)
            total_time = sum(float(q['time']) for q in connection.queries)

            print(f"\n=== Query Analysis for {func.__name__} ===")
            print(f"Total queries: {total_queries}")
            print(f"Total time: {total_time:.4f}s")
            print(f"Function time: {end_time - start_time:.4f}s")

            # Report queries slower than 10ms individually.
            slow_queries = [q for q in connection.queries if float(q['time']) > 0.01]
            if slow_queries:
                print(f"\nSlow queries ({len(slow_queries)}):")
                for i, query in enumerate(slow_queries, 1):
                    print(f"{i}. Time: {query['time']}s")
                    print(f"   SQL: {query['sql'][:100]}...")

            # Heuristic N+1 detection: group queries by their SQL prefix
            # (everything before WHERE) and flag prefixes seen > 5 times.
            similar_queries = {}
            for query in connection.queries:
                sql_pattern = query['sql'].split('WHERE')[0] if 'WHERE' in query['sql'] else query['sql']
                similar_queries[sql_pattern] = similar_queries.get(sql_pattern, 0) + 1

            n_plus_one = {pattern: count for pattern, count in similar_queries.items() if count > 5}
            if n_plus_one:
                print("\nPotential N+1 queries:")
                for pattern, count in n_plus_one.items():
                    print(f"  {count}x: {pattern[:80]}...")

            return result

        return wrapper

    @staticmethod
    def explain_query(queryset):
        """Return the database execution plan for *queryset*.

        PostgreSQL uses EXPLAIN ANALYZE (which actually executes the
        query - do not call this on destructive querysets); MySQL uses
        plain EXPLAIN; other backends just get the raw SQL string.
        """

        if connection.vendor == 'postgresql':
            with connection.cursor() as cursor:
                cursor.execute(f"EXPLAIN ANALYZE {queryset.query}")
                return cursor.fetchall()

        elif connection.vendor == 'mysql':
            with connection.cursor() as cursor:
                cursor.execute(f"EXPLAIN {queryset.query}")
                return cursor.fetchall()

        else:
            # Fallback: no EXPLAIN wiring for this vendor.
            return str(queryset.query)

# Example usage
@QueryAnalyzer.analyze_queries
def get_user_posts_inefficient(user_id):
    """Inefficient version with N+1 queries"""

    author = User.objects.get(id=user_id)
    author_posts = Post.objects.filter(author=author)

    # Each related-object access below fires its own query for every post,
    # which is exactly the N+1 pattern the analyzer should flag.
    return [
        {
            'title': post.title,
            'author_name': post.author.username,  # N+1 query here
            'category': post.category.name,       # N+1 query here
            'comment_count': post.comments.count()  # N+1 query here
        }
        for post in author_posts
    ]

@QueryAnalyzer.analyze_queries
def get_user_posts_optimized(user_id):
    """Optimized version with proper prefetching.

    select_related() pulls the single-valued relations (author, category)
    into the main query via JOINs; prefetch_related() loads all comments
    in one additional query.
    """

    posts = Post.objects.filter(author_id=user_id) \
                       .select_related('author', 'category') \
                       .prefetch_related('comments')

    result = []
    for post in posts:
        result.append({
            'title': post.title,
            'author_name': post.author.username,
            'category': post.category.name,
            # Bug fix: .count() always issues a fresh COUNT query and
            # ignores the prefetched cache, reintroducing N+1. len() on
            # the prefetched result uses the cache with zero extra queries.
            'comment_count': len(post.comments.all()),
        })

    return result
from django.db.models import Prefetch, Count

class OptimizedQueryPatterns:
    """Patterns for optimized database queries.

    Each method contrasts a naive access pattern ("Bad") with its
    optimized equivalent ("Good") using select_related,
    prefetch_related, Prefetch objects, or annotations.
    """

    @staticmethod
    def basic_select_related():
        """Basic select_related usage for foreign keys."""

        # Bad: N+1 queries
        posts = Post.objects.all()
        for post in posts:
            print(f"{post.title} by {post.author.username}")  # Query per post

        # Good: Single query with JOIN
        posts = Post.objects.select_related('author').all()
        for post in posts:
            print(f"{post.title} by {post.author.username}")  # No additional queries

    @staticmethod
    def multi_level_select_related():
        """Multi-level select_related for nested relationships."""

        # The double-underscore path JOINs across several FK hops in one query.
        posts = Post.objects.select_related(
            'author',
            'author__profile',
            'category',
            'category__parent'
        ).all()

        for post in posts:
            print(f"{post.title} by {post.author.profile.full_name}")
            print(f"Category: {post.category.parent.name} > {post.category.name}")

    @staticmethod
    def basic_prefetch_related():
        """Basic prefetch_related for many-to-many and reverse foreign keys."""

        # Bad: N+1 queries for tags
        posts = Post.objects.all()
        for post in posts:
            tags = list(post.tags.all())  # Query per post

        # Good: Prefetch tags in one separate query
        posts = Post.objects.prefetch_related('tags').all()
        for post in posts:
            tags = list(post.tags.all())  # No additional queries

    @staticmethod
    def custom_prefetch_related():
        """Custom prefetch with filtering and ordering."""

        # Prefetch only approved comments, ordered by date; the inner
        # select_related avoids per-comment author queries later.
        posts = Post.objects.prefetch_related(
            Prefetch(
                'comments',
                queryset=Comment.objects.filter(is_approved=True)
                                       .select_related('author')
                                       .order_by('-created_at')
            )
        ).all()

        for post in posts:
            approved_comments = post.comments.all()  # Already filtered and ordered

    @staticmethod
    def complex_prefetch_patterns():
        """Complex prefetch patterns for nested relationships."""

        # Prefetch posts with their comments and comment authors.
        authors = Author.objects.prefetch_related(
            Prefetch(
                'posts',
                queryset=Post.objects.select_related('category')
                                   .prefetch_related(
                                       Prefetch(
                                           'comments',
                                           queryset=Comment.objects.select_related('author')
                                                                  .filter(is_approved=True)
                                       )
                                   )
            )
        ).all()

        for author in authors:
            for post in author.posts.all():
                print(f"Post: {post.title} in {post.category.name}")
                for comment in post.comments.all():
                    print(f"  Comment by {comment.author.username}")

    @staticmethod
    def conditional_prefetch():
        """Conditional prefetching based on user permissions."""

        def get_posts_for_user(user):
            """Get posts with appropriate prefetching based on user role."""

            base_queryset = Post.objects.select_related('author', 'category')

            if user.is_staff:
                # Staff can see all comments
                return base_queryset.prefetch_related('comments__author')
            else:
                # Regular users only see approved comments
                return base_queryset.prefetch_related(
                    Prefetch(
                        'comments',
                        queryset=Comment.objects.filter(is_approved=True)
                                               .select_related('author')
                    )
                )

    @staticmethod
    def optimized_aggregation_queries():
        """Optimize queries with aggregations."""

        # Bad: Multiple queries for counts
        posts = Post.objects.all()
        for post in posts:
            comment_count = post.comments.count()  # Query per post
            like_count = post.likes.count()        # Query per post

        # Good: Annotate with counts.
        # Bug fix: distinct=True is required when combining aggregations
        # over two different multi-valued relations - the implicit JOINs
        # otherwise form a cross product and inflate both counts.
        posts = Post.objects.annotate(
            comment_count=Count('comments', distinct=True),
            like_count=Count('likes', distinct=True)
        ).select_related('author', 'category')

        for post in posts:
            print(f"{post.title}: {post.comment_count} comments, {post.like_count} likes")

Database Indexing Strategies

Index Creation and Management

# models.py with strategic indexing
from django.db import models

class OptimizedPost(models.Model):
    """Post model with optimized indexing.

    Composite indexes below are chosen to match common read paths:
    published listings by date, an author's timeline, category pages,
    featured posts, and popularity sorts.
    """

    title = models.CharField(max_length=200)
    slug = models.SlugField(max_length=200, unique=True)  # Automatic index
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)  # Automatic index
    # SET_NULL lets a category be deleted without deleting its posts.
    category = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True)
    status = models.CharField(max_length=20, default='draft')
    is_featured = models.BooleanField(default=False)
    view_count = models.PositiveIntegerField(default=0)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    published_at = models.DateTimeField(null=True, blank=True)  # unset while drafting
    
    class Meta:
        # Composite indexes for common query patterns
        indexes = [
            # For filtering published posts by date
            models.Index(fields=['status', 'published_at']),
            
            # For author's posts ordered by date
            models.Index(fields=['author', 'created_at']),
            
            # For category posts with status
            models.Index(fields=['category', 'status', 'created_at']),
            
            # For featured posts
            models.Index(fields=['is_featured', 'published_at']),
            
            # For popular posts
            models.Index(fields=['view_count']),
            
            # Partial index for published posts only; the `condition`
            # argument requires a backend with partial-index support
            # (PostgreSQL and SQLite).
            models.Index(
                fields=['created_at'],
                name='idx_published_posts_date',
                condition=models.Q(status='published')
            ),
        ]
        
        # Default ordering uses indexed fields
        ordering = ['-created_at']

class IndexOptimizationManager:
    """Manage database indexes for optimal performance.

    All helpers target PostgreSQL: they read the pg_stat_* statistics
    views and build indexes with CREATE INDEX CONCURRENTLY.
    """

    @staticmethod
    def analyze_missing_indexes():
        """Identify tables that likely need an index.

        Flags tables with many sequential scans whose average rows read
        per scan is high - a strong hint that a selective index is missing.
        """

        # Bug fix: pg_stat_user_tables exposes the table name as
        # 'relname' (there is no 'tablename' column in this view).
        missing_indexes_query = """
        SELECT
            schemaname,
            relname AS tablename,
            seq_scan,
            seq_tup_read,
            idx_scan,
            idx_tup_fetch,
            seq_tup_read / seq_scan AS avg_seq_read
        FROM pg_stat_user_tables
        WHERE seq_scan > 0
            AND seq_tup_read / seq_scan > 1000
        ORDER BY seq_tup_read DESC
        """

        with connection.cursor() as cursor:
            cursor.execute(missing_indexes_query)
            return cursor.fetchall()

    @staticmethod
    def analyze_unused_indexes():
        """Find indexes that have never been scanned and could be dropped."""

        # Bug fix: pg_stat_user_indexes exposes 'relname'/'indexrelname',
        # not 'tablename'/'indexname'; alias them for readability.
        unused_indexes_query = """
        SELECT
            schemaname,
            relname AS tablename,
            indexrelname AS indexname,
            idx_scan,
            pg_size_pretty(pg_relation_size(indexrelid)) AS size
        FROM pg_stat_user_indexes
        WHERE idx_scan = 0
            AND indexrelname NOT LIKE '%_pkey'
        ORDER BY pg_relation_size(indexrelid) DESC
        """

        with connection.cursor() as cursor:
            cursor.execute(unused_indexes_query)
            return cursor.fetchall()

    @staticmethod
    def create_conditional_indexes():
        """Create partial (conditional) indexes for specific use cases.

        NOTE(review): CREATE INDEX CONCURRENTLY cannot run inside a
        transaction block - run this with autocommit (e.g. outside any
        atomic() block / ATOMIC_REQUESTS).
        """

        conditional_indexes = [
            # Index only for published posts
            """
            CREATE INDEX CONCURRENTLY idx_published_posts_category
            ON blog_post (category_id, created_at)
            WHERE status = 'published'
            """,

            # Index for active users only
            """
            CREATE INDEX CONCURRENTLY idx_active_users_login
            ON auth_user (last_login)
            WHERE is_active = true
            """,

            # Index for recent data only
            """
            CREATE INDEX CONCURRENTLY idx_recent_analytics
            ON analytics_event (event_type, timestamp)
            WHERE timestamp > NOW() - INTERVAL '30 days'
            """,
        ]

        with connection.cursor() as cursor:
            for index_sql in conditional_indexes:
                try:
                    cursor.execute(index_sql)
                    # Bug fix: token 3 of "CREATE INDEX CONCURRENTLY <name>
                    # ON ..." is the index name; split()[4] printed "ON".
                    print(f"Created index: {index_sql.split()[3]}")
                except Exception as e:
                    print(f"Error creating index: {e}")

    @staticmethod
    def optimize_text_search_indexes():
        """Create PostgreSQL GIN indexes for text search optimization."""

        # Bug fix: the pg_trgm extension must be created in its own
        # execute() call - CREATE INDEX CONCURRENTLY must be the only
        # statement in its command, so the original combined
        # "CREATE EXTENSION; CREATE INDEX CONCURRENTLY" string failed.
        setup_statements = [
            "CREATE EXTENSION IF NOT EXISTS pg_trgm",
        ]

        text_search_indexes = [
            # Full-text search on title and content
            """
            CREATE INDEX CONCURRENTLY idx_post_fulltext
            ON blog_post USING GIN (to_tsvector('english', title || ' ' || content))
            """,

            # Trigram index for fuzzy matching
            """
            CREATE INDEX CONCURRENTLY idx_post_title_trgm
            ON blog_post USING GIN (title gin_trgm_ops)
            """,

            # JSON field indexing
            """
            CREATE INDEX CONCURRENTLY idx_post_metadata_gin
            ON blog_post USING GIN (metadata)
            """,
        ]

        with connection.cursor() as cursor:
            for setup_sql in setup_statements:
                try:
                    cursor.execute(setup_sql)
                except Exception as e:
                    print(f"Error running setup statement: {e}")

            for index_sql in text_search_indexes:
                try:
                    cursor.execute(index_sql)
                    print(f"Created text search index")
                except Exception as e:
                    print(f"Error creating text search index: {e}")

# Index monitoring and maintenance
class IndexMaintenanceManager:
    """Monitor and maintain database indexes (PostgreSQL-specific)."""

    @staticmethod
    def get_index_usage_stats():
        """Return index usage statistics as a list of dicts.

        Bug fix: pg_stat_user_indexes / pg_stat_user_tables expose
        'relname' and 'indexrelname', not 'tablename'/'indexname'; the
        original column names made the query fail. The aliases keep the
        returned dict keys readable.
        """

        index_stats_query = """
        SELECT
            t.schemaname,
            t.relname AS tablename,
            i.indexrelname AS indexname,
            i.idx_scan,
            i.idx_tup_read,
            i.idx_tup_fetch,
            pg_size_pretty(pg_relation_size(i.indexrelid)) AS size,
            pg_relation_size(i.indexrelid) AS size_bytes,
            CASE
                WHEN i.idx_scan = 0 THEN 'Unused'
                WHEN i.idx_scan < 100 THEN 'Low Usage'
                WHEN i.idx_scan < 1000 THEN 'Medium Usage'
                ELSE 'High Usage'
            END AS usage_category
        FROM pg_stat_user_indexes i
        JOIN pg_stat_user_tables t ON i.relid = t.relid
        ORDER BY i.idx_scan DESC, pg_relation_size(i.indexrelid) DESC
        """

        with connection.cursor() as cursor:
            cursor.execute(index_stats_query)
            # Build dicts keyed by the SELECT aliases above.
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]

    @staticmethod
    def reindex_fragmented_indexes():
        """Rebuild heavily-used indexes to remove bloat.

        NOTE(review): REINDEX ... CONCURRENTLY requires PostgreSQL 12+
        and cannot run inside a transaction block - confirm deployment
        target before scheduling this.
        """

        # Bug fix: same relname/indexrelname column names as above.
        fragmentation_query = """
        SELECT
            schemaname,
            relname AS tablename,
            indexrelname AS indexname,
            pg_size_pretty(pg_relation_size(indexrelid)) AS size
        FROM pg_stat_user_indexes
        WHERE idx_scan > 1000  -- Only reindex used indexes
        ORDER BY pg_relation_size(indexrelid) DESC
        """

        with connection.cursor() as cursor:
            cursor.execute(fragmentation_query)
            indexes_to_reindex = cursor.fetchall()

            for schema, table, index, size in indexes_to_reindex:
                try:
                    # Quote the identifier so mixed-case names survive.
                    cursor.execute(f'REINDEX INDEX CONCURRENTLY "{index}"')
                    print(f"Reindexed {index} ({size})")
                except Exception as e:
                    print(f"Error reindexing {index}: {e}")

    @staticmethod
    def analyze_table_statistics():
        """Refresh planner statistics, then return per-table row counts
        and vacuum/analyze timestamps."""

        with connection.cursor() as cursor:
            # PostgreSQL: refresh statistics for the query planner.
            cursor.execute("ANALYZE")

            # Bug fix: pg_stat_user_tables uses 'relname', not 'tablename'.
            stats_query = """
            SELECT
                schemaname,
                relname AS tablename,
                n_live_tup,
                n_dead_tup,
                last_vacuum,
                last_autovacuum,
                last_analyze,
                last_autoanalyze
            FROM pg_stat_user_tables
            ORDER BY n_live_tup DESC
            """

            cursor.execute(stats_query)
            return cursor.fetchall()

Query Optimization Techniques

Advanced Query Patterns

from django.db.models import Q, F, Case, When, Value, Exists, OuterRef, Subquery

class AdvancedQueryOptimization:
    """Advanced query optimization techniques.

    Demonstration methods contrasting naive ORM usage ("Bad") with faster
    equivalents ("Good"): EXISTS subqueries, database-side aggregation,
    cursor pagination, database-side filtering, and bulk operations.
    """

    @staticmethod
    def optimize_exists_queries():
        """Use EXISTS instead of IN for better performance"""

        # Bad: Using IN with subquery (can be slow for large datasets)
        popular_categories = Category.objects.filter(
            id__in=Post.objects.filter(view_count__gt=1000).values('category_id')
        )

        # Good: Using EXISTS (often faster). OuterRef('pk') correlates the
        # subquery with each Category row, so the database can stop at the
        # first matching Post instead of materializing the whole ID list.
        popular_categories = Category.objects.filter(
            Exists(Post.objects.filter(category=OuterRef('pk'), view_count__gt=1000))
        )

        return popular_categories

    @staticmethod
    def optimize_count_queries():
        """Optimize counting queries"""

        # Bad: count() on large tables
        total_posts = Post.objects.count()  # Can be slow on large tables

        # Good: Use aggregation or estimation for large datasets
        from django.db.models import Count

        # For approximate counts (PostgreSQL): reltuples holds the
        # planner's row estimate (refreshed by VACUUM/ANALYZE) - cheap
        # to read but not exact.
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT reltuples::BIGINT AS estimate
                FROM pg_class
                WHERE relname = 'blog_post'
            """)
            approximate_count = cursor.fetchone()[0]

        # For conditional counts, use aggregation (one query total).
        category_stats = Category.objects.annotate(
            post_count=Count('posts'),
            published_count=Count('posts', filter=Q(posts__status='published'))
        )

        return category_stats

    @staticmethod
    def optimize_pagination():
        """Optimize pagination for large datasets"""

        # Bad: OFFSET pagination (slow for large offsets)
        def bad_pagination(page, per_page=20):
            # The database still scans and discards `offset` rows.
            offset = (page - 1) * per_page
            return Post.objects.all()[offset:offset + per_page]

        # Good: Cursor-based pagination keyed on the indexed primary key.
        def cursor_pagination(cursor_id=None, per_page=20):
            queryset = Post.objects.order_by('-id')

            if cursor_id:
                queryset = queryset.filter(id__lt=cursor_id)

            # Fetch one extra row so we can tell whether another page exists.
            posts = list(queryset[:per_page + 1])

            has_next = len(posts) > per_page
            if has_next:
                posts = posts[:-1]

            # Cursor for the next page is the last visible post's id.
            next_cursor = posts[-1].id if posts and has_next else None

            return {
                'posts': posts,
                'next_cursor': next_cursor,
                'has_next': has_next
            }

    @staticmethod
    def optimize_complex_filters():
        """Optimize complex filtering operations"""

        # Use database functions instead of Python processing
        from django.db.models.functions import Extract, TruncDate

        # Bad: Filter in Python (loads every row into memory first)
        def bad_date_filtering():
            posts = Post.objects.all()
            recent_posts = [p for p in posts if p.created_at.year == 2023]
            return recent_posts

        # Good: Filter in database
        def good_date_filtering():
            return Post.objects.filter(created_at__year=2023)

        # Use annotations for complex calculations
        posts_with_metrics = Post.objects.annotate(
            # Calculate engagement score in database
            engagement_score=F('view_count') + F('like_count') * 2,

            # Extract date parts
            created_year=Extract('created_at', 'year'),
            created_month=Extract('created_at', 'month'),

            # Conditional values (evaluated top-down, first match wins)
            popularity_tier=Case(
                When(view_count__gte=10000, then=Value('high')),
                When(view_count__gte=1000, then=Value('medium')),
                default=Value('low')
            )
        ).filter(engagement_score__gte=100)

        return posts_with_metrics

    @staticmethod
    def optimize_bulk_operations():
        """Optimize bulk create, update, and delete operations"""

        # Bulk create: one INSERT per batch instead of one per object.
        def bulk_create_posts(posts_data):
            posts = [
                Post(
                    title=data['title'],
                    content=data['content'],
                    author_id=data['author_id']
                )
                for data in posts_data
            ]

            return Post.objects.bulk_create(posts, batch_size=1000)

        # Bulk update
        def bulk_update_view_counts(post_updates):
            # post_updates: presumably {post_id: increment} - TODO confirm
            # against callers.
            posts_to_update = []

            for post_id, increment in post_updates.items():
                # Unsaved instance carrying only the pk; bulk_update only
                # touches the fields listed below.
                post = Post(id=post_id)
                post.view_count = F('view_count') + increment
                posts_to_update.append(post)

            return Post.objects.bulk_update(
                posts_to_update,
                ['view_count'],
                batch_size=1000
            )

        # Efficient bulk delete with chunking (avoids one huge DELETE
        # holding locks / bloating the transaction).
        def bulk_delete_old_posts(cutoff_date, chunk_size=1000):
            total_deleted = 0

            while True:
                # Get IDs to delete in chunks
                ids_to_delete = list(
                    Post.objects.filter(created_at__lt=cutoff_date)
                               .values_list('id', flat=True)[:chunk_size]
                )

                if not ids_to_delete:
                    break

                # delete() returns (total_rows, per_model_dict); keep the total.
                deleted_count = Post.objects.filter(id__in=ids_to_delete).delete()[0]
                total_deleted += deleted_count

                if len(ids_to_delete) < chunk_size:
                    break

            return total_deleted

# Query caching strategies
class QueryCachingOptimization:
    """Implement query caching for performance.

    Bug fix: the original referenced ``cache`` in two methods without
    importing it anywhere in their scope (the only import lived inside
    ``cache_expensive_queries``), so both raised NameError at runtime.
    Each method now imports the cache it uses.
    """

    @staticmethod
    def cache_expensive_queries():
        """Cache results of expensive queries."""

        from django.core.cache import cache

        def get_popular_posts(cache_timeout=300):
            """Return the top-20 posts by engagement, caching the result."""
            cache_key = 'popular_posts'
            cached_posts = cache.get(cache_key)

            if cached_posts is None:
                # Expensive query: list() forces evaluation so the cache
                # stores concrete rows, not a lazy queryset.
                cached_posts = list(
                    Post.objects.select_related('author', 'category')
                               .annotate(
                                   comment_count=Count('comments'),
                                   engagement_score=F('view_count') + Count('comments') * 5
                               )
                               .filter(engagement_score__gte=100)
                               .order_by('-engagement_score')[:20]
                )

                cache.set(cache_key, cached_posts, cache_timeout)

            return cached_posts

    @staticmethod
    def implement_query_result_caching():
        """Implement automatic query result caching via a custom QuerySet."""

        # Fix: this import was missing, so CachedQuerySet.cached() raised
        # NameError on 'cache'.
        from django.core.cache import cache

        class CachedQuerySet(models.QuerySet):
            def cache_key(self):
                """Generate a cache key from the compiled SQL of this queryset."""
                import hashlib
                query_str = str(self.query)
                return f"queryset_{hashlib.md5(query_str.encode()).hexdigest()}"

            def cached(self, timeout=300):
                """Return cached results if available, else evaluate and cache."""
                cache_key = self.cache_key()
                cached_result = cache.get(cache_key)

                if cached_result is None:
                    cached_result = list(self)
                    cache.set(cache_key, cached_result, timeout)

                return cached_result

        class CachedManager(models.Manager):
            def get_queryset(self):
                return CachedQuerySet(self.model, using=self._db)

        # Usage in model
        class CachedPost(Post):
            cached_objects = CachedManager()

            class Meta:
                proxy = True

        # Usage
        # popular_posts = CachedPost.cached_objects.filter(view_count__gte=1000).cached()

    @staticmethod
    def invalidate_related_caches():
        """Invalidate caches when data changes."""

        # Fix: 'cache' was used by the receiver below without an import.
        from django.core.cache import cache
        from django.db.models.signals import post_save, post_delete
        from django.dispatch import receiver

        @receiver([post_save, post_delete], sender=Post)
        def invalidate_post_caches(sender, **kwargs):
            """Invalidate post-related caches."""

            cache_keys_to_invalidate = [
                'popular_posts',
                'recent_posts',
                'featured_posts',
            ]

            for key in cache_keys_to_invalidate:
                cache.delete(key)

            # Invalidate category-specific caches (category FK may be NULL).
            instance = kwargs.get('instance')
            if instance and instance.category:
                cache.delete(f'category_posts_{instance.category.id}')

# Database connection optimization
class ConnectionOptimization:
    """Optimize database connections"""

    @staticmethod
    def configure_connection_pooling():
        """Configure connection pooling for better performance.

        Returns an example DATABASES-style dict for django-db-pool.
        """

        pool_options = {
            'POOL_SIZE': 20,
            'MAX_OVERFLOW': 30,
            'RECYCLE': 3600,  # Recycle connections after 1 hour
            'PRE_PING': True,  # Validate connections before use
        }

        default_db = {
            'ENGINE': 'django_db_pool.backends.postgresql',
            'NAME': 'myapp_db',
            'USER': 'postgres',
            'PASSWORD': 'password',
            'HOST': 'localhost',
            'PORT': '5432',
            'POOL_OPTIONS': pool_options,
        }

        return {'default': default_db}

    @staticmethod
    def optimize_connection_settings():
        """Optimize database connection settings.

        Returns an example DATABASES-style dict with tuned OPTIONS.
        NOTE(review): some listed parameters (e.g. shared_buffers,
        shared_preload_libraries) look like server-level PostgreSQL
        settings rather than per-connection options - confirm against
        the deployment's server configuration.
        """

        options = {
            # Connection settings
            'MAX_CONNS': 20,
            'connect_timeout': 10,

            # Performance settings
            'init_command': """
                        SET default_transaction_isolation TO 'read committed';
                        SET timezone TO 'UTC';
                        SET shared_preload_libraries TO 'pg_stat_statements';
                    """,

            # Memory settings
            'work_mem': '256MB',
            'shared_buffers': '1GB',
            'effective_cache_size': '4GB',
        }

        return {
            'default': {
                'ENGINE': 'django.db.backends.postgresql',
                'OPTIONS': options,
            }
        }

Database optimization is an ongoing process that requires monitoring, analysis, and iterative improvements. By understanding query patterns, implementing proper indexing strategies, and using Django's ORM efficiently, you can build applications that scale effectively with your data growth.