Working with Files

Managing Media in Production

Production media management requires careful consideration of performance, security, scalability, and cost. This chapter covers best practices and strategies for handling files in production Django applications.


Production Architecture

Typical Production Setup

# settings/production.py
import os

# Separate static and media handling
# Serving both through a CDN keeps file traffic off the application servers.
STATIC_URL = 'https://cdn.example.com/static/'
MEDIA_URL = 'https://cdn.example.com/media/'

# Cloud storage for media files
# STORAGES is the unified storage setting introduced in Django 4.2.
STORAGES = {
    "default": {
        # User-uploaded media goes to its own S3 bucket, served via the CDN.
        "BACKEND": "storages.backends.s3boto3.S3Boto3Storage",
        "OPTIONS": {
            # Bucket names come from the environment so the same settings
            # module works across deployments.
            "bucket_name": os.environ.get("MEDIA_BUCKET_NAME"),
            "custom_domain": "cdn.example.com",
            "object_parameters": {
                # Cache objects at the browser/CDN for 24 hours.
                "CacheControl": "max-age=86400",
            },
        },
    },
    "staticfiles": {
        # collectstatic output lives in a separate bucket and domain.
        "BACKEND": "storages.backends.s3boto3.S3StaticStorage",
        "OPTIONS": {
            "bucket_name": os.environ.get("STATIC_BUCKET_NAME"),
            "location": "static",
            "custom_domain": "static.example.com",
        },
    },
}

# Security settings
# NOTE(review): SECURE_MEDIA_URL and MEDIA_FILE_PERMISSIONS are not built-in
# Django settings — presumably read by project-specific code; verify before
# relying on them.
SECURE_MEDIA_URL = True
MEDIA_FILE_PERMISSIONS = 0o644
# FILE_UPLOAD_PERMISSIONS is a real Django setting: the mode applied to files
# saved from uploads (0o644 = owner rw, group/other read-only).
FILE_UPLOAD_PERMISSIONS = 0o644

Multi-Environment Configuration

# settings/base.py
class MediaConfig:
    """Base media configuration shared by all environments."""
    
    # File upload limits: uploads larger than these are streamed to disk
    # rather than held in memory (Django setting semantics).
    FILE_UPLOAD_MAX_MEMORY_SIZE = 5 * 1024 * 1024  # 5MB
    DATA_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024  # 10MB
    
    # Allowed file types (MIME type strings, consumed by upload validators)
    ALLOWED_IMAGE_TYPES = ['image/jpeg', 'image/png', 'image/gif', 'image/webp']
    ALLOWED_DOCUMENT_TYPES = ['application/pdf', 'text/plain', 'application/msword']
    
    # File size limits by type, in bytes
    MAX_IMAGE_SIZE = 10 * 1024 * 1024  # 10MB
    MAX_DOCUMENT_SIZE = 50 * 1024 * 1024  # 50MB

# settings/development.py
class DevelopmentMediaConfig(MediaConfig):
    """Development: store and serve media from the local filesystem."""
    # NOTE(review): assumes os and BASE_DIR are defined in this settings
    # module (standard in a Django settings file) — confirm.
    MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
    MEDIA_URL = '/media/'
    
    # Local file serving
    STORAGES = {
        "default": {
            "BACKEND": "django.core.files.storage.FileSystemStorage",
        },
    }

# settings/production.py
class ProductionMediaConfig(MediaConfig):
    """Production: route media through the project's cloud storage backend."""
    # Cloud storage configuration — a project-defined storage class wraps
    # the cloud backend so options live in one place.
    STORAGES = {
        "default": {
            "BACKEND": "myapp.storage.ProductionMediaStorage",
        },
    }

File Upload Optimization

Chunked Upload Implementation

# views.py
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
import os
import re
import hashlib

@method_decorator(csrf_exempt, name='dispatch')
class ChunkedUploadView(View):
    """Handle large file uploads in chunks.

    The client splits the file into numbered chunks and POSTs each one with
    ``fileId``, ``chunkNumber`` and ``totalChunks``; once the final chunk
    arrives, all chunks are assembled into a single file.
    """

    # Client-supplied file IDs become directory names under MEDIA_ROOT, so
    # restrict them to a safe character set — otherwise "../../etc" style
    # values allow path traversal.
    FILE_ID_RE = re.compile(r'^[\w-]{1,64}$')

    def post(self, request):
        """Receive one chunk; returns JSON status, 400 on invalid input."""
        chunk = request.FILES.get('chunk')
        file_id = request.POST.get('fileId')

        if not chunk or not file_id:
            return JsonResponse({'error': 'Missing required parameters'}, status=400)

        if not self.FILE_ID_RE.match(file_id):
            return JsonResponse({'error': 'Invalid fileId'}, status=400)

        # int() raises ValueError on malformed values; answer 400 instead of 500.
        try:
            chunk_number = int(request.POST.get('chunkNumber', 0))
            total_chunks = int(request.POST.get('totalChunks', 1))
        except (TypeError, ValueError):
            return JsonResponse({'error': 'Invalid chunk numbering'}, status=400)

        if total_chunks < 1 or not 0 <= chunk_number < total_chunks:
            return JsonResponse({'error': 'Chunk number out of range'}, status=400)

        # Create upload directory (one per file_id)
        upload_dir = os.path.join(settings.MEDIA_ROOT, 'temp_uploads', file_id)
        os.makedirs(upload_dir, exist_ok=True)

        # Save this chunk to disk
        chunk_path = os.path.join(upload_dir, f'chunk_{chunk_number}')
        with open(chunk_path, 'wb') as f:
            for chunk_data in chunk.chunks():
                f.write(chunk_data)

        # Final chunk received: stitch everything together
        if chunk_number == total_chunks - 1:
            final_file_path = self.assemble_chunks(upload_dir, total_chunks)
            return JsonResponse({
                'status': 'complete',
                'file_path': final_file_path
            })

        return JsonResponse({'status': 'chunk_uploaded'})

    def assemble_chunks(self, upload_dir, total_chunks):
        """Concatenate chunks 0..total_chunks-1 into one file, deleting each
        chunk as it is consumed. Returns the assembled file's path."""
        final_file_path = os.path.join(upload_dir, 'assembled_file')

        with open(final_file_path, 'wb') as final_file:
            for i in range(total_chunks):
                chunk_path = os.path.join(upload_dir, f'chunk_{i}')
                with open(chunk_path, 'rb') as chunk_file:
                    final_file.write(chunk_file.read())
                os.remove(chunk_path)  # Clean up chunk

        return final_file_path

Asynchronous File Processing

# tasks.py (using Celery)
from celery import shared_task
from PIL import Image
import logging
import os

# Module-level logger — the original referenced an undefined `logger`.
logger = logging.getLogger(__name__)

@shared_task
def process_uploaded_image(image_path, user_id):
    """Create thumbnails for an uploaded image and queue cloud uploads.

    Args:
        image_path: local filesystem path of the uploaded original.
        user_id: owner's id, used to namespace the cloud object keys.
    """
    try:
        # Open and process image
        with Image.open(image_path) as img:
            # Thumbnail sizes: name -> (max_width, max_height)
            thumbnails = {
                'small': (150, 150),
                'medium': (300, 300),
                'large': (800, 600)
            }

            for size_name, dimensions in thumbnails.items():
                # thumbnail() mutates in place, so work on a copy of the original
                thumbnail = img.copy()
                thumbnail.thumbnail(dimensions, Image.Resampling.LANCZOS)

                # Save thumbnail next to the original
                thumb_path = f"{image_path}_{size_name}.jpg"
                thumbnail.save(thumb_path, 'JPEG', quality=85)

                # Queue the upload. The destination key must include a file
                # name — storage.save() expects a full object name, not a
                # bare directory prefix like "thumbnails/<id>/".
                upload_to_cloud.delay(
                    thumb_path,
                    f"thumbnails/{user_id}/{os.path.basename(thumb_path)}",
                )

        # Queue the original for upload. Do NOT delete it here: the upload
        # task runs later in a worker and still needs the local file; it
        # removes the file itself after a successful upload.
        upload_to_cloud.delay(
            image_path,
            f"images/{user_id}/{os.path.basename(image_path)}",
        )

    except Exception as e:
        # Log error and handle failure
        logger.error(f"Image processing failed: {e}")

@shared_task
def upload_to_cloud(file_path, cloud_path):
    """Upload a local file to the default storage backend, then delete it.

    Args:
        file_path: local path of the file to upload.
        cloud_path: destination object name (including file name).

    Returns:
        The name the storage backend actually saved the file under (may
        differ from cloud_path if the backend deduplicates names).

    Re-raises upload errors so Celery can record/retry the failure.
    """
    import logging

    from django.core.files.storage import default_storage
    from django.core.files import File

    # Function-local logger: the surrounding module does not define one.
    logger = logging.getLogger(__name__)

    try:
        with open(file_path, 'rb') as f:
            django_file = File(f)
            cloud_name = default_storage.save(cloud_path, django_file)

        # Clean up the local file only after a successful upload
        os.remove(file_path)

        return cloud_name

    except Exception as e:
        logger.error(f"Cloud upload failed: {e}")
        raise

Security Implementation

File Validation and Sanitization

# validators.py
import magic
import os
from django.core.exceptions import ValidationError
from PIL import Image

class FileValidator:
    """Comprehensive upload validation: size, MIME type, content, malware.

    Instances are callable, so they can be listed directly in a model
    field's ``validators``.
    """
    
    def __init__(self, allowed_types=None, max_size=None):
        # allowed_types: acceptable MIME strings; empty list = allow all.
        self.allowed_types = allowed_types or []
        # max_size: maximum file size in bytes; None = no limit.
        self.max_size = max_size
    
    def __call__(self, file):
        """Run all checks; raises ValidationError on the first failure."""
        self.validate_size(file)
        self.validate_type(file)
        self.validate_content(file)
        self.scan_for_malware(file)
    
    def validate_size(self, file):
        """Reject files larger than max_size bytes."""
        if self.max_size and file.size > self.max_size:
            raise ValidationError(f"File size exceeds {self.max_size} bytes")
    
    def validate_type(self, file):
        """Detect the real MIME type from the file header via python-magic,
        ignoring the client-supplied Content-Type (which is spoofable)."""
        file.seek(0)
        file_header = file.read(1024)
        file.seek(0)
        
        detected_type = magic.from_buffer(file_header, mime=True)
        
        if self.allowed_types and detected_type not in self.allowed_types:
            raise ValidationError(f"File type {detected_type} not allowed")
    
    def validate_content(self, file):
        """For images, confirm Pillow can actually parse the file."""
        # Only Django's UploadedFile carries content_type; other file-like
        # objects would raise AttributeError, so use a safe default.
        content_type = getattr(file, 'content_type', '') or ''
        if content_type.startswith('image/'):
            try:
                # Verify image can be opened and is structurally sound
                file.seek(0)
                with Image.open(file) as img:
                    img.verify()
                file.seek(0)
            except Exception as exc:
                # Chain the cause so debugging keeps the Pillow error.
                raise ValidationError("Invalid image file") from exc
    
    def scan_for_malware(self, file):
        """Heuristic scan for embedded script/code markers in the first 8KB."""
        file.seek(0)
        content = file.read(8192)  # Read first 8KB
        file.seek(0)
        
        # Check for suspicious patterns (case-insensitive)
        suspicious_patterns = [
            b'<script',
            b'javascript:',
            b'<?php',
            b'<%',
            b'eval(',
        ]
        
        content_lower = content.lower()
        for pattern in suspicious_patterns:
            if pattern in content_lower:
                raise ValidationError("Potentially malicious content detected")

# Usage in models
# NOTE(review): a validator instance attached to a model field must be
# serializable for migrations — consider decorating FileValidator with
# django.utils.deconstruct.deconstructible; verify before shipping.
image_validator = FileValidator(
    allowed_types=['image/jpeg', 'image/png', 'image/gif'],
    max_size=10 * 1024 * 1024  # 10MB
)

class UserPhoto(models.Model):
    # Owner of the photo; photos are deleted along with the user.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Validated on full_clean()/form save by image_validator (type, size,
    # content and malware-pattern checks).
    photo = models.ImageField(
        upload_to='photos/',
        validators=[image_validator]
    )

Secure File Serving

# views.py
from django.http import (
    Http404,
    HttpResponse,
    HttpResponseForbidden,
    HttpResponseRedirect,  # was missing: used for the signed-URL redirect
)
from django.contrib.auth.decorators import login_required
from django.shortcuts import get_object_or_404
import mimetypes

@login_required
def serve_protected_file(request, file_id):
    """Serve a protected file with access control.

    Owners are redirected to a signed URL when the storage backend supports
    one; otherwise the file is read and returned through Django.
    Raises Http404 when the record or the underlying file is missing.
    """
    
    # Get file object with permission check
    file_obj = get_object_or_404(
        ProtectedFile,
        id=file_id,
        user=request.user  # Only owner can access
    )
    
    # Check additional permissions
    if not file_obj.can_be_accessed_by(request.user):
        return HttpResponseForbidden()
    
    # For cloud storage, redirect to a short-lived signed URL
    if hasattr(file_obj.file.storage, 'generate_signed_url'):
        signed_url = file_obj.file.storage.generate_signed_url(
            file_obj.file.name,
            expires_in=3600
        )
        return HttpResponseRedirect(signed_url)
    
    # For local files, serve through Django
    try:
        with file_obj.file.open('rb') as f:
            content = f.read()
        
        content_type, _ = mimetypes.guess_type(file_obj.file.name)
        # guess_type returns None for unknown extensions — fall back to a
        # safe generic binary type instead of passing None to HttpResponse.
        response = HttpResponse(
            content,
            content_type=content_type or 'application/octet-stream',
        )
        response['Content-Disposition'] = f'attachment; filename="{file_obj.filename}"'
        
        return response
        
    except FileNotFoundError:
        raise Http404("File not found")

# X-Accel-Redirect for Nginx
def serve_file_nginx(request, file_path):
    """Serve a file via Nginx's internal X-Accel-Redirect mechanism.

    Django performs the access check, then hands the actual byte transfer
    to Nginx via the internal /protected/ location.
    """
    
    # file_path comes from the URL: refuse absolute paths and traversal
    # segments before embedding it in the internal redirect target.
    if file_path.startswith('/') or '..' in file_path.split('/'):
        return HttpResponseForbidden()
    
    # Perform access control checks here
    if not user_can_access_file(request.user, file_path):
        return HttpResponseForbidden()
    
    response = HttpResponse()
    response['X-Accel-Redirect'] = f'/protected/{file_path}'
    response['Content-Type'] = ''  # Let Nginx determine
    
    return response

Performance Optimization

CDN Integration

# storage.py
from django.conf import settings  # was missing: settings is read in __init__
from storages.backends.s3boto3 import S3Boto3Storage

class CDNStorage(S3Boto3Storage):
    """S3 storage that rewrites public URLs to point at a CDN domain."""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # CDN_DOMAIN is optional; when absent we fall back to plain S3 URLs.
        self.cdn_domain = getattr(settings, 'CDN_DOMAIN', None)
    
    def url(self, name):
        """Return the CDN URL if configured, otherwise the backend default.

        NOTE(review): CDN URLs bypass S3 query-string authentication, so
        this is only appropriate for public objects — confirm for private
        media before enabling.
        """
        if self.cdn_domain:
            return f"https://{self.cdn_domain}/{name}"
        return super().url(name)

# settings.py
# Domain consumed by CDNStorage.url(); leave unset to fall back to S3 URLs.
CDN_DOMAIN = 'cdn.example.com'

# CloudFront configuration
# django-storages applies these headers to every object it uploads.
AWS_S3_OBJECT_PARAMETERS = {
    'CacheControl': 'max-age=86400',  # 24 hours
    'Expires': 'Thu, 31 Dec 2099 20:00:00 GMT',
}

Image Optimization Pipeline

# image_processing.py
# Pillow has no "ImageOpt" module — the original `from PIL import Image,
# ImageOpt` raises ImportError; only Image is needed here.
from PIL import Image
import io
from django.core.files.base import ContentFile

class ImageOptimizer:
    """Optimize images for web delivery."""
    
    def __init__(self):
        # Per-format save options, passed straight to PIL.Image.save().
        self.formats = {
            'JPEG': {'quality': 85, 'optimize': True},
            'PNG': {'optimize': True},
            'WEBP': {'quality': 80, 'method': 6},
        }
    
    def optimize_image(self, image_file, target_format='JPEG'):
        """Return a ContentFile with *image_file* re-encoded for the web.

        Alpha/palette modes are converted to RGB when targeting JPEG (which
        has no alpha channel), and anything larger than 1920px on its long
        edge is downsized preserving aspect ratio.
        """
        
        with Image.open(image_file) as img:
            # Convert to RGB if necessary
            if img.mode in ('RGBA', 'LA', 'P') and target_format == 'JPEG':
                img = img.convert('RGB')
            
            # Resize if too large; thumbnail() keeps the aspect ratio
            max_dimension = 1920
            if max(img.size) > max_dimension:
                img.thumbnail((max_dimension, max_dimension), Image.Resampling.LANCZOS)
            
            # Save optimized image to an in-memory buffer
            output = io.BytesIO()
            save_kwargs = self.formats.get(target_format, {})
            img.save(output, format=target_format, **save_kwargs)
            output.seek(0)
            
            return ContentFile(output.getvalue())
    
    def create_responsive_images(self, image_file):
        """Return {size_name: ContentFile} WebP renditions for responsive use."""
        
        sizes = {
            'thumbnail': (150, 150),
            'small': (400, 300),
            'medium': (800, 600),
            'large': (1200, 900),
        }
        
        optimized_images = {}
        
        with Image.open(image_file) as img:
            for size_name, dimensions in sizes.items():
                # Work on a copy so every rendition starts from the original
                resized = img.copy()
                resized.thumbnail(dimensions, Image.Resampling.LANCZOS)
                
                # Save as WebP for modern browsers (best size/quality trade-off)
                output = io.BytesIO()
                resized.save(output, format='WEBP', quality=80, method=6)
                output.seek(0)
                
                optimized_images[size_name] = ContentFile(output.getvalue())
        
        return optimized_images

Monitoring and Analytics

File Usage Tracking

# models.py
class FileAccessLog(models.Model):
    """Track file access for analytics."""
    
    # The media file that was accessed; log rows die with the file.
    file = models.ForeignKey('MediaFile', on_delete=models.CASCADE)
    # Accessing user; kept (as NULL) if the account is later deleted.
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    ip_address = models.GenericIPAddressField()
    user_agent = models.TextField()
    # Set automatically on insert.
    accessed_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        # Composite indexes support per-file and per-user access-history
        # queries ordered by time.
        indexes = [
            models.Index(fields=['file', 'accessed_at']),
            models.Index(fields=['user', 'accessed_at']),
        ]

# middleware.py
class FileAccessMiddleware:
    """Log every request for a /media/ URL for usage analytics.

    Logging is delegated to a Celery task so the response is never
    delayed by a database write.
    """
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        response = self.get_response(request)
        
        # Track media file access (after the response is produced, so a
        # failing log call cannot block serving the file)
        if request.path.startswith('/media/'):
            self.log_file_access(request)
        
        return response
    
    def log_file_access(self, request):
        """Queue an asynchronous access-log entry for this request."""
        from .tasks import log_file_access_task
        
        # request.user only exists once AuthenticationMiddleware has run;
        # treat a missing attribute as an anonymous access.
        user = getattr(request, 'user', None)
        user_id = user.id if user is not None and user.is_authenticated else None
        
        log_file_access_task.delay(
            file_path=request.path,
            user_id=user_id,
            ip_address=self.get_client_ip(request),
            user_agent=request.META.get('HTTP_USER_AGENT', '')
        )
    
    def get_client_ip(self, request):
        """Return the originating client IP address.

        Behind a proxy the client is the first entry of X-Forwarded-For;
        entries are comma-separated and may carry surrounding whitespace,
        so the value is stripped before use.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0].strip()
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip

Storage Monitoring

# monitoring.py
from datetime import datetime, timedelta  # was missing: used in get_s3_usage

import boto3
from django.conf import settings  # was missing: bucket name read below
from django.core.management.base import BaseCommand
from django.core.mail import send_mail

class Command(BaseCommand):
    """Management command: monitor storage usage/costs and email alerts."""
    
    def handle(self, *args, **options):
        """Check S3 usage and storage costs; alert when thresholds are hit."""
        # Monitor S3 usage
        s3_usage = self.get_s3_usage()
        
        # Monitor costs
        # NOTE(review): get_storage_costs() and get_file_count() are defined
        # elsewhere in the project — not shown here.
        costs = self.get_storage_costs()
        
        # Check thresholds
        if s3_usage['size_gb'] > 100:  # 100GB threshold
            self.send_alert(f"S3 usage: {s3_usage['size_gb']:.2f} GB")
        
        if costs['monthly_cost'] > 50:  # $50 threshold
            self.send_alert(f"Monthly storage cost: ${costs['monthly_cost']:.2f}")
    
    def get_s3_usage(self):
        """Return bucket size statistics from CloudWatch's S3 metrics."""
        cloudwatch = boto3.client('cloudwatch')
        
        # BucketSizeBytes is published once a day, so look back two days to
        # guarantee at least one datapoint exists.
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/S3',
            MetricName='BucketSizeBytes',
            Dimensions=[
                {'Name': 'BucketName', 'Value': settings.AWS_STORAGE_BUCKET_NAME},
                {'Name': 'StorageType', 'Value': 'StandardStorage'}
            ],
            StartTime=datetime.utcnow() - timedelta(days=2),
            EndTime=datetime.utcnow(),
            Period=86400,
            Statistics=['Average']
        )
        
        size_bytes = response['Datapoints'][0]['Average'] if response['Datapoints'] else 0
        
        return {
            'size_bytes': size_bytes,
            'size_gb': size_bytes / (1024**3),
            'file_count': self.get_file_count()
        }
    
    def send_alert(self, message):
        """Email a monitoring alert to the admins."""
        send_mail(
            'Storage Alert',
            message,
            'monitoring@example.com',
            ['admin@example.com'],
            fail_silently=False,
        )

Backup and Disaster Recovery

Automated Backup Strategy

# backup.py
import boto3
from django.conf import settings  # was missing: bucket names read below
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    """Backup media files to a second bucket (different region/provider)."""
    
    def handle(self, *args, **options):
        """Copy every object in the primary bucket into the backup bucket."""
        # Setup source and destination
        source_bucket = settings.AWS_STORAGE_BUCKET_NAME
        backup_bucket = settings.AWS_BACKUP_BUCKET_NAME
        
        s3_client = boto3.client('s3')
        
        # Paginate: list_objects_v2 returns at most 1000 keys per call
        paginator = s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=source_bucket)
        
        for page in pages:
            if 'Contents' in page:
                for obj in page['Contents']:
                    self.backup_object(s3_client, source_bucket, backup_bucket, obj['Key'])
    
    def backup_object(self, s3_client, source_bucket, backup_bucket, key):
        """Copy one object into the backup bucket unless already present."""
        try:
            # Skip keys that already exist in the backup bucket
            try:
                s3_client.head_object(Bucket=backup_bucket, Key=key)
                return  # Already backed up
            except s3_client.exceptions.ClientError as e:
                # head_object signals a missing key as a generic ClientError
                # with an HTTP 404 code — NOT NoSuchKey, which boto3 only
                # raises for get_object. Re-raise anything that isn't
                # "not found".
                if e.response.get('Error', {}).get('Code') not in ('404', 'NotFound', 'NoSuchKey'):
                    raise
            
            # Copy object to backup bucket
            copy_source = {'Bucket': source_bucket, 'Key': key}
            s3_client.copy_object(
                CopySource=copy_source,
                Bucket=backup_bucket,
                Key=key,
                StorageClass='GLACIER'  # Use cheaper storage class
            )
            
            self.stdout.write(f"Backed up: {key}")
            
        except Exception as e:
            self.stderr.write(f"Failed to backup {key}: {e}")

Best Practices Summary

Security

  • Implement comprehensive file validation
  • Use signed URLs for private files
  • Scan for malware in production
  • Implement proper access controls
  • Sanitize file names and paths

Performance

  • Use CDN for file delivery
  • Implement image optimization
  • Use appropriate caching headers
  • Process files asynchronously
  • Implement chunked uploads for large files

Scalability

  • Use cloud storage for production
  • Implement horizontal scaling strategies
  • Monitor usage and costs
  • Plan for traffic spikes
  • Use multiple storage regions

Reliability

  • Implement backup strategies
  • Monitor file integrity
  • Have disaster recovery plans
  • Use redundant storage
  • Implement health checks

Cost Optimization

  • Use appropriate storage classes
  • Implement lifecycle policies
  • Monitor and optimize usage
  • Clean up unused files
  • Use compression when appropriate

Production media management is a complex topic that requires balancing multiple concerns. By following these practices and continuously monitoring your system, you can build a robust, secure, and scalable file handling system for your Django applications.