Migrations

Data Migrations

Data migrations allow you to transform, populate, or clean up data during schema changes. Unlike schema migrations that modify database structure, data migrations work with the actual data in your database. Understanding how to write effective data migrations is crucial for maintaining data integrity during application evolution.

The sections below build up from the basic RunPython structure to advanced migration patterns, performance optimization for large datasets, and migration monitoring.

Understanding Data Migrations

Basic Data Migration Structure

# Basic data migration using RunPython
from django.db import migrations, models
from django.db.models import Count
from django.utils import timezone
from django.utils.text import slugify

def populate_slugs(apps, schema_editor):
    """Generate a slug from the title for every post that lacks one.

    Works on the historical model so the function keeps behaving correctly
    even after the Post model changes in later migrations.
    """
    # Historical model: the Post schema as it exists at this migration.
    Post = apps.get_model('blog', 'Post')

    unslugged = Post.objects.filter(slug__isnull=True)
    for entry in unslugged:
        entry.slug = slugify(entry.title)
        entry.save()

def remove_slugs(apps, schema_editor):
    """Reverse step: blank out every slug so the migration can be rolled back."""
    Post = apps.get_model('blog', 'Post')
    # One UPDATE ... SET slug = NULL across the whole table.
    Post.objects.update(slug=None)

class Migration(migrations.Migration):
    """Three-step data migration: add a nullable field, populate it, tighten it.

    The add -> populate -> alter ordering is what makes this safe: the column
    must exist (and allow NULL) before data can be written to it, and it can
    only be made unique once every row has a value.
    """
    
    dependencies = [
        ('blog', '0001_initial'),
    ]
    
    operations = [
        # Step 1: add the column as nullable so existing rows stay valid.
        migrations.AddField(
            model_name='post',
            name='slug',
            field=models.SlugField(max_length=200, null=True, blank=True),
        ),
        
        # Step 2: fill in the data; reverse_code keeps the migration
        # reversible with `migrate blog <previous>`.
        migrations.RunPython(
            code=populate_slugs,
            reverse_code=remove_slugs,
        ),
        
        # Step 3 (optional): enforce the constraint now that rows have values.
        # NOTE(review): unique=True will fail here if populate_slugs produced
        # duplicate slugs -- confirm titles are unique before relying on this.
        migrations.AlterField(
            model_name='post',
            name='slug',
            field=models.SlugField(max_length=200, unique=True),
        ),
    ]

# Data migration with error handling
def robust_data_migration(apps, schema_editor):
    """Populate missing slugs with per-row error handling and progress output.

    Rows that fail are reported and skipped so one bad record cannot abort
    the whole migration. Duplicate slugs are made unique with a numeric
    suffix checked against rows already saved.
    """

    Post = apps.get_model('blog', 'Post')

    total_posts = Post.objects.count()
    processed = 0
    errors = 0

    print(f"Starting data migration for {total_posts} posts")

    # iterator() streams rows instead of caching the whole result set,
    # keeping memory flat on large tables.
    for post in Post.objects.all().iterator():
        try:
            if not post.slug and post.title:
                post.slug = slugify(post.title)

                # De-duplicate against the database. Rows handled earlier in
                # this loop are already saved, so the check covers them too.
                # exclude(pk=...) prevents a row from colliding with itself.
                original_slug = post.slug
                counter = 1

                while Post.objects.filter(slug=post.slug).exclude(pk=post.pk).exists():
                    post.slug = f"{original_slug}-{counter}"
                    counter += 1

                post.save()
                processed += 1

        except Exception as e:
            # Deliberately broad: a data migration should be best-effort and
            # report failures rather than roll everything back.
            errors += 1
            print(f"Error processing post {post.id}: {e}")
            continue

    print(f"Migration completed: {processed} processed, {errors} errors")

    if errors > 0:
        print("Some posts had errors - review and fix manually if needed")

# Batch processing for large datasets
def batch_data_migration(apps, schema_editor):
    """Populate missing slugs in bulk-updated batches.

    bulk_update issues one UPDATE per batch instead of one per row, and the
    iterator keeps memory usage flat regardless of table size. Slugs are
    de-duplicated within the run so the unique constraint added later in the
    example migration cannot be violated by posts sharing a title.
    """

    Post = apps.get_model('blog', 'Post')

    batch_size = 1000
    total_posts = Post.objects.count()
    processed = 0

    print(f"Processing {total_posts} posts in batches of {batch_size}")

    # Seed with existing slugs so freshly generated ones cannot collide
    # with rows that already have a value.
    seen_slugs = set(
        Post.objects.exclude(slug__isnull=True).values_list('slug', flat=True)
    )

    posts_to_update = []

    # Use iterator to avoid loading all objects into memory at once.
    for post in Post.objects.all().iterator(chunk_size=batch_size):
        if not post.slug and post.title:
            slug = base_slug = slugify(post.title)
            suffix = 1

            # BUG FIX: the original did no in-run de-duplication, so two
            # posts with the same title received identical slugs.
            while slug in seen_slugs:
                slug = f"{base_slug}-{suffix}"
                suffix += 1

            seen_slugs.add(slug)
            post.slug = slug
            posts_to_update.append(post)

        # Flush a full batch with a single bulk UPDATE.
        if len(posts_to_update) >= batch_size:
            Post.objects.bulk_update(posts_to_update, ['slug'])

            processed += len(posts_to_update)
            posts_to_update = []

            print(f"Processed {processed}/{total_posts} posts")

    # Flush the final partial batch.
    if posts_to_update:
        Post.objects.bulk_update(posts_to_update, ['slug'])
        processed += len(posts_to_update)

    print(f"Batch migration completed: {processed} posts processed")

Advanced Data Migration Patterns

class AdvancedDataMigrationPatterns:
    """Advanced patterns for data migrations.

    Each static method builds and returns a function with the
    ``(apps, schema_editor)`` signature expected by ``migrations.RunPython``.
    """

    @staticmethod
    def conditional_data_migration():
        """Return a migration function that runs only when its preconditions hold."""

        def conditional_migration(apps, schema_editor):
            """Skip the work when nothing needs migrating or the host is under load."""

            Post = apps.get_model('blog', 'Post')

            # Idempotence check: re-running on fully-migrated data is a no-op.
            posts_without_slugs = Post.objects.filter(slug__isnull=True).count()

            if posts_without_slugs == 0:
                print("All posts already have slugs, skipping migration")
                return

            # psutil is a third-party package; it must be installed wherever
            # this migration is applied.
            import psutil

            memory_usage = psutil.virtual_memory().percent

            if memory_usage > 80:
                print(f"High memory usage ({memory_usage}%), postponing migration")
                return

            print(f"Proceeding with migration for {posts_without_slugs} posts")

            for post in Post.objects.filter(slug__isnull=True):
                post.slug = slugify(post.title)
                post.save()

        return conditional_migration

    @staticmethod
    def progressive_data_migration():
        """Return a resumable migration that handles one batch per invocation."""

        def progressive_migration(apps, schema_editor):
            """Process the next batch of posts, tracking progress in the cache."""

            Post = apps.get_model('blog', 'Post')

            from django.core.cache import cache

            progress_key = 'data_migration_progress'
            last_processed_id = cache.get(progress_key, 0)

            batch_size = 100

            # BUG FIX: the original called .last() on a sliced queryset, which
            # raises because Django cannot reverse a query once a slice has
            # been taken. Materialising the slice into a list avoids that and
            # gives one stable batch to process and report on.
            batch = list(
                Post.objects.filter(
                    id__gt=last_processed_id,
                    slug__isnull=True
                ).order_by('id')[:batch_size]
            )

            if not batch:
                print("Progressive migration completed")
                cache.delete(progress_key)
                return

            for post in batch:
                post.slug = slugify(post.title)
                post.save()

            last_id = batch[-1].id

            # Record progress once per batch rather than once per row.
            cache.set(progress_key, last_id, timeout=3600)

            print(f"Progressive migration: processed {len(batch)} posts")
            print(f"Last processed ID: {last_id}")

            # In production, schedule the next batch via a task queue.
            remaining = Post.objects.filter(
                id__gt=last_id,
                slug__isnull=True
            ).count()

            if remaining > 0:
                print(f"Remaining posts to process: {remaining}")

        return progressive_migration

    @staticmethod
    def data_validation_migration():
        """Return a migration that validates data before and after the change."""

        def validating_migration(apps, schema_editor):
            """Generate slugs, sanity-checking row counts on both sides."""

            Post = apps.get_model('blog', 'Post')

            # --- Pre-migration validation --------------------------------
            print("Running pre-migration validation...")

            total_posts = Post.objects.count()
            posts_with_titles = Post.objects.filter(title__isnull=False).count()
            posts_with_slugs = Post.objects.filter(slug__isnull=False).count()

            print(f"Total posts: {total_posts}")
            print(f"Posts with titles: {posts_with_titles}")
            print(f"Posts with slugs: {posts_with_slugs}")

            if posts_with_titles == 0:
                raise ValueError("No posts have titles - cannot generate slugs")

            # --- Migration -----------------------------------------------
            print("Performing data migration...")

            migrated_count = 0

            for post in Post.objects.filter(slug__isnull=True):
                if post.title:
                    post.slug = slugify(post.title)
                    post.save()
                    migrated_count += 1

            # --- Post-migration validation -------------------------------
            print("Running post-migration validation...")

            final_posts_with_slugs = Post.objects.filter(slug__isnull=False).count()
            duplicate_slugs = Post.objects.values('slug').annotate(
                count=Count('slug')
            ).filter(count__gt=1).count()

            print(f"Posts with slugs after migration: {final_posts_with_slugs}")
            print(f"Duplicate slugs found: {duplicate_slugs}")

            if duplicate_slugs > 0:
                print("Warning: Duplicate slugs detected - manual cleanup may be needed")

            if final_posts_with_slugs < posts_with_titles:
                print("Warning: Some posts still don't have slugs")

            print(f"Migration completed: {migrated_count} posts migrated")

        return validating_migration

    @staticmethod
    def cross_model_data_migration():
        """Return a migration that fixes up data across several related models."""

        def cross_model_migration(apps, schema_editor):
            """Backfill categories, tags, and author statistics across models."""

            Post = apps.get_model('blog', 'Post')
            Category = apps.get_model('blog', 'Category')
            Tag = apps.get_model('blog', 'Tag')
            User = apps.get_model('auth', 'User')

            # Create the fallback category exactly once.
            default_category, created = Category.objects.get_or_create(
                name='Uncategorized',
                defaults={'description': 'Default category for posts'}
            )

            if created:
                print("Created default category")

            posts_without_category = Post.objects.filter(category__isnull=True)
            # BUG FIX: count BEFORE updating - the lazy queryset re-evaluates
            # after update(), at which point no rows match and count() is 0.
            uncategorized_count = posts_without_category.count()
            posts_without_category.update(category=default_category)

            print(f"Assigned default category to {uncategorized_count} posts")

            # NOTE(review): assumes the reverse accessor from User to Post is
            # named 'post' - confirm against Post.author's related_name.
            authors = User.objects.filter(post__isnull=False).distinct()

            for author in authors:
                author_posts = Post.objects.filter(author=author)
                post_count = author_posts.count()

                # hasattr guards against the profile relation not existing
                # in this historical model state.
                if hasattr(author, 'profile'):
                    author.profile.post_count = post_count
                    author.profile.save()

            default_tag, created = Tag.objects.get_or_create(
                name='general',
                defaults={'slug': 'general'}
            )

            # BUG FIX: materialise first - once tags are added, the
            # tags__isnull=True filter matches nothing and count() returns 0.
            posts_without_tags = list(Post.objects.filter(tags__isnull=True))

            for post in posts_without_tags:
                post.tags.add(default_tag)

            print(f"Added default tag to {len(posts_without_tags)} posts")

        return cross_model_migration

# Complex data transformations
class ComplexDataTransformations:
    """Handle complex data transformation scenarios.

    Each static method returns one or more RunPython-compatible callables.
    """

    @staticmethod
    def json_field_migration():
        """Return a (forward, reverse) pair for moving data into/out of JSON."""

        def migrate_to_json(apps, schema_editor):
            """Fold separate counter/flag columns into a ``metadata`` JSON field."""

            Post = apps.get_model('blog', 'Post')

            for post in Post.objects.all():
                metadata = {}

                # hasattr guards let the same migration run whether or not
                # the legacy columns still exist on the historical model.
                if hasattr(post, 'view_count'):
                    metadata['view_count'] = post.view_count or 0

                if hasattr(post, 'like_count'):
                    metadata['like_count'] = post.like_count or 0

                if hasattr(post, 'featured'):
                    metadata['featured'] = post.featured or False

                post.metadata = metadata
                post.save()

        def migrate_from_json(apps, schema_editor):
            """Reverse step: explode ``metadata`` back into separate columns."""

            Post = apps.get_model('blog', 'Post')

            for post in Post.objects.all():
                if post.metadata:
                    if 'view_count' in post.metadata:
                        post.view_count = post.metadata['view_count']

                    if 'like_count' in post.metadata:
                        post.like_count = post.metadata['like_count']

                    if 'featured' in post.metadata:
                        post.featured = post.metadata['featured']

                    post.save()

        return migrate_to_json, migrate_from_json

    @staticmethod
    def data_normalization_migration():
        """Return a migration that extracts categories embedded in post titles."""

        def normalize_data(apps, schema_editor):
            """Move ``[Category]`` markers out of titles into Category rows."""

            Post = apps.get_model('blog', 'Post')
            Category = apps.get_model('blog', 'Category')

            # Cache categories per name to avoid a get_or_create per post.
            category_mapping = {}

            for post in Post.objects.all():
                # BUG FIX: the original only checked that '[' and ']' both
                # appear somewhere, so a title like "Ups ]and[ downs"
                # produced garbage and "[]" created a category with an empty
                # name. Require a non-empty name between a '[' and a ']'
                # that follows it.
                start = post.title.find('[')
                end = post.title.find(']', start + 1)

                if start == -1 or end == -1:
                    continue

                category_name = post.title[start + 1:end]
                if not category_name:
                    continue

                if category_name not in category_mapping:
                    category, created = Category.objects.get_or_create(
                        name=category_name,
                        defaults={'description': f'Category for {category_name} posts'}
                    )
                    category_mapping[category_name] = category

                post.category = category_mapping[category_name]

                # Strip the marker now that the category is normalized.
                post.title = post.title.replace(f'[{category_name}]', '').strip()

                post.save()

            print(f"Created {len(category_mapping)} categories from post titles")

        return normalize_data

    @staticmethod
    def data_cleanup_migration():
        """Return a migration that repairs common data inconsistencies."""

        def cleanup_data(apps, schema_editor):
            """Trim titles, fix statuses and dates, and drop duplicate posts."""

            Post = apps.get_model('blog', 'Post')

            cleanup_stats = {
                'trimmed_titles': 0,
                'fixed_statuses': 0,
                'removed_duplicates': 0,
                'fixed_dates': 0
            }

            valid_statuses = ['draft', 'published', 'archived']

            for post in Post.objects.all():
                modified = False

                # Trim stray whitespace from titles.
                if post.title and post.title != post.title.strip():
                    post.title = post.title.strip()
                    cleanup_stats['trimmed_titles'] += 1
                    modified = True

                # Reset unknown statuses to a safe default.
                if hasattr(post, 'status') and post.status not in valid_statuses:
                    post.status = 'draft'
                    cleanup_stats['fixed_statuses'] += 1
                    modified = True

                # Clamp creation timestamps that lie in the future.
                # NOTE(review): assumes created_at is timezone-aware; a naive
                # datetime would raise on comparison with timezone.now().
                if hasattr(post, 'created_at') and post.created_at > timezone.now():
                    post.created_at = timezone.now()
                    cleanup_stats['fixed_dates'] += 1
                    modified = True

                if modified:
                    post.save()

            # Remove duplicate posts (same title + author), keeping the oldest.
            from django.db.models import Count

            duplicates = Post.objects.values('title', 'author').annotate(
                count=Count('id')
            ).filter(count__gt=1)

            for duplicate in duplicates:
                posts = Post.objects.filter(
                    title=duplicate['title'],
                    author=duplicate['author']
                ).order_by('created_at')

                for post in posts[1:]:  # keep the first, delete the rest
                    post.delete()
                    cleanup_stats['removed_duplicates'] += 1

            print("Data cleanup completed:")
            for stat, count in cleanup_stats.items():
                print(f"  {stat}: {count}")

        return cleanup_data

Performance Optimization for Data Migrations

Efficient Data Processing

class DataMigrationOptimization:
    """Optimize data migration performance.

    Each static method returns a RunPython-compatible callable demonstrating
    a different performance technique: bulk ORM operations, raw SQL,
    constant-memory batching, and thread-based parallelism. The methods are
    independent examples, not steps of one pipeline.
    """
    
    @staticmethod
    def bulk_operations_migration():
        """Build a migration showing three bulk ORM techniques."""
        
        def bulk_migration(apps, schema_editor):
            """Efficient bulk data migration"""
            
            Post = apps.get_model('blog', 'Post')
            
            # Method 1: Bulk update with database-level operations
            from django.db.models import F, Case, When, Value
            
            # Update all posts in one UPDATE statement.
            # NOTE(review): this copies the raw title into slug (no
            # slugification) and fills every NULL slug, which makes the
            # slug__isnull filter in Method 3 below match nothing if run
            # in sequence - confirm which method you actually want.
            Post.objects.update(
                slug=Case(
                    When(slug__isnull=True, then=F('title')),
                    default=F('slug')
                )
            )
            
            # Method 2: Bulk create for new records (demo data only - a real
            # data migration would not normally fabricate rows).
            posts_to_create = []
            
            for i in range(1000):
                posts_to_create.append(Post(
                    title=f'Bulk Post {i}',
                    content=f'Content for bulk post {i}',
                    slug=f'bulk-post-{i}'
                ))
            
            # One round trip per batch_size rows instead of per object.
            Post.objects.bulk_create(posts_to_create, batch_size=100)
            
            # Method 3: Bulk update existing records. Assumes slugify is
            # importable at module level (from django.utils.text).
            posts_to_update = []
            
            for post in Post.objects.filter(slug__isnull=True):
                post.slug = slugify(post.title)
                posts_to_update.append(post)
            
            # Update in batches; note bulk_update bypasses save() signals
            # and auto_now fields.
            batch_size = 1000
            
            for i in range(0, len(posts_to_update), batch_size):
                batch = posts_to_update[i:i + batch_size]
                Post.objects.bulk_update(batch, ['slug'], batch_size=batch_size)
        
        return bulk_migration
    
    @staticmethod
    def raw_sql_migration():
        """Build a migration that bypasses the ORM entirely."""
        
        def raw_sql_migration(apps, schema_editor):
            """High-performance migration using raw SQL"""
            
            from django.db import connection
            
            with connection.cursor() as cursor:
                # Update slugs with a single set-based UPDATE. This is only a
                # rough slug approximation (lowercase + dash substitution),
                # not equivalent to Django's slugify.
                cursor.execute("""
                    UPDATE blog_post 
                    SET slug = LOWER(REPLACE(REPLACE(title, ' ', '-'), '--', '-'))
                    WHERE slug IS NULL AND title IS NOT NULL
                """)
                
                affected_rows = cursor.rowcount
                print(f"Updated {affected_rows} posts with slugs using raw SQL")
                
                # NOTE(review): CREATE INDEX CONCURRENTLY is PostgreSQL-only
                # and cannot run inside a transaction block; the enclosing
                # Migration must set atomic = False or this statement fails.
                cursor.execute("""
                    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_blog_post_slug 
                    ON blog_post(slug) WHERE slug IS NOT NULL
                """)
                
                # Refresh planner statistics after the mass update.
                cursor.execute("ANALYZE blog_post")
        
        return raw_sql_migration
    
    @staticmethod
    def memory_efficient_migration():
        """Build a migration that pages through the table in fixed-size chunks."""
        
        def memory_efficient_migration(apps, schema_editor):
            """Process large datasets without memory issues"""
            
            Post = apps.get_model('blog', 'Post')
            
            # Page size for each offset/limit window.
            batch_size = 1000
            
            # Process in chunks
            total_count = Post.objects.count()
            processed = 0
            
            while processed < total_count:
                # Get batch using offset/limit.
                # NOTE(review): the queryset is unordered, so offset slicing
                # has no guaranteed row order - rows can be skipped or seen
                # twice across windows. Add .order_by('id') before relying
                # on this in production.
                batch = Post.objects.all()[processed:processed + batch_size]
                
                # Collect only the rows that actually change.
                posts_to_update = []
                
                for post in batch:
                    if not post.slug and post.title:
                        post.slug = slugify(post.title)
                        posts_to_update.append(post)
                
                # One UPDATE round trip per window.
                if posts_to_update:
                    Post.objects.bulk_update(posts_to_update, ['slug'])
                
                processed += batch_size
                
                # Progress reporting (clamped so the last window shows 100%).
                progress = min(processed, total_count)
                print(f"Processed {progress}/{total_count} posts ({progress/total_count*100:.1f}%)")
                
                # Optional: small delay to reduce load on a shared database.
                import time
                time.sleep(0.1)
        
        return memory_efficient_migration
    
    @staticmethod
    def parallel_migration():
        """Build a migration that fans work out across a thread pool."""
        
        def parallel_migration(apps, schema_editor):
            """Process data in parallel for better performance"""
            
            Post = apps.get_model('blog', 'Post')
            
            # Note: Be careful with parallel processing in migrations.
            # Threads run outside the migration's transaction; test
            # thoroughly before using in production.
            
            from concurrent.futures import ThreadPoolExecutor
            import threading
            
            def process_batch(batch_ids):
                """Slugify one batch of posts identified by primary key."""
                
                # Django database connections are thread-local; each worker
                # thread gets (and must establish) its own connection.
                from django.db import connection
                connection.ensure_connection()
                
                batch_posts = Post.objects.filter(id__in=batch_ids)
                posts_to_update = []
                
                for post in batch_posts:
                    if not post.slug and post.title:
                        post.slug = slugify(post.title)
                        posts_to_update.append(post)
                
                if posts_to_update:
                    Post.objects.bulk_update(posts_to_update, ['slug'])
                
                return len(posts_to_update)
            
            # Snapshot the full id set up front so workers never overlap.
            all_post_ids = list(Post.objects.values_list('id', flat=True))
            
            # Split into batches
            batch_size = 1000
            batches = [
                all_post_ids[i:i + batch_size]
                for i in range(0, len(all_post_ids), batch_size)
            ]
            
            # Process batches in parallel (I/O-bound, so threads help
            # despite the GIL).
            total_updated = 0
            
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(process_batch, batch) for batch in batches]
                
                # future.result() re-raises any worker exception here.
                for future in futures:
                    updated_count = future.result()
                    total_updated += updated_count
            
            print(f"Parallel migration completed: {total_updated} posts updated")
        
        return parallel_migration

# Data migration monitoring and logging
class DataMigrationMonitoring:
    """Monitor and log data migration progress."""

    @staticmethod
    def monitored_migration():
        """Build a migration function with timing, memory, and error logging."""

        def monitored_migration(apps, schema_editor):
            """Populate missing slugs while logging progress and resource usage."""

            import time
            import logging
            import psutil  # third-party: must be installed where this runs

            logger = logging.getLogger(__name__)

            Post = apps.get_model('blog', 'Post')

            # Baseline measurements for the final report.
            start_time = time.time()
            initial_memory = psutil.virtual_memory().percent

            logger.info("Starting data migration")
            logger.info(f"Initial memory usage: {initial_memory}%")

            total_posts = Post.objects.count()
            posts_to_migrate = Post.objects.filter(slug__isnull=True).count()

            logger.info(f"Total posts: {total_posts}")
            logger.info(f"Posts to migrate: {posts_to_migrate}")

            if posts_to_migrate == 0:
                logger.info("No posts need migration")
                return

            processed = 0
            errors = 0
            batch_size = 100

            # BUG FIX: rows that fail keep slug NULL, so the original
            # re-fetched the same failing rows in every subsequent batch,
            # re-raising identical errors and inflating the error count.
            # Track failed ids and exclude them from later batches.
            failed_ids = set()

            for i in range(0, posts_to_migrate, batch_size):
                batch_start = time.time()

                # Always take the first batch_size still-unmigrated rows,
                # skipping anything that already failed.
                batch_posts = Post.objects.filter(
                    slug__isnull=True
                ).exclude(id__in=failed_ids)[:batch_size]
                batch_processed = 0

                for post in batch_posts:
                    try:
                        post.slug = slugify(post.title)
                        post.save()
                        batch_processed += 1
                        processed += 1

                    except Exception as e:
                        errors += 1
                        failed_ids.add(post.id)
                        logger.error(f"Error processing post {post.id}: {e}")

                batch_time = time.time() - batch_start
                current_memory = psutil.virtual_memory().percent

                # Log batch progress
                logger.info(f"Batch {i//batch_size + 1}: {batch_processed} posts, "
                           f"{batch_time:.2f}s, memory: {current_memory}%")

                # Back off briefly when the host is under memory pressure.
                if current_memory > 90:
                    logger.warning(f"High memory usage: {current_memory}%")
                    time.sleep(1)

            # Final statistics
            total_time = time.time() - start_time
            final_memory = psutil.virtual_memory().percent

            logger.info("Data migration completed")
            logger.info(f"Total time: {total_time:.2f} seconds")
            logger.info(f"Posts processed: {processed}")
            logger.info(f"Errors: {errors}")
            logger.info(f"Final memory usage: {final_memory}%")

            if errors > 0:
                logger.warning(f"Migration completed with {errors} errors")

        return monitored_migration

Data migrations are essential for maintaining data integrity during application evolution. Proper implementation with error handling, performance optimization, and monitoring ensures reliable data transformations in production environments.