Transitioning a Django application from development to production requires systematic preparation across security, performance, configuration, and infrastructure. This chapter covers essential steps to ensure your application is production-ready, secure, and optimized for real-world usage.
# settings/__init__.py
import os
# Determine which settings to use.
# DJANGO_ENVIRONMENT names the settings module to load; when it is unset the
# value defaults to 'development'.
ENVIRONMENT = os.environ.get('DJANGO_ENVIRONMENT', 'development')
if ENVIRONMENT == 'production':
    from .production import *
elif ENVIRONMENT == 'staging':
    from .staging import *
elif ENVIRONMENT == 'testing':
    from .testing import *
else:
    # Any other value (including a misspelled one) also lands here.
    from .development import *
# settings/base.py
import os
from pathlib import Path
# Three levels up from this file (settings/base.py).
BASE_DIR = Path(__file__).resolve().parent.parent.parent

# Application definition
DJANGO_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

THIRD_PARTY_APPS = [
    'rest_framework',
    'corsheaders',
    'django_extensions',
]

LOCAL_APPS = [
    'accounts',
    'core',
    'api',
]

INSTALLED_APPS = DJANGO_APPS + THIRD_PARTY_APPS + LOCAL_APPS

# Order matters. CorsMiddleware is listed above WhiteNoiseMiddleware and
# CommonMiddleware so that responses those layers produce themselves still pass
# back through it on the way out and can receive CORS headers.
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'whitenoise.middleware.WhiteNoiseMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'myproject.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [BASE_DIR / 'templates'],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

# Internationalization
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True

# Default primary key field type
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# settings/production.py
import os
import dj_database_url
from .base import *
# Security Settings
DEBUG = False

# The signing key is read from the environment only; a missing or empty value
# stops the settings module from loading.
SECRET_KEY = os.getenv('DJANGO_SECRET_KEY')
if SECRET_KEY is None or SECRET_KEY == '':
    raise ValueError('DJANGO_SECRET_KEY environment variable is required')
# Host names this deployment answers for. ALLOWED_HOST may add one more name
# from the environment; when it is unset or blank nothing is appended, so the
# list never carries an empty-string entry.
ALLOWED_HOSTS = [
    'yourdomain.com',
    'www.yourdomain.com',
]
_extra_allowed_host = os.environ.get('ALLOWED_HOST', '').strip()
if _extra_allowed_host:
    ALLOWED_HOSTS.append(_extra_allowed_host)
# Database Configuration
# The connection settings are produced by dj_database_url from DATABASE_URL.
_database_url = os.environ.get('DATABASE_URL')
_database_options = {
    'conn_max_age': 600,
    'conn_health_checks': True,
}

DATABASES = {
    'default': dj_database_url.config(default=_database_url, **_database_options),
}
# Cache Configuration
# REDIS_URL overrides the local default location.
_redis_location = os.getenv('REDIS_URL', 'redis://localhost:6379/1')
_redis_pool_kwargs = {
    'max_connections': 50,
    'retry_on_timeout': True,
}

CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': _redis_location,
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': _redis_pool_kwargs,
        },
        'KEY_PREFIX': 'myapp',
        'VERSION': 1,
    },
}

# Session Configuration
# Sessions are stored in the 'default' cache defined above.
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'
SESSION_COOKIE_AGE = 24 * 60 * 60  # 24 hours
SESSION_SAVE_EVERY_REQUEST = True
# Static Files Configuration
STATIC_URL = '/static/'
STATIC_ROOT = BASE_DIR.joinpath('staticfiles')
STATICFILES_STORAGE = 'whitenoise.storage.CompressedManifestStaticFilesStorage'

# Media Files Configuration
MEDIA_URL = '/media/'
MEDIA_ROOT = BASE_DIR.joinpath('media')
# Email Configuration
# Every SMTP parameter comes from the environment; only the port, the TLS flag
# and the sender address have fallbacks.
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = os.getenv('EMAIL_HOST')
EMAIL_HOST_USER = os.getenv('EMAIL_HOST_USER')
EMAIL_HOST_PASSWORD = os.getenv('EMAIL_HOST_PASSWORD')
EMAIL_PORT = int(os.getenv('EMAIL_PORT', '587'))
# Enabled unless EMAIL_USE_TLS is set to something other than "true" (any case).
_email_tls_flag = os.getenv('EMAIL_USE_TLS', 'True')
EMAIL_USE_TLS = _email_tls_flag.lower() == 'true'
DEFAULT_FROM_EMAIL = os.getenv('DEFAULT_FROM_EMAIL', 'noreply@yourdomain.com')
# Security Headers
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
SECURE_HSTS_SECONDS = 365 * 24 * 60 * 60  # 1 year
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
X_FRAME_OPTIONS = 'DENY'

# Cookie Security
# The session and CSRF cookies share the same three flags.
SESSION_COOKIE_SECURE = CSRF_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = CSRF_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = CSRF_COOKIE_SAMESITE = 'Strict'

# Additional Security
SECURE_REFERRER_POLICY = 'strict-origin-when-cross-origin'
# Each listed browser feature maps to an empty allow-list.
PERMISSIONS_POLICY = {
    feature: []
    for feature in ('geolocation', 'microphone', 'camera', 'payment', 'usb')
}
# Logging Configuration
_LOG_FILE_MAX_BYTES = 15 * 1024 * 1024  # 15MB


def _rotating_file_handler(level, filename):
    """Return the config of a size-rotated file handler using the 'verbose' format."""
    return {
        'level': level,
        'class': 'logging.handlers.RotatingFileHandler',
        'filename': filename,
        'maxBytes': _LOG_FILE_MAX_BYTES,
        'backupCount': 10,
        'formatter': 'verbose',
    }


def _non_propagating_logger(level, handlers):
    """Return a logger config that sends to `handlers` and does not propagate."""
    return {
        'handlers': list(handlers),
        'level': level,
        'propagate': False,
    }


LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'file': _rotating_file_handler('INFO', '/var/log/django/django.log'),
        'error_file': _rotating_file_handler('ERROR', '/var/log/django/django_error.log'),
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
    },
    'root': {
        'handlers': ['console', 'file'],
        'level': 'INFO',
    },
    'loggers': {
        'django': _non_propagating_logger('INFO', ('console', 'file', 'error_file')),
        # Request errors go to the error file only.
        'django.request': _non_propagating_logger('ERROR', ('error_file',)),
        'myproject': _non_propagating_logger('INFO', ('console', 'file')),
    },
}
# .env.production
# Example environment file: every secret, host and key below is a placeholder
# value to be replaced for a real deployment.
# DJANGO_ENVIRONMENT is the variable settings/__init__.py reads to pick the
# settings module.
DJANGO_ENVIRONMENT=production
DJANGO_SECRET_KEY=your-super-secret-key-here
# NOTE(review): the settings/production.py shown earlier hardcodes DEBUG = False
# and does not read this variable - confirm whether anything else consumes it.
DEBUG=False
# Database
DATABASE_URL=postgresql://user:password@localhost:5432/myapp_prod
# Cache
REDIS_URL=redis://localhost:6379/1
# Email
EMAIL_HOST=smtp.mailgun.org
EMAIL_PORT=587
EMAIL_USE_TLS=True
EMAIL_HOST_USER=postmaster@mg.yourdomain.com
EMAIL_HOST_PASSWORD=your-mailgun-password
DEFAULT_FROM_EMAIL=noreply@yourdomain.com
# Static/Media Files
# NOTE(review): settings/static.py only reads these AWS values when USE_S3 is
# set, and USE_S3 is not defined in this file.
AWS_ACCESS_KEY_ID=your-aws-access-key
AWS_SECRET_ACCESS_KEY=your-aws-secret-key
AWS_STORAGE_BUCKET_NAME=your-s3-bucket
AWS_S3_REGION_NAME=us-east-1
# Third-party Services
STRIPE_PUBLIC_KEY=pk_live_...
STRIPE_SECRET_KEY=sk_live_...
SENTRY_DSN=https://...@sentry.io/...
# Monitoring
NEW_RELIC_LICENSE_KEY=your-newrelic-key
DATADOG_API_KEY=your-datadog-key
# settings/validators.py
import os
import sys
def validate_environment_variables():
    """Exit with status 1 when a required environment variable is unset or empty.

    The names of all missing variables are printed on one line before exiting.
    Returns None when every required variable has a non-empty value.
    """
    required_vars = (
        'DJANGO_SECRET_KEY',
        'DATABASE_URL',
        'REDIS_URL',
        'EMAIL_HOST_USER',
        'EMAIL_HOST_PASSWORD',
    )
    missing_vars = [name for name in required_vars if not os.environ.get(name)]
    if not missing_vars:
        return
    print(f"Missing required environment variables: {', '.join(missing_vars)}")
    sys.exit(1)
def validate_secret_key():
    """Validate the Django secret key held in DJANGO_SECRET_KEY.

    Raises:
        ValueError: if the variable is unset or empty, if the key still carries
            the ``django-insecure-`` prefix of an auto-generated development
            key, or if it is shorter than 50 characters.
    """
    secret_key = os.environ.get('DJANGO_SECRET_KEY')
    if not secret_key:
        raise ValueError('DJANGO_SECRET_KEY is required')
    # Checked before the length test so that a short placeholder such as
    # 'django-insecure-change-me' is reported as a default key, not as "too short".
    if secret_key.startswith('django-insecure-'):
        raise ValueError('DJANGO_SECRET_KEY must be changed from default')
    if len(secret_key) < 50:
        raise ValueError('DJANGO_SECRET_KEY must be at least 50 characters long')
def validate_database_config():
    """Validate the database configuration held in DATABASE_URL.

    Prints a warning when the URL scheme selects SQLite. Only the scheme is
    inspected, so "sqlite" appearing elsewhere in the URL (for example in a
    database name) does not trigger the warning.

    Raises:
        ValueError: if DATABASE_URL is unset or empty.
    """
    from urllib.parse import urlsplit

    database_url = os.environ.get('DATABASE_URL')
    if not database_url:
        raise ValueError('DATABASE_URL is required')
    if urlsplit(database_url).scheme.lower().startswith('sqlite'):
        print("Warning: SQLite is not recommended for production")
# Run validations
if __name__ == '__main__':
    # The validators run in this order; the first failure aborts the script.
    for _validator in (
        validate_environment_variables,
        validate_secret_key,
        validate_database_config,
    ):
        _validator()
    print("All environment variables validated successfully")
# security/middleware.py
import re
from django.http import HttpResponsePermanentRedirect
from django.utils.deprecation import MiddlewareMixin
class SecurityHeadersMiddleware(MiddlewareMixin):
    """Add a baseline set of security headers to all responses.

    Headers are applied with ``response.setdefault`` so a value already set by
    the view or by another middleware (for example a stricter per-view
    Content-Security-Policy) is kept instead of being overwritten.
    """

    # Header name -> baseline value applied when the response has none.
    SECURITY_HEADERS = {
        # Content Security Policy
        'Content-Security-Policy': (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
            "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
            "font-src 'self' https://fonts.gstatic.com; "
            "img-src 'self' data: https:; "
            "connect-src 'self'; "
            "frame-ancestors 'none';"
        ),
        # Additional security headers
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        'Permissions-Policy': (
            'geolocation=(), microphone=(), camera=(), '
            'payment=(), usb=(), magnetometer=(), gyroscope=()'
        ),
    }

    def process_response(self, request, response):
        """Fill in every baseline header the response does not already carry."""
        for header, value in self.SECURITY_HEADERS.items():
            response.setdefault(header, value)
        return response
class ForceHTTPSMiddleware(MiddlewareMixin):
    """Force HTTPS for all requests"""

    def process_request(self, request):
        """Redirect an insecure request to its https:// URL unless DEBUG is on.

        Returns a permanent redirect for insecure requests, otherwise None so
        that request processing continues.
        """
        # Imported here because this module does not import settings at the top.
        from django.conf import settings

        if request.is_secure() or settings.DEBUG:
            return None
        return HttpResponsePermanentRedirect(
            'https://' + request.get_host() + request.get_full_path()
        )
class RateLimitMiddleware(MiddlewareMixin):
    """Basic rate limiting middleware.

    Counts requests per client IP in a dict held on the middleware instance and
    answers with HTTP 429 once a client exceeds 100 requests inside a 60-second
    window that starts at that client's first counted request. The counters
    live only in this instance's memory.
    """

    def __init__(self, get_response):
        # MiddlewareMixin.__init__ stores get_response and performs the mixin's
        # own setup, so it must run before the local state is added.
        super().__init__(get_response)
        self.cache = {}

    def process_request(self, request):
        """Return a 429 response when the client is over the limit, else None."""
        # Local imports: neither name is imported at the top of this module.
        import time
        from django.http import HttpResponse

        client_ip = self.get_client_ip(request)
        current_time = time.time()
        # Clean old entries
        self.clean_cache(current_time)
        # Check rate limit
        entry = self.cache.get(client_ip)
        if entry is not None:
            requests_count, first_request_time = entry
            if current_time - first_request_time < 60:  # 1 minute window
                if requests_count >= 100:  # 100 requests per minute
                    return HttpResponse('Rate limit exceeded', status=429)
                self.cache[client_ip] = (requests_count + 1, first_request_time)
                return None
        # First request from this client, or its previous window has elapsed.
        self.cache[client_ip] = (1, current_time)
        return None

    def get_client_ip(self, request):
        """Return the first X-Forwarded-For address, or REMOTE_ADDR without it.

        NOTE(review): X-Forwarded-For is supplied by the client unless a
        trusted proxy overwrites it - confirm the deployment does so.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries are comma-separated and may carry surrounding spaces.
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

    def clean_cache(self, current_time):
        """Drop every entry whose window started more than 60 seconds ago."""
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if current_time - timestamp > 60
        ]
        for key in expired_keys:
            del self.cache[key]
# security/validators.py
import re
from django.core.exceptions import ValidationError
from django.utils.html import strip_tags
import bleach
def validate_no_html(value):
    """Raise ValidationError when `value` contains a '<' or '>' character."""
    if any(bracket in value for bracket in ('<', '>')):
        raise ValidationError('HTML tags are not allowed')
def validate_safe_filename(value):
    """Validate that `value` is safe to use as a bare file name.

    The whole name must consist of ASCII letters, digits, '.', '_' and '-'.
    The directory names '.' and '..' are rejected, as are Windows device names
    (CON, PRN, AUX, NUL, COM1-9, LPT1-9) with or without an extension.

    Raises:
        ValidationError: if the name has other characters or is a rejected name.
    """
    # fullmatch, unlike match() with '$', does not tolerate a trailing newline.
    if not re.fullmatch(r'[a-zA-Z0-9._-]+', value):
        raise ValidationError('Filename contains invalid characters')
    reserved_devices = {'con', 'prn', 'aux', 'nul'}
    reserved_devices.update(f'{port}{n}' for port in ('com', 'lpt') for n in range(1, 10))
    # A device name stays reserved when an extension follows it ("con.txt").
    stem = value.split('.', 1)[0].lower()
    if value in ('.', '..') or stem in reserved_devices:
        raise ValidationError('Filename is not allowed')
def sanitize_html_input(value):
    """Return `value` cleaned by bleach with a small tag allow-list.

    Only p, br, strong, em, ul, ol and li are allowed, no attributes are
    allowed, and bleach is called with strip=True.
    """
    return bleach.clean(
        value,
        tags=['p', 'br', 'strong', 'em', 'ul', 'ol', 'li'],
        attributes={},
        strip=True,
    )
def validate_sql_injection(value):
    """Raise ValidationError when `value` matches a SQL-looking pattern.

    This is a case-insensitive keyword and comment-marker heuristic. Note that
    ordinary text containing words such as "select" or "update", or the
    characters '#' or '--', matches as well.
    """
    dangerous_patterns = (
        r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)',
        r'(--|#|/\*|\*/)',
        r'(\bUNION\b.*\bSELECT\b)',
        r'(\bOR\b.*=.*\bOR\b)',
    )
    if any(re.search(pattern, value, re.IGNORECASE) for pattern in dangerous_patterns):
        raise ValidationError('Potentially dangerous input detected')
# Custom form field with validation
# `forms` is imported here because the import block of this module lacks it.
from django import forms


class SecureCharField(forms.CharField):
    """CharField with built-in security validation.

    Accepts one extra keyword argument, ``allow_html`` (default False). When it
    is False, values containing angle brackets are rejected; when it is True,
    the value is passed through sanitize_html_input instead.
    """

    def __init__(self, *args, **kwargs):
        # Removed from kwargs first so forms.CharField never sees the custom option.
        self.allow_html = kwargs.pop('allow_html', False)
        super().__init__(*args, **kwargs)

    def clean(self, value):
        """Run CharField cleaning, then the security checks on non-empty values."""
        value = super().clean(value)
        if not value:
            return value
        # Basic validation
        validate_sql_injection(value)
        if self.allow_html:
            return sanitize_html_input(value)
        validate_no_html(value)
        return strip_tags(value)
# settings/database.py
import os
# Production database configuration
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME'),
        'USER': os.environ.get('DB_USER'),
        'PASSWORD': os.environ.get('DB_PASSWORD'),
        'HOST': os.environ.get('DB_HOST'),
        'PORT': os.environ.get('DB_PORT', '5432'),
        'OPTIONS': {
            'sslmode': 'require',
            'connect_timeout': 10,
            # PostgreSQL spells the level "read committed". The space is
            # backslash-escaped because unescaped spaces in the options string
            # separate arguments.
            'options': '-c default_transaction_isolation=read\\ committed',
        },
        'CONN_MAX_AGE': 600,
        'CONN_HEALTH_CHECKS': True,
    }
}

# Database connection pooling
# NOTE(review): nothing in this module consumes DATABASE_POOL_ARGS - confirm
# which pooling backend is expected to read it.
DATABASE_POOL_ARGS = {
    'max_overflow': 10,
    'pool_pre_ping': True,
    'pool_recycle': 300,
}

# Read replica configuration
# The 'read' alias and the router are registered only when DB_READ_HOST is set.
if os.environ.get('DB_READ_HOST'):
    DATABASES['read'] = {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME'),
        'USER': os.environ.get('DB_READ_USER'),
        'PASSWORD': os.environ.get('DB_READ_PASSWORD'),
        'HOST': os.environ.get('DB_READ_HOST'),
        'PORT': os.environ.get('DB_READ_PORT', '5432'),
        'OPTIONS': {
            'sslmode': 'require',
            'connect_timeout': 10,
        },
        'CONN_MAX_AGE': 600,
        # Same persistent-connection health check as the primary.
        'CONN_HEALTH_CHECKS': True,
    }
    DATABASE_ROUTERS = ['myproject.routers.DatabaseRouter']
# Database router for read/write splitting
class DatabaseRouter:
    """Route reads to read replica, writes to primary"""

    # Aliases that hold the same data set.
    _ALIASES = frozenset({'default', 'read'})

    def db_for_read(self, model, **hints):
        """Return 'read' when that alias is configured, otherwise 'default'."""
        # Imported here because this module does not import settings at the top.
        from django.conf import settings

        return 'read' if 'read' in settings.DATABASES else 'default'

    def db_for_write(self, model, **hints):
        """Always write to the primary."""
        return 'default'

    def allow_relation(self, obj1, obj2, **hints):
        """Allow relations between objects loaded from the primary and the replica.

        Returns True when both objects come from one of the two aliases, and
        None (no opinion) otherwise.
        """
        if obj1._state.db in self._ALIASES and obj2._state.db in self._ALIASES:
            return True
        return None

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Run migrations on the primary only."""
        return db == 'default'
# settings/cache.py
import os
# Multi-level caching configuration
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL'),
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.ShardClient',
            'CONNECTION_POOL_KWARGS': {
                'max_connections': 50,
                'retry_on_timeout': True,
            },
            # No JSON serializer here: CACHE_MIDDLEWARE_ALIAS below points the
            # cache middleware at this alias, and the response objects it
            # stores cannot be encoded as JSON.
            'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
        },
        'KEY_PREFIX': 'myapp',
        'VERSION': 1,
        'TIMEOUT': 300,  # 5 minutes default
    },
    'sessions': {
        'BACKEND': 'django_redis.cache.RedisCache',
        # Falls back to the main Redis URL when no dedicated one is configured.
        'LOCATION': os.environ.get('REDIS_SESSIONS_URL', os.environ.get('REDIS_URL')),
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        },
        'KEY_PREFIX': 'sessions',
        'TIMEOUT': 86400,  # 24 hours
    },
    'local': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        'LOCATION': 'local-cache',
        'TIMEOUT': 60,  # 1 minute
        'OPTIONS': {
            'MAX_ENTRIES': 1000,
        },
    },
}

# Cache middleware
# The two cache middlewares bracket the rest of the stack: UpdateCacheMiddleware
# first, FetchFromCacheMiddleware last.
MIDDLEWARE = [
    'django.middleware.cache.UpdateCacheMiddleware',
    # ... other middleware ...
    'django.middleware.cache.FetchFromCacheMiddleware',
]
CACHE_MIDDLEWARE_ALIAS = 'default'
CACHE_MIDDLEWARE_SECONDS = 600
CACHE_MIDDLEWARE_KEY_PREFIX = 'middleware'

# Session configuration
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'sessions'
SESSION_COOKIE_AGE = 86400
SESSION_SAVE_EVERY_REQUEST = False
SESSION_EXPIRE_AT_BROWSER_CLOSE = False
# settings/static.py
import os
# Static files configuration
# NOTE(review): BASE_DIR is not defined or imported in this module - presumably
# it arrives from the base settings; confirm.
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')
STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'static'),
]
STATICFILES_FINDERS = [
    'django.contrib.staticfiles.finders.FileSystemFinder',
    'django.contrib.staticfiles.finders.AppDirectoriesFinder',
]

# WhiteNoise configuration for static files
STATICFILES_STORAGE = 'whitenoise.storage.CompressedManifestStaticFilesStorage'
WHITENOISE_USE_FINDERS = True
WHITENOISE_AUTOREFRESH = False
WHITENOISE_MAX_AGE = 31536000  # 1 year

# Compression settings
WHITENOISE_SKIP_COMPRESS_EXTENSIONS = ['jpg', 'jpeg', 'png', 'gif', 'webp', 'zip', 'gz', 'tgz', 'bz2', 'tbz', 'xz', 'br']

# Media files configuration
MEDIA_URL = '/media/'
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')

# File upload settings
FILE_UPLOAD_MAX_MEMORY_SIZE = 5242880  # 5MB
DATA_UPLOAD_MAX_MEMORY_SIZE = 5242880  # 5MB
DATA_UPLOAD_MAX_NUMBER_FIELDS = 1000

# AWS S3 configuration (if using S3)
# The flag is compared case-insensitively, matching how EMAIL_USE_TLS is parsed
# in settings/production.py, so "true" and "TRUE" enable S3 as well.
if os.environ.get('USE_S3', '').lower() == 'true':
    AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
    AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
    AWS_STORAGE_BUCKET_NAME = os.environ.get('AWS_STORAGE_BUCKET_NAME')
    AWS_S3_REGION_NAME = os.environ.get('AWS_S3_REGION_NAME')
    AWS_S3_CUSTOM_DOMAIN = f'{AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com'

    # Static files
    # NOTE(review): static and media use the same storage class with no
    # per-storage location prefix - confirm the two cannot collide in the bucket.
    STATICFILES_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
    STATIC_URL = f'https://{AWS_S3_CUSTOM_DOMAIN}/static/'

    # Media files
    DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
    MEDIA_URL = f'https://{AWS_S3_CUSTOM_DOMAIN}/media/'

    # S3 settings
    # NOTE(review): 'public-read' applies to uploaded media too - confirm that
    # is intended.
    AWS_DEFAULT_ACL = 'public-read'
    AWS_S3_OBJECT_PARAMETERS = {
        'CacheControl': 'max-age=86400',
    }
    AWS_S3_FILE_OVERWRITE = False
    AWS_QUERYSTRING_AUTH = False
# health/views.py
import time
import psutil
from django.http import JsonResponse
from django.db import connection
from django.core.cache import cache
from django.conf import settings
from django.utils import timezone
def health_check(request):
    """Basic health check endpoint.

    Probes the database and the default cache and samples CPU, memory and disk
    usage. Responds with HTTP 200 when both the database and the cache are
    reachable and 503 otherwise. Failure details are written to the log; the
    response carries only the word "error" so that driver messages are not
    exposed to whoever can reach this endpoint.
    """
    import logging

    log = logging.getLogger(__name__)

    # Database connectivity test
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
        db_status = "connected"
    except Exception:
        log.exception("Health check: database probe failed")
        db_status = "error"

    # Cache connectivity test
    try:
        cache.set('health_check', 'ok', 30)
        cache_status = "connected" if cache.get('health_check') == 'ok' else "disconnected"
    except Exception:
        log.exception("Health check: cache probe failed")
        cache_status = "error"

    # System metrics
    try:
        # interval=None returns immediately (usage since the previous call)
        # instead of blocking the request while a sample is taken.
        cpu_percent = psutil.cpu_percent(interval=None)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        system_status = {
            'cpu_percent': cpu_percent,
            'memory_percent': memory.percent,
            'disk_percent': (disk.used / disk.total) * 100,
        }
    except Exception:
        log.exception("Health check: system metrics unavailable")
        system_status = "error"

    # System metrics are informational; only the database and cache decide health.
    is_healthy = db_status == 'connected' and cache_status == 'connected'
    health_data = {
        'status': 'healthy' if is_healthy else 'unhealthy',
        'timestamp': timezone.now().isoformat(),
        'database': db_status,
        'cache': cache_status,
        'system': system_status,
        'version': getattr(settings, 'VERSION', 'unknown'),
    }
    return JsonResponse(health_data, status=200 if is_healthy else 503)
def readiness_check(request):
    """Readiness check for load balancers.

    Runs the four readiness probes and reports each result. Responds with 200
    when every probe is truthy and 503 otherwise.
    """
    probes = (
        ('database', check_database_ready),
        ('cache', check_cache_ready),
        ('migrations', check_migrations_applied),
        ('static_files', check_static_files_ready),
    )
    checks = {name: probe() for name, probe in probes}
    all_ready = all(checks.values())

    if all_ready:
        status_text, status_code = 'ready', 200
    else:
        status_text, status_code = 'not_ready', 503

    payload = {
        'status': status_text,
        'checks': checks,
        'timestamp': timezone.now().isoformat(),
    }
    return JsonResponse(payload, status=status_code)
def liveness_check(request):
    """Liveness check for container orchestration.

    Reports 'alive' with the seconds elapsed since settings.START_TIME; when
    that setting is absent the reported uptime is 0. Any exception raised while
    building the response yields a 503 'dead' response instead.
    """
    try:
        # Basic application liveness test
        now = time.time()
        started_at = getattr(settings, 'START_TIME', now)
        alive_payload = {
            'status': 'alive',
            'timestamp': timezone.now().isoformat(),
            'uptime': now - started_at,
        }
        return JsonResponse(alive_payload)
    except Exception as e:
        dead_payload = {
            'status': 'dead',
            'error': str(e),
            'timestamp': timezone.now().isoformat(),
        }
        return JsonResponse(dead_payload, status=503)
def check_database_ready():
    """Return True when the django_migrations table can be queried, else False."""
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM django_migrations")
    except Exception:
        return False
    return True
def check_cache_ready():
    """Return True when a value written to the cache can be read back, else False."""
    probe_key, probe_value = 'readiness_check', 'ok'
    try:
        cache.set(probe_key, probe_value, 10)
        stored = cache.get(probe_key)
    except Exception:
        return False
    return stored == probe_value
def check_migrations_applied():
    """Return True when the migration plan is empty, i.e. nothing is left to apply.

    Any exception raised while building the plan is reported as False.
    """
    try:
        from django.db.migrations.executor import MigrationExecutor

        executor = MigrationExecutor(connection)
        plan = executor.migration_plan(executor.loader.graph.leaf_nodes())
        return not plan
    except Exception:
        return False
def check_static_files_ready():
    """Return True when the static finders locate 'admin/css/base.css', else False."""
    try:
        from django.contrib.staticfiles import finders

        located = finders.find('admin/css/base.css')
    except Exception:
        return False
    return located is not None
# monitoring/metrics.py
import time
import threading
from collections import defaultdict, deque
from django.utils import timezone
class MetricsCollector:
    """Collect application metrics.

    Keeps per-"METHOD:path" request counts, the last 1000 response times, error
    counts per status code and the set of user ids seen. All access happens
    under a single lock.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.request_count = defaultdict(int)
        self.error_count = defaultdict(int)
        self.active_users = set()
        # Bounded: only the most recent 1000 samples are kept.
        self.response_times = deque(maxlen=1000)

    def record_request(self, method, path, status_code, response_time, user_id=None):
        """Record request metrics"""
        request_key = f"{method}:{path}"
        with self.lock:
            self.request_count[request_key] += 1
            self.response_times.append(response_time)
            # Every 4xx and 5xx status counts as an error.
            if status_code >= 400:
                self.error_count[status_code] += 1
            if user_id:
                self.active_users.add(user_id)

    def get_metrics(self):
        """Get current metrics"""
        with self.lock:
            if self.response_times:
                avg_response_time = sum(self.response_times) / len(self.response_times)
            else:
                avg_response_time = 0

            if self.request_count:
                error_rate = sum(self.error_count.values()) / sum(self.request_count.values())
            else:
                error_rate = 0

            return {
                # Counts accumulate until reset_metrics() is called.
                'requests_per_minute': sum(self.request_count.values()),
                'average_response_time': avg_response_time,
                'error_rate': error_rate,
                'active_users': len(self.active_users),
                'timestamp': timezone.now().isoformat(),
            }

    def reset_metrics(self):
        """Reset metrics (called periodically)"""
        with self.lock:
            # response_times is left alone; its maxlen bounds it.
            for bucket in (self.request_count, self.error_count, self.active_users):
                bucket.clear()


# Global metrics collector
metrics_collector = MetricsCollector()
class MetricsMiddleware:
    """Middleware to collect request metrics"""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        """Time the downstream handler and record the outcome in metrics_collector."""
        started = time.time()
        response = self.get_response(request)
        elapsed = time.time() - started

        # Only authenticated users are attributed; everyone else is recorded as None.
        user_id = None
        if hasattr(request, 'user') and request.user.is_authenticated:
            user_id = request.user.id

        metrics_collector.record_request(
            method=request.method,
            path=request.path,
            status_code=response.status_code,
            response_time=elapsed,
            user_id=user_id,
        )
        return response
def metrics_endpoint(request):
    """Endpoint to expose metrics.

    Returns the current metrics_collector snapshot as JSON.
    NOTE(review): the view performs no authentication itself - confirm access
    is restricted where it is routed.
    """
    # Imported here because this module's import block does not include it.
    from django.http import JsonResponse

    return JsonResponse(metrics_collector.get_metrics())
# views/errors.py
from django.shortcuts import render
from django.http import JsonResponse
import logging
logger = logging.getLogger(__name__)
def handler404(request, exception):
    """Custom 404 error handler.

    Logs the miss, then answers with JSON for paths under /api/ and with the
    errors/404.html template for everything else.
    """
    logger.warning(f"404 error: {request.path} from {request.META.get('REMOTE_ADDR')}")

    if not request.path.startswith('/api/'):
        context = {'request_path': request.path}
        return render(request, 'errors/404.html', context, status=404)

    payload = {
        'error': 'Not Found',
        'message': 'The requested resource was not found.',
        'status_code': 404
    }
    return JsonResponse(payload, status=404)
def handler500(request):
    """Custom 500 error handler.

    Logs the failure, then answers with JSON for paths under /api/ and with the
    errors/500.html template for everything else.
    """
    logger.error(f"500 error on {request.path} from {request.META.get('REMOTE_ADDR')}")

    if not request.path.startswith('/api/'):
        return render(request, 'errors/500.html', status=500)

    payload = {
        'error': 'Internal Server Error',
        'message': 'An unexpected error occurred. Please try again later.',
        'status_code': 500
    }
    return JsonResponse(payload, status=500)
def handler403(request, exception):
    """Custom 403 error handler.

    Logs the refusal, then answers with JSON for paths under /api/ and with the
    errors/403.html template for everything else.
    """
    logger.warning(f"403 error: {request.path} from {request.META.get('REMOTE_ADDR')}")

    if not request.path.startswith('/api/'):
        return render(request, 'errors/403.html', status=403)

    payload = {
        'error': 'Forbidden',
        'message': 'You do not have permission to access this resource.',
        'status_code': 403
    }
    return JsonResponse(payload, status=403)
# logging/formatters.py
import json
import logging
from django.utils import timezone
class JSONFormatter(logging.Formatter):
    """JSON formatter for structured logging.

    Each record becomes one JSON object with the fixed keys timestamp, level,
    logger, message, module, function and line, an ``exception`` key when
    exception info is attached, and one key per attribute supplied through
    ``extra=``.
    """

    # Attributes every LogRecord carries, plus the two that
    # logging.Formatter.format() may add. Anything else came from ``extra=``.
    _STANDARD_ATTRS = frozenset(
        logging.LogRecord('', 0, '', 0, '', (), None).__dict__
    ) | {'message', 'asctime'}

    def format(self, record):
        """Return `record` serialized as a JSON string."""
        from datetime import datetime, timezone as dt_timezone

        log_entry = {
            # The moment the record was created, not the moment it is formatted.
            'timestamp': datetime.fromtimestamp(record.created, tz=dt_timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }
        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        # Add extra fields
        for key, value in record.__dict__.items():
            if key not in self._STANDARD_ATTRS:
                log_entry[key] = value
        # default=str is deliberate: extras may hold objects JSON cannot encode
        # (a request object, for instance) and a formatter must not raise.
        return json.dumps(log_entry, default=str)
class RequestFormatter(logging.Formatter):
    """Formatter for request logging.

    Records that carry a ``request`` attribute are rendered as a JSON object
    describing the request; every other record uses the standard
    logging.Formatter output.
    """

    def format(self, record):
        """Return `record` as JSON when it has a request, else the default text."""
        if not hasattr(record, 'request'):
            return super().format(record)

        from datetime import datetime, timezone as dt_timezone

        request = record.request
        log_entry = {
            # The moment the record was created, not the moment it is formatted.
            'timestamp': datetime.fromtimestamp(record.created, tz=dt_timezone.utc).isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'request': {
                'method': request.method,
                'path': request.path,
                # A request without a user attribute is reported as 'Anonymous'.
                'user': str(request.user) if hasattr(request, 'user') else 'Anonymous',
                'ip': request.META.get('REMOTE_ADDR'),
                'user_agent': request.META.get('HTTP_USER_AGENT'),
            },
        }
        return json.dumps(log_entry)
# management/commands/check_migrations.py
from django.core.management.base import BaseCommand
from django.db.migrations.executor import MigrationExecutor
from django.db import connection
class Command(BaseCommand):
    """Report unapplied migrations and fail the command when any exist."""

    help = 'Check if all migrations are applied'

    def handle(self, *args, **options):
        """List unapplied migrations.

        Returns None when everything is applied. Raises CommandError otherwise,
        which makes ``manage.py check_migrations`` exit with a non-zero status
        that shell callers can test.

        Raises:
            CommandError: if at least one migration is unapplied.
        """
        from django.core.management.base import CommandError

        executor = MigrationExecutor(connection)
        plan = executor.migration_plan(executor.loader.graph.leaf_nodes())
        if not plan:
            self.stdout.write(
                self.style.SUCCESS('All migrations are applied')
            )
            # Nothing is returned: a value returned from handle() is treated as
            # text to print, so a boolean cannot serve as the exit status.
            return

        self.stdout.write(
            self.style.WARNING(f'Unapplied migrations found: {len(plan)}')
        )
        for migration, _backwards in plan:
            self.stdout.write(f' - {migration}')
        raise CommandError(f'{len(plan)} unapplied migration(s)')
# management/commands/safe_migrate.py
import time
from django.core.management.base import BaseCommand
from django.core.management import call_command
from django.db import connection
class Command(BaseCommand):
    """Run ``migrate`` with an optional backup step or a dry-run listing."""

    help = 'Safely run migrations with backup'

    def add_arguments(self, parser):
        """Register the two boolean flags, --backup and --dry-run."""
        flags = (
            ('--backup', 'Create backup before migration'),
            ('--dry-run', 'Show migrations without applying'),
        )
        for flag, help_text in flags:
            parser.add_argument(flag, action='store_true', help=help_text)

    def handle(self, *args, **options):
        """Optionally back up, then either list the plan or apply migrations.

        A failure inside ``migrate`` is reported and re-raised.
        """
        wants_backup = options['backup']
        if wants_backup:
            self.stdout.write('Creating database backup...')
            # Implement backup logic here
            call_command('dbbackup')

        if options['dry_run']:
            self.stdout.write('Dry run - showing planned migrations:')
            call_command('showmigrations', '--plan')
            return

        self.stdout.write('Applying migrations...')
        started = time.time()
        try:
            call_command('migrate', verbosity=2)
            duration = time.time() - started
            self.stdout.write(
                self.style.SUCCESS(f'Migrations completed in {duration:.2f} seconds')
            )
        except Exception as e:
            self.stdout.write(
                self.style.ERROR(f'Migration failed: {str(e)}')
            )
            if wants_backup:
                self.stdout.write('Consider restoring from backup')
            raise
# management/commands/deployment_check.py
import os
import sys
from django.core.management.base import BaseCommand
from django.conf import settings
from django.core.checks import run_checks
from django.core.management import call_command
class Command(BaseCommand):
    """Run a fixed series of deployment checks and exit non-zero if any fails."""

    help = 'Run comprehensive deployment checks'

    def handle(self, *args, **options):
        """Run every check in order, print a summary, and exit 1 on any failure."""
        # (banner printed before the check, check method returning True/False)
        steps = (
            ('Running security checks...', self.check_security_settings),
            ('Checking environment variables...', self.check_environment_variables),
            ('Testing database connectivity...', self.check_database),
            ('Testing cache connectivity...', self.check_cache),
            ('Checking static files...', self.check_static_files),
            ('Running Django system checks...', self.run_django_checks),
        )
        checks_passed = 0
        checks_failed = 0
        for banner, check in steps:
            self.stdout.write(self.style.HTTP_INFO(banner))
            if check():
                checks_passed += 1
            else:
                checks_failed += 1

        # Summary
        self.stdout.write('\n' + '='*50)
        self.stdout.write('Deployment Check Summary:')
        self.stdout.write(f' Passed: {checks_passed}')
        self.stdout.write(f' Failed: {checks_failed}')
        if checks_failed > 0:
            self.stdout.write(self.style.ERROR('❌ Deployment checks failed!'))
            sys.exit(1)
        else:
            self.stdout.write(self.style.SUCCESS('✅ All deployment checks passed!'))

    def check_security_settings(self):
        """Check security settings"""
        issues = []
        if settings.DEBUG:
            issues.append('DEBUG is True')
        if not settings.SECRET_KEY or len(settings.SECRET_KEY) < 50:
            issues.append('SECRET_KEY is missing or too short')
        if not settings.ALLOWED_HOSTS:
            issues.append('ALLOWED_HOSTS is empty')
        if not getattr(settings, 'SECURE_SSL_REDIRECT', False):
            issues.append('SECURE_SSL_REDIRECT is not enabled')
        if issues:
            for issue in issues:
                self.stdout.write(self.style.ERROR(f' ❌ {issue}'))
            return False
        self.stdout.write(self.style.SUCCESS(' ✅ Security settings OK'))
        return True

    def check_environment_variables(self):
        """Check required environment variables"""
        required_vars = [
            'DJANGO_SECRET_KEY',
            'DATABASE_URL',
            'REDIS_URL',
        ]
        missing_vars = [var for var in required_vars if not os.environ.get(var)]
        if missing_vars:
            for var in missing_vars:
                self.stdout.write(self.style.ERROR(f' ❌ Missing: {var}'))
            return False
        self.stdout.write(self.style.SUCCESS(' ✅ Environment variables OK'))
        return True

    def check_database(self):
        """Check database connectivity"""
        try:
            from django.db import connection
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
            self.stdout.write(self.style.SUCCESS(' ✅ Database connection OK'))
            return True
        except Exception as e:
            self.stdout.write(self.style.ERROR(f' ❌ Database error: {str(e)}'))
            return False

    def check_cache(self):
        """Check cache connectivity"""
        try:
            from django.core.cache import cache
            cache.set('deployment_check', 'ok', 30)
            if cache.get('deployment_check') == 'ok':
                self.stdout.write(self.style.SUCCESS(' ✅ Cache connection OK'))
                return True
            self.stdout.write(self.style.ERROR(' ❌ Cache not working'))
            return False
        except Exception as e:
            self.stdout.write(self.style.ERROR(f' ❌ Cache error: {str(e)}'))
            return False

    def check_static_files(self):
        """Check static files configuration"""
        try:
            from django.contrib.staticfiles import finders
            static_file = finders.find('admin/css/base.css')
            if static_file:
                self.stdout.write(self.style.SUCCESS(' ✅ Static files OK'))
                return True
            self.stdout.write(self.style.ERROR(' ❌ Static files not found'))
            return False
        except Exception as e:
            self.stdout.write(self.style.ERROR(f' ❌ Static files error: {str(e)}'))
            return False

    def run_django_checks(self):
        """Run Django system checks, including the deployment-tagged ones.

        Any message returned by the check framework counts as a failure.
        """
        try:
            # Deployment-tagged checks only run when explicitly requested.
            issues = run_checks(include_deployment_checks=True)
            if issues:
                for issue in issues:
                    self.stdout.write(self.style.ERROR(f' ❌ {issue}'))
                return False
            self.stdout.write(self.style.SUCCESS(' ✅ Django system checks OK'))
            return True
        except Exception as e:
            self.stdout.write(self.style.ERROR(f' ❌ System checks error: {str(e)}'))
            return False
#!/bin/bash
# scripts/post_deployment_check.sh
#
# Post-deployment smoke test: health endpoint, migrations, collected static
# files and a few critical URLs. Exits 1 at the first failing step.
# BASE_URL may be exported to test another host; it defaults to the local server.

BASE_URL="${BASE_URL:-http://localhost:8000}"

# Print the HTTP status code returned for URL $1. The 10-second cap keeps the
# script from hanging on an unresponsive server.
http_status() {
    curl -s -o /dev/null --max-time 10 -w "%{http_code}" "$1"
}

echo "🚀 Starting post-deployment verification..."

# Check if application is responding
echo "📡 Checking application health..."
response=$(http_status "$BASE_URL/health/")
if [ "$response" = "200" ]; then
    echo "✅ Application health check passed"
else
    echo "❌ Application health check failed (HTTP $response)"
    exit 1
fi

# Check database migrations
echo "🗄️ Checking database migrations..."
if python manage.py check_migrations; then
    echo "✅ Database migrations are up to date"
else
    echo "❌ Database migrations check failed"
    exit 1
fi

# Check static files
echo "📁 Checking static files..."
if [ -f "staticfiles/admin/css/base.css" ]; then
    echo "✅ Static files are collected"
else
    echo "❌ Static files not found"
    exit 1
fi

# Check critical endpoints
echo "🔍 Testing critical endpoints..."
endpoints=("/api/health/" "/admin/" "/")
for endpoint in "${endpoints[@]}"; do
    response=$(http_status "$BASE_URL$endpoint")
    # 302 is accepted as well as 200.
    if [ "$response" = "200" ] || [ "$response" = "302" ]; then
        echo "✅ $endpoint responding correctly"
    else
        echo "❌ $endpoint failed (HTTP $response)"
        exit 1
    fi
done

echo "🎉 Post-deployment verification completed successfully!"
This comprehensive preparation guide ensures your Django application is production-ready with proper security, performance optimization, monitoring, and deployment verification procedures.
Deployment
Deploying Django applications to production requires careful planning, proper configuration, and robust infrastructure. This comprehensive guide covers everything from preparing your application for production to implementing scalable deployment architectures, monitoring systems, and backup strategies. Whether you're deploying a simple web application or a complex microservices architecture, this section provides production-ready patterns and best practices.
Using WSGI and ASGI Servers
Production Django applications require robust application servers to handle HTTP requests efficiently. This chapter covers configuring and deploying Django applications with WSGI servers (Gunicorn, uWSGI) for traditional synchronous applications and ASGI servers (Uvicorn, Daphne, Hypercorn) for asynchronous applications with WebSocket support.