Django migrations run within database transactions by default, providing atomicity and consistency during schema changes. Understanding transaction behavior in migrations is crucial for maintaining data integrity and handling complex migration scenarios safely.
# By default, migrations run in a single transaction
class Migration(migrations.Migration):
    """Standard atomic migration.

    Django wraps each migration in one database transaction by default,
    so the two AddField operations below either both apply or both roll
    back together.
    """

    dependencies = [
        ('blog', '0001_initial'),
    ]

    # atomic = True is already the default; stated explicitly for emphasis.
    atomic = True

    operations = [
        migrations.AddField(
            model_name='post',
            name='view_count',
            field=models.PositiveIntegerField(default=0),
        ),
        migrations.AddField(
            model_name='post',
            name='like_count',
            field=models.PositiveIntegerField(default=0),
        ),
        # All operations succeed or all fail together.
    ]
# Non-atomic migration for special cases
class Migration(migrations.Migration):
    """Non-atomic migration for operations that can't run in transactions."""

    dependencies = [
        ('blog', '0002_add_counters'),
    ]

    # Opt out of the implicit transaction wrapper: PostgreSQL refuses to
    # run CREATE INDEX CONCURRENTLY inside a transaction block.
    atomic = False

    operations = [
        migrations.RunSQL(
            sql="CREATE INDEX CONCURRENTLY idx_post_title_gin ON blog_post USING gin(to_tsvector('english', title));",
            reverse_sql="DROP INDEX IF EXISTS idx_post_title_gin;",
        ),
    ]
# Database-specific atomic behavior
class DatabaseSpecificMigration(migrations.Migration):
    """Handle atomicity differently per database backend."""

    dependencies = [
        ('blog', '0003_add_search_index'),
    ]

    def __init__(self, name, app_label):
        super().__init__(name, app_label)
        from django.db import connection
        # MySQL has limitations running DDL inside transactions, so skip
        # the transaction wrapper there; every other backend (including
        # PostgreSQL) keeps the safe atomic default.
        # Equivalent to: postgresql -> True, mysql -> False, other -> True.
        self.atomic = connection.vendor != 'mysql'

    operations = [
        migrations.AddIndex(
            model_name='post',
            index=models.Index(fields=['created_at'], name='idx_post_created'),
        ),
    ]
# Manual transaction control within migrations
from django.db import transaction


class TransactionControlledMigration(migrations.Migration):
    """Migration with manual transaction control.

    ``atomic = False`` disables Django's automatic transaction wrapper so
    the RunPython callable below can open its own smaller per-batch
    transactions instead of one giant one.
    """

    dependencies = [
        ('blog', '0004_database_specific'),
    ]

    # No implicit outer transaction around this migration.
    atomic = False

    operations = [
        migrations.RunPython(
            code=migrate_data_with_transaction_control,
            reverse_code=migrations.RunPython.noop,
        ),
    ]
def migrate_data_with_transaction_control(apps, schema_editor):
    """Backfill missing slugs in batches, one transaction per batch.

    Runs inside a non-atomic migration, so each batch commits on its
    own; a mid-run failure loses at most one batch of work.

    Fix: the original sliced ``Post.objects.all()`` without an ORDER BY.
    Offset-based slicing over an unordered queryset is non-deterministic
    (the database may return rows in any order per query), so batches
    could skip or repeat posts. Ordering by primary key makes the
    pagination stable.
    """
    Post = apps.get_model('blog', 'Post')
    batch_size = 1000
    # Deterministic ordering is required for offset-based slicing.
    queryset = Post.objects.order_by('pk')
    total_posts = queryset.count()
    processed = 0
    while processed < total_posts:
        with transaction.atomic():
            # Process one batch inside its own transaction.
            for post in queryset[processed:processed + batch_size]:
                if not post.slug:
                    # NOTE(review): slugify is assumed to be imported at
                    # module level (django.utils.text.slugify) — confirm.
                    post.slug = slugify(post.title)
                    post.save(update_fields=['slug'])
        processed += batch_size
        print(f"Processed {processed}/{total_posts} posts")
# Savepoint usage in migrations
def migrate_with_savepoints(apps, schema_editor):
    """Attach posts to categories, guarded by a savepoint.

    The savepoint lets the risky category-linking step be undone without
    abandoning the enclosing transaction.
    """
    Post = apps.get_model('blog', 'Post')
    Category = apps.get_model('blog', 'Category')
    with transaction.atomic():
        # Mark a rollback point before the risky work begins.
        savepoint = transaction.savepoint()
        try:
            for post in Post.objects.all():
                # Complex logic that might fail part-way through.
                if post.category_name:
                    category, created = Category.objects.get_or_create(
                        name=post.category_name
                    )
                    post.category = category
                    post.save()
            # Everything succeeded: release the savepoint.
            transaction.savepoint_commit(savepoint)
        except Exception as e:
            # Undo only the work since the savepoint, then re-raise.
            transaction.savepoint_rollback(savepoint)
            print(f"Error during migration: {e}")
            raise
# Conditional transaction handling
class ConditionalTransactionMigration(migrations.Migration):
    """Migration whose atomicity depends on how much data it must touch.

    Large tables are migrated non-atomically so the data migration can
    commit in batches instead of holding one huge transaction.

    Fix: the original ran ``SELECT COUNT(*)`` unguarded in ``__init__``,
    which crashes on a fresh database where ``blog_post`` does not exist
    yet (migrations are instantiated before earlier migrations run). A
    DatabaseError now falls back to the safe atomic default.
    """

    dependencies = [
        ('blog', '0005_transaction_controlled'),
    ]

    def __init__(self, name, app_label):
        super().__init__(name, app_label)
        from django.db import connection, DatabaseError
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT COUNT(*) FROM blog_post")
                post_count = cursor.fetchone()[0]
        except DatabaseError:
            # Table missing (fresh DB) or unreachable: keep atomic=True.
            post_count = 0
        # Use non-atomic only for large datasets.
        self.atomic = post_count < 100000

    operations = [
        migrations.RunPython(
            code=conditional_data_migration,
            reverse_code=migrations.RunPython.noop,
        ),
    ]
def conditional_data_migration(apps, schema_editor):
    """Copy ``created_at`` into ``updated_at``, adapting to table size.

    Small tables are handled row-by-row in one transaction; large tables
    use batched set-based UPDATEs, one transaction per batch.

    Fix: the original sliced an unordered queryset by offset — the
    database may return rows in a different order for each slice, so
    batches could skip or repeat posts. Slices are now taken from a
    pk-ordered queryset.
    """
    Post = apps.get_model('blog', 'Post')
    total_posts = Post.objects.count()
    if total_posts < 10000:
        # Small dataset: process all at once in a single transaction.
        with transaction.atomic():
            for post in Post.objects.all():
                post.updated_at = post.created_at
                post.save()
    else:
        # Large dataset: process in batches, one transaction each.
        batch_size = 5000
        processed = 0
        # Stable ordering makes offset slicing deterministic.
        ordered = Post.objects.order_by('pk')
        while processed < total_posts:
            with transaction.atomic():
                batch_ids = list(
                    ordered.values_list('pk', flat=True)[
                        processed:processed + batch_size
                    ]
                )
                # NOTE(review): F is assumed to be imported at module
                # level (django.db.models.F) — confirm.
                Post.objects.filter(pk__in=batch_ids).update(
                    updated_at=F('created_at')
                )
            processed += batch_size
            print(f"Updated {processed}/{total_posts} posts")
class RobustMigrationPatterns:
    """Patterns for robust migration error handling."""

    @staticmethod
    def migration_with_retry_logic(apps, schema_editor):
        """Retry the whole operation on transient database errors.

        Uses exponential backoff between attempts and re-raises after
        the final failure.
        """
        import time
        from django.db import transaction, OperationalError
        Post = apps.get_model('blog', 'Post')
        max_retries = 3
        retry_delay = 1  # seconds
        for attempt in range(max_retries):
            try:
                with transaction.atomic():
                    # NOTE(review): SearchVector is assumed to be imported
                    # at module level (django.contrib.postgres.search) —
                    # confirm.
                    for post in Post.objects.all():
                        post.search_vector = SearchVector('title', 'content')
                        post.save()
                break  # success — stop retrying
            except OperationalError as e:
                if attempt < max_retries - 1:
                    print(f"Migration attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    print(f"Migration failed after {max_retries} attempts")
                    raise

    @staticmethod
    def migration_with_progress_tracking(apps, schema_editor):
        """Backfill slugs with a resumable progress marker in the cache.

        Fixes three defects in the naive version:
        * offset slicing over a queryset filtered on ``slug__isnull=True``
          skipped rows — updated rows drop out of the filter and shift
          every later offset; we iterate by an id cursor instead;
        * ``.last()`` on a sliced queryset raises ("cannot reverse a
          query once a slice has been taken");
        * the progress marker was deleted in a ``finally`` block, which
          also runs on failure — destroying the resume point.  It is now
          cleared only after a fully successful run.
        """
        from django.core.cache import cache
        from django.db import transaction
        Post = apps.get_model('blog', 'Post')
        migration_key = 'migration_progress_post_slugs'
        last_processed_id = cache.get(migration_key, 0)
        batch_size = 100
        while True:
            # Re-query from the id cursor each round: ids only grow, so
            # rows updated earlier can never shift this window.
            batch = list(
                Post.objects.filter(
                    id__gt=last_processed_id,
                    slug__isnull=True,
                ).order_by('id')[:batch_size]
            )
            if not batch:
                break
            with transaction.atomic():
                for post in batch:
                    # NOTE(review): slugify assumed imported at module
                    # level — confirm.
                    post.slug = slugify(post.title)
                    post.save()
            last_processed_id = batch[-1].id
            # Persist progress so an interrupted run can resume here.
            cache.set(migration_key, last_processed_id, timeout=3600)
            print(f"Processed batch ending with ID {last_processed_id}")
        # Success only: remove the marker so a future run starts fresh.
        cache.delete(migration_key)

    @staticmethod
    def migration_with_validation(apps, schema_editor):
        """Publish drafts, validating results before committing.

        A savepoint protects the update: validation failure rolls back
        just this work and re-raises.
        """
        Post = apps.get_model('blog', 'Post')
        # Snapshot for post-hoc validation.
        original_count = Post.objects.count()
        with transaction.atomic():
            savepoint = transaction.savepoint()
            try:
                # NOTE(review): timezone assumed imported at module level
                # (django.utils.timezone) — confirm.
                Post.objects.filter(status='draft').update(
                    status='published',
                    published_at=timezone.now()
                )
                # Validate results before committing the savepoint.
                published_count = Post.objects.filter(status='published').count()
                if published_count == 0:
                    raise ValueError("No posts were published - this seems wrong")
                if Post.objects.count() != original_count:
                    raise ValueError("Post count changed unexpectedly")
                transaction.savepoint_commit(savepoint)
                print(f"Successfully published {published_count} posts")
            except Exception as e:
                transaction.savepoint_rollback(savepoint)
                print(f"Migration validation failed: {e}")
                raise
# Error recovery utilities
class MigrationErrorRecovery:
    """Utilities for recovering from migration errors."""

    @staticmethod
    def create_recovery_migration(failed_migration_name, error_details):
        """Return the *source code* of a recovery migration as a string.

        Fix: the original interpolated a list of ``migrations.RunSQL``
        objects straight into the template, emitting their ``repr()`` —
        which is not valid Python. Operations are now assembled as
        source-code snippets and spliced into the generated module text,
        so the returned string is importable as-is.
        """
        op_snippets = []
        lowered = error_details.lower()
        # Map known error patterns to recovery operations (as source).
        if 'duplicate key' in lowered:
            op_snippets.append(
                '        migrations.RunSQL(\n'
                '            sql="-- Remove duplicate entries before retrying",\n'
                '            reverse_sql="-- No reverse operation needed",\n'
                '        ),'
            )
        elif 'column does not exist' in lowered:
            op_snippets.append(
                '        migrations.RunSQL(\n'
                '            sql="-- Add missing column",\n'
                '            reverse_sql="-- Remove column if needed",\n'
                '        ),'
            )
        operations_body = '\n'.join(op_snippets)
        return f'''
from django.db import migrations, models


class Migration(migrations.Migration):
    """Recovery migration for {failed_migration_name}"""

    dependencies = [
        # Add appropriate dependencies
    ]

    operations = [
{operations_body}
    ]
'''

    @staticmethod
    def analyze_migration_failure(migration_name, error):
        """Analyze a migration failure and suggest recovery steps.

        Returns a dict with the failing migration, error metadata, a list
        of suggested actions, and a coarse ``recovery_strategy`` tag
        (``manual`` when no known pattern matches).
        """
        analysis = {
            'migration': migration_name,
            'error_type': type(error).__name__,
            'error_message': str(error),
            'suggested_actions': [],
            'recovery_strategy': 'manual',
        }
        error_msg = str(error).lower()
        # Common error patterns and their standard remedies.
        if 'duplicate key' in error_msg or 'unique constraint' in error_msg:
            analysis['suggested_actions'].extend([
                "Identify and remove duplicate records",
                "Add data cleaning step before migration",
                "Consider using get_or_create instead of create"
            ])
            analysis['recovery_strategy'] = 'data_cleanup'
        elif 'column does not exist' in error_msg:
            analysis['suggested_actions'].extend([
                "Check if previous migration was applied",
                "Verify migration dependencies",
                "Consider fake-applying missing migrations"
            ])
            analysis['recovery_strategy'] = 'dependency_fix'
        elif 'table does not exist' in error_msg:
            analysis['suggested_actions'].extend([
                "Run initial migrations first",
                "Check database connection and permissions",
                "Verify app is in INSTALLED_APPS"
            ])
            analysis['recovery_strategy'] = 'setup_issue'
        elif 'timeout' in error_msg or 'lock' in error_msg:
            analysis['suggested_actions'].extend([
                "Run migration during low-traffic period",
                "Consider breaking migration into smaller parts",
                "Check for long-running queries blocking migration"
            ])
            analysis['recovery_strategy'] = 'performance_issue'
        return analysis
# Handling multiple databases in migrations
class MultiDatabaseMigration(migrations.Migration):
    """Migration that coordinates work across multiple databases."""

    dependencies = [
        ('blog', '0006_conditional_transaction'),
    ]

    operations = [
        migrations.RunPython(
            code=migrate_across_databases,
            reverse_code=migrations.RunPython.noop,
        ),
    ]
def migrate_across_databases(apps, schema_editor):
    """Migrate data from the default database to the analytics database.

    Cross-database transactions are not supported by Django: the read
    from 'default' and the write to 'analytics' each run in their own
    transaction, so a failure in between can leave the databases
    inconsistent and require manual cleanup.
    """
    from django.db import connections, transaction
    # Resolve both connections up front (raises early if misconfigured).
    default_db = connections['default']
    analytics_db = connections['analytics']
    Post = apps.get_model('blog', 'Post')
    try:
        # Phase 1: snapshot the source rows in one transaction.
        with transaction.atomic(using='default'):
            posts_data = list(
                Post.objects.using('default').values(
                    'id', 'title', 'view_count', 'created_at'
                )
            )
        # Phase 2: write the snapshot in a separate transaction.
        with transaction.atomic(using='analytics'):
            # Assuming we have an AnalyticsPost model in analytics DB
            for post_data in posts_data:
                # Insert logic here
                pass
    except Exception as e:
        print(f"Cross-database migration failed: {e}")
        # Manual cleanup may be required
        raise
# Distributed transaction simulation
def simulate_distributed_transaction(apps, schema_editor):
    """Coordinate writes to several databases with all-or-nothing intent.

    Fix: the original created savepoints *inside* ``transaction.atomic(
    using=...)`` blocks and then tried to commit or roll them back after
    those blocks had exited — at that point each transaction had already
    ended, so every savepoint handle in the log was dead and the "two
    phase" logic could never work.  Instead, every database's atomic
    block is held open simultaneously with an ExitStack: leaving the
    stack normally commits all of them, and an exception raised while
    they are open rolls back every still-active transaction.

    NOTE(review): this is still not a true distributed transaction — a
    crash while the stack unwinds can commit some databases and not
    others. Real two-phase commit requires database support (e.g.
    PREPARE TRANSACTION on PostgreSQL).
    """
    from contextlib import ExitStack
    from django.db import transaction
    databases = ['default', 'analytics', 'cache']
    with ExitStack() as stack:
        # Phase 1: open (and keep open) one transaction per database,
        # performing each database's work while all remain active.
        for db_alias in databases:
            stack.enter_context(transaction.atomic(using=db_alias))
            # ... database-specific operations for db_alias ...
        # Phase 2: exiting the stack without an exception commits each
        # transaction (in reverse order of entry); any exception above
        # rolls back all open transactions instead.
# Long-running migration with checkpoints
class CheckpointMigration(migrations.Migration):
    """Migration using a checkpoint system for long-running operations."""

    dependencies = [
        ('blog', '0007_multi_database'),
    ]

    operations = [
        migrations.RunPython(
            code=long_running_migration_with_checkpoints,
            reverse_code=migrations.RunPython.noop,
        ),
    ]
def long_running_migration_with_checkpoints(apps, schema_editor):
    """Rebuild ``search_content`` for every post, checkpointing progress
    in the cache so an interrupted run can resume where it left off.

    Fixes:
    * completion was detected by comparing a *cumulative* processed count
      (which includes previous runs) against the count of *remaining*
      rows, so a resumed run could wrongly decide it was finished — and
      the check lived in a ``finally`` block that also ran on failure,
      deleting the checkpoint needed to resume.  The checkpoint is now
      cleared only after a fully successful run;
    * offset slicing of a large queryset is replaced by an id cursor, so
      each batch is a fresh indexed range query.
    """
    from django.core.cache import cache
    from django.db import transaction
    Post = apps.get_model('blog', 'Post')
    checkpoint_key = 'migration_checkpoint_post_processing'
    batch_size = 1000
    # Load the last checkpoint (or the zero state on a first run).
    checkpoint = cache.get(checkpoint_key, {'last_id': 0, 'processed': 0})
    last_id = checkpoint['last_id']
    processed_count = checkpoint['processed']
    remaining = Post.objects.filter(id__gt=last_id).count()
    print(f"Resuming migration from ID {last_id}")
    print(f"Already processed: {processed_count} posts")
    print(f"Remaining: {remaining} posts")
    try:
        while True:
            # Cursor-based batch: ids only grow, so this window never
            # shifts as earlier rows are updated.
            batch = list(
                Post.objects.filter(id__gt=last_id).order_by('id')[:batch_size]
            )
            if not batch:
                break
            with transaction.atomic():
                for post in batch:
                    # Perform the expensive per-row operation.
                    post.search_content = f"{post.title} {post.content}"
                    post.save()
            last_id = batch[-1].id
            processed_count += len(batch)
            cache.set(checkpoint_key, {
                'last_id': last_id,
                'processed': processed_count,
            }, timeout=3600)
            print(f"Checkpoint: processed {processed_count} posts, last ID: {last_id}")
    except Exception as e:
        print(f"Migration failed at checkpoint {checkpoint}")
        print(f"Error: {e}")
        print("Migration can be resumed by running again")
        raise
    # Only a fully successful run clears the checkpoint.
    cache.delete(checkpoint_key)
    print("Migration completed successfully")
class TransactionOptimization:
    """Strategies for optimizing migration transactions."""

    @staticmethod
    def bulk_operations_migration(apps, schema_editor):
        """Use bulk operations to minimize transaction overhead.

        A single set-based UPDATE (or a bulk_update) replaces thousands
        of individual save() round trips inside one transaction.
        """
        Post = apps.get_model('blog', 'Post')
        # One UPDATE statement instead of per-row saves in a loop.
        with transaction.atomic():
            Post.objects.all().update(view_count=0)
        # For per-row computed values, collect the changes and flush
        # them with bulk_update (Django 2.2+).
        pending = []
        for post in Post.objects.all():
            post.slug = slugify(post.title)
            pending.append(post)
        Post.objects.bulk_update(pending, ['slug'], batch_size=1000)

    @staticmethod
    def optimized_data_migration(apps, schema_editor):
        """Optimized data migration with minimal locking.

        One raw SQL statement is far more efficient than a Python-level
        loop for a simple transformation.
        """
        from django.db import connection
        with connection.cursor() as cursor:
            cursor.execute("""
            UPDATE blog_post
            SET slug = LOWER(REPLACE(REPLACE(title, ' ', '-'), '--', '-'))
            WHERE slug IS NULL OR slug = ''
            """)
            affected_rows = cursor.rowcount
        print(f"Updated {affected_rows} posts with slugs")

    @staticmethod
    def memory_efficient_migration(apps, schema_editor):
        """Memory-efficient migration for large datasets.

        Streams rows with iterator() so the whole table is never held in
        memory, flushing fixed-size batches via bulk_update.
        """
        Post = apps.get_model('blog', 'Post')
        batch_size = 1000
        posts_processed = 0
        buffer = []
        for post in Post.objects.all().iterator(chunk_size=batch_size):
            post.search_content = f"{post.title} {post.content}"
            buffer.append(post)
            if len(buffer) >= batch_size:
                # Flush one full batch inside its own transaction.
                with transaction.atomic():
                    Post.objects.bulk_update(
                        buffer,
                        ['search_content'],
                        batch_size=batch_size,
                    )
                posts_processed += len(buffer)
                buffer = []
                print(f"Processed {posts_processed} posts")
        if buffer:
            # Flush the final partial batch.
            with transaction.atomic():
                Post.objects.bulk_update(
                    buffer,
                    ['search_content'],
                    batch_size=len(buffer),
                )
            posts_processed += len(buffer)
            print(f"Final batch: processed {posts_processed} posts total")
# Database-specific optimizations
class DatabaseSpecificOptimizations:
    """Database-specific transaction optimizations."""

    @staticmethod
    def postgresql_optimizations(apps, schema_editor):
        """PostgreSQL-specific optimizations.

        Fixes: ``SET autocommit = off`` is not a PostgreSQL server
        parameter — autocommit is a client/driver concept, and the
        statement fails with "unrecognized configuration parameter".
        Issuing raw BEGIN/COMMIT through a Django cursor also fights
        Django's own transaction management. ``transaction.atomic()``
        provides the intended single-transaction batching safely.
        """
        from django.db import connection, transaction
        if connection.vendor != 'postgresql':
            return
        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.execute("""
                UPDATE blog_post
                SET updated_at = NOW()
                WHERE updated_at IS NULL
                """)

    @staticmethod
    def mysql_optimizations(apps, schema_editor):
        """MySQL-specific optimizations.

        Temporarily disables foreign-key checks (a session-scoped
        setting) for faster bulk updates, always restoring them in the
        finally block.
        """
        from django.db import connection
        if connection.vendor != 'mysql':
            return
        with connection.cursor() as cursor:
            cursor.execute("SET foreign_key_checks = 0")
            try:
                cursor.execute("""
                UPDATE blog_post
                SET view_count = COALESCE(view_count, 0)
                """)
            finally:
                # Always re-enable foreign key checks for this session.
                cursor.execute("SET foreign_key_checks = 1")

    @staticmethod
    def sqlite_optimizations(apps, schema_editor):
        """SQLite-specific optimizations.

        Fix: the original blindly restored ``journal_mode = DELETE``,
        clobbering databases configured for WAL (or any other mode).
        The original journal mode is now captured first and restored.
        """
        from django.db import connection
        if connection.vendor != 'sqlite':
            return
        with connection.cursor() as cursor:
            # Remember the current journal mode so we can restore it.
            cursor.execute("PRAGMA journal_mode")
            original_journal_mode = cursor.fetchone()[0]
            # Trade durability for speed during the bulk operation.
            cursor.execute("PRAGMA synchronous = OFF")
            cursor.execute("PRAGMA journal_mode = MEMORY")
            try:
                cursor.execute("""
                UPDATE blog_post
                SET slug = lower(replace(title, ' ', '-'))
                WHERE slug IS NULL
                """)
            finally:
                # Restore the settings that were in effect before.
                cursor.execute("PRAGMA synchronous = NORMAL")
                cursor.execute(f"PRAGMA journal_mode = {original_journal_mode}")
Understanding transaction handling in Django migrations ensures data integrity while optimizing performance. Proper use of atomic operations, error handling, and database-specific optimizations enables safe and efficient schema evolution in production environments.
Dependencies and Workflow
Understanding migration dependencies and establishing proper workflows is crucial for managing complex Django projects with multiple apps and developers. This section covers dependency management, conflict resolution, and best practices for team collaboration.
Adding Migrations to Apps
Adding migrations to Django apps requires understanding the migration system's integration with app structure, handling initial migrations, and managing migrations across different app states. This section covers best practices for incorporating migrations into new and existing applications.