Django's middleware system provides powerful hooks for implementing authentication-related functionality that runs on every request. Understanding how to create and configure authentication middleware enables you to implement cross-cutting concerns like session security, user tracking, and custom authentication flows.
# Understanding Django's built-in authentication middleware
from django.contrib.auth.middleware import AuthenticationMiddleware
from django.contrib.auth import get_user
from django.utils.functional import SimpleLazyObject
class AuthenticationMiddlewareAnalysis:
"""Analysis of Django's built-in authentication middleware"""
@staticmethod
def how_authentication_middleware_works():
"""Explain how Django's AuthenticationMiddleware works"""
workflow = {
'process_request': [
"1. Gets session key from request",
"2. Loads user from session using get_user()",
"3. Sets request.user as SimpleLazyObject",
"4. User is loaded lazily when first accessed"
],
'lazy_loading': [
"User object is not loaded from database immediately",
"Only loaded when request.user is accessed",
"Improves performance for requests that don't need user"
],
'session_integration': [
"Relies on SessionMiddleware to provide session",
"Must be placed after SessionMiddleware in MIDDLEWARE",
"Uses session to store user ID and backend"
]
}
return workflow
@staticmethod
def middleware_order_importance():
"""Explain the importance of middleware order"""
correct_order = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware', # After Session
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
dependencies = {
'AuthenticationMiddleware': 'Requires SessionMiddleware',
'MessageMiddleware': 'Requires SessionMiddleware and AuthenticationMiddleware',
'CsrfViewMiddleware': 'Should be before AuthenticationMiddleware'
}
return {
'correct_order': correct_order,
'dependencies': dependencies
}
# Custom authentication middleware examples
class CustomAuthenticationMiddleware:
"""Custom authentication middleware with enhanced features"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Process request before view
self.process_request(request)
# Get response from view
response = self.get_response(request)
# Process response after view
self.process_response(request, response)
return response
def process_request(self, request):
"""Process request and set up authentication"""
# Set up user (similar to Django's AuthenticationMiddleware)
request.user = SimpleLazyObject(lambda: self.get_user(request))
# Add custom authentication enhancements
if hasattr(request, 'user') and request.user.is_authenticated:
self.enhance_authenticated_request(request)
def get_user(self, request):
"""Get user from session with custom logic"""
from django.contrib.auth import get_user
# Use Django's built-in get_user function
user = get_user(request)
# Add custom user enhancements
if user.is_authenticated:
self.add_user_enhancements(request, user)
return user
def enhance_authenticated_request(self, request):
"""Add enhancements for authenticated requests"""
# Add user preferences to request
if hasattr(request.user, 'profile'):
request.user_preferences = {
'theme': getattr(request.user.profile, 'theme', 'light'),
'language': getattr(request.user.profile, 'language', 'en'),
'timezone': getattr(request.user.profile, 'timezone', 'UTC'),
}
# Add cached permissions
request.user_permissions = self.get_cached_permissions(request.user)
# Track user activity
self.track_user_activity(request)
def add_user_enhancements(self, request, user):
"""Add enhancements to user object"""
# Add convenience methods
user.get_display_name = lambda: user.get_full_name() or user.username
# Add permission checking shortcuts
user.can = lambda perm: user.has_perm(perm)
user.can_any = lambda perms: any(user.has_perm(p) for p in perms)
user.can_all = lambda perms: all(user.has_perm(p) for p in perms)
def get_cached_permissions(self, user):
"""Get cached user permissions"""
from django.core.cache import cache
cache_key = f'user_permissions_{user.id}'
permissions = cache.get(cache_key)
if permissions is None:
permissions = list(user.get_all_permissions())
cache.set(cache_key, permissions, 300) # Cache for 5 minutes
return set(permissions)
def track_user_activity(self, request):
"""Track user activity for analytics"""
# Update last activity timestamp
request.session['last_activity'] = timezone.now().isoformat()
# Track page views (optional)
if hasattr(request.user, 'profile'):
profile = request.user.profile
profile.last_activity = timezone.now()
profile.save(update_fields=['last_activity'])
def process_response(self, request, response):
"""Process response after view execution"""
# Add security headers for authenticated users
if hasattr(request, 'user') and request.user.is_authenticated:
response['X-User-Authenticated'] = 'true'
return response
# Session security middleware
class SessionSecurityMiddleware:
"""Middleware to enforce session security policies"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Validate session security before processing
if not self.validate_session_security(request):
return self.handle_security_violation(request)
response = self.get_response(request)
# Update session security info after processing
self.update_session_security(request, response)
return response
def validate_session_security(self, request):
"""Validate session security"""
if not hasattr(request, 'user') or not request.user.is_authenticated:
return True
session_security = request.session.get('session_security', {})
# Check IP address consistency
if not self.validate_ip_consistency(request, session_security):
return False
# Check user agent consistency
if not self.validate_user_agent_consistency(request, session_security):
return False
# Check session age
if not self.validate_session_age(request, session_security):
return False
# Check for concurrent sessions (if enabled)
if not self.validate_concurrent_sessions(request):
return False
return True
def validate_ip_consistency(self, request, session_security):
"""Validate IP address consistency"""
stored_ip = session_security.get('ip_address')
current_ip = request.META.get('REMOTE_ADDR')
if stored_ip and stored_ip != current_ip:
# Log security event
self.log_security_event(
request, 'ip_change',
f"IP changed from {stored_ip} to {current_ip}"
)
return False
return True
def validate_user_agent_consistency(self, request, session_security):
"""Validate user agent consistency"""
stored_ua_hash = session_security.get('user_agent_hash')
current_ua = request.META.get('HTTP_USER_AGENT', '')
current_ua_hash = hashlib.md5(current_ua.encode()).hexdigest()
if stored_ua_hash and stored_ua_hash != current_ua_hash:
self.log_security_event(
request, 'user_agent_change',
"User agent changed"
)
return False
return True
def validate_session_age(self, request, session_security):
"""Validate session age"""
created_at_str = session_security.get('created_at')
if not created_at_str:
return True
try:
created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
session_age = timezone.now() - created_at
# Maximum session age: 24 hours
if session_age > timedelta(hours=24):
self.log_security_event(
request, 'session_expired',
f"Session age: {session_age}"
)
return False
except ValueError:
return False
return True
def validate_concurrent_sessions(self, request):
"""Validate concurrent session limits"""
from django.contrib.sessions.models import Session
from django.conf import settings
max_sessions = getattr(settings, 'MAX_CONCURRENT_SESSIONS', None)
if not max_sessions:
return True
user = request.user
current_session_key = request.session.session_key
# Count active sessions for user
active_sessions = 0
for session in Session.objects.filter(expire_date__gt=timezone.now()):
try:
session_data = session.get_decoded()
session_user_id = session_data.get('_auth_user_id')
if session_user_id and int(session_user_id) == user.id:
active_sessions += 1
except:
continue
if active_sessions > max_sessions:
self.log_security_event(
request, 'concurrent_sessions_exceeded',
f"Active sessions: {active_sessions}, Max: {max_sessions}"
)
return False
return True
def handle_security_violation(self, request):
"""Handle security violation"""
from django.contrib.auth import logout
from django.contrib import messages
from django.shortcuts import redirect
# Log out user
if hasattr(request, 'user') and request.user.is_authenticated:
logout(request)
# Add warning message
messages.warning(
request,
'Your session was terminated for security reasons. Please log in again.'
)
# Redirect to login
return redirect('login')
def update_session_security(self, request, response):
"""Update session security information"""
if hasattr(request, 'user') and request.user.is_authenticated:
session_security = request.session.get('session_security', {})
# Update security info
session_security.update({
'ip_address': request.META.get('REMOTE_ADDR'),
'user_agent_hash': hashlib.md5(
request.META.get('HTTP_USER_AGENT', '').encode()
).hexdigest(),
'last_activity': timezone.now().isoformat(),
})
# Set created_at if not exists
if 'created_at' not in session_security:
session_security['created_at'] = timezone.now().isoformat()
request.session['session_security'] = session_security
def log_security_event(self, request, event_type, details):
"""Log security events"""
import logging
logger = logging.getLogger('security')
log_data = {
'event_type': event_type,
'user': request.user.username if hasattr(request, 'user') and request.user.is_authenticated else 'anonymous',
'ip_address': request.META.get('REMOTE_ADDR'),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
'details': details,
'timestamp': timezone.now().isoformat(),
}
logger.warning(f"Security event: {log_data}")
# Rate limiting middleware
class RateLimitingMiddleware:
"""Middleware for rate limiting authentication attempts"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Check rate limits before processing
if not self.check_rate_limits(request):
return self.rate_limit_exceeded_response(request)
response = self.get_response(request)
# Update rate limit counters after processing
self.update_rate_limits(request, response)
return response
def check_rate_limits(self, request):
"""Check if request is within rate limits"""
# Skip rate limiting for certain paths
if self.should_skip_rate_limiting(request):
return True
# Check IP-based rate limiting
if not self.check_ip_rate_limit(request):
return False
# Check user-based rate limiting (if authenticated)
if hasattr(request, 'user') and request.user.is_authenticated:
if not self.check_user_rate_limit(request):
return False
return True
def should_skip_rate_limiting(self, request):
"""Check if rate limiting should be skipped"""
# Skip for static files
if request.path.startswith('/static/') or request.path.startswith('/media/'):
return True
# Skip for health checks
if request.path in ['/health/', '/ping/']:
return True
# Skip for superusers (optional)
if (hasattr(request, 'user') and
request.user.is_authenticated and
request.user.is_superuser):
return True
return False
def check_ip_rate_limit(self, request):
"""Check IP-based rate limiting"""
from django.core.cache import cache
ip_address = request.META.get('REMOTE_ADDR')
cache_key = f'rate_limit_ip_{ip_address}'
# Get current request count
request_count = cache.get(cache_key, 0)
# Rate limit: 100 requests per minute per IP
if request_count >= 100:
return False
return True
def check_user_rate_limit(self, request):
"""Check user-based rate limiting"""
from django.core.cache import cache
user_id = request.user.id
cache_key = f'rate_limit_user_{user_id}'
# Get current request count
request_count = cache.get(cache_key, 0)
# Rate limit: 1000 requests per hour per user
if request_count >= 1000:
return False
return True
def update_rate_limits(self, request, response):
"""Update rate limit counters"""
from django.core.cache import cache
if self.should_skip_rate_limiting(request):
return
# Update IP-based counter
ip_address = request.META.get('REMOTE_ADDR')
ip_cache_key = f'rate_limit_ip_{ip_address}'
ip_count = cache.get(ip_cache_key, 0) + 1
cache.set(ip_cache_key, ip_count, 60) # 1 minute window
# Update user-based counter
if hasattr(request, 'user') and request.user.is_authenticated:
user_cache_key = f'rate_limit_user_{request.user.id}'
user_count = cache.get(user_cache_key, 0) + 1
cache.set(user_cache_key, user_count, 3600) # 1 hour window
def rate_limit_exceeded_response(self, request):
"""Return response when rate limit is exceeded"""
from django.http import HttpResponseTooManyRequests
return HttpResponseTooManyRequests(
"Rate limit exceeded. Please try again later.",
content_type="text/plain"
)
# User activity tracking middleware
class UserActivityTrackingMiddleware:
"""Middleware to track user activity and behavior"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Record request start time
start_time = timezone.now()
response = self.get_response(request)
# Calculate request duration
end_time = timezone.now()
duration = (end_time - start_time).total_seconds()
# Track activity after response
self.track_activity(request, response, duration)
return response
def track_activity(self, request, response, duration):
"""Track user activity"""
# Only track for authenticated users
if not (hasattr(request, 'user') and request.user.is_authenticated):
return
# Skip tracking for certain requests
if self.should_skip_tracking(request):
return
# Create activity record
activity_data = self.create_activity_data(request, response, duration)
# Store activity (async if possible)
self.store_activity(activity_data)
# Update user statistics
self.update_user_statistics(request.user, activity_data)
def should_skip_tracking(self, request):
"""Check if activity tracking should be skipped"""
# Skip for AJAX requests (optional)
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return True
# Skip for API endpoints (optional)
if request.path.startswith('/api/'):
return True
# Skip for static files
if request.path.startswith('/static/') or request.path.startswith('/media/'):
return True
return False
def create_activity_data(self, request, response, duration):
"""Create activity data record"""
return {
'user_id': request.user.id,
'username': request.user.username,
'session_key': request.session.session_key,
'ip_address': request.META.get('REMOTE_ADDR'),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:500],
'method': request.method,
'path': request.path,
'query_string': request.META.get('QUERY_STRING', ''),
'status_code': response.status_code,
'duration': duration,
'timestamp': timezone.now().isoformat(),
'referer': request.META.get('HTTP_REFERER', ''),
}
def store_activity(self, activity_data):
"""Store activity data"""
# Option 1: Store in database (create UserActivity model)
try:
from myapp.models import UserActivity
UserActivity.objects.create(**activity_data)
except ImportError:
pass
# Option 2: Store in cache for batch processing
from django.core.cache import cache
cache_key = f"user_activity_{activity_data['user_id']}"
activities = cache.get(cache_key, [])
activities.append(activity_data)
# Keep only last 100 activities in cache
if len(activities) > 100:
activities = activities[-100:]
cache.set(cache_key, activities, 3600) # 1 hour
# Option 3: Log to file
import logging
logger = logging.getLogger('user_activity')
logger.info(f"User activity: {activity_data}")
def update_user_statistics(self, user, activity_data):
"""Update user statistics"""
# Update last activity
if hasattr(user, 'profile'):
user.profile.last_activity = timezone.now()
user.profile.save(update_fields=['last_activity'])
# Update session activity
session_key = activity_data['session_key']
if session_key:
from django.core.cache import cache
stats_key = f"session_stats_{session_key}"
stats = cache.get(stats_key, {
'page_views': 0,
'total_duration': 0,
'start_time': timezone.now().isoformat()
})
stats['page_views'] += 1
stats['total_duration'] += activity_data['duration']
stats['last_activity'] = timezone.now().isoformat()
cache.set(stats_key, stats, 3600)
# Example UserActivity model
class UserActivity(models.Model):
"""Model to store user activity"""
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='activities')
session_key = models.CharField(max_length=40, blank=True)
# Request information
ip_address = models.GenericIPAddressField()
user_agent = models.TextField()
method = models.CharField(max_length=10)
path = models.CharField(max_length=500)
query_string = models.TextField(blank=True)
referer = models.URLField(blank=True)
# Response information
status_code = models.IntegerField()
duration = models.FloatField(help_text="Request duration in seconds")
# Metadata
timestamp = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ['-timestamp']
indexes = [
models.Index(fields=['user', '-timestamp']),
models.Index(fields=['ip_address', '-timestamp']),
models.Index(fields=['session_key', '-timestamp']),
]
def __str__(self):
return f"{self.user.username} - {self.method} {self.path} - {self.timestamp}"
# API authentication middleware
class APIAuthenticationMiddleware:
"""Middleware for API token authentication"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Handle API authentication
if self.is_api_request(request):
self.authenticate_api_request(request)
response = self.get_response(request)
# Add API-specific headers
if self.is_api_request(request):
self.add_api_headers(request, response)
return response
def is_api_request(self, request):
"""Check if request is an API request"""
# Check path
if request.path.startswith('/api/'):
return True
# Check Accept header
accept_header = request.META.get('HTTP_ACCEPT', '')
if 'application/json' in accept_header:
return True
# Check for API token in headers
if request.META.get('HTTP_AUTHORIZATION', '').startswith('Bearer '):
return True
return False
def authenticate_api_request(self, request):
"""Authenticate API request"""
# Try different authentication methods
user = None
# Method 1: Bearer token
user = self.authenticate_bearer_token(request)
# Method 2: API key
if not user:
user = self.authenticate_api_key(request)
# Method 3: Session authentication (for web API)
if not user:
user = self.authenticate_session(request)
# Set user or anonymous
if user and user.is_authenticated:
request.user = user
else:
from django.contrib.auth.models import AnonymousUser
request.user = AnonymousUser()
def authenticate_bearer_token(self, request):
"""Authenticate using Bearer token"""
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return None
token = auth_header.split(' ')[1]
try:
from myapp.models import APIToken
api_token = APIToken.objects.select_related('user').get(
token=token,
is_active=True,
expires_at__gt=timezone.now()
)
# Update last used
api_token.last_used = timezone.now()
api_token.save(update_fields=['last_used'])
return api_token.user
except (ImportError, APIToken.DoesNotExist):
return None
def authenticate_api_key(self, request):
"""Authenticate using API key"""
api_key = request.META.get('HTTP_X_API_KEY')
if not api_key:
return None
try:
from myapp.models import APIKey
api_key_obj = APIKey.objects.select_related('user').get(
key=api_key,
is_active=True
)
return api_key_obj.user
except (ImportError, APIKey.DoesNotExist):
return None
def authenticate_session(self, request):
"""Authenticate using session (for web API)"""
from django.contrib.auth import get_user
return get_user(request)
def add_api_headers(self, request, response):
"""Add API-specific headers"""
# Add CORS headers (if needed)
response['Access-Control-Allow-Origin'] = '*'
response['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
response['Access-Control-Allow-Headers'] = 'Authorization, Content-Type'
# Add API version
response['X-API-Version'] = '1.0'
# Add rate limit headers
if hasattr(request, 'user') and request.user.is_authenticated:
self.add_rate_limit_headers(request, response)
def add_rate_limit_headers(self, request, response):
"""Add rate limit headers"""
from django.core.cache import cache
user_id = request.user.id
cache_key = f'api_rate_limit_user_{user_id}'
# Get current usage
current_usage = cache.get(cache_key, 0)
# Rate limit: 1000 requests per hour
rate_limit = 1000
response['X-RateLimit-Limit'] = str(rate_limit)
response['X-RateLimit-Remaining'] = str(max(0, rate_limit - current_usage))
response['X-RateLimit-Reset'] = str(int(timezone.now().timestamp()) + 3600)
# Example API token model
class APIToken(models.Model):
"""API token for authentication"""
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='api_tokens')
name = models.CharField(max_length=100, help_text="Token name for identification")
token = models.CharField(max_length=64, unique=True)
# Permissions
scopes = models.JSONField(default=list, help_text="List of allowed scopes")
# Status
is_active = models.BooleanField(default=True)
# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
last_used = models.DateTimeField(null=True, blank=True)
expires_at = models.DateTimeField(null=True, blank=True)
class Meta:
ordering = ['-created_at']
def save(self, *args, **kwargs):
if not self.token:
import secrets
self.token = secrets.token_urlsafe(48)
super().save(*args, **kwargs)
def __str__(self):
return f"{self.user.username} - {self.name}"
def is_expired(self):
"""Check if token is expired"""
if self.expires_at:
return timezone.now() > self.expires_at
return False
def has_scope(self, scope):
"""Check if token has specific scope"""
return scope in self.scopes
# Middleware configuration and best practices
class MiddlewareConfiguration:
"""Best practices for middleware configuration"""
@staticmethod
def recommended_middleware_order():
"""Recommended middleware order for authentication"""
middleware = [
# Security middleware (first)
'django.middleware.security.SecurityMiddleware',
# Session middleware (required for auth)
'django.contrib.sessions.middleware.SessionMiddleware',
# Common middleware
'django.middleware.common.CommonMiddleware',
# CSRF middleware (before auth)
'django.middleware.csrf.CsrfViewMiddleware',
# Authentication middleware
'django.contrib.auth.middleware.AuthenticationMiddleware',
# Custom authentication middleware
'myapp.middleware.CustomAuthenticationMiddleware',
'myapp.middleware.SessionSecurityMiddleware',
'myapp.middleware.RateLimitingMiddleware',
'myapp.middleware.UserActivityTrackingMiddleware',
'myapp.middleware.APIAuthenticationMiddleware',
# Messages middleware (after auth)
'django.contrib.messages.middleware.MessageMiddleware',
# Clickjacking protection (last)
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
return middleware
@staticmethod
def middleware_performance_tips():
"""Performance tips for authentication middleware"""
tips = {
'lazy_loading': [
"Use SimpleLazyObject for expensive operations",
"Don't load user data unless needed",
"Cache frequently accessed data"
],
'database_optimization': [
"Use select_related for user profile data",
"Implement proper database indexes",
"Consider read replicas for heavy read operations"
],
'caching_strategies': [
"Cache user permissions",
"Cache user preferences",
"Use Redis for session storage"
],
'conditional_processing': [
"Skip processing for static files",
"Use early returns for unauthenticated users",
"Implement feature flags for optional features"
]
}
return tips
@staticmethod
def security_considerations():
"""Security considerations for authentication middleware"""
considerations = {
'session_security': [
"Validate session integrity",
"Check for session hijacking",
"Implement session timeout"
],
'rate_limiting': [
"Implement per-IP rate limiting",
"Implement per-user rate limiting",
"Use progressive delays for repeated failures"
],
'logging_and_monitoring': [
"Log security events",
"Monitor for suspicious activity",
"Implement alerting for security violations"
],
'data_protection': [
"Don't log sensitive data",
"Encrypt session data if needed",
"Implement proper data retention policies"
]
}
return considerations
# Middleware testing utilities
class MiddlewareTestCase(TestCase):
"""Base test case for middleware testing"""
def setUp(self):
self.factory = RequestFactory()
self.user = User.objects.create_user(
username='testuser',
email='test@example.com',
password='testpass123'
)
def create_request(self, path='/', method='GET', user=None, **kwargs):
"""Create a test request"""
request = getattr(self.factory, method.lower())(path, **kwargs)
# Add session
from django.contrib.sessions.middleware import SessionMiddleware
middleware = SessionMiddleware(lambda r: None)
middleware.process_request(request)
request.session.save()
# Add user
if user:
request.user = user
else:
from django.contrib.auth.models import AnonymousUser
request.user = AnonymousUser()
return request
def test_middleware_with_authenticated_user(self):
"""Test middleware with authenticated user"""
request = self.create_request(user=self.user)
# Test your middleware here
middleware = CustomAuthenticationMiddleware(lambda r: HttpResponse())
response = middleware(request)
self.assertEqual(response.status_code, 200)
def test_middleware_with_anonymous_user(self):
"""Test middleware with anonymous user"""
request = self.create_request()
# Test your middleware here
middleware = CustomAuthenticationMiddleware(lambda r: HttpResponse())
response = middleware(request)
self.assertEqual(response.status_code, 200)
Authentication middleware provides powerful capabilities for implementing cross-cutting authentication concerns. By understanding how to create, configure, and optimize authentication middleware, you can build robust authentication systems that handle security, performance, and user experience requirements effectively.
Custom User Models
Django's default User model works well for many applications, but often you'll need to customize user authentication to fit your specific requirements. Understanding how to create and implement custom user models enables you to build authentication systems tailored to your application's unique needs.
Authorization in Views and Templates
Implementing proper authorization in Django views and templates ensures that users can only access resources and perform actions they're permitted to. Understanding how to apply authorization at different levels - from view-level access control to fine-grained template permissions - is crucial for building secure applications.