Migrations

How Migrations Work

Understanding the internal mechanics of Django migrations helps you write better migrations, debug issues, and optimize database schema changes. This deep dive explores how Django tracks, generates, and applies migrations.

The sections below cover, in order: how applied migrations are tracked and how the dependency graph is loaded, how migrations are generated from model changes and then executed, and how project state evolves from one migration to the next.

Migration System Architecture

Migration State Tracking

# Django tracks migration state using the django_migrations table.
# Each row records one applied migration: the app it belongs to,
# the migration's name, and when it was applied.

# Database schema for the django_migrations table:
"""
CREATE TABLE django_migrations (
    id INTEGER PRIMARY KEY,
    app VARCHAR(255) NOT NULL,
    name VARCHAR(255) NOT NULL,
    applied DATETIME NOT NULL
);
"""

# Example entries in the django_migrations table
# (one row per applied migration, in the order they were applied):
"""
| id | app  | name                    | applied             |
|----|------|-------------------------|---------------------|
| 1  | auth | 0001_initial           | 2023-01-01 10:00:00 |
| 2  | blog | 0001_initial           | 2023-01-01 10:01:00 |
| 3  | blog | 0002_post_author       | 2023-01-02 14:30:00 |
"""

from django.db import connection
from django.db.migrations.recorder import MigrationRecorder

class MigrationStateManager:
    """Inspect which migrations Django has recorded as applied.

    Every method works against the module-level ``connection``.
    """

    @staticmethod
    def get_applied_migrations():
        """Return the applied migrations recorded in ``django_migrations``.

        Returns:
            The result of ``MigrationRecorder.applied_migrations()``; its
            members are ``(app_label, migration_name)`` keys.
        """
        return MigrationRecorder(connection).applied_migrations()

    @staticmethod
    def check_migration_status(app_label, migration_name):
        """Return True if the given migration has been recorded as applied.

        Args:
            app_label: Label of the app that owns the migration.
            migration_name: Migration name, e.g. ``"0001_initial"``.
        """
        applied = MigrationRecorder(connection).applied_migrations()
        return (app_label, migration_name) in applied

    @staticmethod
    def get_migration_history():
        """Describe every migration known to the loader.

        Returns:
            A list with one dict per migration in the graph, with keys
            ``app``, ``name``, ``applied`` (bool) and ``dependencies``.
        """
        from django.db.migrations.executor import MigrationExecutor

        loader = MigrationExecutor(connection).loader
        applied = loader.applied_migrations

        # graph.nodes is a flat mapping keyed by (app_label, migration_name)
        # whose values are the Migration objects, so unpack the key instead
        # of treating the mapping as nested app -> name -> migration.
        return [
            {
                'app': app_label,
                'name': migration_name,
                'applied': (app_label, migration_name) in applied,
                'dependencies': migration.dependencies,
            }
            for (app_label, migration_name), migration in loader.graph.nodes.items()
        ]

# Migration loader and graph
class MigrationGraphExplorer:
    """Explore the migration dependency graph.

    Attributes:
        loader: ``MigrationLoader`` created for the module-level
            ``connection``.
        graph: The loader's migration graph. ``graph.nodes`` is keyed by
            ``(app_label, migration_name)`` tuples.
    """

    def __init__(self):
        from django.db.migrations.loader import MigrationLoader
        self.loader = MigrationLoader(connection)
        self.graph = self.loader.graph

    def get_migration_plan(self, targets):
        """Return the plan needed to reach the target migrations.

        Args:
            targets: Target migration keys, passed unchanged to
                ``MigrationExecutor.migration_plan``.

        Returns:
            A list of dicts, one per planned step, with keys ``migration``
            (``"app_label.name"``), ``backwards`` and ``dependencies``.
        """
        from django.db.migrations.executor import MigrationExecutor

        executor = MigrationExecutor(connection)
        plan = executor.migration_plan(targets)

        return [
            {
                'migration': f"{migration.app_label}.{migration.name}",
                'backwards': backwards,
                'dependencies': migration.dependencies,
            }
            for migration, backwards in plan
        ]

    def find_migration_conflicts(self):
        """Find apps whose migration history has more than one leaf node.

        Returns:
            A dict mapping each conflicting app label to the list of its
            leaf nodes. Apps with a single leaf are omitted.
        """
        conflicts = {}

        # Node keys are (app_label, name) tuples; leaf_nodes() expects a bare
        # app label, so collect the distinct labels first.
        app_labels = sorted({key[0] for key in self.graph.nodes})

        for app_label in app_labels:
            leaves = list(self.graph.leaf_nodes(app_label))
            # More than one leaf means two migrations both claim to be the
            # latest one for this app.
            if len(leaves) > 1:
                conflicts[app_label] = leaves

        return conflicts

    def get_unapplied_migrations(self):
        """Return the keys of all migrations that have not been applied.

        Returns:
            A list of ``(app_label, migration_name)`` tuples, in graph
            iteration order.
        """
        applied = self.loader.applied_migrations
        return [key for key in self.graph.nodes if key not in applied]

Migration Generation Process

# How Django generates migrations from model changes

from django.db.migrations.autodetector import MigrationAutodetector
from django.db.migrations.state import ProjectState
from django.apps import apps

class MigrationGenerationProcess:
    """Show how Django derives new migrations from model changes."""

    @staticmethod
    def analyze_model_changes():
        """Detect the changes between the migration files and the models.

        Returns:
            The result of ``MigrationAutodetector.changes()`` for the whole
            project (no app trimming or conversion requested).
        """
        from django.db.migrations.loader import MigrationLoader

        migration_loader = MigrationLoader(connection)

        detector = MigrationAutodetector(
            # State as described by the existing migrations.
            migration_loader.project_state(),
            # State as described by the currently loaded models.
            ProjectState.from_apps(apps),
        )

        return detector.changes(
            graph=migration_loader.graph,
            trim_to_apps=None,
            convert_apps=None,
        )

    @staticmethod
    def simulate_migration_generation(app_label):
        """Print and return the migrations that would be generated for an app.

        Args:
            app_label: Label of the app to report on.

        Returns:
            The detected migrations for ``app_label``, or an empty list when
            nothing was detected for it.
        """
        detected = MigrationGenerationProcess.analyze_model_changes()
        pending = detected.get(app_label, [])

        if app_label in detected:
            for pending_migration in pending:
                print(f"Migration: {pending_migration.name}")
                print(f"Dependencies: {pending_migration.dependencies}")
                print("Operations:")

                for op in pending_migration.operations:
                    print(f"  - {op.__class__.__name__}: {op}")

        return pending

# Migration operation types and their effects
class MigrationOperationAnalyzer:
    """Analyze different types of migration operations."""

    # Reference table keyed by operation class name. 'reversible' and
    # 'data_safe' describe whether data survives the operation; they are
    # booleans for built-in schema operations and explanatory strings for
    # custom operations whose effect depends on user code.
    operation_types = {
        'CreateModel': {
            'description': 'Creates a new database table',
            'sql_pattern': 'CREATE TABLE',
            'reversible': True,
            'data_safe': True,
        },
        'DeleteModel': {
            'description': 'Drops a database table',
            'sql_pattern': 'DROP TABLE',
            'reversible': False,  # Data loss
            'data_safe': False,
        },
        'AddField': {
            'description': 'Adds a column to existing table',
            'sql_pattern': 'ALTER TABLE ... ADD COLUMN',
            'reversible': True,
            'data_safe': True,
        },
        'RemoveField': {
            'description': 'Removes a column from table',
            'sql_pattern': 'ALTER TABLE ... DROP COLUMN',
            'reversible': False,  # Data loss
            'data_safe': False,
        },
        'AlterField': {
            'description': 'Modifies existing column',
            'sql_pattern': 'ALTER TABLE ... ALTER COLUMN',
            'reversible': True,  # Usually
            'data_safe': True,   # Usually
        },
        'RenameField': {
            'description': 'Renames a column',
            'sql_pattern': 'ALTER TABLE ... RENAME COLUMN',
            'reversible': True,
            'data_safe': True,
        },
        'AddIndex': {
            'description': 'Creates database index',
            'sql_pattern': 'CREATE INDEX',
            'reversible': True,
            'data_safe': True,
        },
        'RemoveIndex': {
            'description': 'Drops database index',
            'sql_pattern': 'DROP INDEX',
            'reversible': True,
            'data_safe': True,
        },
        'RunSQL': {
            'description': 'Executes custom SQL',
            'sql_pattern': 'Custom SQL',
            'reversible': 'Depends on implementation',
            'data_safe': 'Depends on SQL',
        },
        'RunPython': {
            'description': 'Executes Python code',
            'sql_pattern': 'N/A',
            'reversible': 'Depends on implementation',
            'data_safe': 'Depends on code',
        },
    }

    @staticmethod
    def analyze_operation_safety(operation):
        """Summarize how safe a migration operation is for existing data.

        Args:
            operation: A migration operation. It is looked up in
                ``operation_types`` by its class name; for ``AddField`` the
                operation's ``field`` is inspected as well.

        Returns:
            A dict with keys ``operation``, ``description``, ``reversible``,
            ``data_safe`` and ``recommendations`` (a list of strings).
            Operations missing from ``operation_types`` get
            ``'Unknown operation'`` / ``'Unknown'`` placeholders.
        """
        operation_name = operation.__class__.__name__
        operation_info = MigrationOperationAnalyzer.operation_types.get(
            operation_name, {}
        )

        safety_analysis = {
            'operation': operation_name,
            'description': operation_info.get('description', 'Unknown operation'),
            'reversible': operation_info.get('reversible', 'Unknown'),
            'data_safe': operation_info.get('data_safe', 'Unknown'),
            'recommendations': [],
        }

        if operation_name == 'AddField':
            field = getattr(operation, 'field', None)
            # Ask the field whether a default was supplied. Testing the
            # truthiness of field.default is wrong both ways: an unset
            # default is a truthy sentinel, while legitimate defaults such
            # as 0, '' or False are falsy.
            if field is not None and not field.null and not field.has_default():
                safety_analysis['data_safe'] = False
                safety_analysis['recommendations'].append(
                    'Adding NOT NULL field without default requires existing data handling'
                )

        elif operation_name == 'AlterField':
            safety_analysis['recommendations'].append(
                'Review field changes for data compatibility'
            )

        elif operation_name in ('RunSQL', 'RunPython'):
            safety_analysis['recommendations'].append(
                'Custom operations require manual review for safety'
            )

        return safety_analysis

# Migration execution process
class MigrationExecutionAnalyzer:
    """Analyze how migrations are executed and rolled back."""

    @staticmethod
    def trace_migration_execution(app_label, migration_name):
        """Describe the operations a migration would run.

        No operation is executed and no SQL is generated; ``estimated_sql``
        is always ``None``.

        Args:
            app_label: Label of the app that owns the migration.
            migration_name: Name of the migration to load.

        Returns:
            A dict with keys ``migration``, ``dependencies``, ``operations``
            (one dict per operation) and ``atomic``. Each operation dict
            carries ``has_sql``: True/False depending on whether the
            operation exposes ``database_forwards``, or ``'Unknown'`` when a
            schema editor could not be opened on the connection.

        Raises:
            Whatever ``MigrationLoader.get_migration`` raises when the
            migration cannot be found.
        """
        from django.db.migrations.loader import MigrationLoader

        loader = MigrationLoader(connection)
        migration = loader.get_migration(app_label, migration_name)

        # Probe the schema editor once instead of once per operation; the
        # outcome does not depend on the operation being inspected.
        try:
            with connection.schema_editor():
                schema_editor_available = True
        except Exception:
            schema_editor_available = False

        execution_plan = []
        for operation in migration.operations:
            if schema_editor_available:
                has_sql = hasattr(operation, 'database_forwards')
            else:
                has_sql = 'Unknown'

            execution_plan.append({
                'operation': operation.__class__.__name__,
                'description': str(operation),
                'estimated_sql': None,
                'dependencies': getattr(operation, 'dependencies', []),
                'has_sql': has_sql,
            })

        return {
            'migration': f"{app_label}.{migration_name}",
            'dependencies': migration.dependencies,
            'operations': execution_plan,
            'atomic': getattr(migration, 'atomic', True),
        }

    @staticmethod
    def simulate_migration_rollback(app_label, migration_name):
        """Describe what rolling back a migration would involve.

        Nothing is executed; the migration's operations are only inspected.

        Args:
            app_label: Label of the app that owns the migration.
            migration_name: Name of the migration to load.

        Returns:
            A dict with keys ``migration``, ``rollback_operations`` (the
            operations in reverse order, each with ``operation``,
            ``reversible``, ``description`` and, if irreversible,
            ``warning``) and ``safe_to_rollback`` (True only when every
            operation is reversible).

        Raises:
            Whatever ``MigrationLoader.get_migration`` raises when the
            migration cannot be found.
        """
        from django.db.migrations.loader import MigrationLoader

        loader = MigrationLoader(connection)
        migration = loader.get_migration(app_label, migration_name)

        rollback_plan = []

        # A rollback undoes operations in the opposite order they were applied.
        for operation in reversed(migration.operations):
            # The presence of database_backwards alone proves nothing, so the
            # operation's own `reversible` flag is what decides; it is
            # treated as True when the operation does not declare one.
            reversible = (
                hasattr(operation, 'database_backwards')
                and bool(getattr(operation, 'reversible', True))
            )

            rollback_info = {
                'operation': operation.__class__.__name__,
                'reversible': reversible,
                'description': str(operation),
            }
            if not reversible:
                rollback_info['warning'] = 'This operation is not reversible'

            rollback_plan.append(rollback_info)

        return {
            'migration': f"{app_label}.{migration_name}",
            'rollback_operations': rollback_plan,
            'safe_to_rollback': all(op['reversible'] for op in rollback_plan),
        }

Migration State Management

Project State and Schema Evolution

from django.db.migrations.state import ProjectState, ModelState

class ProjectStateManager:
    """Manage and analyze how the project state evolves across migrations.

    Attributes:
        loader: ``MigrationLoader`` created for the module-level
            ``connection``.
    """

    def __init__(self):
        from django.db.migrations.loader import MigrationLoader
        self.loader = MigrationLoader(connection)

    def get_state_at_migration(self, app_label, migration_name):
        """Return the project state as of a specific migration.

        Args:
            app_label: Label of the app that owns the migration.
            migration_name: Name of the migration to build the state up to.
        """
        target = (app_label, migration_name)
        return self.loader.project_state(target)

    def compare_states(self, state1, state2):
        """Compare two project states.

        Args:
            state1: The earlier state; must expose a ``models`` mapping.
            state2: The later state; must expose a ``models`` mapping.

        Returns:
            A dict with ``added_models`` and ``removed_models`` (sorted lists
            of model keys) and ``modified_models`` (a list of
            ``{'model': key, 'changes': ...}`` dicts, sorted by key).
        """
        models1 = set(state1.models.keys())
        models2 = set(state2.models.keys())

        differences = {
            'added_models': sorted(models2 - models1),
            'removed_models': sorted(models1 - models2),
            'modified_models': [],
        }

        # Sorted so the report does not depend on set iteration order.
        for model_key in sorted(models1 & models2):
            model1 = state1.models[model_key]
            model2 = state2.models[model_key]

            if self._models_differ(model1, model2):
                differences['modified_models'].append({
                    'model': model_key,
                    'changes': self._get_model_changes(model1, model2),
                })

        return differences

    @staticmethod
    def _field_changes(field1, field2):
        """List human-readable differences between two field definitions.

        Only the field class, ``max_length``, ``null`` and ``blank`` are
        compared. Attributes are read with a ``None`` fallback so fields
        that do not define one of them compare cleanly.
        """
        field_changes = []

        if field1.__class__ != field2.__class__:
            field_changes.append(
                f'Type: {field1.__class__.__name__} -> {field2.__class__.__name__}'
            )

        for label, attr in (('Max length', 'max_length'), ('Null', 'null'), ('Blank', 'blank')):
            before = getattr(field1, attr, None)
            after = getattr(field2, attr, None)
            if before != after:
                field_changes.append(f'{label}: {before} -> {after}')

        return field_changes

    def _models_differ(self, model1, model2):
        """Return True if two model states differ in fields or options."""
        if set(model1.fields.keys()) != set(model2.fields.keys()):
            return True

        # The key sets are equal here, so every name exists in both models.
        for field_name, field1 in model1.fields.items():
            if self._field_changes(field1, model2.fields[field_name]):
                return True

        return model1.options != model2.options

    def _get_model_changes(self, model1, model2):
        """Return the detailed changes between two model states.

        Returns:
            A dict with ``added_fields`` and ``removed_fields`` (sorted field
            names), ``modified_fields`` (``{'field': name, 'changes': [...]}``
            dicts) and ``option_changes`` (``{option: {'from': ..., 'to':
            ...}}``, with ``None`` for an option absent on one side).
        """
        fields1 = set(model1.fields.keys())
        fields2 = set(model2.fields.keys())

        changes = {
            'added_fields': sorted(fields2 - fields1),
            'removed_fields': sorted(fields1 - fields2),
            'modified_fields': [],
            'option_changes': {},
        }

        for field_name in sorted(fields1 & fields2):
            field_changes = self._field_changes(
                model1.fields[field_name], model2.fields[field_name]
            )
            if field_changes:
                changes['modified_fields'].append({
                    'field': field_name,
                    'changes': field_changes,
                })

        # Walk the union of option names so options that exist only in the
        # newer state are reported too, not just changed or removed ones.
        for option in sorted(set(model1.options) | set(model2.options)):
            value1 = model1.options.get(option)
            value2 = model2.options.get(option)
            if value1 != value2:
                changes['option_changes'][option] = {
                    'from': value1,
                    'to': value2,
                }

        return changes

    def get_migration_timeline(self, app_label):
        """Return the per-migration state changes for one app.

        Migrations are ordered by name, which matches application order only
        when names carry the usual zero-padded numeric prefix.

        Args:
            app_label: Label of the app whose migrations are listed.

        Returns:
            A list of dicts with keys ``migration`` (``"app_label.name"``),
            ``timestamp`` (always ``None``; the applied time is not looked
            up) and ``changes`` (``{'initial': True}`` for the first entry,
            otherwise the ``compare_states`` result against the previous
            migration).
        """
        app_migrations = sorted(
            (key for key in self.loader.graph.nodes if key[0] == app_label),
            key=lambda key: key[1],
        )

        timeline = []
        previous_state = None

        for migration_key in app_migrations:
            current_state = self.get_state_at_migration(*migration_key)

            if previous_state is None:
                entry_changes = {'initial': True}
            else:
                entry_changes = self.compare_states(previous_state, current_state)

            timeline.append({
                'migration': f"{migration_key[0]}.{migration_key[1]}",
                'timestamp': None,  # Would need to get from migration recorder
                'changes': entry_changes,
            })
            previous_state = current_state

        return timeline

# Schema editor integration
class SchemaEditorAnalyzer:
    """Analyze how the schema editor generates SQL."""

    @staticmethod
    def analyze_sql_generation(operation, model_state=None):
        """Set up SQL capture for an operation without touching the schema.

        This is a scaffold: it opens a schema editor and installs a hook
        that records every statement passed to ``execute``, but it does not
        run the operation, so ``sql_statements`` is always empty.

        Args:
            operation: The migration operation being described.
            model_state: Accepted for interface compatibility; unused.

        Returns:
            A dict with keys ``operation``, ``sql_statements`` (list of
            ``{'sql': ..., 'params': ...}`` dicts), ``database_vendor`` and,
            if the schema editor could not be used, ``error``.
        """
        sql_info = {
            'operation': operation.__class__.__name__,
            'sql_statements': [],
            'database_vendor': connection.vendor,
        }
        captured_sql = []

        try:
            # collect_sql=True keeps this a real dry run: if the operation
            # is ever invoked below, its statements are recorded rather
            # than run against the database.
            with connection.schema_editor(collect_sql=True) as schema_editor:
                original_execute = schema_editor.execute

                def capture_sql(sql, params=()):
                    captured_sql.append({
                        'sql': sql,
                        'params': params
                    })
                    return original_execute(sql, params)

                schema_editor.execute = capture_sql

                # NOTE(review): operation.database_forwards() is deliberately
                # not called here. It needs the app label plus the "from"
                # and "to" project states, none of which this function
                # receives.
        except Exception as e:
            sql_info['error'] = f"Could not analyze SQL generation: {e}"
        else:
            sql_info['sql_statements'] = captured_sql

        return sql_info

    @staticmethod
    def get_database_specific_features():
        """Report migration-related capabilities of the current database.

        Returns:
            A dict with ``vendor``, one entry per generic feature flag read
            from ``connection.features`` (``None`` when the backend does not
            declare that flag), and a few hard-coded vendor-specific
            entries for PostgreSQL, MySQL and SQLite.
        """
        db_features = connection.features
        flag_names = (
            'supports_transactions',
            'supports_atomic_references_rename',
            'supports_foreign_keys',
            'supports_check_constraints',
            'supports_partial_indexes',
            'supports_functions_in_partial_indexes',
            'supports_expression_indexes',
            'supports_timezones',
        )

        features = {'vendor': connection.vendor}
        # Feature flags come and go between Django versions; read each one
        # defensively so a missing flag yields None instead of AttributeError.
        for flag_name in flag_names:
            features[flag_name] = getattr(db_features, flag_name, None)

        # NOTE(review): Django appears to split check-constraint support into
        # table-level and column-level flags; fall back to those when the
        # combined name is not declared - confirm against the Django version
        # in use.
        if features['supports_check_constraints'] is None:
            declared = [
                flag
                for flag in (
                    getattr(db_features, 'supports_table_check_constraints', None),
                    getattr(db_features, 'supports_column_check_constraints', None),
                )
                if flag is not None
            ]
            if declared:
                features['supports_check_constraints'] = any(declared)

        # Vendor-specific capabilities (static, not read from the backend).
        if connection.vendor == 'postgresql':
            features.update({
                'supports_concurrent_index_creation': True,
                'supports_gin_indexes': True,
                'supports_jsonb': True,
                'supports_arrays': True,
            })
        elif connection.vendor == 'mysql':
            features.update({
                'supports_fulltext_indexes': True,
                'supports_spatial_indexes': True,
            })
        elif connection.vendor == 'sqlite':
            features.update({
                'supports_fts': True,
                'limited_alter_table': True,
            })

        return features

# Migration optimization analyzer
class MigrationOptimizationAnalyzer:
    """Analyze migrations for optimization opportunities."""

    @staticmethod
    def analyze_migration_performance(migration):
        """Flag operations in a migration that may be slow or lock tables.

        Args:
            migration: An object exposing ``app_label``, ``name`` and
                ``operations``.

        Returns:
            A dict with keys ``migration`` (``"app_label.name"``),
            ``performance_concerns`` and ``optimization_suggestions`` (lists
            of strings) and ``estimated_impact``: ``'high'`` for more than
            two concerns, ``'medium'`` for one or two, otherwise ``'low'``.
        """
        concerns = []
        suggestions = []

        for operation in migration.operations:
            operation_name = operation.__class__.__name__

            if operation_name == 'AddField':
                field = getattr(operation, 'field', None)
                if field is None:
                    continue

                if getattr(field, 'db_index', False):
                    concerns.append(
                        f"Adding indexed field '{operation.name}' may be slow on large tables"
                    )
                    suggestions.append(
                        "Consider creating index separately with CREATE INDEX CONCURRENTLY"
                    )

                # hasattr(field, 'default') cannot detect a missing default,
                # because the attribute is present even when unset; ask the
                # field itself instead.
                if not field.null and not field.has_default():
                    concerns.append(
                        f"Adding NOT NULL field '{operation.name}' without default may lock table"
                    )
                    suggestions.append(
                        "Add field as nullable first, populate data, then make NOT NULL"
                    )

            elif operation_name == 'AlterField':
                concerns.append(
                    f"Altering field '{operation.name}' may require table rewrite"
                )
                suggestions.append(
                    "Test on production-sized dataset to estimate impact"
                )

            elif operation_name == 'RunPython':
                concerns.append(
                    "Custom Python code may be slow on large datasets"
                )
                suggestions.append(
                    "Consider using bulk operations or raw SQL for better performance"
                )

        if len(concerns) > 2:
            estimated_impact = 'high'
        elif concerns:
            estimated_impact = 'medium'
        else:
            estimated_impact = 'low'

        return {
            'migration': f"{migration.app_label}.{migration.name}",
            'performance_concerns': concerns,
            'optimization_suggestions': suggestions,
            'estimated_impact': estimated_impact,
        }

    @staticmethod
    def suggest_migration_splitting(migration):
        """Suggest how to split a migration with more than five operations.

        Args:
            migration: An object exposing ``app_label``, ``name`` and
                ``operations``.

        Returns:
            A dict with keys ``migration``, ``should_split`` (True when
            there are more than five operations) and ``split_suggestions``
            (one message per operation type that occurs more than once, in
            order of first appearance).
        """
        from collections import Counter

        suggestions = {
            'migration': f"{migration.app_label}.{migration.name}",
            'should_split': False,
            'split_suggestions': [],
        }

        if len(migration.operations) > 5:
            suggestions['should_split'] = True

            # Counter keeps first-seen order, so suggestions follow the
            # order in which operation types appear in the migration.
            type_counts = Counter(
                operation.__class__.__name__ for operation in migration.operations
            )
            suggestions['split_suggestions'] = [
                f"Group {count} {op_type} operations into separate migration"
                for op_type, count in type_counts.items()
                if count > 1
            ]

        return suggestions

Understanding how Django migrations work internally enables you to write more efficient migrations, debug complex migration issues, and optimize database schema changes for production environments. This knowledge is essential for maintaining large Django applications with evolving data models.