Production Django applications require sophisticated caching architectures that span multiple layers, from browser caches to CDNs, reverse proxies, and application-level caches. This chapter covers enterprise-grade caching patterns, multi-tier architectures, and deployment strategies that enable applications to handle massive scale while maintaining excellent performance and reliability.
┌─────────────────┐
│ Browser Cache │ ← HTTP Headers, Service Workers
└─────────────────┘
↓
┌─────────────────┐
│ CDN Cache │ ← CloudFlare, AWS CloudFront, Fastly
└─────────────────┘
↓
┌─────────────────┐
│ Reverse Proxy │ ← Nginx, Varnish, HAProxy
└─────────────────┘
↓
┌─────────────────┐
│ Load Balancer │ ← Application-level load balancing
└─────────────────┘
↓
┌─────────────────┐
│ Django App │ ← View caching, template fragments
└─────────────────┘
↓
┌─────────────────┐
│ Application │ ← Redis, Memcached
│ Cache │
└─────────────────┘
↓
┌─────────────────┐
│ Database Cache │ ← Query caching, connection pooling
└─────────────────┘
# settings/production.py
import os
from datetime import timedelta
# Cache configuration for production
CACHES = {
    # Main application cache: one writable primary plus two read replicas.
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        # With django-redis' default client the first URL is the primary
        # (all writes) and the remaining URLs are replicas used for reads.
        'LOCATION': [
            f"redis://{os.environ['REDIS_PRIMARY_HOST']}:6379/1",
            f"redis://{os.environ['REDIS_REPLICA_1_HOST']}:6379/1",
            f"redis://{os.environ['REDIS_REPLICA_2_HOST']}:6379/1",
        ],
        'OPTIONS': {
            # ShardClient would distribute keys -- and therefore writes --
            # across all three hosts, which cannot work when two of them are
            # replicas. The default client gives primary/replica semantics.
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 100,
                'retry_on_timeout': True,
                'socket_keepalive': True,
            },
            'COMPRESSOR': 'django_redis.compressors.lz4.Lz4Compressor',
            'SERIALIZER': 'django_redis.serializers.msgpack.MSGPackSerializer',
            'IGNORE_EXCEPTIONS': True,  # Graceful degradation
        },
        'KEY_PREFIX': f"{os.environ['APP_NAME']}_prod",
        # Bumping CACHE_VERSION in the environment changes every key and so
        # abandons all previously cached entries.
        'VERSION': int(os.environ.get('CACHE_VERSION', 1)),
    },
    # Separate cache for sessions (high availability)
    'sessions': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': f"redis://{os.environ['REDIS_SESSIONS_HOST']}:6379/2",
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 50,
                'retry_on_timeout': True,
            },
        },
        'TIMEOUT': 1800,  # 30 minutes
    },
    # Cache for temporary data (can be volatile)
    'temporary': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': f"redis://{os.environ['REDIS_TEMP_HOST']}:6379/3",
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        },
        'TIMEOUT': 300,  # 5 minutes
    },
    # Local memory cache for hot data (per process, not shared)
    'local': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        'LOCATION': 'local-cache',
        'TIMEOUT': 60,  # 1 minute
        'OPTIONS': {
            'MAX_ENTRIES': 10000,
            'CULL_FREQUENCY': 3,
        },
    },
}
# --- Sessions -------------------------------------------------------------
# Sessions live in the dedicated 'sessions' cache alias; the cookie lifetime
# matches that alias' 30 minute TIMEOUT.
SESSION_CACHE_ALIAS = 'sessions'
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_COOKIE_AGE = 30 * 60  # seconds

# --- Per-site cache middleware --------------------------------------------
CACHE_MIDDLEWARE_KEY_PREFIX = os.environ['APP_NAME'] + "_page"
CACHE_MIDDLEWARE_SECONDS = 5 * 60  # seconds
CACHE_MIDDLEWARE_ALIAS = 'default'

# --- Conditional-request headers ------------------------------------------
# NOTE(review): these two flags may be ignored by the Django version in use
# (conditional GET handling is normally provided by ConditionalGetMiddleware)
# -- confirm they have any effect before relying on them.
USE_LAST_MODIFIED = True
USE_ETAGS = True

# --- Static files (served through the CDN) --------------------------------
STATIC_URL = "https://" + os.environ['CDN_DOMAIN'] + "/static/"
STATICFILES_STORAGE = 'whitenoise.storage.CompressedManifestStaticFilesStorage'

# --- Media files (cloud storage behind the same CDN domain) ---------------
AWS_S3_OBJECT_PARAMETERS = {
    'CacheControl': 'max-age=86400',  # 24 hours
}
AWS_S3_CUSTOM_DOMAIN = os.environ['CDN_DOMAIN']
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
# utils/cdn.py
import requests
import os
from django.conf import settings
from django.core.cache import cache
class CloudFlareCache:
    """Manage CloudFlare cache operations.

    Credentials are read from the ``CLOUDFLARE_API_TOKEN`` and
    ``CLOUDFLARE_ZONE_ID`` environment variables when the object is built.
    """

    def __init__(self, timeout=10):
        """Read credentials from the environment.

        Args:
            timeout: Seconds to wait for the CloudFlare API before giving up.

        Raises:
            KeyError: If either environment variable is missing.
        """
        self.api_token = os.environ['CLOUDFLARE_API_TOKEN']
        self.zone_id = os.environ['CLOUDFLARE_ZONE_ID']
        self.base_url = 'https://api.cloudflare.com/client/v4'
        self.timeout = timeout

    def purge_cache(self, urls=None, tags=None, purge_everything=False):
        """Purge CloudFlare cache.

        Exactly one selector is used, in this order of precedence:
        ``purge_everything``, then ``urls``, then ``tags``.

        Args:
            urls: List of absolute URLs to purge.
            tags: List of cache tags to purge.
            purge_everything: Purge the whole zone when true.

        Returns:
            The decoded JSON body of the API response.

        Raises:
            ValueError: If no selector was given (empty lists count as none).
            requests.HTTPError: If the API answers with an error status.
        """
        if purge_everything:
            data = {'purge_everything': True}
        elif urls:
            data = {'files': urls}
        elif tags:
            data = {'tags': tags}
        else:
            raise ValueError("Must specify urls, tags, or purge_everything")

        headers = {
            'Authorization': f'Bearer {self.api_token}',
            'Content-Type': 'application/json',
        }
        url = f"{self.base_url}/zones/{self.zone_id}/purge_cache"

        # Without a timeout a stalled API call would block the caller forever.
        response = requests.post(
            url, json=data, headers=headers, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def purge_url(self, url):
        """Purge a specific URL from CloudFlare cache."""
        return self.purge_cache(urls=[url])

    def purge_tag(self, tag):
        """Purge all URLs with a specific cache tag."""
        return self.purge_cache(tags=[tag])

    def set_cache_rules(self, rules):
        """Set CloudFlare cache rules via API.

        Placeholder: currently does nothing and returns ``None``.
        """
        # Implementation for setting cache rules
        pass
# Django middleware for CDN cache headers
class CDNCacheMiddleware:
    """Add CDN-specific cache headers.

    Emits a ``Cache-Tag`` header from ``request.cache_tags`` (when a view set
    it) and applies a default ``Cache-Control`` policy chosen by URL prefix.

    The default policy is only applied when it is safe to do so:
      * the request is a GET or HEAD,
      * the response is not an error (status below 400),
      * the view did not already set ``Cache-Control`` (so decorators such as
        ``@cache_control`` keep the final say),
      * the request is not from an authenticated user, whose page must not be
        marked ``public``.
    """

    CACHEABLE_METHODS = ('GET', 'HEAD')

    # (path prefix, Cache-Control value); first match wins.
    PATH_POLICIES = (
        # API responses - shorter cache times
        ('/api/', 'public, max-age=300, s-maxage=600'),
        # Static files - long cache times
        ('/static/', 'public, max-age=31536000, immutable'),
        # Media files - medium cache times
        ('/media/', 'public, max-age=86400'),
    )
    # HTML pages - moderate cache times
    DEFAULT_POLICY = 'public, max-age=300, s-maxage=1800'

    def __init__(self, get_response):
        self.get_response = get_response
        # Kept for callers that purge through the middleware instance; note
        # that constructing it reads the CloudFlare environment variables.
        self.cf_cache = CloudFlareCache()

    def __call__(self, request):
        response = self.get_response(request)

        # Add cache tags for selective purging
        tags = getattr(request, 'cache_tags', None)
        if tags:
            response['Cache-Tag'] = ','.join(tags)

        if self._may_apply_default_policy(request, response):
            response['Cache-Control'] = self._policy_for(request.path)
        return response

    def _may_apply_default_policy(self, request, response):
        """Return True when a public default policy is safe for this exchange."""
        if request.method not in self.CACHEABLE_METHODS:
            return False
        if response.status_code >= 400:
            return False
        if response.has_header('Cache-Control'):
            return False
        user = getattr(request, 'user', None)
        return not getattr(user, 'is_authenticated', False)

    def _policy_for(self, path):
        """Return the Cache-Control value for *path* based on its prefix."""
        for prefix, policy in self.PATH_POLICIES:
            if path.startswith(prefix):
                return policy
        return self.DEFAULT_POLICY
# Usage in views
from django.views.decorators.cache import cache_control
from django.views.decorators.vary import vary_on_headers
@cache_control(public=True, max_age=3600, s_maxage=7200)
@vary_on_headers('Accept-Language', 'Accept-Encoding')
def cached_blog_post(request, slug):
    """Render a published blog post and tag the request for CDN purging.

    Looks the post up by ``slug`` (404 when missing or unpublished), stores
    the post/category/author cache tags on ``request.cache_tags`` and renders
    ``blog/post_detail.html`` with the post in the context.
    """
    entry = get_object_or_404(Post, slug=slug, published=True)

    # One tag per entity whose change should purge this page.
    tag_parts = (
        ('post', entry.id),
        ('category', entry.category.slug),
        ('author', entry.author.id),
    )
    request.cache_tags = [f'{kind}_{value}' for kind, value in tag_parts]

    return render(request, 'blog/post_detail.html', {'post': entry})
# utils/cloudfront.py
import boto3
import json
from django.conf import settings
class CloudFrontCache:
    """Manage AWS CloudFront cache operations."""

    def __init__(self):
        """Build the boto3 client from the AWS values in Django settings."""
        self.client = boto3.client(
            'cloudfront',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name='us-east-1'  # CloudFront is global but uses us-east-1
        )
        self.distribution_id = settings.CLOUDFRONT_DISTRIBUTION_ID

    def create_invalidation(self, paths):
        """Create CloudFront invalidation for specified paths.

        Args:
            paths: A single path string or a list of path strings.

        Returns:
            The id of the created invalidation.
        """
        # Imported here because this module does not import it at the top.
        import uuid

        if isinstance(paths, str):
            paths = [paths]

        # The reference must differ between requests; a second-resolution
        # timestamp would repeat for two invalidations in the same second.
        caller_reference = f"django-invalidation-{uuid.uuid4().hex}"

        response = self.client.create_invalidation(
            DistributionId=self.distribution_id,
            InvalidationBatch={
                'Paths': {
                    'Quantity': len(paths),
                    'Items': paths
                },
                'CallerReference': caller_reference
            }
        )
        return response['Invalidation']['Id']

    def get_invalidation_status(self, invalidation_id):
        """Get status of CloudFront invalidation."""
        response = self.client.get_invalidation(
            DistributionId=self.distribution_id,
            Id=invalidation_id
        )
        return response['Invalidation']['Status']

    def invalidate_post(self, post):
        """Invalidate CloudFront cache for a specific post.

        Covers the post page, its category page, the blog list and the
        home page. Returns the invalidation id.
        """
        paths = [
            f'/blog/{post.slug}/',
            f'/blog/category/{post.category.slug}/',
            '/blog/',  # Blog list page
            '/',  # Home page (if it includes recent posts)
        ]
        return self.create_invalidation(paths)

    def invalidate_static_files(self, file_patterns):
        """Invalidate static files in CloudFront.

        Each pattern is prefixed with ``/static/``. Returns the invalidation id.
        """
        paths = [f'/static/{pattern}' for pattern in file_patterns]
        return self.create_invalidation(paths)
# Signal handlers for automatic CDN invalidation
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
# Shared client used by the signal handlers below; built once at import time.
cloudfront = CloudFrontCache()


@receiver(post_save, sender=Post)
def invalidate_cdn_on_post_save(sender, instance, **kwargs):
    """Invalidate CDN cache when a published post is saved.

    The invalidation id is remembered in the cache for one hour under
    ``invalidation_<post id>``. Failures are logged and never propagate, so
    a CDN outage cannot break saving a post.
    """
    if not instance.published:
        return

    try:
        invalidation_id = cloudfront.invalidate_post(instance)
        cache.set(f'invalidation_{instance.id}', invalidation_id, 3600)
    except Exception as e:
        # Log error but don't break the application. ``exception`` keeps the
        # traceback, which a plain ``error`` call would drop.
        import logging
        logging.getLogger(__name__).exception("CDN invalidation failed: %s", e)
# /etc/nginx/sites-available/django-app
# Pool of local Django workers; up to 32 idle upstream connections per
# nginx worker are kept open for reuse.
upstream django_app {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;

    keepalive 32;
}

# On-disk proxy cache: 100 MB of key metadata, at most 10 GB of content,
# entries unused for 60 minutes are dropped.
proxy_cache_path /var/cache/nginx/django levels=1:2 keys_zone=django_cache:100m max_size=10g inactive=60m use_temp_path=off;

# Per-client-address request budgets, referenced by the locations below.
limit_req_zone $binary_remote_addr zone=login:10m rate=5r/m;
limit_req_zone $binary_remote_addr zone=api:10m   rate=10r/s;
server {
    listen 80;
    server_name example.com www.example.com;

    # Security headers
    # NOTE: add_header directives are inherited from this level only by
    # locations that define no add_header of their own; locations below that
    # add headers therefore do not emit these four.
    add_header X-Frame-Options DENY;
    add_header X-Content-Type-Options nosniff;
    add_header X-XSS-Protection "1; mode=block";
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains";

    # Gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_types
        text/plain
        text/css
        text/xml
        text/javascript
        application/javascript
        application/xml+rss
        application/json;

    # Static files with long cache times
    location /static/ {
        alias /app/staticfiles/;
        expires 1y;
        add_header Cache-Control "public, immutable";
        # No "Content-Encoding: br" header is set here: announcing Brotli for
        # files that are sent uncompressed makes browsers fail to decode them.
        # Serving real Brotli requires a Brotli module and pre-compressed files.
    }

    # Media files with moderate cache times
    location /media/ {
        alias /app/media/;
        expires 30d;
        add_header Cache-Control "public";
    }

    # API endpoints with short cache times
    location /api/ {
        limit_req zone=api burst=20 nodelay;

        proxy_cache django_cache;
        proxy_cache_key "$scheme$request_method$host$request_uri";
        proxy_cache_valid 200 302 5m;
        proxy_cache_valid 404 1m;

        # Authenticated calls must neither be answered from the shared cache
        # nor have their responses stored in it.
        proxy_cache_bypass $http_cache_control $cookie_sessionid $http_authorization;
        proxy_no_cache $cookie_sessionid $http_authorization;

        add_header X-Cache-Status $upstream_cache_status;

        proxy_pass http://django_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # Authentication endpoints with rate limiting (never cached)
    location ~ ^/(login|register|password-reset)/ {
        limit_req zone=login burst=5 nodelay;

        proxy_pass http://django_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # Main application with caching
    location / {
        # Cache configuration
        proxy_cache django_cache;
        # $request_uri already contains the query string, so it is not
        # appended a second time; this also keeps the key format identical
        # to /api/ and to the purge location.
        proxy_cache_key "$scheme$request_method$host$request_uri";
        proxy_cache_valid 200 302 10m;
        proxy_cache_valid 404 5m;

        # Cache bypass conditions
        proxy_cache_bypass $cookie_sessionid;
        proxy_cache_bypass $http_authorization;
        proxy_cache_bypass $arg_nocache;

        # Don't store responses produced for an authenticated user, whether
        # identified by session cookie or Authorization header.
        proxy_no_cache $cookie_sessionid $http_authorization;

        add_header X-Cache-Status $upstream_cache_status;

        proxy_pass http://django_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Connection settings
        proxy_connect_timeout 30s;
        proxy_send_timeout 30s;
        proxy_read_timeout 30s;
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
    }

    # Health check endpoint (no caching)
    location /health/ {
        proxy_pass http://django_app;
        proxy_set_header Host $host;
        access_log off;
    }

    # Cache purge endpoint (restricted access)
    # Requires a build that provides the proxy_cache_purge directive.
    location ~ ^/purge(/.*) {
        allow 127.0.0.1;
        allow 10.0.0.0/8;
        deny all;

        # Cached pages are keyed under GET; rebuild that key from the captured
        # path plus the query string, whatever method the purge request uses.
        proxy_cache_purge django_cache "${scheme}GET$host$1$is_args$args";
    }
}
# HTTPS virtual host for the same site names as the plain-HTTP server
server {
    listen 443 ssl http2;
    server_name example.com www.example.com;

    # Protocols and ciphers; the client's cipher preference is honoured.
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384;
    ssl_prefer_server_ciphers off;

    # Certificate chain and private key
    ssl_certificate_key /etc/letsencrypt/live/example.com/privkey.pem;
    ssl_certificate /etc/letsencrypt/live/example.com/fullchain.pem;

    # Session reuse: 10 MB shared cache, sessions valid for 10 minutes
    ssl_session_timeout 10m;
    ssl_session_cache shared:SSL:10m;

    # Include the same location blocks as HTTP server
    # ... (same as above)
}
# /etc/varnish/default.vcl
vcl 4.1;
import directors;
import std;
# Health probe shared by every Django backend: poll /health/ every 10s with
# a 5s timeout; a backend is healthy when 3 of the last 5 polls succeeded.
probe django_health {
    .url = "/health/";
    .timeout = 5s;
    .interval = 10s;
    .window = 5;
    .threshold = 3;
}

# Backend servers
backend django1 {
    .host = "127.0.0.1";
    .port = "8000";
    .probe = django_health;
}

backend django2 {
    .host = "127.0.0.1";
    .port = "8001";
    .probe = django_health;
}

# Load balancer: round-robin over both backends
sub vcl_init {
    new django_director = directors.round_robin();
    django_director.add_backend(django1);
    django_director.add_backend(django2);
}
sub vcl_recv {
    # Set backend
    set req.backend_hint = django_director.backend();

    # Handle cache purging first: the generic "not GET/HEAD" rule further
    # down would otherwise pass PURGE to the backend before it got here.
    if (req.method == "PURGE") {
        if (!client.ip ~ purge_acl) {
            return (synth(405, "Not allowed"));
        }
        return (purge);
    }

    if (req.http.Cookie) {
        # Remove Google Analytics cookies
        set req.http.Cookie = regsuball(req.http.Cookie, "(^|;\s*)(_ga|_gid|_gat)=[^;]*", "");
        set req.http.Cookie = regsuball(req.http.Cookie, "^;\s*", "");

        # Drop the header entirely once nothing is left in it.
        if (req.http.Cookie == "") {
            unset req.http.Cookie;
        }
    }

    # Don't cache authenticated requests
    if (req.http.Authorization || req.http.Cookie ~ "sessionid") {
        return (pass);
    }

    # Don't cache POST, PUT, DELETE requests
    if (req.method != "GET" && req.method != "HEAD") {
        return (pass);
    }

    # Don't cache admin pages
    if (req.url ~ "^/admin/") {
        return (pass);
    }

    # Static files and API responses are looked up without cookies so that
    # all anonymous clients share one cached object.
    if (req.url ~ "^/static/" || req.url ~ "^/api/") {
        unset req.http.Cookie;
        return (hash);
    }

    return (hash);
}
sub vcl_backend_response {
    # Responses to passed requests (authenticated, admin, non-GET) are
    # delivered with the backend's own caching headers; rewriting them to
    # "public" would let downstream caches store per-user pages.
    if (!bereq.uncacheable) {
        if (beresp.http.Set-Cookie || beresp.http.Cache-Control ~ "(?i)(private|no-store|no-cache)") {
            # Per-user or explicitly uncacheable response: never store it.
            set beresp.uncacheable = true;
            set beresp.ttl = 2m;
        } else {
            # Set cache TTL based on response
            if (bereq.url ~ "^/static/") {
                set beresp.ttl = 1y;
                set beresp.http.Cache-Control = "public, max-age=31536000, immutable";
            } elsif (bereq.url ~ "^/api/") {
                set beresp.ttl = 5m;
                set beresp.http.Cache-Control = "public, max-age=300";
            } elsif (beresp.status == 200) {
                set beresp.ttl = 10m;
                set beresp.http.Cache-Control = "public, max-age=600";
            }

            # Don't cache errors for long
            if (beresp.status >= 400) {
                set beresp.ttl = 1m;
            }
        }
    }

    # Enable ESI for dynamic content
    if (beresp.http.Content-Type ~ "text/html") {
        set beresp.do_esi = true;
    }

    return (deliver);
}
sub vcl_deliver {
    # Strip headers that reveal the backend stack.
    unset resp.http.X-Powered-By;
    unset resp.http.Server;

    # Report whether this object came from cache, and its hit count if so.
    if (obj.hits == 0) {
        set resp.http.X-Cache = "MISS";
    } else {
        set resp.http.X-Cache = "HIT";
        set resp.http.X-Cache-Hits = obj.hits;
    }

    return (deliver);
}
# Clients allowed to send PURGE: the local host and the 10.0.0.0/8 network
acl purge_acl {
    "127.0.0.1";
    "localhost";
    "10.0.0.0"/8;
}
# utils/db_cache.py
from django.core.cache import cache
from django.db import models
from django.db.models.signals import post_save, post_delete
import hashlib
import json
class QueryCacheManager:
    """Advanced database query caching.

    Cached results are indexed per model under ``model_deps:<app.model>`` so
    that every cached query for a model can be dropped in one call.
    """

    def __init__(self, timeout=3600):
        """Use the default Django cache with *timeout* seconds as default TTL."""
        self.timeout = timeout
        self.cache = cache

    def cache_queryset(self, queryset, cache_key=None, timeout=None):
        """Cache queryset results with automatic invalidation.

        Args:
            queryset: The queryset to evaluate; it is materialised to a list.
            cache_key: Explicit key; derived from the query text when omitted.
            timeout: TTL in seconds; the manager default when omitted.

        Returns:
            The list of results, from cache when present.
        """
        if cache_key is None:
            # Generate cache key from query
            query_str = str(queryset.query)
            cache_key = f"queryset:{hashlib.md5(query_str.encode()).hexdigest()}"
        if timeout is None:
            timeout = self.timeout

        # Check cache first (an empty list is a valid cached result)
        cached_result = self.cache.get(cache_key)
        if cached_result is not None:
            return cached_result

        # Execute query and cache results
        result = list(queryset)
        # Store with dependency tracking
        self._store_with_dependencies(cache_key, result, queryset.model, timeout)
        return result

    def _dependency_key(self, model):
        """Return the cache key of the dependency index for *model*."""
        return f"model_deps:{model._meta.label_lower}"

    def _store_with_dependencies(self, cache_key, result, model, timeout):
        """Store cached result with model dependency tracking."""
        # Store the actual result
        self.cache.set(cache_key, result, timeout)

        # Track dependencies for invalidation
        model_key = self._dependency_key(model)
        dependent_keys = set(self.cache.get(model_key) or ())
        dependent_keys.add(cache_key)
        # The index must outlive every entry it lists. Giving it the TTL of
        # the latest entry could expire it while longer-lived entries remain,
        # leaving them impossible to invalidate. It is deleted explicitly by
        # invalidate_model_cache().
        # NOTE: this read-modify-write is not atomic across processes.
        self.cache.set(model_key, dependent_keys, None)

    def invalidate_model_cache(self, model):
        """Invalidate all cached queries for a model."""
        model_key = self._dependency_key(model)
        dependent_keys = self.cache.get(model_key) or ()
        if dependent_keys:
            # Delete all dependent cache keys
            self.cache.delete_many(list(dependent_keys))
            # Clear the dependency tracking
            self.cache.delete(model_key)
# Module-wide manager instance shared by the helpers in this file
query_cache = QueryCacheManager()


def invalidate_model_cache_on_change(sender, **kwargs):
    """Drop every cached query of the model whose instance changed."""
    query_cache.invalidate_model_cache(sender)


# Hook the handler to saves and deletes of every model (no sender filter).
post_save.connect(invalidate_model_cache_on_change)
post_delete.connect(invalidate_model_cache_on_change)
# Enhanced model manager with caching
class CachedManager(models.Manager):
    """Model manager with built-in caching.

    Every key written by this manager is registered with ``query_cache`` so
    that it is dropped when the model's cached queries are invalidated.
    """

    # Stored in place of an object to remember "no such row". ``None`` cannot
    # be used: a cache miss also reads back as ``None``.
    _MISSING_MARKER = '__cached_manager_does_not_exist__'
    _NEGATIVE_TIMEOUT = 300

    def __init__(self, cache_timeout=3600):
        super().__init__()
        self.cache_timeout = cache_timeout

    def _make_key(self, kind, lookup):
        """Build the cache key for a lookup of the given kind.

        Raises:
            TypeError: If a lookup value is not JSON-serialisable.
        """
        filter_str = json.dumps(lookup, sort_keys=True)
        digest = hashlib.md5(filter_str.encode()).hexdigest()
        return f"{self.model._meta.label_lower}:{kind}:{digest}"

    def cached_filter(self, **kwargs):
        """Filter with automatic caching. Returns a list, not a queryset."""
        cache_key = self._make_key('filter', kwargs)
        queryset = self.filter(**kwargs)
        return query_cache.cache_queryset(queryset, cache_key, self.cache_timeout)

    def cached_get(self, **kwargs):
        """Get with caching (use carefully - only for immutable lookups).

        Raises:
            DoesNotExist: When no row matches; the miss itself is remembered
                briefly so repeated lookups do not hit the database.
        """
        cache_key = self._make_key('get', kwargs)

        # Try cache first
        cached_result = cache.get(cache_key)
        if isinstance(cached_result, str) and cached_result == self._MISSING_MARKER:
            raise self.model.DoesNotExist(
                f"{self.model._meta.object_name} matching query does not exist."
            )
        if cached_result is not None:
            return cached_result

        # Get from database
        try:
            result = self.get(**kwargs)
        except self.model.DoesNotExist:
            # Cache negative results briefly
            query_cache._store_with_dependencies(
                cache_key, self._MISSING_MARKER, self.model, self._NEGATIVE_TIMEOUT
            )
            raise
        query_cache._store_with_dependencies(
            cache_key, result, self.model, self.cache_timeout
        )
        return result

    def cached_count(self, **kwargs):
        """Count with caching."""
        cache_key = self._make_key('count', kwargs)

        cached_count = cache.get(cache_key)
        if cached_count is not None:  # 0 is a valid cached count
            return cached_count

        count = self.filter(**kwargs).count()
        query_cache._store_with_dependencies(
            cache_key, count, self.model, self.cache_timeout
        )
        return count
# Usage in models
class Post(models.Model):
    """Blog post used by the caching examples below."""

    title = models.CharField(max_length=200)
    content = models.TextField()
    published = models.BooleanField(default=False)
    # View counter; the "popular posts" example below filters on it.
    views = models.PositiveIntegerField(default=0)
    created_at = models.DateTimeField(auto_now_add=True)

    objects = models.Manager()  # Default manager (uncached)
    cached = CachedManager(cache_timeout=1800)  # Cached manager

    class Meta:
        ordering = ['-created_at']


# Usage examples
# Cached filtering
popular_posts = Post.cached.cached_filter(published=True, views__gte=100)

# Cached counting
published_count = Post.cached.cached_count(published=True)

# Regular queryset with manual caching
recent_posts = query_cache.cache_queryset(
    Post.objects.filter(published=True)[:10],
    cache_key='recent_posts',
    timeout=900
)
# monitoring/cache_monitor.py
import time
import json
from django.core.cache import cache, caches
from django.core.management.base import BaseCommand
from django.conf import settings
import logging
logger = logging.getLogger('cache_monitor')
class CacheMonitor:
    """Monitor cache performance across all backends."""

    def __init__(self):
        """Resolve one backend object per alias configured in ``CACHES``."""
        self.backends = {alias: caches[alias] for alias in settings.CACHES}

    def get_cache_stats(self):
        """Get statistics for all cache backends.

        Returns:
            Mapping of alias to a stats dict; a backend whose stats could not
            be read maps to ``{'error': <message>}`` instead.
        """
        stats = {}
        for alias, cache_backend in self.backends.items():
            try:
                stats[alias] = self._get_backend_stats(alias, cache_backend)
            except Exception as e:
                logger.error(f"Failed to get stats for {alias}: {e}")
                stats[alias] = {'error': str(e)}
        return stats

    def _get_backend_stats(self, alias, cache_backend):
        """Get statistics for a specific cache backend.

        The backend family is detected from the module name of its class.
        """
        stats = {
            'backend_type': cache_backend.__class__.__name__,
            'alias': alias,
        }
        backend_module = cache_backend.__class__.__module__.lower()

        # Redis-specific stats
        if 'redis' in backend_module:
            try:
                from django_redis import get_redis_connection
                redis_conn = get_redis_connection(alias)
                info = redis_conn.info()
                stats.update({
                    'connected_clients': info.get('connected_clients', 0),
                    'used_memory': info.get('used_memory', 0),
                    'used_memory_human': info.get('used_memory_human', '0B'),
                    'keyspace_hits': info.get('keyspace_hits', 0),
                    'keyspace_misses': info.get('keyspace_misses', 0),
                    'total_commands_processed': info.get('total_commands_processed', 0),
                })
                # Calculate hit rate (percentage; 0 when nothing was looked up)
                hits = stats['keyspace_hits']
                misses = stats['keyspace_misses']
                total = hits + misses
                stats['hit_rate'] = (hits / total * 100) if total > 0 else 0
            except Exception as e:
                stats['redis_error'] = str(e)
        # Memcached-specific stats
        elif 'memcached' in backend_module:
            # This would require additional implementation
            # depending on the memcached client
            pass
        return stats

    def check_cache_health(self):
        """Check health of all cache backends.

        Each backend gets a set/get/delete round trip. Returns a mapping of
        alias to ``{'status', 'response_time'}`` (plus ``'error'`` on failure).
        """
        health_status = {}
        for alias, cache_backend in self.backends.items():
            try:
                # Test basic operations
                test_key = f"health_check_{alias}_{int(time.time())}"
                test_value = "health_check_value"
                cache_backend.set(test_key, test_value, 60)
                retrieved_value = cache_backend.get(test_key)
                cache_backend.delete(test_key)

                health_status[alias] = {
                    'status': 'healthy' if retrieved_value == test_value else 'unhealthy',
                    'response_time': self._measure_response_time(cache_backend),
                }
            except Exception as e:
                health_status[alias] = {
                    'status': 'error',
                    'error': str(e),
                    'response_time': None,
                }
        return health_status

    def _measure_response_time(self, cache_backend):
        """Measure cache response time.

        Returns:
            Dict with ``set_time``, ``get_time`` and their ``avg_time``,
            all in seconds.
        """
        test_key = f"perf_test_{int(time.time())}"
        test_value = "performance_test"

        # perf_counter is monotonic and high resolution; the wall clock can
        # jump and is too coarse for sub-millisecond operations.
        start_time = time.perf_counter()
        cache_backend.set(test_key, test_value, 60)
        set_time = time.perf_counter() - start_time

        start_time = time.perf_counter()
        cache_backend.get(test_key)
        get_time = time.perf_counter() - start_time

        # Cleanup
        cache_backend.delete(test_key)

        return {
            'set_time': set_time,
            'get_time': get_time,
            'avg_time': (set_time + get_time) / 2,
        }

    def generate_report(self):
        """Generate comprehensive cache monitoring report.

        Returns:
            Dict with a wall-clock ``timestamp``, ``stats`` and ``health``.
        """
        return {
            'timestamp': time.time(),
            'stats': self.get_cache_stats(),
            'health': self.check_cache_health(),
        }
# Management command wrapping CacheMonitor
class Command(BaseCommand):
    """Print cache statistics and health, once or on a fixed interval."""

    help = 'Monitor cache performance and health'

    def add_arguments(self, parser):
        """Register --format, --continuous and --interval."""
        parser.add_argument('--format', choices=['json', 'table'],
                            default='table', help='Output format')
        parser.add_argument('--continuous', action='store_true',
                            help='Run continuous monitoring')
        parser.add_argument('--interval', type=int, default=60,
                            help='Monitoring interval in seconds')

    def handle(self, *args, **options):
        """Dispatch to continuous or one-shot reporting."""
        monitor = CacheMonitor()
        if options['continuous']:
            runner = self.run_continuous_monitoring
        else:
            runner = self.run_single_report
        runner(monitor, options)

    def _emit(self, report, output_format):
        """Write one report as indented JSON or as the text table."""
        if output_format == 'json':
            self.stdout.write(json.dumps(report, indent=2))
            return
        self.print_table_report(report)

    def run_single_report(self, monitor, options):
        """Generate one report and print it in the requested format."""
        self._emit(monitor.generate_report(), options['format'])

    def run_continuous_monitoring(self, monitor, options):
        """Print a report every ``interval`` seconds until interrupted."""
        pause = options['interval']
        self.stdout.write(f'Starting continuous monitoring (interval: {pause}s)')
        try:
            while True:
                self._emit(monitor.generate_report(), options['format'])
                time.sleep(pause)
        except KeyboardInterrupt:
            self.stdout.write('\nMonitoring stopped')

    def print_table_report(self, report):
        """Print the health section followed by the statistics section."""
        rule = '=' * 80
        self.stdout.write('\n' + rule)
        self.stdout.write(f"Cache Monitoring Report - {time.ctime(report['timestamp'])}")
        self.stdout.write(rule)

        # Health section: coloured status, then latency when it was measured
        self.stdout.write('\nHealth Status:')
        for alias, health in report['health'].items():
            status = health['status']
            paint = self.style.SUCCESS if status == 'healthy' else self.style.ERROR
            self.stdout.write(f" {alias}: {paint(status.upper())}")
            timing = health.get('response_time')
            if timing:
                self.stdout.write(f" Response time: {timing['avg_time']:.4f}s")

        # Statistics section: backends that failed get a one-line error
        self.stdout.write('\nCache Statistics:')
        for alias, stats in report['stats'].items():
            if 'error' in stats:
                self.stdout.write(f" {alias}: {self.style.ERROR('ERROR')} - {stats['error']}")
                continue

            self.stdout.write(f" {alias} ({stats['backend_type']}):")
            if 'hit_rate' in stats:
                hit_rate = stats['hit_rate']
                # Above 80% is shown as success, anything else as a warning
                paint = self.style.SUCCESS if hit_rate > 80 else self.style.WARNING
                rendered = paint(f'{hit_rate:.1f}%')
                self.stdout.write(f" Hit rate: {rendered}")
            if 'used_memory_human' in stats:
                self.stdout.write(f" Memory usage: {stats['used_memory_human']}")
            if 'connected_clients' in stats:
                self.stdout.write(f" Connected clients: {stats['connected_clients']}")
# deploy/cache_deployment.py
import subprocess
import time
import requests
from django.core.management.base import BaseCommand
from django.conf import settings
class CacheDeploymentManager:
    """Manage cache operations during deployment.

    Progress messages go to ``self.stdout``, any object with a ``write``
    method (a management command's ``self.stdout`` for example).
    """

    # Seconds to wait for any single HTTP request made by this class.
    REQUEST_TIMEOUT = 10

    def __init__(self, stdout=None):
        """Set up the manager.

        Args:
            stdout: Output stream for progress messages. Defaults to a
                wrapper around ``sys.stdout`` so the methods work even when
                the caller does not assign one.
        """
        # Both settings are optional; nothing in this class requires them.
        self.redis_hosts = getattr(settings, 'REDIS_HOSTS', None)
        self.cdn_config = getattr(settings, 'CDN_CONFIG', None)
        if stdout is None:
            import sys
            from django.core.management.base import OutputWrapper
            stdout = OutputWrapper(sys.stdout)
        self.stdout = stdout

    def pre_deployment_cache_warm(self):
        """Warm cache before deployment by requesting the critical pages."""
        self.stdout.write('Starting pre-deployment cache warming...')
        # Warm critical pages
        critical_urls = [
            '/',
            '/blog/',
            '/api/posts/',
            '/about/',
        ]
        for url in critical_urls:
            try:
                response = requests.get(
                    f"https://{settings.DOMAIN}{url}",
                    timeout=self.REQUEST_TIMEOUT,
                )
            except Exception as e:
                # Best effort: one unreachable page must not stop the rest.
                self.stdout.write(f"Error warming {url}: {e}")
                continue
            if response.status_code == 200:
                self.stdout.write(f"Warmed: {url}")
            else:
                self.stdout.write(f"Failed to warm: {url} ({response.status_code})")

    def deployment_cache_strategy(self):
        """Execute cache strategy during deployment."""
        # 1. Increment cache version to invalidate old cache
        self.increment_cache_version()
        # 2. Clear specific cache patterns
        self.clear_deployment_cache()
        # 3. Warm critical cache entries
        self.warm_critical_cache()
        # 4. Update CDN cache
        self.update_cdn_cache()

    def increment_cache_version(self):
        """Increment the ``cache_version`` counter stored in the cache.

        NOTE(review): nothing in this class reads the counter back; confirm
        that key construction elsewhere actually consults it.
        """
        from django.core.cache import cache
        current_version = cache.get('cache_version', 1)
        new_version = current_version + 1
        cache.set('cache_version', new_version, timeout=None)
        self.stdout.write(f'Cache version incremented: {current_version} -> {new_version}')

    def clear_deployment_cache(self):
        """Clear cache entries that should be refreshed on deployment."""
        from django.core.cache import cache
        # Clear template fragment cache
        patterns_to_clear = [
            'template.cache.*',
            'view_cache.*',
            'api_cache.*',
        ]

        # django-redis backends expose delete_pattern(), which applies the
        # configured key prefix/version to the pattern and scans
        # incrementally. A raw KEYS call would miss prefixed keys and block
        # the server while it walks the whole keyspace.
        delete_pattern = getattr(cache, 'delete_pattern', None)
        if delete_pattern is None:
            # Fallback for non-Redis backends
            cache.clear()
            self.stdout.write('Cleared entire cache (non-Redis backend)')
            return

        for pattern in patterns_to_clear:
            deleted = delete_pattern(pattern)
            if deleted:
                self.stdout.write(f'Cleared {deleted} keys matching {pattern}')

    def warm_critical_cache(self):
        """Warm critical cache entries after deployment."""
        # Import here to avoid circular imports
        from django.core.management import call_command
        try:
            call_command('warm_cache', '--critical-only')
            self.stdout.write('Critical cache warming completed')
        except Exception as e:
            self.stdout.write(f'Cache warming failed: {e}')

    def update_cdn_cache(self):
        """Update CDN cache after deployment (static assets and main pages)."""
        try:
            # Purge CDN cache for updated assets
            from utils.cdn import CloudFlareCache
            cf_cache = CloudFlareCache()
            # Purge static files
            cf_cache.purge_cache(tags=['static-assets'])
            # Purge main pages
            cf_cache.purge_cache(urls=[
                f"https://{settings.DOMAIN}/",
                f"https://{settings.DOMAIN}/blog/",
            ])
            self.stdout.write('CDN cache purged successfully')
        except Exception as e:
            self.stdout.write(f'CDN cache purge failed: {e}')

    def _timed_get(self, url):
        """Fetch *url* and return ``(response, elapsed_seconds)``."""
        start_time = time.perf_counter()
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        return response, time.perf_counter() - start_time

    def post_deployment_verification(self):
        """Verify cache is working correctly after deployment.

        Each page is fetched twice and the relative speed-up of the second
        request is reported as the cache improvement.
        """
        verification_urls = [
            ('/', 'Home page'),
            ('/blog/', 'Blog list'),
            ('/api/health/', 'Health check'),
        ]
        for url, description in verification_urls:
            full_url = f"https://{settings.DOMAIN}{url}"
            try:
                # First request (should be cache miss)
                response1, first_time = self._timed_get(full_url)
                # Second request (should be cache hit)
                response2, second_time = self._timed_get(full_url)
            except Exception as e:
                self.stdout.write(f"{description}: ERROR - {e}")
                continue

            if response1.status_code == 200 and response2.status_code == 200:
                # Guard against a zero duration before dividing.
                if first_time > 0:
                    cache_improvement = ((first_time - second_time) / first_time) * 100
                else:
                    cache_improvement = 0.0
                self.stdout.write(
                    f"{description}: OK (Cache improvement: {cache_improvement:.1f}%)"
                )
            else:
                self.stdout.write(f"{description}: FAILED")
# Management command driving CacheDeploymentManager
class Command(BaseCommand):
    """Run the cache step that belongs to one deployment phase."""

    help = 'Manage cache during deployment'

    def add_arguments(self, parser):
        """Register the mandatory --phase option (pre, deploy or post)."""
        parser.add_argument('--phase', choices=['pre', 'deploy', 'post'],
                            required=True, help='Deployment phase')

    def handle(self, *args, **options):
        """Run the manager method for the chosen phase and report success."""
        manager = CacheDeploymentManager()
        # Route the manager's progress messages through this command's stream.
        manager.stdout = self.stdout

        phase = options['phase']
        phase_actions = {
            'pre': manager.pre_deployment_cache_warm,
            'deploy': manager.deployment_cache_strategy,
            'post': manager.post_deployment_verification,
        }
        action = phase_actions.get(phase)
        if action is not None:
            action()

        done_message = f'Deployment phase "{phase}" completed'
        self.stdout.write(self.style.SUCCESS(done_message))
Deployment-level caching requires careful orchestration of multiple caching layers, from browser caches to CDNs and reverse proxies. The key is implementing a coherent strategy that maximizes performance while maintaining cache consistency and providing graceful degradation when cache systems fail. Start with basic HTTP caching headers and gradually implement more sophisticated patterns like CDN integration and multi-tier cache architectures as your application scales.
Template Fragment Caching
Template fragment caching allows you to cache specific portions of templates rather than entire pages, providing fine-grained control over what gets cached and when. This approach is particularly effective for templates with mixed dynamic and static content, enabling you to cache expensive template fragments while keeping other parts dynamic and personalized.
Asynchronous Django
Modern web applications demand high concurrency, real-time features, and efficient resource utilization. Django's asynchronous capabilities enable building applications that handle thousands of concurrent connections, provide real-time updates, and process background tasks efficiently. This comprehensive guide covers Django's async ecosystem, from ASGI and async views to WebSockets and background task processing.