Authentication and Authorization

Security Best Practices

Implementing robust security measures in Django authentication and authorization systems is crucial for protecting user data and maintaining application integrity. Understanding and applying security best practices helps prevent common vulnerabilities and ensures your authentication system remains secure against evolving threats.

Security Best Practices

Implementing robust security measures in Django authentication and authorization systems is crucial for protecting user data and maintaining application integrity. Understanding and applying security best practices helps prevent common vulnerabilities and ensures your authentication system remains secure against evolving threats.

Authentication Security

Password Security Best Practices

# Comprehensive password security implementation
class PasswordSecurityManager:
    """Comprehensive password security management"""
    
    @staticmethod
    def configure_password_security():
        """Configure Django settings for maximum password security"""
        
        security_settings = {
            # Strong password hashing
            'PASSWORD_HASHERS': [
                'django.contrib.auth.hashers.Argon2PasswordHasher',      # Most secure
                'django.contrib.auth.hashers.PBKDF2PasswordHasher',      # Fallback
                'django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher',  # Legacy support
                'django.contrib.auth.hashers.BCryptSHA256PasswordHasher', # Alternative
            ],
            
            # Password validation
            'AUTH_PASSWORD_VALIDATORS': [
                {
                    'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
                    'OPTIONS': {
                        'user_attributes': ('username', 'first_name', 'last_name', 'email'),
                        'max_similarity': 0.7,
                    }
                },
                {
                    'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
                    'OPTIONS': {
                        'min_length': 12,  # Increased from default 8
                    }
                },
                {
                    'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
                },
                {
                    'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
                },
                {
                    'NAME': 'myapp.validators.CustomPasswordValidator',
                },
            ],
            
            # Argon2 configuration for maximum security
            'ARGON2_TIME_COST': 3,      # Increased iterations
            'ARGON2_MEMORY_COST': 1024, # Increased memory usage (1MB)
            'ARGON2_PARALLELISM': 2,    # Parallel threads
        }
        
        return security_settings
    
    @staticmethod
    def implement_password_policies():
        """Implement comprehensive password policies"""
        
        from django.contrib.auth.password_validation import ValidationError
        
        class CustomPasswordValidator:
            """Custom password validator with advanced rules"""
            
            def __init__(self, min_uppercase=1, min_lowercase=1, min_digits=1, 
                         min_special=1, max_consecutive=2, prevent_reuse=5):
                self.min_uppercase = min_uppercase
                self.min_lowercase = min_lowercase
                self.min_digits = min_digits
                self.min_special = min_special
                self.max_consecutive = max_consecutive
                self.prevent_reuse = prevent_reuse
            
            def validate(self, password, user=None):
                """Validate password against custom rules"""
                
                errors = []
                
                # Character type requirements
                if len(re.findall(r'[A-Z]', password)) < self.min_uppercase:
                    errors.append(f"Password must contain at least {self.min_uppercase} uppercase letter(s).")
                
                if len(re.findall(r'[a-z]', password)) < self.min_lowercase:
                    errors.append(f"Password must contain at least {self.min_lowercase} lowercase letter(s).")
                
                if len(re.findall(r'\d', password)) < self.min_digits:
                    errors.append(f"Password must contain at least {self.min_digits} digit(s).")
                
                special_chars = re.findall(r'[!@#$%^&*(),.?":{}|<>]', password)
                if len(special_chars) < self.min_special:
                    errors.append(f"Password must contain at least {self.min_special} special character(s).")
                
                # Consecutive character check
                if self.has_consecutive_chars(password, self.max_consecutive):
                    errors.append(f"Password cannot contain more than {self.max_consecutive} consecutive identical characters.")
                
                # Dictionary word check
                if self.contains_dictionary_words(password):
                    errors.append("Password cannot contain common dictionary words.")
                
                # Password reuse check
                if user and self.is_password_reused(user, password):
                    errors.append(f"Password cannot be one of your last {self.prevent_reuse} passwords.")
                
                if errors:
                    raise ValidationError(errors)
            
            def has_consecutive_chars(self, password, max_consecutive):
                """Check for consecutive identical characters"""
                
                count = 1
                for i in range(1, len(password)):
                    if password[i] == password[i-1]:
                        count += 1
                        if count > max_consecutive:
                            return True
                    else:
                        count = 1
                
                return False
            
            def contains_dictionary_words(self, password):
                """Check for common dictionary words"""
                
                # Simple implementation - in production, use a comprehensive word list
                common_words = [
                    'password', 'admin', 'user', 'login', 'welcome',
                    'qwerty', 'abc123', 'letmein', 'monkey', 'dragon'
                ]
                
                password_lower = password.lower()
                
                for word in common_words:
                    if word in password_lower:
                        return True
                
                return False
            
            def is_password_reused(self, user, password):
                """Check if password was recently used"""
                
                try:
                    from myapp.models import PasswordHistory
                    
                    recent_passwords = PasswordHistory.objects.filter(
                        user=user
                    ).order_by('-created_at')[:self.prevent_reuse]
                    
                    for old_password in recent_passwords:
                        if user.check_password(password):
                            return True
                    
                    return False
                
                except ImportError:
                    return False
            
            def get_help_text(self):
                """Return help text for password requirements"""
                
                return (
                    f"Your password must contain at least {self.min_uppercase} uppercase letter, "
                    f"{self.min_lowercase} lowercase letter, {self.min_digits} digit, "
                    f"and {self.min_special} special character. "
                    f"It cannot contain more than {self.max_consecutive} consecutive identical characters "
                    f"or common dictionary words."
                )
        
        return CustomPasswordValidator
    
    @staticmethod
    def implement_password_history():
        """Define the ``PasswordHistory`` model and the signal that fills it.

        Returns:
            type: The ``PasswordHistory`` model class.

        Note:
            NOTE(review): relies on ``models`` and ``User`` being available in
            the enclosing module - confirm they are imported there.
        """

        class PasswordHistory(models.Model):
            """Track user password history"""

            user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='password_history')
            password_hash = models.CharField(max_length=128)
            created_at = models.DateTimeField(auto_now_add=True)

            class Meta:
                ordering = ['-created_at']
                indexes = [
                    models.Index(fields=['user', '-created_at']),
                ]

            def __str__(self):
                return f"{self.user.username} - {self.created_at}"

        # Signal to save password history
        from django.db.models.signals import pre_save
        from django.dispatch import receiver

        # weak=False: the handler is a local function, and signals keep only
        # weak references by default, so it would be garbage-collected (and
        # silently disconnected) as soon as this factory returns.
        # dispatch_uid: prevents the handler from being registered twice if
        # this factory is called more than once.
        @receiver(
            pre_save,
            sender=User,
            weak=False,
            dispatch_uid='password_history.save_password_history',
        )
        def save_password_history(sender, instance, **kwargs):
            """Archive the previous password hash whenever a user's password changes."""

            if not instance.pk:
                # Brand-new user: there is no previous password to archive.
                return

            try:
                old_user = User.objects.get(pk=instance.pk)
            except User.DoesNotExist:
                return

            # Check if password changed
            if old_user.password == instance.password:
                return

            # Save old password to history
            PasswordHistory.objects.create(
                user=old_user,
                password_hash=old_user.password
            )

            # Keep only last 10 passwords
            stale_ids = list(
                PasswordHistory.objects.filter(user=old_user)
                .order_by('-created_at')
                .values_list('id', flat=True)[10:]
            )

            if stale_ids:
                PasswordHistory.objects.filter(id__in=stale_ids).delete()

        return PasswordHistory
    
    @staticmethod
    def implement_password_expiry():
        """Build the middleware class that forces a change of expired passwords.

        Returns:
            type: ``PasswordExpiryMiddleware``, a new-style Django middleware
            that redirects authenticated users with an expired password to the
            password-change page.
        """

        from datetime import timedelta

        from django.utils import timezone

        class PasswordExpiryMiddleware:
            """Middleware to enforce password expiry"""

            # Passwords older than this many days are considered expired.
            max_password_age_days = 90

            # URL name the user is redirected to when the password has expired.
            password_change_url_name = 'password_change'

            # Path prefixes that stay reachable with an expired password.
            exempt_path_prefixes = (
                '/accounts/password/change/',
                '/accounts/password/set/',
                '/accounts/logout/',
            )

            def __init__(self, get_response):
                self.get_response = get_response

            def __call__(self, request):
                # Check password expiry for authenticated users
                if (hasattr(request, 'user') and
                    request.user.is_authenticated and
                    self.is_password_expired(request.user)):

                    # Skip check for password change pages
                    if not self.is_password_change_page(request):
                        return self.redirect_to_password_change(request)

                response = self.get_response(request)
                return response

            def is_password_expired(self, user):
                """Return True if the user's password is older than the allowed age.

                Uses ``user.profile.password_changed_at`` when it is set and
                falls back to ``user.date_joined`` otherwise.
                """

                profile = getattr(user, 'profile', None)
                changed_at = getattr(profile, 'password_changed_at', None)

                # A profile without a recorded change date is treated like a
                # missing profile instead of crashing on ``now - None``.
                if changed_at is not None:
                    password_age = timezone.now() - changed_at
                    return password_age.days > self.max_password_age_days

                # If no password change date, consider it expired for existing users
                cutoff = timezone.now() - timedelta(days=self.max_password_age_days)
                return user.date_joined < cutoff

            def is_password_change_page(self, request):
                """Return True if the request targets a page exempt from the expiry check."""

                from django.urls import NoReverseMatch, reverse

                exempt = list(self.exempt_path_prefixes)

                # Always exempt the page we redirect to; otherwise a project
                # whose password-change URL differs from the hard-coded
                # prefixes would be caught in an endless redirect loop.
                try:
                    exempt.append(reverse(self.password_change_url_name))
                except NoReverseMatch:
                    # No such named URL: only the static prefixes apply.
                    pass

                return request.path.startswith(tuple(exempt))

            def redirect_to_password_change(self, request):
                """Queue a warning message and redirect to the password change page."""

                from django.contrib import messages
                from django.shortcuts import redirect

                messages.warning(
                    request,
                    'Your password has expired. Please change your password to continue.'
                )

                return redirect(self.password_change_url_name)

        return PasswordExpiryMiddleware

# Breach detection and monitoring
class PasswordBreachDetection:
    """Detect and handle compromised passwords"""
    
    @staticmethod
    def check_password_breach(password):
        """Check a password against the HaveIBeenPwned "range" API.

        Only the first five characters of the password's SHA-1 digest are
        sent to the service (k-anonymity); the password itself never leaves
        the process.

        Args:
            password: The plain-text password to check.

        Returns:
            tuple: ``(is_breached, count)`` where ``is_breached`` is ``True``
            when the password was found (``count`` is the number reported by
            the service), ``False`` when it was not found, and ``None`` when
            the check could not be completed (network error or a non-200
            reply).  ``count`` is ``0`` in the last two cases.
        """

        import hashlib
        import requests

        # Hash password with SHA-1 (the digest format the API is keyed on)
        sha1_hash = hashlib.sha1(password.encode('utf-8')).hexdigest().upper()

        # Use k-anonymity: send only first 5 characters
        prefix, suffix = sha1_hash[:5], sha1_hash[5:]

        try:
            # Query HaveIBeenPwned API
            response = requests.get(
                f"https://api.pwnedpasswords.com/range/{prefix}",
                timeout=5,
                headers={'User-Agent': 'Django-App-Password-Check'}
            )
        except requests.RequestException:
            # If API is unavailable, don't block password change
            return None, 0

        if response.status_code != 200:
            # Rate limiting or an outage tells us nothing about the password,
            # so report "unknown" rather than "not breached".
            return None, 0

        # Each line is "<hash suffix>:<count>"; look for our suffix.
        for line in response.text.splitlines():
            hash_suffix, _, count = line.partition(':')
            if hash_suffix.strip() == suffix and count.strip().isdigit():
                return True, int(count)

        return False, 0
    
    @staticmethod
    def create_breach_validator():
        """Build a password validator class that rejects breached passwords.

        Returns:
            type: ``BreachPasswordValidator``, exposing the ``validate`` /
            ``get_help_text`` pair used by ``AUTH_PASSWORD_VALIDATORS``.
        """

        import logging

        from django.core.exceptions import ValidationError

        logger = logging.getLogger('security')

        class BreachPasswordValidator:
            """Validator to check passwords against known breaches"""

            def validate(self, password, user=None):
                """Reject passwords the breach service reports more than 100 times.

                Passwords reported more than 10 (but at most 100) times are
                accepted and logged as a warning.  Nothing happens when the
                password is not found or the check could not be completed.

                Raises:
                    ValidationError: If the reported count exceeds 100.
                """

                is_breached, count = PasswordBreachDetection.check_password_breach(password)

                if not is_breached:
                    # Covers both "not found" (False) and "unknown" (None).
                    return

                if count > 100:
                    raise ValidationError(
                        f"This password has been found in {count:,} data breaches. "
                        "Please choose a different password for your security.",
                        code='password_breached',
                    )

                if count > 10:
                    # Warning for passwords with moderate breach counts
                    logger.warning(
                        "User attempting to use password found in %s breaches",
                        count,
                    )

            def get_help_text(self):
                """Return the help text shown next to the password field."""
                return "Password will be checked against known data breaches."

        return BreachPasswordValidator

Session Security

# Comprehensive session security implementation
class SessionSecurityManager:
    """Manage session security comprehensively"""
    
    @staticmethod
    def configure_secure_sessions():
        """Configure Django settings for secure sessions"""
        
        session_settings = {
            # Session security
            'SESSION_COOKIE_SECURE': True,  # HTTPS only
            'SESSION_COOKIE_HTTPONLY': True,  # No JavaScript access
            'SESSION_COOKIE_SAMESITE': 'Strict',  # CSRF protection
            'SESSION_COOKIE_AGE': 3600,  # 1 hour (adjust as needed)
            'SESSION_EXPIRE_AT_BROWSER_CLOSE': True,
            
            # CSRF protection
            'CSRF_COOKIE_SECURE': True,
            'CSRF_COOKIE_HTTPONLY': True,
            'CSRF_COOKIE_SAMESITE': 'Strict',
            'CSRF_FAILURE_VIEW': 'myapp.views.csrf_failure',
            
            # Use database sessions for better security
            'SESSION_ENGINE': 'django.contrib.sessions.backends.db',
            
            # Additional security headers
            'SECURE_BROWSER_XSS_FILTER': True,
            'SECURE_CONTENT_TYPE_NOSNIFF': True,
            'SECURE_HSTS_SECONDS': 31536000,  # 1 year
            'SECURE_HSTS_INCLUDE_SUBDOMAINS': True,
            'SECURE_HSTS_PRELOAD': True,
            'X_FRAME_OPTIONS': 'DENY',
        }
        
        return session_settings
    
    @staticmethod
    def implement_session_monitoring():
        """Implement comprehensive session monitoring"""
        
        class SessionMonitoringMiddleware:
            """Monitor and secure user sessions"""
            
            def __init__(self, get_response):
                self.get_response = get_response
            
            def __call__(self, request):
                # Monitor session before processing
                self.monitor_session_security(request)
                
                response = self.get_response(request)
                
                # Update session after processing
                self.update_session_tracking(request, response)
                
                return response
            
            def monitor_session_security(self, request):
                """Monitor session for security issues"""
                
                if not hasattr(request, 'user') or not request.user.is_authenticated:
                    return
                
                # Check session integrity
                if not self.validate_session_integrity(request):
                    self.terminate_suspicious_session(request)
                    return
                
                # Check for session hijacking
                if self.detect_session_hijacking(request):
                    self.handle_session_hijacking(request)
                    return
                
                # Check session age
                if self.is_session_too_old(request):
                    self.handle_expired_session(request)
                    return
            
            def validate_session_integrity(self, request):
                """Validate session integrity"""
                
                session_data = request.session.get('security_data', {})
                
                # Check IP consistency
                stored_ip = session_data.get('ip_address')
                current_ip = request.META.get('REMOTE_ADDR')
                
                if stored_ip and stored_ip != current_ip:
                    return False
                
                # Check user agent consistency
                stored_ua = session_data.get('user_agent_hash')
                current_ua = request.META.get('HTTP_USER_AGENT', '')
                current_ua_hash = hashlib.md5(current_ua.encode()).hexdigest()
                
                if stored_ua and stored_ua != current_ua_hash:
                    return False
                
                return True
            
            def detect_session_hijacking(self, request):
                """Detect potential session hijacking"""
                
                session_data = request.session.get('security_data', {})
                
                # Check for rapid location changes
                last_activity = session_data.get('last_activity')
                if last_activity:
                    last_time = datetime.fromisoformat(last_activity)
                    time_diff = timezone.now() - last_time
                    
                    # If less than 1 minute between requests from different IPs
                    if (time_diff.total_seconds() < 60 and 
                        session_data.get('ip_address') != request.META.get('REMOTE_ADDR')):
                        return True
                
                # Check for suspicious user agent changes
                stored_ua = session_data.get('user_agent')
                current_ua = request.META.get('HTTP_USER_AGENT', '')
                
                if (stored_ua and current_ua and 
                    self.calculate_ua_similarity(stored_ua, current_ua) < 0.5):
                    return True
                
                return False
            
            def calculate_ua_similarity(self, ua1, ua2):
                """Calculate similarity between user agents"""
                
                # Simple similarity calculation
                ua1_words = set(ua1.lower().split())
                ua2_words = set(ua2.lower().split())
                
                intersection = ua1_words.intersection(ua2_words)
                union = ua1_words.union(ua2_words)
                
                return len(intersection) / len(union) if union else 0
            
            def is_session_too_old(self, request):
                """Check if session is too old"""
                
                session_data = request.session.get('security_data', {})
                created_at = session_data.get('created_at')
                
                if created_at:
                    created_time = datetime.fromisoformat(created_at)
                    session_age = timezone.now() - created_time
                    
                    # Maximum session age: 24 hours
                    return session_age.total_seconds() > 86400
                
                return False
            
            def terminate_suspicious_session(self, request):
                """Terminate suspicious session"""
                
                from django.contrib.auth import logout
                from django.contrib import messages
                
                # Log security event
                self.log_security_event(request, 'session_integrity_violation')
                
                # Logout user
                logout(request)
                
                messages.warning(
                    request,
                    'Your session was terminated due to security concerns. Please log in again.'
                )
            
            def handle_session_hijacking(self, request):
                """Handle detected session hijacking"""
                
                from django.contrib.auth import logout
                
                # Log security incident
                self.log_security_event(request, 'session_hijacking_detected')
                
                # Terminate all user sessions
                self.terminate_all_user_sessions(request.user)
                
                # Logout current session
                logout(request)
                
                # Send security alert email
                self.send_security_alert(request.user, 'session_hijacking')
            
            def handle_expired_session(self, request):
                """Handle expired session"""
                
                from django.contrib.auth import logout
                from django.contrib import messages
                
                logout(request)
                
                messages.info(
                    request,
                    'Your session has expired for security reasons. Please log in again.'
                )
            
            def update_session_tracking(self, request, response):
                """Update session tracking data"""
                
                if hasattr(request, 'user') and request.user.is_authenticated:
                    security_data = request.session.get('security_data', {})
                    
                    security_data.update({
                        'ip_address': request.META.get('REMOTE_ADDR'),
                        'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                        'user_agent_hash': hashlib.md5(
                            request.META.get('HTTP_USER_AGENT', '').encode()
                        ).hexdigest(),
                        'last_activity': timezone.now().isoformat(),
                    })
                    
                    # Set created_at if not exists
                    if 'created_at' not in security_data:
                        security_data['created_at'] = timezone.now().isoformat()
                    
                    request.session['security_data'] = security_data
            
            def terminate_all_user_sessions(self, user):
                """Terminate all sessions for a user"""
                
                from django.contrib.sessions.models import Session
                
                terminated_count = 0
                
                for session in Session.objects.filter(expire_date__gt=timezone.now()):
                    try:
                        session_data = session.get_decoded()
                        session_user_id = session_data.get('_auth_user_id')
                        
                        if session_user_id and int(session_user_id) == user.id:
                            session.delete()
                            terminated_count += 1
                    except:
                        continue
                
                return terminated_count
            
            def log_security_event(self, request, event_type):
                """Log security events"""
                
                import logging
                
                logger = logging.getLogger('security')
                
                log_data = {
                    'event_type': event_type,
                    'user': request.user.username if hasattr(request, 'user') and request.user.is_authenticated else 'anonymous',
                    'ip_address': request.META.get('REMOTE_ADDR'),
                    'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
                    'session_key': request.session.session_key,
                    'timestamp': timezone.now().isoformat(),
                }
                
                logger.critical(f"Security event: {log_data}")
            
            def send_security_alert(self, user, alert_type):
                """Send security alert to user"""
                
                from django.core.mail import send_mail
                
                subject = 'Security Alert - Suspicious Activity Detected'
                message = f"""
                Hello {user.get_full_name() or user.username},
                
                We detected suspicious activity on your account and have taken security measures to protect it.
                
                Alert Type: {alert_type}
                Time: {timezone.now()}
                
                If this was you, you can safely ignore this message. If not, please:
                1. Change your password immediately
                2. Review your account activity
                3. Contact support if you need assistance
                
                Best regards,
                Security Team
                """
                
                send_mail(
                    subject,
                    message,
                    settings.DEFAULT_FROM_EMAIL,
                    [user.email],
                    fail_silently=True,
                )
        
        return SessionMonitoringMiddleware

Rate Limiting and Brute Force Protection

# Comprehensive rate limiting and brute force protection
class BruteForceProtection:
    """Factories for brute-force countermeasures.

    Each static method builds and returns its tooling (a middleware class or
    a dict of helper callables) instead of acting directly.
    """

    @staticmethod
    def implement_rate_limiting():
        """Build the cache-backed rate limiting middleware.

        Returns:
            The ``RateLimitingMiddleware`` class (not an instance).
        """

        class RateLimitingMiddleware:
            """Rate limit requests per client IP, per auth endpoint and per user.

            Counters are stored in Django's cache. Each counter covers a
            fixed window that starts with the first counted request and ends
            when the cache entry expires.
            """

            def __init__(self, get_response):
                self.get_response = get_response

            def __call__(self, request):
                """Reject over-limit requests; otherwise process and count them."""
                rate_limit_result = self.check_rate_limits(request)

                if not rate_limit_result['allowed']:
                    return self.create_rate_limit_response(request, rate_limit_result)

                response = self.get_response(request)

                # Only requests that were let through are counted.
                self.update_rate_limits(request, response)

                self.add_rate_limit_headers(response, rate_limit_result)

                return response

            def check_rate_limits(self, request):
                """Run every applicable limit check for ``request``.

                Returns:
                    A dict that always contains ``'allowed'``. Denials also
                    carry ``'reason'``, ``'limit'``, ``'current'`` and
                    ``'reset_time'``. The final "allowed" result carries no
                    limit details, so allowed responses receive no
                    ``X-RateLimit-*`` headers.
                """

                # Whitelisted addresses bypass every check.
                if self.is_whitelisted_ip(request):
                    return {'allowed': True, 'reason': 'whitelisted'}

                global_limit = self.check_global_rate_limit(request)
                if not global_limit['allowed']:
                    return global_limit

                if self.is_auth_request(request):
                    auth_limit = self.check_auth_rate_limit(request)
                    if not auth_limit['allowed']:
                        return auth_limit

                if hasattr(request, 'user') and request.user.is_authenticated:
                    user_limit = self.check_user_rate_limit(request)
                    if not user_limit['allowed']:
                        return user_limit

                return {'allowed': True}

            def check_global_rate_limit(self, request):
                """Check the per-IP limit that applies to every request."""

                from django.core.cache import cache

                ip_address = self.get_client_ip(request)
                cache_key = f'rate_limit_global_{ip_address}'

                current_requests = cache.get(cache_key, 0)
                limit = 1000  # 1000 requests per hour

                return {
                    'allowed': current_requests < limit,
                    **({'reason': 'global_rate_limit'} if current_requests >= limit else {}),
                    'limit': limit,
                    'current': current_requests,
                    'reset_time': 3600,
                }

            def check_auth_rate_limit(self, request):
                """Check the per-IP limits for authentication endpoints.

                Denies after 5 recorded failed logins (15 minute window) or
                20 authentication requests (one hour window).
                """

                from django.core.cache import cache

                ip_address = self.get_client_ip(request)

                failed_attempts = cache.get(f'failed_login_{ip_address}', 0)
                if failed_attempts >= 5:
                    return {
                        'allowed': False,
                        'reason': 'auth_rate_limit',
                        'limit': 5,
                        'current': failed_attempts,
                        'reset_time': 900,  # 15 minutes
                    }

                auth_requests = cache.get(f'auth_requests_{ip_address}', 0)
                if auth_requests >= 20:
                    return {
                        'allowed': False,
                        'reason': 'auth_rate_limit',
                        'limit': 20,
                        'current': auth_requests,
                        'reset_time': 3600,
                    }

                return {'allowed': True}

            def check_user_rate_limit(self, request):
                """Check the per-account limit for an authenticated user.

                The hourly allowance depends on the account: 10000 for
                superusers, 5000 for staff, 2000 for everyone else.
                """

                from django.core.cache import cache

                cache_key = f'rate_limit_user_{request.user.id}'
                current_requests = cache.get(cache_key, 0)

                if request.user.is_superuser:
                    limit = 10000
                elif request.user.is_staff:
                    limit = 5000
                else:
                    limit = 2000

                if current_requests >= limit:
                    return {
                        'allowed': False,
                        'reason': 'user_rate_limit',
                        'limit': limit,
                        'current': current_requests,
                        'reset_time': 3600,
                    }

                return {
                    'allowed': True,
                    'limit': limit,
                    'current': current_requests,
                    'reset_time': 3600,
                }

            def is_auth_request(self, request):
                """Return True when the request path is an authentication endpoint."""

                auth_paths = (
                    '/accounts/login/',
                    '/accounts/signup/',
                    '/accounts/password/reset/',
                    '/api/auth/',
                )

                # str.startswith accepts a tuple of prefixes.
                return request.path.startswith(auth_paths)

            def is_whitelisted_ip(self, request):
                """Return True when the client IP is in ``RATE_LIMIT_WHITELIST``."""

                whitelisted_ips = getattr(settings, 'RATE_LIMIT_WHITELIST', [])
                return self.get_client_ip(request) in whitelisted_ips

            def get_client_ip(self, request):
                """Return the client IP used to key per-address limits.

                The first entry of ``X-Forwarded-For`` is preferred; when the
                header is absent or its first entry is blank, ``REMOTE_ADDR``
                is used instead.

                NOTE(review): ``X-Forwarded-For`` is supplied by the client
                unless a trusted proxy overwrites it. Without such a proxy a
                caller can pick an arbitrary value, which affects both the
                per-IP counters and the whitelist check. Confirm the
                deployment strips or rewrites this header.
                """

                forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
                if forwarded_for:
                    first_hop = forwarded_for.split(',')[0].strip()
                    if first_hop:
                        return first_hop

                return request.META.get('REMOTE_ADDR')

            def _increment_counter(self, key, timeout):
                """Atomically bump ``key``, opening a new window if it is absent."""

                from django.core.cache import cache

                # add() writes only when the key is missing, so the timeout
                # is set once per window instead of being refreshed by every
                # request.
                cache.add(key, 0, timeout)
                try:
                    cache.incr(key)
                except ValueError:
                    # The key expired between add() and incr(); start over.
                    cache.set(key, 1, timeout)

            def update_rate_limits(self, request, response):
                """Record a processed request against every applicable counter."""

                ip_address = self.get_client_ip(request)

                self._increment_counter(f'rate_limit_global_{ip_address}', 3600)

                if self.is_auth_request(request):
                    self._increment_counter(f'auth_requests_{ip_address}', 3600)

                    # NOTE(review): a failed login is recognised only by a
                    # 400/401/403 status. A login view that re-renders its
                    # form with a 200 on bad credentials is not counted here;
                    # confirm what the project's login view returns.
                    if (request.path.startswith('/accounts/login/') and
                            response.status_code in [400, 401, 403]):
                        self._increment_counter(f'failed_login_{ip_address}', 900)

                if hasattr(request, 'user') and request.user.is_authenticated:
                    self._increment_counter(f'rate_limit_user_{request.user.id}', 3600)

            def create_rate_limit_response(self, request, rate_limit_result):
                """Log the violation and build the HTTP 429 response.

                A JSON body is returned when the request's content type is
                ``application/json``; otherwise a plain text message.
                """

                # Django ships no dedicated 429 response class, so the status
                # is set explicitly on a plain HttpResponse.
                from django.http import HttpResponse
                import json

                self.log_rate_limit_violation(request, rate_limit_result)

                if request.content_type == 'application/json':
                    response_data = {
                        'error': 'Rate limit exceeded',
                        'reason': rate_limit_result['reason'],
                        'limit': rate_limit_result.get('limit'),
                        'reset_in': rate_limit_result.get('reset_time'),
                    }
                    response = HttpResponse(
                        json.dumps(response_data),
                        content_type='application/json',
                        status=429,
                    )
                else:
                    response = HttpResponse(
                        "Rate limit exceeded. Please try again later.",
                        status=429,
                    )

                if 'reset_time' in rate_limit_result:
                    # The full window length is an upper bound on the wait.
                    response['Retry-After'] = str(rate_limit_result['reset_time'])

                self.add_rate_limit_headers(response, rate_limit_result)

                return response

            def add_rate_limit_headers(self, response, rate_limit_result):
                """Copy limit details from ``rate_limit_result`` onto ``response``.

                Headers are set by item assignment. Keys missing from the
                result are skipped, and the remaining count is emitted only
                when both ``'limit'`` and ``'current'`` are present.
                """

                import time

                limit = rate_limit_result.get('limit')
                if limit is not None:
                    response['X-RateLimit-Limit'] = str(limit)

                    if 'current' in rate_limit_result:
                        remaining = max(0, limit - rate_limit_result['current'])
                        response['X-RateLimit-Remaining'] = str(remaining)

                if 'reset_time' in rate_limit_result:
                    reset_timestamp = int(time.time()) + rate_limit_result['reset_time']
                    response['X-RateLimit-Reset'] = str(reset_timestamp)

            def log_rate_limit_violation(self, request, rate_limit_result):
                """Write a warning describing the rejected request to the ``security`` logger."""

                import logging

                logger = logging.getLogger('security')

                log_data = {
                    'event_type': 'rate_limit_exceeded',
                    'reason': rate_limit_result['reason'],
                    'ip_address': self.get_client_ip(request),
                    # Truncated so an oversized header cannot bloat the log.
                    'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
                    'path': request.path,
                    'method': request.method,
                    'user': request.user.username if hasattr(request, 'user') and request.user.is_authenticated else 'anonymous',
                    'timestamp': timezone.now().isoformat(),
                }

                logger.warning(f"Rate limit violation: {log_data}")

        return RateLimitingMiddleware

    @staticmethod
    def implement_progressive_delays():
        """Build helpers that slow down repeated failed logins.

        Returns:
            A dict with the callables ``calculate_delay``,
            ``apply_progressive_delay`` and ``clear_progressive_delay``.
        """

        def calculate_delay(attempt_count):
            """Return the delay in seconds for the given attempt number.

            The delay doubles per attempt (1s, 2s, 4s, 8s, 16s, 32s) and is
            capped at 60s. Non-positive counts yield no delay.
            """

            if attempt_count <= 0:
                return 0

            # Clamp the exponent first: 2 ** 6 already exceeds the 60s cap,
            # so larger powers never need to be computed.
            return min(2 ** min(attempt_count - 1, 6), 60)

        def _attempts_key(username):
            """Return a cache key for ``username`` that is safe for any backend."""

            import hashlib

            # The username is whatever was submitted to the login form, so it
            # may contain whitespace or be arbitrarily long. Hashing yields a
            # fixed-length key with a restricted alphabet.
            digest = hashlib.sha256(str(username).encode('utf-8')).hexdigest()
            return f'login_attempts_{digest}'

        def apply_progressive_delay(request, username):
            """Sleep according to prior attempts for ``username``, then count this one.

            Args:
                request: Accepted for interface compatibility; not read.
                username: Login name the attempt was made for.
            """

            from django.core.cache import cache
            import logging
            import time

            cache_key = _attempts_key(username)
            attempts = cache.get(cache_key, 0)

            if attempts > 0:
                delay = calculate_delay(attempts)

                logger = logging.getLogger('security')
                # %r keeps control characters in a submitted username from
                # forging additional log lines.
                logger.info(
                    "Applying %ss delay for user %r (attempt %s)",
                    delay, username, attempts,
                )

                # NOTE(review): this blocks the calling thread for up to 60s.
                time.sleep(delay)

            cache.set(cache_key, attempts + 1, 3600)  # 1 hour expiry

        def clear_progressive_delay(username):
            """Forget recorded attempts for ``username`` (call on successful login)."""

            from django.core.cache import cache

            cache.delete(_attempts_key(username))

        return {
            'calculate_delay': calculate_delay,
            'apply_progressive_delay': apply_progressive_delay,
            'clear_progressive_delay': clear_progressive_delay
        }

Security Monitoring and Alerting

# Comprehensive security monitoring and alerting
class SecurityMonitoring:
    """Factories for security logging, anomaly detection and alerting.

    Each static method returns its tooling (a logging config dict or a helper
    class) instead of acting directly.
    """

    @staticmethod
    def implement_security_logging():
        """Return a ``logging`` dictConfig-style dict for security loggers.

        Two loggers are defined: ``security`` (file, syslog and admin email
        handlers) and ``auth_audit`` (file only). Neither propagates to the
        root logger.
        """

        logging_config = {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {
                'security': {
                    'format': '{asctime} {levelname} {name} {message}',
                    'style': '{',
                },
                'json': {
                    # Resolved by dotted path when the config is applied; the
                    # providing package must be installed.
                    '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
                    'format': '%(asctime)s %(name)s %(levelname)s %(message)s'
                }
            },
            'handlers': {
                'security_file': {
                    'level': 'INFO',
                    'class': 'logging.handlers.RotatingFileHandler',
                    # Relative path: resolved against the process working directory.
                    'filename': 'logs/security.log',
                    'maxBytes': 10485760,  # 10MB
                    'backupCount': 5,
                    'formatter': 'json',
                },
                'security_syslog': {
                    'level': 'WARNING',
                    'class': 'logging.handlers.SysLogHandler',
                    'address': ('localhost', 514),
                    'formatter': 'security',
                },
                'security_email': {
                    'level': 'CRITICAL',
                    'class': 'django.utils.log.AdminEmailHandler',
                    'formatter': 'security',
                }
            },
            'loggers': {
                'security': {
                    'handlers': ['security_file', 'security_syslog', 'security_email'],
                    'level': 'INFO',
                    'propagate': False,
                },
                'auth_audit': {
                    'handlers': ['security_file'],
                    'level': 'INFO',
                    'propagate': False,
                }
            }
        }

        return logging_config

    @staticmethod
    def implement_anomaly_detection():
        """Return the ``AnomalyDetector`` helper class."""

        class AnomalyDetector:
            """Detect anomalous authentication patterns from cached counters."""

            @staticmethod
            def detect_unusual_login_patterns(user):
                """Return a list of anomaly labels for ``user``.

                Possible labels: ``high_frequency_logins`` (more than 10
                recorded logins), ``unusual_time_login`` (before 06:00 or
                from 23:00 onwards) and ``multiple_ip_addresses`` (more than
                5 recorded addresses).
                """

                from django.core.cache import cache

                anomalies = []

                recent_logins = cache.get(f'login_frequency_{user.id}', [])
                if len(recent_logins) > 10:
                    anomalies.append('high_frequency_logins')

                # Compare against local wall-clock time: an aware now() is in
                # UTC, whose hour says nothing about the site's "normal hours".
                now = timezone.now()
                if timezone.is_aware(now):
                    now = timezone.localtime(now)
                current_hour = now.hour
                if current_hour < 6 or current_hour > 22:
                    anomalies.append('unusual_time_login')

                # Simplified geographic check: count distinct addresses.
                user_ips = cache.get(f'user_ips_{user.id}', set())
                if len(user_ips) > 5:
                    anomalies.append('multiple_ip_addresses')

                return anomalies

            @staticmethod
            def detect_brute_force_patterns(ip_address):
                """Return a list of brute-force pattern labels for ``ip_address``.

                Possible labels: ``multiple_failed_logins`` (more than 3
                cached failures) and ``username_enumeration`` (more than 10
                cached enumeration attempts).
                """

                from django.core.cache import cache

                patterns = []

                failed_attempts = cache.get(f'failed_login_{ip_address}', 0)
                if failed_attempts > 3:
                    patterns.append('multiple_failed_logins')

                enum_attempts = cache.get(f'username_enum_{ip_address}', 0)
                if enum_attempts > 10:
                    patterns.append('username_enumeration')

                return patterns

            @staticmethod
            def update_user_behavior_baseline(user, request):
                """Record the current login time and address for ``user``.

                Login timestamps older than one hour are dropped. The address
                is taken from ``REMOTE_ADDR`` and skipped when unavailable.
                """

                from datetime import datetime, timedelta

                from django.core.cache import cache

                now = timezone.now()
                one_hour_ago = now - timedelta(hours=1)

                login_key = f'login_frequency_{user.id}'
                recent_logins = [
                    stamp for stamp in cache.get(login_key, [])
                    # Accept a trailing 'Z' in case a stored stamp uses it.
                    if datetime.fromisoformat(stamp.replace('Z', '+00:00')) > one_hour_ago
                ]
                recent_logins.append(now.isoformat())
                cache.set(login_key, recent_logins, 3600)

                client_ip = request.META.get('REMOTE_ADDR')
                if client_ip:
                    # A missing address must not count as a distinct IP.
                    ip_key = f'user_ips_{user.id}'
                    user_ips = cache.get(ip_key, set())
                    user_ips.add(client_ip)
                    cache.set(ip_key, user_ips, 86400)  # 24 hours

        return AnomalyDetector

    @staticmethod
    def implement_security_alerting():
        """Return the ``SecurityAlerting`` helper class."""

        class SecurityAlerting:
            """Handle security alerts and notifications."""

            @staticmethod
            def send_security_alert(alert_type, details, severity='medium'):
                """Log an alert, email it when severe, and forward it to monitoring.

                Args:
                    alert_type: Short label for the alert.
                    details: Free-form description; interpolated into the log
                        line, the email and the monitoring payload.
                    severity: ``'low'``, ``'medium'``, ``'high'`` or
                        ``'critical'``. Unknown values are logged as warnings.
                """

                import logging
                logger = logging.getLogger('security')

                log_level = {
                    'low': logger.info,
                    'medium': logger.warning,
                    'high': logger.error,
                    'critical': logger.critical
                }.get(severity, logger.warning)

                log_level(f"Security Alert [{alert_type}]: {details}")

                if severity in ('high', 'critical'):
                    try:
                        SecurityAlerting.send_alert_email(alert_type, details, severity)
                    except Exception:
                        # Same policy as send_to_monitoring: a broken delivery
                        # channel must not break the caller or stop the alert
                        # from reaching the remaining channel.
                        logger.exception("Failed to send security alert email")

                SecurityAlerting.send_to_monitoring(alert_type, details, severity)

            @staticmethod
            def send_alert_email(alert_type, details, severity):
                """Email the alert to the addresses in ``SECURITY_ALERT_EMAILS``.

                Nothing is sent when that setting is missing or empty. Mail
                errors propagate to the caller (``fail_silently=False``).
                """

                from django.core.mail import send_mail

                subject = f'[SECURITY ALERT - {severity.upper()}] {alert_type}'

                # Assemble the body line by line so the email does not
                # inherit the indentation of this source file.
                message = '\n'.join([
                    "Security Alert Detected",
                    "",
                    f"Type: {alert_type}",
                    f"Severity: {severity.upper()}",
                    f"Time: {timezone.now()}",
                    "",
                    "Details:",
                    f"{details}",
                    "",
                    "Please investigate immediately.",
                    "",
                ])

                security_emails = getattr(settings, 'SECURITY_ALERT_EMAILS', [])

                if security_emails:
                    send_mail(
                        subject,
                        message,
                        settings.DEFAULT_FROM_EMAIL,
                        security_emails,
                        fail_silently=False,
                    )

            @staticmethod
            def send_to_monitoring(alert_type, details, severity):
                """POST the alert to ``MONITORING_WEBHOOK_URL``, if configured.

                Any failure (missing ``requests`` package, network error,
                HTTP error status) is logged and swallowed so monitoring
                problems never affect the application.
                """

                try:
                    import requests

                    monitoring_url = getattr(settings, 'MONITORING_WEBHOOK_URL', None)

                    if monitoring_url:
                        payload = {
                            'alert_type': alert_type,
                            'severity': severity,
                            'details': details,
                            'timestamp': timezone.now().isoformat(),
                            'source': 'django_app'
                        }

                        response = requests.post(
                            monitoring_url,
                            json=payload,
                            timeout=5
                        )
                        # Surface 4xx/5xx replies in the log instead of
                        # treating them as delivered.
                        response.raise_for_status()

                except Exception as e:
                    # Don't let monitoring failures affect the application
                    import logging
                    logger = logging.getLogger('security')
                    logger.error(f"Failed to send alert to monitoring: {e}")

            @staticmethod
            def create_security_incident(alert_type, details, severity):
                """Persist the alert as an open ``SecurityIncident`` row.

                Returns:
                    The created incident, or ``None`` when
                    ``myapp.models.SecurityIncident`` cannot be imported.
                """

                try:
                    from myapp.models import SecurityIncident
                except ImportError:
                    # Model doesn't exist
                    return None

                return SecurityIncident.objects.create(
                    alert_type=alert_type,
                    severity=severity,
                    details=details,
                    status='open',
                    created_at=timezone.now()
                )

        return SecurityAlerting

# Security incident model
class SecurityIncident(models.Model):
    """Database record of a security incident and its investigation state.

    An incident stores the alert type, a severity, free-form details and a
    workflow status, plus optional investigation fields (assignee, notes,
    resolution) and timestamps. Rows are ordered newest first by default.
    """
    
    # Allowed values for `severity` as (stored value, display label) pairs.
    SEVERITY_CHOICES = [
        ('low', 'Low'),
        ('medium', 'Medium'),
        ('high', 'High'),
        ('critical', 'Critical'),
    ]
    
    # Allowed values for `status` as (stored value, display label) pairs.
    STATUS_CHOICES = [
        ('open', 'Open'),
        ('investigating', 'Investigating'),
        ('resolved', 'Resolved'),
        ('false_positive', 'False Positive'),
    ]
    
    alert_type = models.CharField(max_length=100)
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES)
    # New incidents start out as 'open'.
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='open')
    details = models.TextField()
    
    # Investigation fields
    # Optional assignee; SET_NULL clears the reference when the user is deleted.
    # NOTE(review): `User` is referenced directly; if the project uses a
    # custom user model, confirm this name points at it.
    assigned_to = models.ForeignKey(
        User, 
        on_delete=models.SET_NULL, 
        null=True, 
        blank=True,
        related_name='assigned_incidents'
    )
    investigation_notes = models.TextField(blank=True)
    resolution = models.TextField(blank=True)
    
    # Timestamps
    # created_at is stamped on insert and updated_at on every save;
    # resolved_at stays empty until resolve() sets it.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    resolved_at = models.DateTimeField(null=True, blank=True)
    
    class Meta:
        # Newest incidents first.
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['severity', 'status']),
            models.Index(fields=['alert_type', '-created_at']),
        ]
    
    def __str__(self) -> str:
        """Return ``"<alert_type> - <severity> - <status>"``."""
        return f"{self.alert_type} - {self.severity} - {self.status}"
    
    def resolve(self, resolution_notes, resolved_by=None) -> None:
        """Mark the incident as resolved and save it.

        Args:
            resolution_notes: Text stored in ``resolution``.
            resolved_by: Optional user; when truthy it replaces the current
                ``assigned_to`` value.

        Sets ``status`` to ``'resolved'`` and ``resolved_at`` to the current
        time, then calls ``save()``.
        """
        
        self.status = 'resolved'
        self.resolution = resolution_notes
        self.resolved_at = timezone.now()
        
        # The resolver takes over as assignee, overwriting any earlier one.
        if resolved_by:
            self.assigned_to = resolved_by
        
        self.save()

Implementing comprehensive security best practices in Django authentication and authorization systems requires attention to multiple layers of security. By following these practices — from secure password policies and session management to rate limiting and security monitoring — you can build robust authentication systems that protect against common attacks and maintain user trust.