Django's management command system provides a powerful way to create command-line tools for administrative tasks, data processing, and automation. This guide covers creating sophisticated management commands with advanced features like progress tracking, parallel processing, and integration with external systems.
# management/commands/advanced_command.py
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.utils import timezone
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional
class Command(BaseCommand):
    """Process ``MyModel`` rows in batches, sequentially or on a thread pool.

    ``handle`` validates the options, configures logging, loads the ids of the
    rows to process, runs ``process_item`` on every row batch by batch and
    finally prints a summary with counts, duration and throughput.

    ``processed_count`` counts items of successfully processed batches;
    ``error_count`` counts failed batches.
    """

    help = 'Advanced command with progress tracking and error handling'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger(__name__)
        self.start_time = None
        self.processed_count = 0
        self.error_count = 0
        # Set for real by setup_progress_tracking(); defined here so the
        # process_items_* methods never hit a missing attribute.
        self.use_progress_bar = False
        # Handlers attached by setup_logging(); detached again after handle().
        self._log_handlers = []

    def add_arguments(self, parser):
        """Register the command-line options."""
        parser.add_argument(
            '--batch-size',
            type=int,
            default=100,
            help='Number of items to process in each batch'
        )
        parser.add_argument(
            '--workers',
            type=int,
            default=4,
            help='Number of worker threads'
        )
        parser.add_argument(
            '--dry-run',
            action='store_true',
            help='Show what would be done without making changes'
        )
        parser.add_argument(
            '--verbose',
            action='store_true',
            help='Enable verbose output'
        )
        parser.add_argument(
            '--filter',
            type=str,
            help='Filter criteria for processing'
        )
        parser.add_argument(
            '--output-file',
            type=str,
            help='Output file for results'
        )
        parser.add_argument(
            '--continue-on-error',
            action='store_true',
            help='Continue processing even if errors occur'
        )

    def handle(self, *args, **options):
        """Run the command.

        Raises:
            CommandError: for invalid options, or wrapping any unexpected
                exception raised while processing.
        """
        self.start_time = timezone.now()
        self.options = options
        self.processed_count = 0
        self.error_count = 0
        try:
            self.validate_arguments(options)
            self.setup_logging(options)
            self.setup_progress_tracking()
            self.execute_command(options)
            self.print_summary()
        except CommandError:
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}", exc_info=True)
            # Chain the cause so the original traceback is not lost.
            raise CommandError(f"Command failed: {e}") from e
        finally:
            # The logger is shared process-wide; leaving handlers attached
            # would duplicate every message on the next run and leak the
            # log file handle.
            self._teardown_logging()

    def validate_arguments(self, options):
        """Reject non-positive sizes and a missing output directory.

        Raises:
            CommandError: if ``batch_size`` or ``workers`` is not positive, or
                the directory part of ``output_file`` does not exist.
        """
        if options['batch_size'] <= 0:
            raise CommandError("Batch size must be positive")
        if options['workers'] <= 0:
            raise CommandError("Number of workers must be positive")
        if options['output_file']:
            import os
            output_dir = os.path.dirname(options['output_file'])
            if output_dir and not os.path.exists(output_dir):
                raise CommandError(f"Output directory does not exist: {output_dir}")

    def setup_logging(self, options):
        """Attach a console handler and, with ``--output-file``, a log file.

        The log file sits next to the output file with its extension replaced
        by ``.log``.
        """
        import os

        level = logging.DEBUG if options['verbose'] else logging.INFO
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        handlers = [console_handler]

        if options['output_file']:
            output_file = options['output_file']
            root, ext = os.path.splitext(output_file)
            # Swap only the real extension, and never let the log file
            # collide with the output file itself.
            log_file = f"{root}.log" if ext != '.log' else f"{output_file}.log"
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.DEBUG)
            handlers.append(file_handler)

        for handler in handlers:
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self._log_handlers.append(handler)
        self.logger.setLevel(level)

    def _teardown_logging(self):
        """Detach and close every handler added by ``setup_logging``."""
        while self._log_handlers:
            handler = self._log_handlers.pop()
            self.logger.removeHandler(handler)
            handler.close()

    def setup_progress_tracking(self):
        """Enable the tqdm progress bar when tqdm can be imported."""
        try:
            from tqdm import tqdm  # noqa: F401 -- availability probe only
            self.use_progress_bar = True
        except ImportError:
            self.use_progress_bar = False
            self.stdout.write("Install tqdm for progress bars: pip install tqdm")

    def execute_command(self, options):
        """Load the item ids and dispatch to sequential or parallel processing."""
        items = self.get_items_to_process(options)
        total_items = len(items)
        if total_items == 0:
            self.stdout.write("No items to process")
            return
        self.stdout.write(f"Processing {total_items} items...")
        if options['dry_run']:
            self.stdout.write("DRY RUN - No changes will be made")

        batch_size = options['batch_size']
        workers = options['workers']
        if workers > 1:
            self.process_items_parallel(items, batch_size, workers, options)
        else:
            self.process_items_sequential(items, batch_size, options)

    def get_items_to_process(self, options) -> List:
        """Return the ids of the ``MyModel`` rows to process.

        ``--filter`` must have the form ``field=value``; everything after the
        first ``=`` is the value.

        Raises:
            CommandError: if ``--filter`` is given but is not ``field=value``.
        """
        from myapp.models import MyModel

        queryset = MyModel.objects.all()
        filter_expr = options['filter']
        if filter_expr:
            field, separator, value = filter_expr.partition('=')
            if not separator or not field:
                # Silently ignoring a bad filter would process every row.
                raise CommandError(
                    f"Invalid filter {filter_expr!r}: expected 'field=value'"
                )
            queryset = queryset.filter(**{field: value})
        return list(queryset.values_list('id', flat=True))

    @staticmethod
    def _split_into_batches(items: List, batch_size: int) -> List[List]:
        """Cut ``items`` into consecutive slices of at most ``batch_size``."""
        return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

    def _open_progress_bar(self, total: int):
        """Return a tqdm bar for ``total`` items, or ``None`` when disabled."""
        if not self.use_progress_bar:
            return None
        from tqdm import tqdm
        return tqdm(total=total, desc="Processing")

    def _record_batch_success(self, batch: List, progress_bar) -> None:
        """Count a finished batch and report progress."""
        # Count regardless of how progress is displayed, so the summary is
        # correct when the progress bar is in use as well.
        self.processed_count += len(batch)
        if progress_bar is not None:
            progress_bar.update(len(batch))
        else:
            self.print_progress()

    def process_items_sequential(self, items: List, batch_size: int, options):
        """Process the batches one after another in the calling thread.

        A failing batch is logged and counted; it is re-raised unless
        ``continue_on_error`` is set.
        """
        batches = self._split_into_batches(items, batch_size)
        progress_bar = self._open_progress_bar(len(items))
        try:
            for batch in batches:
                try:
                    self.process_batch(batch, options)
                except Exception as e:
                    self.error_count += 1
                    self.logger.error(f"Batch processing failed: {e}")
                    if not options['continue_on_error']:
                        raise
                else:
                    self._record_batch_success(batch, progress_bar)
        finally:
            # Close the bar even when an error propagates.
            if progress_bar is not None:
                progress_bar.close()

    def process_items_parallel(self, items: List, batch_size: int, workers: int, options):
        """Process the batches on a pool of ``workers`` threads.

        Counters are only updated from the calling thread as futures complete.
        On the first failure without ``continue_on_error`` the not-yet-started
        batches are cancelled and the error is re-raised.

        NOTE(review): each worker thread opens its own database connection;
        confirm they are released when the pool shuts down.
        """
        batches = self._split_into_batches(items, batch_size)
        progress_bar = self._open_progress_bar(len(items))
        try:
            with ThreadPoolExecutor(max_workers=workers) as executor:
                future_to_batch = {
                    executor.submit(self.process_batch, batch, options): batch
                    for batch in batches
                }
                for future in as_completed(future_to_batch):
                    batch = future_to_batch[future]
                    try:
                        future.result()
                    except Exception as e:
                        self.error_count += 1
                        self.logger.error(f"Batch processing failed: {e}")
                        if not options['continue_on_error']:
                            # Batches already running cannot be cancelled;
                            # the executor waits for them on exit.
                            for pending in future_to_batch:
                                pending.cancel()
                            raise
                    else:
                        self._record_batch_success(batch, progress_bar)
        finally:
            if progress_bar is not None:
                progress_bar.close()

    @transaction.atomic
    def process_batch(self, batch_ids: List[int], options):
        """Process the rows with the given ids inside one transaction.

        Any item failure is logged and re-raised, which rolls back the batch.
        """
        from myapp.models import MyModel

        items = MyModel.objects.filter(id__in=batch_ids)
        for item in items:
            try:
                self.process_item(item, options)
            except Exception as e:
                self.logger.error(f"Failed to process item {item.id}: {e}")
                raise

    def process_item(self, item, options):
        """Mark ``item`` as processed, or only log it in dry-run mode."""
        if options['dry_run']:
            self.logger.info(f"Would process item {item.id}")
            return
        # Actual processing logic here
        item.processed = True
        item.processed_at = timezone.now()
        item.save()
        self.logger.debug(f"Processed item {item.id}")

    def print_progress(self):
        """Write the running counts and the current throughput to stdout."""
        seconds = (timezone.now() - self.start_time).total_seconds()
        rate = self.processed_count / seconds if seconds > 0 else 0
        self.stdout.write(
            f"Processed: {self.processed_count}, "
            f"Errors: {self.error_count}, "
            f"Rate: {rate:.2f} items/sec"
        )

    def print_summary(self):
        """Write the final counts, duration and average rate to stdout."""
        elapsed = timezone.now() - self.start_time
        seconds = elapsed.total_seconds()
        # Guard against a zero-length run (e.g. nothing to process).
        average_rate = self.processed_count / seconds if seconds > 0 else 0
        self.stdout.write(
            self.style.SUCCESS(
                f"\nCommand completed successfully!\n"
                f"Processed: {self.processed_count} items\n"
                f"Errors: {self.error_count}\n"
                f"Duration: {elapsed}\n"
                f"Average rate: {average_rate:.2f} items/sec"
            )
        )
# management/commands/migrate_data.py
class Command(BaseCommand):
    """Migrate records from a file or model into a target model.

    NOTE(review): ``rollback_migration``, ``validate_data``,
    ``create_migration_record``, ``read_csv_data``, ``read_json_data``,
    ``read_model_data`` and ``apply_mapping`` are called by this class but are
    not defined in the visible excerpt -- confirm they are provided elsewhere.
    """

    help = 'Migrate data between models or external systems'

    def add_arguments(self, parser):
        """Register the command-line options."""
        parser.add_argument('--source', required=True, help='Source model or file')
        parser.add_argument('--target', required=True, help='Target model')
        parser.add_argument('--mapping', help='Field mapping configuration file')
        parser.add_argument('--validate', action='store_true', help='Validate data before migration')
        parser.add_argument('--rollback', action='store_true', help='Rollback previous migration')

    def handle(self, *args, **options):
        """Roll back with ``--rollback``, otherwise run the migration."""
        if options['rollback']:
            self.rollback_migration(options)
        else:
            self.execute_migration(options)

    def execute_migration(self, options):
        """Load mapping and source data, migrate, and record the outcome.

        The migration record is marked ``completed`` with the migrated count
        on success, or ``failed`` with the error message before the exception
        is re-raised.
        """
        mapping = self.load_mapping(options['mapping'])
        source_data = self.get_source_data(options['source'])
        if options['validate']:
            self.validate_data(source_data, mapping)

        migration_record = self.create_migration_record(options)
        try:
            migrated_count = self.migrate_data(source_data, mapping, options['target'])
            migration_record.status = 'completed'
            migration_record.migrated_count = migrated_count
            migration_record.save()
            self.stdout.write(f"Successfully migrated {migrated_count} records")
        except Exception as e:
            migration_record.status = 'failed'
            migration_record.error_message = str(e)
            migration_record.save()
            raise

    def load_mapping(self, mapping_file):
        """Return the JSON field mapping, or ``{}`` when no file is given."""
        if not mapping_file:
            return {}
        import json
        # Explicit encoding: the platform default is not always UTF-8.
        with open(mapping_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def get_source_data(self, source):
        """Read the source records from a CSV file, a JSON file or a model.

        The file type is chosen by suffix, case-insensitively; anything else
        is treated as a model name.
        """
        suffix_source = source.lower()
        if suffix_source.endswith('.csv'):
            return self.read_csv_data(source)
        if suffix_source.endswith('.json'):
            return self.read_json_data(source)
        # Assume it's a model name
        return self.read_model_data(source)

    def migrate_data(self, source_data, mapping, target_model):
        """Create one target row per source record and return the count.

        All rows are created in a single transaction on the target model's
        write database, so a failure part-way through leaves no partial data
        behind.
        """
        from django.apps import apps
        from django.db import transaction

        target_class = apps.get_model(target_model)
        migrated_count = 0
        with transaction.atomic(using=target_class.objects.db):
            for record in source_data:
                mapped_data = self.apply_mapping(record, mapping)
                target_class.objects.create(**mapped_data)
                migrated_count += 1
        return migrated_count
# management/commands/export_data.py
class Command(BaseCommand):
    """Export the rows of a model to a CSV, JSON or XML file.

    NOTE(review): ``apply_filter`` and ``export_xml`` are called by ``handle``
    but are not defined in the visible excerpt -- confirm they are provided
    elsewhere.
    """

    help = 'Export model data to CSV, JSON, or XML'

    def add_arguments(self, parser):
        """Register the command-line options."""
        parser.add_argument('model', help='Model to export (app.Model)')
        parser.add_argument('--format', choices=['csv', 'json', 'xml'], default='csv')
        parser.add_argument('--output', required=True, help='Output file path')
        parser.add_argument('--fields', help='Comma-separated list of fields to export')
        parser.add_argument('--filter', help='Filter expression')
        parser.add_argument('--compress', action='store_true', help='Compress output file')

    def handle(self, *args, **options):
        """Build the queryset, export it in the chosen format and report."""
        from django.apps import apps

        model_class = apps.get_model(options['model'])
        queryset = model_class.objects.all()
        if options['filter']:
            queryset = self.apply_filter(queryset, options['filter'])

        # Always export dict rows: the exporters need mappings, and model
        # instances are neither CSV-writable nor JSON-serialisable.
        # ``values()`` without arguments selects every field.
        fields = []
        if options['fields']:
            fields = [name.strip() for name in options['fields'].split(',') if name.strip()]
        queryset = queryset.values(*fields)

        exported = None
        if options['format'] == 'csv':
            exported = self.export_csv(queryset, options['output'], options['compress'])
        elif options['format'] == 'json':
            exported = self.export_json(queryset, options['output'], options['compress'])
        elif options['format'] == 'xml':
            exported = self.export_xml(queryset, options['output'], options['compress'])
        if exported is None:
            # Exporter did not report a count; fall back to asking the DB.
            exported = queryset.count()
        self.stdout.write(f"Exported {exported} records to {options['output']}")

    def export_csv(self, queryset, output_file, compress):
        """Write dict rows to ``output_file`` as CSV, gzipped if ``compress``.

        The header comes from the keys of the first row; an empty input
        produces an empty file. Returns the number of rows written.
        """
        import csv
        import gzip

        open_func = gzip.open if compress else open
        written = 0
        with open_func(output_file, 'wt', newline='', encoding='utf-8') as csvfile:
            rows = iter(queryset)
            # Single pass: peek the first row for the header instead of
            # issuing separate exists()/first() queries.
            first_row = next(rows, None)
            if first_row is None:
                return written
            writer = csv.DictWriter(csvfile, fieldnames=list(first_row.keys()))
            writer.writeheader()
            writer.writerow(first_row)
            written = 1
            for row in rows:
                writer.writerow(row)
                written += 1
        return written

    def export_json(self, queryset, output_file, compress):
        """Write the rows to ``output_file`` as a JSON array.

        Values that are not JSON-native are serialised with ``str``. The file
        is gzipped if ``compress`` is set. Returns the number of rows written.
        """
        import gzip
        import json

        data = list(queryset)
        open_func = gzip.open if compress else open
        with open_func(output_file, 'wt', encoding='utf-8') as f:
            json.dump(data, f, indent=2, default=str)
        return len(data)
# management/commands/interactive_setup.py
class Command(BaseCommand):
    """Walk the user through database, superuser, sample-data and settings setup.

    NOTE(review): ``load_sample_data`` is called by ``handle`` but is not
    defined in the visible excerpt -- confirm it is provided elsewhere.
    """

    help = 'Interactive application setup'

    # Sentinel offered as the default secret key; choosing it triggers
    # generation of a real random key.
    GENERATE_KEY_SENTINEL = "generate-random-key"

    def handle(self, *args, **options):
        """Ask about each setup step in turn and run the confirmed ones."""
        self.stdout.write("Welcome to the interactive setup!")
        if self.confirm("Set up database?"):
            self.setup_database()
        if self.confirm("Create superuser?"):
            self.create_superuser()
        if self.confirm("Load sample data?"):
            self.load_sample_data()
        if self.confirm("Configure application settings?"):
            self.configure_settings()
        self.stdout.write(self.style.SUCCESS("Setup completed!"))

    def confirm(self, question):
        """Prompt until the user answers y/yes or n/no; return the answer as bool."""
        while True:
            response = input(f"{question} (y/n): ").lower().strip()
            if response in ['y', 'yes']:
                return True
            if response in ['n', 'no']:
                return False
            self.stdout.write("Please enter 'y' or 'n'")

    def get_input(self, prompt, default=None, validator=None, secret=False):
        """Prompt for a value, re-asking until ``validator`` accepts it.

        Args:
            prompt: Text shown to the user.
            default: Value used when the user enters nothing (shown in the
                prompt).
            validator: Optional callable; it rejects a value by raising
                ``ValueError``.
            secret: Read without echoing to the terminal and without
                stripping whitespace (for passwords).
        """
        import getpass

        read = getpass.getpass if secret else input
        label = f"{prompt} [{default}]: " if default else f"{prompt}: "
        while True:
            response = read(label)
            if not secret:
                response = response.strip()
            if default and not response:
                response = default
            if validator is None:
                return response
            try:
                validator(response)
                return response
            except ValueError as e:
                self.stdout.write(f"Invalid input: {e}")

    @staticmethod
    def _require_non_empty(value):
        """Validator for ``get_input``: reject empty values."""
        if not value:
            raise ValueError("a value is required")

    def setup_database(self):
        """Apply all migrations."""
        self.stdout.write("Setting up database...")
        from django.core.management import call_command
        call_command('migrate', verbosity=0)
        self.stdout.write("Database setup completed")

    def create_superuser(self):
        """Prompt for credentials and create a superuser unless the name is taken."""
        # Use the project's configured user model; the concrete auth.User
        # manager is unavailable when AUTH_USER_MODEL is swapped.
        from django.contrib.auth import get_user_model

        User = get_user_model()
        username = self.get_input("Username", default="admin")
        email = self.get_input("Email")
        if User.objects.filter(**{User.USERNAME_FIELD: username}).exists():
            self.stdout.write(f"User {username} already exists")
            return
        # Never echo the password, and do not accept an empty one.
        password = self.get_input(
            "Password", validator=self._require_non_empty, secret=True
        )
        User.objects.create_superuser(username, email, password)
        self.stdout.write(f"Superuser {username} created")

    def configure_settings(self):
        """Ask for DEBUG and SECRET_KEY and write them to ``local_settings.py``.

        Accepting the default secret-key answer generates a random key rather
        than writing the placeholder text as the key.
        """
        from django.core.management.utils import get_random_secret_key

        settings_file = 'local_settings.py'
        self.stdout.write(f"Configuring {settings_file}...")

        debug = self.confirm("Enable debug mode?")
        secret_key = self.get_input("Secret key", default=self.GENERATE_KEY_SENTINEL)
        if secret_key == self.GENERATE_KEY_SENTINEL:
            secret_key = get_random_secret_key()

        with open(settings_file, 'w', encoding='utf-8') as f:
            f.write(f"DEBUG = {debug}\n")
            # repr() emits a correctly quoted/escaped Python string literal,
            # so quotes or backslashes in the key cannot break the file.
            f.write(f"SECRET_KEY = {secret_key!r}\n")
        self.stdout.write(f"Settings written to {settings_file}")
# management/commands/health_check.py
class Command(BaseCommand):
    """Run a series of system health checks and report the results.

    Each check returns ``{'status': bool, 'message': str}``.

    NOTE(review): ``check_external_services``, ``check_permissions``,
    ``output_json`` and ``output_text`` are referenced by ``handle`` but are
    not defined in the visible excerpt -- confirm they are provided elsewhere.
    """

    help = 'Perform comprehensive system health check'

    def add_arguments(self, parser):
        """Register the command-line options."""
        parser.add_argument('--format', choices=['text', 'json'], default='text')
        parser.add_argument('--output', help='Output file')

    def handle(self, *args, **options):
        """Run every check, print its status, then emit the collected results.

        A check that raises is recorded as failed with the exception text.
        """
        checks = [
            ('Database', self.check_database),
            ('Cache', self.check_cache),
            ('Storage', self.check_storage),
            ('External Services', self.check_external_services),
            ('Permissions', self.check_permissions),
        ]
        results = {}
        for check_name, check_func in checks:
            self.stdout.write(f"Checking {check_name}...")
            try:
                result = check_func()
                results[check_name] = result
                status = "OK" if result['status'] else "FAIL"
                self.stdout.write(f" {status}: {result['message']}")
            except Exception as e:
                results[check_name] = {'status': False, 'message': str(e)}
                self.stdout.write(f" ERROR: {e}")

        if options['format'] == 'json':
            self.output_json(results, options['output'])
        else:
            self.output_text(results, options['output'])

    def check_database(self):
        """Run ``SELECT 1`` on the default connection."""
        from django.db import connection
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
            return {'status': True, 'message': 'Database connection OK'}
        except Exception as e:
            return {'status': False, 'message': f'Database error: {e}'}

    def check_cache(self):
        """Round-trip a test value through the default cache."""
        from django.core.cache import cache

        test_key = 'health_check_test'
        test_value = 'test_value'
        try:
            cache.set(test_key, test_value, 60)
            try:
                retrieved_value = cache.get(test_key)
            finally:
                # Remove the probe key whether or not the values matched.
                cache.delete(test_key)
        except Exception as e:
            return {'status': False, 'message': f'Cache error: {e}'}
        if retrieved_value == test_value:
            return {'status': True, 'message': 'Cache working correctly'}
        return {'status': False, 'message': 'Cache value mismatch'}

    def check_storage(self):
        """Save, read back and delete a small file in the default storage."""
        from django.core.files.base import ContentFile
        from django.core.files.storage import default_storage

        test_content = b'health check test'
        try:
            # ContentFile keeps the payload in memory, so no local temp file
            # or open handle is left behind.
            saved_name = default_storage.save(
                'health_check_test.txt', ContentFile(test_content)
            )
            try:
                with default_storage.open(saved_name, 'rb') as f:
                    read_content = f.read()
            finally:
                # Clean up the stored probe file even if reading failed.
                default_storage.delete(saved_name)
        except Exception as e:
            return {'status': False, 'message': f'Storage error: {e}'}
        if read_content == test_content:
            return {'status': True, 'message': 'Storage working correctly'}
        return {'status': False, 'message': 'Storage content mismatch'}
Custom management commands provide powerful automation capabilities for Django applications. They enable complex data processing, system maintenance, and administrative tasks while integrating seamlessly with Django's ecosystem and providing robust error handling and progress tracking.
Custom ORM Expressions
Django's ORM provides powerful expression APIs that allow you to create custom database expressions, functions, and operations. This enables sophisticated database queries while maintaining the benefits of Django's ORM abstraction.
Working with Signals
Django signals provide a decoupled way to allow certain senders to notify a set of receivers when actions occur. This comprehensive guide covers advanced signal patterns, performance considerations, and best practices for building event-driven Django applications.