Profiling is essential for identifying performance bottlenecks in Django applications. Without proper profiling, optimization efforts often target the wrong areas, wasting time and resources. This comprehensive guide covers profiling tools, techniques, and methodologies that help you understand where your application spends time and how to make it faster.
Profiling reveals where your application spends time and resources. Django applications have multiple performance layers that require different profiling approaches: database queries, Python code execution, template rendering, and external service calls.
Response Time Profiling
Database Query Profiling
Memory Profiling
CPU Profiling
Django Debug Toolbar is the essential first tool for development profiling. It provides real-time insights into request processing without code changes.
# settings.py
INSTALLED_APPS = [
# ... other apps
'debug_toolbar',
]
MIDDLEWARE = [
'debug_toolbar.middleware.DebugToolbarMiddleware',
# ... other middleware
]
# Show toolbar for these IPs
INTERNAL_IPS = [
'127.0.0.1',
'::1', # IPv6 localhost
]
# Debug toolbar configuration
DEBUG_TOOLBAR_CONFIG = {
'SHOW_TOOLBAR_CALLBACK': lambda request: DEBUG and request.user.is_superuser,
'SHOW_COLLAPSED': True,
'PROFILER_MAX_DEPTH': 10,
}
# Enable SQL query logging
DEBUG_TOOLBAR_PANELS = [
'debug_toolbar.panels.versions.VersionsPanel',
'debug_toolbar.panels.timer.TimerPanel',
'debug_toolbar.panels.settings.SettingsPanel',
'debug_toolbar.panels.headers.HeadersPanel',
'debug_toolbar.panels.request.RequestPanel',
'debug_toolbar.panels.sql.SQLPanel',
'debug_toolbar.panels.staticfiles.StaticFilesPanel',
'debug_toolbar.panels.templates.TemplatesPanel',
'debug_toolbar.panels.cache.CachePanel',
'debug_toolbar.panels.signals.SignalsPanel',
'debug_toolbar.panels.logging.LoggingPanel',
'debug_toolbar.panels.redirects.RedirectsPanel',
'debug_toolbar.panels.profiling.ProfilingPanel',
]
# Example view with performance issues
def slow_product_list(request):
"""Intentionally slow view for demonstration"""
# Issue 1: N+1 queries
products = Product.objects.all()[:50]
# Issue 2: Inefficient loop with database queries
product_data = []
for product in products:
# Each iteration hits the database
review_count = product.reviews.count() # Query per product
avg_rating = product.reviews.aggregate(
avg=Avg('rating')
)['avg'] # Another query per product
# Issue 3: Expensive computation in loop
related_products = Product.objects.filter(
category=product.category
).exclude(id=product.id)[:5] # Query per product
product_data.append({
'product': product,
'review_count': review_count,
'avg_rating': avg_rating,
'related_products': list(related_products),
})
# Debug Toolbar will show:
# - 151+ SQL queries (1 + 50*3)
# - High response time (2-3 seconds)
# - Template rendering time
# - Memory usage spikes
return render(request, 'products/list.html', {
'product_data': product_data
})
# Optimized version
def fast_product_list(request):
"""Optimized view using profiling insights"""
# Fix 1: Use select_related and prefetch_related
products = Product.objects.select_related('category').prefetch_related(
'reviews'
).all()[:50]
# Fix 2: Use annotations for aggregations
products = products.annotate(
review_count=Count('reviews'),
avg_rating=Avg('reviews__rating')
)
# Fix 3: Batch related product queries
category_ids = [p.category_id for p in products]
related_products_map = {}
for category_id in set(category_ids):
related_products_map[category_id] = list(
Product.objects.filter(category_id=category_id)
.exclude(id__in=[p.id for p in products])[:5]
)
# Build product data without additional queries
product_data = []
for product in products:
product_data.append({
'product': product,
'review_count': product.review_count,
'avg_rating': product.avg_rating,
'related_products': related_products_map.get(product.category_id, []),
})
# Debug Toolbar now shows:
# - 3-4 SQL queries total
# - Response time under 200ms
# - Reduced memory usage
return render(request, 'products/list.html', {
'product_data': product_data
})
Django Silk provides production-ready profiling with detailed request analysis, SQL query profiling, and performance monitoring.
# Install: pip install django-silk
# settings.py
INSTALLED_APPS = [
# ... other apps
'silk',
]
MIDDLEWARE = [
'silk.middleware.SilkyMiddleware',
# ... other middleware
]
# Silk configuration
SILKY_PYTHON_PROFILER = True
SILKY_PYTHON_PROFILER_BINARY = True
SILKY_PYTHON_PROFILER_RESULT_PATH = '/tmp/silk_profiles/'
# Authentication for Silk interface
SILKY_AUTHENTICATION = True
SILKY_AUTHORISATION = True
# Custom authorization
def silk_permission_check(user):
return user.is_superuser
SILKY_PERMISSIONS = silk_permission_check
# Performance thresholds
SILKY_MAX_REQUEST_BODY_SIZE = 1024 * 1024 # 1MB
SILKY_MAX_RESPONSE_BODY_SIZE = 1024 * 1024 # 1MB
SILKY_INTERCEPT_PERCENT = 100 # Profile 100% of requests in development
# Production settings
if not DEBUG:
SILKY_INTERCEPT_PERCENT = 10 # Profile 10% of requests in production
SILKY_MAX_RECORDED_REQUESTS = 10000
SILKY_MAX_RECORDED_REQUESTS_CHECK_PERCENT = 10
from silk.profiling.profiler import silk_profile
# Profile specific functions
@silk_profile(name='Calculate User Statistics')
def calculate_user_stats(user_id):
"""Calculate comprehensive user statistics"""
user = User.objects.select_related('profile').get(id=user_id)
# This will be profiled by Silk
stats = {
'total_orders': user.orders.count(),
'total_spent': user.orders.aggregate(
total=Sum('total_amount')
)['total'] or 0,
'avg_order_value': user.orders.aggregate(
avg=Avg('total_amount')
)['avg'] or 0,
'favorite_categories': list(
user.orders.values('items__product__category__name')
.annotate(count=Count('id'))
.order_by('-count')[:5]
),
}
return stats
# Profile view methods
class ProductListView(ListView):
model = Product
template_name = 'products/list.html'
paginate_by = 20
@silk_profile(name='Get Product Queryset')
def get_queryset(self):
"""Profiled queryset method"""
return Product.objects.select_related('category').prefetch_related(
'reviews'
).filter(is_active=True).annotate(
avg_rating=Avg('reviews__rating'),
review_count=Count('reviews')
)
@silk_profile(name='Get Context Data')
def get_context_data(self, **kwargs):
"""Profiled context method"""
context = super().get_context_data(**kwargs)
# Add expensive context data
context['categories'] = Category.objects.filter(
is_active=True
).annotate(
product_count=Count('products')
)
context['featured_products'] = Product.objects.filter(
is_featured=True,
is_active=True
)[:6]
return context
# Custom profiling decorator
def profile_database_queries(func):
"""Decorator to profile database query performance"""
def wrapper(*args, **kwargs):
from django.db import connection, reset_queries
from django.conf import settings
# Enable query logging
old_debug = settings.DEBUG
settings.DEBUG = True
reset_queries()
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
# Analyze queries
queries = connection.queries
total_time = sum(float(q['time']) for q in queries)
print(f"\n=== Database Query Profile for {func.__name__} ===")
print(f"Execution time: {end_time - start_time:.4f}s")
print(f"Database time: {total_time:.4f}s")
print(f"Query count: {len(queries)}")
print(f"DB time percentage: {(total_time / (end_time - start_time)) * 100:.1f}%")
# Show slow queries
slow_queries = [q for q in queries if float(q['time']) > 0.01]
if slow_queries:
print(f"\nSlow queries (>10ms): {len(slow_queries)}")
for i, query in enumerate(slow_queries[:5]):
print(f"{i+1}. Time: {query['time']}s")
print(f" SQL: {query['sql'][:100]}...")
# Restore debug setting
settings.DEBUG = old_debug
return result
return wrapper
# Usage
@profile_database_queries
def expensive_report_generation():
"""Generate expensive report with query profiling"""
# Your expensive operations here
pass
import cProfile
import pstats
import io
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
def profile_view(func):
"""Decorator to profile view functions"""
def wrapper(request, *args, **kwargs):
if request.GET.get('profile') == '1' and request.user.is_superuser:
# Create profiler
profiler = cProfile.Profile()
# Profile the view
profiler.enable()
response = func(request, *args, **kwargs)
profiler.disable()
# Generate profile report
s = io.StringIO()
ps = pstats.Stats(profiler, stream=s)
ps.sort_stats('cumulative')
ps.print_stats(50) # Top 50 functions
# Return profile as response
profile_output = s.getvalue()
return HttpResponse(
f"<pre>{profile_output}</pre>",
content_type='text/html'
)
else:
return func(request, *args, **kwargs)
return wrapper
# Usage
@profile_view
def complex_dashboard(request):
"""Complex dashboard with profiling capability"""
# Access with ?profile=1 to see profile
# Expensive operations
user_stats = calculate_user_stats(request.user.id)
recent_orders = get_recent_orders(request.user.id)
recommendations = generate_recommendations(request.user.id)
context = {
'user_stats': user_stats,
'recent_orders': recent_orders,
'recommendations': recommendations,
}
return render(request, 'dashboard.html', context)
# Line-by-line profiling with line_profiler
def line_profile_function():
"""
Use with kernprof:
kernprof -l -v manage.py runserver
Add @profile decorator to functions you want to profile
"""
pass
# Memory profiling with memory_profiler
from memory_profiler import profile as memory_profile
@memory_profile
def memory_intensive_function():
"""
Run with: python -m memory_profiler your_script.py
Shows line-by-line memory usage
"""
# Memory-intensive operations
large_list = [i for i in range(1000000)]
processed_data = [x * 2 for x in large_list]
return processed_data
import time
import cProfile
import pstats
import io
from django.conf import settings
from django.utils.deprecation import MiddlewareMixin
from django.http import HttpResponse
class ProfilingMiddleware(MiddlewareMixin):
"""Comprehensive profiling middleware"""
def process_request(self, request):
# Initialize profiling data
request.profiling_data = {
'start_time': time.time(),
'start_queries': len(connection.queries) if settings.DEBUG else 0,
}
# Start cProfile if requested
if (request.GET.get('profile') == 'cprofile' and
request.user.is_authenticated and
request.user.is_superuser):
request.profiler = cProfile.Profile()
request.profiler.enable()
def process_response(self, request, response):
if hasattr(request, 'profiling_data'):
# Calculate timing
end_time = time.time()
duration = end_time - request.profiling_data['start_time']
# Calculate query stats
if settings.DEBUG:
end_queries = len(connection.queries)
query_count = end_queries - request.profiling_data['start_queries']
query_time = sum(
float(q['time'])
for q in connection.queries[request.profiling_data['start_queries']:]
)
else:
query_count = 0
query_time = 0
# Add performance headers
response['X-Response-Time'] = f'{duration:.3f}'
response['X-DB-Queries'] = str(query_count)
response['X-DB-Time'] = f'{query_time:.3f}'
# Handle cProfile output
if hasattr(request, 'profiler'):
request.profiler.disable()
s = io.StringIO()
ps = pstats.Stats(request.profiler, stream=s)
ps.sort_stats('cumulative')
ps.print_stats(100)
profile_output = s.getvalue()
return HttpResponse(
f"<pre>{profile_output}</pre>",
content_type='text/html'
)
# Log slow requests
if duration > 1.0:
logger.warning(
f'Slow request: {request.path} '
f'took {duration:.3f}s with {query_count} queries'
)
return response
from django.db import connection
from django.core.management.base import BaseCommand
import time
class QueryAnalyzer:
"""Analyze database query performance"""
def __init__(self):
self.reset_stats()
def reset_stats(self):
"""Reset query statistics"""
self.queries = []
self.start_time = time.time()
self.start_query_count = len(connection.queries)
def analyze_queries(self):
"""Analyze executed queries"""
end_time = time.time()
current_queries = connection.queries[self.start_query_count:]
analysis = {
'total_time': end_time - self.start_time,
'query_count': len(current_queries),
'total_db_time': sum(float(q['time']) for q in current_queries),
'slow_queries': [],
'duplicate_queries': {},
'n_plus_one_candidates': [],
}
# Identify slow queries
for query in current_queries:
if float(query['time']) > 0.01: # Slower than 10ms
analysis['slow_queries'].append({
'time': float(query['time']),
'sql': query['sql'][:200] + '...' if len(query['sql']) > 200 else query['sql']
})
# Identify duplicate queries
for query in current_queries:
sql = query['sql']
if sql in analysis['duplicate_queries']:
analysis['duplicate_queries'][sql] += 1
else:
analysis['duplicate_queries'][sql] = 1
# Remove non-duplicates
analysis['duplicate_queries'] = {
sql: count for sql, count in analysis['duplicate_queries'].items()
if count > 1
}
# Identify potential N+1 queries
similar_queries = {}
for query in current_queries:
# Normalize query by removing specific IDs
normalized = self._normalize_query(query['sql'])
if normalized in similar_queries:
similar_queries[normalized].append(query)
else:
similar_queries[normalized] = [query]
for normalized, queries in similar_queries.items():
if len(queries) > 5: # More than 5 similar queries
analysis['n_plus_one_candidates'].append({
'pattern': normalized,
'count': len(queries),
'example': queries[0]['sql'][:200]
})
return analysis
def _normalize_query(self, sql):
"""Normalize SQL query for pattern matching"""
import re
# Replace numbers with placeholder
normalized = re.sub(r'\b\d+\b', 'N', sql)
# Replace quoted strings with placeholder
normalized = re.sub(r"'[^']*'", "'STRING'", normalized)
return normalized
def print_analysis(self, analysis):
"""Print query analysis results"""
print(f"\n=== Query Performance Analysis ===")
print(f"Total execution time: {analysis['total_time']:.3f}s")
print(f"Database time: {analysis['total_db_time']:.3f}s")
print(f"Query count: {analysis['query_count']}")
print(f"DB percentage: {(analysis['total_db_time'] / analysis['total_time']) * 100:.1f}%")
if analysis['slow_queries']:
print(f"\n--- Slow Queries (>{10}ms) ---")
for i, query in enumerate(analysis['slow_queries'][:5]):
print(f"{i+1}. {query['time']:.3f}s: {query['sql']}")
if analysis['duplicate_queries']:
print(f"\n--- Duplicate Queries ---")
for sql, count in list(analysis['duplicate_queries'].items())[:5]:
print(f"{count}x: {sql[:100]}...")
if analysis['n_plus_one_candidates']:
print(f"\n--- Potential N+1 Queries ---")
for candidate in analysis['n_plus_one_candidates'][:3]:
print(f"{candidate['count']}x similar: {candidate['example'][:100]}...")
# Usage in views
def analyze_view_performance(request):
"""Example view with query analysis"""
analyzer = QueryAnalyzer()
# Your view logic here
products = Product.objects.all()[:50]
# This will trigger N+1 queries
for product in products:
print(product.category.name) # Query per product
# Analyze the queries
analysis = analyzer.analyze_queries()
analyzer.print_analysis(analysis)
return render(request, 'products.html', {'products': products})
from django.db import connection
class QueryExplainer:
"""Analyze query execution plans"""
@staticmethod
def explain_queryset(queryset, analyze=False):
"""Get EXPLAIN output for a queryset"""
query = str(queryset.query)
with connection.cursor() as cursor:
if connection.vendor == 'postgresql':
explain_query = f"EXPLAIN {'ANALYZE ' if analyze else ''}(BUFFERS, FORMAT JSON) {query}"
cursor.execute(explain_query)
result = cursor.fetchone()[0]
return result
elif connection.vendor == 'mysql':
cursor.execute(f"EXPLAIN FORMAT=JSON {query}")
result = cursor.fetchone()[0]
return result
else:
cursor.execute(f"EXPLAIN QUERY PLAN {query}")
return cursor.fetchall()
@staticmethod
def analyze_execution_plan(plan_data):
"""Analyze execution plan for performance issues"""
issues = []
if isinstance(plan_data, dict):
# PostgreSQL JSON format
plan = plan_data.get('Plan', {})
# Check for sequential scans
if plan.get('Node Type') == 'Seq Scan':
issues.append({
'type': 'sequential_scan',
'message': f"Sequential scan on {plan.get('Relation Name')}",
'cost': plan.get('Total Cost'),
'rows': plan.get('Plan Rows')
})
# Check for expensive operations
if plan.get('Total Cost', 0) > 1000:
issues.append({
'type': 'expensive_operation',
'message': f"High cost operation: {plan.get('Node Type')}",
'cost': plan.get('Total Cost')
})
# Recursively check child plans
for child in plan.get('Plans', []):
child_issues = QueryExplainer.analyze_execution_plan({'Plan': child})
issues.extend(child_issues)
return issues
# Usage examples
def analyze_slow_query():
"""Analyze a potentially slow query"""
# Create a complex queryset
queryset = Product.objects.select_related('category').prefetch_related(
'reviews'
).filter(
is_active=True,
price__gte=100
).annotate(
avg_rating=Avg('reviews__rating'),
review_count=Count('reviews')
).order_by('-avg_rating')
# Get execution plan
plan = QueryExplainer.explain_queryset(queryset, analyze=True)
issues = QueryExplainer.analyze_execution_plan(plan)
print("=== Query Execution Plan Analysis ===")
print(f"Query: {queryset.query}")
print(f"Execution plan: {plan}")
if issues:
print("\n--- Performance Issues ---")
for issue in issues:
print(f"- {issue['type']}: {issue['message']}")
else:
print("No obvious performance issues found.")
# Management command for query analysis
class Command(BaseCommand):
"""Management command to analyze query performance"""
help = 'Analyze query performance for specific models'
def add_arguments(self, parser):
parser.add_argument('model', type=str, help='Model to analyze')
parser.add_argument('--limit', type=int, default=100, help='Query limit')
def handle(self, *args, **options):
from django.apps import apps
try:
model = apps.get_model(options['model'])
except LookupError:
self.stdout.write(
self.style.ERROR(f"Model {options['model']} not found")
)
return
# Analyze basic query
queryset = model.objects.all()[:options['limit']]
analyzer = QueryAnalyzer()
# Execute query
list(queryset) # Force evaluation
# Analyze results
analysis = analyzer.analyze_queries()
analyzer.print_analysis(analysis)
# Get execution plan
plan = QueryExplainer.explain_queryset(queryset)
issues = QueryExplainer.analyze_execution_plan(plan)
if issues:
self.stdout.write("\n--- Execution Plan Issues ---")
for issue in issues:
self.stdout.write(f"- {issue['message']}")
import tracemalloc
import psutil
import os
from django.utils.deprecation import MiddlewareMixin
class MemoryProfilingMiddleware(MiddlewareMixin):
"""Monitor memory usage during request processing"""
def process_request(self, request):
if request.GET.get('memory_profile') == '1' and request.user.is_superuser:
# Start memory tracing
tracemalloc.start()
# Get initial memory usage
process = psutil.Process(os.getpid())
request.initial_memory = process.memory_info().rss / 1024 / 1024 # MB
request.memory_profiling = True
def process_response(self, request, response):
if hasattr(request, 'memory_profiling'):
# Get final memory usage
process = psutil.Process(os.getpid())
final_memory = process.memory_info().rss / 1024 / 1024 # MB
# Get memory trace
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Calculate memory delta
memory_delta = final_memory - request.initial_memory
# Add memory headers
response['X-Memory-Initial'] = f'{request.initial_memory:.1f}MB'
response['X-Memory-Final'] = f'{final_memory:.1f}MB'
response['X-Memory-Delta'] = f'{memory_delta:.1f}MB'
response['X-Memory-Peak'] = f'{peak / 1024 / 1024:.1f}MB'
# Log high memory usage
if memory_delta > 50: # More than 50MB increase
logger.warning(
f'High memory usage: {request.path} '
f'increased memory by {memory_delta:.1f}MB'
)
return response
# Memory profiling decorator
def memory_profile(func):
"""Decorator to profile memory usage of functions"""
def wrapper(*args, **kwargs):
tracemalloc.start()
# Get initial memory
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss
# Execute function
result = func(*args, **kwargs)
# Get final memory
final_memory = process.memory_info().rss
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Calculate usage
memory_delta = (final_memory - initial_memory) / 1024 / 1024 # MB
peak_mb = peak / 1024 / 1024
print(f"\n=== Memory Profile for {func.__name__} ===")
print(f"Memory delta: {memory_delta:.1f}MB")
print(f"Peak memory: {peak_mb:.1f}MB")
return result
return wrapper
# Usage
@memory_profile
def memory_intensive_operation():
"""Example memory-intensive operation"""
# Create large data structures
large_list = [i for i in range(1000000)]
large_dict = {i: f"value_{i}" for i in range(100000)}
# Process data
processed = [x * 2 for x in large_list if x % 2 == 0]
return processed
import gc
import weakref
from collections import defaultdict
class MemoryLeakDetector:
"""Detect potential memory leaks in Django applications"""
def __init__(self):
self.object_counts = defaultdict(int)
self.tracked_objects = []
def start_tracking(self):
"""Start tracking object creation"""
self.object_counts.clear()
self.tracked_objects.clear()
# Count existing objects
for obj in gc.get_objects():
obj_type = type(obj).__name__
self.object_counts[obj_type] += 1
def check_leaks(self):
"""Check for potential memory leaks"""
current_counts = defaultdict(int)
# Count current objects
for obj in gc.get_objects():
obj_type = type(obj).__name__
current_counts[obj_type] += 1
# Find objects that increased significantly
leaks = {}
for obj_type, current_count in current_counts.items():
initial_count = self.object_counts.get(obj_type, 0)
increase = current_count - initial_count
# Consider it a potential leak if increase > 1000 objects
if increase > 1000:
leaks[obj_type] = {
'initial': initial_count,
'current': current_count,
'increase': increase
}
return leaks
def track_object_lifecycle(self, obj):
"""Track specific object lifecycle"""
def cleanup_callback(ref):
print(f"Object {ref} was garbage collected")
weak_ref = weakref.ref(obj, cleanup_callback)
self.tracked_objects.append(weak_ref)
return weak_ref
def force_garbage_collection(self):
"""Force garbage collection and report"""
print("=== Garbage Collection Report ===")
# Get stats before collection
before_counts = [len(gc.get_objects(generation)) for generation in range(3)]
# Force collection
collected = gc.collect()
# Get stats after collection
after_counts = [len(gc.get_objects(generation)) for generation in range(3)]
print(f"Objects collected: {collected}")
for i, (before, after) in enumerate(zip(before_counts, after_counts)):
print(f"Generation {i}: {before} -> {after} (freed: {before - after})")
# Check for uncollectable objects
uncollectable = gc.garbage
if uncollectable:
print(f"Uncollectable objects: {len(uncollectable)}")
for obj in uncollectable[:5]: # Show first 5
print(f" - {type(obj).__name__}: {repr(obj)[:100]}")
# Usage in views
leak_detector = MemoryLeakDetector()
def memory_leak_test_view(request):
"""View to test for memory leaks"""
if request.GET.get('start_tracking'):
leak_detector.start_tracking()
return HttpResponse("Started memory leak tracking")
elif request.GET.get('check_leaks'):
leaks = leak_detector.check_leaks()
if leaks:
response = "Potential memory leaks detected:\n"
for obj_type, data in leaks.items():
response += f"- {obj_type}: {data['increase']} new objects\n"
else:
response = "No significant memory leaks detected"
return HttpResponse(response, content_type='text/plain')
elif request.GET.get('gc_report'):
leak_detector.force_garbage_collection()
return HttpResponse("Garbage collection report printed to console")
# Regular view logic
return render(request, 'memory_test.html')
# Integration with APM services like New Relic, DataDog, or Sentry
# New Relic integration
import newrelic.agent
@newrelic.agent.function_trace()
def expensive_calculation():
"""Function traced by New Relic"""
# Your expensive code here
pass
@newrelic.agent.background_task()
def background_task():
"""Background task traced by New Relic"""
# Your background task code here
pass
# Custom metrics
def record_custom_metrics():
"""Record custom performance metrics"""
# Record response time
newrelic.agent.record_metric('Custom/ResponseTime', response_time)
# Record business metrics
newrelic.agent.record_metric('Custom/OrdersProcessed', order_count)
# Record database metrics
newrelic.agent.record_metric('Custom/DatabaseQueries', query_count)
# Sentry performance monitoring
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
sentry_sdk.init(
dsn="your-sentry-dsn",
integrations=[DjangoIntegration()],
traces_sample_rate=0.1, # 10% of transactions
send_default_pii=True,
)
# Custom transaction monitoring
def monitored_view(request):
"""View with custom Sentry monitoring"""
with sentry_sdk.start_transaction(op="view", name="complex_dashboard"):
# Database span
with sentry_sdk.start_span(op="db", description="fetch user data"):
user_data = get_user_data(request.user.id)
# External API span
with sentry_sdk.start_span(op="http", description="fetch recommendations"):
recommendations = fetch_recommendations(request.user.id)
# Template rendering span
with sentry_sdk.start_span(op="template", description="render dashboard"):
return render(request, 'dashboard.html', {
'user_data': user_data,
'recommendations': recommendations,
})
import time
import json
from django.core.cache import cache
from django.utils import timezone
from datetime import timedelta
class PerformanceMonitor:
"""Custom performance monitoring system"""
def __init__(self):
self.metrics = defaultdict(list)
def record_metric(self, name, value, tags=None):
"""Record a performance metric"""
metric_data = {
'timestamp': timezone.now().isoformat(),
'value': value,
'tags': tags or {}
}
# Store in cache for real-time access
cache_key = f"metric_{name}"
cached_metrics = cache.get(cache_key, [])
cached_metrics.append(metric_data)
# Keep only last 1000 metrics
if len(cached_metrics) > 1000:
cached_metrics = cached_metrics[-1000:]
cache.set(cache_key, cached_metrics, 3600) # 1 hour
def get_metrics(self, name, hours=1):
"""Get metrics for the last N hours"""
cache_key = f"metric_{name}"
cached_metrics = cache.get(cache_key, [])
# Filter by time
cutoff_time = timezone.now() - timedelta(hours=hours)
recent_metrics = [
m for m in cached_metrics
if timezone.datetime.fromisoformat(m['timestamp']) > cutoff_time
]
return recent_metrics
def calculate_percentiles(self, name, hours=1):
"""Calculate percentiles for a metric"""
metrics = self.get_metrics(name, hours)
values = [m['value'] for m in metrics]
if not values:
return {}
values.sort()
count = len(values)
return {
'count': count,
'min': values[0],
'max': values[-1],
'median': values[count // 2],
'p95': values[int(count * 0.95)] if count > 20 else values[-1],
'p99': values[int(count * 0.99)] if count > 100 else values[-1],
'avg': sum(values) / count
}
# Global monitor instance
performance_monitor = PerformanceMonitor()
# Middleware for automatic monitoring
class PerformanceMonitoringMiddleware(MiddlewareMixin):
"""Middleware to automatically record performance metrics"""
def process_request(self, request):
request.perf_start_time = time.time()
request.perf_start_queries = len(connection.queries) if settings.DEBUG else 0
def process_response(self, request, response):
if hasattr(request, 'perf_start_time'):
# Calculate metrics
duration = time.time() - request.perf_start_time
if settings.DEBUG:
query_count = len(connection.queries) - request.perf_start_queries
query_time = sum(
float(q['time'])
for q in connection.queries[request.perf_start_queries:]
)
else:
query_count = 0
query_time = 0
# Record metrics
performance_monitor.record_metric('response_time', duration, {
'path': request.path,
'method': request.method,
'status_code': response.status_code
})
performance_monitor.record_metric('database_queries', query_count, {
'path': request.path
})
performance_monitor.record_metric('database_time', query_time, {
'path': request.path
})
return response
# Performance dashboard view
def performance_dashboard(request):
"""View to display performance metrics"""
hours = int(request.GET.get('hours', 1))
# Get performance statistics
response_time_stats = performance_monitor.calculate_percentiles('response_time', hours)
query_count_stats = performance_monitor.calculate_percentiles('database_queries', hours)
# Get recent slow requests
response_time_metrics = performance_monitor.get_metrics('response_time', hours)
slow_requests = [
m for m in response_time_metrics
if m['value'] > 1.0 # Slower than 1 second
]
context = {
'response_time_stats': response_time_stats,
'query_count_stats': query_count_stats,
'slow_requests': slow_requests[:20], # Last 20 slow requests
'hours': hours,
}
return render(request, 'performance_dashboard.html', context)
# locustfile.py
from locust import HttpUser, task, between
import random
class DjangoUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""Login user at start"""
response = self.client.post("/login/", {
"username": "testuser",
"password": "testpass"
})
if response.status_code != 200:
print(f"Login failed: {response.status_code}")
@task(3)
def view_homepage(self):
"""Test homepage performance"""
with self.client.get("/", catch_response=True) as response:
if response.status_code == 200:
if response.elapsed.total_seconds() > 2.0:
response.failure(f"Homepage too slow: {response.elapsed.total_seconds()}s")
else:
response.failure(f"Homepage failed: {response.status_code}")
@task(2)
def view_product_list(self):
"""Test product list performance"""
page = random.randint(1, 10)
with self.client.get(f"/products/?page={page}", catch_response=True) as response:
if response.status_code == 200:
# Check for performance indicators
if "X-DB-Queries" in response.headers:
query_count = int(response.headers["X-DB-Queries"])
if query_count > 20:
response.failure(f"Too many queries: {query_count}")
if response.elapsed.total_seconds() > 1.0:
response.failure(f"Product list too slow: {response.elapsed.total_seconds()}s")
else:
response.failure(f"Product list failed: {response.status_code}")
@task(1)
def view_product_detail(self):
"""Test product detail performance"""
product_id = random.randint(1, 1000)
with self.client.get(f"/products/{product_id}/", catch_response=True) as response:
if response.status_code == 200:
if response.elapsed.total_seconds() > 0.5:
response.failure(f"Product detail too slow: {response.elapsed.total_seconds()}s")
elif response.status_code == 404:
# 404 is acceptable for random product IDs
pass
else:
response.failure(f"Product detail failed: {response.status_code}")
@task(1)
def search_products(self):
"""Test search performance"""
search_terms = ["laptop", "phone", "book", "shirt", "shoes"]
term = random.choice(search_terms)
with self.client.get(f"/search/?q={term}", catch_response=True) as response:
if response.status_code == 200:
if response.elapsed.total_seconds() > 1.5:
response.failure(f"Search too slow: {response.elapsed.total_seconds()}s")
else:
response.failure(f"Search failed: {response.status_code}")
# Run with: locust -f locustfile.py --host=http://localhost:8000
# management/commands/benchmark.py
from django.core.management.base import BaseCommand
from django.test import Client
from django.contrib.auth.models import User
import time
import statistics
class Command(BaseCommand):
help = 'Benchmark Django application performance'
def add_arguments(self, parser):
parser.add_argument('--requests', type=int, default=100, help='Number of requests')
parser.add_argument('--url', type=str, default='/', help='URL to benchmark')
parser.add_argument('--warmup', type=int, default=10, help='Warmup requests')
def handle(self, *args, **options):
client = Client()
# Login if needed
try:
user = User.objects.get(username='testuser')
client.force_login(user)
except User.DoesNotExist:
pass
url = options['url']
num_requests = options['requests']
warmup_requests = options['warmup']
self.stdout.write(f"Benchmarking {url} with {num_requests} requests...")
# Warmup
self.stdout.write(f"Warming up with {warmup_requests} requests...")
for _ in range(warmup_requests):
client.get(url)
# Benchmark
response_times = []
successful_requests = 0
start_time = time.time()
for i in range(num_requests):
request_start = time.time()
response = client.get(url)
request_end = time.time()
response_time = request_end - request_start
response_times.append(response_time)
if response.status_code == 200:
successful_requests += 1
if (i + 1) % 10 == 0:
self.stdout.write(f"Completed {i + 1}/{num_requests} requests")
end_time = time.time()
total_time = end_time - start_time
# Calculate statistics
avg_response_time = statistics.mean(response_times)
median_response_time = statistics.median(response_times)
min_response_time = min(response_times)
max_response_time = max(response_times)
if len(response_times) > 1:
stdev_response_time = statistics.stdev(response_times)
p95_response_time = sorted(response_times)[int(len(response_times) * 0.95)]
p99_response_time = sorted(response_times)[int(len(response_times) * 0.99)]
else:
stdev_response_time = 0
p95_response_time = max_response_time
p99_response_time = max_response_time
requests_per_second = num_requests / total_time
success_rate = (successful_requests / num_requests) * 100
# Print results
self.stdout.write(self.style.SUCCESS("\n=== Benchmark Results ==="))
self.stdout.write(f"URL: {url}")
self.stdout.write(f"Total requests: {num_requests}")
self.stdout.write(f"Successful requests: {successful_requests}")
self.stdout.write(f"Success rate: {success_rate:.1f}%")
self.stdout.write(f"Total time: {total_time:.2f}s")
self.stdout.write(f"Requests per second: {requests_per_second:.2f}")
self.stdout.write(f"\n--- Response Times ---")
self.stdout.write(f"Average: {avg_response_time:.3f}s")
self.stdout.write(f"Median: {median_response_time:.3f}s")
self.stdout.write(f"Min: {min_response_time:.3f}s")
self.stdout.write(f"Max: {max_response_time:.3f}s")
self.stdout.write(f"Standard deviation: {stdev_response_time:.3f}s")
self.stdout.write(f"95th percentile: {p95_response_time:.3f}s")
self.stdout.write(f"99th percentile: {p99_response_time:.3f}s")
Effective profiling transforms Django applications from slow, resource-hungry systems into efficient, scalable platforms. By combining development profiling tools with production monitoring, you can maintain optimal performance while continuously improving your application's efficiency.
The key is to profile systematically, focus on high-impact optimizations, and maintain performance awareness throughout the development lifecycle. With proper profiling practices, you can build Django applications that perform excellently under real-world conditions.
Caching Strategies
Caching is one of the most effective performance optimization techniques in Django applications. By storing frequently accessed data in fast storage systems, you can dramatically reduce database load, improve response times, and enhance user experience. This comprehensive guide covers multi-level caching strategies, cache backends, invalidation patterns, and advanced caching techniques.
Advanced and Expert Topics
This comprehensive guide explores advanced Django concepts and expert-level techniques for building enterprise-scale applications. These topics represent the pinnacle of Django development, covering system architecture patterns, domain-driven design, large-scale project organization, and advanced customization techniques that separate expert developers from intermediate practitioners.