Advanced and Expert Topics

Custom Management Commands

Django's management command system provides a powerful way to create command-line tools for administrative tasks, data processing, and automation. This guide covers creating sophisticated management commands with advanced features like progress tracking, parallel processing, and integration with external systems.

Custom Management Commands

Django's management command system provides a powerful way to create command-line tools for administrative tasks, data processing, and automation. This guide covers creating sophisticated management commands with advanced features like progress tracking, parallel processing, and integration with external systems.

Advanced Command Structure

# management/commands/advanced_command.py
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.utils import timezone
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional

class Command(BaseCommand):
    """Batch-process ``MyModel`` rows with progress reporting and error handling.

    ``handle`` drives the run: validate the options, attach log handlers,
    probe for ``tqdm``, collect the primary keys to process, process them in
    batches (sequentially or on a thread pool) and print a summary.

    Instance state:
        start_time: set when ``handle`` starts; used for rate and duration.
        processed_count: number of items in batches that completed.
        error_count: number of batches that raised.
        use_progress_bar: True when ``tqdm`` could be imported.
    """

    help = 'Advanced command with progress tracking and error handling'

    # Attribute name used to tag the log handlers installed by setup_logging,
    # so a later run can find and replace them instead of stacking duplicates.
    _HANDLER_MARK = '_advanced_command_handler'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger(__name__)
        self.start_time = None
        self.processed_count = 0
        self.error_count = 0
        # Safe default so the process_items_* methods work even if
        # setup_progress_tracking() has not run yet.
        self.use_progress_bar = False

    def add_arguments(self, parser):
        """Register the command-line options of this command."""
        parser.add_argument(
            '--batch-size',
            type=int,
            default=100,
            help='Number of items to process in each batch'
        )

        parser.add_argument(
            '--workers',
            type=int,
            default=4,
            help='Number of worker threads'
        )

        parser.add_argument(
            '--dry-run',
            action='store_true',
            help='Show what would be done without making changes'
        )

        parser.add_argument(
            '--verbose',
            action='store_true',
            help='Enable verbose output'
        )

        parser.add_argument(
            '--filter',
            type=str,
            help='Filter criteria for processing'
        )

        parser.add_argument(
            '--output-file',
            type=str,
            help='Output file for results'
        )

        parser.add_argument(
            '--continue-on-error',
            action='store_true',
            help='Continue processing even if errors occur'
        )

    def handle(self, *args, **options):
        """Run the command.

        Raises:
            CommandError: for invalid options, or wrapping any unexpected
                exception raised while processing (the original exception is
                kept as ``__cause__``).
        """
        self.start_time = timezone.now()
        self.options = options
        # Start from zero so a reused instance does not carry totals over.
        self.processed_count = 0
        self.error_count = 0

        try:
            self.validate_arguments(options)
            self.setup_logging(options)
            self.setup_progress_tracking()
            self.execute_command(options)
            self.print_summary()
        except CommandError:
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}", exc_info=True)
            raise CommandError(f"Command failed: {e}") from e

    def validate_arguments(self, options):
        """Reject option values the command cannot work with.

        Raises:
            CommandError: if ``batch_size`` or ``workers`` is not positive, or
                the directory part of ``output_file`` is not an existing
                directory.
        """
        if options['batch_size'] <= 0:
            raise CommandError("Batch size must be positive")

        if options['workers'] <= 0:
            raise CommandError("Number of workers must be positive")

        if options['output_file']:
            import os
            output_dir = os.path.dirname(options['output_file'])
            # An empty dirname means "current directory", which is fine.
            if output_dir and not os.path.isdir(output_dir):
                raise CommandError(f"Output directory does not exist: {output_dir}")

    def setup_logging(self, options):
        """Attach a console handler, plus a file handler when ``output_file`` is set.

        The log file sits next to the output file with its extension swapped
        for ``.log``.
        """
        import os

        level = logging.DEBUG if options['verbose'] else logging.INFO

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # self.logger is the module-level logger, shared by every instance of
        # this command in the process. Remove what an earlier run installed,
        # otherwise each run would emit every record one more time.
        for stale in [h for h in self.logger.handlers
                      if getattr(h, self._HANDLER_MARK, False)]:
            self.logger.removeHandler(stale)
            stale.close()

        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        handlers = [console_handler]

        if options['output_file']:
            output_file = options['output_file']
            # splitext only touches the final extension, unlike str.replace,
            # which rewrote '.txt' anywhere in the path and left any other
            # extension unchanged (so the log would land in the output file).
            log_file = os.path.splitext(output_file)[0] + '.log'
            if log_file == output_file:
                log_file = output_file + '.log'
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.DEBUG)
            handlers.append(file_handler)

        for handler in handlers:
            handler.setFormatter(formatter)
            setattr(handler, self._HANDLER_MARK, True)
            self.logger.addHandler(handler)

        # The logger must let DEBUG through for the file handler to receive
        # it; the console handler still filters at its own level.
        self.logger.setLevel(logging.DEBUG if options['output_file'] else level)

    def setup_progress_tracking(self):
        """Record whether ``tqdm`` is importable and hint at installing it if not."""
        try:
            import tqdm  # noqa: F401 -- availability probe only
        except ImportError:
            self.use_progress_bar = False
            self.stdout.write("Install tqdm for progress bars: pip install tqdm")
        else:
            self.use_progress_bar = True

    def execute_command(self, options):
        """Collect the items and dispatch them to sequential or parallel processing."""
        items = self.get_items_to_process(options)
        total_items = len(items)

        if total_items == 0:
            self.stdout.write("No items to process")
            return

        self.stdout.write(f"Processing {total_items} items...")

        if options['dry_run']:
            self.stdout.write("DRY RUN - No changes will be made")

        batch_size = options['batch_size']
        workers = options['workers']

        if workers > 1:
            self.process_items_parallel(items, batch_size, workers, options)
        else:
            self.process_items_sequential(items, batch_size, options)

    def get_items_to_process(self, options) -> List:
        """Return the ids of the ``MyModel`` rows selected by ``--filter``.

        ``--filter`` has the form ``FIELD=VALUE``; everything after the first
        ``=`` is the value, so values may themselves contain ``=``.

        Raises:
            CommandError: if ``--filter`` is given but is not ``FIELD=VALUE``.
                A malformed filter must not silently fall back to "all rows".
        """
        from myapp.models import MyModel

        queryset = MyModel.objects.all()

        criteria = options['filter']
        if criteria:
            field, separator, value = criteria.partition('=')
            field = field.strip()
            if not separator or not field:
                raise CommandError(
                    f"Invalid --filter {criteria!r}; expected FIELD=VALUE"
                )
            queryset = queryset.filter(**{field: value})

        return list(queryset.values_list('id', flat=True))

    @staticmethod
    def _make_batches(items: List, batch_size: int) -> List[List]:
        """Split ``items`` into consecutive slices of at most ``batch_size``."""
        return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

    def _open_progress_bar(self, total: int):
        """Return a ``tqdm`` bar sized to ``total``, or None when tqdm is unavailable."""
        if not self.use_progress_bar:
            return None
        from tqdm import tqdm
        return tqdm(total=total, desc="Processing")

    def _record_batch_success(self, batch: List, progress_bar) -> None:
        """Count a finished batch and report progress on the bar or as text."""
        # Count in both modes; the summary reads processed_count either way.
        self.processed_count += len(batch)
        if progress_bar is not None:
            progress_bar.update(len(batch))
        else:
            self.print_progress()

    def _process_batch_in_worker(self, batch: List, options) -> None:
        """Run ``process_batch`` on a pool thread, then release its DB connections."""
        from django.db import connections

        try:
            self.process_batch(batch, options)
        finally:
            # Django connections are per-thread and nothing closes them for
            # threads we start ourselves, so close them explicitly.
            connections.close_all()

    def process_items_sequential(self, items: List, batch_size: int, options):
        """Process ``items`` batch by batch on the calling thread.

        A failing batch increments ``error_count``; it is re-raised unless
        ``continue_on_error`` is set.
        """
        batches = self._make_batches(items, batch_size)
        progress_bar = self._open_progress_bar(len(items))

        try:
            for batch in batches:
                try:
                    self.process_batch(batch, options)
                except Exception as e:
                    self.error_count += 1
                    self.logger.error(f"Batch processing failed: {e}")

                    if not options['continue_on_error']:
                        raise
                else:
                    self._record_batch_success(batch, progress_bar)
        finally:
            # Close the bar even when a batch error aborts the loop.
            if progress_bar is not None:
                progress_bar.close()

    def process_items_parallel(self, items: List, batch_size: int, workers: int, options):
        """Process ``items`` in batches on a pool of ``workers`` threads.

        All batches are submitted up front and handled as they complete. A
        failing batch increments ``error_count``; unless ``continue_on_error``
        is set, the not-yet-started batches are cancelled and the error is
        re-raised.
        """
        batches = self._make_batches(items, batch_size)
        progress_bar = self._open_progress_bar(len(items))

        try:
            with ThreadPoolExecutor(max_workers=workers) as executor:
                future_to_batch = {
                    executor.submit(self._process_batch_in_worker, batch, options): batch
                    for batch in batches
                }

                for future in as_completed(future_to_batch):
                    batch = future_to_batch[future]

                    try:
                        future.result()
                    except Exception as e:
                        self.error_count += 1
                        self.logger.error(f"Batch processing failed: {e}")

                        if not options['continue_on_error']:
                            # cancel() only stops futures that have not
                            # started; running ones finish before the
                            # executor's context manager exits.
                            for pending in future_to_batch:
                                pending.cancel()
                            raise
                    else:
                        self._record_batch_success(batch, progress_bar)
        finally:
            if progress_bar is not None:
                progress_bar.close()

    @transaction.atomic
    def process_batch(self, batch_ids: List[int], options):
        """Process every ``MyModel`` row whose id is in ``batch_ids``.

        The method is wrapped in ``transaction.atomic``, so an exception from
        any item propagates out of the atomic block for the whole batch.
        """
        from myapp.models import MyModel

        items = MyModel.objects.filter(id__in=batch_ids)

        for item in items:
            try:
                self.process_item(item, options)
            except Exception as e:
                # Log which item failed, then let the batch fail as a unit.
                self.logger.error(f"Failed to process item {item.id}: {e}")
                raise

    def process_item(self, item, options):
        """Mark one item as processed, or only log it when ``dry_run`` is set."""
        if options['dry_run']:
            self.logger.info(f"Would process item {item.id}")
            return

        # Actual processing logic here
        item.processed = True
        item.processed_at = timezone.now()
        item.save()

        self.logger.debug(f"Processed item {item.id}")

    def print_progress(self):
        """Write the running totals and throughput to stdout."""
        seconds = (timezone.now() - self.start_time).total_seconds()
        rate = self.processed_count / seconds if seconds > 0 else 0

        self.stdout.write(
            f"Processed: {self.processed_count}, "
            f"Errors: {self.error_count}, "
            f"Rate: {rate:.2f} items/sec"
        )

    def print_summary(self):
        """Write the final totals, duration and average rate to stdout.

        Styled as a warning and worded accordingly when any batch failed, so a
        ``--continue-on-error`` run with failures does not read as a clean
        success.
        """
        elapsed = timezone.now() - self.start_time
        seconds = elapsed.total_seconds()
        # Same guard as print_progress: avoid dividing by a zero duration.
        rate = self.processed_count / seconds if seconds > 0 else 0

        if self.error_count:
            headline = "Command completed with errors!"
            style = self.style.WARNING
        else:
            headline = "Command completed successfully!"
            style = self.style.SUCCESS

        self.stdout.write(
            style(
                f"\n{headline}\n"
                f"Processed: {self.processed_count} items\n"
                f"Errors: {self.error_count}\n"
                f"Duration: {elapsed}\n"
                f"Average rate: {rate:.2f} items/sec"
            )
        )

Specialized Command Types

Data Migration Command

# management/commands/migrate_data.py
class Command(BaseCommand):
    """Command for complex data migrations.

    NOTE(review): this snippet calls helpers that are not defined in it
    (``rollback_migration``, ``validate_data``, ``create_migration_record``,
    ``read_csv_data``, ``read_json_data``, ``read_model_data`` and
    ``apply_mapping``). They have to be provided before the command can run;
    their exact contracts cannot be confirmed from here.
    """

    help = 'Migrate data between models or external systems'

    def add_arguments(self, parser):
        """Register the command-line options of this command."""
        parser.add_argument('--source', required=True, help='Source model or file')
        parser.add_argument('--target', required=True, help='Target model')
        parser.add_argument('--mapping', help='Field mapping configuration file')
        parser.add_argument('--validate', action='store_true', help='Validate data before migration')
        parser.add_argument('--rollback', action='store_true', help='Rollback previous migration')

    def handle(self, *args, **options):
        """Roll back when ``--rollback`` is given, otherwise run the migration."""
        if options['rollback']:
            self.rollback_migration(options)
        else:
            self.execute_migration(options)

    def execute_migration(self, options):
        """Load the mapping and source data, migrate, and record the outcome.

        The migration record's status is set to ``'completed'`` or
        ``'failed'``; on failure the original exception is re-raised after the
        record has been saved.
        """
        mapping = self.load_mapping(options['mapping'])
        source_data = self.get_source_data(options['source'])

        if options['validate']:
            self.validate_data(source_data, mapping)

        migration_record = self.create_migration_record(options)

        try:
            migrated_count = self.migrate_data(source_data, mapping, options['target'])
        except Exception as e:
            migration_record.status = 'failed'
            migration_record.error_message = str(e)
            migration_record.save()
            raise

        migration_record.status = 'completed'
        migration_record.migrated_count = migrated_count
        migration_record.save()

        self.stdout.write(f"Successfully migrated {migrated_count} records")

    def load_mapping(self, mapping_file):
        """Return the parsed JSON mapping, or ``{}`` when no file was given.

        Raises:
            CommandError: if the file cannot be read or is not valid JSON.
        """
        if not mapping_file:
            return {}

        import json

        try:
            # Explicit encoding: JSON is UTF-8 regardless of the platform locale.
            with open(mapping_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except OSError as e:
            raise CommandError(f"Cannot read mapping file {mapping_file}: {e}") from e
        except json.JSONDecodeError as e:
            raise CommandError(f"Invalid JSON in mapping file {mapping_file}: {e}") from e

    def get_source_data(self, source):
        """Read the source by type: ``.csv`` file, ``.json`` file, else a model name.

        The extension check ignores case, so ``DATA.CSV`` is read as CSV
        rather than being mistaken for a model name.
        """
        suffix_source = source.lower()
        if suffix_source.endswith('.csv'):
            return self.read_csv_data(source)
        if suffix_source.endswith('.json'):
            return self.read_json_data(source)
        # Anything else is assumed to be a model name.
        return self.read_model_data(source)

    def migrate_data(self, source_data, mapping, target_model):
        """Create one ``target_model`` row per source record; return the count.

        ``target_model`` is resolved with ``apps.get_model``. All rows are
        created inside one ``transaction.atomic`` block, so a failure part-way
        through does not leave a partially migrated target behind a record
        marked ``'failed'``.
        """
        from django.apps import apps
        from django.db import transaction

        target_class = apps.get_model(target_model)
        migrated_count = 0

        with transaction.atomic():
            for record in source_data:
                mapped_data = self.apply_mapping(record, mapping)
                target_class.objects.create(**mapped_data)
                migrated_count += 1

        return migrated_count

# management/commands/export_data.py
class Command(BaseCommand):
    """Export data to various formats.

    NOTE(review): ``apply_filter`` and ``export_xml`` are called below but are
    not defined in this snippet; they have to be provided for ``--filter`` and
    ``--format xml`` to work.
    """

    help = 'Export model data to CSV, JSON, or XML'

    def add_arguments(self, parser):
        """Register the command-line options of this command."""
        parser.add_argument('model', help='Model to export (app.Model)')
        parser.add_argument('--format', choices=['csv', 'json', 'xml'], default='csv')
        parser.add_argument('--output', required=True, help='Output file path')
        parser.add_argument('--fields', help='Comma-separated list of fields to export')
        parser.add_argument('--filter', help='Filter expression')
        parser.add_argument('--compress', action='store_true', help='Compress output file')

    def handle(self, *args, **options):
        """Build the queryset for ``model`` and write it in the chosen format."""
        from django.apps import apps
        model_class = apps.get_model(options['model'])

        queryset = model_class.objects.all()

        if options['filter']:
            queryset = self.apply_filter(queryset, options['filter'])

        # Always export dict rows. Without --fields, values() selects every
        # field; exporting model instances instead would break the CSV writer
        # (instances have no keys()) and reduce JSON rows to str(instance).
        fields = []
        if options['fields']:
            fields = [name.strip() for name in options['fields'].split(',') if name.strip()]
        queryset = queryset.values(*fields)

        # Kept as an if/elif chain so export_xml is only looked up when the
        # xml format is actually requested.
        if options['format'] == 'csv':
            self.export_csv(queryset, options['output'], options['compress'])
        elif options['format'] == 'json':
            self.export_json(queryset, options['output'], options['compress'])
        elif options['format'] == 'xml':
            self.export_xml(queryset, options['output'], options['compress'])

        self.stdout.write(f"Exported {queryset.count()} records to {options['output']}")

    def export_csv(self, queryset, output_file, compress):
        """Write ``queryset`` (dict rows) to ``output_file`` as CSV.

        The header comes from the keys of the first row. An empty queryset
        produces an empty file. With ``compress`` the file is gzip-compressed.
        """
        import csv
        import gzip

        open_func = gzip.open if compress else open
        mode = 'wt' if compress else 'w'

        # newline='' is what the csv module expects of its file object.
        with open_func(output_file, mode, newline='', encoding='utf-8') as csvfile:
            rows = iter(queryset)
            first_row = next(rows, None)
            if first_row is None:
                return

            writer = csv.DictWriter(csvfile, fieldnames=list(first_row))
            writer.writeheader()
            writer.writerow(first_row)
            writer.writerows(rows)

    def export_json(self, queryset, output_file, compress):
        """Write ``queryset`` to ``output_file`` as an indented JSON array.

        Values that ``json`` cannot serialise are written via ``str``. With
        ``compress`` the file is gzip-compressed.
        """
        import json
        import gzip

        open_func = gzip.open if compress else open
        mode = 'wt' if compress else 'w'

        with open_func(output_file, mode, encoding='utf-8') as f:
            json.dump(list(queryset), f, indent=2, default=str)

Interactive Commands

# management/commands/interactive_setup.py
class Command(BaseCommand):
    """Interactive setup command.

    Asks a series of yes/no questions and runs the matching setup step for
    each "yes".

    NOTE(review): ``load_sample_data`` is called from ``handle`` but is not
    defined in this snippet.
    """

    help = 'Interactive application setup'

    def handle(self, *args, **options):
        """Offer each setup step in turn and run the confirmed ones."""
        self.stdout.write("Welcome to the interactive setup!")

        if self.confirm("Set up database?"):
            self.setup_database()

        if self.confirm("Create superuser?"):
            self.create_superuser()

        if self.confirm("Load sample data?"):
            self.load_sample_data()

        if self.confirm("Configure application settings?"):
            self.configure_settings()

        self.stdout.write(self.style.SUCCESS("Setup completed!"))

    def _read_line(self, prompt, secret=False):
        """Read one stripped line from the terminal.

        With ``secret`` the input is read through ``getpass`` so it is not
        echoed.

        Raises:
            CommandError: if the input stream is closed (for example when the
                command runs without a terminal).
        """
        import getpass

        reader = getpass.getpass if secret else input
        try:
            return reader(prompt).strip()
        except EOFError as e:
            raise CommandError(
                "No input available; this command must be run interactively"
            ) from e

    def confirm(self, question):
        """Ask ``question`` until the answer is y/yes (True) or n/no (False)."""
        while True:
            response = self._read_line(f"{question} (y/n): ").lower()
            if response in ['y', 'yes']:
                return True
            if response in ['n', 'no']:
                return False
            self.stdout.write("Please enter 'y' or 'n'")

    def get_input(self, prompt, default=None, validator=None, secret=False):
        """Prompt for a value, re-asking until ``validator`` accepts it.

        Args:
            prompt: text shown to the user.
            default: value used when the user just presses Enter; shown in
                brackets after the prompt.
            validator: optional callable that raises ``ValueError`` to reject
                a value.
            secret: read without echo (for passwords).

        Returns:
            The accepted string.
        """
        while True:
            if default:
                response = self._read_line(f"{prompt} [{default}]: ", secret)
                if not response:
                    response = default
            else:
                response = self._read_line(f"{prompt}: ", secret)

            if validator is None:
                return response

            try:
                validator(response)
            except ValueError as e:
                self.stdout.write(f"Invalid input: {e}")
            else:
                return response

    def setup_database(self):
        """Apply all migrations quietly via the ``migrate`` command."""
        self.stdout.write("Setting up database...")

        from django.core.management import call_command
        call_command('migrate', verbosity=0)

        self.stdout.write("Database setup completed")

    def create_superuser(self):
        """Prompt for credentials and create a superuser unless the name is taken."""
        # get_user_model() honours a swapped AUTH_USER_MODEL; importing
        # django.contrib.auth.models.User directly does not.
        # NOTE(review): still assumes the user model has a ``username`` field.
        from django.contrib.auth import get_user_model

        user_model = get_user_model()

        username = self.get_input("Username", default="admin")

        # Check before asking for anything else, so the user is not prompted
        # for details that would be thrown away.
        if user_model.objects.filter(username=username).exists():
            self.stdout.write(f"User {username} already exists")
            return

        email = self.get_input("Email")

        def require_non_empty(value):
            if not value:
                raise ValueError("password must not be empty")

        # secret=True keeps the password off the screen.
        password = self.get_input("Password", validator=require_non_empty, secret=True)

        user_model.objects.create_superuser(username, email, password)
        self.stdout.write(f"Superuser {username} created")

    def configure_settings(self):
        """Prompt for DEBUG and SECRET_KEY and write them to ``local_settings.py``.

        Accepting the default for the secret key generates a random one. An
        existing settings file is only overwritten after confirmation.
        """
        import os

        settings_file = 'local_settings.py'

        if os.path.exists(settings_file) and not self.confirm(
            f"{settings_file} already exists. Overwrite?"
        ):
            self.stdout.write(f"Keeping existing {settings_file}")
            return

        self.stdout.write(f"Configuring {settings_file}...")

        debug = self.confirm("Enable debug mode?")

        placeholder = "generate-random-key"
        secret_key = self.get_input("Secret key", default=placeholder)
        if secret_key == placeholder:
            # The placeholder is a request to generate a key, not a usable
            # key in its own right.
            from django.core.management.utils import get_random_secret_key
            secret_key = get_random_secret_key()

        with open(settings_file, 'w', encoding='utf-8') as f:
            f.write(f"DEBUG = {debug}\n")
            # repr() quotes and escapes the key, so a key containing quotes or
            # backslashes still yields valid Python.
            f.write(f"SECRET_KEY = {secret_key!r}\n")

        self.stdout.write(f"Settings written to {settings_file}")

# management/commands/health_check.py
class Command(BaseCommand):
    """System health check command.

    Each ``check_*`` method returns ``{'status': bool, 'message': str}``.

    NOTE(review): ``check_external_services``, ``check_permissions``,
    ``output_json`` and ``output_text`` are referenced in ``handle`` but are
    not defined in this snippet.
    """

    help = 'Perform comprehensive system health check'

    def add_arguments(self, parser):
        """Register the command-line options of this command."""
        parser.add_argument('--format', choices=['text', 'json'], default='text')
        parser.add_argument('--output', help='Output file')

    def handle(self, *args, **options):
        """Run every check, print its outcome, then emit the collected results."""
        checks = [
            ('Database', self.check_database),
            ('Cache', self.check_cache),
            ('Storage', self.check_storage),
            ('External Services', self.check_external_services),
            ('Permissions', self.check_permissions),
        ]

        results = {}

        for check_name, check_func in checks:
            self.stdout.write(f"Checking {check_name}...")
            try:
                result = check_func()
            except Exception as e:
                # A check that raises is recorded as failed, not fatal, so the
                # remaining checks still run.
                results[check_name] = {'status': False, 'message': str(e)}
                self.stdout.write(f"  ERROR: {e}")
            else:
                results[check_name] = result
                status = "OK" if result['status'] else "FAIL"
                self.stdout.write(f"  {status}: {result['message']}")

        if options['format'] == 'json':
            self.output_json(results, options['output'])
        else:
            self.output_text(results, options['output'])

    def check_database(self):
        """Check database connectivity by running ``SELECT 1`` on the default connection."""
        from django.db import connection

        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
        except Exception as e:
            return {'status': False, 'message': f'Database error: {e}'}
        return {'status': True, 'message': 'Database connection OK'}

    def check_cache(self):
        """Check the default cache with a set/get round trip on a throwaway key."""
        from django.core.cache import cache
        import uuid

        # Unique key per run, so concurrent health checks cannot overwrite
        # each other's probe value.
        test_key = f'health_check_test_{uuid.uuid4().hex}'
        test_value = 'test_value'

        try:
            cache.set(test_key, test_value, 60)
            try:
                retrieved_value = cache.get(test_key)
            finally:
                # Remove the probe key whether or not the read succeeded.
                cache.delete(test_key)
        except Exception as e:
            return {'status': False, 'message': f'Cache error: {e}'}

        if retrieved_value == test_value:
            return {'status': True, 'message': 'Cache working correctly'}
        return {'status': False, 'message': 'Cache value mismatch'}

    def check_storage(self):
        """Check the default file storage with a save/read/delete round trip."""
        from django.core.files.base import ContentFile
        from django.core.files.storage import default_storage

        test_content = b'health check test'

        try:
            # ContentFile wraps the bytes in memory, so no local temporary
            # file or open handle is left behind.
            saved_name = default_storage.save(
                'health_check_test.txt', ContentFile(test_content)
            )
            try:
                with default_storage.open(saved_name, 'rb') as f:
                    read_content = f.read()
            finally:
                # Remove the probe file even if reading it back failed.
                default_storage.delete(saved_name)
        except Exception as e:
            return {'status': False, 'message': f'Storage error: {e}'}

        if read_content == test_content:
            return {'status': True, 'message': 'Storage working correctly'}
        return {'status': False, 'message': 'Storage content mismatch'}

Custom management commands provide powerful automation capabilities for Django applications. They enable complex data processing, system maintenance, and administrative tasks while integrating seamlessly with Django's ecosystem and providing robust error handling and progress tracking.