Migrations

Transaction Handling

Django migrations run within database transactions by default, providing atomicity and consistency during schema changes. Understanding transaction behavior in migrations is crucial for maintaining data integrity and handling complex migration scenarios safely.

Migration Atomicity

Default Transaction Behavior

# By default, migrations run in a single transaction
class Migration(migrations.Migration):
    """Standard atomic migration.

    With ``atomic = True`` (the default), Django wraps every operation in
    this migration in a single database transaction: either all operations
    apply, or none do.  On backends without transactional DDL (e.g. MySQL),
    schema changes cannot actually be rolled back even when atomic is True.
    """

    dependencies = [
        ('blog', '0001_initial'),
    ]

    # atomic = True is the default; stated explicitly for clarity.
    atomic = True

    operations = [
        # Two new counter columns on blog.Post, both defaulting to 0.
        migrations.AddField(
            model_name='post',
            name='view_count',
            field=models.PositiveIntegerField(default=0),
        ),
        migrations.AddField(
            model_name='post',
            name='like_count',
            field=models.PositiveIntegerField(default=0),
        ),
        # All operations succeed or all fail together
    ]

# Non-atomic migration for special cases
class Migration(migrations.Migration):
    """Non-atomic migration for operations that can't run in transactions.

    ``atomic = False`` disables the transaction wrapper.  Caution: if a
    non-atomic migration fails part-way, the operations that already ran
    are NOT rolled back and must be cleaned up manually.
    """

    dependencies = [
        ('blog', '0002_add_counters'),
    ]

    # Disable atomicity for this migration
    atomic = False

    operations = [
        # PostgreSQL: CREATE INDEX CONCURRENTLY cannot run in transaction
        migrations.RunSQL(
            sql="CREATE INDEX CONCURRENTLY idx_post_title_gin ON blog_post USING gin(to_tsvector('english', title));",
            reverse_sql="DROP INDEX IF EXISTS idx_post_title_gin;",
        ),
    ]

# Database-specific atomic behavior
class DatabaseSpecificMigration(migrations.Migration):
    """Handle atomicity differently per database.

    Overrides ``__init__`` to pick ``atomic`` at runtime from the active
    database vendor instead of hard-coding it on the class.

    NOTE(review): this inspects ``django.db.connection`` -- the *default*
    connection -- at instantiation time, which may not be the database the
    migration is actually applied against (e.g. ``migrate --database=...``);
    confirm that is acceptable.
    """

    dependencies = [
        ('blog', '0003_add_search_index'),
    ]

    def __init__(self, name, app_label):
        # Arguments mirror migrations.Migration: migration name + owning app.
        super().__init__(name, app_label)

        # Set atomicity based on database backend
        from django.db import connection

        if connection.vendor == 'postgresql':
            # PostgreSQL supports most operations in transactions
            self.atomic = True
        elif connection.vendor == 'mysql':
            # MySQL has limitations with DDL in transactions
            self.atomic = False
        else:
            # Default behavior for other databases
            self.atomic = True

    operations = [
        migrations.AddIndex(
            model_name='post',
            index=models.Index(fields=['created_at'], name='idx_post_created'),
        ),
    ]

Transaction Control Patterns

# Manual transaction control within migrations
from django.db import transaction

class TransactionControlledMigration(migrations.Migration):
    """Migration with manual transaction control.

    ``atomic = False`` stops Django from wrapping the whole migration in a
    single transaction, so the RunPython callable below is free to open and
    commit its own smaller per-batch transactions.

    NOTE(review): in a real migration module,
    ``migrate_data_with_transaction_control`` must be defined *before* this
    class, otherwise evaluating the ``operations`` list raises NameError at
    import time.
    """

    dependencies = [
        ('blog', '0004_database_specific'),
    ]

    # Disable automatic transaction wrapping
    atomic = False

    operations = [
        migrations.RunPython(
            code=migrate_data_with_transaction_control,
            reverse_code=migrations.RunPython.noop,
        ),
    ]

def migrate_data_with_transaction_control(apps, schema_editor):
    """Backfill missing ``Post.slug`` values in independently committed batches.

    Intended to run under a migration with ``atomic = False``: each batch
    commits in its own transaction, so a mid-run failure leaves earlier
    batches committed and the migration can simply be re-run.

    Args:
        apps: historical app registry supplied by ``RunPython``.
        schema_editor: schema editor for the target database (unused here).
    """
    # Local import so the migration module itself stays import-light.
    from django.utils.text import slugify

    Post = apps.get_model('blog', 'Post')

    # Process data in smaller transactions
    batch_size = 1000
    total_posts = Post.objects.count()
    processed = 0

    while processed < total_posts:
        with transaction.atomic():
            # Order by primary key: offset-slicing an *unordered* queryset
            # gives no stable ordering across queries, so rows could be
            # skipped or processed twice.
            posts = Post.objects.order_by('id')[processed:processed + batch_size]

            for post in posts:
                # Only touch posts that still lack a slug.
                if not post.slug:
                    post.slug = slugify(post.title)
                    # update_fields avoids rewriting unrelated columns.
                    post.save(update_fields=['slug'])

            processed += batch_size
            # Clamp so the final message never reports more than the total.
            print(f"Processed {min(processed, total_posts)}/{total_posts} posts")

# Savepoint usage in migrations
def migrate_with_savepoints(apps, schema_editor):
    """Use savepoints for partial rollback capability."""

    Post = apps.get_model('blog', 'Post')
    Category = apps.get_model('blog', 'Category')

    with transaction.atomic():
        # Mark a savepoint so the risky section below can be undone
        # without abandoning the surrounding transaction.
        sid = transaction.savepoint()

        try:
            # Attach each post to a Category derived from its legacy
            # category_name field, creating categories on demand.
            for post in Post.objects.all():
                if not post.category_name:
                    continue
                category, _ = Category.objects.get_or_create(
                    name=post.category_name
                )
                post.category = category
                post.save()
        except Exception as e:
            # Undo everything since the savepoint, report, and propagate.
            transaction.savepoint_rollback(sid)
            print(f"Error during migration: {e}")
            raise
        else:
            # All rows handled: release the savepoint.
            transaction.savepoint_commit(sid)

# Conditional transaction handling
class ConditionalTransactionMigration(migrations.Migration):
    """Migration with conditional transaction behavior.

    Chooses atomic vs non-atomic execution based on how many rows exist at
    the moment the migration object is constructed.

    NOTE(review): the COUNT(*) query runs whenever this migration is
    instantiated -- including while Django merely builds the migration plan
    -- and it uses the default connection, which may not be the database
    being migrated (and ``blog_post`` may not exist yet); confirm this is
    acceptable.  Also, ``conditional_data_migration`` must be defined before
    this class in a real migration module, or the ``operations`` list raises
    NameError at import time.
    """

    dependencies = [
        ('blog', '0005_transaction_controlled'),
    ]

    def __init__(self, name, app_label):
        super().__init__(name, app_label)

        # Determine atomicity based on data size
        from django.db import connection

        with connection.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM blog_post")
            post_count = cursor.fetchone()[0]

            # Use non-atomic for large datasets
            self.atomic = post_count < 100000

    operations = [
        migrations.RunPython(
            code=conditional_data_migration,
            reverse_code=migrations.RunPython.noop,
        ),
    ]

def conditional_data_migration(apps, schema_editor):
    """Copy ``created_at`` into ``updated_at``, adapting to dataset size.

    Small datasets are handled in one transaction; large ones are processed
    in per-batch transactions so locks stay short and a failure only loses
    the in-flight batch.

    Args:
        apps: historical app registry supplied by ``RunPython``.
        schema_editor: schema editor for the target database (unused here).
    """
    # F was previously referenced without being imported anywhere in scope.
    from django.db.models import F

    Post = apps.get_model('blog', 'Post')
    total_posts = Post.objects.count()

    if total_posts < 10000:
        # Small dataset: one bulk UPDATE in a single transaction.  Using
        # update(F(...)) keeps both branches consistent instead of mixing
        # per-row save() here with bulk update() below.
        with transaction.atomic():
            Post.objects.update(updated_at=F('created_at'))

    else:
        # Large dataset: process in batches
        batch_size = 5000
        processed = 0

        while processed < total_posts:
            with transaction.atomic():
                # Deterministic pk ordering so offset slices neither skip
                # nor repeat rows; fetch only ids to keep memory low.
                batch_ids = list(
                    Post.objects.order_by('id')
                    .values_list('id', flat=True)[processed:processed + batch_size]
                )

                Post.objects.filter(id__in=batch_ids).update(
                    updated_at=F('created_at')
                )

                processed += batch_size
                print(f"Updated {min(processed, total_posts)}/{total_posts} posts")

Error Handling and Recovery

Transaction Rollback Strategies

class RobustMigrationPatterns:
    """Patterns for robust migration error handling."""

    @staticmethod
    def migration_with_retry_logic(apps, schema_editor):
        """Migration with automatic retry on transient failures.

        Retries the whole pass up to three times with exponential backoff
        when the database raises OperationalError (deadlocks, dropped
        connections, lock timeouts).  Each attempt is fully atomic.
        """
        import time
        from django.db import transaction, OperationalError
        # SearchVector was previously referenced without an import.
        from django.contrib.postgres.search import SearchVector

        Post = apps.get_model('blog', 'Post')

        max_retries = 3
        retry_delay = 1  # seconds; doubled after each failed attempt

        for attempt in range(max_retries):
            try:
                with transaction.atomic():
                    # Recompute the search vector for every post; the whole
                    # pass commits or rolls back as one unit.
                    for post in Post.objects.all():
                        post.search_vector = SearchVector('title', 'content')
                        post.save()

                # Success - break out of retry loop
                break

            except OperationalError as e:
                if attempt < max_retries - 1:
                    print(f"Migration attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                else:
                    print(f"Migration failed after {max_retries} attempts")
                    raise

    @staticmethod
    def migration_with_progress_tracking(apps, schema_editor):
        """Migration that tracks progress in the cache and can resume.

        Fixes vs the naive version:
        * the progress marker is deleted only after a fully successful run
          (a ``finally`` clause would also wipe it on failure, destroying
          the very state needed to resume);
        * batches are taken from the *front* of the remaining work via
          keyset pagination -- offset-slicing a queryset whose filter
          (``slug__isnull=True``) shrinks as rows are fixed skips rows;
        * the checkpoint is written only after its batch has committed, so
          the marker never points past uncommitted work.
        """
        from django.core.cache import cache
        from django.utils.text import slugify

        Post = apps.get_model('blog', 'Post')

        migration_key = 'migration_progress_post_slugs'
        last_processed_id = cache.get(migration_key, 0)

        batch_size = 100
        while True:
            # Always take the first batch of still-unfixed rows after the
            # checkpoint; rows that gain a slug drop out of the filter.
            batch = list(
                Post.objects.filter(
                    id__gt=last_processed_id,
                    slug__isnull=True,
                ).order_by('id')[:batch_size]
            )
            if not batch:
                break

            with transaction.atomic():
                for post in batch:
                    post.slug = slugify(post.title)
                    post.save()

            # Persist the checkpoint only after the batch committed.
            last_processed_id = batch[-1].id
            cache.set(migration_key, last_processed_id, timeout=3600)
            print(f"Processed batch ending with ID {last_processed_id}")

        # Reached only on success: resume state is no longer needed.
        cache.delete(migration_key)

    @staticmethod
    def migration_with_validation(apps, schema_editor):
        """Migration with built-in validation and savepoint rollback.

        Publishes all drafts, then validates the result; any validation
        failure rolls back to the savepoint and re-raises (which also rolls
        back the enclosing atomic block).
        """
        # timezone was previously referenced without an import.
        from django.utils import timezone

        Post = apps.get_model('blog', 'Post')

        # Store original state for validation
        original_count = Post.objects.count()

        with transaction.atomic():
            savepoint = transaction.savepoint()

            try:
                # Perform migration
                Post.objects.filter(status='draft').update(
                    status='published',
                    published_at=timezone.now()
                )

                # Validate results
                published_count = Post.objects.filter(status='published').count()

                if published_count == 0:
                    raise ValueError("No posts were published - this seems wrong")

                # Additional validation
                if Post.objects.count() != original_count:
                    raise ValueError("Post count changed unexpectedly")

                # Commit if validation passes
                transaction.savepoint_commit(savepoint)
                print(f"Successfully published {published_count} posts")

            except Exception as e:
                # Rollback on validation failure
                transaction.savepoint_rollback(savepoint)
                print(f"Migration validation failed: {e}")
                raise

# Error recovery utilities
class MigrationErrorRecovery:
    """Utilities for recovering from migration errors."""

    @staticmethod
    def create_recovery_migration(failed_migration_name, error_details):
        """Create a recovery migration for a failed migration."""

        details_lower = error_details.lower()
        ops = []

        # Map known failure signatures to placeholder recovery operations.
        if 'duplicate key' in details_lower:
            ops.append(
                migrations.RunSQL(
                    sql="-- Remove duplicate entries before retrying",
                    reverse_sql="-- No reverse operation needed",
                )
            )
        elif 'column does not exist' in details_lower:
            ops.append(
                migrations.RunSQL(
                    sql="-- Add missing column",
                    reverse_sql="-- Remove column if needed",
                )
            )

        # Render the source of a migration module that performs recovery.
        return f'''
from django.db import migrations, models

class Migration(migrations.Migration):
    """Recovery migration for {failed_migration_name}"""
    
    dependencies = [
        # Add appropriate dependencies
    ]
    
    operations = {ops}
'''

    @staticmethod
    def analyze_migration_failure(migration_name, error):
        """Analyze migration failure and suggest recovery steps."""

        report = {
            'migration': migration_name,
            'error_type': type(error).__name__,
            'error_message': str(error),
            'suggested_actions': [],
            'recovery_strategy': 'manual'
        }

        message = str(error).lower()

        # Ordered signature table: the first matching entry wins, mirroring
        # an if/elif chain over common migration failure modes.
        known_failures = [
            (('duplicate key', 'unique constraint'),
             'data_cleanup',
             ["Identify and remove duplicate records",
              "Add data cleaning step before migration",
              "Consider using get_or_create instead of create"]),
            (('column does not exist',),
             'dependency_fix',
             ["Check if previous migration was applied",
              "Verify migration dependencies",
              "Consider fake-applying missing migrations"]),
            (('table does not exist',),
             'setup_issue',
             ["Run initial migrations first",
              "Check database connection and permissions",
              "Verify app is in INSTALLED_APPS"]),
            (('timeout', 'lock'),
             'performance_issue',
             ["Run migration during low-traffic period",
              "Consider breaking migration into smaller parts",
              "Check for long-running queries blocking migration"]),
        ]

        for needles, strategy, actions in known_failures:
            if any(needle in message for needle in needles):
                report['suggested_actions'].extend(actions)
                report['recovery_strategy'] = strategy
                break

        return report

Advanced Transaction Patterns

Cross-Database Transactions

# Handling multiple databases in migrations
class MultiDatabaseMigration(migrations.Migration):
    """Migration that works with multiple databases.

    Delegates the work to a RunPython callable; the reverse direction is a
    no-op, so this migration cannot be meaningfully unapplied.

    NOTE(review): ``migrate_across_databases`` must be defined before this
    class in a real migration module, or evaluating ``operations`` raises
    NameError at import time.
    """

    dependencies = [
        ('blog', '0006_conditional_transaction'),
    ]

    operations = [
        migrations.RunPython(
            code=migrate_across_databases,
            reverse_code=migrations.RunPython.noop,
        ),
    ]

def migrate_across_databases(apps, schema_editor):
    """Copy post data from the default database into the analytics database.

    Django does not support cross-database transactions: each side runs in
    its own transaction, so a failure on the analytics side can leave the
    two databases inconsistent and may require manual cleanup.

    Args:
        apps: historical app registry supplied by ``RunPython``.
        schema_editor: schema editor for the target database (unused here).
    """
    # The previous version also imported `connections` and bound two unused
    # connection locals; only `transaction` is actually needed.
    from django.db import transaction

    Post = apps.get_model('blog', 'Post')

    try:
        # Snapshot the source rows in one consistent read.
        with transaction.atomic(using='default'):
            posts_data = list(Post.objects.using('default').values(
                'id', 'title', 'view_count', 'created_at'
            ))

        # Load into the analytics database in a single transaction.
        with transaction.atomic(using='analytics'):
            # Assuming we have an AnalyticsPost model in analytics DB
            for post_data in posts_data:
                # Insert logic here
                pass

    except Exception as e:
        print(f"Cross-database migration failed: {e}")
        # Manual cleanup may be required
        raise

# Distributed transaction simulation
def simulate_distributed_transaction(apps, schema_editor):
    """Simulate distributed transaction with manual coordination.

    Attempts a two-phase (prepare/commit) pattern across several databases.

    NOTE(review): this pattern is broken as written.  Each savepoint is
    created *inside* a ``with transaction.atomic(using=...)`` block, and a
    savepoint only exists for the lifetime of its enclosing transaction:
    when the ``with`` block exits at the end of each loop iteration, the
    transaction commits and the savepoint is released.  The phase-2
    ``savepoint_commit``/``savepoint_rollback`` calls therefore reference
    savepoints that no longer exist.  Django provides no real distributed
    (XA) transactions; this needs a redesign rather than a tweak.
    """

    from django.db import connections, transaction

    databases = ['default', 'analytics', 'cache']
    operations_log = []  # per-database bookkeeping for phases 1 and 2

    try:
        # Phase 1: Prepare all operations
        for db_alias in databases:
            with transaction.atomic(using=db_alias):
                # Prepare operation on each database
                savepoint = transaction.savepoint(using=db_alias)

                # Perform operation
                # ... database-specific operations ...

                operations_log.append({
                    'database': db_alias,
                    'savepoint': savepoint,
                    'status': 'prepared'
                })

        # Phase 2: Commit all operations
        for operation in operations_log:
            transaction.savepoint_commit(
                operation['savepoint'],
                using=operation['database']
            )
            operation['status'] = 'committed'

    except Exception as e:
        # Rollback all prepared operations
        for operation in operations_log:
            if operation['status'] == 'prepared':
                transaction.savepoint_rollback(
                    operation['savepoint'],
                    using=operation['database']
                )

        raise

# Long-running migration with checkpoints
class CheckpointMigration(migrations.Migration):
    """Migration with checkpoint system for long operations.

    Delegates to a RunPython callable that records its progress in the
    cache so an interrupted run can resume; the reverse direction is a
    no-op.

    NOTE(review): ``long_running_migration_with_checkpoints`` must be
    defined before this class in a real migration module, or evaluating
    ``operations`` raises NameError at import time.
    """

    dependencies = [
        ('blog', '0007_multi_database'),
    ]

    operations = [
        migrations.RunPython(
            code=long_running_migration_with_checkpoints,
            reverse_code=migrations.RunPython.noop,
        ),
    ]

def long_running_migration_with_checkpoints(apps, schema_editor):
    """Long-running migration with a cache-backed checkpoint system.

    Fixes vs the naive version:
    * cleanup no longer lives in ``finally`` comparing a *cumulative*
      processed count against the *remaining* total -- on a resumed run
      that comparison could be trivially true and the checkpoint would be
      deleted even on failure, destroying the resume state;
    * batches use keyset pagination (``id__gt=last committed id``) instead
      of offset slicing, which stays stable as rows change;
    * the checkpoint is written only after its batch has committed.

    Args:
        apps: historical app registry supplied by ``RunPython``.
        schema_editor: schema editor for the target database (unused here).
    """
    from django.core.cache import cache
    from django.db import transaction

    Post = apps.get_model('blog', 'Post')

    checkpoint_key = 'migration_checkpoint_post_processing'
    batch_size = 1000

    # Resume from the last committed checkpoint, if any.
    last_checkpoint = cache.get(checkpoint_key, {'last_id': 0, 'processed': 0})
    last_id = last_checkpoint['last_id']
    processed_count = last_checkpoint['processed']

    remaining = Post.objects.filter(id__gt=last_id).count()
    print(f"Resuming migration from ID {last_id}")
    print(f"Already processed: {processed_count} posts")
    print(f"Remaining: {remaining} posts")

    try:
        while True:
            # Keyset pagination: always take the next batch after the last
            # committed id.
            batch_list = list(
                Post.objects.filter(id__gt=last_id).order_by('id')[:batch_size]
            )
            if not batch_list:
                break

            with transaction.atomic():
                for post in batch_list:
                    # Perform expensive operation
                    post.search_content = f"{post.title} {post.content}"
                    post.save()

            # Record progress only after the batch committed.
            last_id = batch_list[-1].id
            processed_count += len(batch_list)
            cache.set(checkpoint_key, {
                'last_id': last_id,
                'processed': processed_count
            }, timeout=3600)
            print(f"Checkpoint: processed {processed_count} posts, last ID: {last_id}")

    except Exception as e:
        print(f"Migration failed after ID {last_id} ({processed_count} posts processed)")
        print(f"Error: {e}")
        print("Migration can be resumed by running again")
        raise

    # Reached only on success: clear resume state.
    cache.delete(checkpoint_key)
    print("Migration completed successfully")

Performance Optimization

Transaction Optimization Strategies

class TransactionOptimization:
    """Strategies for optimizing migration transactions."""

    @staticmethod
    def bulk_operations_migration(apps, schema_editor):
        """Use bulk operations to minimize transaction overhead.

        Replaces per-row ``save()`` calls with a single bulk UPDATE, and
        uses ``bulk_update`` for per-row computed values.
        """
        # slugify was previously referenced without an import.
        from django.utils.text import slugify

        Post = apps.get_model('blog', 'Post')

        # Inefficient alternative: one save() per row inside a transaction.
        # Efficient: a single bulk UPDATE statement.
        with transaction.atomic():
            Post.objects.all().update(view_count=0)

        # For per-row computed values, collect the changes and apply them
        # with bulk_update.  iterator() streams rows from the cursor instead
        # of caching the whole result set on the queryset.
        posts_to_update = []
        for post in Post.objects.all().iterator():
            post.slug = slugify(post.title)
            posts_to_update.append(post)

        # Bulk update (Django 2.2+); batch_size caps per-statement size.
        Post.objects.bulk_update(posts_to_update, ['slug'], batch_size=1000)

    @staticmethod
    def optimized_data_migration(apps, schema_editor):
        """Optimized data migration with minimal locking.

        Pushes the whole transformation into one SQL UPDATE so the database
        does the work server-side in a single pass.
        """
        from django.db import connection

        with connection.cursor() as cursor:
            # One SQL statement is far cheaper than a Python-level loop.
            # NOTE: this REPLACE-based slug is a rough approximation -- it
            # collapses only one level of '--' runs and does not strip
            # punctuation.
            cursor.execute("""
                UPDATE blog_post 
                SET slug = LOWER(REPLACE(REPLACE(title, ' ', '-'), '--', '-'))
                WHERE slug IS NULL OR slug = ''
            """)

            affected_rows = cursor.rowcount
            print(f"Updated {affected_rows} posts with slugs")

    @staticmethod
    def memory_efficient_migration(apps, schema_editor):
        """Memory-efficient migration for large datasets.

        Streams rows with ``iterator()`` and flushes ``bulk_update`` batches
        so at most ``batch_size`` model instances are held in memory.
        """
        Post = apps.get_model('blog', 'Post')

        batch_size = 1000
        posts_processed = 0
        posts_to_update = []

        # iterator(chunk_size=...) avoids loading every row at once.
        for post in Post.objects.all().iterator(chunk_size=batch_size):
            post.search_content = f"{post.title} {post.content}"
            posts_to_update.append(post)

            # Flush a full batch inside its own short transaction.
            if len(posts_to_update) >= batch_size:
                with transaction.atomic():
                    Post.objects.bulk_update(
                        posts_to_update,
                        ['search_content'],
                        batch_size=batch_size
                    )

                posts_processed += len(posts_to_update)
                posts_to_update = []

                print(f"Processed {posts_processed} posts")

        # Flush whatever is left over.
        if posts_to_update:
            with transaction.atomic():
                Post.objects.bulk_update(
                    posts_to_update,
                    ['search_content'],
                    batch_size=len(posts_to_update)
                )

            posts_processed += len(posts_to_update)
            print(f"Final batch: processed {posts_processed} posts total")

# Database-specific optimizations
class DatabaseSpecificOptimizations:
    """Database-specific transaction optimizations.

    Each helper is a no-op unless the active connection matches its vendor,
    so they can all be invoked unconditionally from a RunPython step.
    """

    @staticmethod
    def postgresql_optimizations(apps, schema_editor):
        """PostgreSQL-specific optimizations.

        Groups the work in an explicit transaction and relaxes
        ``synchronous_commit`` for that transaction only.
        """
        from django.db import connection

        if connection.vendor != 'postgresql':
            return

        with connection.cursor() as cursor:
            # Fix: PostgreSQL has no server-side ``SET autocommit`` setting
            # (autocommit is a client/driver concept), so the previous
            # ``SET autocommit = off`` statement was invalid SQL and failed.
            cursor.execute("BEGIN")
            try:
                # SET LOCAL scopes the durability relaxation to this
                # transaction; it resets automatically on COMMIT/ROLLBACK.
                cursor.execute("SET LOCAL synchronous_commit TO OFF")

                cursor.execute("""
                    UPDATE blog_post 
                    SET updated_at = NOW() 
                    WHERE updated_at IS NULL
                """)

                cursor.execute("COMMIT")

            except Exception:
                cursor.execute("ROLLBACK")
                raise

    @staticmethod
    def mysql_optimizations(apps, schema_editor):
        """MySQL-specific optimizations."""
        from django.db import connection

        if connection.vendor != 'mysql':
            return

        with connection.cursor() as cursor:
            # WARNING: disabling FK checks trades integrity enforcement for
            # speed; rows violating constraints can slip in while it is off.
            cursor.execute("SET foreign_key_checks = 0")

            try:
                # Perform bulk operations
                cursor.execute("""
                    UPDATE blog_post 
                    SET view_count = COALESCE(view_count, 0)
                """)

            finally:
                # Always restore enforcement, even on failure.
                cursor.execute("SET foreign_key_checks = 1")

    @staticmethod
    def sqlite_optimizations(apps, schema_editor):
        """SQLite-specific optimizations."""
        from django.db import connection

        if connection.vendor != 'sqlite':
            return

        with connection.cursor() as cursor:
            # synchronous=OFF skips fsync: fast, but a crash mid-migration
            # can corrupt the file.  Only acceptable when the migration can
            # be rerun from a backup.
            cursor.execute("PRAGMA synchronous = OFF")
            cursor.execute("PRAGMA journal_mode = MEMORY")

            try:
                # Perform operations
                cursor.execute("""
                    UPDATE blog_post 
                    SET slug = lower(replace(title, ' ', '-'))
                    WHERE slug IS NULL
                """)

            finally:
                # Restore durable settings.
                # NOTE(review): this assumes the database was using the
                # default DELETE journal mode; if it was WAL, this silently
                # switches it -- capture and restore the original mode
                # instead.
                cursor.execute("PRAGMA synchronous = NORMAL")
                cursor.execute("PRAGMA journal_mode = DELETE")

Understanding transaction handling in Django migrations ensures data integrity while optimizing performance. Proper use of atomic operations, error handling, and database-specific optimizations enables safe and efficient schema evolution in production environments.