Authentication and Authorization

Login and Logout

Implementing secure and user-friendly login and logout functionality is fundamental to any Django application. Understanding how to handle authentication flows, manage sessions, and provide a smooth user experience while maintaining security is essential for building robust authentication systems.

Login and Logout

Implementing secure and user-friendly login and logout functionality is fundamental to any Django application. Understanding how to handle authentication flows, manage sessions, and provide a smooth user experience while maintaining security is essential for building robust authentication systems.

Login Implementation

Basic Login Flow

# Basic login implementation
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.forms import AuthenticationForm
from django.shortcuts import render, redirect
from django.contrib import messages
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.cache import never_cache
from django.utils.decorators import method_decorator

class LoginFlowManager:
    """Manage the complete login flow"""
    
    @staticmethod
    def basic_login_process(request, username, password):
        """Basic login process implementation"""
        
        # Step 1: Authenticate user credentials
        user = authenticate(request, username=username, password=password)
        
        if user is not None:
            # Step 2: Check if user account is active
            if user.is_active:
                # Step 3: Log in the user (create session)
                login(request, user)
                
                # Step 4: Set session preferences
                request.session['login_timestamp'] = timezone.now().isoformat()
                request.session['user_preferences'] = {
                    'theme': getattr(user.profile, 'theme', 'light') if hasattr(user, 'profile') else 'light',
                    'language': getattr(user.profile, 'language', 'en') if hasattr(user, 'profile') else 'en',
                }
                
                # Step 5: Update user's last login
                user.last_login = timezone.now()
                user.save(update_fields=['last_login'])
                
                return True, "Login successful"
            else:
                return False, "Account is disabled"
        else:
            return False, "Invalid credentials"
    
    @staticmethod
    def enhanced_login_process(request, username, password, remember_me=False):
        """Enhanced login with additional security features"""
        
        # Rate limiting check
        if not LoginFlowManager.check_rate_limit(request, username):
            return False, "Too many login attempts. Please try again later."
        
        # Authenticate user
        user = authenticate(request, username=username, password=password)
        
        if user is not None:
            if user.is_active:
                # Check for account lockout
                if LoginFlowManager.is_account_locked(user):
                    return False, "Account is temporarily locked due to suspicious activity."
                
                # Successful login
                login(request, user)
                
                # Handle remember me functionality
                if remember_me:
                    request.session.set_expiry(30 * 24 * 60 * 60)  # 30 days
                else:
                    request.session.set_expiry(0)  # Browser session
                
                # Enhanced session data
                LoginFlowManager.set_enhanced_session_data(request, user)
                
                # Log successful login
                LoginFlowManager.log_login_attempt(request, user, success=True)
                
                # Clear failed login attempts
                LoginFlowManager.clear_failed_attempts(request, username)
                
                return True, f"Welcome back, {user.get_full_name() or user.username}!"
            else:
                LoginFlowManager.log_login_attempt(request, user, success=False, reason="inactive_account")
                return False, "Your account has been deactivated. Please contact support."
        else:
            # Failed authentication
            LoginFlowManager.log_failed_attempt(request, username)
            return False, "Invalid username or password."
    
    @staticmethod
    def check_rate_limit(request, username):
        """Check if login attempts are within rate limits"""
        
        from django.core.cache import cache
        
        # IP-based rate limiting
        ip_address = request.META.get('REMOTE_ADDR')
        ip_key = f"login_attempts_ip_{ip_address}"
        ip_attempts = cache.get(ip_key, 0)
        
        # Username-based rate limiting
        username_key = f"login_attempts_user_{username}"
        username_attempts = cache.get(username_key, 0)
        
        # Check limits
        if ip_attempts >= 10 or username_attempts >= 5:
            return False
        
        return True
    
    @staticmethod
    def log_failed_attempt(request, username):
        """Log and track failed login attempts"""
        
        from django.core.cache import cache
        import logging
        
        ip_address = request.META.get('REMOTE_ADDR')
        user_agent = request.META.get('HTTP_USER_AGENT', '')
        
        # Increment counters
        ip_key = f"login_attempts_ip_{ip_address}"
        username_key = f"login_attempts_user_{username}"
        
        ip_attempts = cache.get(ip_key, 0) + 1
        username_attempts = cache.get(username_key, 0) + 1
        
        cache.set(ip_key, ip_attempts, 300)  # 5 minutes
        cache.set(username_key, username_attempts, 300)
        
        # Log the attempt
        logger = logging.getLogger('security')
        logger.warning(
            f"Failed login attempt - Username: {username}, "
            f"IP: {ip_address}, User-Agent: {user_agent[:100]}"
        )
    
    @staticmethod
    def clear_failed_attempts(request, username):
        """Clear failed login attempt counters"""
        
        from django.core.cache import cache
        
        ip_address = request.META.get('REMOTE_ADDR')
        ip_key = f"login_attempts_ip_{ip_address}"
        username_key = f"login_attempts_user_{username}"
        
        cache.delete(ip_key)
        cache.delete(username_key)
    
    @staticmethod
    def set_enhanced_session_data(request, user):
        """Set enhanced session data for security and UX"""
        
        request.session.update({
            'login_timestamp': timezone.now().isoformat(),
            'login_ip': request.META.get('REMOTE_ADDR'),
            'user_agent_hash': hashlib.md5(
                request.META.get('HTTP_USER_AGENT', '').encode()
            ).hexdigest(),
            'user_id': user.id,
            'username': user.username,
            'is_staff': user.is_staff,
            'is_superuser': user.is_superuser,
        })
        
        # User preferences
        if hasattr(user, 'profile'):
            request.session['user_preferences'] = {
                'theme': user.profile.theme if hasattr(user.profile, 'theme') else 'light',
                'language': user.profile.language if hasattr(user.profile, 'language') else 'en',
                'timezone': str(user.profile.timezone) if hasattr(user.profile, 'timezone') else 'UTC',
            }
    
    @staticmethod
    def log_login_attempt(request, user, success=True, reason=None):
        """Log login attempts for audit purposes"""
        
        import logging
        
        logger = logging.getLogger('auth_audit')
        
        log_data = {
            'username': user.username if user else 'unknown',
            'ip_address': request.META.get('REMOTE_ADDR'),
            'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
            'success': success,
            'timestamp': timezone.now().isoformat(),
        }
        
        if reason:
            log_data['reason'] = reason
        
        if success:
            logger.info(f"Successful login: {log_data}")
        else:
            logger.warning(f"Failed login: {log_data}")
    
    @staticmethod
    def is_account_locked(user):
        """Check if account is locked due to suspicious activity"""
        
        # This would integrate with your account security system
        # Example implementation:
        
        from django.core.cache import cache
        
        lock_key = f"account_locked_{user.username}"
        return cache.get(lock_key, False)

# Advanced login views
class AdvancedLoginView(LoginView):
    """Advanced login view with enhanced security"""
    
    template_name = 'auth/advanced_login.html'
    form_class = AuthenticationForm
    redirect_authenticated_user = True
    
    @method_decorator(csrf_protect)
    @method_decorator(never_cache)
    def dispatch(self, request, *args, **kwargs):
        return super().dispatch(request, *args, **kwargs)
    
    def get_context_data(self, **kwargs):
        """Add security context to login form"""
        
        context = super().get_context_data(**kwargs)
        
        # Add security information
        context.update({
            'security_features': [
                'Secure password hashing',
                'Rate limiting protection',
                'Session security',
                'Login attempt monitoring'
            ],
            'login_help': {
                'forgot_password': True,
                'account_locked': True,
                'registration': True,
            }
        })
        
        return context
    
    def form_valid(self, form):
        """Handle successful login with enhanced features"""
        
        user = form.get_user()
        remember_me = self.request.POST.get('remember_me', False)
        
        # Enhanced login process
        success, message = LoginFlowManager.enhanced_login_process(
            self.request, user.username, None, remember_me
        )
        
        if success:
            messages.success(self.request, message)
            
            # Check if password needs updating
            if self.password_needs_update(user):
                messages.warning(
                    self.request,
                    "Your password is old. Consider updating it for better security."
                )
            
            # Check for security alerts
            self.check_security_alerts(user)
            
            return super().form_valid(form)
        else:
            messages.error(self.request, message)
            return self.form_invalid(form)
    
    def password_needs_update(self, user):
        """Check if user's password needs updating"""
        
        if hasattr(user, 'profile') and hasattr(user.profile, 'password_changed_at'):
            password_age = timezone.now() - user.profile.password_changed_at
            return password_age.days > 90  # 90 days
        
        return False
    
    def check_security_alerts(self, user):
        """Check for security alerts to show user"""
        
        alerts = []
        
        # Check for new login from different location
        last_ip = self.request.session.get('last_login_ip')
        current_ip = self.request.META.get('REMOTE_ADDR')
        
        if last_ip and last_ip != current_ip:
            alerts.append("New login from a different location detected.")
        
        # Check for unusual login time
        current_hour = timezone.now().hour
        if current_hour < 6 or current_hour > 22:  # Outside normal hours
            alerts.append("Login outside normal hours detected.")
        
        # Show alerts
        for alert in alerts:
            messages.info(self.request, f"Security Notice: {alert}")
        
        # Store current IP for next login
        self.request.session['last_login_ip'] = current_ip

# Multi-step login with 2FA
class TwoFactorLoginView(LoginView):
    """Login view with two-factor authentication"""
    
    template_name = 'auth/2fa_login.html'
    
    def form_valid(self, form):
        """Handle first step of 2FA login"""
        
        user = form.get_user()
        
        # Check if user has 2FA enabled
        if self.user_has_2fa_enabled(user):
            # Store user temporarily in session
            self.request.session['2fa_user_id'] = user.id
            self.request.session['2fa_timestamp'] = timezone.now().isoformat()
            
            # Don't log in yet, redirect to 2FA verification
            return redirect('2fa_verify')
        
        # Normal login for users without 2FA
        return super().form_valid(form)
    
    def user_has_2fa_enabled(self, user):
        """Check if user has 2FA enabled"""
        
        return (hasattr(user, 'profile') and 
                getattr(user.profile, 'two_factor_enabled', False))

def two_factor_verify_view(request):
    """Verify two-factor authentication code"""
    
    # Check if user is in 2FA flow
    user_id = request.session.get('2fa_user_id')
    if not user_id:
        messages.error(request, 'Invalid 2FA session. Please log in again.')
        return redirect('login')
    
    # Check session timeout (5 minutes)
    timestamp_str = request.session.get('2fa_timestamp')
    if timestamp_str:
        timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
        if timezone.now() - timestamp > timedelta(minutes=5):
            del request.session['2fa_user_id']
            del request.session['2fa_timestamp']
            messages.error(request, '2FA session expired. Please log in again.')
            return redirect('login')
    
    try:
        user = User.objects.get(id=user_id)
    except User.DoesNotExist:
        messages.error(request, 'Invalid user. Please log in again.')
        return redirect('login')
    
    if request.method == 'POST':
        token = request.POST.get('token', '').strip()
        
        if len(token) == 6 and token.isdigit():
            # Verify TOTP token
            if verify_totp_token(user, token):
                # Complete login
                login(request, user)
                
                # Clean up 2FA session data
                del request.session['2fa_user_id']
                del request.session['2fa_timestamp']
                
                # Set enhanced session data
                LoginFlowManager.set_enhanced_session_data(request, user)
                
                messages.success(request, 'Login successful!')
                return redirect('dashboard')
            else:
                messages.error(request, 'Invalid verification code. Please try again.')
        else:
            messages.error(request, 'Please enter a valid 6-digit code.')
    
    return render(request, 'auth/2fa_verify.html', {'user': user})

def verify_totp_token(user, token):
    """Verify TOTP token for user"""
    
    if not hasattr(user, 'profile') or not user.profile.totp_secret:
        return False
    
    try:
        import pyotp
        totp = pyotp.TOTP(user.profile.totp_secret)
        return totp.verify(token, valid_window=1)
    except ImportError:
        return False

Logout Implementation

Secure Logout Process

# Comprehensive logout implementation
class LogoutManager:
    """Manage the complete logout process"""
    
    @staticmethod
    def basic_logout_process(request):
        """Basic logout process"""
        
        if request.user.is_authenticated:
            username = request.user.username
            
            # Log the logout
            import logging
            logger = logging.getLogger('auth')
            logger.info(f"User {username} logged out")
            
            # Perform logout
            logout(request)
            
            return True, f"Goodbye, {username}!"
        
        return False, "No user was logged in."
    
    @staticmethod
    def enhanced_logout_process(request):
        """Enhanced logout with security cleanup"""
        
        if not request.user.is_authenticated:
            return False, "No user was logged in."
        
        user = request.user
        username = user.username
        
        # Log logout with details
        LogoutManager.log_logout_attempt(request, user)
        
        # Clear sensitive session data
        LogoutManager.clear_sensitive_session_data(request)
        
        # Invalidate remember me tokens if any
        LogoutManager.invalidate_remember_tokens(user)
        
        # Perform logout
        logout(request)
        
        # Clear any cached user data
        LogoutManager.clear_user_cache(user)
        
        return True, f"You have been securely logged out, {username}."
    
    @staticmethod
    def log_logout_attempt(request, user):
        """Log logout attempt for audit purposes"""
        
        import logging
        
        logger = logging.getLogger('auth_audit')
        
        log_data = {
            'username': user.username,
            'user_id': user.id,
            'ip_address': request.META.get('REMOTE_ADDR'),
            'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
            'session_key': request.session.session_key,
            'timestamp': timezone.now().isoformat(),
        }
        
        logger.info(f"User logout: {log_data}")
    
    @staticmethod
    def clear_sensitive_session_data(request):
        """Clear sensitive data from session"""
        
        # List of sensitive session keys to clear
        sensitive_keys = [
            'user_preferences',
            'shopping_cart',
            'temp_data',
            'form_data',
            'search_history',
            'last_viewed_items',
        ]
        
        for key in sensitive_keys:
            request.session.pop(key, None)
        
        # Clear any keys that might contain sensitive data
        keys_to_remove = []
        for key in request.session.keys():
            if any(sensitive in key.lower() for sensitive in ['password', 'token', 'secret']):
                keys_to_remove.append(key)
        
        for key in keys_to_remove:
            del request.session[key]
    
    @staticmethod
    def invalidate_remember_tokens(user):
        """Invalidate any remember me tokens"""
        
        # This would depend on your remember me implementation
        # Example with a custom token model:
        
        try:
            from myapp.models import RememberToken
            RememberToken.objects.filter(user=user).delete()
        except ImportError:
            pass
    
    @staticmethod
    def clear_user_cache(user):
        """Clear cached user data"""
        
        from django.core.cache import cache
        
        # Clear permission cache
        cache.delete(f'user_permissions_{user.id}')
        
        # Clear profile cache
        cache.delete(f'user_profile_{user.id}')
        
        # Clear any other user-specific cache
        cache_keys = [
            f'user_settings_{user.id}',
            f'user_notifications_{user.id}',
            f'user_dashboard_{user.id}',
        ]
        
        cache.delete_many(cache_keys)

class EnhancedLogoutView(LogoutView):
    """Enhanced logout view with security features"""
    
    template_name = 'auth/logout.html'
    
    def dispatch(self, request, *args, **kwargs):
        """Handle logout with enhanced security"""
        
        if request.user.is_authenticated:
            # Enhanced logout process
            success, message = LogoutManager.enhanced_logout_process(request)
            
            if success:
                messages.success(request, message)
            else:
                messages.info(request, message)
        
        return super().dispatch(request, *args, **kwargs)
    
    def get_context_data(self, **kwargs):
        """Add logout context"""
        
        context = super().get_context_data(**kwargs)
        
        context.update({
            'logout_message': 'You have been securely logged out.',
            'security_tips': [
                'Close your browser if using a shared computer',
                'Clear your browser cache if on a public computer',
                'Consider changing your password if you suspect unauthorized access'
            ],
            'next_steps': [
                ('Login again', 'login'),
                ('Go to homepage', 'home'),
                ('Contact support', 'support'),
            ]
        })
        
        return context

# AJAX logout support
class AjaxLogoutView(View):
    """AJAX-enabled logout view"""
    
    def post(self, request, *args, **kwargs):
        """Handle AJAX logout request"""
        
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            if request.user.is_authenticated:
                username = request.user.username
                
                # Enhanced logout
                success, message = LogoutManager.enhanced_logout_process(request)
                
                return JsonResponse({
                    'success': success,
                    'message': message,
                    'redirect_url': reverse('home')
                })
            else:
                return JsonResponse({
                    'success': False,
                    'message': 'No user was logged in.'
                })
        
        # Fallback to regular logout
        return redirect('logout')

# Global logout (terminate all sessions)
class GlobalLogoutView(LoginRequiredMixin, View):
    """Logout from all devices/sessions"""
    
    def post(self, request, *args, **kwargs):
        """Terminate all user sessions"""
        
        user = request.user
        current_session_key = request.session.session_key
        
        # Get all user sessions
        from django.contrib.sessions.models import Session
        
        terminated_sessions = 0
        
        for session in Session.objects.filter(expire_date__gt=timezone.now()):
            try:
                session_data = session.get_decoded()
                session_user_id = session_data.get('_auth_user_id')
                
                if session_user_id and int(session_user_id) == user.id:
                    # Don't terminate current session yet
                    if session.session_key != current_session_key:
                        session.delete()
                        terminated_sessions += 1
            except:
                continue
        
        # Log global logout
        import logging
        logger = logging.getLogger('security')
        logger.info(
            f"Global logout performed by user {user.username}. "
            f"Terminated {terminated_sessions} sessions."
        )
        
        # Now logout current session
        LogoutManager.enhanced_logout_process(request)
        
        messages.success(
            request,
            f'You have been logged out from all devices. '
            f'{terminated_sessions} other sessions were terminated.'
        )
        
        return redirect('home')

Session Management

Advanced Session Handling

# Advanced session management for login/logout
class SessionManager:
    """Advanced session management utilities"""
    
    @staticmethod
    def create_secure_session(request, user):
        """Create a secure session for user"""
        
        # Regenerate session key for security
        request.session.cycle_key()
        
        # Set session data
        request.session.update({
            '_auth_user_id': str(user.pk),
            '_auth_user_backend': 'django.contrib.auth.backends.ModelBackend',
            'login_timestamp': timezone.now().isoformat(),
            'session_security': {
                'ip_address': request.META.get('REMOTE_ADDR'),
                'user_agent_hash': hashlib.md5(
                    request.META.get('HTTP_USER_AGENT', '').encode()
                ).hexdigest(),
                'created_at': timezone.now().isoformat(),
            }
        })
        
        # Set session expiry based on user preferences
        SessionManager.set_session_expiry(request, user)
    
    @staticmethod
    def set_session_expiry(request, user):
        """Set appropriate session expiry"""
        
        # Check user preferences
        if hasattr(user, 'profile'):
            session_timeout = getattr(user.profile, 'session_timeout', None)
            
            if session_timeout:
                request.session.set_expiry(session_timeout)
                return
        
        # Default: 8 hours for regular users, 1 hour for staff
        if user.is_staff:
            request.session.set_expiry(3600)  # 1 hour
        else:
            request.session.set_expiry(28800)  # 8 hours
    
    @staticmethod
    def validate_session_security(request):
        """Validate session security"""
        
        if not request.user.is_authenticated:
            return True
        
        session_security = request.session.get('session_security', {})
        
        # Check IP address consistency
        stored_ip = session_security.get('ip_address')
        current_ip = request.META.get('REMOTE_ADDR')
        
        if stored_ip and stored_ip != current_ip:
            # IP changed - potential session hijacking
            return False
        
        # Check user agent consistency
        stored_ua_hash = session_security.get('user_agent_hash')
        current_ua_hash = hashlib.md5(
            request.META.get('HTTP_USER_AGENT', '').encode()
        ).hexdigest()
        
        if stored_ua_hash and stored_ua_hash != current_ua_hash:
            # User agent changed - potential session hijacking
            return False
        
        return True
    
    @staticmethod
    def get_session_info(request):
        """Get comprehensive session information"""
        
        if not request.user.is_authenticated:
            return None
        
        session_security = request.session.get('session_security', {})
        
        return {
            'session_key': request.session.session_key,
            'user_id': request.user.id,
            'username': request.user.username,
            'login_timestamp': request.session.get('login_timestamp'),
            'ip_address': session_security.get('ip_address'),
            'user_agent': request.META.get('HTTP_USER_AGENT', '')[:100],
            'expires_at': request.session.get_expiry_date(),
            'is_secure': request.is_secure(),
        }
    
    @staticmethod
    def cleanup_expired_sessions():
        """Clean up expired sessions (run as management command)"""
        
        from django.contrib.sessions.models import Session
        from django.core.management.base import BaseCommand
        
        class Command(BaseCommand):
            help = 'Clean up expired sessions'
            
            def handle(self, *args, **options):
                expired_sessions = Session.objects.filter(
                    expire_date__lt=timezone.now()
                )
                
                count = expired_sessions.count()
                expired_sessions.delete()
                
                self.stdout.write(
                    self.style.SUCCESS(
                        f'Successfully cleaned up {count} expired sessions'
                    )
                )
        
        return Command

# Session security middleware
class SessionSecurityMiddleware:
    """Middleware to enforce session security"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        # Validate session security before processing request
        if hasattr(request, 'user') and request.user.is_authenticated:
            if not SessionManager.validate_session_security(request):
                # Session security compromised
                logout(request)
                
                messages.warning(
                    request,
                    'Your session was terminated for security reasons. Please log in again.'
                )
                
                return redirect('login')
        
        response = self.get_response(request)
        
        # Update session security info after successful request
        if (hasattr(request, 'user') and request.user.is_authenticated and 
            request.method == 'POST'):
            
            session_security = request.session.get('session_security', {})
            session_security['last_activity'] = timezone.now().isoformat()
            request.session['session_security'] = session_security
        
        return response

# Remember me functionality
class RememberMeManager:
    """Manage remember me functionality"""
    
    @staticmethod
    def create_remember_token(user):
        """Create a remember me token"""
        
        import secrets
        
        # Generate secure token
        token = secrets.token_urlsafe(32)
        
        # Store token (you'd need a RememberToken model)
        try:
            from myapp.models import RememberToken
            
            RememberToken.objects.create(
                user=user,
                token=token,
                expires_at=timezone.now() + timedelta(days=30)
            )
            
            return token
        except ImportError:
            return None
    
    @staticmethod
    def validate_remember_token(token):
        """Validate and use remember me token"""
        
        try:
            from myapp.models import RememberToken
            
            remember_token = RememberToken.objects.get(
                token=token,
                expires_at__gt=timezone.now()
            )
            
            user = remember_token.user
            
            # Token is valid, but delete it (one-time use)
            remember_token.delete()
            
            return user
        except (ImportError, RememberToken.DoesNotExist):
            return None
    
    @staticmethod
    def cleanup_expired_tokens():
        """Clean up expired remember me tokens"""
        
        try:
            from myapp.models import RememberToken
            
            expired_count = RememberToken.objects.filter(
                expires_at__lt=timezone.now()
            ).delete()[0]
            
            return expired_count
        except ImportError:
            return 0

Implementing secure and user-friendly login and logout functionality requires careful attention to security details, user experience, and proper session management. By following these patterns and implementing comprehensive security measures, you can create authentication flows that protect users while providing a smooth experience.