Django provides a comprehensive set of management commands for working with migrations. Understanding these commands and their options enables effective migration management across development, testing, and production environments.
# Basic usage: create migrations for every app with model changes
python manage.py makemigrations
# Create migrations for specific app
python manage.py makemigrations blog
# Create migrations for multiple apps
python manage.py makemigrations blog auth
# Create empty migration for custom operations
python manage.py makemigrations --empty blog
# Specify migration name
python manage.py makemigrations blog --name add_featured_posts
# Dry run - show what migrations would be created
python manage.py makemigrations --dry-run
# Merge conflicting migrations
python manage.py makemigrations --merge
# Check for model changes that are missing migrations (non-zero exit status if any)
python manage.py makemigrations --check
# Show more detailed output while creating migrations
python manage.py makemigrations --verbosity 2
# Custom makemigrations command with additional features
from django.core.management.commands.makemigrations import Command as BaseCommand
from django.core.management.base import CommandError
import os
class Command(BaseCommand):
    """Enhanced ``makemigrations`` command with backup and review helpers.

    Extra options on top of the stock command:

    * ``--backup``: copy each app's ``migrations`` directory to a timestamped
      sibling directory before new migrations are written.
    * ``--review``: open the newest migration file of each app in an editor
      once the stock command has finished.
    * ``--template``: accepted by the parser, but nothing in this command
      consumes it yet.
    """

    # Editors tried in order by ``--review``; the first one that can be
    # launched wins.
    REVIEW_EDITORS = ('code', 'vim')

    def add_arguments(self, parser):
        """Register the extra flags after the stock ``makemigrations`` ones."""
        super().add_arguments(parser)
        parser.add_argument(
            '--backup',
            action='store_true',
            help='Create backup of existing migrations before creating new ones',
        )
        parser.add_argument(
            '--review',
            action='store_true',
            help='Open generated migrations for review before saving',
        )
        parser.add_argument(
            '--template',
            type=str,
            help='Use custom migration template',
        )

    def handle(self, *app_labels, **options):
        """Optionally back up, run the stock command, then optionally review.

        Args:
            *app_labels: App labels given on the command line (may be empty,
                meaning "all apps").
            **options: Parsed command-line options.
        """
        # A dry run writes nothing to disk, so there is nothing to protect
        # with a backup and nothing new to review afterwards.
        writes_files = not options.get('dry_run')

        if options['backup'] and writes_files:
            self.create_migration_backup(app_labels)

        super().handle(*app_labels, **options)

        if options['review'] and writes_files:
            self.review_generated_migrations(app_labels)

    def get_all_app_labels(self):
        """Return the labels of every installed app, sorted for stable output."""
        from django.apps import apps

        return sorted(config.label for config in apps.get_app_configs())

    def get_migrations_dir(self, app_label):
        """Return the path of the ``migrations`` directory inside an app.

        The path is derived from the app's package location; a custom
        ``MIGRATION_MODULES`` setting is not taken into account.

        Raises:
            CommandError: If no installed app has the given label.
        """
        from django.apps import apps

        try:
            app_config = apps.get_app_config(app_label)
        except LookupError as exc:
            raise CommandError(str(exc)) from exc
        return os.path.join(app_config.path, 'migrations')

    def create_migration_backup(self, app_labels):
        """Copy each app's migrations directory to ``<dir>_backup_<timestamp>``.

        Args:
            app_labels: Labels to back up; an empty sequence means all apps.
        """
        import shutil
        from datetime import datetime

        # One timestamp for the whole run so all backups share a suffix.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        for app_label in app_labels or self.get_all_app_labels():
            migrations_dir = self.get_migrations_dir(app_label)
            if not os.path.isdir(migrations_dir):
                continue
            backup_dir = f"{migrations_dir}_backup_{timestamp}"
            shutil.copytree(migrations_dir, backup_dir)
            self.stdout.write(f"Created backup: {backup_dir}")

    def review_generated_migrations(self, app_labels):
        """Open the newest migration file of each app for review.

        "Newest" is the lexicographically greatest file name, which matches
        Django's zero-padded numeric prefixes (``0001_``, ``0002_``, ...).

        Args:
            app_labels: Labels to review; an empty sequence means all apps.
        """
        for app_label in app_labels or self.get_all_app_labels():
            migrations_dir = self.get_migrations_dir(app_label)
            if not os.path.isdir(migrations_dir):
                continue

            migration_files = [
                entry for entry in os.listdir(migrations_dir)
                if entry.endswith('.py') and not entry.startswith('__')
            ]
            if not migration_files:
                continue

            file_path = os.path.join(migrations_dir, max(migration_files))
            self._open_for_review(file_path)

    def _open_for_review(self, file_path):
        """Launch the first available editor, or print the path as a fallback."""
        import subprocess

        for editor in self.REVIEW_EDITORS:
            try:
                subprocess.run([editor, file_path])
            except FileNotFoundError:
                # Editor is not installed; try the next candidate.
                continue
            return
        self.stdout.write(f"Please review: {file_path}")
# Migration validation before creation
class MigrationValidator:
    """Validate pending model changes before migrations are created."""

    @staticmethod
    def validate_model_changes():
        """Detect pending model changes and collect warnings about them.

        Compares the state recorded by the existing migrations with the
        current models, then runs ``validate_operation`` on every operation
        the autodetector would generate.

        Returns:
            list[dict]: One entry per operation that produced warnings, with
            the keys ``app``, ``migration``, ``operation`` and ``issues``.
        """
        from django.apps import apps
        from django.db import connection
        from django.db.migrations.autodetector import MigrationAutodetector
        from django.db.migrations.loader import MigrationLoader
        from django.db.migrations.state import ProjectState

        # "from" is what the migrations on disk describe; "to" is what the
        # models look like right now.
        loader = MigrationLoader(connection)
        from_state = loader.project_state()
        to_state = ProjectState.from_apps(apps)

        autodetector = MigrationAutodetector(from_state, to_state)
        changes = autodetector.changes(
            graph=loader.graph,
            trim_to_apps=None,
            convert_apps=None,
        )

        validation_results = []
        for app_label, migrations in changes.items():
            for migration in migrations:
                for operation in migration.operations:
                    issues = MigrationValidator.validate_operation(operation)
                    if issues:
                        validation_results.append({
                            'app': app_label,
                            'migration': migration.name,
                            'operation': operation.__class__.__name__,
                            'issues': issues,
                        })
        return validation_results

    @staticmethod
    def validate_operation(operation):
        """Return a list of warning strings for one migration operation.

        Operations are recognised by class name (``AddField``,
        ``RemoveField``, ``DeleteModel``, ``AlterField``); any other
        operation yields an empty list.
        """
        issues = []
        operation_name = operation.__class__.__name__

        if operation_name == 'AddField':
            field = operation.field

            # Django fields always carry a ``default`` attribute, so the
            # meaningful test is ``has_default()``. ``preserve_default=False``
            # marks a one-off default that exists only to fill existing rows,
            # which is the same situation from the table's point of view.
            one_off_default = not getattr(operation, 'preserve_default', True)
            has_fallback_value = (
                field.null
                or field.has_default()
                or getattr(field, 'many_to_many', False)
                or (field.blank and getattr(field, 'empty_strings_allowed', False))
            )
            if one_off_default or not has_fallback_value:
                issues.append(
                    f"Adding NOT NULL field '{operation.name}' without default "
                    "may fail on existing data"
                )

            if getattr(field, 'unique', False):
                issues.append(
                    f"Adding unique field '{operation.name}' may fail if "
                    "duplicate values exist"
                )
        elif operation_name == 'RemoveField':
            issues.append(
                f"Removing field '{operation.name}' will cause data loss"
            )
        elif operation_name == 'DeleteModel':
            issues.append(
                f"Deleting model '{operation.name}' will cause data loss"
            )
        elif operation_name == 'AlterField':
            issues.append(
                f"Altering field '{operation.name}' may cause data compatibility issues"
            )

        return issues
# Basic migration application: apply all unapplied migrations
python manage.py migrate
# Migrate specific app
python manage.py migrate blog
# Migrate to specific migration
python manage.py migrate blog 0003
# Migrate to latest migration for app
python manage.py migrate blog
# Rollback to previous migration
python manage.py migrate blog 0002
# Rollback all migrations for app
python manage.py migrate blog zero
# Show the migration plan without applying it (use sqlmigrate to see the SQL)
python manage.py migrate --plan
# Fake migration (mark as applied without running)
python manage.py migrate blog 0003 --fake
# Treat the initial migration as applied if its tables already exist
python manage.py migrate blog 0001 --fake-initial
# Run migrations with verbosity
python manage.py migrate --verbosity 2
# Skip system checks
python manage.py migrate --skip-checks
# Custom migrate command with enhanced features
from django.core.management.commands.migrate import Command as BaseCommand
from django.core.management.base import CommandError
from django.db import transaction
import time
class Command(BaseCommand):
    """Enhanced ``migrate`` command with safety features.

    Extra options on top of the stock command:

    * ``--backup-before``: dump the database (PostgreSQL only) first.
    * ``--confirm-destructive``: required when the plan contains a rollback
      or an operation listed in ``DESTRUCTIVE_OPERATIONS``.
    * ``--max-time``: abort the migration after this many seconds.
    * ``--progress``: hook for detailed progress reporting (placeholder).
    """

    # Operation class names treated as potentially destructive.
    DESTRUCTIVE_OPERATIONS = frozenset({'RemoveField', 'DeleteModel', 'AlterField'})

    def add_arguments(self, parser):
        """Register the extra flags after the stock ``migrate`` ones."""
        super().add_arguments(parser)
        parser.add_argument(
            '--backup-before',
            action='store_true',
            help='Create database backup before applying migrations',
        )
        parser.add_argument(
            '--confirm-destructive',
            action='store_true',
            help='Confirm destructive operations',
        )
        parser.add_argument(
            '--max-time',
            type=int,
            help='Maximum time in seconds for migration execution',
        )
        parser.add_argument(
            '--progress',
            action='store_true',
            help='Show detailed progress information',
        )

    def handle(self, *args, **options):
        """Run the safety steps, then delegate to the stock ``migrate``.

        Raises:
            CommandError: If destructive operations are pending and
                ``--confirm-destructive`` was not given, if the backup fails,
                or if the migration exceeds ``--max-time``.
        """
        if self.has_destructive_operations(*args, **options):
            if not options['confirm_destructive']:
                raise CommandError(
                    "Destructive operations detected. Use --confirm-destructive to proceed."
                )

        if options['backup_before']:
            self.create_database_backup(options.get('database'))

        if options['progress']:
            self.setup_progress_tracking()

        if options['max_time']:
            self.setup_timeout(options['max_time'])

        try:
            super().handle(*args, **options)
        finally:
            # Disarm the alarm so it cannot fire after the command returned
            # (relevant when invoked through call_command in a live process).
            if options['max_time']:
                self._cancel_timeout()

    def has_destructive_operations(self, *args, **options):
        """Return True if the requested migration plan looks destructive.

        The plan is computed for the same target the stock command would
        use (``app_label`` / ``migration_name`` options), so rollbacks such
        as ``migrate blog zero`` are detected. An unresolvable target yields
        False so that the stock command reports the error itself.
        """
        from django.db import DEFAULT_DB_ALIAS, connections
        from django.db.migrations.executor import MigrationExecutor

        connection = connections[options.get('database') or DEFAULT_DB_ALIAS]
        executor = MigrationExecutor(connection)

        targets = self._resolve_targets(executor.loader, options)
        if targets is None:
            return False

        for migration, backwards in executor.migration_plan(targets):
            if backwards:  # Rollback operations are potentially destructive
                return True
            if any(
                operation.__class__.__name__ in self.DESTRUCTIVE_OPERATIONS
                for operation in migration.operations
            ):
                return True
        return False

    def _resolve_targets(self, loader, options):
        """Translate the app/migration options into migration-plan targets.

        Returns:
            A list of ``(app_label, migration_name)`` targets, or ``None``
            when the requested migration name cannot be resolved.
        """
        from django.db.migrations.exceptions import AmbiguityError

        app_label = options.get('app_label')
        migration_name = options.get('migration_name')
        leaf_nodes = loader.graph.leaf_nodes()

        if app_label and migration_name:
            if migration_name == 'zero':
                return [(app_label, None)]
            try:
                migration = loader.get_migration_by_prefix(app_label, migration_name)
            except (AmbiguityError, KeyError):
                return None
            return [(app_label, migration.name)]
        if app_label:
            return [key for key in leaf_nodes if key[0] == app_label]
        return leaf_nodes

    def create_database_backup(self, database=None):
        """Dump the database with ``pg_dump`` before migrating.

        Only PostgreSQL is supported; other backends just get a notice.

        Args:
            database: Alias of the database to dump; defaults to the
                default connection.

        Raises:
            CommandError: If ``pg_dump`` is missing or exits with an error.
        """
        import os
        import subprocess
        from datetime import datetime

        from django.db import DEFAULT_DB_ALIAS, connections

        connection = connections[database or DEFAULT_DB_ALIAS]
        if connection.vendor != 'postgresql':
            self.stdout.write("Backup not implemented for this database backend")
            return

        settings_dict = connection.settings_dict
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"backup_before_migration_{timestamp}.sql"

        cmd = ['pg_dump']
        # HOST/PORT/USER are optional in Django settings; passing an empty
        # value to pg_dump would make it fail, so only add what is set.
        for flag, key in (('-h', 'HOST'), ('-p', 'PORT'), ('-U', 'USER')):
            value = settings_dict.get(key)
            if value:
                cmd += [flag, str(value)]
        cmd += ['-d', settings_dict['NAME'], '-f', backup_file]

        # Hand the password over via the environment so pg_dump does not
        # stop at an interactive prompt.
        env = os.environ.copy()
        if settings_dict.get('PASSWORD'):
            env['PGPASSWORD'] = str(settings_dict['PASSWORD'])

        try:
            subprocess.run(cmd, check=True, env=env)
        except FileNotFoundError as exc:
            raise CommandError("Backup failed: pg_dump executable not found") from exc
        except subprocess.CalledProcessError as exc:
            raise CommandError(f"Backup failed: {exc}") from exc
        self.stdout.write(f"Database backup created: {backup_file}")

    def setup_progress_tracking(self):
        """Set up detailed progress tracking (placeholder, does nothing yet)."""
        # This would integrate with Django's migration executor
        # to provide real-time progress updates
        pass

    def setup_timeout(self, max_time):
        """Arm a SIGALRM-based timeout of ``max_time`` seconds.

        Raises:
            CommandError: If the platform has no ``SIGALRM`` (e.g. Windows).
        """
        import signal

        if not hasattr(signal, 'SIGALRM'):
            raise CommandError("--max-time is not supported on this platform")

        def timeout_handler(signum, frame):
            raise CommandError(f"Migration timed out after {max_time} seconds")

        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(max_time)

    def _cancel_timeout(self):
        """Disarm a pending alarm set by ``setup_timeout``."""
        import signal

        if hasattr(signal, 'SIGALRM'):
            signal.alarm(0)
# Migration execution monitor
class MigrationExecutionMonitor:
    """Monitor migration execution in real-time.

    Call ``start_monitoring`` once with the plan, then ``migration_started``
    / ``migration_completed`` / ``migration_failed`` per migration. Progress
    is printed to standard output.
    """

    def __init__(self):
        self.start_time = None          # wall-clock start, set by start_monitoring
        self.current_migration = None   # last migration passed to migration_started
        self.total_migrations = 0
        self.completed_migrations = 0

    def _elapsed(self):
        """Return seconds since monitoring started (0.0 if not started)."""
        if self.start_time is None:
            return 0.0
        return time.time() - self.start_time

    def start_monitoring(self, migration_plan):
        """Reset the counters and record the start time for a new plan."""
        self.start_time = time.time()
        self.total_migrations = len(migration_plan)
        self.completed_migrations = 0
        print(f"Starting migration of {self.total_migrations} migrations...")

    def migration_started(self, migration):
        """Record and announce the migration that is about to run."""
        self.current_migration = migration
        elapsed = self._elapsed()
        print(f"[{elapsed:.1f}s] Starting {migration.app_label}.{migration.name}")

    def migration_completed(self, migration):
        """Count a finished migration and print the overall progress."""
        self.completed_migrations += 1
        elapsed = self._elapsed()
        # An empty (or not yet registered) plan would otherwise divide by zero.
        if self.total_migrations:
            progress = (self.completed_migrations / self.total_migrations) * 100
        else:
            progress = 0.0
        print(f"[{elapsed:.1f}s] Completed {migration.app_label}.{migration.name} "
              f"({progress:.1f}% done)")

    def migration_failed(self, migration, error):
        """Announce a failed migration together with its error."""
        elapsed = self._elapsed()
        print(f"[{elapsed:.1f}s] FAILED {migration.app_label}.{migration.name}: {error}")
# Show all migrations and their status
python manage.py showmigrations
# Show migrations for specific app
python manage.py showmigrations blog
# Show migrations in list format
python manage.py showmigrations --list
# Show migrations in plan format
python manage.py showmigrations --plan
# Show only unapplied migrations
# (-F matches the literal "[ ]" marker; without it, "[ ]" is a bracket
# expression that matches any line containing a space)
python manage.py showmigrations --plan | grep -F "[ ]"
# Show migrations with verbosity
python manage.py showmigrations --verbosity 2
# Show SQL for specific migration
python manage.py sqlmigrate blog 0001
# Show SQL with backwards migration
python manage.py sqlmigrate blog 0001 --backwards
# Show SQL for multiple migrations
for i in {1..3}; do
    python manage.py sqlmigrate blog "$(printf "%04d" "$i")"
done
# management/commands/migration_status.py
from django.core.management.base import BaseCommand
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.executor import MigrationExecutor
from django.db import connection
class Command(BaseCommand):
    """Enhanced migration status command.

    Prints per-app applied/unapplied status by default, or conflicts
    (``--conflicts``) or dependencies (``--dependencies``). ``--export``
    additionally writes the status as JSON. ``--app`` restricts status,
    dependencies and export to a single app label.
    """

    help = 'Show detailed migration status information'

    def add_arguments(self, parser):
        """Register the command-line options."""
        parser.add_argument('--app', type=str, help='Show status for specific app')
        parser.add_argument('--conflicts', action='store_true', help='Show migration conflicts')
        parser.add_argument('--dependencies', action='store_true', help='Show migration dependencies')
        parser.add_argument('--export', type=str, help='Export status to file')

    def handle(self, *args, **options):
        """Dispatch to the requested view, then export if asked to."""
        loader = MigrationLoader(connection)
        executor = MigrationExecutor(connection)
        app_filter = options['app']

        if options['conflicts']:
            self.show_conflicts(loader)
        elif options['dependencies']:
            self.show_dependencies(loader, app_filter)
        else:
            self.show_status(loader, executor, app_filter)

        if options['export']:
            self.export_status(loader, executor, options['export'], app_filter)

    @staticmethod
    def _migration_keys(graph, app_filter=None):
        """Return ``(app_label, name)`` keys, sorted, optionally for one app.

        The graph's own ordering is not meaningful to a reader, so the keys
        are sorted to give stable, numerically ordered output.
        """
        return sorted(
            key for key in graph.nodes
            if not app_filter or key[0] == app_filter
        )

    def show_status(self, loader, executor, app_filter=None):
        """Print applied/unapplied status for each migration, grouped by app.

        Args:
            loader: ``MigrationLoader`` providing the graph and applied set.
            executor: Unused; kept for interface compatibility.
            app_filter: Optional app label to restrict the output to.
        """
        # Group migrations by app
        apps_migrations = {}
        for app_label, migration_name in self._migration_keys(loader.graph, app_filter):
            is_applied = (app_label, migration_name) in loader.applied_migrations
            apps_migrations.setdefault(app_label, []).append((migration_name, is_applied))

        # Display status
        for app_label, migrations in apps_migrations.items():
            applied_count = sum(1 for _, applied in migrations if applied)
            self.stdout.write(f"\n{app_label}:")
            self.stdout.write(f"  Status: {applied_count}/{len(migrations)} applied")
            for migration_name, applied in migrations:
                status = "[X]" if applied else "[ ]"
                self.stdout.write(f"    {status} {migration_name}")

    def show_conflicts(self, loader):
        """Print apps that have more than one leaf migration."""
        conflicts = loader.detect_conflicts()
        if not conflicts:
            self.stdout.write("No migration conflicts detected.")
            return

        self.stdout.write("Migration conflicts detected:")
        for app_label, conflict_migrations in conflicts.items():
            self.stdout.write(f"\n{app_label}:")
            for migration_name in conflict_migrations:
                self.stdout.write(f"  - {migration_name}")
            self.stdout.write("  Resolution: Run 'python manage.py makemigrations --merge'")

    def show_dependencies(self, loader, app_filter=None):
        """Print the declared dependencies of each migration."""
        graph = loader.graph
        for app_label, migration_name in self._migration_keys(graph, app_filter):
            migration = graph.nodes[(app_label, migration_name)]
            if not migration.dependencies:
                continue
            self.stdout.write(f"\n{app_label}.{migration_name}:")
            for dep_app, dep_migration in migration.dependencies:
                self.stdout.write(f"  depends on: {dep_app}.{dep_migration}")

    def export_status(self, loader, executor, filename, app_filter=None):
        """Write the migration status to ``filename`` as JSON.

        Args:
            loader: ``MigrationLoader`` providing the graph and applied set.
            executor: Unused; kept for interface compatibility.
            filename: Path of the JSON file to create or overwrite.
            app_filter: Optional app label to restrict the export to.
        """
        import json
        from datetime import datetime

        status_data = {
            'timestamp': datetime.now().isoformat(),
            'database': connection.settings_dict['NAME'],
            'apps': {}
        }

        graph = loader.graph
        for app_label, migration_name in self._migration_keys(graph, app_filter):
            app_entry = status_data['apps'].setdefault(app_label, {
                'migrations': [],
                'applied_count': 0,
                'total_count': 0
            })
            is_applied = (app_label, migration_name) in loader.applied_migrations
            migration = graph.nodes[(app_label, migration_name)]

            app_entry['migrations'].append({
                'name': migration_name,
                'applied': is_applied,
                # Dependencies are (app, name) tuples; emit plain lists.
                'dependencies': [list(dep) for dep in migration.dependencies],
            })
            app_entry['total_count'] += 1
            if is_applied:
                app_entry['applied_count'] += 1

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(status_data, f, indent=2)

        self.stdout.write(f"Migration status exported to {filename}")
# management/commands/migration_graph.py
class Command(BaseCommand):
    """Visualize migration dependency graph.

    Emits the graph either as Graphviz DOT (default) or as JSON, to a file
    (``--output``) or to standard output. ``--app`` restricts nodes and
    edges to a single app label.
    """

    help = 'Generate migration dependency graph'

    def add_arguments(self, parser):
        """Register the command-line options."""
        parser.add_argument('--app', type=str, help='Generate graph for specific app')
        parser.add_argument('--format', choices=['dot', 'json'], default='dot')
        parser.add_argument('--output', type=str, help='Output file')

    def handle(self, *args, **options):
        """Load the migration graph and render it in the requested format."""
        # Imported here so this command file does not depend on imports
        # made by other modules.
        from django.db import connection
        from django.db.migrations.loader import MigrationLoader

        graph = MigrationLoader(connection).graph

        if options['format'] == 'dot':
            self.generate_dot_graph(graph, options['app'], options['output'])
        elif options['format'] == 'json':
            self.generate_json_graph(graph, options['app'], options['output'])

    @staticmethod
    def _migration_keys(graph, app_filter=None):
        """Return ``(app_label, name)`` keys, sorted, optionally for one app."""
        return sorted(
            key for key in graph.nodes
            if not app_filter or key[0] == app_filter
        )

    @staticmethod
    def _dependency_edges(graph, keys, app_filter=None):
        """Yield ``(dependency_key, dependent_key)`` pairs for the given keys.

        With an app filter, dependencies on other apps are left out.
        """
        for key in keys:
            for dep_app, dep_migration in graph.nodes[key].dependencies:
                if app_filter and dep_app != app_filter:
                    continue
                yield (dep_app, dep_migration), key

    def _emit(self, text, output_file, written_message):
        """Write ``text`` to ``output_file`` if given, otherwise to stdout."""
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(text)
            self.stdout.write(written_message)
        else:
            self.stdout.write(text)

    def generate_dot_graph(self, graph, app_filter=None, output_file=None):
        """Render the graph in Graphviz DOT format.

        Args:
            graph: Migration graph whose ``nodes`` maps keys to migrations.
            app_filter: Optional app label to restrict the graph to.
            output_file: Optional path; standard output is used when omitted.
        """
        keys = self._migration_keys(graph, app_filter)

        dot_content = ["digraph migrations {"]
        dot_content.append("  rankdir=TB;")
        dot_content.append("  node [shape=box];")

        # Add nodes
        for app_label, migration_name in keys:
            node_id = f"{app_label}_{migration_name}"
            label = f"{app_label}\\n{migration_name}"
            dot_content.append(f'  "{node_id}" [label="{label}"];')

        # Add edges (dependency -> dependent)
        for (dep_app, dep_migration), (app_label, migration_name) in (
            self._dependency_edges(graph, keys, app_filter)
        ):
            dep_node_id = f"{dep_app}_{dep_migration}"
            node_id = f"{app_label}_{migration_name}"
            dot_content.append(f'  "{dep_node_id}" -> "{node_id}";')

        dot_content.append("}")

        self._emit(
            "\n".join(dot_content),
            output_file,
            f"DOT graph written to {output_file}",
        )

    def generate_json_graph(self, graph, app_filter=None, output_file=None):
        """Render the graph as JSON with ``nodes`` and ``edges`` lists.

        Args:
            graph: Migration graph whose ``nodes`` maps keys to migrations.
            app_filter: Optional app label to restrict the graph to.
            output_file: Optional path; standard output is used when omitted.
        """
        import json

        keys = self._migration_keys(graph, app_filter)

        graph_data = {
            'nodes': [
                {
                    'id': f"{app_label}.{migration_name}",
                    'app': app_label,
                    'migration': migration_name,
                }
                for app_label, migration_name in keys
            ],
            'edges': [
                {
                    'from': f"{dep_app}.{dep_migration}",
                    'to': f"{app_label}.{migration_name}",
                }
                for (dep_app, dep_migration), (app_label, migration_name)
                in self._dependency_edges(graph, keys, app_filter)
            ],
        }

        self._emit(
            json.dumps(graph_data, indent=2),
            output_file,
            f"JSON graph written to {output_file}",
        )
# management/commands/cleanup_migrations.py
class Command(BaseCommand):
    """Clean up and optimize migrations.

    Read-only helper: each option only prints suggestions, nothing on disk
    or in the database is modified.
    """

    help = 'Clean up migration files and optimize migration history'

    def add_arguments(self, parser):
        """Register the command-line options."""
        parser.add_argument('--squash', action='store_true', help='Suggest migrations to squash')
        parser.add_argument('--unused', action='store_true', help='Find unused migration files')
        parser.add_argument('--optimize', action='store_true', help='Optimize migration operations')

    def handle(self, *args, **options):
        """Run every analysis that was requested on the command line."""
        if options['squash']:
            self.suggest_squashing()

        if options['unused']:
            self.find_unused_migrations()

        if options['optimize']:
            self.optimize_migrations()

    @staticmethod
    def _load_migrations():
        """Return a ``MigrationLoader`` for the default database connection."""
        # Imported here so this command file does not depend on imports
        # made by other modules.
        from django.db import connection
        from django.db.migrations.loader import MigrationLoader

        return MigrationLoader(connection)

    def suggest_squashing(self):
        """Print ``squashmigrations`` suggestions for apps with > 10 migrations."""
        loader = self._load_migrations()

        # ``migrated_apps`` is a set; sort it so the output order is stable.
        for app_label in sorted(loader.migrated_apps):
            app_migrations = sorted(
                name for label, name in loader.graph.nodes if label == app_label
            )
            if len(app_migrations) <= 10:
                continue

            self.stdout.write(f"\n{app_label} has {len(app_migrations)} migrations")
            self.stdout.write("Consider squashing migrations:")

            # Suggest squashing in groups of 5
            for start in range(0, len(app_migrations), 5):
                group = app_migrations[start:start + 5]
                if len(group) > 1:
                    self.stdout.write(
                        f"  python manage.py squashmigrations {app_label} "
                        f"{group[0]} {group[-1]}"
                    )

    def find_unused_migrations(self):
        """Print migration files that are not in the dependency graph."""
        import os
        from django.apps import apps

        loader = self._load_migrations()

        for app_config in apps.get_app_configs():
            migrations_dir = os.path.join(app_config.path, 'migrations')
            if not os.path.exists(migrations_dir):
                continue

            # Module names of the migration files on disk (".py" stripped).
            files_on_disk = {
                entry[:-3] for entry in os.listdir(migrations_dir)
                if entry.endswith('.py') and not entry.startswith('__')
            }
            in_graph = {
                name for label, name in loader.graph.nodes
                if label == app_config.label
            }

            unused_files = sorted(files_on_disk - in_graph)
            if unused_files:
                self.stdout.write(f"\n{app_config.label} unused migrations:")
                for unused_file in unused_files:
                    file_path = os.path.join(migrations_dir, f"{unused_file}.py")
                    self.stdout.write(f"  {file_path}")

    def optimize_migrations(self):
        """Print optimization hints for every migration in the graph."""
        loader = self._load_migrations()

        for migration_key in sorted(loader.graph.nodes):
            app_label, migration_name = migration_key
            migration = loader.get_migration(*migration_key)

            optimizations = self.analyze_migration_optimizations(migration)
            if optimizations:
                self.stdout.write(f"\n{app_label}.{migration_name}:")
                for optimization in optimizations:
                    self.stdout.write(f"  - {optimization}")

    def analyze_migration_optimizations(self, migration):
        """Return a list of optimization hints for one migration.

        Two heuristics are applied: an ``AddField`` immediately followed by
        an ``AlterField`` on the same model field, and more than three
        ``AddIndex`` / ``RemoveIndex`` operations in one migration.
        """
        optimizations = []
        operations = migration.operations

        # Look for AddField followed by AlterField on same field
        for current_op, next_op in zip(operations, operations[1:]):
            if (current_op.__class__.__name__ == 'AddField' and
                    next_op.__class__.__name__ == 'AlterField' and
                    current_op.model_name == next_op.model_name and
                    current_op.name == next_op.name):
                optimizations.append(
                    f"Combine AddField and AlterField for {current_op.name}"
                )

        # Check for multiple index operations
        index_operation_count = sum(
            1 for op in operations
            if op.__class__.__name__ in ('AddIndex', 'RemoveIndex')
        )
        if index_operation_count > 3:
            optimizations.append(
                f"Consider combining {index_operation_count} index operations"
            )

        return optimizations
# Batch migration operations
class BatchMigrationRunner:
    """Run migrations in batches with monitoring.

    Args:
        batch_size: Number of migration targets per batch (at least 1).
        delay_between_batches: Seconds to wait between batches (>= 0).

    Raises:
        ValueError: If ``batch_size`` or ``delay_between_batches`` is out
            of range.
    """

    def __init__(self, batch_size=5, delay_between_batches=1):
        # Fail early with a clear message instead of an opaque range()
        # error (size 0) or a silently empty run (negative size) later on.
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        if delay_between_batches < 0:
            raise ValueError("delay_between_batches must not be negative")
        self.batch_size = batch_size
        self.delay_between_batches = delay_between_batches

    def _split_into_batches(self, migration_targets):
        """Return the targets as a list of lists of at most ``batch_size``."""
        # Materialise first so generators and other non-sliceable iterables work.
        targets = list(migration_targets)
        return [
            targets[start:start + self.batch_size]
            for start in range(0, len(targets), self.batch_size)
        ]

    def run_migrations_in_batches(self, migration_targets):
        """Run ``migrate <app_label> <migration_name>`` for each target.

        Args:
            migration_targets: Iterable of ``(app_label, migration_name)``
                pairs, processed in the given order.

        Raises:
            Exception: Whatever ``migrate`` raised for the failing target;
                the failure is printed and the run stops there.
        """
        import time
        from django.core.management import call_command

        batches = self._split_into_batches(migration_targets)
        total_batches = len(batches)

        for batch_number, batch in enumerate(batches, 1):
            print(f"Running batch {batch_number}/{total_batches}: {len(batch)} migrations")

            for app_label, migration_name in batch:
                try:
                    call_command('migrate', app_label, migration_name, verbosity=1)
                except Exception as e:
                    print(f"  ✗ {app_label}.{migration_name}: {e}")
                    raise
                print(f"  ✓ {app_label}.{migration_name}")

            # Delay between batches (not after the last one)
            if batch_number < total_batches:
                print(f"Waiting {self.delay_between_batches}s before next batch...")
                time.sleep(self.delay_between_batches)

        print("All migrations completed successfully!")
Django's migration management commands provide powerful tools for controlling database schema evolution. Understanding these commands and their options, along with custom utilities, enables effective migration management in complex development and production environments.
How Migrations Work
Understanding the internal mechanics of Django migrations helps you write better migrations, debug issues, and optimize database schema changes. This deep dive explores how Django tracks, generates, and applies migrations.
Dependencies and Workflow
Understanding migration dependencies and establishing proper workflows is crucial for managing complex Django projects with multiple apps and developers. This section covers dependency management, conflict resolution, and best practices for team collaboration.