Implementing robust security measures in Django authentication and authorization systems is crucial for protecting user data and maintaining application integrity. Understanding and applying security best practices helps prevent common vulnerabilities and ensures your authentication system remains secure against evolving threats.
# Comprehensive password security implementation
class PasswordSecurityManager:
    """Static factories for password hashing, validation, history and expiry.

    Each method returns something for the caller to install: a settings dict,
    a validator class, a model class or a middleware class.
    """

    @staticmethod
    def configure_password_security():
        """Return the password-related settings as a plain dict.

        Returns:
            dict: ``PASSWORD_HASHERS``, ``AUTH_PASSWORD_VALIDATORS`` and three
            ``ARGON2_*`` values. The caller must copy them into the Django
            settings module.
        """
        return {
            # Django hashes new passwords with the first entry; the remaining
            # entries are only used to verify hashes that already exist.
            'PASSWORD_HASHERS': [
                'django.contrib.auth.hashers.Argon2PasswordHasher',  # Most secure
                'django.contrib.auth.hashers.PBKDF2PasswordHasher',  # Fallback
                'django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher',  # Legacy support
                'django.contrib.auth.hashers.BCryptSHA256PasswordHasher',  # Alternative
            ],
            # Password validation
            'AUTH_PASSWORD_VALIDATORS': [
                {
                    'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
                    'OPTIONS': {
                        'user_attributes': ('username', 'first_name', 'last_name', 'email'),
                        'max_similarity': 0.7,
                    },
                },
                {
                    'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
                    'OPTIONS': {
                        'min_length': 12,  # Increased from default 8
                    },
                },
                {
                    'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
                },
                {
                    'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
                },
                {
                    'NAME': 'myapp.validators.CustomPasswordValidator',
                },
            ],
            # NOTE(review): Django itself does not read ARGON2_* settings.
            # Hasher parameters are class attributes on Argon2PasswordHasher
            # (time_cost, memory_cost, parallelism), so these values only take
            # effect if project code applies them to a hasher subclass.
            # memory_cost is expressed in KiB, so 1024 means 1 MiB - confirm
            # this is not lower than the hasher default before relying on it.
            'ARGON2_TIME_COST': 3,
            'ARGON2_MEMORY_COST': 1024,
            'ARGON2_PARALLELISM': 2,
        }

    @staticmethod
    def implement_password_policies():
        """Build and return the ``CustomPasswordValidator`` class.

        The returned class exposes ``validate(password, user=None)`` and
        ``get_help_text()``, the two methods Django calls on entries of
        ``AUTH_PASSWORD_VALIDATORS``.
        """
        import re

        class CustomPasswordValidator:
            """Validator enforcing character-class, repetition, word and reuse rules."""

            # Simple deny-list - in production, use a comprehensive word list.
            COMMON_WORDS = (
                'password', 'admin', 'user', 'login', 'welcome',
                'qwerty', 'abc123', 'letmein', 'monkey', 'dragon',
            )

            def __init__(self, min_uppercase=1, min_lowercase=1, min_digits=1,
                         min_special=1, max_consecutive=2, prevent_reuse=5):
                self.min_uppercase = min_uppercase
                self.min_lowercase = min_lowercase
                self.min_digits = min_digits
                self.min_special = min_special
                self.max_consecutive = max_consecutive
                self.prevent_reuse = prevent_reuse

            def validate(self, password, user=None):
                """Check *password* against every rule and report all failures at once.

                Raises:
                    ValidationError: carrying one message per violated rule.
                """
                # Imported here so the class can be built without Django loaded.
                from django.core.exceptions import ValidationError

                errors = []

                # Character type requirements
                if len(re.findall(r'[A-Z]', password)) < self.min_uppercase:
                    errors.append(f"Password must contain at least {self.min_uppercase} uppercase letter(s).")
                if len(re.findall(r'[a-z]', password)) < self.min_lowercase:
                    errors.append(f"Password must contain at least {self.min_lowercase} lowercase letter(s).")
                if len(re.findall(r'\d', password)) < self.min_digits:
                    errors.append(f"Password must contain at least {self.min_digits} digit(s).")
                special_chars = re.findall(r'[!@#$%^&*(),.?":{}|<>]', password)
                if len(special_chars) < self.min_special:
                    errors.append(f"Password must contain at least {self.min_special} special character(s).")

                if self.has_consecutive_chars(password, self.max_consecutive):
                    errors.append(
                        f"Password cannot contain more than {self.max_consecutive} consecutive identical characters."
                    )
                if self.contains_dictionary_words(password):
                    errors.append("Password cannot contain common dictionary words.")
                if user and self.is_password_reused(user, password):
                    errors.append(f"Password cannot be one of your last {self.prevent_reuse} passwords.")

                if errors:
                    raise ValidationError(errors)

            def has_consecutive_chars(self, password, max_consecutive):
                """Return True if any character repeats more than *max_consecutive* times in a row."""
                run_length = 1
                for previous, current in zip(password, password[1:]):
                    if current != previous:
                        run_length = 1
                        continue
                    run_length += 1
                    if run_length > max_consecutive:
                        return True
                return False

            def contains_dictionary_words(self, password):
                """Return True if *password* contains a deny-listed word, ignoring case."""
                lowered = password.lower()
                return any(word in lowered for word in self.COMMON_WORDS)

            def is_password_reused(self, user, password):
                """Return True if *password* matches the current or a recent stored hash.

                Returns False when ``myapp.models.PasswordHistory`` cannot be
                imported or when *user* has not been saved yet.
                """
                try:
                    from myapp.models import PasswordHistory
                except ImportError:
                    return False
                from django.contrib.auth.hashers import check_password

                if user.pk is None:
                    # A brand-new account has no history to compare against.
                    return False

                # The history only receives a hash once it has been replaced,
                # so the password currently in use is checked separately.
                if user.password and check_password(password, user.password):
                    return True

                recent_entries = PasswordHistory.objects.filter(
                    user=user
                ).order_by('-created_at')[:self.prevent_reuse]
                # Compare against each stored hash; checking the user's current
                # password inside the loop would ignore the history entirely.
                return any(
                    check_password(password, entry.password_hash)
                    for entry in recent_entries
                )

            def get_help_text(self):
                """Return help text for password requirements."""
                return (
                    f"Your password must contain at least {self.min_uppercase} uppercase letter, "
                    f"{self.min_lowercase} lowercase letter, {self.min_digits} digit, "
                    f"and {self.min_special} special character. "
                    f"It cannot contain more than {self.max_consecutive} consecutive identical characters "
                    f"or common dictionary words."
                )

        return CustomPasswordValidator

    @staticmethod
    def implement_password_history(max_history=10):
        """Define the ``PasswordHistory`` model and the signal that fills it.

        Args:
            max_history: number of past hashes kept per user; older rows are
                deleted whenever a new one is recorded.

        Returns:
            The ``PasswordHistory`` model class.
        """
        from django.conf import settings
        from django.contrib.auth import get_user_model
        from django.db import models
        from django.db.models.signals import pre_save
        from django.dispatch import receiver

        User = get_user_model()

        class PasswordHistory(models.Model):
            """One previously used password hash for a user."""

            user = models.ForeignKey(
                settings.AUTH_USER_MODEL,
                on_delete=models.CASCADE,
                related_name='password_history',
            )
            password_hash = models.CharField(max_length=128)
            created_at = models.DateTimeField(auto_now_add=True)

            class Meta:
                ordering = ['-created_at']
                indexes = [
                    models.Index(fields=['user', '-created_at']),
                ]

            def __str__(self):
                return f"{self.user.get_username()} - {self.created_at}"

        # weak=False: the handler is a local function, so a weak reference
        # would be dropped as soon as this factory returns and the signal
        # would silently stop firing. dispatch_uid prevents double
        # registration if the factory is called more than once.
        @receiver(
            pre_save,
            sender=User,
            weak=False,
            dispatch_uid='password_history.save_password_history',
        )
        def save_password_history(sender, instance, **kwargs):
            """Record the outgoing hash when an existing user's password changes."""
            if not instance.pk:
                return  # New user: nothing to archive yet.
            try:
                stored_user = User.objects.get(pk=instance.pk)
            except User.DoesNotExist:
                return
            if stored_user.password == instance.password:
                return

            PasswordHistory.objects.create(
                user=stored_user,
                password_hash=stored_user.password,
            )

            # Trim everything beyond the newest ``max_history`` rows.
            stale_ids = list(
                PasswordHistory.objects.filter(user=stored_user)
                .order_by('-created_at')
                .values_list('id', flat=True)[max_history:]
            )
            if stale_ids:
                PasswordHistory.objects.filter(id__in=stale_ids).delete()

        return PasswordHistory

    @staticmethod
    def implement_password_expiry(max_age_days=90):
        """Build and return the ``PasswordExpiryMiddleware`` class.

        Args:
            max_age_days: a password older than this many days is treated as
                expired.
        """
        from datetime import timedelta

        from django.utils import timezone

        class PasswordExpiryMiddleware:
            """Redirect authenticated users with an expired password to the change page."""

            # Paths that must stay reachable while the password is expired.
            EXEMPT_PATH_PREFIXES = (
                '/accounts/password/change/',
                '/accounts/password/set/',
                '/accounts/logout/',
            )

            def __init__(self, get_response):
                self.get_response = get_response

            def __call__(self, request):
                if (hasattr(request, 'user')
                        and request.user.is_authenticated
                        and self.is_password_expired(request.user)
                        and not self.is_password_change_page(request)):
                    return self.redirect_to_password_change(request)
                return self.get_response(request)

            def is_password_expired(self, user):
                """Return True if the user's password is older than ``max_age_days``."""
                profile = getattr(user, 'profile', None)
                changed_at = getattr(profile, 'password_changed_at', None)
                if changed_at is not None:
                    return (timezone.now() - changed_at).days > max_age_days
                # No recorded change date (attribute missing or still NULL):
                # fall back to the account age.
                return user.date_joined < timezone.now() - timedelta(days=max_age_days)

            def is_password_change_page(self, request):
                """Return True if the request targets a page exempt from the redirect."""
                from django.urls import NoReverseMatch, reverse

                exempt_prefixes = list(self.EXEMPT_PATH_PREFIXES)
                try:
                    # The redirect target itself must be exempt, otherwise a
                    # URLconf that maps 'password_change' to a different path
                    # would redirect forever.
                    exempt_prefixes.append(reverse('password_change'))
                except NoReverseMatch:
                    pass
                return request.path.startswith(tuple(exempt_prefixes))

            def redirect_to_password_change(self, request):
                """Queue a warning message and redirect to the ``password_change`` URL."""
                from django.contrib import messages
                from django.shortcuts import redirect

                messages.warning(
                    request,
                    'Your password has expired. Please change your password to continue.'
                )
                return redirect('password_change')

        return PasswordExpiryMiddleware
# Breach detection and monitoring
class PasswordBreachDetection:
    """Check passwords against the Pwned Passwords range API."""

    @staticmethod
    def _find_suffix_count(range_body, suffix):
        """Return the count listed for *suffix* in a range-API response body.

        Each line is expected to look like ``HASH_SUFFIX:COUNT``. Lines that
        do not fit that shape are skipped. Returns 0 when *suffix* is absent.
        """
        for line in range_body.splitlines():
            candidate, separator, count_text = line.strip().partition(':')
            if not separator or candidate.upper() != suffix:
                continue
            try:
                return int(count_text)
            except ValueError:
                continue  # Malformed count: treat the line as unusable.
        return 0

    @staticmethod
    def check_password_breach(password):
        """Look *password* up in the Pwned Passwords database.

        Only the first five hex characters of the SHA-1 digest are sent; the
        remaining suffix is compared locally against the returned list.

        Returns:
            tuple: ``(True, count)`` when the password is listed,
            ``(False, 0)`` when it is not, and ``(None, 0)`` when the service
            could not give an answer (request error or non-200 status).
        """
        import hashlib

        import requests

        # SHA-1 is what the range API is keyed on; it is not used for storage.
        digest = hashlib.sha1(password.encode('utf-8')).hexdigest().upper()
        prefix, suffix = digest[:5], digest[5:]

        try:
            response = requests.get(
                f"https://api.pwnedpasswords.com/range/{prefix}",
                timeout=5,
                headers={'User-Agent': 'Django-App-Password-Check'}
            )
        except requests.RequestException:
            # If API is unavailable, don't block password change
            return None, 0

        if response.status_code != 200:
            # An error status is "unknown", not "clean".
            return None, 0

        count = PasswordBreachDetection._find_suffix_count(response.text, suffix)
        return count > 0, count

    @staticmethod
    def create_breach_validator():
        """Build and return the ``BreachPasswordValidator`` class."""

        class BreachPasswordValidator:
            """Validator that rejects passwords seen in many known breaches."""

            def validate(self, password, user=None):
                """Reject heavily breached passwords; log moderately breached ones.

                Raises:
                    ValidationError: when the breach count exceeds 100.
                """
                import logging

                from django.core.exceptions import ValidationError

                is_breached, count = PasswordBreachDetection.check_password_breach(password)
                if not is_breached:
                    return  # Not listed, or the lookup was inconclusive.

                if count > 100:
                    raise ValidationError(
                        f"This password has been found in {count:,} data breaches. "
                        "Please choose a different password for your security."
                    )
                if count > 10:
                    # Warning for passwords with moderate breach counts
                    logging.getLogger('security').warning(
                        "User attempting to use password found in %s breaches", count
                    )

            def get_help_text(self):
                """Return help text for this validator."""
                return "Password will be checked against known data breaches."

        return BreachPasswordValidator
# Comprehensive session security implementation
class SessionSecurityManager:
    """Static factories for session settings and session-monitoring middleware."""

    @staticmethod
    def configure_secure_sessions():
        """Return session, CSRF and security-header settings as a plain dict.

        The caller must copy the values into the Django settings module.
        """
        return {
            # Session security
            'SESSION_COOKIE_SECURE': True,  # HTTPS only
            'SESSION_COOKIE_HTTPONLY': True,  # No JavaScript access
            'SESSION_COOKIE_SAMESITE': 'Strict',  # CSRF protection
            'SESSION_COOKIE_AGE': 3600,  # 1 hour (adjust as needed)
            'SESSION_EXPIRE_AT_BROWSER_CLOSE': True,
            # CSRF protection
            'CSRF_COOKIE_SECURE': True,
            'CSRF_COOKIE_HTTPONLY': True,
            'CSRF_COOKIE_SAMESITE': 'Strict',
            'CSRF_FAILURE_VIEW': 'myapp.views.csrf_failure',
            # Use database sessions for better security
            'SESSION_ENGINE': 'django.contrib.sessions.backends.db',
            # Additional security headers
            # NOTE(review): SECURE_BROWSER_XSS_FILTER was removed in Django 4.0;
            # confirm the target Django version still reads it.
            'SECURE_BROWSER_XSS_FILTER': True,
            'SECURE_CONTENT_TYPE_NOSNIFF': True,
            'SECURE_HSTS_SECONDS': 31536000,  # 1 year
            'SECURE_HSTS_INCLUDE_SUBDOMAINS': True,
            'SECURE_HSTS_PRELOAD': True,
            'X_FRAME_OPTIONS': 'DENY',
        }

    @staticmethod
    def implement_session_monitoring():
        """Build and return the ``SessionMonitoringMiddleware`` class."""
        import hashlib
        from datetime import datetime

        from django.conf import settings
        from django.utils import timezone

        class SessionMonitoringMiddleware:
            """Track per-session IP / user-agent data and end sessions that look wrong.

            Tracking data lives under the ``security_data`` session key and is
            refreshed after every authenticated request.
            """

            def __init__(self, get_response):
                self.get_response = get_response

            def __call__(self, request):
                # Checks may log the user out; the request then continues
                # down the stack as an anonymous request.
                self.monitor_session_security(request)
                response = self.get_response(request)
                self.update_session_tracking(request, response)
                return response

            def monitor_session_security(self, request):
                """Run the integrity, hijacking and age checks; act on the first failure."""
                if not hasattr(request, 'user') or not request.user.is_authenticated:
                    return
                if not self.validate_session_integrity(request):
                    self.terminate_suspicious_session(request)
                elif self.detect_session_hijacking(request):
                    self.handle_session_hijacking(request)
                elif self.is_session_too_old(request):
                    self.handle_expired_session(request)

            def validate_session_integrity(self, request):
                """Return False if the IP or user-agent hash differs from the stored one."""
                tracked = request.session.get('security_data', {})

                stored_ip = tracked.get('ip_address')
                if stored_ip and stored_ip != request.META.get('REMOTE_ADDR'):
                    return False

                stored_ua_hash = tracked.get('user_agent_hash')
                current_ua = request.META.get('HTTP_USER_AGENT', '')
                # MD5 is only a compact fingerprint here, not a security boundary.
                current_ua_hash = hashlib.md5(current_ua.encode()).hexdigest()
                if stored_ua_hash and stored_ua_hash != current_ua_hash:
                    return False
                return True

            def detect_session_hijacking(self, request):
                """Return True on a fast IP switch or a very different user agent."""
                tracked = request.session.get('security_data', {})

                last_activity = tracked.get('last_activity')
                if last_activity:
                    elapsed = timezone.now() - datetime.fromisoformat(last_activity)
                    # If less than 1 minute between requests from different IPs
                    if (elapsed.total_seconds() < 60
                            and tracked.get('ip_address') != request.META.get('REMOTE_ADDR')):
                        return True

                stored_ua = tracked.get('user_agent')
                current_ua = request.META.get('HTTP_USER_AGENT', '')
                if (stored_ua and current_ua
                        and self.calculate_ua_similarity(stored_ua, current_ua) < 0.5):
                    return True
                return False

            def calculate_ua_similarity(self, ua1, ua2):
                """Return the Jaccard similarity of the two strings' lower-cased word sets."""
                words_a = set(ua1.lower().split())
                words_b = set(ua2.lower().split())
                all_words = words_a | words_b
                return len(words_a & words_b) / len(all_words) if all_words else 0

            def is_session_too_old(self, request):
                """Return True if the tracked creation time is more than 24 hours ago."""
                created_at = request.session.get('security_data', {}).get('created_at')
                if not created_at:
                    return False
                age = timezone.now() - datetime.fromisoformat(created_at)
                return age.total_seconds() > 86400

            def terminate_suspicious_session(self, request):
                """Log the violation, log the user out and queue a warning message."""
                from django.contrib import messages
                from django.contrib.auth import logout

                self.log_security_event(request, 'session_integrity_violation')
                logout(request)
                messages.warning(
                    request,
                    'Your session was terminated due to security concerns. Please log in again.'
                )

            def handle_session_hijacking(self, request):
                """Log the incident, end every session of the user and email an alert."""
                from django.contrib.auth import logout

                # logout() replaces request.user with AnonymousUser, which has
                # neither get_full_name() nor email - keep the real user.
                affected_user = request.user

                self.log_security_event(request, 'session_hijacking_detected')
                self.terminate_all_user_sessions(affected_user)
                logout(request)
                self.send_security_alert(affected_user, 'session_hijacking')

            def handle_expired_session(self, request):
                """Log the user out and queue an informational message."""
                from django.contrib import messages
                from django.contrib.auth import logout

                logout(request)
                messages.info(
                    request,
                    'Your session has expired for security reasons. Please log in again.'
                )

            def update_session_tracking(self, request, response):
                """Store the current IP, user agent and timestamps in the session."""
                if not (hasattr(request, 'user') and request.user.is_authenticated):
                    return
                user_agent = request.META.get('HTTP_USER_AGENT', '')
                tracked = request.session.get('security_data', {})
                tracked.update({
                    'ip_address': request.META.get('REMOTE_ADDR'),
                    'user_agent': user_agent,
                    'user_agent_hash': hashlib.md5(user_agent.encode()).hexdigest(),
                    'last_activity': timezone.now().isoformat(),
                })
                # created_at is written once and then left alone.
                tracked.setdefault('created_at', timezone.now().isoformat())
                request.session['security_data'] = tracked

            def terminate_all_user_sessions(self, user):
                """Delete every unexpired database session belonging to *user*.

                Every unexpired ``Session`` row is decoded, so the cost grows
                with the total number of live sessions.

                Returns:
                    int: number of sessions deleted.
                """
                from django.contrib.sessions.models import Session

                target_id = str(user.pk)
                terminated_count = 0
                for session in Session.objects.filter(expire_date__gt=timezone.now()):
                    try:
                        owner_id = session.get_decoded().get('_auth_user_id')
                    except Exception:
                        # An undecodable session cannot be attributed; skip it.
                        continue
                    # Compare as strings so non-integer primary keys also match.
                    if owner_id is not None and str(owner_id) == target_id:
                        session.delete()
                        terminated_count += 1
                return terminated_count

            def log_security_event(self, request, event_type):
                """Write a CRITICAL record describing the request to the ``security`` logger."""
                import logging

                is_known_user = hasattr(request, 'user') and request.user.is_authenticated
                log_data = {
                    'event_type': event_type,
                    'user': request.user.username if is_known_user else 'anonymous',
                    'ip_address': request.META.get('REMOTE_ADDR'),
                    'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
                    'session_key': request.session.session_key,
                    'timestamp': timezone.now().isoformat(),
                }
                logging.getLogger('security').critical(f"Security event: {log_data}")

            def send_security_alert(self, user, alert_type):
                """Email *user* about the alert; do nothing if no address is on file."""
                from django.core.mail import send_mail

                if not user.email:
                    return

                subject = 'Security Alert - Suspicious Activity Detected'
                message = (
                    "\n"
                    f"Hello {user.get_full_name() or user.username},\n"
                    "We detected suspicious activity on your account and have taken "
                    "security measures to protect it.\n"
                    f"Alert Type: {alert_type}\n"
                    f"Time: {timezone.now()}\n"
                    "If this was you, you can safely ignore this message. If not, please:\n"
                    "1. Change your password immediately\n"
                    "2. Review your account activity\n"
                    "3. Contact support if you need assistance\n"
                    "Best regards,\n"
                    "Security Team\n"
                )
                send_mail(
                    subject,
                    message,
                    settings.DEFAULT_FROM_EMAIL,
                    [user.email],
                    fail_silently=True,
                )

        return SessionMonitoringMiddleware
# Comprehensive rate limiting and brute force protection
class BruteForceProtection:
    """Static factories for request rate limiting and login back-off."""

    @staticmethod
    def implement_rate_limiting():
        """Build and return the ``RateLimitingMiddleware`` class."""
        from django.conf import settings
        from django.core.cache import cache
        from django.utils import timezone

        class RateLimitingMiddleware:
            """Cache-backed rate limiting per IP, per auth endpoint and per user.

            The limits are class attributes so a subclass can override them.
            """

            GLOBAL_LIMIT = 1000        # requests per WINDOW per IP
            AUTH_LIMIT = 20            # auth-endpoint requests per WINDOW per IP
            FAILED_LOGIN_LIMIT = 5     # failed logins per LOCKOUT_WINDOW per IP
            USER_LIMIT = 2000          # regular users, per WINDOW
            STAFF_LIMIT = 5000         # staff users, per WINDOW
            SUPERUSER_LIMIT = 10000    # superusers, per WINDOW
            WINDOW = 3600              # seconds
            LOCKOUT_WINDOW = 900       # seconds

            AUTH_PATH_PREFIXES = (
                '/accounts/login/',
                '/accounts/signup/',
                '/accounts/password/reset/',
                '/api/auth/',
            )

            def __init__(self, get_response):
                self.get_response = get_response

            def __call__(self, request):
                verdict = self.check_rate_limits(request)
                if not verdict['allowed']:
                    return self.create_rate_limit_response(request, verdict)

                response = self.get_response(request)
                self.update_rate_limits(request, response)
                self.add_rate_limit_headers(response, verdict)
                return response

            @staticmethod
            def _bump(cache_key, window):
                """Increment *cache_key*, opening a new window if it does not exist.

                ``add`` only writes when the key is absent, so an existing
                window keeps its expiry instead of being extended by every
                request. NOTE(review): this relies on the cache backend's
                ``incr`` preserving the expiry - verify for the configured
                backend.
                """
                cache.add(cache_key, 0, window)
                try:
                    cache.incr(cache_key)
                except ValueError:
                    # The key expired between add() and incr().
                    cache.set(cache_key, 1, window)

            def check_rate_limits(self, request):
                """Return a verdict dict; ``verdict['allowed']`` decides the request.

                An allowed verdict carries the limit figures of the most
                specific counter so the response headers can report them.
                """
                if self.is_whitelisted_ip(request):
                    return {'allowed': True, 'reason': 'whitelisted'}

                verdict = self.check_global_rate_limit(request)
                if not verdict['allowed']:
                    return verdict

                if self.is_auth_request(request):
                    auth_verdict = self.check_auth_rate_limit(request)
                    if not auth_verdict['allowed']:
                        return auth_verdict

                if hasattr(request, 'user') and request.user.is_authenticated:
                    return self.check_user_rate_limit(request)
                return verdict

            def _counter_verdict(self, cache_key, limit, window, reason):
                """Compare the counter stored at *cache_key* with *limit*."""
                current = cache.get(cache_key, 0)
                verdict = {
                    'allowed': current < limit,
                    'limit': limit,
                    'current': current,
                    'reset_time': window,
                }
                if not verdict['allowed']:
                    verdict['reason'] = reason
                return verdict

            def check_global_rate_limit(self, request):
                """Check the per-IP counter shared by all requests."""
                ip_address = self.get_client_ip(request)
                return self._counter_verdict(
                    f'rate_limit_global_{ip_address}',
                    self.GLOBAL_LIMIT, self.WINDOW, 'global_rate_limit',
                )

            def check_auth_rate_limit(self, request):
                """Check the failed-login and auth-request counters for the client IP."""
                ip_address = self.get_client_ip(request)

                failed = self._counter_verdict(
                    f'failed_login_{ip_address}',
                    self.FAILED_LOGIN_LIMIT, self.LOCKOUT_WINDOW, 'auth_rate_limit',
                )
                if not failed['allowed']:
                    return failed

                attempts = self._counter_verdict(
                    f'auth_requests_{ip_address}',
                    self.AUTH_LIMIT, self.WINDOW, 'auth_rate_limit',
                )
                if not attempts['allowed']:
                    return attempts
                return {'allowed': True}

            def check_user_rate_limit(self, request):
                """Check the per-user counter; staff and superusers get higher limits."""
                if request.user.is_superuser:
                    limit = self.SUPERUSER_LIMIT
                elif request.user.is_staff:
                    limit = self.STAFF_LIMIT
                else:
                    limit = self.USER_LIMIT
                return self._counter_verdict(
                    f'rate_limit_user_{request.user.id}',
                    limit, self.WINDOW, 'user_rate_limit',
                )

            def is_auth_request(self, request):
                """Return True if the path starts with one of the auth prefixes."""
                return request.path.startswith(self.AUTH_PATH_PREFIXES)

            def is_whitelisted_ip(self, request):
                """Return True if the client IP is listed in ``RATE_LIMIT_WHITELIST``."""
                whitelisted_ips = getattr(settings, 'RATE_LIMIT_WHITELIST', [])
                return self.get_client_ip(request) in whitelisted_ips

            def get_client_ip(self, request):
                """Return the client IP, preferring the first ``X-Forwarded-For`` entry.

                WARNING: ``X-Forwarded-For`` is supplied by the client unless a
                trusted proxy overwrites it, so a caller can forge it to dodge
                limits or match a whitelisted address. Set
                ``RATE_LIMIT_TRUST_FORWARDED_FOR = False`` when the app is not
                behind such a proxy; the default keeps the header trusted.
                """
                if getattr(settings, 'RATE_LIMIT_TRUST_FORWARDED_FOR', True):
                    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
                    if forwarded_for:
                        return forwarded_for.split(',')[0].strip()
                return request.META.get('REMOTE_ADDR')

            def update_rate_limits(self, request, response):
                """Increment every counter that applies to this request."""
                ip_address = self.get_client_ip(request)
                self._bump(f'rate_limit_global_{ip_address}', self.WINDOW)

                if self.is_auth_request(request):
                    self._bump(f'auth_requests_{ip_address}', self.WINDOW)

                # A login request answered with 400/401/403 counts as a failure.
                # NOTE(review): a login view that re-renders the form with
                # status 200 on bad credentials is not counted - confirm.
                if (request.path.startswith('/accounts/login/')
                        and response.status_code in (400, 401, 403)):
                    self._bump(f'failed_login_{ip_address}', self.LOCKOUT_WINDOW)

                if hasattr(request, 'user') and request.user.is_authenticated:
                    self._bump(f'rate_limit_user_{request.user.id}', self.WINDOW)

            def create_rate_limit_response(self, request, rate_limit_result):
                """Log the violation and return a 429 response (JSON for JSON requests)."""
                # django.http has no dedicated 429 class; set the status explicitly.
                from django.http import HttpResponse, JsonResponse

                self.log_rate_limit_violation(request, rate_limit_result)

                if request.content_type == 'application/json':
                    response = JsonResponse(
                        {
                            'error': 'Rate limit exceeded',
                            'reason': rate_limit_result['reason'],
                            'limit': rate_limit_result.get('limit'),
                            'reset_in': rate_limit_result.get('reset_time'),
                        },
                        status=429,
                    )
                else:
                    response = HttpResponse(
                        "Rate limit exceeded. Please try again later.",
                        status=429,
                    )

                if 'reset_time' in rate_limit_result:
                    response['Retry-After'] = str(rate_limit_result['reset_time'])
                self.add_rate_limit_headers(response, rate_limit_result)
                return response

            def add_rate_limit_headers(self, response, rate_limit_result):
                """Set ``X-RateLimit-*`` headers for whichever figures the verdict has."""
                if 'limit' in rate_limit_result:
                    response['X-RateLimit-Limit'] = str(rate_limit_result['limit'])
                    if 'current' in rate_limit_result:
                        remaining = max(0, rate_limit_result['limit'] - rate_limit_result['current'])
                        response['X-RateLimit-Remaining'] = str(remaining)
                if 'reset_time' in rate_limit_result:
                    reset_timestamp = int(timezone.now().timestamp()) + rate_limit_result['reset_time']
                    response['X-RateLimit-Reset'] = str(reset_timestamp)

            def log_rate_limit_violation(self, request, rate_limit_result):
                """Write a WARNING record describing the throttled request."""
                import logging

                is_known_user = hasattr(request, 'user') and request.user.is_authenticated
                log_data = {
                    'event_type': 'rate_limit_exceeded',
                    'reason': rate_limit_result['reason'],
                    'ip_address': self.get_client_ip(request),
                    'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
                    'path': request.path,
                    'method': request.method,
                    'user': request.user.username if is_known_user else 'anonymous',
                    'timestamp': timezone.now().isoformat(),
                }
                logging.getLogger('security').warning(f"Rate limit violation: {log_data}")

        return RateLimitingMiddleware

    @staticmethod
    def implement_progressive_delays():
        """Return the three login back-off helpers in a dict.

        Keys: ``calculate_delay``, ``apply_progressive_delay`` and
        ``clear_progressive_delay``.
        """

        def calculate_delay(attempt_count):
            """Return the delay in seconds: 1, 2, 4, 8, 16, 32, then 60.

            Counts below 1 give 0. The exponent is clamped so a very large
            count does not build a huge integer only to discard it.
            """
            if attempt_count < 1:
                return 0
            return min(2 ** min(attempt_count - 1, 6), 60)

        def apply_progressive_delay(request, username):
            """Sleep according to the stored attempt count, then increment it.

            The sleep blocks the worker handling the request, so many failing
            logins in parallel can tie up workers.
            """
            import logging
            import time

            from django.core.cache import cache

            cache_key = f'login_attempts_{username}'
            attempts = cache.get(cache_key, 0)

            if attempts > 0:
                delay = calculate_delay(attempts)
                logging.getLogger('security').info(
                    f"Applying {delay}s delay for user {username} (attempt {attempts})"
                )
                time.sleep(delay)

            # Increment attempt counter
            cache.set(cache_key, attempts + 1, 3600)  # 1 hour expiry

        def clear_progressive_delay(username):
            """Remove the stored attempt count for *username*."""
            from django.core.cache import cache

            cache.delete(f'login_attempts_{username}')

        return {
            'calculate_delay': calculate_delay,
            'apply_progressive_delay': apply_progressive_delay,
            'clear_progressive_delay': clear_progressive_delay,
        }
# Comprehensive security monitoring and alerting
class SecurityMonitoring:
    """Static factories for security logging, anomaly detection and alerting."""

    @staticmethod
    def implement_security_logging():
        """Return a ``logging`` dictConfig with ``security`` and ``auth_audit`` loggers.

        The dict is only built here; the caller decides where to apply it.
        """
        return {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {
                'security': {
                    'format': '{asctime} {levelname} {name} {message}',
                    'style': '{',
                },
                'json': {
                    '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
                    'format': '%(asctime)s %(name)s %(levelname)s %(message)s'
                },
            },
            'handlers': {
                'security_file': {
                    'level': 'INFO',
                    'class': 'logging.handlers.RotatingFileHandler',
                    'filename': 'logs/security.log',
                    'maxBytes': 10485760,  # 10MB
                    'backupCount': 5,
                    'formatter': 'json',
                },
                'security_syslog': {
                    'level': 'WARNING',
                    'class': 'logging.handlers.SysLogHandler',
                    'address': ('localhost', 514),
                    'formatter': 'security',
                },
                'security_email': {
                    'level': 'CRITICAL',
                    'class': 'django.utils.log.AdminEmailHandler',
                    'formatter': 'security',
                },
            },
            'loggers': {
                'security': {
                    'handlers': ['security_file', 'security_syslog', 'security_email'],
                    'level': 'INFO',
                    'propagate': False,
                },
                'auth_audit': {
                    'handlers': ['security_file'],
                    'level': 'INFO',
                    'propagate': False,
                },
            },
        }

    @staticmethod
    def implement_anomaly_detection():
        """Build and return the ``AnomalyDetector`` class."""
        from datetime import datetime, timedelta

        from django.core.cache import cache
        from django.utils import timezone

        class AnomalyDetector:
            """Cache-backed heuristics for unusual login behaviour."""

            @staticmethod
            def detect_unusual_login_patterns(user):
                """Return a list of anomaly labels for *user* (empty when none apply)."""
                anomalies = []

                recent_logins = cache.get(f'login_frequency_{user.id}', [])
                if len(recent_logins) > 10:  # More than 10 logins in last hour
                    anomalies.append('high_frequency_logins')

                # Judge "normal hours" in the active time zone; an aware now()
                # is in UTC, which would shift the window for other zones.
                now = timezone.now()
                if timezone.is_aware(now):
                    now = timezone.localtime(now)
                if now.hour < 6 or now.hour > 22:  # Outside normal hours
                    anomalies.append('unusual_time_login')

                user_ips = cache.get(f'user_ips_{user.id}', set())
                if len(user_ips) > 5:  # More than 5 different IPs
                    anomalies.append('multiple_ip_addresses')

                return anomalies

            @staticmethod
            def detect_brute_force_patterns(ip_address):
                """Return a list of attack-pattern labels for *ip_address*."""
                patterns = []
                if cache.get(f'failed_login_{ip_address}', 0) > 3:
                    patterns.append('multiple_failed_logins')
                if cache.get(f'username_enum_{ip_address}', 0) > 10:
                    patterns.append('username_enumeration')
                return patterns

            @staticmethod
            def update_user_behavior_baseline(user, request):
                """Record this login's time and source IP in the cache."""
                # Track login frequency, keeping only the last hour.
                login_key = f'login_frequency_{user.id}'
                one_hour_ago = timezone.now() - timedelta(hours=1)
                recent_logins = cache.get(login_key, [])
                recent_logins.append(timezone.now().isoformat())
                recent_logins = [
                    stamp for stamp in recent_logins
                    if datetime.fromisoformat(stamp.replace('Z', '+00:00')) > one_hour_ago
                ]
                cache.set(login_key, recent_logins, 3600)

                # Track IP addresses
                ip_key = f'user_ips_{user.id}'
                user_ips = cache.get(ip_key, set())
                remote_addr = request.META.get('REMOTE_ADDR')
                if remote_addr:
                    # A missing address must not count as a distinct IP.
                    user_ips.add(remote_addr)
                cache.set(ip_key, user_ips, 86400)  # 24 hours

        return AnomalyDetector

    @staticmethod
    def implement_security_alerting():
        """Build and return the ``SecurityAlerting`` class."""
        from django.conf import settings
        from django.utils import timezone

        class SecurityAlerting:
            """Log, email and forward security alerts."""

            @staticmethod
            def send_security_alert(alert_type, details, severity='medium'):
                """Log the alert, email it when high/critical, then forward it.

                An unknown *severity* is logged at WARNING level.
                """
                import logging

                logger = logging.getLogger('security')
                log_method = {
                    'low': logger.info,
                    'medium': logger.warning,
                    'high': logger.error,
                    'critical': logger.critical,
                }.get(severity, logger.warning)
                log_method(f"Security Alert [{alert_type}]: {details}")

                if severity in ('high', 'critical'):
                    try:
                        SecurityAlerting.send_alert_email(alert_type, details, severity)
                    except Exception:
                        # A mail failure must not stop the monitoring hand-off.
                        logger.exception("Failed to send security alert email")

                SecurityAlerting.send_to_monitoring(alert_type, details, severity)

            @staticmethod
            def send_alert_email(alert_type, details, severity):
                """Email the alert to ``SECURITY_ALERT_EMAILS``; no-op when unset.

                Mail errors propagate to the caller (``fail_silently=False``).
                """
                from django.core.mail import send_mail

                security_emails = getattr(settings, 'SECURITY_ALERT_EMAILS', [])
                if not security_emails:
                    return

                subject = f'[SECURITY ALERT - {severity.upper()}] {alert_type}'
                message = (
                    "\n"
                    "Security Alert Detected\n"
                    f"Type: {alert_type}\n"
                    f"Severity: {severity.upper()}\n"
                    f"Time: {timezone.now()}\n"
                    "Details:\n"
                    f"{details}\n"
                    "Please investigate immediately.\n"
                )
                send_mail(
                    subject,
                    message,
                    settings.DEFAULT_FROM_EMAIL,
                    security_emails,
                    fail_silently=False,
                )

            @staticmethod
            def send_to_monitoring(alert_type, details, severity):
                """POST the alert to ``MONITORING_WEBHOOK_URL``; failures are only logged."""
                try:
                    import requests

                    monitoring_url = getattr(settings, 'MONITORING_WEBHOOK_URL', None)
                    if monitoring_url:
                        requests.post(
                            monitoring_url,
                            json={
                                'alert_type': alert_type,
                                'severity': severity,
                                'details': details,
                                'timestamp': timezone.now().isoformat(),
                                'source': 'django_app',
                            },
                            timeout=5,
                        )
                except Exception as exc:
                    # Don't let monitoring failures affect the application
                    import logging

                    logging.getLogger('security').error(
                        f"Failed to send alert to monitoring: {exc}"
                    )

            @staticmethod
            def create_security_incident(alert_type, details, severity):
                """Create an open ``SecurityIncident`` row.

                Returns:
                    The new incident, or ``None`` when
                    ``myapp.models.SecurityIncident`` cannot be imported.
                """
                try:
                    from myapp.models import SecurityIncident
                except ImportError:
                    # Model doesn't exist
                    return None
                # created_at is filled by the model's auto_now_add field.
                return SecurityIncident.objects.create(
                    alert_type=alert_type,
                    severity=severity,
                    details=details,
                    status='open',
                )

        return SecurityAlerting
# Security incident model
from django.conf import settings
from django.db import models
from django.utils import timezone


class SecurityIncident(models.Model):
    """A recorded security incident and its investigation state."""

    SEVERITY_CHOICES = [
        ('low', 'Low'),
        ('medium', 'Medium'),
        ('high', 'High'),
        ('critical', 'Critical'),
    ]
    STATUS_CHOICES = [
        ('open', 'Open'),
        ('investigating', 'Investigating'),
        ('resolved', 'Resolved'),
        ('false_positive', 'False Positive'),
    ]

    alert_type = models.CharField(max_length=100)
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='open')
    details = models.TextField()

    # Investigation fields
    # AUTH_USER_MODEL keeps the relation valid when the project swaps in a
    # custom user model.
    assigned_to = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='assigned_incidents',
    )
    investigation_notes = models.TextField(blank=True)
    resolution = models.TextField(blank=True)

    # Timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    resolved_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['severity', 'status']),
            models.Index(fields=['alert_type', '-created_at']),
        ]

    def __str__(self):
        return f"{self.alert_type} - {self.severity} - {self.status}"

    def resolve(self, resolution_notes, resolved_by=None):
        """Mark the incident resolved and save it.

        Args:
            resolution_notes: text stored in ``resolution``.
            resolved_by: optional user; when given, replaces ``assigned_to``.
        """
        self.status = 'resolved'
        self.resolution = resolution_notes
        self.resolved_at = timezone.now()
        if resolved_by:
            self.assigned_to = resolved_by
        self.save()
Implementing comprehensive security best practices in Django authentication and authorization systems requires attention to multiple layers of security. By following these practices - from secure password policies and session management to rate limiting and security monitoring - you can build robust authentication systems that protect against common attacks and maintain user trust.
Integrating Social Authentication
Social authentication allows users to log in using their existing accounts from popular platforms like Google, Facebook, Twitter, and GitHub. Implementing social authentication improves user experience by reducing registration friction while maintaining security. Understanding how to integrate and manage social authentication is essential for modern web applications.
Sessions, Cookies, and State Management
Managing user state across HTTP requests is fundamental to building interactive web applications. Django provides robust session management, cookie handling, and state persistence mechanisms that enable you to create seamless user experiences while maintaining security and performance.