Data migrations allow you to transform, populate, or clean up data during schema changes. Unlike schema migrations that modify database structure, data migrations work with the actual data in your database. Understanding how to write effective data migrations is crucial for maintaining data integrity during application evolution.
# Basic data migration using RunPython
from django.db import migrations, models
from django.db.models import Count
from django.utils import timezone
from django.utils.text import slugify
def populate_slugs(apps, schema_editor):
    """Forward data migration: fill in ``slug`` for posts that lack one.

    Args:
        apps: historical app registry supplied by RunPython.
        schema_editor: database schema editor (unused here).
    """
    # Get historical model - represents model state at this migration,
    # not the current models.py, so the migration stays replayable.
    Post = apps.get_model('blog', 'Post')
    for post in Post.objects.filter(slug__isnull=True):
        post.slug = slugify(post.title)
        # Write only the slug column: historical models carry no custom
        # save() logic, and this avoids rewriting unrelated fields.
        post.save(update_fields=['slug'])
def remove_slugs(apps, schema_editor):
    """Reverse data migration: blank out every slug so the forward pass can be rolled back."""
    post_model = apps.get_model('blog', 'Post')
    # One UPDATE statement clears all slugs at the database level.
    post_model.objects.update(slug=None)
class Migration(migrations.Migration):
    """Data migration example.

    The three operations must stay in this order: the nullable column has
    to exist before the data pass can populate it, and every row needs a
    slug before the unique/non-null constraint can be applied.
    """
    dependencies = [
        ('blog', '0001_initial'),
    ]
    operations = [
        # Add slug field first (nullable/blank so existing rows stay valid)
        migrations.AddField(
            model_name='post',
            name='slug',
            field=models.SlugField(max_length=200, null=True, blank=True),
        ),
        # Then populate it with data (reverse_code makes it rollback-safe)
        migrations.RunPython(
            code=populate_slugs,
            reverse_code=remove_slugs,
        ),
        # Finally make it required (optional)
        migrations.AlterField(
            model_name='post',
            name='slug',
            field=models.SlugField(max_length=200, unique=True),
        ),
    ]
# Data migration with error handling
def robust_data_migration(apps, schema_editor):
    """Data migration with comprehensive error handling"""
    Post = apps.get_model('blog', 'Post')
    # Progress counters: every post that completes the loop body without
    # raising counts as processed, whether or not it was modified.
    total_posts = Post.objects.count()
    processed, errors = 0, 0
    print(f"Starting data migration for {total_posts} posts")
    for post in Post.objects.all():
        try:
            if not post.slug and post.title:
                base_slug = slugify(post.title)
                post.slug = base_slug
                # Append -1, -2, ... until the slug is unique.
                suffix = 1
                while Post.objects.filter(slug=post.slug).exists():
                    post.slug = f"{base_slug}-{suffix}"
                    suffix += 1
                post.save()
            processed += 1
        except Exception as exc:
            # Record the failure and keep going with the remaining posts.
            errors += 1
            print(f"Error processing post {post.id}: {exc}")
    print(f"Migration completed: {processed} processed, {errors} errors")
    if errors > 0:
        print("Some posts had errors - review and fix manually if needed")
# Batch processing for large datasets
def batch_data_migration(apps, schema_editor):
    """Process data in batches for better performance"""
    Post = apps.get_model('blog', 'Post')
    batch_size = 1000
    total_posts = Post.objects.count()
    processed = 0
    print(f"Processing {total_posts} posts in batches of {batch_size}")
    # iterator() streams rows in chunks instead of caching the whole
    # queryset in memory.
    pending = []
    for post in Post.objects.all().iterator(chunk_size=batch_size):
        if post.slug or not post.title:
            continue
        post.slug = slugify(post.title)
        pending.append(post)
        if len(pending) < batch_size:
            continue
        # Flush a full batch with a single bulk UPDATE.
        Post.objects.bulk_update(pending, ['slug'])
        processed += len(pending)
        pending = []
        print(f"Processed {processed}/{total_posts} posts")
    # Flush whatever is left over after the loop.
    if pending:
        Post.objects.bulk_update(pending, ['slug'])
        processed += len(pending)
    print(f"Batch migration completed: {processed} posts processed")
class AdvancedDataMigrationPatterns:
    """Advanced patterns for data migrations.

    Each static method builds and returns a callable with the RunPython
    signature ``(apps, schema_editor)`` so it can be dropped straight into
    a migration's ``operations`` list.
    """

    @staticmethod
    def conditional_data_migration():
        """Data migration that runs conditionally"""
        def conditional_migration(apps, schema_editor):
            """Run migration only if conditions are met"""
            Post = apps.get_model('blog', 'Post')
            # Skip entirely when there is nothing to do.
            posts_without_slugs = Post.objects.filter(slug__isnull=True).count()
            if posts_without_slugs == 0:
                print("All posts already have slugs, skipping migration")
                return
            # Check system resources before proceeding.
            # NOTE: psutil is a third-party dependency - it must be
            # installed wherever this migration runs.
            import psutil
            memory_usage = psutil.virtual_memory().percent
            if memory_usage > 80:
                print(f"High memory usage ({memory_usage}%), postponing migration")
                return
            print(f"Proceeding with migration for {posts_without_slugs} posts")
            # Perform migration
            for post in Post.objects.filter(slug__isnull=True):
                post.slug = slugify(post.title)
                post.save()
        return conditional_migration

    @staticmethod
    def progressive_data_migration():
        """Data migration that can be run multiple times"""
        def progressive_migration(apps, schema_editor):
            """Migration that processes data progressively"""
            Post = apps.get_model('blog', 'Post')
            # Use cache to track progress across runs.
            from django.core.cache import cache
            progress_key = 'data_migration_progress'
            last_processed_id = cache.get(progress_key, 0)
            batch_size = 100
            # Materialize the batch. Operating on the sliced queryset
            # directly is broken: .last() raises TypeError ("Cannot
            # reverse a query once a slice has been taken"), and the
            # truthiness check would issue a second query.
            posts_to_process = list(
                Post.objects.filter(
                    id__gt=last_processed_id,
                    slug__isnull=True
                ).order_by('id')[:batch_size]
            )
            if not posts_to_process:
                print("Progressive migration completed")
                cache.delete(progress_key)
                return
            processed_count = 0
            for post in posts_to_process:
                post.slug = slugify(post.title)
                post.save()
                processed_count += 1
                # Persist progress after every save so an interrupted run
                # resumes where it stopped.
                cache.set(progress_key, post.id, timeout=3600)
            last_id = posts_to_process[-1].id
            print(f"Progressive migration: processed {processed_count} posts")
            print(f"Last processed ID: {last_id}")
            # Schedule next batch (in production, use task queue)
            remaining = Post.objects.filter(
                id__gt=last_id,
                slug__isnull=True
            ).count()
            if remaining > 0:
                print(f"Remaining posts to process: {remaining}")
        return progressive_migration

    @staticmethod
    def data_validation_migration():
        """Data migration with built-in validation"""
        def validating_migration(apps, schema_editor):
            """Migration that validates data before and after the transform."""
            Post = apps.get_model('blog', 'Post')
            # Pre-migration validation: capture baseline counts.
            print("Running pre-migration validation...")
            total_posts = Post.objects.count()
            posts_with_titles = Post.objects.filter(title__isnull=False).count()
            posts_with_slugs = Post.objects.filter(slug__isnull=False).count()
            print(f"Total posts: {total_posts}")
            print(f"Posts with titles: {posts_with_titles}")
            print(f"Posts with slugs: {posts_with_slugs}")
            if posts_with_titles == 0:
                raise ValueError("No posts have titles - cannot generate slugs")
            # Perform migration
            print("Performing data migration...")
            migrated_count = 0
            for post in Post.objects.filter(slug__isnull=True):
                if post.title:
                    post.slug = slugify(post.title)
                    post.save()
                    migrated_count += 1
            # Post-migration validation
            print("Running post-migration validation...")
            final_posts_with_slugs = Post.objects.filter(slug__isnull=False).count()
            # Number of slug values shared by more than one post.
            duplicate_slugs = Post.objects.values('slug').annotate(
                count=Count('slug')
            ).filter(count__gt=1).count()
            print(f"Posts with slugs after migration: {final_posts_with_slugs}")
            print(f"Duplicate slugs found: {duplicate_slugs}")
            if duplicate_slugs > 0:
                print("Warning: Duplicate slugs detected - manual cleanup may be needed")
            if final_posts_with_slugs < posts_with_titles:
                print("Warning: Some posts still don't have slugs")
            print(f"Migration completed: {migrated_count} posts migrated")
        return validating_migration

    @staticmethod
    def cross_model_data_migration():
        """Data migration across multiple models"""
        def cross_model_migration(apps, schema_editor):
            """Migrate data between related models"""
            Post = apps.get_model('blog', 'Post')
            Category = apps.get_model('blog', 'Category')
            Tag = apps.get_model('blog', 'Tag')
            User = apps.get_model('auth', 'User')
            # Create default category if needed
            default_category, created = Category.objects.get_or_create(
                name='Uncategorized',
                defaults={'description': 'Default category for posts'}
            )
            if created:
                print("Created default category")
            # Assign category to posts without one. Report update()'s
            # return value: re-counting the queryset AFTER the update
            # always yields 0 because no row matches the predicate anymore.
            assigned = Post.objects.filter(category__isnull=True).update(
                category=default_category
            )
            print(f"Assigned default category to {assigned} posts")
            # Update per-author post counts where a profile row exists.
            authors = User.objects.filter(post__isnull=False).distinct()
            for author in authors:
                post_count = Post.objects.filter(author=author).count()
                # Update author's post count (if such field exists)
                if hasattr(author, 'profile'):
                    author.profile.post_count = post_count
                    author.profile.save()
            # Create default tags for posts without tags
            default_tag, created = Tag.objects.get_or_create(
                name='general',
                defaults={'slug': 'general'}
            )
            # Materialize before mutating: adding the tag removes each post
            # from the tags__isnull=True predicate.
            posts_without_tags = list(Post.objects.filter(tags__isnull=True))
            for post in posts_without_tags:
                post.tags.add(default_tag)
            print(f"Added default tag to {len(posts_without_tags)} posts")
        return cross_model_migration
# Complex data transformations
class ComplexDataTransformations:
    """Handle complex data transformation scenarios.

    Each static method returns one or more RunPython-compatible callables
    (``(apps, schema_editor)``).
    """
    @staticmethod
    def json_field_migration():
        """Migrate data to/from JSON fields.

        Returns a ``(forward, reverse)`` pair suitable for RunPython's
        ``code`` and ``reverse_code`` arguments.
        """
        def migrate_to_json(apps, schema_editor):
            """Convert separate fields to JSON field"""
            Post = apps.get_model('blog', 'Post')
            for post in Post.objects.all():
                # Combine multiple fields into one dict; hasattr guards
                # allow the migration to run even if some of these columns
                # never existed on the historical model.
                metadata = {}
                if hasattr(post, 'view_count'):
                    metadata['view_count'] = post.view_count or 0
                if hasattr(post, 'like_count'):
                    metadata['like_count'] = post.like_count or 0
                if hasattr(post, 'featured'):
                    metadata['featured'] = post.featured or False
                # Store as JSON (assumes Post.metadata is a JSONField -
                # TODO confirm against the model definition)
                post.metadata = metadata
                post.save()
        def migrate_from_json(apps, schema_editor):
            """Extract data from JSON field back to separate fields"""
            Post = apps.get_model('blog', 'Post')
            for post in Post.objects.all():
                if post.metadata:
                    # Copy each key back only if present, so partial
                    # metadata dicts do not raise KeyError.
                    if 'view_count' in post.metadata:
                        post.view_count = post.metadata['view_count']
                    if 'like_count' in post.metadata:
                        post.like_count = post.metadata['like_count']
                    if 'featured' in post.metadata:
                        post.featured = post.metadata['featured']
                    post.save()
        return migrate_to_json, migrate_from_json
    @staticmethod
    def data_normalization_migration():
        """Normalize denormalized data"""
        def normalize_data(apps, schema_editor):
            """Extract "[Category] Title"-style prefixes into Category rows."""
            Post = apps.get_model('blog', 'Post')
            Category = apps.get_model('blog', 'Category')
            # name -> Category instance, so each category is created at
            # most once per run.
            category_mapping = {}
            for post in Post.objects.all():
                # Titles may embed a category as "[Name] ..." - take the
                # text between the first '[' and the first ']' after it.
                if '[' in post.title and ']' in post.title:
                    category_name = post.title.split('[')[1].split(']')[0]
                    # Get or create category (cached in the mapping)
                    if category_name not in category_mapping:
                        category, created = Category.objects.get_or_create(
                            name=category_name,
                            defaults={'description': f'Category for {category_name} posts'}
                        )
                        category_mapping[category_name] = category
                    # Assign category to post
                    post.category = category_mapping[category_name]
                    # Strip the "[Name]" marker out of the title
                    post.title = post.title.replace(f'[{category_name}]', '').strip()
                    post.save()
            print(f"Created {len(category_mapping)} categories from post titles")
        return normalize_data
    @staticmethod
    def data_cleanup_migration():
        """Clean up inconsistent data"""
        def cleanup_data(apps, schema_editor):
            """Clean up data inconsistencies (whitespace, statuses, dates, duplicates)."""
            Post = apps.get_model('blog', 'Post')
            # Counters reported at the end of the run.
            cleanup_stats = {
                'trimmed_titles': 0,
                'fixed_statuses': 0,
                'removed_duplicates': 0,
                'fixed_dates': 0
            }
            for post in Post.objects.all():
                modified = False
                # Trim whitespace from titles
                if post.title and post.title != post.title.strip():
                    post.title = post.title.strip()
                    cleanup_stats['trimmed_titles'] += 1
                    modified = True
                # Coerce unknown statuses back to 'draft'
                valid_statuses = ['draft', 'published', 'archived']
                if hasattr(post, 'status') and post.status not in valid_statuses:
                    post.status = 'draft'
                    cleanup_stats['fixed_statuses'] += 1
                    modified = True
                # Clamp creation timestamps that lie in the future
                # (timezone here is django.utils.timezone - it must be
                # imported at module level for this to run)
                if hasattr(post, 'created_at') and post.created_at > timezone.now():
                    post.created_at = timezone.now()
                    cleanup_stats['fixed_dates'] += 1
                    modified = True
                # Save once per post, only if something changed.
                if modified:
                    post.save()
            # Remove duplicate posts (same title and author): group rows
            # sharing both values and keep only the oldest of each group.
            from django.db.models import Count
            duplicates = Post.objects.values('title', 'author').annotate(
                count=Count('id')
            ).filter(count__gt=1)
            for duplicate in duplicates:
                # Keep the first (oldest) post, delete the rest
                posts = Post.objects.filter(
                    title=duplicate['title'],
                    author=duplicate['author']
                ).order_by('created_at')
                posts_to_delete = posts[1:]  # Keep first, delete rest
                for post in posts_to_delete:
                    post.delete()
                    cleanup_stats['removed_duplicates'] += 1
            print("Data cleanup completed:")
            for stat, count in cleanup_stats.items():
                print(f" {stat}: {count}")
        return cleanup_data
class DataMigrationOptimization:
    """Optimize data migration performance.

    Each static method returns a RunPython-compatible callable
    (``(apps, schema_editor)``) demonstrating one optimization technique.
    """

    @staticmethod
    def bulk_operations_migration():
        """Use bulk operations for better performance"""
        def bulk_migration(apps, schema_editor):
            """Efficient bulk data migration.

            Demonstrates three independent techniques; in a real migration
            you would normally pick just one.
            """
            Post = apps.get_model('blog', 'Post')
            # Method 1: Bulk update with database-level operations
            from django.db.models import F, Case, When
            # Update all posts in one UPDATE statement.
            # NOTE: this copies the raw title (not a slugified version)
            # into slug, and leaves no rows with slug IS NULL - so
            # Method 3 below finds nothing to do when run afterwards.
            Post.objects.update(
                slug=Case(
                    When(slug__isnull=True, then=F('title')),
                    default=F('slug')
                )
            )
            # Method 2: Bulk create for new records
            posts_to_create = [
                Post(
                    title=f'Bulk Post {i}',
                    content=f'Content for bulk post {i}',
                    slug=f'bulk-post-{i}'
                )
                for i in range(1000)
            ]
            # Insert all rows at once; batch_size bounds each INSERT.
            Post.objects.bulk_create(posts_to_create, batch_size=100)
            # Method 3: Bulk update existing records
            posts_to_update = []
            for post in Post.objects.filter(slug__isnull=True):
                post.slug = slugify(post.title)
                posts_to_update.append(post)
            # Update in batches to keep each UPDATE statement bounded.
            batch_size = 1000
            for i in range(0, len(posts_to_update), batch_size):
                batch = posts_to_update[i:i + batch_size]
                Post.objects.bulk_update(batch, ['slug'], batch_size=batch_size)
        return bulk_migration

    @staticmethod
    def raw_sql_migration():
        """Use raw SQL for maximum performance"""
        def raw_sql_migration(apps, schema_editor):
            """High-performance migration using raw SQL (PostgreSQL syntax)."""
            from django.db import connection
            with connection.cursor() as cursor:
                # Crude slugification in SQL: lowercase and hyphenate
                # (weaker than django's slugify, but one statement).
                cursor.execute("""
                    UPDATE blog_post
                    SET slug = LOWER(REPLACE(REPLACE(title, ' ', '-'), '--', '-'))
                    WHERE slug IS NULL AND title IS NOT NULL
                """)
                affected_rows = cursor.rowcount
                print(f"Updated {affected_rows} posts with slugs using raw SQL")
                # Create a partial index for slug lookups.
                # NOTE: CREATE INDEX CONCURRENTLY cannot run inside a
                # transaction block - the migration class must set
                # ``atomic = False`` for this to succeed on PostgreSQL.
                cursor.execute("""
                    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_blog_post_slug
                    ON blog_post(slug) WHERE slug IS NOT NULL
                """)
                # Refresh planner statistics after the mass update
                cursor.execute("ANALYZE blog_post")
        return raw_sql_migration

    @staticmethod
    def memory_efficient_migration():
        """Memory-efficient migration for large datasets"""
        def memory_efficient_migration(apps, schema_editor):
            """Process large datasets page by page without loading everything."""
            Post = apps.get_model('blog', 'Post')
            batch_size = 1000
            total_count = Post.objects.count()
            processed = 0
            while processed < total_count:
                # Page over an ORDERED queryset: LIMIT/OFFSET on an
                # unordered query may repeat or skip rows between pages.
                batch = Post.objects.order_by('pk')[processed:processed + batch_size]
                # Collect only the posts that actually change
                posts_to_update = []
                for post in batch:
                    if not post.slug and post.title:
                        post.slug = slugify(post.title)
                        posts_to_update.append(post)
                # Bulk update batch
                if posts_to_update:
                    Post.objects.bulk_update(posts_to_update, ['slug'])
                processed += batch_size
                # Progress reporting
                progress = min(processed, total_count)
                print(f"Processed {progress}/{total_count} posts ({progress/total_count*100:.1f}%)")
                # Optional: small delay to reduce system load
                import time
                time.sleep(0.1)
        return memory_efficient_migration

    @staticmethod
    def parallel_migration():
        """Parallel processing for data migration"""
        def parallel_migration(apps, schema_editor):
            """Process data in parallel for better performance.

            WARNING: parallel writes inside a migration are risky - each
            thread needs its own database connection and transactional
            guarantees are weakened. Test thoroughly before production use.
            """
            Post = apps.get_model('blog', 'Post')
            from concurrent.futures import ThreadPoolExecutor

            def process_batch(batch_ids):
                """Slugify one batch of post ids; return how many were updated."""
                # Each thread needs its own database connection
                from django.db import connection
                connection.ensure_connection()
                posts_to_update = []
                for post in Post.objects.filter(id__in=batch_ids):
                    if not post.slug and post.title:
                        post.slug = slugify(post.title)
                        posts_to_update.append(post)
                if posts_to_update:
                    Post.objects.bulk_update(posts_to_update, ['slug'])
                return len(posts_to_update)

            # Split all post ids into fixed-size batches
            all_post_ids = list(Post.objects.values_list('id', flat=True))
            batch_size = 1000
            batches = [
                all_post_ids[i:i + batch_size]
                for i in range(0, len(all_post_ids), batch_size)
            ]
            # Fan the batches out over a small thread pool and sum results
            total_updated = 0
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(process_batch, batch) for batch in batches]
                for future in futures:
                    total_updated += future.result()
            print(f"Parallel migration completed: {total_updated} posts updated")
        return parallel_migration
# Data migration monitoring and logging
class DataMigrationMonitoring:
    """Monitor and log data migration progress"""

    @staticmethod
    def monitored_migration():
        """Data migration with comprehensive monitoring"""
        def monitored_migration(apps, schema_editor):
            """Migration with detailed monitoring and logging.

            NOTE: psutil is a third-party dependency and must be installed
            wherever this migration runs.
            """
            import time
            import logging
            import psutil
            logger = logging.getLogger(__name__)
            Post = apps.get_model('blog', 'Post')
            # Setup monitoring
            start_time = time.time()
            initial_memory = psutil.virtual_memory().percent
            logger.info("Starting data migration")
            logger.info(f"Initial memory usage: {initial_memory}%")
            total_posts = Post.objects.count()
            # Snapshot the pending pks up front and page over the snapshot.
            # Re-querying slug__isnull=True each batch would keep returning
            # any posts whose save() fails, re-fetching the same failures
            # every iteration while never reaching later posts.
            pending_ids = list(
                Post.objects.filter(slug__isnull=True)
                .order_by('pk')
                .values_list('pk', flat=True)
            )
            posts_to_migrate = len(pending_ids)
            logger.info(f"Total posts: {total_posts}")
            logger.info(f"Posts to migrate: {posts_to_migrate}")
            if posts_to_migrate == 0:
                logger.info("No posts need migration")
                return
            # Process with monitoring
            processed = 0
            errors = 0
            batch_size = 100
            for i in range(0, posts_to_migrate, batch_size):
                batch_start = time.time()
                batch_posts = Post.objects.filter(pk__in=pending_ids[i:i + batch_size])
                batch_processed = 0
                for post in batch_posts:
                    try:
                        post.slug = slugify(post.title)
                        post.save()
                        batch_processed += 1
                        processed += 1
                    except Exception as e:
                        errors += 1
                        logger.error(f"Error processing post {post.id}: {e}")
                batch_time = time.time() - batch_start
                current_memory = psutil.virtual_memory().percent
                # Log batch progress
                logger.info(f"Batch {i//batch_size + 1}: {batch_processed} posts, "
                            f"{batch_time:.2f}s, memory: {current_memory}%")
                # Back off briefly under memory pressure
                if current_memory > 90:
                    logger.warning(f"High memory usage: {current_memory}%")
                    time.sleep(1)
            # Final statistics
            total_time = time.time() - start_time
            final_memory = psutil.virtual_memory().percent
            logger.info("Data migration completed")
            logger.info(f"Total time: {total_time:.2f} seconds")
            logger.info(f"Posts processed: {processed}")
            logger.info(f"Errors: {errors}")
            logger.info(f"Final memory usage: {final_memory}%")
            if errors > 0:
                logger.warning(f"Migration completed with {errors} errors")
        return monitored_migration
Data migrations are essential for maintaining data integrity during application evolution. Proper implementation with error handling, performance optimization, and monitoring ensures reliable data transformations in production environments.
Considerations When Removing Fields
Removing fields from Django models requires careful planning to avoid data loss and maintain application stability. This section covers safe field removal strategies, data preservation techniques, and best practices for handling field deprecation in production environments.
Squashing Migrations
Migration squashing combines multiple migrations into a single, optimized migration file. This process helps maintain a clean migration history, improves performance, and reduces complexity in long-running projects. Understanding when and how to squash migrations is essential for maintaining a healthy Django project.