Comprehensive backup strategies are critical for Django applications to ensure data protection, disaster recovery, and business continuity. This chapter covers database backups, file system backups, automated backup procedures, disaster recovery planning, and backup testing strategies.
#!/bin/bash
# scripts/postgres_backup.sh - Comprehensive PostgreSQL backup script
#
# Dumps DB_NAME with pg_dump (custom format), verifies the archive with
# pg_restore --list, gzips it, optionally encrypts it with openssl, and
# optionally uploads the result to S3. Local copies older than
# RETENTION_DAYS are pruned. Authentication is left to the environment
# (PGPASSWORD / ~/.pgpass).
set -euo pipefail

# Configuration (all overridable via environment)
DB_NAME="${DB_NAME:-django_app}"
DB_USER="${DB_USER:-postgres}"
DB_HOST="${DB_HOST:-localhost}"
DB_PORT="${DB_PORT:-5432}"
BACKUP_DIR="${BACKUP_DIR:-/var/backups/postgresql}"
RETENTION_DAYS="${RETENTION_DAYS:-30}"
S3_BUCKET="${S3_BUCKET:-my-app-backups}"
ENCRYPTION_KEY="${ENCRYPTION_KEY:-/etc/backup/encryption.key}"
SLACK_WEBHOOK="${SLACK_WEBHOOK:-}"   # optional; empty disables notification

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Generate backup filename with timestamp
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
BACKUP_FILE="$BACKUP_DIR/${DB_NAME}_${TIMESTAMP}.sql"
COMPRESSED_FILE="${BACKUP_FILE}.gz"
ENCRYPTED_FILE="${COMPRESSED_FILE}.enc"

echo "🗄️ Starting PostgreSQL backup for $DB_NAME..."

# Create database dump.
# --compress=0 because the archive is gzipped below; the original used
# --compress=9 AND gzip, compressing the data twice for no gain.
pg_dump \
  --host="$DB_HOST" \
  --port="$DB_PORT" \
  --username="$DB_USER" \
  --dbname="$DB_NAME" \
  --verbose \
  --clean \
  --if-exists \
  --create \
  --format=custom \
  --compress=0 \
  --file="$BACKUP_FILE"

# Verify backup integrity before post-processing it
if pg_restore --list "$BACKUP_FILE" > /dev/null 2>&1; then
  echo "✅ Backup integrity verified"
else
  echo "❌ Backup integrity check failed" >&2
  exit 1
fi

# Compress backup
gzip "$BACKUP_FILE"
echo "📦 Backup compressed: $COMPRESSED_FILE"

# Encrypt backup (only when a key file is present)
if [ -f "$ENCRYPTION_KEY" ]; then
  openssl enc -aes-256-cbc -salt -in "$COMPRESSED_FILE" -out "$ENCRYPTED_FILE" -pass file:"$ENCRYPTION_KEY"
  rm "$COMPRESSED_FILE"
  FINAL_FILE="$ENCRYPTED_FILE"
  echo "🔒 Backup encrypted: $ENCRYPTED_FILE"
else
  FINAL_FILE="$COMPRESSED_FILE"
  echo "⚠️ No encryption key found, backup not encrypted"
fi

# Upload to S3 when the AWS CLI is available and a bucket is configured
if command -v aws > /dev/null 2>&1 && [ -n "$S3_BUCKET" ]; then
  S3_KEY="postgresql/$(basename "$FINAL_FILE")"
  aws s3 cp "$FINAL_FILE" "s3://$S3_BUCKET/$S3_KEY" \
    --storage-class STANDARD_IA \
    --server-side-encryption AES256
  echo "☁️ Backup uploaded to S3: s3://$S3_BUCKET/$S3_KEY"
fi

# Clean up old backups (pattern matches .sql, .sql.gz and .sql.gz.enc)
find "$BACKUP_DIR" -name "${DB_NAME}_*.sql*" -mtime +"$RETENTION_DAYS" -delete
echo "🧹 Cleaned up backups older than $RETENTION_DAYS days"

# Log backup completion
echo "✅ Backup completed successfully: $FINAL_FILE"
echo "📊 Backup size: $(du -h "$FINAL_FILE" | cut -f1)"

# Send notification (best effort)
if command -v curl > /dev/null 2>&1 && [ -n "$SLACK_WEBHOOK" ]; then
  curl -X POST -H 'Content-type: application/json' \
    --data "{\"text\":\"✅ Database backup completed for $DB_NAME\"}" \
    "$SLACK_WEBHOOK"
fi
#!/bin/bash
# scripts/postgres_pitr_backup.sh - Point-in-time recovery backup
#
# Takes a compressed pg_basebackup, ensures WAL archiving is configured
# in postgresql.conf (idempotently), writes a recovery.conf template
# alongside the backup, and prunes old base backups plus WAL files that
# predate the newest remaining base backup.
set -euo pipefail

# Configuration
PGDATA="${PGDATA:-/var/lib/postgresql/data}"
BACKUP_DIR="${BACKUP_DIR:-/var/backups/postgresql/pitr}"
WAL_ARCHIVE_DIR="${WAL_ARCHIVE_DIR:-/var/backups/postgresql/wal}"
RETENTION_DAYS="${RETENTION_DAYS:-7}"

# Create directories
mkdir -p "$BACKUP_DIR" "$WAL_ARCHIVE_DIR"

echo "🔄 Starting point-in-time recovery backup..."

# Start base backup
BACKUP_LABEL=$(date +"%Y%m%d_%H%M%S")
BASE_BACKUP_DIR="$BACKUP_DIR/base_$BACKUP_LABEL"

# Create base backup using pg_basebackup
pg_basebackup \
  --pgdata="$BASE_BACKUP_DIR" \
  --format=tar \
  --gzip \
  --compress=9 \
  --checkpoint=fast \
  --label="base_backup_$BACKUP_LABEL" \
  --progress \
  --verbose \
  --wal-method=stream

echo "✅ Base backup completed: $BASE_BACKUP_DIR"

# Configure WAL archiving in postgresql.conf exactly once.
# The original appended unconditionally, which duplicated these settings
# on every run; guard on the marker comment to keep this idempotent.
if ! grep -q '^# WAL archiving configuration' "$PGDATA/postgresql.conf"; then
  cat >> "$PGDATA/postgresql.conf" << EOF
# WAL archiving configuration
wal_level = replica
archive_mode = on
archive_command = 'cp %p $WAL_ARCHIVE_DIR/%f'
archive_timeout = 300
EOF
fi

# Create recovery configuration template (recovery_target_time is a
# placeholder the operator fills in at restore time)
cat > "$BASE_BACKUP_DIR/recovery.conf.template" << EOF
# Point-in-time recovery configuration
restore_command = 'cp $WAL_ARCHIVE_DIR/%f %p'
recovery_target_time = 'YYYY-MM-DD HH:MM:SS'
recovery_target_inclusive = true
EOF

# Clean up old base backups
find "$BACKUP_DIR" -name "base_*" -mtime +"$RETENTION_DAYS" -exec rm -rf {} +

# Clean up WAL files older than the newest remaining base backup.
# (Renamed from LAST_BACKUP_TIME: the variable holds a directory path,
# which find -newer compares by mtime.)
NEWEST_BASE_BACKUP=$(find "$BACKUP_DIR" -name "base_*" -type d -printf '%T@ %p\n' | sort -n | tail -1 | cut -d' ' -f2-)
if [ -n "$NEWEST_BASE_BACKUP" ]; then
  find "$WAL_ARCHIVE_DIR" -type f -not -newer "$NEWEST_BASE_BACKUP" -delete
fi

echo "✅ Point-in-time recovery backup setup completed"
# management/commands/backup_database.py
import os
import subprocess
import gzip
import shutil
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand
from django.conf import settings
from django.db import connection
import boto3
class Command(BaseCommand):
    """Create a database backup in SQL, custom, or JSON format.

    Supports optional gzip compression, openssl encryption, S3 upload,
    and retention-based cleanup of old backup files.
    """

    help = 'Create database backup with various options'

    def add_arguments(self, parser):
        parser.add_argument('--format', choices=['sql', 'custom', 'json'], default='custom')
        parser.add_argument('--compress', action='store_true', help='Compress backup')
        parser.add_argument('--encrypt', action='store_true', help='Encrypt backup')
        parser.add_argument('--upload-s3', action='store_true', help='Upload to S3')
        parser.add_argument('--retention-days', type=int, default=30)
        parser.add_argument('--output-dir', default='/var/backups/django')

    def handle(self, *args, **options):
        """Entry point: create the dump, post-process it, then prune old files."""
        self.stdout.write('Starting database backup...')

        # Get database configuration
        db_config = settings.DATABASES['default']

        # Generate backup filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"django_db_{timestamp}"

        if options['format'] == 'json':
            backup_file = self.create_json_backup(backup_name, options)
        else:
            backup_file = self.create_sql_backup(backup_name, options, db_config)

        # Optional post-processing steps; each replaces the previous file.
        if options['compress']:
            backup_file = self.compress_backup(backup_file)
        if options['encrypt']:
            backup_file = self.encrypt_backup(backup_file)
        if options['upload_s3']:
            self.upload_to_s3(backup_file)

        # Clean up old backups
        self.cleanup_old_backups(options['output_dir'], options['retention_days'])

        self.stdout.write(
            self.style.SUCCESS(f'Backup completed successfully: {backup_file}')
        )

    def create_sql_backup(self, backup_name, options, db_config):
        """Create SQL backup using pg_dump; returns the backup file path."""
        backup_dir = options['output_dir']
        os.makedirs(backup_dir, exist_ok=True)

        if options['format'] == 'custom':
            backup_file = os.path.join(backup_dir, f"{backup_name}.dump")
            format_option = 'custom'
        else:
            backup_file = os.path.join(backup_dir, f"{backup_name}.sql")
            format_option = 'plain'

        # Build pg_dump command
        cmd = [
            'pg_dump',
            f"--host={db_config['HOST']}",
            f"--port={db_config['PORT']}",
            f"--username={db_config['USER']}",
            f"--dbname={db_config['NAME']}",
            f"--format={format_option}",
            '--verbose',
            '--clean',
            '--if-exists',
            '--create',
        ]
        if format_option == 'custom':
            cmd.append('--compress=9')
        cmd.extend(['-f', backup_file])

        # Pass the password via the environment so it never appears in argv.
        env = os.environ.copy()
        env['PGPASSWORD'] = db_config['PASSWORD']

        # Execute backup
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"pg_dump failed: {result.stderr}")
        return backup_file

    def create_json_backup(self, backup_name, options):
        """Create JSON backup using Django's dumpdata; returns the file path."""
        backup_dir = options['output_dir']
        os.makedirs(backup_dir, exist_ok=True)
        backup_file = os.path.join(backup_dir, f"{backup_name}.json")

        # Use Django's dumpdata command (natural keys make the dump portable
        # across databases with different auto-increment state).
        from django.core.management import call_command
        with open(backup_file, 'w') as f:
            call_command(
                'dumpdata',
                '--natural-foreign',
                '--natural-primary',
                '--indent=2',
                stdout=f
            )
        return backup_file

    def compress_backup(self, backup_file):
        """Gzip the backup, remove the original, return the .gz path."""
        compressed_file = f"{backup_file}.gz"
        with open(backup_file, 'rb') as f_in:
            with gzip.open(compressed_file, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(backup_file)
        return compressed_file

    def encrypt_backup(self, backup_file):
        """Encrypt the backup with openssl; returns the .enc path.

        Falls back to returning the unencrypted file when
        settings.BACKUP_ENCRYPTION_KEY (a key-file path) is not configured.
        """
        encrypted_file = f"{backup_file}.enc"
        encryption_key = getattr(settings, 'BACKUP_ENCRYPTION_KEY', None)
        if not encryption_key:
            self.stdout.write(
                self.style.WARNING('No encryption key configured, skipping encryption')
            )
            return backup_file

        cmd = [
            'openssl', 'enc', '-aes-256-cbc', '-salt',
            '-in', backup_file,
            '-out', encrypted_file,
            '-pass', f'file:{encryption_key}'
        ]
        # text=True so a failure message contains readable stderr, not bytes.
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Encryption failed: {result.stderr}")

        os.remove(backup_file)
        return encrypted_file

    def upload_to_s3(self, backup_file):
        """Upload backup to S3 under the database/ prefix (best effort)."""
        s3_bucket = getattr(settings, 'BACKUP_S3_BUCKET', None)
        if not s3_bucket:
            self.stdout.write(
                self.style.WARNING('No S3 bucket configured, skipping upload')
            )
            return

        s3_client = boto3.client('s3')
        s3_key = f"database/{os.path.basename(backup_file)}"
        try:
            s3_client.upload_file(
                backup_file,
                s3_bucket,
                s3_key,
                ExtraArgs={
                    'StorageClass': 'STANDARD_IA',
                    'ServerSideEncryption': 'AES256'
                }
            )
            self.stdout.write(f'Backup uploaded to S3: s3://{s3_bucket}/{s3_key}')
        except Exception as e:
            self.stdout.write(
                self.style.ERROR(f'S3 upload failed: {str(e)}')
            )

    def cleanup_old_backups(self, backup_dir, retention_days):
        """Delete backup files older than retention_days (by mtime)."""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        for filename in os.listdir(backup_dir):
            file_path = os.path.join(backup_dir, filename)
            if os.path.isfile(file_path):
                file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                if file_mtime < cutoff_date:
                    os.remove(file_path)
                    # Fixed: the original logged the literal '(unknown)'
                    # instead of the removed file's name.
                    self.stdout.write(f'Removed old backup: {filename}')
# management/commands/backup_media.py
import os
import shutil
import tarfile
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand
from django.conf import settings
import boto3
from botocore.exceptions import ClientError
class Command(BaseCommand):
    """Backup media files with incremental and full backup options.

    Full backups archive all of MEDIA_ROOT; incremental backups archive
    only files modified since the last full backup. A manifest with MD5
    checksums accompanies each archive.
    """

    help = 'Backup media files with incremental and full backup options'

    def add_arguments(self, parser):
        parser.add_argument('--type', choices=['full', 'incremental'], default='incremental')
        parser.add_argument('--compress', action='store_true', help='Compress backup')
        parser.add_argument('--sync-s3', action='store_true', help='Sync to S3')
        parser.add_argument('--retention-days', type=int, default=30)
        parser.add_argument('--output-dir', default='/var/backups/media')

    def handle(self, *args, **options):
        """Entry point: create the archive, optionally sync to S3, prune."""
        self.stdout.write('Starting media files backup...')

        media_root = settings.MEDIA_ROOT
        backup_dir = options['output_dir']
        os.makedirs(backup_dir, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if options['type'] == 'full':
            backup_file = self.create_full_backup(media_root, backup_dir, timestamp, options)
        else:
            backup_file = self.create_incremental_backup(media_root, backup_dir, timestamp, options)

        if options['sync_s3']:
            self.sync_to_s3(media_root)

        self.cleanup_old_backups(backup_dir, options['retention_days'])

        self.stdout.write(
            self.style.SUCCESS(f'Media backup completed: {backup_file}')
        )

    @staticmethod
    def _file_checksum(file_path):
        """Return the MD5 hex digest of a file, read in 4 KiB chunks.

        Shared by both manifest writers (the original duplicated this loop).
        MD5 is used for integrity only, not security.
        """
        import hashlib
        md5_hash = hashlib.md5()
        with open(file_path, 'rb') as file_obj:
            for chunk in iter(lambda: file_obj.read(4096), b""):
                md5_hash.update(chunk)
        return md5_hash.hexdigest()

    def create_full_backup(self, media_root, backup_dir, timestamp, options):
        """Create a full tar (optionally gzipped) backup of media files."""
        backup_name = f"media_full_{timestamp}"
        if options['compress']:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar.gz")
            mode = 'w:gz'
        else:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar")
            mode = 'w'

        with tarfile.open(backup_file, mode) as tar:
            tar.add(media_root, arcname='media')

        # Create backup manifest
        manifest_file = os.path.join(backup_dir, f"{backup_name}.manifest")
        self.create_manifest(media_root, manifest_file)
        return backup_file

    def create_incremental_backup(self, media_root, backup_dir, timestamp, options):
        """Archive only files modified since the last full backup.

        Falls back to a full backup when no previous full backup exists.
        """
        last_full_backup = self.find_last_full_backup(backup_dir)
        if not last_full_backup:
            self.stdout.write('No full backup found, creating full backup instead')
            return self.create_full_backup(media_root, backup_dir, timestamp, options)

        # The archive's mtime is the reference point for "modified since".
        last_backup_time = datetime.fromtimestamp(os.path.getmtime(last_full_backup))

        backup_name = f"media_incremental_{timestamp}"
        if options['compress']:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar.gz")
            mode = 'w:gz'
        else:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar")
            mode = 'w'

        # Create incremental backup with only modified files
        with tarfile.open(backup_file, mode) as tar:
            for root, dirs, files in os.walk(media_root):
                for file in files:
                    file_path = os.path.join(root, file)
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    if file_mtime > last_backup_time:
                        arcname = os.path.relpath(file_path, media_root)
                        tar.add(file_path, arcname=f"media/{arcname}")

        # Create incremental manifest
        manifest_file = os.path.join(backup_dir, f"{backup_name}.manifest")
        self.create_incremental_manifest(media_root, manifest_file, last_backup_time)
        return backup_file

    def find_last_full_backup(self, backup_dir):
        """Return the newest media_full_* archive path, or None."""
        full_backups = []
        for filename in os.listdir(backup_dir):
            if filename.startswith('media_full_') and filename.endswith(('.tar', '.tar.gz')):
                file_path = os.path.join(backup_dir, filename)
                full_backups.append((os.path.getmtime(file_path), file_path))
        if full_backups:
            full_backups.sort(reverse=True)
            return full_backups[0][1]
        return None

    def create_manifest(self, media_root, manifest_file):
        """Write a manifest line (md5, size, mtime, relpath) for every file."""
        with open(manifest_file, 'w') as f:
            f.write(f"# Media Backup Manifest\n")
            f.write(f"# Created: {datetime.now().isoformat()}\n")
            f.write(f"# Root: {media_root}\n\n")
            for root, dirs, files in os.walk(media_root):
                for file in files:
                    file_path = os.path.join(root, file)
                    rel_path = os.path.relpath(file_path, media_root)
                    checksum = self._file_checksum(file_path)
                    file_size = os.path.getsize(file_path)
                    file_mtime = os.path.getmtime(file_path)
                    f.write(f"{checksum} {file_size} {file_mtime} {rel_path}\n")

    def create_incremental_manifest(self, media_root, manifest_file, since_time):
        """Write a manifest covering only files modified after since_time."""
        with open(manifest_file, 'w') as f:
            f.write(f"# Incremental Media Backup Manifest\n")
            f.write(f"# Created: {datetime.now().isoformat()}\n")
            f.write(f"# Since: {since_time.isoformat()}\n")
            f.write(f"# Root: {media_root}\n\n")
            for root, dirs, files in os.walk(media_root):
                for file in files:
                    file_path = os.path.join(root, file)
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    if file_mtime > since_time:
                        rel_path = os.path.relpath(file_path, media_root)
                        checksum = self._file_checksum(file_path)
                        file_size = os.path.getsize(file_path)
                        f.write(f"{checksum} {file_size} {file_mtime.timestamp()} {rel_path}\n")

    def sync_to_s3(self, media_root):
        """Sync media files to S3, skipping files whose S3 copy is newer."""
        s3_bucket = getattr(settings, 'BACKUP_S3_BUCKET', None)
        if not s3_bucket:
            self.stdout.write(
                self.style.WARNING('No S3 bucket configured for sync')
            )
            return

        s3_client = boto3.client('s3')
        for root, dirs, files in os.walk(media_root):
            for file in files:
                local_path = os.path.join(root, file)
                s3_key = f"media/{os.path.relpath(local_path, media_root)}"
                try:
                    # Check if file exists in S3 and compare modification time
                    try:
                        s3_object = s3_client.head_object(Bucket=s3_bucket, Key=s3_key)
                        s3_mtime = s3_object['LastModified'].timestamp()
                        local_mtime = os.path.getmtime(local_path)
                        if local_mtime <= s3_mtime:
                            continue  # Skip if S3 version is newer or same
                    except ClientError:
                        pass  # File doesn't exist in S3, upload it

                    # Upload file to S3
                    s3_client.upload_file(
                        local_path,
                        s3_bucket,
                        s3_key,
                        ExtraArgs={
                            'StorageClass': 'STANDARD_IA',
                            'ServerSideEncryption': 'AES256'
                        }
                    )
                    self.stdout.write(f'Synced to S3: {s3_key}')
                except Exception as e:
                    self.stdout.write(
                        self.style.ERROR(f'Failed to sync {local_path}: {str(e)}')
                    )

    def cleanup_old_backups(self, backup_dir, retention_days):
        """Delete media archives (and their manifests) older than retention."""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        for filename in os.listdir(backup_dir):
            if filename.startswith('media_') and (filename.endswith('.tar') or filename.endswith('.tar.gz')):
                file_path = os.path.join(backup_dir, filename)
                file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                if file_mtime < cutoff_date:
                    os.remove(file_path)
                    # Also remove corresponding manifest
                    manifest_file = file_path.replace('.tar.gz', '.manifest').replace('.tar', '.manifest')
                    if os.path.exists(manifest_file):
                        os.remove(manifest_file)
                    # Fixed: the original logged the literal '(unknown)'
                    # instead of the removed file's name.
                    self.stdout.write(f'Removed old backup: {filename}')
# backup/orchestrator.py
import os
import logging
import subprocess
from datetime import datetime, timedelta
from django.core.management import call_command
from django.conf import settings
from celery import shared_task
import boto3
# Dedicated 'backup' logger channel so backup activity can be routed and
# filtered independently of the application's other logs.
logger = logging.getLogger('backup')
class BackupOrchestrator:
    """Orchestrate comprehensive backup operations.

    Configuration comes from settings.BACKUP_CONFIG (backup_dir,
    retention_days, encrypt, s3_enabled, s3_bucket, slack_webhook).
    Each component backup is independent; run_full_backup records
    per-component failures without aborting the remaining components.
    """

    def __init__(self):
        self.backup_config = getattr(settings, 'BACKUP_CONFIG', {})
        # Only build an S3 client when uploads are enabled.
        self.s3_client = boto3.client('s3') if self.backup_config.get('s3_enabled') else None

    def run_full_backup(self):
        """Run complete backup of all components and return a result dict.

        Refactored from five copy-pasted try/except blocks into a single
        loop over a component table; log and error strings are unchanged.
        """
        logger.info("Starting full backup process")
        backup_results = {
            'timestamp': datetime.now().isoformat(),
            'components': {},
            'success': True,
            'errors': []
        }

        # (results key, log label, error-list prefix, backup callable)
        components = [
            ('database', 'Database', 'Database', self.backup_database),
            ('media', 'Media', 'Media', self.backup_media_files),
            ('static', 'Static files', 'Static', self.backup_static_files),
            ('configuration', 'Configuration', 'Configuration', self.backup_configuration),
            ('code', 'Code', 'Code', self.backup_application_code),
        ]
        for key, log_label, err_label, backup_func in components:
            try:
                backup_results['components'][key] = backup_func()
            except Exception as e:
                logger.error(f"{log_label} backup failed: {str(e)}")
                backup_results['errors'].append(f"{err_label}: {str(e)}")
                backup_results['success'] = False

        # Send notification
        self.send_backup_notification(backup_results)
        logger.info(f"Full backup completed. Success: {backup_results['success']}")
        return backup_results

    def backup_database(self):
        """Backup database via the backup_database management command."""
        logger.info("Starting database backup")
        call_command(
            'backup_database',
            format='custom',
            compress=True,
            encrypt=self.backup_config.get('encrypt', False),
            upload_s3=self.backup_config.get('s3_enabled', False),
            retention_days=self.backup_config.get('retention_days', 30)
        )
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'database'
        }

    def backup_media_files(self):
        """Backup media files (full on Sundays, incremental otherwise)."""
        logger.info("Starting media files backup")
        # Determine backup type based on schedule
        backup_type = self.get_media_backup_type()
        call_command(
            'backup_media',
            type=backup_type,
            compress=True,
            sync_s3=self.backup_config.get('s3_enabled', False),
            retention_days=self.backup_config.get('retention_days', 30)
        )
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': f'media_{backup_type}'
        }

    def backup_static_files(self):
        """Archive STATIC_ROOT as a tar.gz and optionally upload to S3."""
        logger.info("Starting static files backup")
        static_root = settings.STATIC_ROOT
        backup_dir = self.backup_config.get('backup_dir', '/var/backups')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Create tar archive of static files (-C keeps archive paths relative)
        static_backup = os.path.join(backup_dir, f"static_{timestamp}.tar.gz")
        subprocess.run([
            'tar', '-czf', static_backup,
            '-C', os.path.dirname(static_root),
            os.path.basename(static_root)
        ], check=True)

        # Upload to S3 if enabled
        if self.s3_client and self.backup_config.get('s3_enabled'):
            s3_key = f"static/static_{timestamp}.tar.gz"
            self.s3_client.upload_file(
                static_backup,
                self.backup_config['s3_bucket'],
                s3_key
            )

        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'static',
            'file': static_backup
        }

    def backup_configuration(self):
        """Archive deployment configuration files that exist on this host."""
        logger.info("Starting configuration backup")
        config_files = [
            '/etc/nginx/sites-available/',
            '/etc/systemd/system/django-app.service',
            '/opt/django_app/.env',
            '/opt/django_app/gunicorn.conf.py',
        ]

        backup_dir = self.backup_config.get('backup_dir', '/var/backups')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        config_backup = os.path.join(backup_dir, f"config_{timestamp}.tar.gz")

        # Create tar archive of only the configuration paths that exist;
        # tar would fail on missing paths otherwise.
        tar_cmd = ['tar', '-czf', config_backup]
        for config_file in config_files:
            if os.path.exists(config_file):
                tar_cmd.append(config_file)
        subprocess.run(tar_cmd, check=True)

        # Upload to S3 if enabled
        if self.s3_client and self.backup_config.get('s3_enabled'):
            s3_key = f"config/config_{timestamp}.tar.gz"
            self.s3_client.upload_file(
                config_backup,
                self.backup_config['s3_bucket'],
                s3_key
            )

        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'configuration',
            'file': config_backup
        }

    def backup_application_code(self):
        """Snapshot the application code at HEAD via git archive."""
        logger.info("Starting application code backup")
        app_dir = getattr(settings, 'BASE_DIR', '/opt/django_app')
        backup_dir = self.backup_config.get('backup_dir', '/var/backups')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Get current Git commit (used in the archive name for traceability)
        try:
            git_commit = subprocess.check_output(
                ['git', 'rev-parse', 'HEAD'],
                cwd=app_dir,
                text=True
            ).strip()
        except subprocess.CalledProcessError:
            git_commit = 'unknown'

        # Create Git archive of tracked files only (untracked files are
        # intentionally excluded — they belong to config/media backups).
        code_backup = os.path.join(backup_dir, f"code_{timestamp}_{git_commit[:8]}.tar.gz")
        subprocess.run([
            'git', 'archive',
            '--format=tar.gz',
            f'--output={code_backup}',
            'HEAD'
        ], cwd=app_dir, check=True)

        # Upload to S3 if enabled
        if self.s3_client and self.backup_config.get('s3_enabled'):
            s3_key = f"code/code_{timestamp}_{git_commit[:8]}.tar.gz"
            self.s3_client.upload_file(
                code_backup,
                self.backup_config['s3_bucket'],
                s3_key
            )

        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'code',
            'file': code_backup,
            'git_commit': git_commit
        }

    def get_media_backup_type(self):
        """Return 'full' on Sundays (weekday() == 6), else 'incremental'."""
        if datetime.now().weekday() == 6:  # Sunday
            return 'full'
        else:
            return 'incremental'

    def send_backup_notification(self, backup_results):
        """Send a Slack notification summarising the backup run (best effort)."""
        if backup_results['success']:
            message = "✅ Full backup completed successfully"
            color = 'good'
        else:
            message = f"❌ Backup completed with errors: {', '.join(backup_results['errors'])}"
            color = 'danger'

        # Send Slack notification if configured
        slack_webhook = self.backup_config.get('slack_webhook')
        if slack_webhook:
            # Lazy import so the orchestrator works without 'requests'
            # installed when no webhook is configured.
            import requests
            payload = {
                'text': message,
                'attachments': [
                    {
                        'color': color,
                        'fields': [
                            {
                                'title': 'Components',
                                'value': ', '.join(backup_results['components'].keys()),
                                'short': True
                            },
                            {
                                'title': 'Timestamp',
                                'value': backup_results['timestamp'],
                                'short': True
                            }
                        ]
                    }
                ]
            }
            try:
                requests.post(slack_webhook, json=payload, timeout=10)
            except Exception as e:
                logger.error(f"Failed to send Slack notification: {str(e)}")
# Celery tasks for automated backups: thin wrappers that instantiate a
# fresh BackupOrchestrator per invocation and return its result dict.
@shared_task
def run_full_backup():
    """Celery task for full backup"""
    return BackupOrchestrator().run_full_backup()


@shared_task
def run_database_backup():
    """Celery task for database-only backup"""
    return BackupOrchestrator().backup_database()


@shared_task
def run_media_backup():
    """Celery task for media files backup"""
    return BackupOrchestrator().backup_media_files()
# settings/backup.py
from celery.schedules import crontab

# Backup configuration consumed by BackupOrchestrator (backup/orchestrator.py).
BACKUP_CONFIG = {
    'backup_dir': '/var/backups/django',
    'retention_days': 30,
    'encrypt': True,
    's3_enabled': True,
    's3_bucket': 'my-app-backups',
    # NOTE(review): a webhook URL is a secret — prefer reading it from the
    # environment or a secret store rather than committing it to settings.
    'slack_webhook': 'https://hooks.slack.com/services/...',
}

# Celery beat schedule for automated backups.
# Guard against the base settings not having defined CELERY_BEAT_SCHEDULE,
# in which case .update() would raise NameError at import time.
try:
    CELERY_BEAT_SCHEDULE
except NameError:
    CELERY_BEAT_SCHEDULE = {}

CELERY_BEAT_SCHEDULE.update({
    # Full backup every Sunday at 2 AM.
    # Task paths fixed: the tasks are declared with @shared_task in
    # backup/orchestrator.py, so their default names are
    # 'backup.orchestrator.<func>' — the original 'backup.tasks.<func>'
    # paths would never resolve.
    'full-backup-weekly': {
        'task': 'backup.orchestrator.run_full_backup',
        'schedule': crontab(hour=2, minute=0, day_of_week=0),
    },
    # Database backup daily at 3 AM
    'database-backup-daily': {
        'task': 'backup.orchestrator.run_database_backup',
        'schedule': crontab(hour=3, minute=0),
    },
    # Media backup daily at 4 AM
    'media-backup-daily': {
        'task': 'backup.orchestrator.run_media_backup',
        'schedule': crontab(hour=4, minute=0),
    },
})
#!/bin/bash
# scripts/disaster_recovery.sh - Disaster recovery procedures
#
# Usage: disaster_recovery.sh [full|database|media] [backup_file]
# Restores the database, media files, configuration and code from the
# newest local backups (or an explicitly supplied backup file).
#
# NOTE(review): encrypted (.enc) or gzipped (.gz) database dumps must be
# decrypted/decompressed before being passed to this script — confirm
# against the backup pipeline before relying on auto-discovery.
set -euo pipefail

# Configuration
BACKUP_DIR="${BACKUP_DIR:-/var/backups}"
S3_BUCKET="${S3_BUCKET:-my-app-backups}"
RECOVERY_DIR="${RECOVERY_DIR:-/var/recovery}"
DB_NAME="${DB_NAME:-django_app}"
DB_USER="${DB_USER:-postgres}"

echo "🚨 Starting disaster recovery process..."

# Create recovery directory
mkdir -p "$RECOVERY_DIR"

# Restore the database from a plain-SQL or custom-format dump.
# Fixed: DB_USER was defined but never used — it is now passed to every
# PostgreSQL client invocation.
restore_database() {
  local backup_file="$1"
  echo "🗄️ Restoring database from $backup_file"

  # Drop existing database (if exists)
  dropdb --if-exists --username="$DB_USER" "$DB_NAME"

  if [[ "$backup_file" == *.sql ]]; then
    # Plain SQL backup
    createdb --username="$DB_USER" "$DB_NAME"
    psql --username="$DB_USER" -d "$DB_NAME" -f "$backup_file"
  else
    # Custom format backup (dump was taken with --create)
    pg_restore --create --username="$DB_USER" --dbname=postgres "$backup_file"
  fi
  echo "✅ Database restored successfully"
}

# Extract a media archive into the target directory, handling both
# gzipped and plain tar archives.
restore_media() {
  local backup_file="$1"
  local media_dir="$2"
  echo "📁 Restoring media files from $backup_file"

  mkdir -p "$media_dir"
  if [[ "$backup_file" == *.tar.gz ]]; then
    tar -xzf "$backup_file" -C "$media_dir" --strip-components=1
  else
    tar -xf "$backup_file" -C "$media_dir" --strip-components=1
  fi
  echo "✅ Media files restored successfully"
}

# Download a single object from the configured S3 bucket.
download_from_s3() {
  local s3_key="$1"
  local local_file="$2"
  echo "☁️ Downloading $s3_key from S3..."
  aws s3 cp "s3://$S3_BUCKET/$s3_key" "$local_file"
}

# Main recovery process
case "${1:-full}" in
  "database")
    echo "🔄 Database-only recovery"
    # Use the supplied backup file, or find the latest local dump
    if [ -n "${2:-}" ]; then
      DB_BACKUP="$2"
    else
      DB_BACKUP=$(find "$BACKUP_DIR" -name "django_db_*.dump" -o -name "django_db_*.sql" | sort -r | head -1)
    fi
    if [ -z "$DB_BACKUP" ]; then
      echo "❌ No database backup found" >&2
      exit 1
    fi
    restore_database "$DB_BACKUP"
    ;;
  "media")
    echo "🔄 Media files recovery"
    if [ -n "${2:-}" ]; then
      MEDIA_BACKUP="$2"
    else
      MEDIA_BACKUP=$(find "$BACKUP_DIR" -name "media_*.tar*" | sort -r | head -1)
    fi
    if [ -z "$MEDIA_BACKUP" ]; then
      echo "❌ No media backup found" >&2
      exit 1
    fi
    restore_media "$MEDIA_BACKUP" "/opt/django_app/media"
    ;;
  "full")
    echo "🔄 Full disaster recovery"

    # Restore database
    DB_BACKUP=$(find "$BACKUP_DIR" -name "django_db_*.dump" -o -name "django_db_*.sql" | sort -r | head -1)
    if [ -n "$DB_BACKUP" ]; then
      restore_database "$DB_BACKUP"
    else
      echo "⚠️ No database backup found"
    fi

    # Restore media files
    MEDIA_BACKUP=$(find "$BACKUP_DIR" -name "media_*.tar*" | sort -r | head -1)
    if [ -n "$MEDIA_BACKUP" ]; then
      restore_media "$MEDIA_BACKUP" "/opt/django_app/media"
    else
      echo "⚠️ No media backup found"
    fi

    # Restore configuration.
    # Fixed: the glob matches both .tar and .tar.gz, but the original
    # always used -xzf; pick the right tar flags per extension.
    CONFIG_BACKUP=$(find "$BACKUP_DIR" -name "config_*.tar*" | sort -r | head -1)
    if [ -n "$CONFIG_BACKUP" ]; then
      echo "⚙️ Restoring configuration files"
      if [[ "$CONFIG_BACKUP" == *.tar.gz ]]; then
        tar -xzf "$CONFIG_BACKUP" -C /
      else
        tar -xf "$CONFIG_BACKUP" -C /
      fi
    else
      echo "⚠️ No configuration backup found"
    fi

    # Restore application code (same extension handling as above)
    CODE_BACKUP=$(find "$BACKUP_DIR" -name "code_*.tar*" | sort -r | head -1)
    if [ -n "$CODE_BACKUP" ]; then
      echo "💻 Restoring application code"
      mkdir -p /opt/django_app/src
      if [[ "$CODE_BACKUP" == *.tar.gz ]]; then
        tar -xzf "$CODE_BACKUP" -C /opt/django_app/src
      else
        tar -xf "$CODE_BACKUP" -C /opt/django_app/src
      fi
    else
      echo "⚠️ No code backup found"
    fi

    echo "✅ Full recovery completed"
    ;;
  *)
    echo "Usage: $0 [full|database|media] [backup_file]" >&2
    exit 1
    ;;
esac

echo "🎉 Disaster recovery completed successfully!"
This comprehensive backup strategies guide provides all the tools and procedures needed to protect Django applications from data loss and ensure business continuity through robust backup and recovery systems.
Monitoring and Logging
Comprehensive monitoring and logging are essential for maintaining healthy Django applications in production. This chapter covers application performance monitoring, error tracking, log aggregation, alerting systems, and observability best practices for Django applications.
Performance and Optimization
Performance optimization is critical for building scalable Django applications that provide excellent user experiences. This comprehensive guide covers database query optimization, template rendering, caching strategies, profiling techniques, and advanced optimization patterns that transform slow applications into high-performance systems.