Django signals provide a decoupled way to allow certain senders to notify a set of receivers when actions occur. This comprehensive guide covers advanced signal patterns, performance considerations, and best practices for building event-driven Django applications.
# signals.py
import functools
import threading

import django.dispatch
from django.contrib.auth.models import User
from django.core.management.base import BaseCommand
from django.dispatch import Signal
from django.utils import timezone
# Custom signals for business events
# Module-level Signal instances acting as app-wide event channels: handlers
# subscribe with @receiver(<signal>) and senders call <signal>.send(...).
# The payload is whatever keyword arguments the sender passes.
user_profile_completed = Signal()  # named for profile completion; senders not shown in this file
order_status_changed = Signal()  # sent by Order.save() with instance/old_status/new_status
payment_processed = Signal()  # named for payment completion; senders not shown in this file
inventory_low = Signal()  # named for low-stock alerts; senders not shown in this file
user_activity_detected = Signal()  # funneled through the batch processor (batch_user_activity)
# Signal with custom arguments
class OrderSignal(Signal):
    """Signal subclass that records every order event it dispatches.

    NOTE(review): ``order_events`` grows unboundedly for the life of the
    process; high-volume senders should periodically clear it or rely on
    ``get_recent_events`` only for debugging.
    """

    def __init__(self, providing_args=None, use_caching=False):
        # ``providing_args`` was deprecated in Django 3.0 and removed in 4.0;
        # keep the parameter for backward compatibility but do not forward it.
        super().__init__(use_caching=use_caching)
        # In-memory audit trail of every send() call.
        self.order_events = []

    def send(self, sender, **named):
        """Record the event, then dispatch to receivers as usual."""
        self.order_events.append({
            'sender': sender,
            'timestamp': timezone.now(),
            'data': named,
        })
        return super().send(sender, **named)

    def get_recent_events(self, limit=10):
        """Return the ``limit`` most recent events (empty list for limit <= 0).

        Fix: the original ``[-limit:]`` slice returned *all* events when
        ``limit`` was 0.
        """
        return self.order_events[-limit:] if limit > 0 else []

order_event = OrderSignal()
# Usage in models
from django.db import models
from django.db.models.signals import post_save, pre_save, post_delete
from django.dispatch import receiver
class Order(models.Model):
    """Customer order; emits ``order_status_changed`` on status transitions."""

    customer = models.ForeignKey(User, on_delete=models.CASCADE)
    status = models.CharField(max_length=20, default='pending')
    total = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)

    def save(self, *args, **kwargs):
        """Persist the order, sending ``order_status_changed`` if ``status`` changed.

        Fix: the original stashed ``self._status_changed`` on the instance and
        never cleared it, so every subsequent save() re-sent the signal even
        when the status had not changed again. Locals avoid that entirely.
        """
        old_status = None
        status_changed = False
        if self.pk:
            # Fetch only the status column instead of hydrating the whole row;
            # .first() also tolerates a concurrently deleted row.
            old_status = (
                Order.objects.filter(pk=self.pk)
                .values_list('status', flat=True)
                .first()
            )
            status_changed = old_status is not None and old_status != self.status
        super().save(*args, **kwargs)
        if status_changed:
            order_status_changed.send(
                sender=self.__class__,
                instance=self,
                old_status=old_status,
                new_status=self.status,
            )
# Advanced signal handlers
@receiver(order_status_changed)
def handle_order_status_change(sender, instance, old_status, new_status, **kwargs):
    """Audit-log a status transition, then run the per-status side effects."""
    OrderStatusLog.objects.create(
        order=instance,
        old_status=old_status,
        new_status=new_status,
        changed_at=timezone.now(),
    )

    def on_confirmed():
        # Reserve stock, confirm to the customer, refresh their stats.
        reserve_inventory_for_order(instance)
        send_order_confirmation_email(instance)
        update_customer_order_stats(instance.customer)

    def on_shipped():
        # Tracking number first, then the shipping notice that references it.
        generate_tracking_number(instance)
        send_shipping_notification(instance)

    def on_cancelled():
        # Free the stock; refund only if the order was already paid.
        release_inventory_for_order(instance)
        if instance.payment_status == 'paid':
            process_refund(instance)

    dispatch = {
        'confirmed': on_confirmed,
        'shipped': on_shipped,
        'cancelled': on_cancelled,
    }
    action = dispatch.get(new_status)
    if action is not None:
        action()
@receiver(post_save, sender=User)
def handle_user_creation(sender, instance, created, **kwargs):
    """Bootstrap a freshly created user; updates are ignored."""
    if not created:
        return
    UserProfile.objects.create(user=instance)   # companion profile row
    send_welcome_email(instance)                # onboarding email
    track_user_registration(instance)           # analytics event
    add_user_to_default_groups(instance)        # default permissions
# middleware.py
class SignalContextMiddleware:
    """Expose the current request to signal handlers via SignalContext."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Publish the request before the view runs, and always tear it down
        # afterwards so a pooled worker thread never leaks state into the
        # next request it serves.
        SignalContext.set_request(request)
        try:
            return self.get_response(request)
        finally:
            SignalContext.clear()
class SignalContext:
    """Thread-local request/user/IP context readable from signal handlers."""

    _context = threading.local()

    @classmethod
    def set_request(cls, request):
        """Stash the request plus its derived user and client IP."""
        cls._context.request = request
        cls._context.user = getattr(request, 'user', None)
        cls._context.ip_address = cls._get_client_ip(request)

    @classmethod
    def get_request(cls):
        """Current request, or None outside a request cycle."""
        return getattr(cls._context, 'request', None)

    @classmethod
    def get_user(cls):
        """Current user, or None."""
        return getattr(cls._context, 'user', None)

    @classmethod
    def get_ip_address(cls):
        """Client IP for the current request, or None."""
        return getattr(cls._context, 'ip_address', None)

    @classmethod
    def clear(cls):
        """Drop all state stored for this thread."""
        for attr in ('request', 'user', 'ip_address'):
            if hasattr(cls._context, attr):
                delattr(cls._context, attr)

    @staticmethod
    def _get_client_ip(request):
        """Best-effort client IP: first X-Forwarded-For hop, else REMOTE_ADDR.

        NOTE(review): X-Forwarded-For is client-controllable; trust it only
        behind a proxy that overwrites the header.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Fix: XFF entries are comma-separated and typically padded with
            # spaces ("ip1, ip2"); strip so the stored IP is clean.
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')
# Using context in signal handlers
@receiver(post_save, sender=Order)
def log_order_creation(sender, instance, created, **kwargs):
    """Audit-log new orders with whatever request context is available."""
    if not created:
        return
    # Fix: the original called SignalContext.get_request().META unconditionally,
    # raising AttributeError for orders created outside a request cycle
    # (management commands, Celery tasks, the shell).
    request = SignalContext.get_request()
    user_agent = request.META.get('HTTP_USER_AGENT', '') if request else ''
    OrderCreationLog.objects.create(
        order=instance,
        created_by=SignalContext.get_user(),
        ip_address=SignalContext.get_ip_address(),
        user_agent=user_agent,
        created_at=timezone.now(),
    )
# Conditional signal decorators
def signal_condition(condition_func):
    """Wrap a signal handler so it runs only when ``condition_func`` is truthy.

    The wrapped handler returns the handler's result when the condition
    passes and None otherwise.
    """
    def decorator(handler_func):
        # Fix: preserve the handler's __name__/__doc__ so name-keyed tooling
        # (e.g. the performance monitor below) and logs see the real handler.
        @functools.wraps(handler_func)
        def wrapper(sender, **kwargs):
            if condition_func(sender, **kwargs):
                return handler_func(sender, **kwargs)
            return None
        return wrapper
    return decorator

def only_for_models(*model_classes):
    """Run the handler only when ``sender`` is one of ``model_classes``."""
    def condition(sender, **kwargs):
        return sender in model_classes
    return signal_condition(condition)

def only_when_field_changed(field_name):
    """Run the handler only when ``field_name`` differs from the stored value.

    NOTE(review): this re-fetches the row, so on post_save the database
    already holds the new value and the comparison is a no-op; pair this
    decorator with pre_save.
    """
    def condition(sender, instance, **kwargs):
        if not instance.pk:
            return True  # new, unsaved instance always counts as "changed"
        try:
            old_instance = sender.objects.get(pk=instance.pk)
            return getattr(old_instance, field_name) != getattr(instance, field_name)
        except sender.DoesNotExist:
            return True
    return signal_condition(condition)

def only_in_environment(*environments):
    """Run the handler only when settings.ENVIRONMENT is in ``environments``."""
    def condition(sender, **kwargs):
        from django.conf import settings
        return settings.ENVIRONMENT in environments
    return signal_condition(condition)
# Usage examples
@receiver(post_save)
@only_for_models(User, Customer)
@only_in_environment('production', 'staging')
def send_notification_email(sender, instance, created, **kwargs):
    """Email on creation only, gated by model and environment decorators."""
    if not created:
        return
    send_email_notification(instance)
@receiver(post_save, sender=Product)
@only_when_field_changed('price')
def handle_price_change(sender, instance, **kwargs):
    """Handle product price changes"""
    # NOTE(review): this runs on post_save, so the database row has already
    # been updated -- ``sender.objects.get(pk=instance.pk).price`` re-reads
    # the *new* price, and the decorator's old-vs-new comparison is likewise
    # a no-op. Capture the old price on pre_save (or stash it on the
    # instance before saving) for this log to record a real transition.
    # Log price change
    PriceChangeLog.objects.create(
        product=instance,
        old_price=sender.objects.get(pk=instance.pk).price,
        new_price=instance.price,
        changed_at=timezone.now()
    )
    # Notify subscribers
    notify_price_change_subscribers(instance)
# Custom signals for bulk operations
bulk_created = Signal()  # sent by BulkSignalManager.bulk_create
bulk_updated = Signal()  # sent by BulkSignalManager.bulk_update
bulk_deleted = Signal()  # declared for symmetry; no sender in this module

class BulkSignalManager(models.Manager):
    """Manager emitting bulk_* signals, which Django's bulk operations skip."""

    def bulk_create(self, objs, batch_size=None, ignore_conflicts=False, **kwargs):
        """bulk_create, then broadcast ``bulk_created``.

        Fix: pass options by keyword -- the positional order of
        ``bulk_create`` parameters has shifted across Django versions.
        ``**kwargs`` forwards newer options (update_conflicts, ...) without
        breaking existing callers.
        """
        created_objs = super().bulk_create(
            objs,
            batch_size=batch_size,
            ignore_conflicts=ignore_conflicts,
            **kwargs,
        )
        bulk_created.send(
            sender=self.model,
            instances=created_objs,
            batch_size=batch_size,
        )
        return created_objs

    def bulk_update(self, objs, fields, batch_size=None, **kwargs):
        """bulk_update, then broadcast ``bulk_updated``.

        NOTE(review): ``bulk_update`` returns the matched row count only on
        Django >= 4.0; on older versions ``updated_count`` is None.
        """
        updated_count = super().bulk_update(objs, fields, batch_size=batch_size, **kwargs)
        bulk_updated.send(
            sender=self.model,
            instances=objs,
            fields=fields,
            updated_count=updated_count,
        )
        return updated_count
# Signal handlers for bulk operations
@receiver(bulk_created)
def handle_bulk_creation(sender, instances, **kwargs):
    """React to a bulk insert: refresh model stats, then notify."""
    created_count = len(instances)
    update_model_statistics(sender, created_count)
    send_bulk_creation_notification(sender, instances)
@receiver(bulk_updated)
def handle_bulk_update(sender, instances, fields, **kwargs):
    """React to a bulk update: audit-log it and drop stale caches."""
    log_row = {
        'model_name': sender._meta.label,
        'updated_count': len(instances),
        'updated_fields': fields,
        'updated_at': timezone.now(),
    }
    BulkUpdateLog.objects.create(**log_row)
    # Cached querysets for this model may now be stale.
    invalidate_model_cache(sender)
# Signal performance monitoring
import time
import functools
from collections import defaultdict
class SignalPerformanceMonitor:
    """Collect per-handler timing and error statistics for signal handlers."""

    def __init__(self):
        # handler name -> list of execution records
        self.stats = defaultdict(list)
        # Flip to False to make monitor_handler a pass-through.
        self.enabled = True

    def monitor_handler(self, handler_func):
        """Decorator recording duration, success flag and error per call."""
        @functools.wraps(handler_func)
        def wrapper(*args, **kwargs):
            if not self.enabled:
                return handler_func(*args, **kwargs)
            # Fix: use a monotonic clock for durations; time.time() can jump
            # with NTP adjustments and produce negative/garbage durations.
            start = time.perf_counter()
            try:
                result = handler_func(*args, **kwargs)
                success = True
                error = None
            except Exception as e:
                success = False
                error = str(e)
                raise  # never swallow handler failures
            finally:
                self.stats[handler_func.__name__].append({
                    'duration': time.perf_counter() - start,
                    'success': success,
                    'error': error,
                    'timestamp': time.time(),  # wall-clock, for reporting
                })
            # Fix: the original never returned ``result``, so every monitored
            # handler silently returned None while monitoring was enabled.
            return result
        return wrapper

    def get_stats(self, handler_name=None):
        """Records for one handler, or a dict of all handlers' records."""
        if handler_name:
            return self.stats.get(handler_name, [])
        return dict(self.stats)

    def get_slow_handlers(self, threshold=1.0):
        """Summaries for handlers with executions slower than ``threshold`` s."""
        slow_handlers = {}
        for handler_name, executions in self.stats.items():
            # Renamed the loop variable from ``exec`` -- it shadowed a builtin.
            slow = [record for record in executions if record['duration'] > threshold]
            if slow:
                slow_handlers[handler_name] = {
                    'slow_count': len(slow),
                    'total_count': len(executions),
                    'avg_slow_duration': sum(r['duration'] for r in slow) / len(slow),
                    'max_duration': max(r['duration'] for r in slow),
                }
        return slow_handlers

# Global monitor instance
signal_monitor = SignalPerformanceMonitor()
# Usage
@receiver(post_save, sender=Order)
@signal_monitor.monitor_handler
def expensive_order_processing(sender, instance, created, **kwargs):
    """Heavyweight post-creation processing, timed by the global monitor."""
    if not created:
        return
    time.sleep(0.5)  # stand-in for genuinely slow work
    process_order_analytics(instance)
    update_customer_segments(instance.customer)
    sync_with_external_system(instance)
# Management command to view signal performance
class Command(BaseCommand):
    """Print aggregate signal-handler performance statistics."""

    def handle(self, *args, **options):
        all_stats = signal_monitor.get_stats()
        for name, runs in all_stats.items():
            if not runs:
                continue
            durations = [r['duration'] for r in runs]
            failures = sum(1 for r in runs if not r['success'])
            self.stdout.write(f"\nHandler: {name}")
            self.stdout.write(f" Executions: {len(runs)}")
            self.stdout.write(f" Average duration: {sum(durations) / len(runs):.4f}s")
            self.stdout.write(f" Max duration: {max(durations):.4f}s")
            self.stdout.write(f" Errors: {failures}")
        # Summarise anything that ever ran slower than 100ms.
        slow = signal_monitor.get_slow_handlers(0.1)
        if slow:
            self.stdout.write("\nSlow handlers (>100ms):")
            for name, info in slow.items():
                self.stdout.write(f" {name}: {info['slow_count']}/{info['total_count']} slow executions")
# Async signal processing with Celery
from celery import shared_task
from django.db import transaction
class AsyncSignalMixin:
    """Mixin that ships a signal's sender and kwargs to Celery for replay."""

    @classmethod
    def send_async(cls, sender, **kwargs):
        """Serialize the signal payload and enqueue it for async dispatch."""
        payload = {
            'sender': f"{sender._meta.app_label}.{sender._meta.model_name}",
            'kwargs': cls._serialize_kwargs(kwargs),
        }
        process_signal_async.delay(cls.__name__, payload)

    @staticmethod
    def _serialize_kwargs(kwargs):
        """Encode kwargs into a Celery-safe dict of tagged entries.

        Model instances (anything with a ``pk`` attribute) become
        {'type': 'model_instance', 'model': ..., 'pk': ...}; everything
        else is wrapped as {'type': 'value', 'value': ...}.
        NOTE(review): non-model values are passed through as-is and must
        themselves be serializable by the Celery broker.
        """
        def encode(value):
            if hasattr(value, 'pk'):
                return {
                    'type': 'model_instance',
                    'model': f"{value._meta.app_label}.{value._meta.model_name}",
                    'pk': value.pk,
                }
            return {'type': 'value', 'value': value}

        return {key: encode(value) for key, value in kwargs.items()}
@shared_task
def process_signal_async(signal_name, signal_data):
    """Celery task: rebuild the sender and kwargs, then re-send the signal."""
    from django.apps import apps

    def decode(entry):
        # Inverse of the serializer: tagged model references become fresh
        # instances re-fetched by pk; plain values pass through.
        if entry['type'] == 'model_instance':
            app_label, model_name = entry['model'].split('.')
            model_class = apps.get_model(app_label, model_name)
            return model_class.objects.get(pk=entry['pk'])
        return entry['value']

    app_label, model_name = signal_data['sender'].split('.')
    sender = apps.get_model(app_label, model_name)
    kwargs = {key: decode(entry) for key, entry in signal_data['kwargs'].items()}
    # Signals are looked up by name among this module's globals.
    signal_instance = globals()[signal_name]
    signal_instance.send(sender=sender, **kwargs)
# Async signal handlers
@receiver(order_status_changed)
def handle_order_status_async(sender, instance, **kwargs):
    """Run the critical path inline, defer the rest to Celery."""
    new_status = kwargs['new_status']
    # Releasing reserved stock is time-critical; do it synchronously.
    if new_status == 'cancelled':
        release_inventory_for_order(instance)
    # Analytics and notifications can happen out-of-band.
    process_order_analytics_async.delay(instance.pk, new_status)
    send_status_notification_async.delay(instance.pk, new_status)
@shared_task
def process_order_analytics_async(order_id, new_status):
    """Celery task: analytics, segmentation, and external sync for one order."""
    order = Order.objects.get(pk=order_id)
    update_order_analytics(order, new_status)   # order-level analytics
    update_customer_segments(order.customer)    # customer segmentation
    sync_order_with_external_systems(order)     # downstream systems
# Signal batching for performance
class BatchedSignalProcessor:
    """Buffer signal payloads and flush them to Celery in batches.

    A batch for a given signal name is flushed when it reaches
    ``batch_size`` entries or when more than ``flush_interval`` seconds
    have passed since that signal's last flush.
    NOTE(review): flushing only happens on add_signal(); an idle batch can
    sit past the interval until the next signal arrives.
    """

    def __init__(self, batch_size=100, flush_interval=60):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batches = defaultdict(list)
        # Kept for backward compatibility with external readers; the
        # per-signal timestamps below are authoritative.
        self.last_flush = time.time()
        # Fix: the original used a single shared last_flush, so flushing one
        # signal's batch reset the interval clock for every other pending
        # batch. Track the flush time per signal name instead.
        self._last_flush_by_signal = defaultdict(time.time)

    def add_signal(self, signal_name, sender, **kwargs):
        """Buffer one signal; flush its batch if size or age limits are hit."""
        self.batches[signal_name].append({
            'sender': sender,
            'kwargs': kwargs,
            'timestamp': time.time(),
        })
        now = time.time()
        if (len(self.batches[signal_name]) >= self.batch_size or
                now - self._last_flush_by_signal[signal_name] > self.flush_interval):
            self.flush_batch(signal_name)

    def flush_batch(self, signal_name):
        """Hand the named signal's pending batch to the async processor."""
        batch = self.batches.get(signal_name)
        if not batch:
            return
        self.batches[signal_name] = []
        now = time.time()
        self.last_flush = now
        self._last_flush_by_signal[signal_name] = now
        process_signal_batch.delay(signal_name, batch)
@shared_task
def process_signal_batch(signal_name, batch):
    """Celery task: replay every buffered signal in ``batch``."""
    for item in batch:
        # Signals are resolved by name among this module's globals.
        signal_instance = globals()[signal_name]
        signal_instance.send(sender=item['sender'], **item['kwargs'])

# Global batch processor
batch_processor = BatchedSignalProcessor()
# Usage in signal handlers
@receiver(user_activity_detected)
def batch_user_activity(sender, instance, **kwargs):
    """Funnel high-volume activity signals through the shared batcher
    instead of dispatching each one synchronously."""
    batch_processor.add_signal(
        'user_activity_detected',
        sender,
        instance=instance,
        **kwargs,
    )
# Testing signal handlers
from django.test import TestCase, override_settings
from django.test.utils import override_settings
from unittest.mock import patch, MagicMock
class SignalTestCase(TestCase):
    """Exercises the signal handlers, performance monitor and context utils."""

    def setUp(self):
        # Fix: every test method references ``self.user`` but the original
        # setUp never created one, so each test errored with AttributeError.
        from django.contrib.auth import get_user_model
        self.user = get_user_model().objects.create_user(
            username='signal-tester', password='not-a-secret'
        )
        # Start each test with a clean slate of performance stats.
        signal_monitor.stats.clear()

    def test_order_status_change_signal(self):
        """A status transition fires the handler's side effects exactly once."""
        order = Order.objects.create(
            customer=self.user,
            status='pending',
            total=100.00,
        )
        # Patch the collaborators in the module where the handler looks them up.
        with patch('myapp.signals.send_order_confirmation_email') as mock_email:
            with patch('myapp.signals.reserve_inventory_for_order') as mock_inventory:
                order.status = 'confirmed'
                order.save()
                mock_email.assert_called_once_with(order)
                mock_inventory.assert_called_once_with(order)

    def test_signal_performance(self):
        """Creating orders records stats for the monitored handler."""
        orders = []
        for i in range(10):
            orders.append(Order.objects.create(
                customer=self.user,
                status='pending',
                total=100.00 + i,
            ))
        stats = signal_monitor.get_stats()
        self.assertIn('expensive_order_processing', stats)
        # Fix: the monitored handler sleeps 0.5s per call, so the original
        # 0.1s threshold guaranteed a failure; assert against the monitor's
        # default 1.0s threshold instead.
        slow_handlers = signal_monitor.get_slow_handlers(1.0)
        self.assertEqual(len(slow_handlers), 0)

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True)
    def test_async_signal_processing(self):
        """A status change enqueues the async analytics task."""
        # NOTE(review): the patch target must be the module where the handler
        # looks the task up -- confirm 'myapp.tasks' matches the project layout.
        with patch('myapp.tasks.process_order_analytics_async.delay') as mock_task:
            order = Order.objects.create(
                customer=self.user,
                status='pending',
                total=100.00,
            )
            order.status = 'confirmed'
            order.save()
            mock_task.assert_called_once_with(order.pk, 'confirmed')

    def test_signal_context(self):
        """Request context captured by SignalContext lands in the audit log."""
        from django.test import RequestFactory
        factory = RequestFactory()
        request = factory.post('/orders/', {'total': 100.00})
        request.user = self.user
        SignalContext.set_request(request)
        try:
            order = Order.objects.create(
                customer=self.user,
                status='pending',
                total=100.00,
            )
            log_entry = OrderCreationLog.objects.get(order=order)
            self.assertEqual(log_entry.created_by, self.user)
            self.assertIsNotNone(log_entry.ip_address)
        finally:
            # Always clear so context never leaks into another test.
            SignalContext.clear()
Django signals provide powerful event-driven capabilities when used correctly. The key is understanding their performance implications, implementing proper error handling, and using them judiciously to maintain loose coupling while avoiding performance bottlenecks. Advanced patterns like async processing, batching, and conditional handling enable sophisticated event-driven architectures that scale effectively.
Custom Management Commands
Django's management command system provides a powerful way to create command-line tools for administrative tasks, data processing, and automation. This guide covers creating sophisticated management commands with advanced features like progress tracking, parallel processing, and integration with external systems.
Building Reusable Django Packages
Creating reusable Django packages allows you to share functionality across projects and contribute to the Django ecosystem. This comprehensive guide covers package design, development best practices, testing strategies, and distribution methods for building high-quality Django packages.