Production media management requires careful consideration of performance, security, scalability, and cost. This chapter covers best practices and strategies for handling files in production Django applications.
# settings/production.py
import os

# Separate static and media handling: both are served from a CDN, not
# from the application servers.
STATIC_URL = 'https://cdn.example.com/static/'
MEDIA_URL = 'https://cdn.example.com/media/'

# Cloud storage for media files
STORAGES = {
    "default": {
        # django-storages S3 backend for user-uploaded media.
        "BACKEND": "storages.backends.s3boto3.S3Boto3Storage",
        "OPTIONS": {
            # Bucket names come from the environment so the same settings
            # module works across deployments.
            "bucket_name": os.environ.get("MEDIA_BUCKET_NAME"),
            "custom_domain": "cdn.example.com",
            "object_parameters": {
                # Browser/CDN cache lifetime: 24 hours.
                "CacheControl": "max-age=86400",
            },
        },
    },
    "staticfiles": {
        "BACKEND": "storages.backends.s3boto3.S3StaticStorage",
        "OPTIONS": {
            "bucket_name": os.environ.get("STATIC_BUCKET_NAME"),
            # Key prefix inside the bucket.
            "location": "static",
            "custom_domain": "static.example.com",
        },
    },
}

# Security settings
# NOTE(review): SECURE_MEDIA_URL and MEDIA_FILE_PERMISSIONS are not
# built-in Django settings -- presumably read by project code; confirm.
SECURE_MEDIA_URL = True
MEDIA_FILE_PERMISSIONS = 0o644
FILE_UPLOAD_PERMISSIONS = 0o644
# settings/base.py
class MediaConfig:
    """Base media configuration shared by all environments.

    Environment-specific subclasses (development/production) add their
    own MEDIA_ROOT/STORAGES on top of these limits.
    """

    # File upload limits
    FILE_UPLOAD_MAX_MEMORY_SIZE = 5 * 1024 * 1024  # 5MB kept in memory before spooling
    DATA_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024  # 10MB

    # Allowed file types (MIME types)
    ALLOWED_IMAGE_TYPES = ['image/jpeg', 'image/png', 'image/gif', 'image/webp']
    ALLOWED_DOCUMENT_TYPES = ['application/pdf', 'text/plain', 'application/msword']

    # File size limits by type
    MAX_IMAGE_SIZE = 10 * 1024 * 1024  # 10MB
    MAX_DOCUMENT_SIZE = 50 * 1024 * 1024  # 50MB
# settings/development.py
class DevelopmentMediaConfig(MediaConfig):
    # Local filesystem paths for development.
    # NOTE(review): relies on `os` and `BASE_DIR` being defined in this
    # settings module -- confirm the imports.
    MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
    MEDIA_URL = '/media/'

    # Local file serving
    STORAGES = {
        "default": {
            "BACKEND": "django.core.files.storage.FileSystemStorage",
        },
    }
# settings/production.py
class ProductionMediaConfig(MediaConfig):
    # Cloud storage configuration: delegate to the project's custom
    # storage backend.
    STORAGES = {
        "default": {
            "BACKEND": "myapp.storage.ProductionMediaStorage",
        },
    }
# views.py
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
import os
import re
import hashlib


@method_decorator(csrf_exempt, name='dispatch')
class ChunkedUploadView(View):
    """Handle large file uploads in chunks.

    The client splits a file into numbered chunks and POSTs them one at a
    time; when the last chunk arrives, the pieces are assembled into a
    single file under MEDIA_ROOT/temp_uploads/<file_id>/.

    NOTE(review): CSRF protection is disabled on this endpoint -- ensure
    it is protected by some other mechanism (auth/token) in production.
    """

    # Client-supplied file IDs become directory names, so restrict them to
    # a safe character set to prevent path traversal (e.g. "../../etc").
    FILE_ID_RE = re.compile(r'^[A-Za-z0-9_-]{1,64}$')

    def post(self, request):
        """Receive one chunk; assemble the file when the final chunk arrives."""
        chunk = request.FILES.get('chunk')
        chunk_number = int(request.POST.get('chunkNumber', 0))
        total_chunks = int(request.POST.get('totalChunks', 1))
        file_id = request.POST.get('fileId')
        if not all([chunk, file_id]):
            return JsonResponse({'error': 'Missing required parameters'}, status=400)
        # Reject IDs that would escape MEDIA_ROOT.
        if not self.FILE_ID_RE.match(file_id):
            return JsonResponse({'error': 'Invalid fileId'}, status=400)

        # Create upload directory
        upload_dir = os.path.join(settings.MEDIA_ROOT, 'temp_uploads', file_id)
        os.makedirs(upload_dir, exist_ok=True)

        # Save chunk
        chunk_path = os.path.join(upload_dir, f'chunk_{chunk_number}')
        with open(chunk_path, 'wb') as f:
            for chunk_data in chunk.chunks():
                f.write(chunk_data)

        # Check if all chunks are uploaded
        if chunk_number == total_chunks - 1:
            final_file_path = self.assemble_chunks(upload_dir, total_chunks)
            return JsonResponse({
                'status': 'complete',
                'file_path': final_file_path
            })
        return JsonResponse({'status': 'chunk_uploaded'})

    def assemble_chunks(self, upload_dir, total_chunks):
        """Concatenate chunk_0 .. chunk_{n-1} into one file, deleting each chunk.

        Returns the path of the assembled file.
        """
        final_file_path = os.path.join(upload_dir, 'assembled_file')
        with open(final_file_path, 'wb') as final_file:
            for i in range(total_chunks):
                chunk_path = os.path.join(upload_dir, f'chunk_{i}')
                with open(chunk_path, 'rb') as chunk_file:
                    final_file.write(chunk_file.read())
                os.remove(chunk_path)  # Clean up chunk
        return final_file_path
# tasks.py (using Celery)
import logging
import os

from celery import shared_task
from PIL import Image

# Module-level logger (was previously referenced but never defined).
logger = logging.getLogger(__name__)


@shared_task
def process_uploaded_image(image_path, user_id):
    """Process an uploaded image asynchronously.

    Creates small/medium/large JPEG thumbnails next to the original and
    queues every generated file (plus the original) for cloud upload.

    Args:
        image_path: Local filesystem path of the uploaded image.
        user_id: Owner id, used to namespace the cloud destination paths.
    """
    try:
        # Open and process image
        with Image.open(image_path) as img:
            # JPEG cannot store an alpha channel or palette; flatten to
            # RGB so thumbnail.save(..., 'JPEG') cannot fail on PNG/GIF.
            if img.mode in ('RGBA', 'LA', 'P'):
                img = img.convert('RGB')

            # Thumbnail bounding boxes (max width, max height).
            thumbnails = {
                'small': (150, 150),
                'medium': (300, 300),
                'large': (800, 600)
            }
            for size_name, dimensions in thumbnails.items():
                # thumbnail() resizes in place, so work on a copy.
                thumbnail = img.copy()
                thumbnail.thumbnail(dimensions, Image.Resampling.LANCZOS)

                # Save thumbnail
                thumb_path = f"{image_path}_{size_name}.jpg"
                thumbnail.save(thumb_path, 'JPEG', quality=85)

                # Upload to cloud storage
                upload_to_cloud.delay(thumb_path, f"thumbnails/{user_id}/")

        # Upload original to cloud storage. The local file is NOT removed
        # here: upload_to_cloud runs later on a worker, re-reads this
        # path, and deletes it after a successful upload -- removing it
        # now would race with that task.
        upload_to_cloud.delay(image_path, f"images/{user_id}/")
    except Exception:
        # Log with traceback and re-raise so Celery marks the task as
        # failed instead of silently succeeding.
        logger.exception("Image processing failed for %s", image_path)
        raise
@shared_task
def upload_to_cloud(file_path, cloud_path):
    """Push a local file into the default storage backend, then delete it.

    Returns the name under which the backend stored the file. On failure
    the error is logged and re-raised (the local file is left in place so
    the task can be retried).
    """
    from django.core.files.storage import default_storage
    from django.core.files import File

    try:
        source = open(file_path, 'rb')
        try:
            cloud_name = default_storage.save(cloud_path, File(source))
        finally:
            source.close()
        # Clean up local file
        os.remove(file_path)
        return cloud_name
    except Exception as e:
        logger.error(f"Cloud upload failed: {e}")
        raise
# validators.py
import magic
import os
from django.core.exceptions import ValidationError
from PIL import Image
class FileValidator:
    """Comprehensive file validation for uploads.

    A callable validator (usable on Django model fields) that checks a
    file's size, its MIME type sniffed from the header bytes, image
    integrity, and a few suspicious byte patterns.
    """

    def __init__(self, allowed_types=None, max_size=None):
        # An empty allowed_types list means "accept any type".
        self.allowed_types = allowed_types or []
        self.max_size = max_size

    def __call__(self, file):
        # Run every check in order; each raises ValidationError on failure.
        for check in (
            self.validate_size,
            self.validate_type,
            self.validate_content,
            self.scan_for_malware,
        ):
            check(file)

    def validate_size(self, file):
        """Reject files larger than max_size bytes (when a limit is set)."""
        if self.max_size and file.size > self.max_size:
            raise ValidationError(f"File size exceeds {self.max_size} bytes")

    def validate_type(self, file):
        """Check the MIME type detected from the file header via python-magic."""
        file.seek(0)
        header = file.read(1024)
        file.seek(0)
        detected_type = magic.from_buffer(header, mime=True)
        permitted = not self.allowed_types or detected_type in self.allowed_types
        if not permitted:
            raise ValidationError(f"File type {detected_type} not allowed")

    def validate_content(self, file):
        """For image uploads, confirm Pillow can actually parse the file."""
        if not file.content_type.startswith('image/'):
            return
        try:
            file.seek(0)
            with Image.open(file) as img:
                # verify() checks integrity without fully decoding.
                img.verify()
            file.seek(0)
        except Exception:
            raise ValidationError("Invalid image file")

    def scan_for_malware(self, file):
        """Heuristic scan of the first 8KB for embedded script/code markers."""
        file.seek(0)
        head = file.read(8192).lower()
        file.seek(0)
        markers = (
            b'<script',
            b'javascript:',
            b'<?php',
            b'<%',
            b'eval(',
        )
        for marker in markers:
            if marker in head:
                raise ValidationError("Potentially malicious content detected")
# Usage in models
# Reusable validator instance: JPEG/PNG/GIF only, at most 10MB.
image_validator = FileValidator(
    allowed_types=['image/jpeg', 'image/png', 'image/gif'],
    max_size=10 * 1024 * 1024  # 10MB
)
class UserPhoto(models.Model):
    # Photos are deleted together with their owner.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Runs the image_validator FileValidator instance on every upload.
    photo = models.ImageField(
        upload_to='photos/',
        validators=[image_validator]
    )
# views.py
from django.http import (
    HttpResponse,
    HttpResponseRedirect,  # was missing: used below for signed-URL redirects
    Http404,
    HttpResponseForbidden,
)
from django.contrib.auth.decorators import login_required
from django.shortcuts import get_object_or_404
import mimetypes


@login_required
def serve_protected_file(request, file_id):
    """Serve a protected file, enforcing per-user access control.

    Cloud-backed files are redirected to a short-lived signed URL so the
    bytes never pass through Django; local files are returned directly as
    an attachment. Raises Http404 if the record or file is missing.
    """
    # Get file object with permission check (only the owner can access).
    file_obj = get_object_or_404(
        ProtectedFile,
        id=file_id,
        user=request.user
    )
    # Check additional permissions
    if not file_obj.can_be_accessed_by(request.user):
        return HttpResponseForbidden()

    # For cloud storage, redirect to a signed URL valid for one hour.
    if hasattr(file_obj.file.storage, 'generate_signed_url'):
        signed_url = file_obj.file.storage.generate_signed_url(
            file_obj.file.name,
            expires_in=3600
        )
        return HttpResponseRedirect(signed_url)

    # For local files, serve through Django.
    # NOTE(review): this reads the entire file into memory; consider
    # django.http.FileResponse for large files.
    try:
        with file_obj.file.open('rb') as f:
            content = f.read()
        content_type, _ = mimetypes.guess_type(file_obj.file.name)
        # guess_type returns None for unknown extensions; fall back to a
        # safe generic type instead of sending no Content-Type.
        response = HttpResponse(
            content,
            content_type=content_type or 'application/octet-stream'
        )
        response['Content-Disposition'] = f'attachment; filename="{file_obj.filename}"'
        return response
    except FileNotFoundError:
        raise Http404("File not found")
# X-Accel-Redirect for Nginx
def serve_file_nginx(request, file_path):
    """Hand file delivery off to Nginx via X-Accel-Redirect.

    Django performs only the permission check; Nginx then serves the
    bytes from its internal /protected/ location.
    """
    # Perform access control checks here
    if user_can_access_file(request.user, file_path):
        response = HttpResponse()
        # Nginx intercepts this header and serves the file itself.
        response['X-Accel-Redirect'] = f'/protected/{file_path}'
        response['Content-Type'] = ''  # Let Nginx determine
        return response
    return HttpResponseForbidden()
# storage.py
from storages.backends.s3boto3 import S3Boto3Storage


class CDNStorage(S3Boto3Storage):
    """S3 storage that rewrites public URLs to point at a CDN.

    When settings.CDN_DOMAIN is set, url() returns
    ``https://<CDN_DOMAIN>/<name>``; otherwise it falls back to the
    normal S3 URL produced by the parent class.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Imported locally: `settings` was referenced here without any
        # import in this module. The local import also avoids requiring
        # configured settings at module-import time.
        from django.conf import settings
        self.cdn_domain = getattr(settings, 'CDN_DOMAIN', None)

    def url(self, name):
        """Return the CDN URL for *name*, or the default S3 URL."""
        if self.cdn_domain:
            return f"https://{self.cdn_domain}/{name}"
        return super().url(name)
# settings.py
# Domain used by CDNStorage.url() to build public URLs.
CDN_DOMAIN = 'cdn.example.com'

# CloudFront configuration
# Default object parameters applied by django-storages on every upload.
AWS_S3_OBJECT_PARAMETERS = {
    'CacheControl': 'max-age=86400',  # 24 hours
    'Expires': 'Thu, 31 Dec 2099 20:00:00 GMT',
}
# image_processing.py
# Fixed: the original imported `ImageOpt`, which does not exist in
# Pillow and made this module fail at import time.
from PIL import Image
import io
from django.core.files.base import ContentFile
class ImageOptimizer:
    """Optimize images for web delivery."""

    def __init__(self):
        # Per-format save options, passed straight through to Image.save().
        self.formats = {
            'JPEG': {'quality': 85, 'optimize': True},
            'PNG': {'optimize': True},
            'WEBP': {'quality': 80, 'method': 6},
        }

    def optimize_image(self, image_file, target_format='JPEG'):
        """Return a ContentFile with *image_file* re-encoded for the web.

        Transparent/palette images are flattened to RGB when targeting
        JPEG, and anything larger than 1920px on its longest side is
        scaled down in place.
        """
        max_dimension = 1920
        with Image.open(image_file) as img:
            # JPEG has no alpha channel, so flatten first.
            needs_flatten = (
                img.mode in ('RGBA', 'LA', 'P') and target_format == 'JPEG'
            )
            if needs_flatten:
                img = img.convert('RGB')
            if max(img.size) > max_dimension:
                img.thumbnail(
                    (max_dimension, max_dimension), Image.Resampling.LANCZOS
                )
            buffer = io.BytesIO()
            save_options = self.formats.get(target_format, {})
            img.save(buffer, format=target_format, **save_options)
            buffer.seek(0)
            return ContentFile(buffer.getvalue())

    def create_responsive_images(self, image_file):
        """Build WebP renditions at several sizes; returns {size_name: ContentFile}."""
        sizes = {
            'thumbnail': (150, 150),
            'small': (400, 300),
            'medium': (800, 600),
            'large': (1200, 900),
        }
        renditions = {}
        with Image.open(image_file) as img:
            for label, box in sizes.items():
                # thumbnail() mutates, so resize a copy each time.
                scaled = img.copy()
                scaled.thumbnail(box, Image.Resampling.LANCZOS)
                # Save as WebP for modern browsers.
                buffer = io.BytesIO()
                scaled.save(buffer, format='WEBP', quality=80, method=6)
                buffer.seek(0)
                renditions[label] = ContentFile(buffer.getvalue())
        return renditions
# models.py
class FileAccessLog(models.Model):
    """Track file access for analytics."""

    file = models.ForeignKey('MediaFile', on_delete=models.CASCADE)
    # SET_NULL keeps the log row even after the user account is deleted.
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    ip_address = models.GenericIPAddressField()
    user_agent = models.TextField()
    accessed_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Composite indexes for the two common query patterns:
        # per-file and per-user access history ordered by time.
        indexes = [
            models.Index(fields=['file', 'accessed_at']),
            models.Index(fields=['user', 'accessed_at']),
        ]
# middleware.py
class FileAccessMiddleware:
    """Middleware that records every request for a /media/ path.

    Logging happens after the response is produced and is pushed onto a
    Celery queue, so serving the file never blocks on the database.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)
        # Track media file access after the response is built, so a
        # logging failure cannot break file serving.
        if request.path.startswith('/media/'):
            self.log_file_access(request)
        return response

    def log_file_access(self, request):
        """Queue an access-log entry asynchronously."""
        from .tasks import log_file_access_task
        log_file_access_task.delay(
            file_path=request.path,
            user_id=request.user.id if request.user.is_authenticated else None,
            ip_address=self.get_client_ip(request),
            user_agent=request.META.get('HTTP_USER_AGENT', '')
        )

    def get_client_ip(self, request):
        """Best-effort client IP: first X-Forwarded-For hop, else REMOTE_ADDR.

        NOTE(review): X-Forwarded-For is client-controlled unless a trusted
        proxy sets it; do not use this value for security decisions.
        """
        forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded:
            # Proxies may pad entries with whitespace ("a, b"); strip it
            # so a clean address is stored.
            return forwarded.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')
# monitoring.py
# Fixed: datetime/timedelta and settings were used below but never imported.
from datetime import datetime, timedelta

import boto3
from django.conf import settings
from django.core.mail import send_mail
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    """Monitor storage usage and costs, alerting when thresholds are crossed."""

    # Alert thresholds.
    SIZE_THRESHOLD_GB = 100
    COST_THRESHOLD_USD = 50

    def handle(self, *args, **options):
        # Monitor S3 usage
        s3_usage = self.get_s3_usage()
        # Monitor costs
        # NOTE(review): get_storage_costs() is not defined in this module;
        # presumably provided by a mixin or subclass -- confirm.
        costs = self.get_storage_costs()
        # Check thresholds
        if s3_usage['size_gb'] > self.SIZE_THRESHOLD_GB:
            self.send_alert(f"S3 usage: {s3_usage['size_gb']:.2f} GB")
        if costs['monthly_cost'] > self.COST_THRESHOLD_USD:
            self.send_alert(f"Monthly storage cost: ${costs['monthly_cost']:.2f}")

    def get_s3_usage(self):
        """Read bucket size from CloudWatch's daily BucketSizeBytes metric."""
        cloudwatch = boto3.client('cloudwatch')
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/S3',
            MetricName='BucketSizeBytes',
            Dimensions=[
                {'Name': 'BucketName', 'Value': settings.AWS_STORAGE_BUCKET_NAME},
                {'Name': 'StorageType', 'Value': 'StandardStorage'}
            ],
            # The metric is emitted once per day; a two-day window
            # guarantees at least one datapoint.
            StartTime=datetime.utcnow() - timedelta(days=2),
            EndTime=datetime.utcnow(),
            Period=86400,
            Statistics=['Average']
        )
        datapoints = response['Datapoints']
        size_bytes = datapoints[0]['Average'] if datapoints else 0
        return {
            'size_bytes': size_bytes,
            'size_gb': size_bytes / (1024 ** 3),
            # NOTE(review): get_file_count() is also not defined in this
            # module -- confirm where it comes from.
            'file_count': self.get_file_count()
        }

    def send_alert(self, message):
        """Email a monitoring alert to the admins."""
        send_mail(
            'Storage Alert',
            message,
            'monitoring@example.com',
            ['admin@example.com'],
            fail_silently=False,
        )
# backup.py
import boto3
# Fixed: settings was referenced below but never imported.
from django.conf import settings
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    """Backup media files to a second bucket (different region/provider)."""

    def handle(self, *args, **options):
        # Setup source and destination
        source_bucket = settings.AWS_STORAGE_BUCKET_NAME
        backup_bucket = settings.AWS_BACKUP_BUCKET_NAME
        s3_client = boto3.client('s3')
        # List all objects in source bucket (paginated -- a single
        # list_objects_v2 response is capped at 1000 keys).
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=source_bucket):
            for obj in page.get('Contents', []):
                self.backup_object(s3_client, source_bucket, backup_bucket, obj['Key'])

    def backup_object(self, s3_client, source_bucket, backup_bucket, key):
        """Copy one object to the backup bucket unless it is already there."""
        try:
            # Check if the backup already exists. head_object signals a
            # missing key with a generic ClientError carrying HTTP 404 --
            # it never raises NoSuchKey, so the original except clause
            # could not match and every object was re-copied.
            try:
                s3_client.head_object(Bucket=backup_bucket, Key=key)
                return  # Already backed up
            except s3_client.exceptions.ClientError as e:
                if e.response['Error']['Code'] not in ('404', 'NoSuchKey'):
                    raise  # Real error (permissions, throttling, ...)

            # Copy object to backup bucket
            copy_source = {'Bucket': source_bucket, 'Key': key}
            s3_client.copy_object(
                CopySource=copy_source,
                Bucket=backup_bucket,
                Key=key,
                StorageClass='GLACIER'  # Use cheaper storage class
            )
            self.stdout.write(f"Backed up: {key}")
        except Exception as e:
            self.stderr.write(f"Failed to backup {key}: {e}")
Production media management is a complex topic that requires balancing multiple concerns. By following these practices and continuously monitoring your system, you can build a robust, secure, and scalable file handling system for your Django applications.
Using Cloud Storage Providers
Cloud storage providers offer scalable, reliable, and cost-effective solutions for handling files in production Django applications. This chapter covers integration with major cloud storage services and best practices for cloud-based file management.
Admin Site
Django's admin interface is one of its most powerful features, providing a ready-to-use administrative interface for your models. It's designed to be used by non-technical users to manage content and data, making it an essential tool for content management and site administration.