Forms and User Input

Security Considerations for Forms

Form security is critical for protecting applications from various attacks and ensuring data integrity. This chapter covers comprehensive security measures, from CSRF protection to input validation and advanced security patterns.

Security Considerations for Forms

Form security is critical for protecting applications from various attacks and ensuring data integrity. This chapter covers comprehensive security measures, from CSRF protection to input validation and advanced security patterns.

CSRF Protection

Understanding and Implementing CSRF Protection

# settings.py - CSRF configuration
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',  # CSRF middleware
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# CSRF settings
CSRF_COOKIE_SECURE = True  # Use HTTPS only
CSRF_COOKIE_HTTPONLY = True  # Prevent JavaScript access
CSRF_COOKIE_SAMESITE = 'Strict'  # Strict same-site policy
CSRF_COOKIE_AGE = 3600  # 1 hour expiration
CSRF_USE_SESSIONS = False  # Use cookies instead of sessions
CSRF_COOKIE_NAME = 'csrftoken'  # Custom cookie name

# Trusted origins for CSRF
CSRF_TRUSTED_ORIGINS = [
    'https://yourdomain.com',
    'https://api.yourdomain.com',
]

# views.py - CSRF protection in views
from django.views.decorators.csrf import csrf_protect, csrf_exempt
from django.utils.decorators import method_decorator
from django.middleware.csrf import get_token
from django.http import JsonResponse

@csrf_protect
def secure_form_view(request):
    """View with explicit CSRF protection"""
    if request.method == 'POST':
        form = SecureForm(request.POST)
        if form.is_valid():
            # Process form securely
            process_secure_data(form.cleaned_data)
            return redirect('success')
    else:
        form = SecureForm()
    
    return render(request, 'secure_form.html', {
        'form': form,
        'csrf_token': get_token(request)
    })

@method_decorator(csrf_protect, name='dispatch')
class SecureFormView(View):
    """Class-based view with CSRF protection"""
    
    def get(self, request):
        form = SecureForm()
        return render(request, 'secure_form.html', {'form': form})
    
    def post(self, request):
        form = SecureForm(request.POST)
        if form.is_valid():
            self.process_form(form)
            return JsonResponse({'success': True})
        return JsonResponse({'errors': form.errors}, status=400)

# AJAX CSRF handling
def get_csrf_token(request):
    """API endpoint to get CSRF token for AJAX"""
    return JsonResponse({'csrf_token': get_token(request)})

# JavaScript CSRF handling
"""
// static/js/csrf.js
function getCSRFToken() {
    const cookies = document.cookie.split(';');
    for (let cookie of cookies) {
        const [name, value] = cookie.trim().split('=');
        if (name === 'csrftoken') {
            return decodeURIComponent(value);
        }
    }
    return null;
}

// Setup CSRF for all AJAX requests
function setupCSRF() {
    const csrfToken = getCSRFToken();
    
    // jQuery setup
    if (typeof $ !== 'undefined') {
        $.ajaxSetup({
            beforeSend: function(xhr, settings) {
                if (!/^(GET|HEAD|OPTIONS|TRACE)$/i.test(settings.type) && !this.crossDomain) {
                    xhr.setRequestHeader("X-CSRFToken", csrfToken);
                }
            }
        });
    }
    
    // Fetch API setup
    const originalFetch = window.fetch;
    window.fetch = function(url, options = {}) {
        if (options.method && !/^(GET|HEAD|OPTIONS|TRACE)$/i.test(options.method)) {
            options.headers = options.headers || {};
            options.headers['X-CSRFToken'] = csrfToken;
        }
        return originalFetch(url, options);
    };
}

document.addEventListener('DOMContentLoaded', setupCSRF);
"""

Custom CSRF Validation

# forms.py - Custom CSRF validation
from django import forms
from django.middleware.csrf import get_token
from django.core.exceptions import ValidationError

class CSRFValidatedForm(forms.Form):
    """Form with additional CSRF validation"""
    
    csrf_token = forms.CharField(widget=forms.HiddenInput(), required=False)
    
    def __init__(self, request=None, *args, **kwargs):
        self.request = request
        super().__init__(*args, **kwargs)
        
        if request:
            self.fields['csrf_token'].initial = get_token(request)
    
    def clean_csrf_token(self):
        """Additional CSRF token validation"""
        token = self.cleaned_data.get('csrf_token')
        
        if self.request:
            expected_token = get_token(self.request)
            if token != expected_token:
                raise ValidationError('CSRF token mismatch.')
        
        return token

# Custom CSRF failure handling
def csrf_failure_view(request, reason=""):
    """Custom CSRF failure response"""
    context = {
        'reason': reason,
        'request_path': request.path,
        'referer': request.META.get('HTTP_REFERER', ''),
    }
    
    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
        return JsonResponse({
            'error': 'CSRF verification failed',
            'reason': reason
        }, status=403)
    
    return render(request, 'csrf_failure.html', context, status=403)

# settings.py
CSRF_FAILURE_VIEW = 'myapp.views.csrf_failure_view'

Input Validation and Sanitization

Comprehensive Input Validation

# validators.py - Custom security validators
import re
import bleach
from django.core.exceptions import ValidationError
from django.utils.html import strip_tags

def validate_no_script_tags(value):
    """Prevent script tag injection"""
    if '<script' in value.lower() or '</script>' in value.lower():
        raise ValidationError('Script tags are not allowed.')

def validate_no_sql_injection(value):
    """Basic SQL injection prevention"""
    sql_keywords = [
        'select', 'insert', 'update', 'delete', 'drop', 'create',
        'alter', 'exec', 'execute', 'union', 'script'
    ]
    
    value_lower = value.lower()
    for keyword in sql_keywords:
        if keyword in value_lower:
            raise ValidationError(f'Potentially dangerous content detected: {keyword}')

def validate_safe_filename(value):
    """Validate filename for security"""
    # Check for directory traversal
    if '..' in value or '/' in value or '\\' in value:
        raise ValidationError('Invalid filename: directory traversal detected.')
    
    # Check for dangerous extensions
    dangerous_extensions = ['.exe', '.bat', '.cmd', '.scr', '.pif', '.php', '.asp']
    if any(value.lower().endswith(ext) for ext in dangerous_extensions):
        raise ValidationError('File type not allowed.')
    
    # Check filename length
    if len(value) > 255:
        raise ValidationError('Filename too long.')

def validate_url_safety(value):
    """Validate URL for security"""
    # Check for dangerous protocols
    dangerous_protocols = ['javascript:', 'data:', 'vbscript:', 'file:']
    value_lower = value.lower()
    
    for protocol in dangerous_protocols:
        if value_lower.startswith(protocol):
            raise ValidationError(f'Protocol {protocol} is not allowed.')

def validate_phone_number(value):
    """Validate phone number format"""
    # Remove common formatting characters
    cleaned = re.sub(r'[^\d+]', '', value)
    
    # Check format
    if not re.match(r'^\+?[\d\s\-\(\)]{10,15}$', value):
        raise ValidationError('Invalid phone number format.')
    
    return cleaned

# forms.py - Secure form with validation
from django import forms
from .validators import *

class SecureContactForm(forms.Form):
    """Contact form with comprehensive security validation"""
    
    name = forms.CharField(
        max_length=100,
        validators=[validate_no_script_tags],
        widget=forms.TextInput(attrs={'class': 'form-control'})
    )
    
    email = forms.EmailField(
        widget=forms.EmailInput(attrs={'class': 'form-control'})
    )
    
    phone = forms.CharField(
        max_length=20,
        validators=[validate_phone_number],
        required=False,
        widget=forms.TextInput(attrs={'class': 'form-control'})
    )
    
    website = forms.URLField(
        validators=[validate_url_safety],
        required=False,
        widget=forms.URLInput(attrs={'class': 'form-control'})
    )
    
    message = forms.CharField(
        validators=[validate_no_script_tags, validate_no_sql_injection],
        widget=forms.Textarea(attrs={'class': 'form-control', 'rows': 5})
    )
    
    attachment = forms.FileField(
        validators=[validate_safe_filename],
        required=False,
        widget=forms.FileInput(attrs={'class': 'form-control'})
    )
    
    def clean_name(self):
        """Additional name validation and sanitization"""
        name = self.cleaned_data['name']
        
        # Strip HTML tags
        name = strip_tags(name)
        
        # Remove excessive whitespace
        name = ' '.join(name.split())
        
        # Check for minimum length
        if len(name) < 2:
            raise ValidationError('Name must be at least 2 characters long.')
        
        # Check for valid characters
        if not re.match(r'^[a-zA-Z\s\-\'\.]+$', name):
            raise ValidationError('Name contains invalid characters.')
        
        return name
    
    def clean_message(self):
        """Sanitize message content"""
        message = self.cleaned_data['message']
        
        # Allow only safe HTML tags
        allowed_tags = ['p', 'br', 'strong', 'em', 'u']
        allowed_attributes = {}
        
        # Clean HTML
        clean_message = bleach.clean(
            message,
            tags=allowed_tags,
            attributes=allowed_attributes,
            strip=True
        )
        
        # Check minimum length
        if len(strip_tags(clean_message)) < 10:
            raise ValidationError('Message must be at least 10 characters long.')
        
        return clean_message
    
    def clean_attachment(self):
        """Validate file upload security"""
        attachment = self.cleaned_data.get('attachment')
        
        if attachment:
            # Check file size (5MB limit)
            if attachment.size > 5 * 1024 * 1024:
                raise ValidationError('File size cannot exceed 5MB.')
            
            # Check MIME type
            allowed_types = [
                'text/plain',
                'application/pdf',
                'image/jpeg',
                'image/png',
                'image/gif'
            ]
            
            if attachment.content_type not in allowed_types:
                raise ValidationError('File type not allowed.')
            
            # Scan file content for malicious patterns
            self.scan_file_content(attachment)
        
        return attachment
    
    def scan_file_content(self, file):
        """Basic file content scanning"""
        # Read first 1KB for scanning
        file.seek(0)
        content = file.read(1024).decode('utf-8', errors='ignore')
        file.seek(0)  # Reset file pointer
        
        # Check for suspicious patterns
        suspicious_patterns = [
            r'<script[^>]*>',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'<iframe[^>]*>',
        ]
        
        for pattern in suspicious_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                raise ValidationError('File contains potentially malicious content.')

Rate Limiting and Abuse Prevention

Form Submission Rate Limiting

# utils.py - Rate limiting utilities
import time
from django.core.cache import cache
from django.http import HttpResponseTooManyRequests
from django.utils.decorators import method_decorator
from functools import wraps

class RateLimiter:
    """Rate limiting utility"""
    
    def __init__(self, key_prefix='rate_limit', window=3600, max_requests=10):
        self.key_prefix = key_prefix
        self.window = window  # Time window in seconds
        self.max_requests = max_requests
    
    def get_cache_key(self, identifier):
        """Generate cache key for rate limiting"""
        return f'{self.key_prefix}:{identifier}'
    
    def is_rate_limited(self, identifier):
        """Check if identifier is rate limited"""
        cache_key = self.get_cache_key(identifier)
        
        # Get current request count and timestamps
        requests = cache.get(cache_key, [])
        now = time.time()
        
        # Remove old requests outside the window
        requests = [req_time for req_time in requests if now - req_time < self.window]
        
        # Check if limit exceeded
        if len(requests) >= self.max_requests:
            return True, self.window - (now - requests[0])  # Time until reset
        
        # Add current request
        requests.append(now)
        cache.set(cache_key, requests, self.window)
        
        return False, 0
    
    def get_remaining_requests(self, identifier):
        """Get remaining requests for identifier"""
        cache_key = self.get_cache_key(identifier)
        requests = cache.get(cache_key, [])
        now = time.time()
        
        # Count recent requests
        recent_requests = [req for req in requests if now - req < self.window]
        return max(0, self.max_requests - len(recent_requests))

def rate_limit(key_func=None, window=3600, max_requests=10):
    """Rate limiting decorator"""
    
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            # Generate rate limit key
            if key_func:
                identifier = key_func(request)
            else:
                identifier = get_client_ip(request)
            
            limiter = RateLimiter(
                key_prefix=f'rate_limit_{view_func.__name__}',
                window=window,
                max_requests=max_requests
            )
            
            is_limited, reset_time = limiter.is_rate_limited(identifier)
            
            if is_limited:
                response = HttpResponseTooManyRequests(
                    'Rate limit exceeded. Try again later.'
                )
                response['Retry-After'] = str(int(reset_time))
                return response
            
            return view_func(request, *args, **kwargs)
        
        return wrapper
    return decorator

def get_client_ip(request):
    """Get client IP address"""
    x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if x_forwarded_for:
        return x_forwarded_for.split(',')[0].strip()
    return request.META.get('REMOTE_ADDR')

def user_rate_limit_key(request):
    """Generate rate limit key based on user"""
    if request.user.is_authenticated:
        return f'user_{request.user.id}'
    return get_client_ip(request)

# views.py - Rate limited views
from django.shortcuts import render, redirect
from django.contrib import messages
from .utils import rate_limit, user_rate_limit_key

@rate_limit(key_func=user_rate_limit_key, window=3600, max_requests=5)
def contact_form_view(request):
    """Contact form with rate limiting"""
    if request.method == 'POST':
        form = SecureContactForm(request.POST, request.FILES)
        if form.is_valid():
            # Process form
            process_contact_form(form.cleaned_data)
            messages.success(request, 'Message sent successfully!')
            return redirect('contact_success')
    else:
        form = SecureContactForm()
    
    return render(request, 'contact.html', {'form': form})

class RateLimitedFormView(View):
    """Base view with rate limiting"""
    
    rate_limit_window = 3600  # 1 hour
    rate_limit_max_requests = 10
    
    def dispatch(self, request, *args, **kwargs):
        # Check rate limit
        identifier = self.get_rate_limit_identifier(request)
        limiter = RateLimiter(
            key_prefix=f'rate_limit_{self.__class__.__name__}',
            window=self.rate_limit_window,
            max_requests=self.rate_limit_max_requests
        )
        
        is_limited, reset_time = limiter.is_rate_limited(identifier)
        
        if is_limited:
            return self.handle_rate_limit_exceeded(reset_time)
        
        return super().dispatch(request, *args, **kwargs)
    
    def get_rate_limit_identifier(self, request):
        """Get identifier for rate limiting"""
        return user_rate_limit_key(request)
    
    def handle_rate_limit_exceeded(self, reset_time):
        """Handle rate limit exceeded"""
        if self.request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return JsonResponse({
                'error': 'Rate limit exceeded',
                'retry_after': int(reset_time)
            }, status=429)
        
        return render(self.request, 'rate_limit_exceeded.html', {
            'retry_after': int(reset_time)
        }, status=429)

Honeypot and Bot Detection

# forms.py - Honeypot and bot detection
from django import forms
from django.core.exceptions import ValidationError
import time

class HoneypotMixin:
    """Mixin to add honeypot fields for bot detection"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # Add honeypot field (hidden from users)
        self.fields['website_url'] = forms.CharField(
            required=False,
            widget=forms.TextInput(attrs={
                'style': 'display: none !important;',
                'tabindex': '-1',
                'autocomplete': 'off'
            })
        )
        
        # Add timestamp field for timing analysis
        self.fields['form_timestamp'] = forms.CharField(
            widget=forms.HiddenInput(),
            initial=str(int(time.time()))
        )
    
    def clean_website_url(self):
        """Honeypot validation"""
        website_url = self.cleaned_data.get('website_url')
        
        # If honeypot field is filled, it's likely a bot
        if website_url:
            raise ValidationError('Bot detected.')
        
        return website_url
    
    def clean_form_timestamp(self):
        """Timing-based bot detection"""
        timestamp = self.cleaned_data.get('form_timestamp')
        
        try:
            form_time = int(timestamp)
            current_time = int(time.time())
            time_diff = current_time - form_time
            
            # Form filled too quickly (likely bot)
            if time_diff < 3:  # Less than 3 seconds
                raise ValidationError('Form submitted too quickly.')
            
            # Form took too long (possible session hijacking)
            if time_diff > 3600:  # More than 1 hour
                raise ValidationError('Form session expired.')
            
        except (ValueError, TypeError):
            raise ValidationError('Invalid form timestamp.')
        
        return timestamp

class BotDetectionForm(HoneypotMixin, forms.Form):
    """Form with bot detection capabilities"""
    
    name = forms.CharField(max_length=100)
    email = forms.EmailField()
    message = forms.CharField(widget=forms.Textarea)
    
    def clean(self):
        """Additional bot detection"""
        cleaned_data = super().clean()
        
        # Check for suspicious patterns
        self.check_duplicate_content(cleaned_data)
        self.check_spam_patterns(cleaned_data)
        
        return cleaned_data
    
    def check_duplicate_content(self, data):
        """Check for duplicate or repetitive content"""
        name = data.get('name', '')
        email = data.get('email', '')
        message = data.get('message', '')
        
        # Check if name and message are identical (suspicious)
        if name.lower() == message.lower():
            raise ValidationError('Suspicious content detected.')
        
        # Check for excessive repetition
        if len(set(message.split())) < len(message.split()) * 0.3:
            raise ValidationError('Message contains excessive repetition.')
    
    def check_spam_patterns(self, data):
        """Check for common spam patterns"""
        message = data.get('message', '').lower()
        
        spam_indicators = [
            'click here',
            'free money',
            'guaranteed',
            'no risk',
            'act now',
            'limited time',
            'viagra',
            'casino',
            'lottery',
            'winner'
        ]
        
        spam_count = sum(1 for indicator in spam_indicators if indicator in message)
        
        if spam_count >= 3:
            raise ValidationError('Message appears to be spam.')

# Advanced bot detection with behavioral analysis
class BehavioralBotDetection:
    """Behavioral analysis for bot detection"""
    
    def __init__(self, request):
        self.request = request
    
    def analyze_request(self):
        """Analyze request for bot-like behavior"""
        score = 0
        
        # Check User-Agent
        user_agent = self.request.META.get('HTTP_USER_AGENT', '').lower()
        if not user_agent or 'bot' in user_agent or 'crawler' in user_agent:
            score += 30
        
        # Check for missing common headers
        common_headers = ['HTTP_ACCEPT', 'HTTP_ACCEPT_LANGUAGE', 'HTTP_ACCEPT_ENCODING']
        missing_headers = sum(1 for header in common_headers if not self.request.META.get(header))
        score += missing_headers * 10
        
        # Check referer
        referer = self.request.META.get('HTTP_REFERER', '')
        if not referer:
            score += 15
        
        # Check for suspicious IP patterns
        ip = get_client_ip(self.request)
        if self.is_suspicious_ip(ip):
            score += 25
        
        return score >= 50  # Threshold for bot detection
    
    def is_suspicious_ip(self, ip):
        """Check if IP is from known bot networks"""
        # This would typically check against known bot IP ranges
        # For demo purposes, we'll check for localhost
        return ip in ['127.0.0.1', '::1']

# Middleware for bot detection
class BotDetectionMiddleware:
    """Middleware to detect and block bots"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        # Skip bot detection for certain paths
        skip_paths = ['/admin/', '/api/', '/static/', '/media/']
        if any(request.path.startswith(path) for path in skip_paths):
            return self.get_response(request)
        
        # Perform bot detection
        detector = BehavioralBotDetection(request)
        if detector.analyze_request():
            # Log bot attempt
            import logging
            logger = logging.getLogger('security')
            logger.warning(f'Bot detected from IP {get_client_ip(request)}')
            
            # Return 403 Forbidden
            return HttpResponse('Access Denied', status=403)
        
        return self.get_response(request)

File Upload Security

Secure File Upload Handling

# utils.py - File upload security utilities
import os
import magic
import hashlib
from django.core.exceptions import ValidationError
from django.conf import settings

class SecureFileUpload:
    """Secure file upload handler"""
    
    ALLOWED_EXTENSIONS = {
        'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'],
        'document': ['.pdf', '.doc', '.docx', '.txt', '.rtf'],
        'archive': ['.zip', '.tar', '.gz'],
    }
    
    ALLOWED_MIME_TYPES = {
        'image': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/webp'],
        'document': [
            'application/pdf',
            'application/msword',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            'text/plain',
            'application/rtf'
        ],
        'archive': ['application/zip', 'application/x-tar', 'application/gzip'],
    }
    
    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
    
    def __init__(self, file_category='document'):
        self.file_category = file_category
        self.allowed_extensions = self.ALLOWED_EXTENSIONS.get(file_category, [])
        self.allowed_mime_types = self.ALLOWED_MIME_TYPES.get(file_category, [])
    
    def validate_file(self, uploaded_file):
        """Comprehensive file validation"""
        # Check file size
        self.validate_file_size(uploaded_file)
        
        # Check file extension
        self.validate_file_extension(uploaded_file.name)
        
        # Check MIME type
        self.validate_mime_type(uploaded_file)
        
        # Check file content
        self.validate_file_content(uploaded_file)
        
        # Scan for malware (basic)
        self.scan_for_malware(uploaded_file)
        
        return True
    
    def validate_file_size(self, uploaded_file):
        """Validate file size"""
        if uploaded_file.size > self.MAX_FILE_SIZE:
            raise ValidationError(
                f'File size ({uploaded_file.size} bytes) exceeds maximum allowed size '
                f'({self.MAX_FILE_SIZE} bytes).'
            )
    
    def validate_file_extension(self, filename):
        """Validate file extension"""
        _, ext = os.path.splitext(filename.lower())
        
        if ext not in self.allowed_extensions:
            raise ValidationError(
                f'File extension {ext} is not allowed. '
                f'Allowed extensions: {", ".join(self.allowed_extensions)}'
            )
    
    def validate_mime_type(self, uploaded_file):
        """Validate MIME type using python-magic"""
        uploaded_file.seek(0)
        file_content = uploaded_file.read(1024)  # Read first 1KB
        uploaded_file.seek(0)  # Reset file pointer
        
        try:
            mime_type = magic.from_buffer(file_content, mime=True)
        except:
            # Fallback to Django's content_type
            mime_type = uploaded_file.content_type
        
        if mime_type not in self.allowed_mime_types:
            raise ValidationError(
                f'File type {mime_type} is not allowed. '
                f'Allowed types: {", ".join(self.allowed_mime_types)}'
            )
    
    def validate_file_content(self, uploaded_file):
        """Validate file content for malicious patterns"""
        uploaded_file.seek(0)
        
        # Read file in chunks to avoid memory issues
        chunk_size = 8192
        suspicious_patterns = [
            b'<script',
            b'javascript:',
            b'eval(',
            b'document.write',
            b'<iframe',
            b'<?php',
            b'<%',
            b'exec(',
            b'system(',
            b'shell_exec(',
        ]
        
        while True:
            chunk = uploaded_file.read(chunk_size)
            if not chunk:
                break
            
            chunk_lower = chunk.lower()
            for pattern in suspicious_patterns:
                if pattern in chunk_lower:
                    raise ValidationError('File contains potentially malicious content.')
        
        uploaded_file.seek(0)  # Reset file pointer
    
    def scan_for_malware(self, uploaded_file):
        """Basic malware scanning"""
        # Calculate file hash
        uploaded_file.seek(0)
        file_hash = hashlib.sha256()
        
        for chunk in iter(lambda: uploaded_file.read(4096), b""):
            file_hash.update(chunk)
        
        uploaded_file.seek(0)  # Reset file pointer
        
        # Check against known malware hashes (simplified)
        malware_hashes = [
            # Add known malware file hashes here
        ]
        
        if file_hash.hexdigest() in malware_hashes:
            raise ValidationError('File identified as malware.')
    
    def generate_safe_filename(self, original_filename):
        """Generate safe filename"""
        # Remove directory path
        filename = os.path.basename(original_filename)
        
        # Remove dangerous characters
        safe_chars = '-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
        filename = ''.join(c for c in filename if c in safe_chars)
        
        # Limit filename length
        name, ext = os.path.splitext(filename)
        if len(name) > 100:
            name = name[:100]
        
        # Add timestamp to prevent conflicts
        import time
        timestamp = str(int(time.time()))
        
        return f"{name}_{timestamp}{ext}"

# forms.py - Secure file upload form
from django import forms
from .utils import SecureFileUpload

class SecureFileUploadForm(forms.Form):
    """Form with secure file upload"""
    
    title = forms.CharField(max_length=200)
    description = forms.CharField(widget=forms.Textarea, required=False)
    document = forms.FileField()
    
    def __init__(self, *args, **kwargs):
        self.file_category = kwargs.pop('file_category', 'document')
        super().__init__(*args, **kwargs)
        
        self.file_validator = SecureFileUpload(self.file_category)
    
    def clean_document(self):
        """Validate uploaded document"""
        document = self.cleaned_data.get('document')
        
        if document:
            # Perform security validation
            self.file_validator.validate_file(document)
            
            # Generate safe filename
            safe_filename = self.file_validator.generate_safe_filename(document.name)
            document.name = safe_filename
        
        return document
    
    def save(self, upload_path=None):
        """Save uploaded file securely"""
        if not upload_path:
            upload_path = os.path.join(settings.MEDIA_ROOT, 'secure_uploads')
        
        # Ensure upload directory exists
        os.makedirs(upload_path, exist_ok=True)
        
        document = self.cleaned_data['document']
        file_path = os.path.join(upload_path, document.name)
        
        # Save file
        with open(file_path, 'wb+') as destination:
            for chunk in document.chunks():
                destination.write(chunk)
        
        # Set secure file permissions
        os.chmod(file_path, 0o644)
        
        return file_path

# Virus scanning integration (example with ClamAV)
class VirusScanner:
    """Virus scanning utility"""
    
    def __init__(self):
        self.enabled = getattr(settings, 'VIRUS_SCANNING_ENABLED', False)
    
    def scan_file(self, file_path):
        """Scan file for viruses"""
        if not self.enabled:
            return True
        
        try:
            import pyclamd
            cd = pyclamd.ClamdUnixSocket()
            
            # Scan file
            result = cd.scan_file(file_path)
            
            if result is None:
                return True  # Clean file
            else:
                return False  # Virus detected
                
        except ImportError:
            # ClamAV not available, skip scanning
            return True
        except Exception as e:
            # Log error and allow file (fail open)
            import logging
            logger = logging.getLogger('security')
            logger.error(f'Virus scanning failed: {e}')
            return True

Session Security

Secure Session Management

# settings.py - Secure session configuration
SESSION_COOKIE_SECURE = True  # HTTPS only
SESSION_COOKIE_HTTPONLY = True  # Prevent JavaScript access
SESSION_COOKIE_SAMESITE = 'Strict'  # Strict same-site policy
SESSION_COOKIE_AGE = 3600  # 1 hour session timeout
SESSION_EXPIRE_AT_BROWSER_CLOSE = True
SESSION_SAVE_EVERY_REQUEST = True  # Update session on every request

# Custom session key
SESSION_COOKIE_NAME = 'sessionid'

# Session engine (database-backed for security)
SESSION_ENGINE = 'django.contrib.sessions.backends.db'

# middleware.py - Session security middleware
from django.utils import timezone
from django.contrib.auth import logout
from django.shortcuts import redirect

class SessionSecurityMiddleware:
    """Enhanced session security middleware"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        # Check session security
        if request.user.is_authenticated:
            self.validate_session_security(request)
        
        response = self.get_response(request)
        
        # Update session security info
        if request.user.is_authenticated:
            self.update_session_info(request)
        
        return response
    
    def validate_session_security(self, request):
        """Validate session security"""
        session = request.session
        
        # Check for session hijacking
        if self.detect_session_hijacking(request, session):
            logout(request)
            return redirect('login')
        
        # Check session timeout
        if self.is_session_expired(session):
            logout(request)
            return redirect('login')
    
    def detect_session_hijacking(self, request, session):
        """Detect potential session hijacking"""
        current_ip = get_client_ip(request)
        current_user_agent = request.META.get('HTTP_USER_AGENT', '')
        
        # Check IP address consistency
        session_ip = session.get('security_ip')
        if session_ip and session_ip != current_ip:
            return True
        
        # Check User-Agent consistency
        session_user_agent = session.get('security_user_agent')
        if session_user_agent and session_user_agent != current_user_agent:
            return True
        
        return False
    
    def is_session_expired(self, session):
        """Check if session has expired"""
        last_activity = session.get('last_activity')
        if not last_activity:
            return False
        
        # Convert string back to datetime
        from datetime import datetime
        last_activity = datetime.fromisoformat(last_activity)
        
        # Check if session has been inactive too long
        inactive_time = timezone.now() - last_activity
        max_inactive_time = timezone.timedelta(minutes=30)  # 30 minutes
        
        return inactive_time > max_inactive_time
    
    def update_session_info(self, request):
        """Update session security information"""
        session = request.session
        
        # Store security information
        session['security_ip'] = get_client_ip(request)
        session['security_user_agent'] = request.META.get('HTTP_USER_AGENT', '')
        session['last_activity'] = timezone.now().isoformat()
        
        # Regenerate session key periodically
        last_regeneration = session.get('last_key_regeneration')
        if not last_regeneration:
            session.cycle_key()
            session['last_key_regeneration'] = timezone.now().isoformat()
        else:
            last_regen = datetime.fromisoformat(last_regeneration)
            if timezone.now() - last_regen > timezone.timedelta(hours=1):
                session.cycle_key()
                session['last_key_regeneration'] = timezone.now().isoformat()

Form security requires a multi-layered approach combining CSRF protection, input validation, rate limiting, bot detection, secure file uploads, and session management. By implementing these comprehensive security measures, you can protect your applications from common attacks while maintaining usability and performance. Regular security audits and staying updated with the latest security practices are essential for maintaining robust form security.