Deployment

Monitoring and Logging

Comprehensive monitoring and logging are essential for maintaining healthy Django applications in production. This chapter covers application performance monitoring, error tracking, log aggregation, alerting systems, and observability best practices for Django applications.

Monitoring and Logging

Comprehensive monitoring and logging are essential for maintaining healthy Django applications in production. This chapter covers application performance monitoring, error tracking, log aggregation, alerting systems, and observability best practices for Django applications.

Application Performance Monitoring (APM)

Django Performance Monitoring Setup

# settings/monitoring.py
import os

# Sentry for error tracking
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

def filter_sensitive_data(event, hint):
    """Scrub credentials from a Sentry event before it is sent.

    Args:
        event: The event payload (a dict) about to be sent.
        hint: Extra context supplied by the SDK; unused here.

    Returns:
        The same ``event`` object, mutated in place.
    """
    sensitive_headers = {'authorization', 'cookie', 'x-api-key'}
    sensitive_fields = ('password', 'token', 'secret', 'key')

    request = event.get('request') or {}

    # Remove sensitive headers. Header names are compared case-insensitively
    # because clients send them as "Authorization", "Cookie", etc.
    headers = request.get('headers')
    if isinstance(headers, dict):
        for name in [h for h in headers if str(h).lower() in sensitive_headers]:
            del headers[name]

    # Remove sensitive form data. The body may also be a raw string, which
    # cannot be edited per field, so only mappings are touched.
    data = request.get('data')
    if isinstance(data, dict):
        for field in sensitive_fields:
            if field in data:
                data[field] = '[Filtered]'

    return event


# The hook must be defined before this call: ``before_send`` is evaluated
# when ``sentry_sdk.init`` runs at import time.
sentry_sdk.init(
    dsn=os.environ.get('SENTRY_DSN'),
    integrations=[
        DjangoIntegration(
            transaction_style='url',
            middleware_spans=True,
            signals_spans=True,
        ),
        CeleryIntegration(monitor_beat_tasks=True),
        RedisIntegration(),
        SqlalchemyIntegration(),
    ],
    traces_sample_rate=0.1,  # 10% of transactions
    send_default_pii=False,
    environment=os.environ.get('ENVIRONMENT', 'production'),
    release=os.environ.get('GIT_COMMIT', 'unknown'),
    before_send=filter_sensitive_data,
)

# New Relic APM
# NOTE(review): BASE_DIR is not defined in this snippet — presumably it comes
# from the base settings module; confirm it is imported before this line.
NEW_RELIC_CONFIG_FILE = os.path.join(BASE_DIR, 'newrelic.ini')
NEW_RELIC_ENVIRONMENT = os.environ.get('ENVIRONMENT', 'production')

# DataDog APM
# Tags reuse the same ENVIRONMENT / GIT_COMMIT variables passed to Sentry above.
DATADOG_TRACE = {
    'DEFAULT_SERVICE': 'django-app',
    'TAGS': {
        'env': os.environ.get('ENVIRONMENT', 'production'),
        'version': os.environ.get('GIT_COMMIT', 'unknown'),
    },
}

# Custom metrics collection
MONITORING_ENABLED = True
# Dotted path to the metrics backend class.
METRICS_BACKEND = 'myproject.monitoring.backends.PrometheusBackend'

Custom Performance Monitoring

# monitoring/performance.py
import time
import threading
from collections import defaultdict, deque
from django.db import connection
from django.core.cache import cache
from django.utils import timezone
import psutil

class PerformanceMonitor:
    """Custom in-process performance monitoring for Django applications.

    Keeps a bounded history of request and database-query samples and
    produces aggregate summaries. All mutation and reads of the stored
    samples happen under ``self.lock``.
    """

    def __init__(self, max_requests_per_endpoint=100, max_database_metrics=1000):
        """Create an empty monitor.

        Args:
            max_requests_per_endpoint: Samples retained per request path.
            max_database_metrics: Database-query samples retained overall.
        """
        # A bounded deque drops the oldest sample in O(1) once it is full.
        self.request_metrics = defaultdict(
            lambda: deque(maxlen=max_requests_per_endpoint)
        )
        self.database_metrics = deque(maxlen=max_database_metrics)
        self.cache_metrics = defaultdict(int)
        self.error_metrics = defaultdict(int)
        self.lock = threading.Lock()
        self.start_time = time.time()

    def record_request(self, method, path, status_code, response_time,
                       db_queries=0, cache_hits=0, cache_misses=0):
        """Record the metrics of one handled request."""
        metric = {
            'timestamp': time.time(),
            'method': method,
            'path': path,
            'status_code': status_code,
            'response_time': response_time,
            'db_queries': db_queries,
            'cache_hits': cache_hits,
            'cache_misses': cache_misses,
        }
        with self.lock:
            self.request_metrics[path].append(metric)

            # Track errors (cumulative, not limited to the retained window)
            if status_code >= 400:
                self.error_metrics[status_code] += 1

    def record_database_query(self, query, execution_time, table=None):
        """Record the metrics of one database query."""
        metric = {
            'timestamp': time.time(),
            'query': query[:200],  # Truncate long queries
            'execution_time': execution_time,
            'table': table,
        }
        with self.lock:
            self.database_metrics.append(metric)

    @staticmethod
    def _percentile(sorted_values, fraction):
        """Return the element at position ``int(len * fraction)`` of a sorted list."""
        return sorted_values[int(len(sorted_values) * fraction)]

    def get_performance_summary(self):
        """Return a dict summarising requests, database queries and system load."""
        # System probes do not touch shared state, so run them before taking
        # the lock to avoid blocking request threads on OS calls.
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        with self.lock:
            current_time = time.time()
            all_requests = [
                sample
                for samples in self.request_metrics.values()
                for sample in samples
            ]
            db_times = [m['execution_time'] for m in self.database_metrics]
            errors = dict(self.error_metrics)

        # Request statistics
        if all_requests:
            response_times = sorted(r['response_time'] for r in all_requests)
            total_requests = len(all_requests)
            avg_response_time = sum(response_times) / total_requests
            p95_response_time = self._percentile(response_times, 0.95)
            p99_response_time = self._percentile(response_times, 0.99)
            error_count = sum(1 for r in all_requests if r['status_code'] >= 400)
            error_rate = error_count / total_requests
        else:
            avg_response_time = p95_response_time = p99_response_time = 0
            total_requests = error_count = error_rate = 0

        # Database statistics
        if db_times:
            avg_db_time = sum(db_times) / len(db_times)
            slow_queries = sum(1 for t in db_times if t > 1.0)  # > 1 second
        else:
            avg_db_time = slow_queries = 0

        return {
            'timestamp': current_time,
            'uptime': current_time - self.start_time,
            'requests': {
                'total': total_requests,
                'error_count': error_count,
                'error_rate': error_rate,
                'avg_response_time': avg_response_time,
                'p95_response_time': p95_response_time,
                'p99_response_time': p99_response_time,
            },
            'database': {
                'avg_query_time': avg_db_time,
                'slow_queries': slow_queries,
                'total_queries': len(db_times),
            },
            'system': {
                'cpu_percent': cpu_percent,
                'memory_percent': memory.percent,
                'memory_available': memory.available,
                'disk_percent': (disk.used / disk.total) * 100,
            },
            'errors': errors,
        }

    def get_endpoint_metrics(self, path):
        """Return aggregate metrics for ``path``, or ``None`` if nothing was recorded."""
        with self.lock:
            # ``get`` avoids creating an empty entry for unknown paths.
            requests = list(self.request_metrics.get(path, ()))

        if not requests:
            return None

        response_times = [r['response_time'] for r in requests]
        db_queries = [r['db_queries'] for r in requests]

        # Count how often each status code was returned in the retained window.
        status_codes = defaultdict(int)
        for r in requests:
            status_codes[r['status_code']] += 1

        return {
            'path': path,
            'request_count': len(requests),
            'avg_response_time': sum(response_times) / len(response_times),
            'min_response_time': min(response_times),
            'max_response_time': max(response_times),
            'avg_db_queries': sum(db_queries) / len(db_queries),
            'status_codes': status_codes,
        }

# Global performance monitor
performance_monitor = PerformanceMonitor()

class PerformanceMiddleware:
    """Middleware that records per-request metrics in ``performance_monitor``."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments while the request is in flight.
        started = time.perf_counter()

        # Snapshot counters so only work done by this request is attributed.
        # NOTE(review): Django documents connection.queries as populated only
        # when DEBUG is True — confirm this is acceptable in production.
        queries_before = len(connection.queries)
        # NOTE(review): `_cache_hits` / `_cache_misses` are read defensively
        # and default to 0; confirm the cache backend actually exposes them.
        hits_before = getattr(cache, '_cache_hits', 0)
        misses_before = getattr(cache, '_cache_misses', 0)

        response = self.get_response(request)

        # Calculate metrics
        response_time = time.perf_counter() - started
        db_queries = len(connection.queries) - queries_before
        cache_hits = getattr(cache, '_cache_hits', 0) - hits_before
        cache_misses = getattr(cache, '_cache_misses', 0) - misses_before

        # Record metrics
        performance_monitor.record_request(
            method=request.method,
            path=request.path,
            status_code=response.status_code,
            response_time=response_time,
            db_queries=db_queries,
            cache_hits=cache_hits,
            cache_misses=cache_misses,
        )

        # Add performance headers
        response['X-Response-Time'] = f'{response_time:.3f}s'
        response['X-DB-Queries'] = str(db_queries)

        return response

Prometheus Metrics Integration

# monitoring/prometheus.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from django.http import HttpResponse
import time

# Prometheus metrics
REQUEST_COUNT = Counter(
    'django_requests_total',
    'Total Django requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'django_request_duration_seconds',
    'Django request duration',
    ['method', 'endpoint']
)

# Explicit buckets: this histogram observes a query *count*, so the client's
# default latency-oriented buckets would push most observations into +Inf.
DATABASE_QUERIES = Histogram(
    'django_database_queries_total',
    'Number of database queries per request',
    ['endpoint'],
    buckets=(0, 1, 2, 5, 10, 20, 50, 100, 200, 500),
)

ACTIVE_USERS = Gauge(
    'django_active_users',
    'Number of active users'
)

CACHE_OPERATIONS = Counter(
    'django_cache_operations_total',
    'Cache operations',
    ['operation', 'result']
)

ERROR_COUNT = Counter(
    'django_errors_total',
    'Total errors',
    ['error_type', 'endpoint']
)

class PrometheusMiddleware:
    """Middleware that records request count, duration and query count in Prometheus."""

    def __init__(self, get_response):
        self.get_response = get_response

    @staticmethod
    def _endpoint_label(request):
        """Return the resolved URL name, or 'unknown' when there is none."""
        match = getattr(request, 'resolver_match', None)
        # url_name is None for unnamed routes; fall back rather than emit
        # the literal label "None".
        return (match.url_name if match else None) or 'unknown'

    def __call__(self, request):
        from django.db import connection

        started = time.perf_counter()  # monotonic clock for durations
        initial_queries = len(connection.queries)

        # Only the downstream call is guarded, so a failure while recording
        # metrics is not miscounted as an application error.
        try:
            response = self.get_response(request)
        except Exception as exc:
            ERROR_COUNT.labels(
                error_type=type(exc).__name__,
                endpoint=self._endpoint_label(request),
            ).inc()
            raise

        # resolver_match is read after the response is produced.
        endpoint = self._endpoint_label(request)

        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status=response.status_code,
        ).inc()

        REQUEST_DURATION.labels(
            method=request.method,
            endpoint=endpoint,
        ).observe(time.perf_counter() - started)

        DATABASE_QUERIES.labels(
            endpoint=endpoint,
        ).observe(len(connection.queries) - initial_queries)

        return response

def metrics_view(request):
    """Prometheus metrics endpoint.

    Serves the default registry using the client library's own exposition
    content type rather than a bare ``text/plain``.
    """
    from prometheus_client import CONTENT_TYPE_LATEST

    return HttpResponse(generate_latest(), content_type=CONTENT_TYPE_LATEST)

# Custom metrics collector
class DjangoMetricsCollector:
    """Collect Django-specific metrics for Prometheus"""

    def __init__(self):
        self.user_gauge = Gauge('django_active_sessions', 'Active user sessions')
        # Maps "<app_label>_<model_name>" to the Gauge holding its row count;
        # gauges are created lazily on first collection.
        self.model_counts = {}

    def collect_user_metrics(self):
        """Set the session gauge to the number of unexpired sessions."""
        from django.contrib.sessions.models import Session
        from django.utils import timezone

        active_sessions = Session.objects.filter(
            expire_date__gte=timezone.now()
        ).count()

        self.user_gauge.set(active_sessions)

    def collect_model_metrics(self):
        """Set one gauge per installed model to its current row count."""
        import logging
        from django.apps import apps

        logger = logging.getLogger(__name__)

        for model in apps.get_models():
            model_name = f"{model._meta.app_label}_{model._meta.model_name}"

            if model_name not in self.model_counts:
                self.model_counts[model_name] = Gauge(
                    f'django_model_count_{model_name}',
                    f'Count of {model_name} objects'
                )

            try:
                count = model.objects.count()
            except Exception:
                # Best effort: one uncountable model must not stop the rest,
                # but leave a trace instead of failing silently.
                logger.debug(
                    "Skipping row count for %s", model_name, exc_info=True
                )
                continue

            self.model_counts[model_name].set(count)

    def collect_all_metrics(self):
        """Collect all custom metrics"""
        self.collect_user_metrics()
        self.collect_model_metrics()

# Initialize metrics collector
metrics_collector = DjangoMetricsCollector()

Structured Logging

Advanced Logging Configuration

# settings/logging.py
import os
import logging.config

# Structured logging configuration
# dictConfig-style settings: formatters and filters are referenced by name
# from the handlers, and handlers are referenced by name from the loggers.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        # '()' names a custom factory class by dotted path.
        'json': {
            '()': 'myproject.logging.formatters.JSONFormatter',
        },
        'structured': {
            '()': 'myproject.logging.formatters.StructuredFormatter',
        },
    },
    'filters': {
        'require_debug_false': {
            '()': 'django.utils.log.RequireDebugFalse',
        },
        'require_debug_true': {
            '()': 'django.utils.log.RequireDebugTrue',
        },
        'sensitive_data_filter': {
            '()': 'myproject.logging.filters.SensitiveDataFilter',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'json',
            'filters': ['sensitive_data_filter'],
        },
        'file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/django.log',
            'maxBytes': 1024*1024*15,  # 15MB
            'backupCount': 10,
            'formatter': 'json',
            'filters': ['sensitive_data_filter'],
        },
        'error_file': {
            'level': 'ERROR',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/error.log',
            'maxBytes': 1024*1024*15,  # 15MB
            'backupCount': 10,
            'formatter': 'json',
            'filters': ['sensitive_data_filter'],
        },
        # NOTE(review): the handlers below do not attach
        # 'sensitive_data_filter' — confirm that is intentional.
        'security_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/security.log',
            'maxBytes': 1024*1024*15,  # 15MB
            'backupCount': 10,
            'formatter': 'json',
        },
        'performance_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/performance.log',
            'maxBytes': 1024*1024*15,  # 15MB
            'backupCount': 10,
            'formatter': 'json',
        },
        'elasticsearch': {
            'level': 'INFO',
            'class': 'myproject.logging.handlers.ElasticsearchHandler',
            'formatter': 'json',
            'index': 'django-logs',
        },
        'syslog': {
            'level': 'INFO',
            'class': 'logging.handlers.SysLogHandler',
            'address': '/dev/log',
            'formatter': 'structured',
        },
    },
    'root': {
        'handlers': ['console', 'file'],
        'level': 'INFO',
    },
    # Every named logger sets propagate=False, so its records reach only the
    # handlers listed on it and not the root handlers.
    'loggers': {
        'django': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': False,
        },
        'django.request': {
            'handlers': ['error_file', 'elasticsearch'],
            'level': 'ERROR',
            'propagate': False,
        },
        'django.security': {
            'handlers': ['security_file', 'elasticsearch'],
            'level': 'INFO',
            'propagate': False,
        },
        'myproject.performance': {
            'handlers': ['performance_file'],
            'level': 'INFO',
            'propagate': False,
        },
        'myproject.business': {
            'handlers': ['file', 'elasticsearch'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery': {
            'handlers': ['file'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

# Environment-specific logging
# Adjusts the dict above in place based on the ENVIRONMENT variable; any
# other value (or an unset variable) leaves the defaults untouched.
if os.environ.get('ENVIRONMENT') == 'development':
    LOGGING['handlers']['console']['level'] = 'DEBUG'
    LOGGING['loggers']['django']['level'] = 'DEBUG'
elif os.environ.get('ENVIRONMENT') == 'production':
    # Add CloudWatch logging in production
    LOGGING['handlers']['cloudwatch'] = {
        'level': 'INFO',
        'class': 'watchtower.CloudWatchLogsHandler',
        'log_group': 'django-app',
        'stream_name': 'production',
        'formatter': 'json',
    }
    # Attached to the root logger only, so loggers with propagate=False
    # above do not send records to CloudWatch.
    LOGGING['root']['handlers'].append('cloudwatch')

Custom Logging Components

# logging/formatters.py
import json
import logging
import traceback
from datetime import datetime
from django.utils import timezone

class JSONFormatter(logging.Formatter):
    """JSON formatter for structured logging.

    Each record is rendered as one JSON object. Attributes attached through
    ``extra=`` are collected under the ``extra`` key.
    """

    # Attributes that belong to every LogRecord (or are added by other
    # formatters) and therefore must not be reported as user "extra" data.
    RESERVED_ATTRS = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno',
        'pathname', 'filename', 'module', 'lineno',
        'funcName', 'created', 'msecs', 'relativeCreated',
        'thread', 'threadName', 'processName', 'process',
        'getMessage', 'exc_info', 'exc_text', 'stack_info',
        # Set by Formatter.format() when another handler formatted first.
        'message', 'asctime',
        # Present on records created by newer Python versions.
        'taskName',
    })

    def format(self, record):
        """Return ``record`` serialised as a JSON string."""
        from datetime import timezone as dt_timezone

        log_entry = {
            # Use the time the event was created, not the time it happens
            # to be formatted (handlers may buffer or format late).
            'timestamp': datetime.fromtimestamp(
                record.created, tz=dt_timezone.utc
            ).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
            'process': record.process,
            'thread': record.thread,
        }

        # Add exception info if present. exc_info=True outside an ``except``
        # block yields (None, None, None), which is truthy but carries no
        # exception to describe.
        exc_info = record.exc_info
        if exc_info and exc_info[0] is not None:
            log_entry['exception'] = {
                'type': exc_info[0].__name__,
                'message': str(exc_info[1]),
                'traceback': traceback.format_exception(*exc_info),
            }

        # Add extra fields from the log record
        extra_fields = {
            key: value
            for key, value in record.__dict__.items()
            if key not in self.RESERVED_ATTRS
        }
        if extra_fields:
            log_entry['extra'] = extra_fields

        # default=str keeps logging from failing on non-JSON-native values.
        return json.dumps(log_entry, default=str)

class StructuredFormatter(logging.Formatter):
    """Structured formatter for syslog.

    Appends bracketed structured-data elements to the normally formatted
    message when the record carries ``request`` or ``response_time``.
    """

    @staticmethod
    def _escape(value):
        """Escape characters that would terminate a quoted value or element."""
        text = str(value)
        # Backslash first so the escapes added below are not re-escaped.
        return (
            text.replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace(']', '\\]')
        )

    def format(self, record):
        """Return the formatted message followed by any structured data."""
        structured_data = []

        # Add Django-specific structured data
        if hasattr(record, 'request'):
            request = record.request
            user = getattr(request, "user", "anonymous")
            structured_data.append(
                f'[request@django method="{self._escape(request.method)}" '
                f'path="{self._escape(request.path)}" '
                f'user="{self._escape(user)}"]'
            )

        if hasattr(record, 'response_time'):
            structured_data.append(
                f'[performance@django '
                f'response_time="{self._escape(record.response_time)}"]'
            )

        base_message = super().format(record)
        if not structured_data:
            # Nothing to append: avoid emitting a dangling trailing space.
            return base_message

        return f'{base_message} {"".join(structured_data)}'

# logging/filters.py
import re

class SensitiveDataFilter(logging.Filter):
    """Filter sensitive data from log records.

    The filter never drops a record; it rewrites the message and any string
    attributes that match one of ``SENSITIVE_PATTERNS``.
    """

    # (compiled pattern, label) pairs; a match is replaced by
    # "[FILTERED_<LABEL>]".
    SENSITIVE_PATTERNS = [
        (re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'password'),
        (re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'token'),
        (re.compile(r'key["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'key'),
        (re.compile(r'secret["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'secret'),
        (re.compile(r'authorization:\s*([^\s]+)', re.IGNORECASE), 'authorization'),
        (re.compile(r'(\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4})'), 'credit_card'),
        (re.compile(r'(\d{3}-\d{2}-\d{4})'), 'ssn'),
    ]

    @classmethod
    def _redact(cls, text):
        """Return ``text`` with every sensitive match replaced by its marker."""
        for pattern, label in cls.SENSITIVE_PATTERNS:
            text = pattern.sub(f'[FILTERED_{label.upper()}]', text)
        return text

    def filter(self, record):
        """Redact sensitive data on ``record`` in place; always returns True."""
        # Redact the *rendered* message. Filtering only the template would
        # let secrets passed via ``args`` through, and replacing a "%s"
        # placeholder would break the later interpolation.
        try:
            rendered = record.getMessage()
        except Exception:
            # Malformed format/args: fall back to the raw template and leave
            # args untouched so the logging machinery reports the error.
            rendered = str(record.msg)
        else:
            record.args = ()
        record.msg = self._redact(rendered)

        # Filter extra fields: a matching string attribute is replaced
        # wholesale by the marker of the first pattern that matches.
        for key, value in list(record.__dict__.items()):
            if key == 'msg' or not isinstance(value, str):
                continue
            for pattern, label in self.SENSITIVE_PATTERNS:
                if pattern.search(value):
                    setattr(record, key, f'[FILTERED_{label.upper()}]')
                    break

        return True

# logging/handlers.py
import json
import logging
import time
from logging import Handler

import requests
from elasticsearch import Elasticsearch

class ElasticsearchHandler(Handler):
    """Custom handler that indexes log records into Elasticsearch."""

    def __init__(self, index='django-logs', doc_type='log', hosts=None):
        """Create the handler and its Elasticsearch client.

        Args:
            index: Prefix of the daily index the records are written to.
            doc_type: Document type passed through to ``Elasticsearch.index``.
            hosts: Host list for the client; defaults to a single node on
                localhost:9200.
        """
        super().__init__()
        self.es = Elasticsearch(hosts or [
            {'host': 'localhost', 'port': 9200}
        ])
        self.index = index
        self.doc_type = doc_type

    def _index_name(self, record):
        """Return the daily index name ``<index>-YYYY.MM.DD`` (UTC) for ``record``."""
        # record.created is an epoch float, so it has to be converted before
        # a strftime pattern can be applied to it.
        day = time.strftime('%Y.%m.%d', time.gmtime(record.created))
        return f"{self.index}-{day}"

    def emit(self, record):
        """Emit log record to Elasticsearch"""
        try:
            # Assumes the attached formatter produces JSON text.
            log_entry = json.loads(self.format(record))
            # NOTE(review): `doc_type` / `body` are kept as in the original
            # call; confirm they are accepted by the installed client version.
            self.es.index(
                index=self._index_name(record),
                doc_type=self.doc_type,
                body=log_entry
            )
        except Exception:
            # Logging must never raise into application code.
            self.handleError(record)

class SlackHandler(Handler):
    """Custom handler that posts ERROR-and-above records to a Slack webhook."""

    def __init__(self, webhook_url, channel='#alerts'):
        super().__init__()
        self.webhook_url = webhook_url
        self.channel = channel

    def _build_payload(self, record):
        """Return the webhook JSON payload describing ``record``."""
        # ``asctime`` only exists once a time-using formatter has processed
        # the record, so derive the timestamp when it is missing.
        timestamp = (
            getattr(record, 'asctime', None)
            or logging.Formatter().formatTime(record)
        )

        fields = [
            {'title': 'Module', 'value': record.module, 'short': True},
            {'title': 'Function', 'value': record.funcName, 'short': True},
            {'title': 'Line', 'value': str(record.lineno), 'short': True},
            {'title': 'Time', 'value': timestamp, 'short': True},
        ]

        if record.exc_info:
            fields.append({
                'title': 'Exception',
                'value': f'```{self.format(record)}```',
                'short': False
            })

        return {
            'channel': self.channel,
            'username': 'Django Logger',
            'text': f'*{record.levelname}*: {record.getMessage()}',
            # Only ERROR and above reach this point, so the colour is fixed.
            'attachments': [{'color': 'danger', 'fields': fields}],
        }

    def emit(self, record):
        """Send log record to Slack"""
        if record.levelno < logging.ERROR:
            return

        try:
            requests.post(
                self.webhook_url, json=self._build_payload(record), timeout=5
            )
        except Exception:
            # Logging must never raise into application code.
            self.handleError(record)

Business Logic Logging

# logging/business.py
import logging
from functools import wraps
from django.utils import timezone

# Logger dedicated to business-level events
business_logger = logging.getLogger('myproject.business')

def log_business_event(event_type, **kwargs):
    """Emit an INFO record for a business event, attaching kwargs as extra data."""
    details = {
        'event_type': event_type,
        'timestamp': timezone.now().isoformat(),
    }
    # Caller-supplied values are applied last, so they win on key clashes.
    details.update(kwargs)
    business_logger.info(f"Business event: {event_type}", extra=details)

def log_user_action(action, user, **kwargs):
    """Emit an INFO record describing an action taken by ``user``."""
    # Unauthenticated users are recorded without an id, as 'anonymous'.
    if user.is_authenticated:
        user_id, username = user.id, user.username
    else:
        user_id, username = None, 'anonymous'

    details = {
        'action': action,
        'user_id': user_id,
        'username': username,
        'timestamp': timezone.now().isoformat(),
    }
    details.update(kwargs)
    business_logger.info(f"User action: {action}", extra=details)

def log_performance_issue(operation, duration, threshold=1.0, **kwargs):
    """Emit a WARNING when ``duration`` is strictly greater than ``threshold``."""
    if not duration > threshold:
        return

    details = {
        'operation': operation,
        'duration': duration,
        'threshold': threshold,
        'timestamp': timezone.now().isoformat(),
    }
    details.update(kwargs)
    logging.getLogger('myproject.performance').warning(
        f"Slow operation: {operation}",
        extra=details,
    )

# Decorators for automatic logging
def log_function_call(logger=None, level=logging.INFO):
    """Decorator factory that logs entry, completion and failure of a function.

    Args:
        logger: Logger to use; defaults to the logger named after the
            decorated function's module.
        level: Level for the "Calling"/"Completed" records. Failures are
            always logged at ERROR with the traceback and then re-raised.

    Returns:
        A decorator that wraps the target function.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            func_logger = logger or logging.getLogger(func.__module__)

            # The key is 'func_module', not 'module': 'module' is a built-in
            # LogRecord attribute and logging rejects ``extra`` keys that
            # would overwrite one.
            identity = {
                'function': func.__name__,
                'func_module': func.__module__,
            }

            func_logger.log(
                level,
                f"Calling {func.__name__}",
                extra={
                    **identity,
                    'args_count': len(args),
                    'kwargs_keys': list(kwargs.keys()),
                }
            )

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                func_logger.error(
                    f"Error in {func.__name__}: {str(e)}",
                    extra={
                        **identity,
                        'error': str(e),
                        'error_type': type(e).__name__,
                    },
                    exc_info=True
                )
                raise

            func_logger.log(
                level,
                f"Completed {func.__name__}",
                extra={**identity, 'success': True}
            )
            return result

        return wrapper
    return decorator

def log_model_changes(model_class):
    """Decorator factory for a model's ``save`` that logs creates and updates.

    Before the wrapped method runs, the currently stored row is read so the
    values being saved can be compared with what was persisted. This costs
    one extra query per save.

    Args:
        model_class: The model class, used for its name in log messages.
            If something other than a class is passed (e.g. a lambda),
            the instance's own class name is used instead.

    Returns:
        A decorator for the model's ``save`` method.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            model_name = (
                model_class.__name__
                if isinstance(model_class, type)
                else type(self).__name__
            )

            # attname (e.g. "author_id") is compared rather than the field
            # name so foreign keys are not fetched just to diff them.
            column_names = [f.attname for f in self._meta.concrete_fields]

            # Snapshot the stored row; None means there is nothing stored
            # yet, so this save is a create.
            stored = None
            if self.pk is not None:
                stored = (
                    type(self)._default_manager
                    .filter(pk=self.pk)
                    .values(*column_names)
                    .first()
                )

            result = func(self, *args, **kwargs)

            if stored is None:
                # Create operation
                business_logger.info(
                    f"Model created: {model_name}",
                    extra={
                        'model': model_name,
                        'pk': self.pk,
                        'operation': 'create',
                    }
                )
                return result

            # Update operation: report only the columns whose value differs.
            # NOTE(review): in-memory values are compared as-is, so a value
            # not yet coerced to its column type may show up as a change.
            changes = {
                name: {'old': stored[name], 'new': getattr(self, name)}
                for name in column_names
                if stored[name] != getattr(self, name)
            }

            if changes:
                business_logger.info(
                    f"Model updated: {model_name}",
                    extra={
                        'model': model_name,
                        'pk': self.pk,
                        'changes': changes,
                        'operation': 'update',
                    }
                )

            return result

        return wrapper
    return decorator

# Usage examples
@log_function_call()
def process_payment(user, amount):
    """Example: emit business events before and after processing a payment."""
    payment_details = {
        'user_id': user.id,
        'amount': amount,
        'currency': 'USD',
    }

    log_business_event('payment_initiated', **payment_details)

    # Payment processing logic here

    log_business_event('payment_completed', **payment_details)

class Order(models.Model):
    """Example model whose save() is wrapped by log_model_changes."""
    # Model fields here
    
    # NOTE(review): a lambda is passed because `Order` is not yet bound while
    # its own class body executes; confirm the decorator resolves the model
    # name as intended when given a callable instead of a class.
    @log_model_changes(model_class=lambda: Order)
    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)

Alerting and Monitoring Systems

Custom Alerting System

# monitoring/alerts.py
import smtplib
import requests
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from django.conf import settings
from django.utils import timezone
from .performance import performance_monitor

class AlertManager:
    """Turn performance metrics into alerts and deliver them to channels.

    Alerts are plain dicts with ``level``, ``message``, ``metric`` and
    ``value`` keys. Each ``metric``/``level`` pair is rate-limited by an
    in-memory cooldown so the same condition is not re-sent on every check.
    """

    # Seconds to wait on any outbound network call (SMTP, Slack, webhook) so
    # a stalled endpoint cannot block the health check indefinitely.
    NETWORK_TIMEOUT = 10

    def __init__(self, cooldown_period=300):
        """Set up channel dispatch and cooldown tracking.

        Args:
            cooldown_period: Minimum number of seconds between two
                deliveries of the same ``metric``/``level`` alert.
                Defaults to 300 (5 minutes).
        """
        # Channel name (as listed in settings.ALERT_CHANNELS) -> sender.
        self.alert_channels = {
            'email': self.send_email_alert,
            'slack': self.send_slack_alert,
            'webhook': self.send_webhook_alert,
        }
        # Alert key -> POSIX timestamp of the last successful delivery.
        self.alert_history = {}
        self.cooldown_period = cooldown_period

    def check_system_health(self):
        """Check system health and send alerts if needed"""
        metrics = performance_monitor.get_performance_summary()

        # All thresholds are evaluated before anything is sent, so a
        # malformed metrics dict fails before any partial delivery.
        for alert in self._evaluate_metrics(metrics):
            self.send_alert(alert)

    @staticmethod
    def _evaluate_metrics(metrics):
        """Return the alerts raised by ``metrics``, in a fixed check order.

        Args:
            metrics: Dict with a ``'system'`` entry (``cpu_percent``,
                ``memory_percent``) and a ``'requests'`` entry
                (``error_rate``, ``avg_response_time``).

        Returns:
            A list of alert dicts; empty when every value is at or below
            its threshold.

        Raises:
            KeyError: If any of the expected metric keys is missing.
        """
        system = metrics['system']
        request_stats = metrics['requests']

        # (level, metric name, observed value, threshold, message template)
        checks = (
            ('critical', 'cpu_usage', system['cpu_percent'], 90,
             "High CPU usage: {:.1f}%"),
            ('critical', 'memory_usage', system['memory_percent'], 90,
             "High memory usage: {:.1f}%"),
            # 5% error rate
            ('warning', 'error_rate', request_stats['error_rate'], 0.05,
             "High error rate: {:.2%}"),
            # 2 seconds
            ('warning', 'response_time', request_stats['avg_response_time'], 2.0,
             "Slow response time: {:.2f}s"),
        )

        return [
            {
                'level': level,
                'message': template.format(value),
                'metric': metric,
                'value': value,
            }
            for level, metric, value, threshold, template in checks
            if value > threshold
        ]

    def _is_in_cooldown(self, alert_key, current_time):
        """Return True if ``alert_key`` was delivered less than one cooldown ago."""
        last_sent = self.alert_history.get(alert_key)
        return (
            last_sent is not None
            and current_time - last_sent < self.cooldown_period
        )

    def send_alert(self, alert):
        """Send alert through configured channels, honouring the cooldown.

        Args:
            alert: Alert dict; ``metric`` and ``level`` form the cooldown key.

        A failure on one channel is logged and does not stop the others.
        The cooldown is only started when at least one channel accepted the
        alert, so an alert that reached nobody is retried on the next check
        instead of being suppressed.
        """
        alert_key = f"{alert['metric']}_{alert['level']}"
        current_time = timezone.now().timestamp()

        if self._is_in_cooldown(alert_key, current_time):
            return  # Skip alert due to cooldown

        delivered = False
        for channel in settings.ALERT_CHANNELS:
            try:
                self.alert_channels[channel](alert)
            except Exception as e:
                # Best effort per channel: an unknown channel name or a
                # transport error must not block the remaining channels.
                logging.error("Failed to send alert via %s: %s", channel, e)
            else:
                delivered = True

        if delivered:
            self.alert_history[alert_key] = current_time

    def send_email_alert(self, alert):
        """Send email alert over SMTP using the Django email settings.

        Raises:
            smtplib.SMTPException, OSError: On connection or delivery failure.
        """
        msg = MIMEMultipart()
        msg['From'] = settings.ALERT_EMAIL_FROM
        msg['To'] = ', '.join(settings.ALERT_EMAIL_TO)
        msg['Subject'] = f"[{alert['level'].upper()}] Django App Alert"
        
        body = f"""
        Alert Level: {alert['level'].upper()}
        Message: {alert['message']}
        Metric: {alert['metric']}
        Value: {alert['value']}
        Time: {timezone.now().isoformat()}
        
        Please check the application immediately.
        """
        
        msg.attach(MIMEText(body, 'plain'))

        # The context manager issues QUIT and closes the socket even when
        # starttls/login/send raises, so a failed send does not leak the
        # connection.
        with smtplib.SMTP(
            settings.EMAIL_HOST,
            settings.EMAIL_PORT,
            timeout=self.NETWORK_TIMEOUT,
        ) as server:
            if settings.EMAIL_USE_TLS:
                server.starttls()
            # Only authenticate when credentials are configured; relays that
            # accept unauthenticated mail reject or do not support AUTH.
            if settings.EMAIL_HOST_USER and settings.EMAIL_HOST_PASSWORD:
                server.login(settings.EMAIL_HOST_USER, settings.EMAIL_HOST_PASSWORD)
            server.send_message(msg)

    def send_slack_alert(self, alert):
        """Send Slack alert to the configured incoming webhook.

        Raises:
            requests.RequestException: On connection failure, timeout, or a
                4xx/5xx response from Slack.
        """
        color = 'danger' if alert['level'] == 'critical' else 'warning'
        
        payload = {
            'channel': settings.SLACK_ALERT_CHANNEL,
            'username': 'Django Monitor',
            'text': f"*{alert['level'].upper()} Alert*",
            'attachments': [
                {
                    'color': color,
                    'fields': [
                        {'title': 'Message', 'value': alert['message'], 'short': False},
                        {'title': 'Metric', 'value': alert['metric'], 'short': True},
                        {'title': 'Value', 'value': str(alert['value']), 'short': True},
                        {'title': 'Time', 'value': timezone.now().isoformat(), 'short': True},
                    ]
                }
            ]
        }
        
        response = requests.post(
            settings.SLACK_WEBHOOK_URL,
            json=payload,
            timeout=self.NETWORK_TIMEOUT,
        )
        # Without this an HTTP error response would count as a delivery.
        response.raise_for_status()

    def send_webhook_alert(self, alert):
        """Send webhook alert as a JSON POST to ``settings.ALERT_WEBHOOK_URL``.

        Raises:
            requests.RequestException: On connection failure, timeout, or a
                4xx/5xx response from the endpoint.
        """
        payload = {
            'alert': alert,
            'timestamp': timezone.now().isoformat(),
            'service': 'django-app',
        }
        
        response = requests.post(
            settings.ALERT_WEBHOOK_URL,
            json=payload,
            timeout=self.NETWORK_TIMEOUT,
        )
        # Without this an HTTP error response would count as a delivery.
        response.raise_for_status()

# Module-level AlertManager instance used by the health-check task in this
# module. Its cooldown history is an in-memory dict on the instance, so it is
# not shared with other instances or other processes.
alert_manager = AlertManager()

# Celery task for periodic health checks
from celery import shared_task

@shared_task
def check_system_health() -> None:
    """Run one health check through the module-level ``alert_manager``.

    Registered as a Celery shared task and intended to be run periodically;
    the schedule itself is not defined in this module.
    """
    alert_manager.check_system_health()

This comprehensive monitoring and logging guide provides all the tools needed to maintain visibility into Django application performance, errors, and business metrics in production environments.