Performance optimization in microservices requires a multi-layered approach to caching, from application-level caching to distributed caching strategies. This chapter covers comprehensive caching techniques, performance monitoring, and optimization strategies for Django microservices.
Microservices face unique performance challenges:
# shared/cache_config.py
import os
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'retry_on_timeout': True,
},
'SERIALIZER': 'django_redis.serializers.json.JSONSerializer',
'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
},
'KEY_PREFIX': 'microservices',
'VERSION': 1,
'TIMEOUT': 300, # 5 minutes default
},
'sessions': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': os.getenv('REDIS_URL', 'redis://localhost:6379/1'),
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
},
'KEY_PREFIX': 'sessions',
'TIMEOUT': 86400, # 24 hours
},
'api_cache': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': os.getenv('REDIS_URL', 'redis://localhost:6379/2'),
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
},
'KEY_PREFIX': 'api',
'TIMEOUT': 600, # 10 minutes
}
}
# Session configuration
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'sessions'
SESSION_COOKIE_AGE = 86400 # 24 hours
# user_service/settings/production.py
from shared.cache_config import CACHES
# Add service-specific cache configuration
CACHES['user_cache'] = {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': os.getenv('REDIS_URL', 'redis://localhost:6379/3'),
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
},
'KEY_PREFIX': 'user_service',
'TIMEOUT': 1800, # 30 minutes
}
# Cache middleware
MIDDLEWARE = [
'django.middleware.cache.UpdateCacheMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.cache.FetchFromCacheMiddleware',
# ... other middleware
]
# Cache settings
CACHE_MIDDLEWARE_ALIAS = 'default'
CACHE_MIDDLEWARE_SECONDS = 600
CACHE_MIDDLEWARE_KEY_PREFIX = 'user_service'
# user_service/views.py
from django.views.decorators.cache import cache_page
from django.views.decorators.vary import vary_on_headers
from django.core.cache import cache, caches
from django.utils.decorators import method_decorator
from rest_framework.decorators import api_view
from rest_framework.response import Response
import hashlib
import json
# Function-based view caching
@cache_page(60 * 15) # Cache for 15 minutes
@vary_on_headers('Authorization')
@api_view(['GET'])
def user_list(request):
"""Cached user list endpoint"""
users = User.objects.select_related('profile').all()
serializer = UserSerializer(users, many=True)
return Response(serializer.data)
# Class-based view caching
@method_decorator(cache_page(60 * 10), name='get')
@method_decorator(vary_on_headers('Authorization'), name='get')
class UserDetailView(RetrieveAPIView):
queryset = User.objects.select_related('profile')
serializer_class = UserDetailSerializer
# Custom cache key generation
def generate_cache_key(prefix, *args, **kwargs):
"""Generate consistent cache keys"""
key_data = {
'args': args,
'kwargs': sorted(kwargs.items())
}
key_string = json.dumps(key_data, sort_keys=True)
key_hash = hashlib.md5(key_string.encode()).hexdigest()
return f"{prefix}:{key_hash}"
@api_view(['GET'])
def user_statistics(request):
"""Custom caching with dynamic keys"""
filters = request.GET.dict()
cache_key = generate_cache_key('user_stats', **filters)
# Try to get from cache
cached_result = cache.get(cache_key)
if cached_result is not None:
return Response(cached_result)
# Calculate statistics
stats = calculate_user_statistics(filters)
# Cache the result for 1 hour
cache.set(cache_key, stats, 3600)
return Response(stats)
# user_service/templatetags/cache_tags.py
from django import template
from django.core.cache import cache
from django.template.loader import render_to_string
import hashlib
register = template.Library()
@register.simple_tag(takes_context=True)
def cached_include(context, template_name, cache_timeout=300, **kwargs):
"""Cache template fragments with custom keys"""
# Generate cache key from template name and context
cache_key_data = {
'template': template_name,
'context': {k: str(v) for k, v in kwargs.items()},
'user_id': getattr(context.get('user'), 'id', 'anonymous')
}
cache_key = hashlib.md5(
str(cache_key_data).encode()
).hexdigest()
# Try to get cached content
cached_content = cache.get(f"template_fragment:{cache_key}")
if cached_content is not None:
return cached_content
# Render template
content = render_to_string(template_name, {**context.flatten(), **kwargs})
# Cache the rendered content
cache.set(f"template_fragment:{cache_key}", content, cache_timeout)
return content
# user_service/managers.py
from django.db import models
from django.core.cache import cache
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
class CachedUserManager(models.Manager):
"""Manager with built-in caching"""
def get_cached_user(self, user_id, timeout=1800):
"""Get user with caching"""
cache_key = f"user:{user_id}"
# Try cache first
user = cache.get(cache_key)
if user is not None:
return user
# Get from database
try:
user = self.select_related('profile').get(id=user_id)
cache.set(cache_key, user, timeout)
return user
except self.model.DoesNotExist:
# Cache negative results for shorter time
cache.set(cache_key, None, 60)
raise
def get_user_list_cached(self, filters=None, timeout=600):
"""Get filtered user list with caching"""
filters = filters or {}
cache_key = f"user_list:{hash(frozenset(filters.items()))}"
# Try cache first
users = cache.get(cache_key)
if users is not None:
return users
# Query database
queryset = self.select_related('profile')
for field, value in filters.items():
queryset = queryset.filter(**{field: value})
users = list(queryset.all())
cache.set(cache_key, users, timeout)
return users
# Cache invalidation signals
@receiver([post_save, post_delete], sender='user_service.User')
def invalidate_user_cache(sender, instance, **kwargs):
"""Invalidate user-related caches on model changes"""
cache_keys = [
f"user:{instance.id}",
"user_list:*", # Pattern for list caches
f"user_stats:*", # Pattern for statistics caches
]
# Delete specific keys
for key in cache_keys:
if '*' in key:
# Delete pattern-based keys
cache.delete_many(cache.keys(key.replace('*', '*')))
else:
cache.delete(key)
# shared/cache_layers.py
from django.core.cache import caches
import logging
import pickle
import time
logger = logging.getLogger(__name__)
class MultiLevelCache:
"""Multi-level caching with L1 (local) and L2 (distributed) cache"""
def __init__(self, l1_cache='default', l2_cache='api_cache'):
self.l1_cache = caches[l1_cache]
self.l2_cache = caches[l2_cache]
self.l1_timeout = 300 # 5 minutes
self.l2_timeout = 1800 # 30 minutes
def get(self, key):
"""Get value from multi-level cache"""
# Try L1 cache first (fastest)
value = self.l1_cache.get(key)
if value is not None:
logger.debug(f"Cache hit L1: {key}")
return value
# Try L2 cache
value = self.l2_cache.get(key)
if value is not None:
logger.debug(f"Cache hit L2: {key}")
# Populate L1 cache
self.l1_cache.set(key, value, self.l1_timeout)
return value
logger.debug(f"Cache miss: {key}")
return None
def set(self, key, value, timeout=None):
"""Set value in both cache levels"""
l1_timeout = min(timeout or self.l1_timeout, self.l1_timeout)
l2_timeout = timeout or self.l2_timeout
self.l1_cache.set(key, value, l1_timeout)
self.l2_cache.set(key, value, l2_timeout)
def delete(self, key):
"""Delete from both cache levels"""
self.l1_cache.delete(key)
self.l2_cache.delete(key)
# Usage in services
multi_cache = MultiLevelCache()
def get_user_profile(user_id):
"""Get user profile with multi-level caching"""
cache_key = f"user_profile:{user_id}"
# Try cache first
profile = multi_cache.get(cache_key)
if profile is not None:
return profile
# Get from database
try:
user = User.objects.select_related('profile').get(id=user_id)
profile_data = {
'id': user.id,
'username': user.username,
'email': user.email,
'profile': {
'bio': user.profile.bio,
'avatar_url': user.profile.avatar_url,
}
}
# Cache for 30 minutes
multi_cache.set(cache_key, profile_data, 1800)
return profile_data
except User.DoesNotExist:
return None
# user_service/management/commands/warm_cache.py
from django.core.management.base import BaseCommand
from django.core.cache import cache
from django.contrib.auth.models import User
from user_service.services import UserService
import logging
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Warm up application caches'
def add_arguments(self, parser):
parser.add_argument(
'--batch-size',
type=int,
default=100,
help='Batch size for processing users'
)
parser.add_argument(
'--cache-timeout',
type=int,
default=3600,
help='Cache timeout in seconds'
)
def handle(self, *args, **options):
batch_size = options['batch_size']
cache_timeout = options['cache_timeout']
self.stdout.write('Starting cache warming...')
# Warm user profile cache
self.warm_user_profiles(batch_size, cache_timeout)
# Warm user statistics cache
self.warm_user_statistics(cache_timeout)
# Warm frequently accessed data
self.warm_frequent_data(cache_timeout)
self.stdout.write(
self.style.SUCCESS('Cache warming completed successfully')
)
def warm_user_profiles(self, batch_size, timeout):
"""Warm user profile cache"""
user_service = UserService()
total_users = User.objects.count()
self.stdout.write(f'Warming {total_users} user profiles...')
for i in range(0, total_users, batch_size):
users = User.objects.select_related('profile')[i:i + batch_size]
for user in users:
cache_key = f"user_profile:{user.id}"
profile_data = user_service.get_profile_data(user)
cache.set(cache_key, profile_data, timeout)
self.stdout.write(f'Processed {min(i + batch_size, total_users)} users')
def warm_user_statistics(self, timeout):
"""Warm user statistics cache"""
self.stdout.write('Warming user statistics...')
# Common statistics queries
stats_queries = [
{'is_active': True},
{'date_joined__gte': timezone.now() - timedelta(days=30)},
{'is_staff': True},
]
for filters in stats_queries:
cache_key = f"user_stats:{hash(frozenset(filters.items()))}"
stats = self.calculate_user_stats(filters)
cache.set(cache_key, stats, timeout)
def warm_frequent_data(self, timeout):
"""Warm frequently accessed data"""
self.stdout.write('Warming frequent data...')
# Cache popular user lists
popular_users = User.objects.filter(
is_active=True
).order_by('-last_login')[:100]
cache.set('popular_users', list(popular_users), timeout)
# Cache system statistics
system_stats = {
'total_users': User.objects.count(),
'active_users': User.objects.filter(is_active=True).count(),
'new_users_today': User.objects.filter(
date_joined__date=timezone.now().date()
).count(),
}
cache.set('system_stats', system_stats, timeout)
# shared/decorators.py
from functools import wraps
from django.core.cache import cache
from django.http import JsonResponse
import json
import hashlib
import logging
logger = logging.getLogger(__name__)
def cache_api_response(timeout=300, cache_alias='api_cache', vary_on=None):
"""Decorator for caching API responses"""
def decorator(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):
# Generate cache key
cache_key_parts = [
request.path,
request.method,
str(sorted(request.GET.items())),
]
# Add vary_on parameters
if vary_on:
for header in vary_on:
cache_key_parts.append(request.META.get(header, ''))
cache_key = hashlib.md5(
'|'.join(cache_key_parts).encode()
).hexdigest()
cache_key = f"api_response:{cache_key}"
# Try to get cached response
cached_response = cache.get(cache_key, cache_alias)
if cached_response is not None:
logger.debug(f"API cache hit: {cache_key}")
return JsonResponse(cached_response)
# Execute view function
response = view_func(request, *args, **kwargs)
# Cache successful responses
if hasattr(response, 'data') and response.status_code == 200:
cache.set(cache_key, response.data, timeout, cache_alias)
logger.debug(f"API response cached: {cache_key}")
return response
return wrapper
return decorator
# Usage in views
@cache_api_response(timeout=600, vary_on=['Authorization'])
@api_view(['GET'])
def user_dashboard_data(request):
"""Cached dashboard data endpoint"""
user = request.user
dashboard_data = {
'user_info': UserSerializer(user).data,
'recent_orders': get_recent_orders(user.id),
'notifications': get_user_notifications(user.id),
'statistics': get_user_statistics(user.id),
}
return Response(dashboard_data)
# shared/service_client.py
import requests
from django.core.cache import cache
from django.conf import settings
import logging
import time
logger = logging.getLogger(__name__)
class CachedServiceClient:
"""HTTP client with caching for inter-service communication"""
def __init__(self, base_url, cache_timeout=300, cache_alias='default'):
self.base_url = base_url.rstrip('/')
self.cache_timeout = cache_timeout
self.cache_alias = cache_alias
self.session = requests.Session()
# Set default headers
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': f'{settings.SERVICE_NAME}/1.0'
})
def get(self, endpoint, params=None, cache_timeout=None, use_cache=True):
"""GET request with caching"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
if not use_cache:
return self._make_request('GET', url, params=params)
# Generate cache key
cache_key = self._generate_cache_key('GET', url, params)
# Try cache first
cached_response = cache.get(cache_key, cache_alias=self.cache_alias)
if cached_response is not None:
logger.debug(f"Service cache hit: {cache_key}")
return cached_response
# Make request
response_data = self._make_request('GET', url, params=params)
# Cache successful responses
if response_data.get('status_code') == 200:
timeout = cache_timeout or self.cache_timeout
cache.set(cache_key, response_data, timeout, self.cache_alias)
logger.debug(f"Service response cached: {cache_key}")
return response_data
def post(self, endpoint, data=None, invalidate_cache_patterns=None):
"""POST request with cache invalidation"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
response_data = self._make_request('POST', url, json=data)
# Invalidate related cache entries
if invalidate_cache_patterns and response_data.get('status_code') in [200, 201]:
self._invalidate_cache_patterns(invalidate_cache_patterns)
return response_data
def _make_request(self, method, url, **kwargs):
"""Make HTTP request with error handling"""
try:
response = self.session.request(method, url, timeout=10, **kwargs)
return {
'status_code': response.status_code,
'data': response.json() if response.content else None,
'headers': dict(response.headers)
}
except requests.RequestException as e:
logger.error(f"Service request failed: {method} {url} - {e}")
return {
'status_code': 500,
'error': str(e),
'data': None
}
def _generate_cache_key(self, method, url, params=None):
"""Generate consistent cache key"""
key_parts = [method, url]
if params:
key_parts.append(str(sorted(params.items())))
key_string = '|'.join(key_parts)
return f"service_request:{hashlib.md5(key_string.encode()).hexdigest()}"
def _invalidate_cache_patterns(self, patterns):
"""Invalidate cache entries matching patterns"""
for pattern in patterns:
# Simple pattern matching - in production, use Redis SCAN
cache_keys = cache.keys(f"service_request:*{pattern}*")
if cache_keys:
cache.delete_many(cache_keys)
logger.debug(f"Invalidated {len(cache_keys)} cache entries for pattern: {pattern}")
# Service clients
user_service_client = CachedServiceClient(
base_url=settings.USER_SERVICE_URL,
cache_timeout=600
)
order_service_client = CachedServiceClient(
base_url=settings.ORDER_SERVICE_URL,
cache_timeout=300
)
# Usage in services
def get_user_profile(user_id):
"""Get user profile from user service with caching"""
response = user_service_client.get(
f'/api/users/{user_id}/',
cache_timeout=1800 # Cache for 30 minutes
)
if response['status_code'] == 200:
return response['data']
else:
logger.error(f"Failed to get user profile: {response}")
return None
def create_order(order_data):
"""Create order and invalidate related caches"""
response = order_service_client.post(
'/api/orders/',
data=order_data,
invalidate_cache_patterns=['orders', f"user_{order_data['user_id']}"]
)
return response
# shared/cache_metrics.py
from django.core.cache import cache
from django.utils import timezone
import time
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class CacheMetrics:
"""Cache performance monitoring"""
def __init__(self):
self.metrics = {
'hits': 0,
'misses': 0,
'sets': 0,
'deletes': 0,
'total_time': 0,
'operations': []
}
@contextmanager
def measure_operation(self, operation_type, key):
"""Context manager to measure cache operations"""
start_time = time.time()
try:
yield
self.metrics[operation_type] += 1
finally:
duration = time.time() - start_time
self.metrics['total_time'] += duration
self.metrics['operations'].append({
'type': operation_type,
'key': key,
'duration': duration,
'timestamp': timezone.now().isoformat()
})
# Log slow operations
if duration > 0.1: # 100ms threshold
logger.warning(f"Slow cache operation: {operation_type} {key} took {duration:.3f}s")
def get_hit_rate(self):
"""Calculate cache hit rate"""
total_reads = self.metrics['hits'] + self.metrics['misses']
if total_reads == 0:
return 0
return (self.metrics['hits'] / total_reads) * 100
def get_average_operation_time(self):
"""Calculate average operation time"""
total_ops = sum([
self.metrics['hits'],
self.metrics['misses'],
self.metrics['sets'],
self.metrics['deletes']
])
if total_ops == 0:
return 0
return self.metrics['total_time'] / total_ops
def reset(self):
"""Reset metrics"""
self.metrics = {
'hits': 0,
'misses': 0,
'sets': 0,
'deletes': 0,
'total_time': 0,
'operations': []
}
# Global metrics instance
cache_metrics = CacheMetrics()
class MonitoredCache:
"""Cache wrapper with performance monitoring"""
def __init__(self, cache_alias='default'):
self.cache = cache if cache_alias == 'default' else caches[cache_alias]
self.metrics = cache_metrics
def get(self, key, default=None):
"""Get with metrics tracking"""
with self.metrics.measure_operation('hits' if self.cache.has_key(key) else 'misses', key):
return self.cache.get(key, default)
def set(self, key, value, timeout=None):
"""Set with metrics tracking"""
with self.metrics.measure_operation('sets', key):
return self.cache.set(key, value, timeout)
def delete(self, key):
"""Delete with metrics tracking"""
with self.metrics.measure_operation('deletes', key):
return self.cache.delete(key)
# Usage
monitored_cache = MonitoredCache()
# Metrics endpoint
@api_view(['GET'])
def cache_metrics_view(request):
"""Endpoint to view cache metrics"""
metrics = {
'hit_rate': cache_metrics.get_hit_rate(),
'average_operation_time': cache_metrics.get_average_operation_time(),
'total_operations': cache_metrics.metrics,
'recent_operations': cache_metrics.metrics['operations'][-10:] # Last 10 operations
}
return Response(metrics)
# shared/query_optimization.py
from django.db import connection
from django.core.cache import cache
import logging
import time
from contextlib import contextmanager
logger = logging.getLogger(__name__)
@contextmanager
def monitor_queries():
"""Context manager to monitor database queries"""
initial_queries = len(connection.queries)
start_time = time.time()
try:
yield
finally:
end_time = time.time()
query_count = len(connection.queries) - initial_queries
duration = end_time - start_time
if query_count > 10: # Threshold for too many queries
logger.warning(f"High query count: {query_count} queries in {duration:.3f}s")
if duration > 1.0: # Threshold for slow operations
logger.warning(f"Slow operation: {duration:.3f}s with {query_count} queries")
class OptimizedQuerySet:
"""QuerySet wrapper with caching and optimization"""
def __init__(self, queryset, cache_timeout=300):
self.queryset = queryset
self.cache_timeout = cache_timeout
def cached_count(self):
"""Cached count query"""
cache_key = f"queryset_count:{hash(str(self.queryset.query))}"
count = cache.get(cache_key)
if count is None:
with monitor_queries():
count = self.queryset.count()
cache.set(cache_key, count, self.cache_timeout)
return count
def cached_exists(self):
"""Cached exists query"""
cache_key = f"queryset_exists:{hash(str(self.queryset.query))}"
exists = cache.get(cache_key)
if exists is None:
with monitor_queries():
exists = self.queryset.exists()
cache.set(cache_key, exists, self.cache_timeout)
return exists
def cached_list(self, select_related=None, prefetch_related=None):
"""Cached list query with optimizations"""
queryset = self.queryset
if select_related:
queryset = queryset.select_related(*select_related)
if prefetch_related:
queryset = queryset.prefetch_related(*prefetch_related)
cache_key = f"queryset_list:{hash(str(queryset.query))}"
results = cache.get(cache_key)
if results is None:
with monitor_queries():
results = list(queryset.all())
cache.set(cache_key, results, self.cache_timeout)
return results
# Usage in services
def get_user_orders_optimized(user_id):
"""Get user orders with query optimization"""
from order_service.models import Order
queryset = Order.objects.filter(user_id=user_id)
optimized_qs = OptimizedQuerySet(queryset, cache_timeout=600)
return optimized_qs.cached_list(
select_related=['user'],
prefetch_related=['items__product']
)
# shared/memory_optimization.py
import gc
import psutil
import logging
from django.core.management.base import BaseCommand
from django.core.cache import cache
logger = logging.getLogger(__name__)
class MemoryMonitor:
"""Monitor and optimize memory usage"""
def __init__(self):
self.process = psutil.Process()
def get_memory_usage(self):
"""Get current memory usage"""
memory_info = self.process.memory_info()
return {
'rss': memory_info.rss / 1024 / 1024, # MB
'vms': memory_info.vms / 1024 / 1024, # MB
'percent': self.process.memory_percent()
}
def optimize_memory(self):
"""Perform memory optimization"""
initial_memory = self.get_memory_usage()
# Force garbage collection
collected = gc.collect()
# Clear expired cache entries
self.clear_expired_cache()
final_memory = self.get_memory_usage()
logger.info(f"Memory optimization: {collected} objects collected, "
f"memory reduced from {initial_memory['rss']:.1f}MB to {final_memory['rss']:.1f}MB")
return {
'initial_memory': initial_memory,
'final_memory': final_memory,
'objects_collected': collected
}
def clear_expired_cache(self):
"""Clear expired cache entries"""
try:
# This is Redis-specific - adapt for other cache backends
from django_redis import get_redis_connection
redis_conn = get_redis_connection("default")
# Get all keys
keys = redis_conn.keys("*")
expired_keys = []
for key in keys:
ttl = redis_conn.ttl(key)
if ttl == -1: # No expiration set
continue
elif ttl <= 0: # Expired
expired_keys.append(key)
if expired_keys:
redis_conn.delete(*expired_keys)
logger.info(f"Cleared {len(expired_keys)} expired cache entries")
except Exception as e:
logger.error(f"Failed to clear expired cache: {e}")
# Memory monitoring middleware
class MemoryMonitoringMiddleware:
"""Middleware to monitor memory usage per request"""
def __init__(self, get_response):
self.get_response = get_response
self.memory_monitor = MemoryMonitor()
def __call__(self, request):
initial_memory = self.memory_monitor.get_memory_usage()
response = self.get_response(request)
final_memory = self.memory_monitor.get_memory_usage()
memory_diff = final_memory['rss'] - initial_memory['rss']
# Log high memory usage requests
if memory_diff > 50: # 50MB threshold
logger.warning(f"High memory usage request: {request.path} used {memory_diff:.1f}MB")
# Add memory usage to response headers (for debugging)
if hasattr(response, '__setitem__'):
response['X-Memory-Usage'] = f"{final_memory['rss']:.1f}MB"
response['X-Memory-Delta'] = f"{memory_diff:.1f}MB"
return response
# Management command for memory optimization
class Command(BaseCommand):
help = 'Optimize memory usage'
def handle(self, *args, **options):
monitor = MemoryMonitor()
result = monitor.optimize_memory()
self.stdout.write(
self.style.SUCCESS(
f"Memory optimization completed: "
f"{result['objects_collected']} objects collected"
)
)
# shared/cache_patterns.py
from django.core.cache import cache
from django.db import transaction
import logging
logger = logging.getLogger(__name__)
class CacheAsidePattern:
"""Implement cache-aside pattern with write-through"""
def __init__(self, model_class, cache_timeout=1800):
self.model_class = model_class
self.cache_timeout = cache_timeout
def get(self, pk):
"""Get object with cache-aside pattern"""
cache_key = f"{self.model_class._meta.label_lower}:{pk}"
# Try cache first
obj = cache.get(cache_key)
if obj is not None:
logger.debug(f"Cache hit: {cache_key}")
return obj
# Get from database
try:
obj = self.model_class.objects.get(pk=pk)
# Cache the object
cache.set(cache_key, obj, self.cache_timeout)
logger.debug(f"Cached object: {cache_key}")
return obj
except self.model_class.DoesNotExist:
# Cache negative result
cache.set(cache_key, None, 60) # Short timeout for negative results
return None
def save(self, obj):
"""Save object with write-through caching"""
cache_key = f"{self.model_class._meta.label_lower}:{obj.pk}"
with transaction.atomic():
# Save to database
obj.save()
# Update cache
cache.set(cache_key, obj, self.cache_timeout)
logger.debug(f"Write-through cache update: {cache_key}")
return obj
def delete(self, obj):
"""Delete object and invalidate cache"""
cache_key = f"{self.model_class._meta.label_lower}:{obj.pk}"
with transaction.atomic():
# Delete from database
obj.delete()
# Remove from cache
cache.delete(cache_key)
logger.debug(f"Cache invalidated: {cache_key}")
# Usage
from user_service.models import User
user_cache = CacheAsidePattern(User, cache_timeout=3600)
def get_user_cached(user_id):
return user_cache.get(user_id)
def update_user_cached(user):
return user_cache.save(user)
# shared/cache_invalidation.py
from django.core.cache import cache
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
import json
import logging
logger = logging.getLogger(__name__)
class CacheInvalidationManager:
"""Manage cache invalidation across services"""
def __init__(self):
self.invalidation_patterns = {}
def register_pattern(self, model_class, patterns):
"""Register cache invalidation patterns for a model"""
model_label = model_class._meta.label_lower
self.invalidation_patterns[model_label] = patterns
def invalidate_for_instance(self, instance, action='update'):
"""Invalidate cache for a model instance"""
model_label = instance._meta.label_lower
patterns = self.invalidation_patterns.get(model_label, [])
for pattern in patterns:
if callable(pattern):
keys_to_invalidate = pattern(instance, action)
else:
keys_to_invalidate = [pattern.format(instance=instance)]
for key in keys_to_invalidate:
if '*' in key:
# Pattern-based invalidation
self.invalidate_pattern(key)
else:
# Direct key invalidation
cache.delete(key)
logger.debug(f"Invalidated cache key: {key}")
def invalidate_pattern(self, pattern):
"""Invalidate cache keys matching a pattern"""
try:
# Redis-specific pattern matching
from django_redis import get_redis_connection
redis_conn = get_redis_connection("default")
keys = redis_conn.keys(pattern)
if keys:
redis_conn.delete(*keys)
logger.debug(f"Invalidated {len(keys)} keys matching pattern: {pattern}")
except Exception as e:
logger.error(f"Failed to invalidate pattern {pattern}: {e}")
# Global invalidation manager
cache_invalidation_manager = CacheInvalidationManager()
# Register invalidation patterns
def user_invalidation_patterns(instance, action):
"""Generate cache keys to invalidate for user changes"""
patterns = [
f"user:{instance.id}",
f"user_profile:{instance.id}",
"user_list:*",
"user_stats:*",
]
# Add service-specific patterns
if hasattr(instance, 'email'):
patterns.append(f"user_by_email:{instance.email}")
return patterns
# Register patterns
from user_service.models import User
cache_invalidation_manager.register_pattern(User, [user_invalidation_patterns])
# Signal handlers
@receiver([post_save, post_delete], sender=User)
def invalidate_user_cache(sender, instance, **kwargs):
"""Invalidate user-related caches"""
action = 'delete' if kwargs.get('created') is False else 'update'
cache_invalidation_manager.invalidate_for_instance(instance, action)
This comprehensive caching guide provides strategies for optimizing performance across your Django microservices architecture. The techniques covered include application-level caching, distributed caching, performance monitoring, and advanced caching patterns that ensure both performance and data consistency.
Securing Microservices
Security in microservices architecture requires a comprehensive approach covering authentication, authorization, network security, data protection, and monitoring. This section explores security best practices and implementation strategies for Django microservices.
Best Practices
This section consolidates essential best practices for developing, deploying, and maintaining Django microservices. Following these practices ensures scalable, maintainable, and reliable microservices architecture.