Deployment

Backup Strategies

Comprehensive backup strategies are critical for Django applications to ensure data protection, disaster recovery, and business continuity. This chapter covers database backups, file system backups, automated backup procedures, disaster recovery planning, and backup testing strategies.

Database Backup Strategies

PostgreSQL Backup Solutions

#!/bin/bash
# scripts/postgres_backup.sh - Comprehensive PostgreSQL backup script

set -e

# Configuration
DB_NAME="${DB_NAME:-django_app}"
DB_USER="${DB_USER:-postgres}"
DB_HOST="${DB_HOST:-localhost}"
DB_PORT="${DB_PORT:-5432}"
BACKUP_DIR="${BACKUP_DIR:-/var/backups/postgresql}"
RETENTION_DAYS="${RETENTION_DAYS:-30}"
S3_BUCKET="${S3_BUCKET:-my-app-backups}"
ENCRYPTION_KEY="${ENCRYPTION_KEY:-/etc/backup/encryption.key}"

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Generate backup filename with timestamp
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
BACKUP_FILE="$BACKUP_DIR/${DB_NAME}_${TIMESTAMP}.sql"
COMPRESSED_FILE="${BACKUP_FILE}.gz"
ENCRYPTED_FILE="${COMPRESSED_FILE}.enc"

echo "🗄️ Starting PostgreSQL backup for $DB_NAME..."

# Create database dump (custom format, so it can be verified with pg_restore;
# internal compression is disabled because the file is gzipped afterwards)
pg_dump \
    --host="$DB_HOST" \
    --port="$DB_PORT" \
    --username="$DB_USER" \
    --dbname="$DB_NAME" \
    --verbose \
    --clean \
    --if-exists \
    --create \
    --format=custom \
    --compress=0 \
    --file="$BACKUP_FILE"

# Verify backup integrity before compressing
if pg_restore --list "$BACKUP_FILE" > /dev/null 2>&1; then
    echo "✅ Backup integrity verified"
else
    echo "❌ Backup integrity check failed"
    exit 1
fi

# Compress backup
gzip "$BACKUP_FILE"
echo "📦 Backup compressed: $COMPRESSED_FILE"

# Encrypt backup
if [ -f "$ENCRYPTION_KEY" ]; then
    # -pbkdf2 avoids OpenSSL's weak legacy key derivation (requires OpenSSL 1.1.1+)
    openssl enc -aes-256-cbc -salt -pbkdf2 -in "$COMPRESSED_FILE" -out "$ENCRYPTED_FILE" -pass file:"$ENCRYPTION_KEY"
    rm "$COMPRESSED_FILE"
    FINAL_FILE="$ENCRYPTED_FILE"
    echo "🔒 Backup encrypted: $ENCRYPTED_FILE"
else
    FINAL_FILE="$COMPRESSED_FILE"
    echo "⚠️ No encryption key found, backup not encrypted"
fi

# Upload to S3
if command -v aws &> /dev/null && [ -n "$S3_BUCKET" ]; then
    S3_KEY="postgresql/$(basename "$FINAL_FILE")"
    aws s3 cp "$FINAL_FILE" "s3://$S3_BUCKET/$S3_KEY" \
        --storage-class STANDARD_IA \
        --server-side-encryption AES256
    echo "☁️ Backup uploaded to S3: s3://$S3_BUCKET/$S3_KEY"
fi

# Clean up old backups
find "$BACKUP_DIR" -name "${DB_NAME}_*.sql*" -mtime +$RETENTION_DAYS -delete
echo "🧹 Cleaned up backups older than $RETENTION_DAYS days"

# Log backup completion
echo "✅ Backup completed successfully: $FINAL_FILE"
echo "📊 Backup size: $(du -h "$FINAL_FILE" | cut -f1)"

# Send notification
if command -v curl &> /dev/null && [ -n "$SLACK_WEBHOOK" ]; then
    curl -X POST -H 'Content-type: application/json' \
        --data "{\"text\":\"✅ Database backup completed for $DB_NAME\"}" \
        "$SLACK_WEBHOOK"
fi
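Restoring follows the same pipeline in reverse: decrypt, decompress, then pg_restore. The sketch below is a minimal planning helper, not part of the script above; the default database name and key path mirror the script's defaults and are assumptions:

```python
import shlex

def plan_restore(backup_path: str, db_name: str = "django_app",
                 key_file: str = "/etc/backup/encryption.key") -> list:
    """Return the shell commands needed to restore a backup, outermost layer first."""
    steps = []
    path = backup_path
    if path.endswith(".enc"):
        # Encrypted layer: decrypt with the same key file the backup script used
        inner = path[: -len(".enc")]
        steps.append(
            f"openssl enc -d -aes-256-cbc -pbkdf2 -in {shlex.quote(path)} "
            f"-out {shlex.quote(inner)} -pass file:{key_file}"
        )
        path = inner
    if path.endswith(".gz"):
        # Compressed layer: gunzip back to the raw pg_dump custom-format file
        steps.append(f"gunzip {shlex.quote(path)}")
        path = path[: -len(".gz")]
    # Finally restore the custom-format dump into the target database
    steps.append(f"pg_restore --clean --if-exists --dbname={db_name} {shlex.quote(path)}")
    return steps
```

Running each returned command in order rebuilds the database; the helper only derives the steps from the file's suffixes, so an unencrypted or uncompressed backup yields a shorter plan.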

Advanced Database Backup with Point-in-Time Recovery

#!/bin/bash
# scripts/postgres_pitr_backup.sh - Point-in-time recovery backup

set -e

# Configuration
PGDATA="${PGDATA:-/var/lib/postgresql/data}"
BACKUP_DIR="${BACKUP_DIR:-/var/backups/postgresql/pitr}"
WAL_ARCHIVE_DIR="${WAL_ARCHIVE_DIR:-/var/backups/postgresql/wal}"
RETENTION_DAYS="${RETENTION_DAYS:-7}"

# Create directories
mkdir -p "$BACKUP_DIR" "$WAL_ARCHIVE_DIR"

echo "🔄 Starting point-in-time recovery backup..."

# Start base backup
BACKUP_LABEL=$(date +"%Y%m%d_%H%M%S")
BASE_BACKUP_DIR="$BACKUP_DIR/base_$BACKUP_LABEL"

# Create base backup using pg_basebackup
pg_basebackup \
    --pgdata="$BASE_BACKUP_DIR" \
    --format=tar \
    --gzip \
    --compress=9 \
    --checkpoint=fast \
    --label="base_backup_$BACKUP_LABEL" \
    --progress \
    --verbose \
    --wal-method=stream

echo "✅ Base backup completed: $BASE_BACKUP_DIR"

# Configure WAL archiving (one-time change; skip if already configured,
# otherwise repeated runs would append duplicate entries. Requires a restart.)
if ! grep -q "^archive_mode = on" "$PGDATA/postgresql.conf"; then
    cat >> "$PGDATA/postgresql.conf" << EOF
# WAL archiving configuration
wal_level = replica
archive_mode = on
archive_command = 'cp %p $WAL_ARCHIVE_DIR/%f'
archive_timeout = 300
EOF
fi

# Create recovery configuration template (PostgreSQL 11 and older; on 12+
# these settings go into postgresql.conf alongside an empty recovery.signal file)
cat > "$BASE_BACKUP_DIR/recovery.conf.template" << EOF
# Point-in-time recovery configuration
restore_command = 'cp $WAL_ARCHIVE_DIR/%f %p'
recovery_target_time = 'YYYY-MM-DD HH:MM:SS'
recovery_target_inclusive = true
EOF

# Clean up old base backups
find "$BACKUP_DIR" -maxdepth 1 -name "base_*" -type d -mtime +$RETENTION_DAYS -exec rm -rf {} +

# Clean up old WAL files (keep everything newer than the last base backup)
LAST_BACKUP_DIR=$(find "$BACKUP_DIR" -name "base_*" -type d -printf '%T@ %p\n' | sort -n | tail -1 | cut -d' ' -f2-)
if [ -n "$LAST_BACKUP_DIR" ]; then
    find "$WAL_ARCHIVE_DIR" -type f -not -newer "$LAST_BACKUP_DIR" -delete
fi

echo "✅ Point-in-time recovery backup setup completed"
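Note that recovery.conf is obsolete as of PostgreSQL 12: the same settings now live in postgresql.conf, with an empty recovery.signal file in the data directory triggering recovery mode. A small sketch that renders the modern equivalent of the template above:

```python
from datetime import datetime

def recovery_settings(wal_archive_dir: str, target_time: datetime) -> str:
    """Render point-in-time recovery settings for PostgreSQL 12+.

    On 12+ these lines go into postgresql.conf (or postgresql.auto.conf),
    and an empty recovery.signal file is created in the data directory.
    """
    return (
        f"restore_command = 'cp {wal_archive_dir}/%f %p'\n"
        f"recovery_target_time = '{target_time:%Y-%m-%d %H:%M:%S}'\n"
        "recovery_target_inclusive = on\n"
    )
```

The rendered snippet replaces the recovery.conf.template step on modern PostgreSQL; the target time is filled in at restore time, not at backup time.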

Django Database Backup Management

# management/commands/backup_database.py
import os
import subprocess
import gzip
import shutil
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand
from django.conf import settings
from django.db import connection
import boto3

class Command(BaseCommand):
    help = 'Create database backup with various options'
    
    def add_arguments(self, parser):
        parser.add_argument('--format', choices=['sql', 'custom', 'json'], default='custom')
        parser.add_argument('--compress', action='store_true', help='Compress backup')
        parser.add_argument('--encrypt', action='store_true', help='Encrypt backup')
        parser.add_argument('--upload-s3', action='store_true', help='Upload to S3')
        parser.add_argument('--retention-days', type=int, default=30)
        parser.add_argument('--output-dir', default='/var/backups/django')
    
    def handle(self, *args, **options):
        self.stdout.write('Starting database backup...')
        
        # Get database configuration
        db_config = settings.DATABASES['default']
        
        # Generate backup filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"django_db_{timestamp}"
        
        if options['format'] == 'json':
            backup_file = self.create_json_backup(backup_name, options)
        else:
            backup_file = self.create_sql_backup(backup_name, options, db_config)
        
        # Compress if requested
        if options['compress']:
            backup_file = self.compress_backup(backup_file)
        
        # Encrypt if requested
        if options['encrypt']:
            backup_file = self.encrypt_backup(backup_file)
        
        # Upload to S3 if requested
        if options['upload_s3']:
            self.upload_to_s3(backup_file)
        
        # Clean up old backups
        self.cleanup_old_backups(options['output_dir'], options['retention_days'])
        
        self.stdout.write(
            self.style.SUCCESS(f'Backup completed successfully: {backup_file}')
        )
    
    def create_sql_backup(self, backup_name, options, db_config):
        """Create SQL backup using pg_dump"""
        backup_dir = options['output_dir']
        os.makedirs(backup_dir, exist_ok=True)
        
        if options['format'] == 'custom':
            backup_file = os.path.join(backup_dir, f"{backup_name}.dump")
            format_option = 'custom'
        else:
            backup_file = os.path.join(backup_dir, f"{backup_name}.sql")
            format_option = 'plain'
        
        # Build pg_dump command
        cmd = [
            'pg_dump',
            f"--host={db_config['HOST']}",
            f"--port={db_config['PORT']}",
            f"--username={db_config['USER']}",
            f"--dbname={db_config['NAME']}",
            f"--format={format_option}",
            '--verbose',
            '--clean',
            '--if-exists',
            '--create',
        ]
        
        if format_option == 'custom':
            cmd.append('--compress=9')
        
        cmd.extend(['-f', backup_file])
        
        # Set password via environment
        env = os.environ.copy()
        env['PGPASSWORD'] = db_config['PASSWORD']
        
        # Execute backup
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        
        if result.returncode != 0:
            raise Exception(f"pg_dump failed: {result.stderr}")
        
        return backup_file
    
    def create_json_backup(self, backup_name, options):
        """Create JSON backup using Django's dumpdata"""
        backup_dir = options['output_dir']
        os.makedirs(backup_dir, exist_ok=True)
        
        backup_file = os.path.join(backup_dir, f"{backup_name}.json")
        
        # Use Django's dumpdata command
        from django.core.management import call_command
        
        with open(backup_file, 'w') as f:
            call_command(
                'dumpdata',
                '--natural-foreign',
                '--natural-primary',
                '--indent=2',
                stdout=f
            )
        
        return backup_file
    
    def compress_backup(self, backup_file):
        """Compress backup file"""
        compressed_file = f"{backup_file}.gz"
        
        with open(backup_file, 'rb') as f_in:
            with gzip.open(compressed_file, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        
        os.remove(backup_file)
        return compressed_file
    
    def encrypt_backup(self, backup_file):
        """Encrypt backup file"""
        encrypted_file = f"{backup_file}.enc"
        encryption_key = getattr(settings, 'BACKUP_ENCRYPTION_KEY', None)
        
        if not encryption_key:
            self.stdout.write(
                self.style.WARNING('No encryption key configured, skipping encryption')
            )
            return backup_file
        
        cmd = [
            'openssl', 'enc', '-aes-256-cbc', '-salt',
            '-in', backup_file,
            '-out', encrypted_file,
            '-pass', f'file:{encryption_key}'
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            raise Exception(f"Encryption failed: {result.stderr}")
        
        os.remove(backup_file)
        return encrypted_file
    
    def upload_to_s3(self, backup_file):
        """Upload backup to S3"""
        s3_bucket = getattr(settings, 'BACKUP_S3_BUCKET', None)
        
        if not s3_bucket:
            self.stdout.write(
                self.style.WARNING('No S3 bucket configured, skipping upload')
            )
            return
        
        s3_client = boto3.client('s3')
        s3_key = f"database/{os.path.basename(backup_file)}"
        
        try:
            s3_client.upload_file(
                backup_file,
                s3_bucket,
                s3_key,
                ExtraArgs={
                    'StorageClass': 'STANDARD_IA',
                    'ServerSideEncryption': 'AES256'
                }
            )
            self.stdout.write(f'Backup uploaded to S3: s3://{s3_bucket}/{s3_key}')
        except Exception as e:
            self.stdout.write(
                self.style.ERROR(f'S3 upload failed: {str(e)}')
            )
    
    def cleanup_old_backups(self, backup_dir, retention_days):
        """Clean up old backup files"""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        
        for filename in os.listdir(backup_dir):
            file_path = os.path.join(backup_dir, filename)
            
            if os.path.isfile(file_path):
                file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                
                if file_mtime < cutoff_date:
                    os.remove(file_path)
                    self.stdout.write(f'Removed old backup: {filename}')
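A backup job is only useful while it keeps running, so a common companion is a freshness check that alerts when the newest backup is too old. A minimal sketch; the regular expression assumes the timestamped filename convention used by the command above:

```python
import re
from datetime import datetime, timedelta
from typing import List, Optional

# Matches the _%Y%m%d_%H%M%S. timestamp embedded in backup filenames,
# e.g. django_db_20240102_030405.dump.gz
TS_RE = re.compile(r"_(\d{8}_\d{6})\.")

def newest_backup_age(filenames: List[str], now: datetime) -> Optional[timedelta]:
    """Return the age of the most recent backup, or None if no timestamp parses."""
    stamps = []
    for name in filenames:
        m = TS_RE.search(name)
        if m:
            stamps.append(datetime.strptime(m.group(1), "%Y%m%d_%H%M%S"))
    if not stamps:
        return None
    return now - max(stamps)
```

A monitoring task can list the backup directory, call this helper, and page someone when the age exceeds, say, 26 hours for a daily schedule.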

File System and Media Backup

Media Files Backup Strategy

# management/commands/backup_media.py
import os
import shutil
import tarfile
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand
from django.conf import settings
import boto3
from botocore.exceptions import ClientError

class Command(BaseCommand):
    help = 'Backup media files with incremental and full backup options'
    
    def add_arguments(self, parser):
        parser.add_argument('--type', choices=['full', 'incremental'], default='incremental')
        parser.add_argument('--compress', action='store_true', help='Compress backup')
        parser.add_argument('--sync-s3', action='store_true', help='Sync to S3')
        parser.add_argument('--retention-days', type=int, default=30)
        parser.add_argument('--output-dir', default='/var/backups/media')
    
    def handle(self, *args, **options):
        self.stdout.write('Starting media files backup...')
        
        media_root = settings.MEDIA_ROOT
        backup_dir = options['output_dir']
        os.makedirs(backup_dir, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        if options['type'] == 'full':
            backup_file = self.create_full_backup(media_root, backup_dir, timestamp, options)
        else:
            backup_file = self.create_incremental_backup(media_root, backup_dir, timestamp, options)
        
        if options['sync_s3']:
            self.sync_to_s3(media_root)
        
        self.cleanup_old_backups(backup_dir, options['retention_days'])
        
        self.stdout.write(
            self.style.SUCCESS(f'Media backup completed: {backup_file}')
        )
    
    def create_full_backup(self, media_root, backup_dir, timestamp, options):
        """Create full backup of media files"""
        backup_name = f"media_full_{timestamp}"
        
        if options['compress']:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar.gz")
            mode = 'w:gz'
        else:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar")
            mode = 'w'
        
        with tarfile.open(backup_file, mode) as tar:
            tar.add(media_root, arcname='media')
        
        # Create backup manifest
        manifest_file = os.path.join(backup_dir, f"{backup_name}.manifest")
        self.create_manifest(media_root, manifest_file)
        
        return backup_file
    
    def create_incremental_backup(self, media_root, backup_dir, timestamp, options):
        """Create incremental backup based on last full backup"""
        # Find last full backup
        last_full_backup = self.find_last_full_backup(backup_dir)
        
        if not last_full_backup:
            self.stdout.write('No full backup found, creating full backup instead')
            return self.create_full_backup(media_root, backup_dir, timestamp, options)
        
        # Get modification time of last backup
        last_backup_time = datetime.fromtimestamp(os.path.getmtime(last_full_backup))
        
        backup_name = f"media_incremental_{timestamp}"
        
        if options['compress']:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar.gz")
            mode = 'w:gz'
        else:
            backup_file = os.path.join(backup_dir, f"{backup_name}.tar")
            mode = 'w'
        
        # Create incremental backup with only modified files
        with tarfile.open(backup_file, mode) as tar:
            for root, dirs, files in os.walk(media_root):
                for file in files:
                    file_path = os.path.join(root, file)
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    
                    if file_mtime > last_backup_time:
                        arcname = os.path.relpath(file_path, media_root)
                        tar.add(file_path, arcname=f"media/{arcname}")
        
        # Create incremental manifest
        manifest_file = os.path.join(backup_dir, f"{backup_name}.manifest")
        self.create_incremental_manifest(media_root, manifest_file, last_backup_time)
        
        return backup_file
    
    def find_last_full_backup(self, backup_dir):
        """Find the most recent full backup"""
        full_backups = []
        
        for filename in os.listdir(backup_dir):
            if filename.startswith('media_full_') and filename.endswith(('.tar', '.tar.gz')):
                file_path = os.path.join(backup_dir, filename)
                full_backups.append((os.path.getmtime(file_path), file_path))
        
        if full_backups:
            full_backups.sort(reverse=True)
            return full_backups[0][1]
        
        return None
    
    def create_manifest(self, media_root, manifest_file):
        """Create backup manifest with file checksums"""
        import hashlib
        
        with open(manifest_file, 'w') as f:
            f.write(f"# Media Backup Manifest\n")
            f.write(f"# Created: {datetime.now().isoformat()}\n")
            f.write(f"# Root: {media_root}\n\n")
            
            for root, dirs, files in os.walk(media_root):
                for file in files:
                    file_path = os.path.join(root, file)
                    rel_path = os.path.relpath(file_path, media_root)
                    
                    # Calculate MD5 checksum
                    md5_hash = hashlib.md5()
                    with open(file_path, 'rb') as file_obj:
                        for chunk in iter(lambda: file_obj.read(4096), b""):
                            md5_hash.update(chunk)
                    
                    file_size = os.path.getsize(file_path)
                    file_mtime = os.path.getmtime(file_path)
                    
                    f.write(f"{md5_hash.hexdigest()}  {file_size}  {file_mtime}  {rel_path}\n")
    
    def create_incremental_manifest(self, media_root, manifest_file, since_time):
        """Create incremental backup manifest"""
        import hashlib
        
        with open(manifest_file, 'w') as f:
            f.write(f"# Incremental Media Backup Manifest\n")
            f.write(f"# Created: {datetime.now().isoformat()}\n")
            f.write(f"# Since: {since_time.isoformat()}\n")
            f.write(f"# Root: {media_root}\n\n")
            
            for root, dirs, files in os.walk(media_root):
                for file in files:
                    file_path = os.path.join(root, file)
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    
                    if file_mtime > since_time:
                        rel_path = os.path.relpath(file_path, media_root)
                        
                        # Calculate MD5 checksum
                        md5_hash = hashlib.md5()
                        with open(file_path, 'rb') as file_obj:
                            for chunk in iter(lambda: file_obj.read(4096), b""):
                                md5_hash.update(chunk)
                        
                        file_size = os.path.getsize(file_path)
                        
                        f.write(f"{md5_hash.hexdigest()}  {file_size}  {file_mtime.timestamp()}  {rel_path}\n")
    
    def sync_to_s3(self, media_root):
        """Sync media files to S3"""
        s3_bucket = getattr(settings, 'BACKUP_S3_BUCKET', None)
        
        if not s3_bucket:
            self.stdout.write(
                self.style.WARNING('No S3 bucket configured for sync')
            )
            return
        
        s3_client = boto3.client('s3')
        
        for root, dirs, files in os.walk(media_root):
            for file in files:
                local_path = os.path.join(root, file)
                s3_key = f"media/{os.path.relpath(local_path, media_root)}"
                
                try:
                    # Check if file exists in S3 and compare modification time
                    try:
                        s3_object = s3_client.head_object(Bucket=s3_bucket, Key=s3_key)
                        s3_mtime = s3_object['LastModified'].timestamp()
                        local_mtime = os.path.getmtime(local_path)
                        
                        if local_mtime <= s3_mtime:
                            continue  # Skip if S3 version is newer or same
                    except ClientError:
                        pass  # File doesn't exist in S3, upload it
                    
                    # Upload file to S3
                    s3_client.upload_file(
                        local_path,
                        s3_bucket,
                        s3_key,
                        ExtraArgs={
                            'StorageClass': 'STANDARD_IA',
                            'ServerSideEncryption': 'AES256'
                        }
                    )
                    
                    self.stdout.write(f'Synced to S3: {s3_key}')
                    
                except Exception as e:
                    self.stdout.write(
                        self.style.ERROR(f'Failed to sync {local_path}: {str(e)}')
                    )
    
    def cleanup_old_backups(self, backup_dir, retention_days):
        """Clean up old backup files"""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        
        for filename in os.listdir(backup_dir):
            if filename.startswith('media_') and (filename.endswith('.tar') or filename.endswith('.tar.gz')):
                file_path = os.path.join(backup_dir, filename)
                file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                
                if file_mtime < cutoff_date:
                    os.remove(file_path)
                    
                    # Also remove corresponding manifest
                    manifest_file = file_path.replace('.tar.gz', '.manifest').replace('.tar', '.manifest')
                    if os.path.exists(manifest_file):
                        os.remove(manifest_file)
                    
                    self.stdout.write(f'Removed old backup: {filename}')
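The manifests written above only pay off if something reads them; below is a sketch of a verifier that recomputes checksums and reports files that changed or disappeared (the line format matches the manifests produced by this command):

```python
import hashlib
import os
from typing import List

def verify_manifest(manifest_path: str, media_root: str) -> List[str]:
    """Return relative paths whose checksum no longer matches the manifest."""
    mismatches = []
    with open(manifest_path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip header comments and blank lines
            # Manifest lines are: checksum  size  mtime  relative/path
            checksum, _size, _mtime, rel_path = line.split(None, 3)
            file_path = os.path.join(media_root, rel_path)
            md5 = hashlib.md5()
            try:
                with open(file_path, "rb") as fobj:
                    for chunk in iter(lambda: fobj.read(4096), b""):
                        md5.update(chunk)
            except FileNotFoundError:
                mismatches.append(rel_path)  # file was deleted since the backup
                continue
            if md5.hexdigest() != checksum:
                mismatches.append(rel_path)  # file content changed
    return mismatches
```

Run after a restore, an empty result confirms the restored tree matches what was backed up.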

Automated Backup Orchestration

Comprehensive Backup Orchestrator

# backup/orchestrator.py
import os
import logging
import subprocess
from datetime import datetime, timedelta
from django.core.management import call_command
from django.conf import settings
from celery import shared_task
import boto3

logger = logging.getLogger('backup')

class BackupOrchestrator:
    """Orchestrate comprehensive backup operations"""
    
    def __init__(self):
        self.backup_config = getattr(settings, 'BACKUP_CONFIG', {})
        self.s3_client = boto3.client('s3') if self.backup_config.get('s3_enabled') else None
    
    def run_full_backup(self):
        """Run complete backup of all components"""
        logger.info("Starting full backup process")
        
        backup_results = {
            'timestamp': datetime.now().isoformat(),
            'components': {},
            'success': True,
            'errors': []
        }
        
        # Database backup
        try:
            db_result = self.backup_database()
            backup_results['components']['database'] = db_result
        except Exception as e:
            logger.error(f"Database backup failed: {str(e)}")
            backup_results['errors'].append(f"Database: {str(e)}")
            backup_results['success'] = False
        
        # Media files backup
        try:
            media_result = self.backup_media_files()
            backup_results['components']['media'] = media_result
        except Exception as e:
            logger.error(f"Media backup failed: {str(e)}")
            backup_results['errors'].append(f"Media: {str(e)}")
            backup_results['success'] = False
        
        # Static files backup
        try:
            static_result = self.backup_static_files()
            backup_results['components']['static'] = static_result
        except Exception as e:
            logger.error(f"Static files backup failed: {str(e)}")
            backup_results['errors'].append(f"Static: {str(e)}")
            backup_results['success'] = False
        
        # Configuration backup
        try:
            config_result = self.backup_configuration()
            backup_results['components']['configuration'] = config_result
        except Exception as e:
            logger.error(f"Configuration backup failed: {str(e)}")
            backup_results['errors'].append(f"Configuration: {str(e)}")
            backup_results['success'] = False
        
        # Code backup
        try:
            code_result = self.backup_application_code()
            backup_results['components']['code'] = code_result
        except Exception as e:
            logger.error(f"Code backup failed: {str(e)}")
            backup_results['errors'].append(f"Code: {str(e)}")
            backup_results['success'] = False
        
        # Send notification
        self.send_backup_notification(backup_results)
        
        logger.info(f"Full backup completed. Success: {backup_results['success']}")
        return backup_results
    
    def backup_database(self):
        """Backup database using management command"""
        logger.info("Starting database backup")
        
        call_command(
            'backup_database',
            format='custom',
            compress=True,
            encrypt=self.backup_config.get('encrypt', False),
            upload_s3=self.backup_config.get('s3_enabled', False),
            retention_days=self.backup_config.get('retention_days', 30)
        )
        
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'database'
        }
    
    def backup_media_files(self):
        """Backup media files"""
        logger.info("Starting media files backup")
        
        # Determine backup type based on schedule
        backup_type = self.get_media_backup_type()
        
        call_command(
            'backup_media',
            type=backup_type,
            compress=True,
            sync_s3=self.backup_config.get('s3_enabled', False),
            retention_days=self.backup_config.get('retention_days', 30)
        )
        
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': f'media_{backup_type}'
        }
    
    def backup_static_files(self):
        """Backup static files"""
        logger.info("Starting static files backup")
        
        static_root = settings.STATIC_ROOT
        backup_dir = self.backup_config.get('backup_dir', '/var/backups')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        # Create tar archive of static files
        static_backup = os.path.join(backup_dir, f"static_{timestamp}.tar.gz")
        
        subprocess.run([
            'tar', '-czf', static_backup,
            '-C', os.path.dirname(static_root),
            os.path.basename(static_root)
        ], check=True)
        
        # Upload to S3 if enabled
        if self.s3_client and self.backup_config.get('s3_enabled'):
            s3_key = f"static/static_{timestamp}.tar.gz"
            self.s3_client.upload_file(
                static_backup,
                self.backup_config['s3_bucket'],
                s3_key
            )
        
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'static',
            'file': static_backup
        }
    
    def backup_configuration(self):
        """Backup configuration files"""
        logger.info("Starting configuration backup")
        
        config_files = [
            '/etc/nginx/sites-available/',
            '/etc/systemd/system/django-app.service',
            '/opt/django_app/.env',
            '/opt/django_app/gunicorn.conf.py',
        ]
        
        backup_dir = self.backup_config.get('backup_dir', '/var/backups')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        config_backup = os.path.join(backup_dir, f"config_{timestamp}.tar.gz")
        
        # Create tar archive of configuration files
        tar_cmd = ['tar', '-czf', config_backup]
        
        for config_file in config_files:
            if os.path.exists(config_file):
                tar_cmd.append(config_file)
        
        subprocess.run(tar_cmd, check=True)
        
        # Upload to S3 if enabled
        if self.s3_client and self.backup_config.get('s3_enabled'):
            s3_key = f"config/config_{timestamp}.tar.gz"
            self.s3_client.upload_file(
                config_backup,
                self.backup_config['s3_bucket'],
                s3_key
            )
        
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'configuration',
            'file': config_backup
        }
    
    def backup_application_code(self):
        """Backup application code from Git repository"""
        logger.info("Starting application code backup")
        
        app_dir = getattr(settings, 'BASE_DIR', '/opt/django_app')
        backup_dir = self.backup_config.get('backup_dir', '/var/backups')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        # Get current Git commit
        try:
            git_commit = subprocess.check_output(
                ['git', 'rev-parse', 'HEAD'],
                cwd=app_dir,
                text=True
            ).strip()
        except subprocess.CalledProcessError:
            git_commit = 'unknown'
        
        # Create Git archive
        code_backup = os.path.join(backup_dir, f"code_{timestamp}_{git_commit[:8]}.tar.gz")
        
        subprocess.run([
            'git', 'archive',
            '--format=tar.gz',
            f'--output={code_backup}',
            'HEAD'
        ], cwd=app_dir, check=True)
        
        # Upload to S3 if enabled
        if self.s3_client and self.backup_config.get('s3_enabled'):
            s3_key = f"code/code_{timestamp}_{git_commit[:8]}.tar.gz"
            self.s3_client.upload_file(
                code_backup,
                self.backup_config['s3_bucket'],
                s3_key
            )
        
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'type': 'code',
            'file': code_backup,
            'git_commit': git_commit
        }
    
    def get_media_backup_type(self):
        """Determine media backup type based on schedule"""
        # Full backup on Sundays, incremental otherwise
        if datetime.now().weekday() == 6:  # Sunday
            return 'full'
        else:
            return 'incremental'
    
    def send_backup_notification(self, backup_results):
        """Send backup completion notification"""
        if backup_results['success']:
            message = "✅ Full backup completed successfully"
            color = 'good'
        else:
            message = f"❌ Backup completed with errors: {', '.join(backup_results['errors'])}"
            color = 'danger'
        
        # Send Slack notification if configured
        slack_webhook = self.backup_config.get('slack_webhook')
        if slack_webhook:
            import requests
            
            payload = {
                'text': message,
                'attachments': [
                    {
                        'color': color,
                        'fields': [
                            {
                                'title': 'Components',
                                'value': ', '.join(backup_results['components'].keys()),
                                'short': True
                            },
                            {
                                'title': 'Timestamp',
                                'value': backup_results['timestamp'],
                                'short': True
                            }
                        ]
                    }
                ]
            }
            
            try:
                requests.post(slack_webhook, json=payload, timeout=10)
            except Exception as e:
                logger.error(f"Failed to send Slack notification: {str(e)}")

# Celery tasks for automated backups
@shared_task
def run_full_backup():
    """Celery task for full backup"""
    orchestrator = BackupOrchestrator()
    return orchestrator.run_full_backup()

@shared_task
def run_database_backup():
    """Celery task for database-only backup"""
    orchestrator = BackupOrchestrator()
    return orchestrator.backup_database()

@shared_task
def run_media_backup():
    """Celery task for media files backup"""
    orchestrator = BackupOrchestrator()
    return orchestrator.backup_media_files()

Backup Scheduling Configuration

# settings/backup.py
from celery.schedules import crontab

# Backup configuration
BACKUP_CONFIG = {
    'backup_dir': '/var/backups/django',
    'retention_days': 30,
    'encrypt': True,
    's3_enabled': True,
    's3_bucket': 'my-app-backups',
    'slack_webhook': 'https://hooks.slack.com/services/...',
}

# Celery beat schedule for automated backups
# (assumes CELERY_BEAT_SCHEDULE is already defined in the base settings)
CELERY_BEAT_SCHEDULE.update({
    # Full backup every Sunday at 2 AM
    'full-backup-weekly': {
        'task': 'backup.tasks.run_full_backup',
        'schedule': crontab(hour=2, minute=0, day_of_week=0),
    },
    
    # Database backup daily at 3 AM
    'database-backup-daily': {
        'task': 'backup.tasks.run_database_backup',
        'schedule': crontab(hour=3, minute=0),
    },
    
    # Media backup daily at 4 AM
    'media-backup-daily': {
        'task': 'backup.tasks.run_media_backup',
        'schedule': crontab(hour=4, minute=0),
    },
})
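The `retention_days` setting above only takes effect if something actually deletes expired files. A hedged sketch of a pruning helper that a nightly Celery task could call; the function and its mtime-based cutoff are illustrative assumptions, not part of the orchestrator shown earlier:

```python
from datetime import datetime, timedelta
from pathlib import Path

def prune_old_backups(backup_dir, retention_days=30, now=None):
    """Remove backup files whose modification time falls outside the
    retention window; returns the deleted paths for logging."""
    now = now or datetime.now()
    cutoff = now - timedelta(days=retention_days)
    removed = []
    for path in Path(backup_dir).iterdir():
        if path.is_file() and datetime.fromtimestamp(path.stat().st_mtime) < cutoff:
            path.unlink()
            removed.append(path)
    return removed
```

If backups are also mirrored to S3, pair this with an S3 lifecycle rule rather than deleting remote objects from application code.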

Disaster Recovery Planning

Recovery Procedures

#!/bin/bash
# scripts/disaster_recovery.sh - Disaster recovery procedures

set -e

# Configuration
BACKUP_DIR="${BACKUP_DIR:-/var/backups}"
S3_BUCKET="${S3_BUCKET:-my-app-backups}"
RECOVERY_DIR="${RECOVERY_DIR:-/var/recovery}"
DB_NAME="${DB_NAME:-django_app}"
DB_USER="${DB_USER:-postgres}"

echo "🚨 Starting disaster recovery process..."

# Create recovery directory
mkdir -p "$RECOVERY_DIR"

# Function to restore database
restore_database() {
    local backup_file="$1"
    
    echo "🗄️ Restoring database from $backup_file"
    
    # Drop existing database (if exists)
    dropdb -U "$DB_USER" --if-exists "$DB_NAME"
    
    # Restore from backup
    if [[ "$backup_file" == *.sql ]]; then
        # Plain SQL backup
        createdb -U "$DB_USER" "$DB_NAME"
        psql -U "$DB_USER" -d "$DB_NAME" -f "$backup_file"
    else
        # Custom format backup
        pg_restore -U "$DB_USER" --create --dbname=postgres "$backup_file"
    fi
    
    echo "✅ Database restored successfully"
}

# Function to restore media files
restore_media() {
    local backup_file="$1"
    local media_dir="$2"
    
    echo "📁 Restoring media files from $backup_file"
    
    # Create media directory
    mkdir -p "$media_dir"
    
    # Extract backup
    if [[ "$backup_file" == *.tar.gz ]]; then
        tar -xzf "$backup_file" -C "$media_dir" --strip-components=1
    else
        tar -xf "$backup_file" -C "$media_dir" --strip-components=1
    fi
    
    echo "✅ Media files restored successfully"
}

# Function to download from S3
download_from_s3() {
    local s3_key="$1"
    local local_file="$2"
    
    echo "☁️ Downloading $s3_key from S3..."
    aws s3 cp "s3://$S3_BUCKET/$s3_key" "$local_file"
}

# Main recovery process
case "${1:-full}" in
    "database")
        echo "🔄 Database-only recovery"
        
        # Find latest database backup
        if [ -n "$2" ]; then
            DB_BACKUP="$2"
        else
            DB_BACKUP=$(find "$BACKUP_DIR" \( -name "django_db_*.dump" -o -name "django_db_*.sql" \) | sort -r | head -1)
        fi
        
        if [ -z "$DB_BACKUP" ]; then
            echo "❌ No database backup found"
            exit 1
        fi
        
        restore_database "$DB_BACKUP"
        ;;
        
    "media")
        echo "🔄 Media files recovery"
        
        # Find latest media backup
        if [ -n "$2" ]; then
            MEDIA_BACKUP="$2"
        else
            MEDIA_BACKUP=$(find "$BACKUP_DIR" -name "media_*.tar*" | sort -r | head -1)
        fi
        
        if [ -z "$MEDIA_BACKUP" ]; then
            echo "❌ No media backup found"
            exit 1
        fi
        
        restore_media "$MEDIA_BACKUP" "/opt/django_app/media"
        ;;
        
    "full")
        echo "🔄 Full disaster recovery"
        
        # Restore database
        DB_BACKUP=$(find "$BACKUP_DIR" \( -name "django_db_*.dump" -o -name "django_db_*.sql" \) | sort -r | head -1)
        if [ -n "$DB_BACKUP" ]; then
            restore_database "$DB_BACKUP"
        else
            echo "⚠️ No database backup found"
        fi
        
        # Restore media files
        MEDIA_BACKUP=$(find "$BACKUP_DIR" -name "media_*.tar*" | sort -r | head -1)
        if [ -n "$MEDIA_BACKUP" ]; then
            restore_media "$MEDIA_BACKUP" "/opt/django_app/media"
        else
            echo "⚠️ No media backup found"
        fi
        
        # Restore configuration
        CONFIG_BACKUP=$(find "$BACKUP_DIR" -name "config_*.tar*" | sort -r | head -1)
        if [ -n "$CONFIG_BACKUP" ]; then
            echo "⚙️ Restoring configuration files"
            tar -xf "$CONFIG_BACKUP" -C /  # -xf auto-detects gzip compression
        else
            echo "⚠️ No configuration backup found"
        fi
        
        # Restore application code
        CODE_BACKUP=$(find "$BACKUP_DIR" -name "code_*.tar*" | sort -r | head -1)
        if [ -n "$CODE_BACKUP" ]; then
            echo "💻 Restoring application code"
            mkdir -p /opt/django_app/src
            tar -xf "$CODE_BACKUP" -C /opt/django_app/src  # -xf auto-detects gzip compression
        else
            echo "⚠️ No code backup found"
        fi
        
        echo "✅ Full recovery completed"
        ;;
        
    *)
        echo "Usage: $0 [full|database|media] [backup_file]"
        exit 1
        ;;
esac

echo "🎉 Disaster recovery completed successfully!"
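The recovery script picks the newest backup by sorting filenames, which works because the `YYYYMMDD_HHMMSS` timestamp sorts lexicographically. The same selection can be done from Python, for example in a Django management command; a sketch under that filename assumption (the helper and its patterns are illustrative):

```python
import re
from pathlib import Path

TIMESTAMP_RE = re.compile(r'(\d{8}_\d{6})')  # YYYYMMDD_HHMMSS, as used above

def latest_backup(backup_dir, patterns=('django_db_*.dump', 'django_db_*.sql')):
    """Return the newest backup file by embedded timestamp, mirroring
    the `find ... | sort -r | head -1` pipeline in the shell script."""
    stamped = []
    for pattern in patterns:
        for path in Path(backup_dir).glob(pattern):
            match = TIMESTAMP_RE.search(path.name)
            if match:
                stamped.append((match.group(1), path))
    return max(stamped)[1] if stamped else None
```

Matching on the embedded timestamp rather than file mtime means the choice survives copies and S3 downloads, which reset modification times.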

This chapter covered the tools and procedures needed to protect Django applications from data loss: layered backups of the database, media files, configuration, and application code; automated scheduling with Celery; and scripted disaster recovery. A backup strategy is only as strong as its most recent verified restore, so rehearse these recovery procedures regularly rather than waiting for a real failure.