Models and Databases

Raw SQL Queries

While Django's ORM handles most database operations elegantly, there are times when you need the power and flexibility of raw SQL. Understanding how to safely execute raw SQL queries enables you to optimize performance, use database-specific features, and handle complex operations that are difficult to express with the ORM.

Raw SQL Queries

While Django's ORM handles most database operations elegantly, there are times when you need the power and flexibility of raw SQL. Understanding how to safely execute raw SQL queries enables you to optimize performance, use database-specific features, and handle complex operations that are difficult to express with the ORM.

When to Use Raw SQL

Scenarios for Raw SQL

# Complex analytical queries
def get_monthly_revenue_report():
    """Complex financial report that's easier in raw SQL"""
    
    sql = """
    SELECT 
        DATE_TRUNC('month', o.created_at) as month,
        COUNT(DISTINCT o.id) as order_count,
        COUNT(DISTINCT o.customer_id) as unique_customers,
        SUM(oi.quantity * oi.unit_price) as gross_revenue,
        SUM(oi.quantity * p.cost_price) as total_cost,
        SUM(oi.quantity * oi.unit_price) - SUM(oi.quantity * p.cost_price) as profit,
        AVG(oi.quantity * oi.unit_price) as avg_order_value
    FROM orders_order o
    JOIN orders_orderitem oi ON o.id = oi.order_id
    JOIN products_product p ON oi.product_id = p.id
    WHERE o.status = 'completed'
        AND o.created_at >= %s
    GROUP BY DATE_TRUNC('month', o.created_at)
    ORDER BY month DESC
    """
    
    from django.db import connection
    from datetime import datetime, timedelta
    
    start_date = datetime.now() - timedelta(days=365)
    
    with connection.cursor() as cursor:
        cursor.execute(sql, [start_date])
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

# Database-specific optimizations
def get_top_products_with_window_functions():
    """Use PostgreSQL window functions for ranking"""
    
    sql = """
    SELECT 
        p.name,
        p.category_id,
        c.name as category_name,
        SUM(oi.quantity) as total_sold,
        SUM(oi.quantity * oi.unit_price) as revenue,
        RANK() OVER (PARTITION BY p.category_id ORDER BY SUM(oi.quantity) DESC) as category_rank,
        PERCENT_RANK() OVER (ORDER BY SUM(oi.quantity) DESC) as overall_percentile
    FROM products_product p
    JOIN products_category c ON p.category_id = c.id
    JOIN orders_orderitem oi ON p.id = oi.product_id
    JOIN orders_order o ON oi.order_id = o.id
    WHERE o.status = 'completed'
        AND o.created_at >= CURRENT_DATE - INTERVAL '90 days'
    GROUP BY p.id, p.name, p.category_id, c.name
    HAVING SUM(oi.quantity) > 0
    ORDER BY category_id, category_rank
    """
    
    return execute_raw_query(sql)

# Performance-critical queries
def get_user_activity_summary(user_id):
    """Optimized user activity query"""
    
    sql = """
    WITH user_stats AS (
        SELECT 
            COUNT(DISTINCT p.id) as post_count,
            COUNT(DISTINCT c.id) as comment_count,
            COUNT(DISTINCT l.id) as like_count,
            MAX(p.created_at) as last_post_date,
            MAX(c.created_at) as last_comment_date
        FROM auth_user u
        LEFT JOIN blog_post p ON u.id = p.author_id
        LEFT JOIN blog_comment c ON u.id = c.author_id
        LEFT JOIN blog_like l ON u.id = l.user_id
        WHERE u.id = %s
    ),
    engagement_score AS (
        SELECT 
            (post_count * 10 + comment_count * 2 + like_count) as score
        FROM user_stats
    )
    SELECT 
        us.*,
        es.score as engagement_score,
        CASE 
            WHEN es.score >= 100 THEN 'High'
            WHEN es.score >= 50 THEN 'Medium'
            ELSE 'Low'
        END as engagement_level
    FROM user_stats us, engagement_score es
    """
    
    with connection.cursor() as cursor:
        cursor.execute(sql, [user_id])
        columns = [col[0] for col in cursor.description]
        result = cursor.fetchone()
        return dict(zip(columns, result)) if result else None

Raw SQL with Model Instances

Using raw() Method

from django.db import models

class PostManager(models.Manager):
    def popular_posts_raw(self, min_views=1000):
        """Get popular posts using raw SQL but return model instances"""
        
        return self.raw("""
            SELECT * FROM blog_post 
            WHERE view_count >= %s 
                AND status = 'published'
            ORDER BY view_count DESC, created_at DESC
        """, [min_views])
    
    def posts_with_engagement_score(self):
        """Calculate engagement score using raw SQL"""
        
        return self.raw("""
            SELECT 
                p.*,
                (p.view_count + COUNT(c.id) * 5 + COUNT(l.id) * 2) as engagement_score
            FROM blog_post p
            LEFT JOIN blog_comment c ON p.id = c.post_id AND c.is_approved = true
            LEFT JOIN blog_like l ON p.id = l.post_id
            WHERE p.status = 'published'
            GROUP BY p.id
            ORDER BY engagement_score DESC
        """)
    
    def search_posts_full_text(self, query):
        """PostgreSQL full-text search using raw SQL"""
        
        return self.raw("""
            SELECT 
                *,
                ts_rank(to_tsvector('english', title || ' ' || content), plainto_tsquery('english', %s)) as rank
            FROM blog_post
            WHERE to_tsvector('english', title || ' ' || content) @@ plainto_tsquery('english', %s)
                AND status = 'published'
            ORDER BY rank DESC, created_at DESC
        """, [query, query])

class Post(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    view_count = models.PositiveIntegerField(default=0)
    status = models.CharField(max_length=20, default='draft')
    created_at = models.DateTimeField(auto_now_add=True)
    
    objects = PostManager()

# Usage
popular_posts = Post.objects.popular_posts_raw(min_views=500)
for post in popular_posts:
    print(f"{post.title}: {post.view_count} views")

# Access additional fields from raw SQL
posts_with_scores = Post.objects.posts_with_engagement_score()
for post in posts_with_scores:
    print(f"{post.title}: {post.engagement_score} engagement")

Raw SQL with Annotations

class AdvancedPostManager(models.Manager):
    def annotate_with_raw_sql(self):
        """Combine ORM with raw SQL for complex calculations"""
        
        return self.extra(
            select={
                'comment_count': """
                    SELECT COUNT(*) 
                    FROM blog_comment 
                    WHERE blog_comment.post_id = blog_post.id 
                        AND blog_comment.is_approved = true
                """,
                'avg_rating': """
                    SELECT AVG(rating) 
                    FROM blog_rating 
                    WHERE blog_rating.post_id = blog_post.id
                """,
                'days_since_published': """
                    CASE 
                        WHEN published_at IS NOT NULL 
                        THEN EXTRACT(days FROM (NOW() - published_at))
                        ELSE NULL 
                    END
                """
            },
            where=[
                "status = %s",
                "created_at >= %s"
            ],
            params=['published', '2023-01-01'],
            order_by=['-view_count']
        )
    
    def complex_filtering_with_raw(self, author_ids, min_engagement=100):
        """Complex filtering using raw SQL conditions"""
        
        return self.extra(
            select={
                'engagement_score': """
                    (view_count + 
                     (SELECT COUNT(*) FROM blog_comment WHERE post_id = blog_post.id) * 5 +
                     (SELECT COUNT(*) FROM blog_like WHERE post_id = blog_post.id) * 2)
                """
            },
            where=[
                """
                author_id = ANY(%s) AND
                (view_count + 
                 (SELECT COUNT(*) FROM blog_comment WHERE post_id = blog_post.id) * 5 +
                 (SELECT COUNT(*) FROM blog_like WHERE post_id = blog_post.id) * 2) >= %s
                """
            ],
            params=[author_ids, min_engagement]
        )

# Usage
annotated_posts = Post.objects.annotate_with_raw_sql()
for post in annotated_posts:
    print(f"{post.title}: {post.comment_count} comments, {post.avg_rating} rating")

Direct Database Access

Using Django's Database Connection

from django.db import connection, connections

def execute_raw_query(sql, params=None, database='default'):
    """Execute raw SQL query and return results"""
    
    db_connection = connections[database]
    
    with db_connection.cursor() as cursor:
        cursor.execute(sql, params or [])
        
        if cursor.description:
            # SELECT query - return results
            columns = [col[0] for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        else:
            # INSERT/UPDATE/DELETE - return affected rows
            return cursor.rowcount

def execute_many_raw(sql, param_list, database='default'):
    """Execute raw SQL with multiple parameter sets"""
    
    db_connection = connections[database]
    
    with db_connection.cursor() as cursor:
        cursor.executemany(sql, param_list)
        return cursor.rowcount

# Complex reporting queries
def generate_sales_report(start_date, end_date):
    """Generate comprehensive sales report"""
    
    sql = """
    WITH daily_sales AS (
        SELECT 
            DATE(o.created_at) as sale_date,
            COUNT(DISTINCT o.id) as order_count,
            COUNT(DISTINCT o.customer_id) as unique_customers,
            SUM(oi.quantity * oi.unit_price) as daily_revenue,
            AVG(oi.quantity * oi.unit_price) as avg_order_value
        FROM orders_order o
        JOIN orders_orderitem oi ON o.id = oi.order_id
        WHERE o.status = 'completed'
            AND o.created_at BETWEEN %s AND %s
        GROUP BY DATE(o.created_at)
    ),
    product_performance AS (
        SELECT 
            p.name as product_name,
            p.category_id,
            SUM(oi.quantity) as units_sold,
            SUM(oi.quantity * oi.unit_price) as product_revenue,
            COUNT(DISTINCT oi.order_id) as orders_containing_product
        FROM products_product p
        JOIN orders_orderitem oi ON p.id = oi.product_id
        JOIN orders_order o ON oi.order_id = o.id
        WHERE o.status = 'completed'
            AND o.created_at BETWEEN %s AND %s
        GROUP BY p.id, p.name, p.category_id
    ),
    customer_segments AS (
        SELECT 
            CASE 
                WHEN total_spent >= 1000 THEN 'VIP'
                WHEN total_spent >= 500 THEN 'Premium'
                WHEN total_spent >= 100 THEN 'Regular'
                ELSE 'New'
            END as segment,
            COUNT(*) as customer_count,
            AVG(total_spent) as avg_spent_per_customer
        FROM (
            SELECT 
                o.customer_id,
                SUM(oi.quantity * oi.unit_price) as total_spent
            FROM orders_order o
            JOIN orders_orderitem oi ON o.id = oi.order_id
            WHERE o.status = 'completed'
                AND o.created_at BETWEEN %s AND %s
            GROUP BY o.customer_id
        ) customer_totals
        GROUP BY segment
    )
    SELECT 
        'daily_sales' as report_type,
        json_agg(daily_sales.*) as data
    FROM daily_sales
    UNION ALL
    SELECT 
        'product_performance' as report_type,
        json_agg(product_performance.*) as data
    FROM product_performance
    UNION ALL
    SELECT 
        'customer_segments' as report_type,
        json_agg(customer_segments.*) as data
    FROM customer_segments
    """
    
    params = [start_date, end_date] * 3  # Same date range for all CTEs
    results = execute_raw_query(sql, params)
    
    # Organize results by report type
    report_data = {}
    for row in results:
        report_data[row['report_type']] = row['data']
    
    return report_data

# Bulk operations with raw SQL
def bulk_update_view_counts(post_view_data):
    """Efficiently update view counts for multiple posts"""
    
    sql = """
    UPDATE blog_post 
    SET view_count = view_count + data.increment
    FROM (VALUES %s) AS data(post_id, increment)
    WHERE blog_post.id = data.post_id
    """
    
    # Prepare values for PostgreSQL
    values = ','.join([f"({post_id}, {increment})" for post_id, increment in post_view_data])
    final_sql = sql.replace('%s', values)
    
    with connection.cursor() as cursor:
        cursor.execute(final_sql)
        return cursor.rowcount

def bulk_insert_analytics_data(analytics_records):
    """Bulk insert analytics data"""
    
    sql = """
    INSERT INTO analytics_pageview (url, user_id, session_id, timestamp, user_agent)
    VALUES %s
    ON CONFLICT (url, user_id, session_id, timestamp) DO NOTHING
    """
    
    values = ','.join([
        f"('{record['url']}', {record['user_id']}, '{record['session_id']}', "
        f"'{record['timestamp']}', '{record['user_agent']}')"
        for record in analytics_records
    ])
    
    final_sql = sql.replace('%s', values)
    
    with connection.cursor() as cursor:
        cursor.execute(final_sql)
        return cursor.rowcount

Database-Specific Features

PostgreSQL Advanced Features

# PostgreSQL-specific raw SQL features
class PostgreSQLQueries:
    
    @staticmethod
    def jsonb_aggregation_query():
        """Use PostgreSQL JSONB aggregation"""
        
        sql = """
        SELECT 
            c.name as category_name,
            jsonb_agg(
                jsonb_build_object(
                    'id', p.id,
                    'title', p.title,
                    'view_count', p.view_count,
                    'tags', (
                        SELECT jsonb_agg(t.name)
                        FROM blog_tag t
                        JOIN blog_post_tags pt ON t.id = pt.tag_id
                        WHERE pt.post_id = p.id
                    )
                ) ORDER BY p.view_count DESC
            ) as posts
        FROM blog_category c
        JOIN blog_post p ON c.id = p.category_id
        WHERE p.status = 'published'
        GROUP BY c.id, c.name
        ORDER BY c.name
        """
        
        return execute_raw_query(sql)
    
    @staticmethod
    def array_operations():
        """PostgreSQL array operations"""
        
        sql = """
        SELECT 
            p.title,
            array_agg(DISTINCT t.name ORDER BY t.name) as tag_names,
            array_length(array_agg(DISTINCT t.name), 1) as tag_count
        FROM blog_post p
        LEFT JOIN blog_post_tags pt ON p.id = pt.post_id
        LEFT JOIN blog_tag t ON pt.tag_id = t.id
        WHERE p.status = 'published'
        GROUP BY p.id, p.title
        HAVING array_length(array_agg(DISTINCT t.name), 1) > 2
        ORDER BY tag_count DESC
        """
        
        return execute_raw_query(sql)
    
    @staticmethod
    def recursive_category_tree():
        """Recursive CTE for category hierarchy"""
        
        sql = """
        WITH RECURSIVE category_tree AS (
            -- Base case: root categories
            SELECT 
                id, 
                name, 
                parent_id, 
                0 as level,
                ARRAY[name] as path,
                name as root_category
            FROM blog_category 
            WHERE parent_id IS NULL
            
            UNION ALL
            
            -- Recursive case: child categories
            SELECT 
                c.id, 
                c.name, 
                c.parent_id,
                ct.level + 1,
                ct.path || c.name,
                ct.root_category
            FROM blog_category c
            JOIN category_tree ct ON c.parent_id = ct.id
        )
        SELECT 
            ct.*,
            array_to_string(ct.path, ' > ') as full_path,
            (SELECT COUNT(*) FROM blog_post WHERE category_id = ct.id) as post_count
        FROM category_tree ct
        ORDER BY ct.root_category, ct.level, ct.name
        """
        
        return execute_raw_query(sql)

# MySQL-specific features
class MySQLQueries:
    
    @staticmethod
    def full_text_search_with_boolean_mode():
        """MySQL full-text search with boolean operators"""
        
        sql = """
        SELECT 
            *,
            MATCH(title, content) AGAINST(%s IN BOOLEAN MODE) as relevance_score
        FROM blog_post
        WHERE MATCH(title, content) AGAINST(%s IN BOOLEAN MODE)
            AND status = 'published'
        ORDER BY relevance_score DESC
        LIMIT 20
        """
        
        # Boolean search: +django +tutorial -beginner
        query = "+django +tutorial -beginner"
        return execute_raw_query(sql, [query, query])
    
    @staticmethod
    def json_operations():
        """MySQL JSON operations (MySQL 5.7+)"""
        
        sql = """
        SELECT 
            id,
            title,
            JSON_EXTRACT(metadata, '$.tags') as tags,
            JSON_EXTRACT(metadata, '$.difficulty') as difficulty,
            JSON_LENGTH(JSON_EXTRACT(metadata, '$.tags')) as tag_count
        FROM blog_post
        WHERE JSON_EXTRACT(metadata, '$.difficulty') = 'intermediate'
            AND JSON_LENGTH(JSON_EXTRACT(metadata, '$.tags')) > 2
        ORDER BY created_at DESC
        """
        
        return execute_raw_query(sql)

# SQLite-specific optimizations
class SQLiteQueries:
    
    @staticmethod
    def fts_search():
        """SQLite FTS (Full-Text Search)"""
        
        # First, create FTS virtual table (in migration)
        """
        CREATE VIRTUAL TABLE post_fts USING fts5(
            title, content, content='blog_post', content_rowid='id'
        );
        
        -- Populate FTS table
        INSERT INTO post_fts(rowid, title, content)
        SELECT id, title, content FROM blog_post WHERE status = 'published';
        """
        
        sql = """
        SELECT 
            p.*,
            bm25(post_fts) as rank
        FROM post_fts
        JOIN blog_post p ON post_fts.rowid = p.id
        WHERE post_fts MATCH %s
            AND p.status = 'published'
        ORDER BY rank
        """
        
        return execute_raw_query(sql, ['django tutorial'])

Security and Best Practices

SQL Injection Prevention

# SECURE: Using parameterized queries
def secure_user_posts(user_id, status='published'):
    """Secure way to query user posts"""
    
    sql = """
    SELECT * FROM blog_post 
    WHERE author_id = %s AND status = %s
    ORDER BY created_at DESC
    """
    
    return execute_raw_query(sql, [user_id, status])

# INSECURE: String formatting (DON'T DO THIS)
def insecure_user_posts(user_id, status='published'):
    """INSECURE - vulnerable to SQL injection"""
    
    # DON'T DO THIS!
    sql = f"""
    SELECT * FROM blog_post 
    WHERE author_id = {user_id} AND status = '{status}'
    ORDER BY created_at DESC
    """
    
    return execute_raw_query(sql)

# Safe dynamic query building
def build_dynamic_search_query(filters):
    """Safely build dynamic queries"""
    
    base_sql = "SELECT * FROM blog_post WHERE 1=1"
    params = []
    
    if filters.get('author_id'):
        base_sql += " AND author_id = %s"
        params.append(filters['author_id'])
    
    if filters.get('category_id'):
        base_sql += " AND category_id = %s"
        params.append(filters['category_id'])
    
    if filters.get('status'):
        base_sql += " AND status = %s"
        params.append(filters['status'])
    
    if filters.get('search_term'):
        base_sql += " AND (title ILIKE %s OR content ILIKE %s)"
        search_pattern = f"%{filters['search_term']}%"
        params.extend([search_pattern, search_pattern])
    
    base_sql += " ORDER BY created_at DESC"
    
    return execute_raw_query(base_sql, params)

# Input validation and sanitization
def validate_and_execute_query(table_name, columns, filters):
    """Validate inputs before executing raw SQL"""
    
    # Whitelist allowed tables and columns
    ALLOWED_TABLES = ['blog_post', 'blog_category', 'auth_user']
    ALLOWED_COLUMNS = {
        'blog_post': ['id', 'title', 'content', 'author_id', 'status', 'created_at'],
        'blog_category': ['id', 'name', 'slug'],
        'auth_user': ['id', 'username', 'email', 'first_name', 'last_name']
    }
    
    # Validate table name
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"Table '{table_name}' not allowed")
    
    # Validate columns
    for column in columns:
        if column not in ALLOWED_COLUMNS[table_name]:
            raise ValueError(f"Column '{column}' not allowed for table '{table_name}'")
    
    # Build safe query
    column_list = ', '.join(columns)
    sql = f"SELECT {column_list} FROM {table_name} WHERE 1=1"
    params = []
    
    # Add filters safely
    for field, value in filters.items():
        if field in ALLOWED_COLUMNS[table_name]:
            sql += f" AND {field} = %s"
            params.append(value)
    
    return execute_raw_query(sql, params)

# Transaction management with raw SQL
def execute_raw_transaction(queries_and_params):
    """Execute multiple raw SQL queries in a transaction"""
    
    from django.db import transaction
    
    results = []
    
    with transaction.atomic():
        with connection.cursor() as cursor:
            for sql, params in queries_and_params:
                cursor.execute(sql, params)
                
                if cursor.description:
                    columns = [col[0] for col in cursor.description]
                    results.append([dict(zip(columns, row)) for row in cursor.fetchall()])
                else:
                    results.append(cursor.rowcount)
    
    return results

# Usage
transaction_queries = [
    ("UPDATE blog_post SET view_count = view_count + 1 WHERE id = %s", [1]),
    ("INSERT INTO analytics_view (post_id, timestamp) VALUES (%s, %s)", [1, timezone.now()]),
    ("SELECT view_count FROM blog_post WHERE id = %s", [1])
]

results = execute_raw_transaction(transaction_queries)

Raw SQL queries provide the ultimate flexibility for complex database operations, but they should be used judiciously with proper security measures and clear documentation of why the ORM wasn't sufficient for the specific use case.