Advanced and Expert Topics

Working with Signals

Django signals provide a decoupled way to allow certain senders to notify a set of receivers when actions occur. This comprehensive guide covers advanced signal patterns, performance considerations, and best practices for building event-driven Django applications.

Working with Signals

Django signals provide a decoupled way to allow certain senders to notify a set of receivers when actions occur. This comprehensive guide covers advanced signal patterns, performance considerations, and best practices for building event-driven Django applications.

Advanced Signal Patterns

Custom Signal Creation

# signals.py
import threading

import django.dispatch
from django.core.management.base import BaseCommand
from django.dispatch import Signal
from django.utils import timezone

# Custom signals for business events.
# Each one is a plain Signal instance; receivers attach to them with
# @receiver further down in this module.
user_profile_completed = Signal()
# Sent by Order.save() with instance, old_status and new_status.
order_status_changed = Signal()
payment_processed = Signal()
inventory_low = Signal()
# Received by batch_user_activity, which queues it on the batch processor.
user_activity_detected = Signal()

# Signal with custom arguments
class OrderSignal(Signal):
    """Signal subclass that keeps an in-memory log of every dispatch.

    Each call to ``send`` appends a record (sender, timestamp, keyword
    arguments) to ``order_events`` before the receivers run. The list is
    held on the signal object and is never trimmed by this class.
    """

    def __init__(self, providing_args=None, use_caching=False):
        """Create the signal.

        Args:
            providing_args: Accepted for backward compatibility only and
                ignored. Django deprecated this argument in 3.1 and removed
                it in 4.0, so it must not be forwarded to
                ``Signal.__init__``.
            use_caching: Forwarded to ``Signal.__init__``.
        """
        super().__init__(use_caching=use_caching)
        self.order_events = []

    def send(self, sender, **named):
        """Record the event, then dispatch it to the connected receivers.

        Args:
            sender: The object sending the signal.
            **named: Keyword arguments passed through to the receivers.

        Returns:
            Whatever ``Signal.send`` returns for this dispatch.
        """
        self.order_events.append({
            'sender': sender,
            'timestamp': timezone.now(),
            'data': named,
        })
        return super().send(sender, **named)

    def get_recent_events(self, limit=10):
        """Return up to ``limit`` of the most recently recorded events.

        Events come back oldest first. A ``limit`` of zero or less yields an
        empty list; a bare ``[-0:]`` slice would return the whole log.
        """
        if limit <= 0:
            return []
        return self.order_events[-limit:]


order_event = OrderSignal()

# Usage in models
from django.db import models
from django.db.models.signals import post_save, pre_save, post_delete
from django.dispatch import receiver

class Order(models.Model):
    """Customer order that announces status transitions.

    ``save`` compares the stored status with the one about to be written
    and, when they differ, sends ``order_status_changed`` after the row has
    been saved.
    """

    customer = models.ForeignKey(User, on_delete=models.CASCADE)
    status = models.CharField(max_length=20, default='pending')
    total = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)

    def save(self, *args, **kwargs):
        """Persist the order and send ``order_status_changed`` on a transition.

        The previous status is kept in a local variable rather than on the
        instance, so a change detected by one ``save`` call cannot leak into
        later calls and re-send the signal with a stale ``old_status``.
        """
        previous_status = None
        if self.pk:
            # Fetch only the status column. ``first()`` returns None when no
            # row exists yet (for example a new object saved with an explicit
            # primary key); that case is treated as "nothing to compare".
            previous_status = (
                Order.objects.filter(pk=self.pk)
                .values_list('status', flat=True)
                .first()
            )

        super().save(*args, **kwargs)

        if previous_status is not None and previous_status != self.status:
            order_status_changed.send(
                sender=self.__class__,
                instance=self,
                old_status=previous_status,
                new_status=self.status,
            )

# Advanced signal handlers
@receiver(order_status_changed)
def handle_order_status_change(sender, instance, old_status, new_status, **kwargs):
    """Record an order status transition and run the follow-up steps for it.

    Every transition is written to ``OrderStatusLog``. After that:

    * ``confirmed``: reserve inventory, send the confirmation email and
      update the customer's order statistics.
    * ``shipped``: generate a tracking number and send the shipping notice.
    * ``cancelled``: release inventory and, when the order reports
      ``payment_status == 'paid'``, process a refund.

    Any other status is only logged.
    """
    OrderStatusLog.objects.create(
        order=instance,
        old_status=old_status,
        new_status=new_status,
        changed_at=timezone.now(),
    )

    if new_status == 'confirmed':
        reserve_inventory_for_order(instance)
        send_order_confirmation_email(instance)
        update_customer_order_stats(instance.customer)

    elif new_status == 'shipped':
        generate_tracking_number(instance)
        send_shipping_notification(instance)

    elif new_status == 'cancelled':
        release_inventory_for_order(instance)

        # The Order model in this module declares no ``payment_status``
        # field, so read it defensively: an order without payment
        # information is treated as unpaid instead of raising
        # AttributeError in the middle of a cancellation.
        if getattr(instance, 'payment_status', None) == 'paid':
            process_refund(instance)

@receiver(post_save, sender=User)
def handle_user_creation(sender, instance, created, **kwargs):
    """Run the onboarding steps the first time a user row is saved.

    Saves of already-existing users are ignored.
    """
    if not created:
        return

    # Profile first, then the welcome email, registration tracking and
    # default group membership, in that order.
    UserProfile.objects.create(user=instance)
    send_welcome_email(instance)
    track_user_registration(instance)
    add_user_to_default_groups(instance)

Signal Middleware and Context

# middleware.py
class SignalContextMiddleware:
    """Expose the current request to signal handlers while it is processed.

    The request is registered through ``SignalContext`` before the rest of
    the stack runs and removed afterwards, including when a view raises.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        SignalContext.set_request(request)
        try:
            return self.get_response(request)
        finally:
            # Runs on success and on error alike, so the stored request
            # never outlives the call that registered it.
            SignalContext.clear()

class SignalContext:
    """Per-thread holder for the request currently being handled.

    Values are stored on a ``threading.local`` object, so each thread sees
    only what it stored itself.

    NOTE(review): thread-local storage does not separate coroutines that
    share one thread; confirm this is only used under a threaded (WSGI)
    deployment.
    """

    _context = threading.local()
    # Attribute names managed on the thread-local object.
    _FIELDS = ('request', 'user', 'ip_address')

    @classmethod
    def set_request(cls, request):
        """Store the request together with its user and client IP."""
        cls._context.request = request
        cls._context.user = getattr(request, 'user', None)
        cls._context.ip_address = cls._get_client_ip(request)

    @classmethod
    def get_request(cls):
        """Return the stored request, or None when nothing is stored."""
        return getattr(cls._context, 'request', None)

    @classmethod
    def get_user(cls):
        """Return the stored user, or None when nothing is stored."""
        return getattr(cls._context, 'user', None)

    @classmethod
    def get_ip_address(cls):
        """Return the stored client IP, or None when nothing is stored."""
        return getattr(cls._context, 'ip_address', None)

    @classmethod
    def clear(cls):
        """Remove everything stored for the current thread."""
        for attr in cls._FIELDS:
            if hasattr(cls._context, attr):
                delattr(cls._context, attr)

    @staticmethod
    def _get_client_ip(request):
        """Extract the client IP from ``request.META``.

        The first entry of ``X-Forwarded-For`` wins when the header is
        present; surrounding whitespace is stripped because entries are
        commonly separated by ", ". An empty first entry falls back to
        ``REMOTE_ADDR``.

        NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
        trusted proxy overwrites it; do not rely on this value for access
        control.
        """
        forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded:
            first_hop = forwarded.split(',')[0].strip()
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR')

# Using context in signal handlers
@receiver(post_save, sender=Order)
def log_order_creation(sender, instance, created, **kwargs):
    """Write an ``OrderCreationLog`` row describing how an order was created.

    Only newly created orders are logged. User, IP address and user agent
    come from ``SignalContext``; each is empty/None when no request has been
    registered for the current thread.
    """
    if not created:
        return

    request = SignalContext.get_request()
    # ``get_request`` returns None when the order is created without a
    # registered request, so guard the META access instead of letting
    # AttributeError abort the save.
    user_agent = ''
    if request is not None:
        user_agent = request.META.get('HTTP_USER_AGENT', '')

    OrderCreationLog.objects.create(
        order=instance,
        created_by=SignalContext.get_user(),
        ip_address=SignalContext.get_ip_address(),
        user_agent=user_agent,
        created_at=timezone.now(),
    )

Conditional Signal Handling

# Conditional signal decorators
def signal_condition(condition_func):
    """Build a decorator that runs a signal handler only when a predicate holds.

    Args:
        condition_func: Callable invoked as ``condition_func(sender, **kwargs)``
            with the same arguments the handler would receive.

    Returns:
        A decorator. The wrapped handler returns the original handler's
        result when the predicate is truthy and ``None`` otherwise.
    """
    # Local import: the module-level ``import functools`` appears only
    # further down in this file, after this decorator is first applied.
    import functools

    def decorator(handler_func):
        # Keep __name__/__doc__ of the real handler so anything keyed on the
        # handler's name (logs, monitors) does not see "wrapper".
        @functools.wraps(handler_func)
        def wrapper(sender, **kwargs):
            if condition_func(sender, **kwargs):
                return handler_func(sender, **kwargs)
            return None
        return wrapper
    return decorator

def only_for_models(*model_classes):
    """Restrict a handler to signals whose sender is one of ``model_classes``.

    The check is a plain membership test of the sender against the given
    classes.
    """
    return signal_condition(lambda sender, **kwargs: sender in model_classes)

def only_when_field_changed(field_name):
    """Run the handler only when ``field_name`` differs from the stored row.

    The in-memory instance is compared with a copy loaded from the
    database. Instances without a primary key, and instances whose row
    cannot be found, count as changed.

    NOTE(review): the comparison reads the row as currently stored, so it
    looks meaningful only when the handler runs before the new values are
    written (e.g. on ``pre_save``) -- confirm against the callers.
    """
    def has_changed(sender, instance, **kwargs):
        if instance.pk:
            try:
                stored = sender.objects.get(pk=instance.pk)
                changed = getattr(stored, field_name) != getattr(instance, field_name)
            except sender.DoesNotExist:
                changed = True
            return changed
        # No primary key yet: treated as a new instance.
        return True

    return signal_condition(has_changed)

def only_in_environment(*environments):
    """Run the handler only when ``settings.ENVIRONMENT`` is listed.

    ``ENVIRONMENT`` is a project-defined setting. When it is not configured
    the handler is skipped rather than raising AttributeError from inside
    signal dispatch, which would otherwise break the operation that sent
    the signal.
    """
    def condition(sender, **kwargs):
        from django.conf import settings
        if not hasattr(settings, 'ENVIRONMENT'):
            return False
        return settings.ENVIRONMENT in environments
    return signal_condition(condition)

# Usage examples
@receiver(post_save)
@only_for_models(User, Customer)
@only_in_environment('production', 'staging')
def send_notification_email(sender, instance, created, **kwargs):
    """Email a notification when a User or Customer is first created.

    The decorators limit the handler to those two senders and to the
    'production' and 'staging' environments; saves of existing rows do
    nothing.
    """
    if not created:
        return
    send_email_notification(instance)

@receiver(pre_save, sender=Product)
@only_when_field_changed('price')
def handle_price_change(sender, instance, **kwargs):
    """Log a product price change and notify subscribers.

    The handler listens to ``pre_save``: the previous price can only be
    read from the database before the new row is written. On ``post_save``
    the stored row already carries the new price, so the change check never
    fires and the logged "old" price would equal the new one.

    Products that are not stored yet have no previous price and are
    skipped.

    NOTE(review): the log row and the notification are produced before the
    save itself completes; wrap the save in a transaction if a failed save
    must not leave them behind.
    """
    try:
        # Read the stored price once and fetch only that column.
        old_price = sender.objects.values_list('price', flat=True).get(pk=instance.pk)
    except sender.DoesNotExist:
        return

    PriceChangeLog.objects.create(
        product=instance,
        old_price=old_price,
        new_price=instance.price,
        changed_at=timezone.now(),
    )

    notify_price_change_subscribers(instance)

Bulk Operation Signals

# Custom signals for bulk operations
# Sent by BulkSignalManager.bulk_create with instances and batch_size.
bulk_created = Signal()
# Sent by BulkSignalManager.bulk_update with instances, fields and updated_count.
bulk_updated = Signal()
# Declared for symmetry; nothing in this module sends it.
bulk_deleted = Signal()

class BulkSignalManager(models.Manager):
    """Manager that announces bulk writes through custom signals.

    After delegating to the stock ``bulk_create`` / ``bulk_update`` it sends
    ``bulk_created`` / ``bulk_updated`` so receivers can react to the batch.
    """

    def bulk_create(self, objs, batch_size=None, ignore_conflicts=False, **kwargs):
        """Create ``objs`` in bulk, then send ``bulk_created``.

        Args:
            objs: Model instances to insert.
            batch_size: Forwarded to the stock ``bulk_create`` and included
                in the signal.
            ignore_conflicts: Forwarded to the stock ``bulk_create``.
            **kwargs: Any further options of the stock ``bulk_create``
                (for example ``update_conflicts``), forwarded unchanged so
                this override does not hide them.

        Returns:
            The value returned by the stock ``bulk_create``.
        """
        created_objs = super().bulk_create(
            objs,
            batch_size=batch_size,
            ignore_conflicts=ignore_conflicts,
            **kwargs,
        )

        bulk_created.send(
            sender=self.model,
            instances=created_objs,
            batch_size=batch_size,
        )

        return created_objs

    def bulk_update(self, objs, fields, batch_size=None):
        """Update ``fields`` on ``objs`` in bulk, then send ``bulk_updated``.

        Returns:
            The value returned by the stock ``bulk_update``.
        """
        # Materialise first: ``objs`` may be a one-shot iterable, and the
        # receivers need to see the same objects after the update consumed it.
        objs = list(objs)
        updated_count = super().bulk_update(objs, fields, batch_size=batch_size)

        bulk_updated.send(
            sender=self.model,
            instances=objs,
            fields=fields,
            updated_count=updated_count,
        )

        return updated_count

# Signal handlers for bulk operations
@receiver(bulk_created)
def handle_bulk_creation(sender, instances, **kwargs):
    """React to a bulk insert: refresh statistics, then notify."""
    created_total = len(instances)

    # Statistics are updated with the number of created objects.
    update_model_statistics(sender, created_total)

    # One notification covers the whole batch.
    send_bulk_creation_notification(sender, instances)

@receiver(bulk_updated)
def handle_bulk_update(sender, instances, fields, **kwargs):
    """Log a bulk update and invalidate the model's caches.

    The logged count is the ``updated_count`` sent with the signal when one
    is available. It falls back to the number of instances when the sender
    did not supply a count (or supplied None).
    """
    updated_count = kwargs.get('updated_count')
    if updated_count is None:
        updated_count = len(instances)

    BulkUpdateLog.objects.create(
        model_name=sender._meta.label,
        updated_count=updated_count,
        updated_fields=fields,
        updated_at=timezone.now(),
    )

    invalidate_model_cache(sender)

Performance Optimization

Signal Performance Monitoring

# Signal performance monitoring
import time
import functools
from collections import defaultdict

class SignalPerformanceMonitor:
    """Collect timing and outcome samples for decorated signal handlers.

    Samples are kept in memory in ``stats``, keyed by the handler's
    ``__name__``. Nothing in this class prunes them.
    """

    def __init__(self):
        self.stats = defaultdict(list)
        self.enabled = True

    def monitor_handler(self, handler_func):
        """Decorate ``handler_func`` so every call is timed and recorded.

        The wrapper returns the handler's own result and re-raises whatever
        the handler raises. When ``enabled`` is false the handler is called
        directly and nothing is recorded.
        """
        @functools.wraps(handler_func)
        def wrapper(*args, **kwargs):
            if not self.enabled:
                return handler_func(*args, **kwargs)

            # Set up front so the ``finally`` block can always build a
            # sample, including for exceptions that are not ``Exception``
            # subclasses (e.g. KeyboardInterrupt).
            success = False
            error = None
            # perf_counter is monotonic, so a wall-clock adjustment cannot
            # skew the measured duration.
            started = time.perf_counter()
            try:
                result = handler_func(*args, **kwargs)
                success = True
                # Hand the result back: Signal.send() reports each
                # receiver's return value to the sender.
                return result
            except Exception as exc:
                error = str(exc)
                raise
            finally:
                self.stats[handler_func.__name__].append({
                    'duration': time.perf_counter() - started,
                    'success': success,
                    'error': error,
                    'timestamp': time.time(),
                })

        return wrapper

    def get_stats(self, handler_name=None):
        """Return the samples for one handler, or a dict of all samples.

        An unknown ``handler_name`` yields an empty list.
        """
        if handler_name:
            return self.stats.get(handler_name, [])
        return dict(self.stats)

    def get_slow_handlers(self, threshold=1.0):
        """Summarise handlers with at least one call slower than ``threshold``.

        Args:
            threshold: Duration in seconds a call must exceed to count as slow.

        Returns:
            Mapping of handler name to a dict with ``slow_count``,
            ``total_count``, ``avg_slow_duration`` and ``max_duration``.
        """
        slow_handlers = {}

        for handler_name, executions in self.stats.items():
            slow_durations = [
                sample['duration']
                for sample in executions
                if sample['duration'] > threshold
            ]

            if slow_durations:
                slow_handlers[handler_name] = {
                    'slow_count': len(slow_durations),
                    'total_count': len(executions),
                    'avg_slow_duration': sum(slow_durations) / len(slow_durations),
                    'max_duration': max(slow_durations),
                }

        return slow_handlers


# Global monitor instance
signal_monitor = SignalPerformanceMonitor()

# Usage
@receiver(post_save, sender=Order)
@signal_monitor.monitor_handler
def expensive_order_processing(sender, instance, created, **kwargs):
    """Run the slow post-creation work for an order under the monitor.

    Only newly created orders are processed.
    """
    if not created:
        return

    # Stand-in for a slow operation.
    time.sleep(0.5)

    process_order_analytics(instance)
    update_customer_segments(instance.customer)
    sync_with_external_system(instance)

# Management command to view signal performance
class Command(BaseCommand):
    """Print the timing statistics collected by ``signal_monitor``."""

    def handle(self, *args, **options):
        """Write a per-handler summary, then list handlers slower than 100ms."""
        write = self.stdout.write

        for handler_name, executions in signal_monitor.get_stats().items():
            if not executions:
                continue

            durations = [entry['duration'] for entry in executions]
            mean_duration = sum(durations) / len(executions)
            longest = max(durations)
            failures = sum(1 for entry in executions if not entry['success'])

            write(f"\nHandler: {handler_name}")
            write(f"  Executions: {len(executions)}")
            write(f"  Average duration: {mean_duration:.4f}s")
            write(f"  Max duration: {longest:.4f}s")
            write(f"  Errors: {failures}")

        # 0.1 seconds is the 100ms threshold named in the heading below.
        slow_handlers = signal_monitor.get_slow_handlers(0.1)
        if not slow_handlers:
            return

        write("\nSlow handlers (>100ms):")
        for handler_name, summary in slow_handlers.items():
            write(f"  {handler_name}: {summary['slow_count']}/{summary['total_count']} slow executions")

Async Signal Processing

# Async signal processing with Celery
from celery import shared_task
from django.db import transaction

class AsyncSignalMixin:
    """Mixin that hands a signal dispatch over to a Celery task.

    NOTE(review): ``send_async`` passes the *class* name to the task, which
    then resolves that name with ``globals()`` and calls ``send`` on the
    result -- confirm that a signal object is reachable under that name.
    """

    @classmethod
    def send_async(cls, sender, **kwargs):
        """Queue a dispatch for ``sender`` with serialized keyword arguments.

        ``sender`` is identified by its ``app_label.model_name`` taken from
        ``sender._meta``.
        """
        payload = {
            'sender': f"{sender._meta.app_label}.{sender._meta.model_name}",
            'kwargs': cls._serialize_kwargs(kwargs),
        }
        process_signal_async.delay(cls.__name__, payload)

    @staticmethod
    def _serialize_kwargs(kwargs):
        """Turn signal kwargs into plain dictionaries for the task payload.

        Values exposing a ``pk`` attribute are stored as a model label plus
        primary key; everything else is passed through under ``'value'``.
        """
        def describe(value):
            if hasattr(value, 'pk'):
                return {
                    'type': 'model_instance',
                    'model': f"{value._meta.app_label}.{value._meta.model_name}",
                    'pk': value.pk,
                }
            return {'type': 'value', 'value': value}

        return {key: describe(value) for key, value in kwargs.items()}

@shared_task
def process_signal_async(signal_name, signal_data):
    """Rebuild a serialized signal dispatch and send it.

    Args:
        signal_name: Name of a module-level ``Signal`` object in this module.
        signal_data: Dict with ``'sender'`` (``app_label.model_name``) and
            ``'kwargs'`` in the format produced by
            ``AsyncSignalMixin._serialize_kwargs``.

    Raises:
        KeyError: If ``signal_name`` does not name a ``Signal`` instance in
            this module. The name arrives in the task payload, so it is
            checked before use instead of calling ``send`` on whatever
            global happens to match.
    """
    from django.apps import apps
    from django.dispatch import Signal

    signal_instance = globals().get(signal_name)
    if not isinstance(signal_instance, Signal):
        raise KeyError(f"No signal named {signal_name!r} in this module")

    def load_model(label):
        # Labels are written as "app_label.model_name".
        app_label, model_name = label.split('.')
        return apps.get_model(app_label, model_name)

    sender = load_model(signal_data['sender'])

    kwargs = {}
    for key, data in signal_data['kwargs'].items():
        if data['type'] == 'model_instance':
            # Re-fetch the row; the lookup raises DoesNotExist if it has
            # been removed since the task was queued.
            kwargs[key] = load_model(data['model']).objects.get(pk=data['pk'])
        else:
            kwargs[key] = data['value']

    signal_instance.send(sender=sender, **kwargs)

# Async signal handlers
@receiver(order_status_changed)
def handle_order_status_async(sender, instance, **kwargs):
    """Split status-change handling into immediate and queued work.

    Inventory is released right away for cancelled orders; analytics and
    the status notification are queued as tasks.

    NOTE(review): ``handle_order_status_change`` is connected to the same
    signal and also releases inventory on cancellation -- confirm both
    receivers are meant to be active together.
    """
    status = kwargs['new_status']

    if status == 'cancelled':
        # Done in-process rather than queued.
        release_inventory_for_order(instance)

    process_order_analytics_async.delay(instance.pk, status)
    send_status_notification_async.delay(instance.pk, status)

@shared_task
def process_order_analytics_async(order_id, new_status):
    """Load the order by primary key and run the analytics follow-ups.

    The three steps run in sequence: analytics update, customer segment
    update, external sync.
    """
    target_order = Order.objects.get(pk=order_id)

    update_order_analytics(target_order, new_status)
    update_customer_segments(target_order.customer)
    sync_order_with_external_systems(target_order)

# Signal batching for performance
class BatchedSignalProcessor:
    """Accumulate signal dispatches and hand them off in batches.

    A batch is flushed when it reaches ``batch_size`` entries, or when an
    entry is added more than ``flush_interval`` seconds after that signal's
    previous flush. Flushing only happens inside ``add_signal`` /
    ``flush_batch``; there is no background timer.

    NOTE(review): entries keep ``sender`` and ``kwargs`` exactly as passed
    in; confirm the task serializer in use can encode them.
    """

    def __init__(self, batch_size=100, flush_interval=60):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batches = defaultdict(list)
        # Time of the most recent flush of any signal.
        self.last_flush = time.time()
        # The interval is tracked per signal: with one shared timestamp a
        # frequently flushed signal keeps resetting the clock and a quiet
        # signal would never reach its flush interval.
        self._created_at = self.last_flush
        self._last_flush_by_signal = {}

    def add_signal(self, signal_name, sender, **kwargs):
        """Queue one dispatch and flush its batch if a limit is reached."""
        now = time.time()
        pending = self.batches[signal_name]
        pending.append({
            'sender': sender,
            'kwargs': kwargs,
            'timestamp': now,
        })

        previous_flush = self._last_flush_by_signal.get(signal_name, self._created_at)
        is_full = len(pending) >= self.batch_size
        is_overdue = now - previous_flush > self.flush_interval
        if is_full or is_overdue:
            self.flush_batch(signal_name)

    def flush_batch(self, signal_name):
        """Send the pending batch for ``signal_name`` to the batch task.

        Does nothing when there is no pending entry for that signal.
        """
        if signal_name not in self.batches or not self.batches[signal_name]:
            return

        batch = self.batches[signal_name]
        self.batches[signal_name] = []
        flushed_at = time.time()
        self.last_flush = flushed_at
        self._last_flush_by_signal[signal_name] = flushed_at

        # Process batch asynchronously
        process_signal_batch.delay(signal_name, batch)

@shared_task
def process_signal_batch(signal_name, batch):
    """Send every queued dispatch in ``batch`` through the named signal.

    Args:
        signal_name: Name of a module-level ``Signal`` object in this module.
        batch: List of dicts with ``'sender'`` and ``'kwargs'`` entries, as
            built by ``BatchedSignalProcessor.add_signal``.

    Raises:
        KeyError: If a non-empty batch names something that is not a
            ``Signal`` instance in this module.
    """
    if not batch:
        return

    from django.dispatch import Signal

    # Resolve once for the whole batch, and check the result: the name
    # comes from the task payload.
    signal_instance = globals().get(signal_name)
    if not isinstance(signal_instance, Signal):
        raise KeyError(f"No signal named {signal_name!r} in this module")

    for signal_data in batch:
        signal_instance.send(
            sender=signal_data['sender'],
            **signal_data['kwargs'],
        )

# Global batch processor
# Module-level instance built with the constructor defaults
# (batch_size=100, flush_interval=60); batch_user_activity queues onto it.
batch_processor = BatchedSignalProcessor()

# Usage in signal handlers
@receiver(user_activity_detected)
def batch_user_activity(sender, instance, **kwargs):
    """Queue a user-activity dispatch on the shared batch processor."""
    # ``instance`` goes first so the stored kwargs keep the same key order.
    queued_kwargs = {'instance': instance, **kwargs}
    batch_processor.add_signal('user_activity_detected', sender, **queued_kwargs)

Testing Signals

# Testing signal handlers
from django.test import TestCase, override_settings
from django.test.utils import override_settings
from unittest.mock import patch, MagicMock

class SignalTestCase(TestCase):
    """Tests for the signal handlers, monitor, async dispatch and context."""

    def setUp(self):
        """Create the user the tests order for and reset the monitor."""
        # Local import so the test case does not depend on a module-level
        # ``User`` name being available.
        from django.contrib.auth import get_user_model

        # Every test below references ``self.user``; without this each one
        # fails with AttributeError before reaching its assertions.
        self.user = get_user_model().objects.create_user(
            username='signal-test-user',
            password='not-a-real-password',
        )

        # Clear signal stats
        signal_monitor.stats.clear()

    def test_order_status_change_signal(self):
        """Confirming an order triggers the email and inventory handlers."""
        order = Order.objects.create(
            customer=self.user,
            status='pending',
            total=100.00
        )

        # Mock external services
        with patch('myapp.signals.send_order_confirmation_email') as mock_email, \
                patch('myapp.signals.reserve_inventory_for_order') as mock_inventory:
            order.status = 'confirmed'
            order.save()

            mock_email.assert_called_once_with(order)
            mock_inventory.assert_called_once_with(order)

    def test_signal_performance(self):
        """Monitored handlers are recorded and none of them is slow."""
        # The monitored handler sleeps for 0.5s to simulate work. Patch the
        # sleep so the test neither waits 5 seconds nor contradicts the
        # 0.1s threshold asserted below.
        with patch('time.sleep'):
            for i in range(10):
                Order.objects.create(
                    customer=self.user,
                    status='pending',
                    total=100.00 + i
                )

        # Check signal performance
        stats = signal_monitor.get_stats()
        self.assertIn('expensive_order_processing', stats)

        # Verify no slow handlers
        slow_handlers = signal_monitor.get_slow_handlers(0.1)
        self.assertEqual(slow_handlers, {})

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True)
    def test_async_signal_processing(self):
        """A status change queues the analytics task with the new status."""
        with patch('myapp.tasks.process_order_analytics_async.delay') as mock_task:
            order = Order.objects.create(
                customer=self.user,
                status='pending',
                total=100.00
            )

            order.status = 'confirmed'
            order.save()

            mock_task.assert_called_once_with(order.pk, 'confirmed')

    def test_signal_context(self):
        """An order created with a request in context is logged with it."""
        from django.test import RequestFactory

        factory = RequestFactory()
        request = factory.post('/orders/', {'total': 100.00})
        request.user = self.user

        # Set signal context
        SignalContext.set_request(request)

        try:
            # Create order (should capture context)
            order = Order.objects.create(
                customer=self.user,
                status='pending',
                total=100.00
            )

            # Verify context was captured
            log_entry = OrderCreationLog.objects.get(order=order)
            self.assertEqual(log_entry.created_by, self.user)
            self.assertIsNotNone(log_entry.ip_address)

        finally:
            # Always clear so the stored request does not leak into other tests.
            SignalContext.clear()

Django signals provide powerful event-driven capabilities when used correctly. The key is understanding their performance implications, implementing proper error handling, and using them judiciously to maintain loose coupling while avoiding performance bottlenecks. Advanced patterns like async processing, batching, and conditional handling enable sophisticated event-driven architectures that scale effectively.