URLs and Views

File Uploads

File uploads are a common requirement in web applications. Django provides robust support for handling file uploads securely and efficiently, with built-in validation, processing, and storage capabilities.

File Uploads

File uploads are a common requirement in web applications. Django provides robust support for handling file uploads securely and efficiently, with built-in validation, processing, and storage capabilities.

Basic File Upload Handling

Simple File Upload View

import logging
import mimetypes
import os

from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.files.base import ContentFile, File
from django.core.files.storage import default_storage
from django.db.models import F
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render

@login_required
def simple_file_upload(request):
    """Handle a single-file upload from the simple upload form.

    GET renders the form. POST checks that a file was sent and that it is
    no larger than 5MB, stores it under ``uploads/<user id>/``, records a
    ``UserFile`` row and redirects to the file detail page. Validation
    failures re-render the form with an error message.
    """
    form_template = 'uploads/simple_form.html'
    max_size = 5 * 1024 * 1024  # 5MB limit

    if request.method != 'POST':
        return render(request, form_template)

    uploaded_file = request.FILES.get('file')
    if not uploaded_file:
        messages.error(request, 'No file selected.')
        return render(request, form_template)

    # Basic validation
    if uploaded_file.size > max_size:
        messages.error(request, 'File size must be less than 5MB.')
        return render(request, form_template)

    # Hand the upload object to the storage backend directly so it is
    # written in chunks instead of being read fully into memory first.
    file_path = default_storage.save(
        f'uploads/{request.user.id}/{uploaded_file.name}',
        uploaded_file,
    )

    # Create database record
    user_file = UserFile.objects.create(
        user=request.user,
        file=file_path,
        original_filename=uploaded_file.name,
        file_size=uploaded_file.size,
        content_type=uploaded_file.content_type,
    )

    messages.success(request, f'File "{uploaded_file.name}" uploaded successfully!')
    return redirect('uploads:file_detail', pk=user_file.pk)

@login_required
def multiple_file_upload(request):
    """Handle an upload of several files sent under the ``files`` field.

    Each file is validated independently (max 10MB, whitelisted content
    type). Valid files are stored and recorded as ``UserFile`` rows; every
    rejected or failed file yields its own error message. Redirects to the
    file list when at least one file was stored, otherwise re-renders the
    form.

    Login is required because the storage path and the database record
    both depend on ``request.user``.
    """
    form_template = 'uploads/multiple_form.html'
    max_size = 10 * 1024 * 1024  # 10MB per file
    allowed_types = ('image/jpeg', 'image/png', 'image/gif', 'application/pdf')

    if request.method != 'POST':
        return render(request, form_template)

    files = request.FILES.getlist('files')
    if not files:
        messages.error(request, 'No files selected.')
        return render(request, form_template)

    uploaded_files = []
    errors = []

    for uploaded_file in files:
        # Validate each file
        if uploaded_file.size > max_size:
            errors.append(f'{uploaded_file.name}: File too large (max 10MB)')
            continue

        # Check file type
        if uploaded_file.content_type not in allowed_types:
            errors.append(f'{uploaded_file.name}: File type not allowed')
            continue

        try:
            # Stream the upload to storage rather than buffering it in memory.
            file_path = default_storage.save(
                f'uploads/{request.user.id}/{uploaded_file.name}',
                uploaded_file,
            )

            # Create database record
            user_file = UserFile.objects.create(
                user=request.user,
                file=file_path,
                original_filename=uploaded_file.name,
                file_size=uploaded_file.size,
                content_type=uploaded_file.content_type,
            )
        except Exception:
            # One failing file must not abort the rest of the batch. Keep
            # the details in the log instead of showing internals to the user.
            logging.getLogger(__name__).exception(
                'Failed to store uploaded file %r', uploaded_file.name
            )
            errors.append(f'{uploaded_file.name}: Upload failed')
        else:
            uploaded_files.append(user_file)

    # Show results
    if uploaded_files:
        messages.success(request, f'{len(uploaded_files)} files uploaded successfully!')

    for error in errors:
        messages.error(request, error)

    if uploaded_files:
        return redirect('uploads:file_list')

    return render(request, form_template)

File Upload Models

# models.py
from django.db import models
from django.contrib.auth.models import User
from django.core.validators import FileExtensionValidator
import os
import uuid

def user_upload_path(instance, filename):
    """Build the storage path for a user's uploaded file.

    The original name is replaced by a random hex name so uploads never
    collide; only the extension is kept. ``os.path.splitext`` is used so a
    name without a dot gets no extension instead of having the whole name
    treated as one.

    Args:
        instance: Model instance being saved; ``instance.user.id`` is read.
        filename: Name of the file as uploaded.

    Returns:
        A path of the form ``uploads/<user id>/<hex><ext>``.
    """
    ext = os.path.splitext(filename)[1]
    return f'uploads/{instance.user.id}/{uuid.uuid4().hex}{ext}'

def image_upload_path(instance, filename):
    """Build a date-organised storage path for an uploaded image.

    Args:
        instance: Model instance being saved; ``instance.created_at`` is
            used for the year/month directories when it is set.
        filename: Name of the file as uploaded; only its extension is kept.

    Returns:
        A path of the form ``images/<YYYY>/<MM>/<hex><ext>``.
    """
    created = instance.created_at
    if created is None:
        # On the first save the upload path is generated before an
        # ``auto_now_add`` field has been populated, so fall back to now.
        from django.utils import timezone
        created = timezone.now()

    # splitext keeps dot-less names extension-free instead of treating the
    # whole name as the extension.
    ext = os.path.splitext(filename)[1]
    return f'images/{created.year}/{created.month:02d}/{uuid.uuid4().hex}{ext}'

class UserFile(models.Model):
    """A file uploaded by a user, together with its descriptive metadata."""
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='files')
    file = models.FileField(upload_to=user_upload_path)
    original_filename = models.CharField(max_length=255)
    file_size = models.PositiveIntegerField()
    content_type = models.CharField(max_length=100)
    description = models.TextField(blank=True)

    # Bookkeeping
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_public = models.BooleanField(default=False)
    download_count = models.PositiveIntegerField(default=0)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return self.original_filename

    def get_file_extension(self):
        """Return the lower-cased extension (with dot) of the original name."""
        _stem, extension = os.path.splitext(self.original_filename)
        return extension.lower()

    def is_image(self):
        """Return True when the stored content type is an ``image/*`` type."""
        return self.content_type.startswith('image/')

    def get_file_size_display(self):
        """Return the file size as a one-decimal string with a unit suffix."""
        units = ('B', 'KB', 'MB', 'GB', 'TB')
        amount = self.file_size
        position = 0
        # Step up one unit per division by 1024, stopping at the largest unit.
        while amount >= 1024 and position < len(units) - 1:
            amount /= 1024
            position += 1
        return f"{amount:.1f} {units[position]}"

class ImageUpload(models.Model):
    """An uploaded image plus automatically generated resized versions.

    Saving an instance that has an image records the image dimensions and
    (re)generates the ``thumbnail`` and ``medium`` JPEG versions.
    """
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    title = models.CharField(max_length=200, blank=True)

    # Original image
    image = models.ImageField(
        upload_to=image_upload_path,
        validators=[FileExtensionValidator(allowed_extensions=['jpg', 'jpeg', 'png', 'gif'])]
    )

    # Processed versions (auto-generated)
    thumbnail = models.ImageField(upload_to='thumbnails/', blank=True, null=True)
    medium = models.ImageField(upload_to='medium/', blank=True, null=True)

    # Metadata
    width = models.PositiveIntegerField(blank=True, null=True)
    height = models.PositiveIntegerField(blank=True, null=True)
    alt_text = models.CharField(max_length=200, blank=True)

    created_at = models.DateTimeField(auto_now_add=True)

    def save(self, *args, **kwargs):
        """Save the row, then derive dimensions and resized versions.

        The first ``super().save`` writes the image to storage so it can be
        opened; the second persists only the derived fields.
        """
        super().save(*args, **kwargs)

        if not self.image:
            return

        # Get image dimensions
        from PIL import Image
        with Image.open(self.image.path) as img:
            self.width, self.height = img.size

        # Generate processed versions
        self.create_thumbnail()
        self.create_medium_version()

        # Save again with dimensions
        super().save(update_fields=['width', 'height', 'thumbnail', 'medium'])

    def _save_resized_version(self, target, prefix, size, quality):
        """Write a JPEG copy of ``self.image`` bounded by ``size`` into ``target``.

        Args:
            target: The image field file (``self.thumbnail`` / ``self.medium``).
            prefix: Filename prefix for the generated file.
            size: ``(max_width, max_height)``; aspect ratio is preserved.
            quality: JPEG quality passed to Pillow.
        """
        from PIL import Image
        import io
        from django.core.files.base import ContentFile

        with Image.open(self.image.path) as source:
            # JPEG cannot store alpha or palette data, so PNG/GIF sources
            # must be converted before encoding. Both branches return an
            # image that stays usable after the source file is closed.
            if source.mode in ('RGB', 'L'):
                resized = source.copy()
            else:
                resized = source.convert('RGB')

        resized.thumbnail(size, Image.Resampling.LANCZOS)

        buffer = io.BytesIO()
        resized.save(buffer, format='JPEG', quality=quality)

        stem = os.path.splitext(os.path.basename(self.image.name))[0]
        generated_name = f"{prefix}_{stem}.jpg"

        # Remove the previous version first; otherwise every save leaves an
        # orphaned file behind under a fresh, de-duplicated name.
        if target:
            target.delete(save=False)

        target.save(generated_name, ContentFile(buffer.getvalue()), save=False)

    def create_thumbnail(self, size=(150, 150)):
        """Create the thumbnail version (does not save the model)."""
        if not self.image:
            return
        self._save_resized_version(self.thumbnail, 'thumb', size, quality=85)

    def create_medium_version(self, size=(800, 600)):
        """Create the medium-sized version (does not save the model)."""
        if not self.image:
            return
        self._save_resized_version(self.medium, 'medium', size, quality=90)

class Document(models.Model):
    """An uploaded document restricted to a fixed set of file extensions."""
    DOCUMENT_TYPES = [
        ('pdf', 'PDF Document'),
        ('doc', 'Word Document'),
        ('xls', 'Excel Spreadsheet'),
        ('ppt', 'PowerPoint Presentation'),
        ('txt', 'Text File'),
    ]

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    title = models.CharField(max_length=200)
    document_type = models.CharField(max_length=10, choices=DOCUMENT_TYPES)

    file = models.FileField(
        upload_to='documents/',
        validators=[
            FileExtensionValidator(
                allowed_extensions=['pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt']
            )
        ]
    )

    description = models.TextField(blank=True)
    tags = models.CharField(max_length=200, blank=True, help_text="Comma-separated tags")

    # Who may see the document
    is_private = models.BooleanField(default=True)
    allowed_users = models.ManyToManyField(User, blank=True, related_name='accessible_documents')

    created_at = models.DateTimeField(auto_now_add=True)

    def save(self, *args, **kwargs):
        """Set ``document_type`` from the file extension, then save.

        Unknown extensions are classified as ``'txt'``.
        """
        if self.file:
            suffix = os.path.splitext(self.file.name)[1].lower()
            suffixes_by_type = (
                ('pdf', ('.pdf',)),
                ('doc', ('.doc', '.docx')),
                ('xls', ('.xls', '.xlsx')),
                ('ppt', ('.ppt', '.pptx')),
                ('txt', ('.txt',)),
            )
            detected = 'txt'
            for doc_type, suffixes in suffixes_by_type:
                if suffix in suffixes:
                    detected = doc_type
                    break
            self.document_type = detected

        super().save(*args, **kwargs)

Advanced File Upload Features

AJAX File Upload

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import json

@login_required
@require_http_methods(["POST"])
def ajax_file_upload(request):
    """Handle a single AJAX file upload and answer with JSON.

    Returns 400 with an ``error`` key when the file is missing, larger
    than 10MB or of a non-whitelisted content type; 500 with a generic
    message when storing fails; otherwise a JSON description of the new
    ``UserFile``.
    """
    max_size = 10 * 1024 * 1024  # 10MB
    allowed_types = ('image/jpeg', 'image/png', 'image/gif', 'application/pdf')

    uploaded_file = request.FILES.get('file')
    if not uploaded_file:
        return JsonResponse({'error': 'No file provided'}, status=400)

    # Validate file
    if uploaded_file.size > max_size:
        return JsonResponse({
            'error': f'File too large. Maximum size is {max_size // (1024*1024)}MB'
        }, status=400)

    # Validate file type
    if uploaded_file.content_type not in allowed_types:
        return JsonResponse({
            'error': 'File type not allowed'
        }, status=400)

    try:
        # Process and save file
        user_file = UserFile.objects.create(
            user=request.user,
            file=uploaded_file,
            original_filename=uploaded_file.name,
            file_size=uploaded_file.size,
            content_type=uploaded_file.content_type
        )

        # Return success response
        return JsonResponse({
            'success': True,
            'file_id': user_file.id,
            'filename': user_file.original_filename,
            'file_size': user_file.get_file_size_display(),
            'file_url': user_file.file.url,
            'is_image': user_file.is_image(),
            'created_at': user_file.created_at.isoformat()
        })
    except Exception:
        # Exception text can expose paths or database details, so it goes to
        # the log and the client only gets a generic message.
        logging.getLogger(__name__).exception('AJAX file upload failed')
        return JsonResponse({'error': 'Upload failed'}, status=500)

@login_required
def chunked_upload_start(request):
    """Start a chunked upload session.

    Expects a JSON object body with ``filename``, ``file_size`` and an
    optional ``chunk_size`` (default 1MB). Responds with the new session id,
    the chunk size and the number of chunks the client must send.

    Returns 405 for non-POST requests and 400 for malformed JSON or invalid
    values.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'POST method required'}, status=405)

    try:
        data = json.loads(request.body)
    except ValueError:
        # Covers json.JSONDecodeError and undecodable request bytes.
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    if not isinstance(data, dict):
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    filename = data.get('filename')
    file_size = data.get('file_size')
    chunk_size = data.get('chunk_size', 1024 * 1024)  # 1MB default

    if not filename or not file_size:
        return JsonResponse({'error': 'Filename and file_size required'}, status=400)

    def is_positive_int(value):
        # bool is an int subclass; JSON true/false must not pass as a size.
        return isinstance(value, int) and not isinstance(value, bool) and value > 0

    # Without these checks a zero chunk_size divides by zero and string
    # values fail in the arithmetic below.
    if not isinstance(filename, str):
        return JsonResponse({'error': 'filename must be a string'}, status=400)
    if not (is_positive_int(file_size) and is_positive_int(chunk_size)):
        return JsonResponse(
            {'error': 'file_size and chunk_size must be positive integers'},
            status=400,
        )

    # Create upload session
    upload_session = ChunkedUploadSession.objects.create(
        user=request.user,
        filename=filename,
        file_size=file_size,
        chunk_size=chunk_size
    )

    return JsonResponse({
        'upload_id': upload_session.id,
        'chunk_size': chunk_size,
        # Ceiling division: a final partial chunk still counts as a chunk.
        'total_chunks': (file_size + chunk_size - 1) // chunk_size
    })

@login_required
def chunked_upload_chunk(request, upload_id):
    """Receive one chunk of an active chunked upload session.

    Expects POST fields ``chunk_number`` (integer) and a ``chunk`` file.
    When every chunk has arrived the chunks are combined, a ``UserFile`` is
    created, the session is marked completed and the chunk files removed.

    Returns 405 for non-POST, 404 for an unknown or inactive session, 400
    for a bad chunk number or missing data, and 500 with a generic message
    on unexpected failures.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'POST method required'}, status=405)

    try:
        upload_session = ChunkedUploadSession.objects.get(
            id=upload_id,
            user=request.user,
            status='active'
        )
    except ChunkedUploadSession.DoesNotExist:
        return JsonResponse({'error': 'Upload session not found'}, status=404)

    try:
        chunk_number = int(request.POST.get('chunk_number'))
    except (TypeError, ValueError):
        # A missing or non-numeric value is a client error, not a server error.
        return JsonResponse({'error': 'Invalid chunk_number'}, status=400)

    chunk_data = request.FILES.get('chunk')
    if not chunk_data:
        return JsonResponse({'error': 'No chunk data'}, status=400)

    total_chunks = (
        upload_session.file_size + upload_session.chunk_size - 1
    ) // upload_session.chunk_size

    # NOTE(review): nothing here shows whether clients number chunks from 0
    # or from 1, so only values outside both conventions are rejected.
    if chunk_number < 0 or chunk_number > total_chunks:
        return JsonResponse({'error': 'Invalid chunk_number'}, status=400)

    try:
        # Save chunk. A retried chunk must replace the earlier copy:
        # saving over an existing name would store it under a different
        # name, which combine_chunks() would never read.
        chunk_path = f'chunks/{upload_id}/{chunk_number}'
        if default_storage.exists(chunk_path):
            default_storage.delete(chunk_path)
        default_storage.save(chunk_path, chunk_data)

        # Update session. Count each chunk once so a retry cannot make the
        # upload look complete while other chunks are still missing.
        if chunk_number not in upload_session.uploaded_chunks:
            upload_session.uploaded_chunks.append(chunk_number)
            upload_session.save()

        received = len(set(upload_session.uploaded_chunks))
        if received != total_chunks:
            return JsonResponse({
                'success': True,
                'completed': False,
                'uploaded_chunks': received,
                'total_chunks': total_chunks
            })

        # Combine chunks
        final_file = combine_chunks(upload_session)

        # The client never sends a content type for the whole file, so
        # derive one from the filename.
        guessed_type = mimetypes.guess_type(upload_session.filename)[0]

        # Create final file record
        user_file = UserFile.objects.create(
            user=request.user,
            file=final_file,
            original_filename=upload_session.filename,
            file_size=upload_session.file_size,
            content_type=guessed_type or 'application/octet-stream'
        )

        # Clean up
        upload_session.status = 'completed'
        upload_session.save()
        cleanup_chunks(upload_id)

        return JsonResponse({
            'success': True,
            'completed': True,
            'file_id': user_file.id,
            'file_url': user_file.file.url
        })
    except Exception:
        # Log the details; do not send exception text to the client.
        logging.getLogger(__name__).exception(
            'Chunk upload failed for session %s', upload_id
        )
        return JsonResponse({'error': 'Chunk upload failed'}, status=500)

def combine_chunks(upload_session):
    """Combine the uploaded chunks of a session into the final stored file.

    Chunks are concatenated in ascending chunk-number order into a
    temporary file, which is then streamed to the default storage.

    Args:
        upload_session: Session object providing ``id``, ``user``,
            ``filename`` and ``uploaded_chunks``.

    Returns:
        The name under which the storage backend saved the final file.
    """
    import shutil
    import tempfile

    # The filename came from the client: keep only its last path component
    # so it cannot point outside the user's upload directory.
    safe_name = os.path.basename(upload_session.filename.replace('\\', '/')) or 'upload'
    final_path = f'uploads/{upload_session.user.id}/{safe_name}'

    # TemporaryFile is removed automatically when closed, including on
    # errors, so no manual unlink is needed.
    with tempfile.TemporaryFile() as assembled:
        # Combine chunks in order
        for chunk_number in sorted(upload_session.uploaded_chunks):
            chunk_path = f'chunks/{upload_session.id}/{chunk_number}'

            with default_storage.open(chunk_path, 'rb') as chunk_file:
                # Copy in blocks rather than loading a chunk at once.
                shutil.copyfileobj(chunk_file, assembled)

        assembled.seek(0)

        # Wrap the temp file so storage streams it rather than holding the
        # whole assembled upload in memory.
        return default_storage.save(final_path, File(assembled, name=safe_name))

def cleanup_chunks(upload_id):
    """Delete the stored chunk files of an upload session (best effort).

    A missing chunk directory is not an error. Any other failure is logged
    and swallowed so that cleanup never breaks a finished upload.
    """
    chunk_dir = f'chunks/{upload_id}/'

    try:
        # List and delete all chunk files
        _dirs, chunk_names = default_storage.listdir(chunk_dir)
        for chunk_name in chunk_names:
            default_storage.delete(f'{chunk_dir}{chunk_name}')
    except FileNotFoundError:
        # Directory might not exist
        return
    except Exception:
        # Still best effort, but unlike a bare ``except`` this lets
        # KeyboardInterrupt/SystemExit through and leaves a trace.
        logging.getLogger(__name__).warning(
            'Could not clean up chunk files in %s', chunk_dir, exc_info=True
        )

Image Processing and Validation

from PIL import Image, ImageOps, ImageFilter
from django.core.files.base import ContentFile
import io

@login_required
def process_image_upload(request):
    """Validate an uploaded image, build its variants and store them.

    GET renders the upload form. POST expects an ``image`` file and
    optional ``title`` / ``alt_text`` fields and answers with JSON: 400 for
    a missing, unreadable, unsupported or wrongly sized image, 500 with a
    generic message when processing fails, otherwise the URLs of the stored
    variants.

    Login is required because the record is created for ``request.user``.
    """
    if request.method != 'POST':
        return render(request, 'uploads/image_form.html')

    uploaded_file = request.FILES.get('image')
    if not uploaded_file:
        return JsonResponse({'error': 'No image provided'}, status=400)

    try:
        # Validate image
        image = Image.open(uploaded_file)
    except (OSError, Image.DecompressionBombError):
        # Pillow reports unreadable files with an OSError subclass; that is
        # a bad upload, not a server failure.
        return JsonResponse({'error': 'Invalid image format'}, status=400)

    # Check image format
    if image.format not in ['JPEG', 'PNG', 'GIF']:
        return JsonResponse({'error': 'Invalid image format'}, status=400)

    # Check dimensions
    width, height = image.size
    if width > 4000 or height > 4000:
        return JsonResponse({'error': 'Image too large (max 4000x4000)'}, status=400)

    if width < 100 or height < 100:
        return JsonResponse({'error': 'Image too small (min 100x100)'}, status=400)

    try:
        # Process image
        processed_images = process_image_variants(image, uploaded_file.name)

        # Create database record
        image_upload = ImageUpload.objects.create(
            user=request.user,
            title=request.POST.get('title', ''),
            image=processed_images['original'],
            thumbnail=processed_images['thumbnail'],
            medium=processed_images['medium'],
            width=width,
            height=height,
            alt_text=request.POST.get('alt_text', '')
        )

        return JsonResponse({
            'success': True,
            'image_id': image_upload.id,
            'original_url': image_upload.image.url,
            'thumbnail_url': image_upload.thumbnail.url,
            'medium_url': image_upload.medium.url,
            'dimensions': f'{width}x{height}'
        })
    except Exception:
        # Keep exception details in the log, not in the response.
        logging.getLogger(__name__).exception('Image processing failed')
        return JsonResponse({'error': 'Error processing image'}, status=500)

def _encode_jpeg_variant(picture, prefix, filename, **save_options):
    """Encode ``picture`` as JPEG and wrap it in a named ``ContentFile``."""
    buffer = io.BytesIO()
    picture.save(buffer, format='JPEG', **save_options)
    buffer.seek(0)

    stem = os.path.splitext(f"{prefix}_{filename}")[0]
    return ContentFile(buffer.read(), name=stem + '.jpg')


def process_image_variants(image, filename):
    """Build the JPEG variants of an uploaded image.

    Returns a dict with ``'original'`` (re-encoded at quality 90,
    optimized), ``'thumbnail'`` (150x150 crop) and ``'medium'`` (fits
    within 800x600, aspect ratio kept), each a ``ContentFile`` named after
    ``filename`` with a variant prefix and a ``.jpg`` extension.
    """
    # Alpha and palette modes are converted to RGB before JPEG encoding.
    if image.mode in ('RGBA', 'LA', 'P'):
        image = image.convert('RGB')

    results = {}

    # Original (optimized)
    results['original'] = _encode_jpeg_variant(
        image, 'original', filename, quality=90, optimize=True
    )

    # Thumbnail (150x150, cropped)
    cropped = ImageOps.fit(image, (150, 150), Image.Resampling.LANCZOS)
    results['thumbnail'] = _encode_jpeg_variant(cropped, 'thumb', filename, quality=85)

    # Medium (800x600, maintain aspect ratio)
    scaled = image.copy()
    scaled.thumbnail((800, 600), Image.Resampling.LANCZOS)
    results['medium'] = _encode_jpeg_variant(scaled, 'medium', filename, quality=85)

    return results

@login_required
def apply_image_filters(request, image_id):
    """Apply a named filter to one of the user's images.

    GET renders the filter page. POST expects ``filter`` to be one of
    ``blur``, ``sharpen``, ``grayscale`` or ``sepia``; the filtered result
    is stored as a new ``ImageUpload`` and described in a JSON response.
    An unknown filter yields a 400 JSON error.

    Login is required because images are looked up and created per user.
    """
    image_upload = get_object_or_404(ImageUpload, id=image_id, user=request.user)

    if request.method != 'POST':
        return render(request, 'uploads/image_filters.html', {'image': image_upload})

    filter_type = request.POST.get('filter')
    if filter_type not in ('blur', 'sharpen', 'grayscale', 'sepia'):
        return JsonResponse({'error': 'Invalid filter type'}, status=400)

    # Open original image. Converting to RGB up front lets the filters, the
    # sepia maths and the JPEG encoder work for palette or alpha sources
    # (PNG/GIF); the converted copy remains usable after the file is closed.
    with Image.open(image_upload.image.path) as source:
        image = source.convert('RGB')

    # Apply filter
    if filter_type == 'blur':
        filtered_image = image.filter(ImageFilter.BLUR)
    elif filter_type == 'sharpen':
        filtered_image = image.filter(ImageFilter.SHARPEN)
    elif filter_type == 'grayscale':
        filtered_image = image.convert('L').convert('RGB')
    else:
        filtered_image = apply_sepia_filter(image)

    # Save filtered image
    filtered_io = io.BytesIO()
    filtered_image.save(filtered_io, format='JPEG', quality=90)

    # The bytes are JPEG regardless of the source format, so name the file
    # accordingly.
    source_stem = os.path.splitext(os.path.basename(image_upload.image.name))[0]
    filtered_name = f"filtered_{filter_type}_{source_stem}.jpg"

    # Create new image record
    filtered_upload = ImageUpload.objects.create(
        user=request.user,
        title=f"{image_upload.title} ({filter_type})",
        image=ContentFile(filtered_io.getvalue(), name=filtered_name),
        alt_text=image_upload.alt_text
    )

    return JsonResponse({
        'success': True,
        'filtered_image_id': filtered_upload.id,
        'filtered_url': filtered_upload.image.url
    })

def apply_sepia_filter(image):
    """Return a sepia-toned RGB copy of ``image``.

    The input is converted to RGB first, which both leaves the caller's
    image untouched and makes the per-pixel ``(r, g, b)`` unpacking valid
    for sources in other modes.

    Args:
        image: An image object offering ``convert``, ``load`` and ``size``.

    Returns:
        A new RGB image with the sepia transform applied.
    """
    toned = image.convert('RGB')
    pixels = toned.load()
    width, height = toned.size

    for y in range(height):
        for x in range(width):
            r, g, b = pixels[x, y]

            # Sepia formula
            tr = int(0.393 * r + 0.769 * g + 0.189 * b)
            tg = int(0.349 * r + 0.686 * g + 0.168 * b)
            tb = int(0.272 * r + 0.534 * g + 0.131 * b)

            # The weighted sums can exceed the 8-bit channel range.
            pixels[x, y] = (min(255, tr), min(255, tg), min(255, tb))

    return toned

File Upload Security

Validation and Sanitization

import magic
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile

def validate_file_type(uploaded_file):
    """Check an upload's detected MIME type and its filename extension.

    The content is inspected with python-magic; the detected type must be
    in the whitelist and the filename's extension must be one of the
    extensions listed for that type.

    Returns:
        True when the file passes both checks.

    Raises:
        ValidationError: If the detected type is not allowed, or the
            extension does not match the detected type.
    """
    # Detected MIME type -> extensions accepted for it
    extensions_by_type = {
        'image/jpeg': ['.jpg', '.jpeg'],
        'image/png': ['.png'],
        'image/gif': ['.gif'],
        'application/pdf': ['.pdf'],
        'text/plain': ['.txt'],
        'application/msword': ['.doc'],
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': ['.docx'],
    }

    # Read the content for detection, then rewind for later readers.
    payload = uploaded_file.read()
    uploaded_file.seek(0)

    detected_type = magic.from_buffer(payload, mime=True)

    accepted_extensions = extensions_by_type.get(detected_type)
    if accepted_extensions is None:
        raise ValidationError(f'File type {detected_type} is not allowed')

    _stem, raw_extension = os.path.splitext(uploaded_file.name)
    extension = raw_extension.lower()
    if extension not in accepted_extensions:
        raise ValidationError(
            f'File extension {extension} does not match file type {detected_type}'
        )

    return True

def sanitize_filename(filename):
    """Return a filesystem-safe version of an uploaded filename.

    Path components (with either slash style) are dropped, characters other
    than word characters, whitespace, dots and hyphens are removed,
    whitespace runs become underscores and leading dots are stripped. The
    stem is capped at 100 characters and the extension at 20.

    Returns:
        The sanitized name, or ``'file'`` when nothing usable remains.
    """
    import re

    max_stem_length = 100
    max_ext_length = 20  # includes the leading dot

    # Remove path components; browsers on Windows may send backslashes.
    filename = os.path.basename(filename.replace('\\', '/'))

    # Remove or replace dangerous characters
    filename = re.sub(r'[^\w\s.-]', '', filename)

    # Replace spaces with underscores
    filename = re.sub(r'\s+', '_', filename)

    # Leading dots would produce hidden files or the special name "..".
    filename = filename.lstrip('.')

    # Limit length
    name, ext = os.path.splitext(filename)
    cleaned = f"{name[:max_stem_length]}{ext[:max_ext_length]}"

    # Never hand back an empty name.
    return cleaned or 'file'

def scan_file_for_malware(uploaded_file):
    """Reject uploads containing a few script-like byte patterns (placeholder).

    This is a simplified example, not a real scanner; in production,
    integrate with an antivirus service. The file is read completely and
    rewound before the check, so the file pointer is at the start whether
    or not the check passes.

    Returns:
        True when none of the patterns occur.

    Raises:
        ValidationError: If any pattern occurs (case-insensitively).
    """
    suspicious_patterns = (
        b'<script',
        b'javascript:',
        b'vbscript:',
        b'<?php',
        b'<%',
    )

    # Read file content
    content = uploaded_file.read()
    uploaded_file.seek(0)

    # Lower-case once instead of once per pattern.
    lowered = content.lower()

    # Check for suspicious patterns
    if any(pattern in lowered for pattern in suspicious_patterns):
        raise ValidationError('File contains suspicious content')

    return True

@login_required
def secure_file_upload(request):
    """Upload a file after size, type, content and filename checks.

    GET renders the form. POST validates the file (max 10MB, type check,
    pattern scan), stores it under a unique sanitized name in
    ``secure_uploads/<user id>/``, records a ``SecureFile`` with a SHA-256
    checksum and redirects to its detail page. On failure the form is
    re-rendered with an error message.
    """
    form_template = 'uploads/secure_form.html'

    if request.method != 'POST':
        return render(request, form_template)

    uploaded_file = request.FILES.get('file')
    if not uploaded_file:
        messages.error(request, 'No file selected.')
        return render(request, form_template)

    try:
        # File size validation
        max_size = 10 * 1024 * 1024  # 10MB
        if uploaded_file.size > max_size:
            raise ValidationError(f'File size exceeds {max_size // (1024*1024)}MB limit')

        # File type validation
        validate_file_type(uploaded_file)

        # Malware scanning
        scan_file_for_malware(uploaded_file)

        # Sanitize filename
        safe_filename = sanitize_filename(uploaded_file.name)

        # Generate unique filename to prevent conflicts
        unique_filename = f"{uuid.uuid4().hex}_{safe_filename}"

        # Save file with secure path
        file_path = f'secure_uploads/{request.user.id}/{unique_filename}'
        saved_path = default_storage.save(file_path, uploaded_file)

        # Create database record
        secure_file = SecureFile.objects.create(
            user=request.user,
            file=saved_path,
            original_filename=uploaded_file.name,
            safe_filename=safe_filename,
            file_size=uploaded_file.size,
            content_type=uploaded_file.content_type,
            checksum=calculate_file_checksum(uploaded_file)
        )
    except ValidationError as e:
        # str(ValidationError) renders as a list repr ("['...']"), so join
        # the individual messages for display.
        messages.error(request, '; '.join(e.messages))
    except Exception:
        # Keep internal details in the log rather than showing them to users.
        logging.getLogger(__name__).exception('Secure file upload failed')
        messages.error(request, 'Upload failed. Please try again.')
    else:
        messages.success(request, 'File uploaded successfully!')
        return redirect('uploads:secure_file_detail', pk=secure_file.pk)

    return render(request, form_template)

def calculate_file_checksum(uploaded_file):
    """Return the hex SHA-256 digest of a file-like object's content.

    The object is rewound before hashing and again afterwards, so the
    caller gets it back positioned at the start.
    """
    import hashlib

    digest = hashlib.sha256()
    block_size = 4096

    uploaded_file.seek(0)
    # Hash block by block so large files are never held in memory at once.
    while True:
        block = uploaded_file.read(block_size)
        if block == b"":
            break
        digest.update(block)
    uploaded_file.seek(0)

    return digest.hexdigest()

Access Control and Download

from django.http import Http404, HttpResponse, FileResponse
from django.contrib.auth.decorators import login_required

@login_required
def secure_file_download(request, file_id):
    """Serve a ``UserFile`` as an attachment after an access check.

    Increments the file's download counter and writes a
    ``FileDownloadLog`` entry before streaming the file.

    Raises:
        Http404: If the file does not exist, the user may not access it
            (so its existence is not revealed), or it is missing on disk.
    """
    try:
        user_file = UserFile.objects.get(id=file_id)
    except UserFile.DoesNotExist:
        raise Http404("File not found")

    # Check access permissions
    if not can_access_file(request.user, user_file):
        raise Http404("File not found")

    # Get file path
    file_path = user_file.file.path

    if not os.path.exists(file_path):
        raise Http404("File not found on disk")

    # Increment the counter inside the database so concurrent downloads
    # cannot overwrite each other's update.
    UserFile.objects.filter(pk=user_file.pk).update(
        download_count=F('download_count') + 1
    )

    # Log download
    FileDownloadLog.objects.create(
        file=user_file,
        user=request.user,
        ip_address=request.META.get('REMOTE_ADDR'),
        user_agent=request.META.get('HTTP_USER_AGENT', '')
    )

    # Serve file
    response = FileResponse(
        open(file_path, 'rb'),
        content_type=user_file.content_type,
        as_attachment=True,
        filename=user_file.original_filename
    )

    # Add security headers
    response['X-Content-Type-Options'] = 'nosniff'
    response['X-Frame-Options'] = 'DENY'

    return response

def can_access_file(user, user_file):
    """Return True if ``user`` may access ``user_file``.

    Access is granted to the owner, to anyone for public files, to users
    in the file's ``allowed_users`` relation (when the object has one) and
    to staff users.
    """
    # Owner can always access
    if user_file.user == user:
        return True

    # Public files can be accessed by anyone
    if user_file.is_public:
        return True

    # Ask for just this user instead of loading every allowed user to run
    # a Python-side membership test.
    if hasattr(user_file, 'allowed_users') and user_file.allowed_users.filter(pk=user.pk).exists():
        return True

    # Staff can access all files
    if user.is_staff:
        return True

    return False

def serve_protected_media(request, path):
    """Serve a file below ``MEDIA_ROOT`` after a permission check.

    Intended for use with internal redirects (X-Accel-Redirect for Nginx);
    without that setting the file is streamed by Django directly.

    Raises:
        Http404: If ``path`` leaves ``MEDIA_ROOT``, is not an existing
            file, maps to no ``UserFile``, or the user may not access it.
    """
    media_root = os.path.realpath(settings.MEDIA_ROOT)
    full_path = os.path.realpath(os.path.join(media_root, path))

    # ``path`` comes from the URL: reject anything that resolves outside
    # MEDIA_ROOT (e.g. via "../" segments or an absolute path).
    try:
        inside_media_root = os.path.commonpath([media_root, full_path]) == media_root
    except ValueError:
        # Raised when the two paths cannot be compared (e.g. different drives).
        inside_media_root = False
    if not inside_media_root:
        raise Http404("File not found")

    # isfile rather than exists: a directory cannot be served.
    if not os.path.isfile(full_path):
        raise Http404("File not found")

    # Extract file ID from path to check permissions.
    # NOTE(review): this assumes the second path segment is a UserFile id;
    # confirm that against how upload paths are actually generated.
    try:
        file_id = extract_file_id_from_path(path)
        user_file = UserFile.objects.get(id=file_id)
    except (UserFile.DoesNotExist, ValueError):
        raise Http404("File not found")

    if not can_access_file(request.user, user_file):
        raise Http404("File not found")

    # For production with Nginx, use X-Accel-Redirect. The setting is a
    # project-defined one, so treat it as disabled when absent.
    if getattr(settings, 'USE_X_ACCEL_REDIRECT', False):
        # Redirect to the normalised path, not the raw request value.
        relative_path = os.path.relpath(full_path, media_root).replace(os.sep, '/')
        response = HttpResponse()
        response['X-Accel-Redirect'] = f'/protected/{relative_path}'
        response['Content-Type'] = user_file.content_type
        return response

    # For development, serve directly
    return FileResponse(
        open(full_path, 'rb'),
        content_type=user_file.content_type
    )

def extract_file_id_from_path(path):
    """Return the integer ID stored in the second segment of ``path``.

    Example: ``uploads/123/filename.jpg`` -> ``123``

    Only a plain ASCII decimal segment is accepted. ``int()`` alone would
    also accept spellings such as ``" 12"``, ``"+12"``, ``"1_0"`` or
    ``"-5"``, letting several different paths alias the same record in a
    permission-sensitive lookup.

    NOTE(review): the simple upload view stores files under
    ``uploads/<user id>/<name>``; confirm that the second segment of the
    paths passed here really is a ``UserFile`` ID.

    Args:
        path: Slash-separated path relative to the media root.

    Returns:
        int: The numeric ID found in the second path segment.

    Raises:
        ValueError: If the path has fewer than two segments or the second
            segment is not a run of ASCII digits.
    """
    parts = path.split('/')
    if len(parts) >= 2:
        segment = parts[1]
        # isdigit() alone is true for non-ASCII digits, hence isascii() too.
        if segment.isascii() and segment.isdigit():
            return int(segment)
    raise ValueError("Cannot extract file ID from path")

Testing File Uploads

# tests/test_file_uploads.py
from django.test import TestCase, Client
from django.core.files.uploadedfile import SimpleUploadedFile
from django.contrib.auth.models import User
import tempfile
import os

class FileUploadTests(TestCase):
    """End-to-end tests for the upload and download views.

    Every test runs as a logged-in ``testuser`` and stores its files in a
    temporary ``MEDIA_ROOT`` that is removed afterwards.

    NOTE(review): ``UserFile`` is not imported in the visible imports of this
    module; import it from the app's models module.
    """

    def setUp(self):
        """Isolate file storage and log in the default test user."""
        # Uploads would otherwise be written to the real MEDIA_ROOT and left
        # behind after the run. Cleanups run in reverse order: the setting is
        # restored first, then the directory is deleted.
        media_dir = tempfile.TemporaryDirectory()
        self.addCleanup(media_dir.cleanup)
        media_override = self.settings(MEDIA_ROOT=media_dir.name)
        media_override.enable()
        self.addCleanup(media_override.disable)

        # TestCase already provides a fresh ``self.client`` for every test.
        self.user = User.objects.create_user(
            username='testuser',
            email='test@example.com',
            password='testpass'
        )
        self.client.login(username='testuser', password='testpass')

    def test_simple_file_upload(self):
        """A small text file is stored and the view redirects."""
        test_file = SimpleUploadedFile(
            "test.txt",
            b"Test file content",
            content_type="text/plain"
        )

        response = self.client.post('/uploads/simple/', {
            'file': test_file
        })

        self.assertEqual(response.status_code, 302)  # Redirect after success

        # Check file was created; 17 is the byte length of the payload above
        user_file = UserFile.objects.get(user=self.user)
        self.assertEqual(user_file.original_filename, 'test.txt')
        self.assertEqual(user_file.file_size, 17)

    def test_file_size_validation(self):
        """A file above the 5MB limit is rejected with an error message."""
        large_content = b"x" * (6 * 1024 * 1024)  # 6MB
        large_file = SimpleUploadedFile(
            "large.txt",
            large_content,
            content_type="text/plain"
        )

        response = self.client.post('/uploads/simple/', {
            'file': large_file
        })

        # Should show error message
        self.assertContains(response, 'File size must be less than 5MB')

    def test_multiple_file_upload(self):
        """Two files posted together create two records."""
        file1 = SimpleUploadedFile("test1.txt", b"Content 1", content_type="text/plain")
        file2 = SimpleUploadedFile("test2.txt", b"Content 2", content_type="text/plain")

        response = self.client.post('/uploads/multiple/', {
            'files': [file1, file2]
        })

        self.assertEqual(response.status_code, 302)
        self.assertEqual(UserFile.objects.filter(user=self.user).count(), 2)

    def test_ajax_file_upload(self):
        """An AJAX upload answers with a JSON success payload."""
        test_file = SimpleUploadedFile(
            "ajax_test.txt",
            b"AJAX test content",
            content_type="text/plain"
        )

        # The extra header marks the request as XMLHttpRequest
        response = self.client.post('/uploads/ajax/', {
            'file': test_file
        }, HTTP_X_REQUESTED_WITH='XMLHttpRequest')

        self.assertEqual(response.status_code, 200)

        data = response.json()
        self.assertTrue(data['success'])
        self.assertEqual(data['filename'], 'ajax_test.txt')

    def test_image_upload_processing(self):
        """An uploaded JPEG yields thumbnail and medium URLs."""
        # Imported locally so only this test depends on Pillow
        from PIL import Image
        import io

        # Build a 200x200 red JPEG in memory
        image = Image.new('RGB', (200, 200), color='red')
        image_io = io.BytesIO()
        image.save(image_io, format='JPEG')
        image_io.seek(0)

        test_image = SimpleUploadedFile(
            "test_image.jpg",
            image_io.read(),
            content_type="image/jpeg"
        )

        response = self.client.post('/uploads/image/', {
            'image': test_image,
            'title': 'Test Image'
        })

        self.assertEqual(response.status_code, 200)

        data = response.json()
        self.assertTrue(data['success'])
        self.assertIn('thumbnail_url', data)
        self.assertIn('medium_url', data)

    def test_secure_file_download(self):
        """The owner can download a file as an attachment."""
        user_file = UserFile.objects.create(
            user=self.user,
            file=SimpleUploadedFile("download_test.txt", b"Download content"),
            original_filename="download_test.txt",
            file_size=16,
            content_type="text/plain"
        )

        response = self.client.get(f'/uploads/download/{user_file.id}/')

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['Content-Type'], 'text/plain')
        self.assertIn('attachment', response['Content-Disposition'])

    def test_unauthorized_file_access(self):
        """Test that users cannot access other users' files"""
        other_user = User.objects.create_user(
            username='otheruser',
            email='other@example.com',
            password='otherpass'
        )

        # Create file for other user
        other_file = UserFile.objects.create(
            user=other_user,
            file=SimpleUploadedFile("private.txt", b"Private content"),
            original_filename="private.txt",
            file_size=15,
            content_type="text/plain"
        )

        # Try to access as current user
        response = self.client.get(f'/uploads/download/{other_file.id}/')

        # 404 rather than 403, so the response does not confirm the file exists
        self.assertEqual(response.status_code, 404)

File uploads require careful handling of security, validation, and user experience. Implementing proper validation, processing, and access control ensures your Django application can handle file uploads safely and efficiently while providing a good user experience.