While Django's ORM handles most database operations elegantly, there are times when you need the power and flexibility of raw SQL. Understanding how to safely execute raw SQL queries enables you to optimize performance, use database-specific features, and handle complex operations that are difficult to express with the ORM.
# Complex analytical queries
def get_monthly_revenue_report():
"""Complex financial report that's easier in raw SQL"""
sql = """
SELECT
DATE_TRUNC('month', o.created_at) as month,
COUNT(DISTINCT o.id) as order_count,
COUNT(DISTINCT o.customer_id) as unique_customers,
SUM(oi.quantity * oi.unit_price) as gross_revenue,
SUM(oi.quantity * p.cost_price) as total_cost,
SUM(oi.quantity * oi.unit_price) - SUM(oi.quantity * p.cost_price) as profit,
AVG(oi.quantity * oi.unit_price) as avg_order_value
FROM orders_order o
JOIN orders_orderitem oi ON o.id = oi.order_id
JOIN products_product p ON oi.product_id = p.id
WHERE o.status = 'completed'
AND o.created_at >= %s
GROUP BY DATE_TRUNC('month', o.created_at)
ORDER BY month DESC
"""
from django.db import connection
from datetime import datetime, timedelta
start_date = datetime.now() - timedelta(days=365)
with connection.cursor() as cursor:
cursor.execute(sql, [start_date])
columns = [col[0] for col in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
# Database-specific optimizations
def get_top_products_with_window_functions():
"""Use PostgreSQL window functions for ranking"""
sql = """
SELECT
p.name,
p.category_id,
c.name as category_name,
SUM(oi.quantity) as total_sold,
SUM(oi.quantity * oi.unit_price) as revenue,
RANK() OVER (PARTITION BY p.category_id ORDER BY SUM(oi.quantity) DESC) as category_rank,
PERCENT_RANK() OVER (ORDER BY SUM(oi.quantity) DESC) as overall_percentile
FROM products_product p
JOIN products_category c ON p.category_id = c.id
JOIN orders_orderitem oi ON p.id = oi.product_id
JOIN orders_order o ON oi.order_id = o.id
WHERE o.status = 'completed'
AND o.created_at >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY p.id, p.name, p.category_id, c.name
HAVING SUM(oi.quantity) > 0
ORDER BY category_id, category_rank
"""
return execute_raw_query(sql)
# Performance-critical queries
def get_user_activity_summary(user_id):
"""Optimized user activity query"""
sql = """
WITH user_stats AS (
SELECT
COUNT(DISTINCT p.id) as post_count,
COUNT(DISTINCT c.id) as comment_count,
COUNT(DISTINCT l.id) as like_count,
MAX(p.created_at) as last_post_date,
MAX(c.created_at) as last_comment_date
FROM auth_user u
LEFT JOIN blog_post p ON u.id = p.author_id
LEFT JOIN blog_comment c ON u.id = c.author_id
LEFT JOIN blog_like l ON u.id = l.user_id
WHERE u.id = %s
),
engagement_score AS (
SELECT
(post_count * 10 + comment_count * 2 + like_count) as score
FROM user_stats
)
SELECT
us.*,
es.score as engagement_score,
CASE
WHEN es.score >= 100 THEN 'High'
WHEN es.score >= 50 THEN 'Medium'
ELSE 'Low'
END as engagement_level
FROM user_stats us, engagement_score es
"""
with connection.cursor() as cursor:
cursor.execute(sql, [user_id])
columns = [col[0] for col in cursor.description]
result = cursor.fetchone()
return dict(zip(columns, result)) if result else None
from django.db import models
class PostManager(models.Manager):
def popular_posts_raw(self, min_views=1000):
"""Get popular posts using raw SQL but return model instances"""
return self.raw("""
SELECT * FROM blog_post
WHERE view_count >= %s
AND status = 'published'
ORDER BY view_count DESC, created_at DESC
""", [min_views])
def posts_with_engagement_score(self):
"""Calculate engagement score using raw SQL"""
return self.raw("""
SELECT
p.*,
(p.view_count + COUNT(c.id) * 5 + COUNT(l.id) * 2) as engagement_score
FROM blog_post p
LEFT JOIN blog_comment c ON p.id = c.post_id AND c.is_approved = true
LEFT JOIN blog_like l ON p.id = l.post_id
WHERE p.status = 'published'
GROUP BY p.id
ORDER BY engagement_score DESC
""")
def search_posts_full_text(self, query):
"""PostgreSQL full-text search using raw SQL"""
return self.raw("""
SELECT
*,
ts_rank(to_tsvector('english', title || ' ' || content), plainto_tsquery('english', %s)) as rank
FROM blog_post
WHERE to_tsvector('english', title || ' ' || content) @@ plainto_tsquery('english', %s)
AND status = 'published'
ORDER BY rank DESC, created_at DESC
""", [query, query])
class Post(models.Model):
title = models.CharField(max_length=200)
content = models.TextField()
view_count = models.PositiveIntegerField(default=0)
status = models.CharField(max_length=20, default='draft')
created_at = models.DateTimeField(auto_now_add=True)
objects = PostManager()
# Usage
popular_posts = Post.objects.popular_posts_raw(min_views=500)
for post in popular_posts:
print(f"{post.title}: {post.view_count} views")
# Access additional fields from raw SQL
posts_with_scores = Post.objects.posts_with_engagement_score()
for post in posts_with_scores:
print(f"{post.title}: {post.engagement_score} engagement")
class AdvancedPostManager(models.Manager):
def annotate_with_raw_sql(self):
"""Combine ORM with raw SQL for complex calculations"""
return self.extra(
select={
'comment_count': """
SELECT COUNT(*)
FROM blog_comment
WHERE blog_comment.post_id = blog_post.id
AND blog_comment.is_approved = true
""",
'avg_rating': """
SELECT AVG(rating)
FROM blog_rating
WHERE blog_rating.post_id = blog_post.id
""",
'days_since_published': """
CASE
WHEN published_at IS NOT NULL
THEN EXTRACT(days FROM (NOW() - published_at))
ELSE NULL
END
"""
},
where=[
"status = %s",
"created_at >= %s"
],
params=['published', '2023-01-01'],
order_by=['-view_count']
)
def complex_filtering_with_raw(self, author_ids, min_engagement=100):
"""Complex filtering using raw SQL conditions"""
return self.extra(
select={
'engagement_score': """
(view_count +
(SELECT COUNT(*) FROM blog_comment WHERE post_id = blog_post.id) * 5 +
(SELECT COUNT(*) FROM blog_like WHERE post_id = blog_post.id) * 2)
"""
},
where=[
"""
author_id = ANY(%s) AND
(view_count +
(SELECT COUNT(*) FROM blog_comment WHERE post_id = blog_post.id) * 5 +
(SELECT COUNT(*) FROM blog_like WHERE post_id = blog_post.id) * 2) >= %s
"""
],
params=[author_ids, min_engagement]
)
# Usage
annotated_posts = Post.objects.annotate_with_raw_sql()
for post in annotated_posts:
print(f"{post.title}: {post.comment_count} comments, {post.avg_rating} rating")
from django.db import connection, connections
def execute_raw_query(sql, params=None, database='default'):
"""Execute raw SQL query and return results"""
db_connection = connections[database]
with db_connection.cursor() as cursor:
cursor.execute(sql, params or [])
if cursor.description:
# SELECT query - return results
columns = [col[0] for col in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
else:
# INSERT/UPDATE/DELETE - return affected rows
return cursor.rowcount
def execute_many_raw(sql, param_list, database='default'):
"""Execute raw SQL with multiple parameter sets"""
db_connection = connections[database]
with db_connection.cursor() as cursor:
cursor.executemany(sql, param_list)
return cursor.rowcount
# Complex reporting queries
def generate_sales_report(start_date, end_date):
"""Generate comprehensive sales report"""
sql = """
WITH daily_sales AS (
SELECT
DATE(o.created_at) as sale_date,
COUNT(DISTINCT o.id) as order_count,
COUNT(DISTINCT o.customer_id) as unique_customers,
SUM(oi.quantity * oi.unit_price) as daily_revenue,
AVG(oi.quantity * oi.unit_price) as avg_order_value
FROM orders_order o
JOIN orders_orderitem oi ON o.id = oi.order_id
WHERE o.status = 'completed'
AND o.created_at BETWEEN %s AND %s
GROUP BY DATE(o.created_at)
),
product_performance AS (
SELECT
p.name as product_name,
p.category_id,
SUM(oi.quantity) as units_sold,
SUM(oi.quantity * oi.unit_price) as product_revenue,
COUNT(DISTINCT oi.order_id) as orders_containing_product
FROM products_product p
JOIN orders_orderitem oi ON p.id = oi.product_id
JOIN orders_order o ON oi.order_id = o.id
WHERE o.status = 'completed'
AND o.created_at BETWEEN %s AND %s
GROUP BY p.id, p.name, p.category_id
),
customer_segments AS (
SELECT
CASE
WHEN total_spent >= 1000 THEN 'VIP'
WHEN total_spent >= 500 THEN 'Premium'
WHEN total_spent >= 100 THEN 'Regular'
ELSE 'New'
END as segment,
COUNT(*) as customer_count,
AVG(total_spent) as avg_spent_per_customer
FROM (
SELECT
o.customer_id,
SUM(oi.quantity * oi.unit_price) as total_spent
FROM orders_order o
JOIN orders_orderitem oi ON o.id = oi.order_id
WHERE o.status = 'completed'
AND o.created_at BETWEEN %s AND %s
GROUP BY o.customer_id
) customer_totals
GROUP BY segment
)
SELECT
'daily_sales' as report_type,
json_agg(daily_sales.*) as data
FROM daily_sales
UNION ALL
SELECT
'product_performance' as report_type,
json_agg(product_performance.*) as data
FROM product_performance
UNION ALL
SELECT
'customer_segments' as report_type,
json_agg(customer_segments.*) as data
FROM customer_segments
"""
params = [start_date, end_date] * 3 # Same date range for all CTEs
results = execute_raw_query(sql, params)
# Organize results by report type
report_data = {}
for row in results:
report_data[row['report_type']] = row['data']
return report_data
# Bulk operations with raw SQL
def bulk_update_view_counts(post_view_data):
"""Efficiently update view counts for multiple posts"""
sql = """
UPDATE blog_post
SET view_count = view_count + data.increment
FROM (VALUES %s) AS data(post_id, increment)
WHERE blog_post.id = data.post_id
"""
# Prepare values for PostgreSQL
values = ','.join([f"({post_id}, {increment})" for post_id, increment in post_view_data])
final_sql = sql.replace('%s', values)
with connection.cursor() as cursor:
cursor.execute(final_sql)
return cursor.rowcount
def bulk_insert_analytics_data(analytics_records):
"""Bulk insert analytics data"""
sql = """
INSERT INTO analytics_pageview (url, user_id, session_id, timestamp, user_agent)
VALUES %s
ON CONFLICT (url, user_id, session_id, timestamp) DO NOTHING
"""
values = ','.join([
f"('{record['url']}', {record['user_id']}, '{record['session_id']}', "
f"'{record['timestamp']}', '{record['user_agent']}')"
for record in analytics_records
])
final_sql = sql.replace('%s', values)
with connection.cursor() as cursor:
cursor.execute(final_sql)
return cursor.rowcount
# PostgreSQL-specific raw SQL features
class PostgreSQLQueries:
@staticmethod
def jsonb_aggregation_query():
"""Use PostgreSQL JSONB aggregation"""
sql = """
SELECT
c.name as category_name,
jsonb_agg(
jsonb_build_object(
'id', p.id,
'title', p.title,
'view_count', p.view_count,
'tags', (
SELECT jsonb_agg(t.name)
FROM blog_tag t
JOIN blog_post_tags pt ON t.id = pt.tag_id
WHERE pt.post_id = p.id
)
) ORDER BY p.view_count DESC
) as posts
FROM blog_category c
JOIN blog_post p ON c.id = p.category_id
WHERE p.status = 'published'
GROUP BY c.id, c.name
ORDER BY c.name
"""
return execute_raw_query(sql)
@staticmethod
def array_operations():
"""PostgreSQL array operations"""
sql = """
SELECT
p.title,
array_agg(DISTINCT t.name ORDER BY t.name) as tag_names,
array_length(array_agg(DISTINCT t.name), 1) as tag_count
FROM blog_post p
LEFT JOIN blog_post_tags pt ON p.id = pt.post_id
LEFT JOIN blog_tag t ON pt.tag_id = t.id
WHERE p.status = 'published'
GROUP BY p.id, p.title
HAVING array_length(array_agg(DISTINCT t.name), 1) > 2
ORDER BY tag_count DESC
"""
return execute_raw_query(sql)
@staticmethod
def recursive_category_tree():
"""Recursive CTE for category hierarchy"""
sql = """
WITH RECURSIVE category_tree AS (
-- Base case: root categories
SELECT
id,
name,
parent_id,
0 as level,
ARRAY[name] as path,
name as root_category
FROM blog_category
WHERE parent_id IS NULL
UNION ALL
-- Recursive case: child categories
SELECT
c.id,
c.name,
c.parent_id,
ct.level + 1,
ct.path || c.name,
ct.root_category
FROM blog_category c
JOIN category_tree ct ON c.parent_id = ct.id
)
SELECT
ct.*,
array_to_string(ct.path, ' > ') as full_path,
(SELECT COUNT(*) FROM blog_post WHERE category_id = ct.id) as post_count
FROM category_tree ct
ORDER BY ct.root_category, ct.level, ct.name
"""
return execute_raw_query(sql)
# MySQL-specific features
class MySQLQueries:
@staticmethod
def full_text_search_with_boolean_mode():
"""MySQL full-text search with boolean operators"""
sql = """
SELECT
*,
MATCH(title, content) AGAINST(%s IN BOOLEAN MODE) as relevance_score
FROM blog_post
WHERE MATCH(title, content) AGAINST(%s IN BOOLEAN MODE)
AND status = 'published'
ORDER BY relevance_score DESC
LIMIT 20
"""
# Boolean search: +django +tutorial -beginner
query = "+django +tutorial -beginner"
return execute_raw_query(sql, [query, query])
@staticmethod
def json_operations():
"""MySQL JSON operations (MySQL 5.7+)"""
sql = """
SELECT
id,
title,
JSON_EXTRACT(metadata, '$.tags') as tags,
JSON_EXTRACT(metadata, '$.difficulty') as difficulty,
JSON_LENGTH(JSON_EXTRACT(metadata, '$.tags')) as tag_count
FROM blog_post
WHERE JSON_EXTRACT(metadata, '$.difficulty') = 'intermediate'
AND JSON_LENGTH(JSON_EXTRACT(metadata, '$.tags')) > 2
ORDER BY created_at DESC
"""
return execute_raw_query(sql)
# SQLite-specific optimizations
class SQLiteQueries:
@staticmethod
def fts_search():
"""SQLite FTS (Full-Text Search)"""
# First, create FTS virtual table (in migration)
"""
CREATE VIRTUAL TABLE post_fts USING fts5(
title, content, content='blog_post', content_rowid='id'
);
-- Populate FTS table
INSERT INTO post_fts(rowid, title, content)
SELECT id, title, content FROM blog_post WHERE status = 'published';
"""
sql = """
SELECT
p.*,
bm25(post_fts) as rank
FROM post_fts
JOIN blog_post p ON post_fts.rowid = p.id
WHERE post_fts MATCH %s
AND p.status = 'published'
ORDER BY rank
"""
return execute_raw_query(sql, ['django tutorial'])
# SECURE: Using parameterized queries
def secure_user_posts(user_id, status='published'):
"""Secure way to query user posts"""
sql = """
SELECT * FROM blog_post
WHERE author_id = %s AND status = %s
ORDER BY created_at DESC
"""
return execute_raw_query(sql, [user_id, status])
# INSECURE: String formatting (DON'T DO THIS)
def insecure_user_posts(user_id, status='published'):
"""INSECURE - vulnerable to SQL injection"""
# DON'T DO THIS!
sql = f"""
SELECT * FROM blog_post
WHERE author_id = {user_id} AND status = '{status}'
ORDER BY created_at DESC
"""
return execute_raw_query(sql)
# Safe dynamic query building
def build_dynamic_search_query(filters):
"""Safely build dynamic queries"""
base_sql = "SELECT * FROM blog_post WHERE 1=1"
params = []
if filters.get('author_id'):
base_sql += " AND author_id = %s"
params.append(filters['author_id'])
if filters.get('category_id'):
base_sql += " AND category_id = %s"
params.append(filters['category_id'])
if filters.get('status'):
base_sql += " AND status = %s"
params.append(filters['status'])
if filters.get('search_term'):
base_sql += " AND (title ILIKE %s OR content ILIKE %s)"
search_pattern = f"%{filters['search_term']}%"
params.extend([search_pattern, search_pattern])
base_sql += " ORDER BY created_at DESC"
return execute_raw_query(base_sql, params)
# Input validation and sanitization
def validate_and_execute_query(table_name, columns, filters):
"""Validate inputs before executing raw SQL"""
# Whitelist allowed tables and columns
ALLOWED_TABLES = ['blog_post', 'blog_category', 'auth_user']
ALLOWED_COLUMNS = {
'blog_post': ['id', 'title', 'content', 'author_id', 'status', 'created_at'],
'blog_category': ['id', 'name', 'slug'],
'auth_user': ['id', 'username', 'email', 'first_name', 'last_name']
}
# Validate table name
if table_name not in ALLOWED_TABLES:
raise ValueError(f"Table '{table_name}' not allowed")
# Validate columns
for column in columns:
if column not in ALLOWED_COLUMNS[table_name]:
raise ValueError(f"Column '{column}' not allowed for table '{table_name}'")
# Build safe query
column_list = ', '.join(columns)
sql = f"SELECT {column_list} FROM {table_name} WHERE 1=1"
params = []
# Add filters safely
for field, value in filters.items():
if field in ALLOWED_COLUMNS[table_name]:
sql += f" AND {field} = %s"
params.append(value)
return execute_raw_query(sql, params)
# Transaction management with raw SQL
def execute_raw_transaction(queries_and_params):
"""Execute multiple raw SQL queries in a transaction"""
from django.db import transaction
results = []
with transaction.atomic():
with connection.cursor() as cursor:
for sql, params in queries_and_params:
cursor.execute(sql, params)
if cursor.description:
columns = [col[0] for col in cursor.description]
results.append([dict(zip(columns, row)) for row in cursor.fetchall()])
else:
results.append(cursor.rowcount)
return results
# Usage
transaction_queries = [
("UPDATE blog_post SET view_count = view_count + 1 WHERE id = %s", [1]),
("INSERT INTO analytics_view (post_id, timestamp) VALUES (%s, %s)", [1, timezone.now()]),
("SELECT view_count FROM blog_post WHERE id = %s", [1])
]
results = execute_raw_transaction(transaction_queries)
Raw SQL queries provide the ultimate flexibility for complex database operations, but they should be used judiciously with proper security measures and clear documentation of why the ORM wasn't sufficient for the specific use case.
Search
Django provides multiple approaches for implementing search functionality, from simple text-based searches to full-text search capabilities. Understanding these options enables you to choose the right search solution for your application's needs.
Transactions
Database transactions ensure data consistency and integrity by grouping multiple database operations into atomic units. Django provides comprehensive transaction management tools that help you maintain data consistency even in complex scenarios.