Comprehensive monitoring and logging are essential for keeping Django applications healthy in production. This chapter covers application performance monitoring, error tracking, log aggregation, alerting systems, and observability best practices.
# settings/monitoring.py
import os
# Sentry for error tracking
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
sentry_sdk.init(
dsn=os.environ.get('SENTRY_DSN'),
integrations=[
DjangoIntegration(
transaction_style='url',
middleware_spans=True,
signals_spans=True,
),
CeleryIntegration(monitor_beat_tasks=True),
RedisIntegration(),
SqlalchemyIntegration(),
],
traces_sample_rate=0.1, # 10% of transactions
send_default_pii=False,
environment=os.environ.get('ENVIRONMENT', 'production'),
release=os.environ.get('GIT_COMMIT', 'unknown'),
    # filter_sensitive_data is defined just below; the lambda defers the
    # name lookup to call time, so the forward reference is safe
    before_send=lambda event, hint: filter_sensitive_data(event, hint),
)
def filter_sensitive_data(event, hint):
"""Filter sensitive data from Sentry events"""
# Remove sensitive headers
if 'request' in event and 'headers' in event['request']:
sensitive_headers = ['authorization', 'cookie', 'x-api-key']
for header in sensitive_headers:
event['request']['headers'].pop(header, None)
# Remove sensitive form data
if 'request' in event and 'data' in event['request']:
sensitive_fields = ['password', 'token', 'secret', 'key']
for field in sensitive_fields:
if field in event['request']['data']:
event['request']['data'][field] = '[Filtered]'
return event
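Beyond what the integrations capture automatically, events can be enriched at the call site. A minimal sketch, assuming a hypothetical checkout view (the view name, tag, and extra fields are illustrative, not part of the configuration above):
# views.py (sketch): manual Sentry enrichment using sentry_sdk's public API
import sentry_sdk

def checkout_view(request):
    if request.user.is_authenticated:
        sentry_sdk.set_user({'id': request.user.id})
    sentry_sdk.set_tag('feature', 'checkout')
    try:
        process_order(request)  # hypothetical business logic
    except Exception as exc:
        # push_scope keeps the extra context local to this one event
        with sentry_sdk.push_scope() as scope:
            scope.set_extra('cart_size', request.session.get('cart_size'))
            sentry_sdk.capture_exception(exc)
        raise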
# New Relic APM
NEW_RELIC_CONFIG_FILE = os.path.join(BASE_DIR, 'newrelic.ini')  # BASE_DIR comes from the base settings module
NEW_RELIC_ENVIRONMENT = os.environ.get('ENVIRONMENT', 'production')
# DataDog APM
DATADOG_TRACE = {
'DEFAULT_SERVICE': 'django-app',
'TAGS': {
'env': os.environ.get('ENVIRONMENT', 'production'),
'version': os.environ.get('GIT_COMMIT', 'unknown'),
},
}
# Custom metrics collection
MONITORING_ENABLED = True
METRICS_BACKEND = 'myproject.monitoring.backends.PrometheusBackend'
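The METRICS_BACKEND path points at project code this chapter does not otherwise show. A minimal sketch of what such a backend could look like, assuming a simple incr/observe interface (the interface itself is an assumption, not a Django or prometheus_client API):
# monitoring/backends.py (sketch)
from prometheus_client import Counter, Histogram

class PrometheusBackend:
    """Small facade so application code can emit metrics without
    importing prometheus_client everywhere."""
    def __init__(self):
        self._counters = {}
        self._histograms = {}

    def incr(self, name, labels=None, amount=1):
        labels = labels or {}
        if name not in self._counters:
            # Registering the same metric name twice raises, so create lazily
            self._counters[name] = Counter(name, name, list(labels))
        counter = self._counters[name]
        (counter.labels(**labels) if labels else counter).inc(amount)

    def observe(self, name, value, labels=None):
        labels = labels or {}
        if name not in self._histograms:
            self._histograms[name] = Histogram(name, name, list(labels))
        histogram = self._histograms[name]
        (histogram.labels(**labels) if labels else histogram).observe(value)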
# monitoring/performance.py
import time
import threading
from collections import defaultdict, deque
from django.db import connection
from django.core.cache import cache
from django.utils import timezone
import psutil
class PerformanceMonitor:
"""Custom performance monitoring for Django applications"""
def __init__(self):
self.request_metrics = defaultdict(list)
self.database_metrics = deque(maxlen=1000)
self.cache_metrics = defaultdict(int)
self.error_metrics = defaultdict(int)
self.lock = threading.Lock()
self.start_time = time.time()
def record_request(self, method, path, status_code, response_time,
db_queries=0, cache_hits=0, cache_misses=0):
"""Record request performance metrics"""
with self.lock:
timestamp = time.time()
metric = {
'timestamp': timestamp,
'method': method,
'path': path,
'status_code': status_code,
'response_time': response_time,
'db_queries': db_queries,
'cache_hits': cache_hits,
'cache_misses': cache_misses,
}
self.request_metrics[path].append(metric)
# Keep only last 100 requests per endpoint
if len(self.request_metrics[path]) > 100:
self.request_metrics[path].pop(0)
# Track errors
if status_code >= 400:
self.error_metrics[status_code] += 1
def record_database_query(self, query, execution_time, table=None):
"""Record database query metrics"""
with self.lock:
metric = {
'timestamp': time.time(),
'query': query[:200], # Truncate long queries
'execution_time': execution_time,
'table': table,
}
self.database_metrics.append(metric)
def get_performance_summary(self):
"""Get performance summary"""
with self.lock:
current_time = time.time()
uptime = current_time - self.start_time
# Calculate request statistics
all_requests = []
for path_requests in self.request_metrics.values():
all_requests.extend(path_requests)
if all_requests:
                response_times = sorted(r['response_time'] for r in all_requests)
                avg_response_time = sum(response_times) / len(response_times)
                p95_response_time = response_times[int(len(response_times) * 0.95)]
                p99_response_time = response_times[int(len(response_times) * 0.99)]
total_requests = len(all_requests)
error_count = sum(1 for r in all_requests if r['status_code'] >= 400)
error_rate = error_count / total_requests if total_requests > 0 else 0
else:
avg_response_time = p95_response_time = p99_response_time = 0
total_requests = error_count = error_rate = 0
# Database statistics
if self.database_metrics:
db_times = [m['execution_time'] for m in self.database_metrics]
avg_db_time = sum(db_times) / len(db_times)
slow_queries = sum(1 for t in db_times if t > 1.0) # > 1 second
else:
avg_db_time = slow_queries = 0
# System metrics
cpu_percent = psutil.cpu_percent()
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
'timestamp': current_time,
'uptime': uptime,
'requests': {
'total': total_requests,
'error_count': error_count,
'error_rate': error_rate,
'avg_response_time': avg_response_time,
'p95_response_time': p95_response_time,
'p99_response_time': p99_response_time,
},
'database': {
'avg_query_time': avg_db_time,
'slow_queries': slow_queries,
'total_queries': len(self.database_metrics),
},
'system': {
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'memory_available': memory.available,
'disk_percent': (disk.used / disk.total) * 100,
},
'errors': dict(self.error_metrics),
}
def get_endpoint_metrics(self, path):
"""Get metrics for specific endpoint"""
with self.lock:
if path not in self.request_metrics:
return None
requests = self.request_metrics[path]
if not requests:
return None
            response_times = [r['response_time'] for r in requests]
            db_queries = [r['db_queries'] for r in requests]
            status_code_counts = defaultdict(int)
            for r in requests:
                status_code_counts[r['status_code']] += 1
return {
'path': path,
'request_count': len(requests),
'avg_response_time': sum(response_times) / len(response_times),
'min_response_time': min(response_times),
'max_response_time': max(response_times),
'avg_db_queries': sum(db_queries) / len(db_queries),
                'status_codes': dict(status_code_counts),
}
# Global performance monitor
performance_monitor = PerformanceMonitor()
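The monitor is only useful if its data can be read. A small sketch of an internal status endpoint built on top of it; the staff-only guard is one option, use whatever access control fits your deployment:
# monitoring/views.py (sketch)
from django.contrib.admin.views.decorators import staff_member_required
from django.http import JsonResponse
from .performance import performance_monitor

@staff_member_required
def performance_summary(request):
    return JsonResponse(performance_monitor.get_performance_summary())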
class PerformanceMiddleware:
"""Middleware to collect performance metrics"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start_time = time.time()
        # Count initial database queries. Note: connection.queries is only
        # populated when DEBUG=True, so this delta reads as 0 in production
        # unless query logging is enabled some other way.
        initial_queries = len(connection.queries)
        # _cache_hits/_cache_misses are not part of Django's cache API; this
        # assumes a wrapper backend that tracks them, hence the getattr default
        initial_cache_hits = getattr(cache, '_cache_hits', 0)
        initial_cache_misses = getattr(cache, '_cache_misses', 0)
response = self.get_response(request)
# Calculate metrics
response_time = time.time() - start_time
db_queries = len(connection.queries) - initial_queries
cache_hits = getattr(cache, '_cache_hits', 0) - initial_cache_hits
cache_misses = getattr(cache, '_cache_misses', 0) - initial_cache_misses
# Record metrics
performance_monitor.record_request(
method=request.method,
path=request.path,
status_code=response.status_code,
response_time=response_time,
db_queries=db_queries,
cache_hits=cache_hits,
cache_misses=cache_misses,
)
# Add performance headers
response['X-Response-Time'] = f'{response_time:.3f}s'
response['X-DB-Queries'] = str(db_queries)
return response
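The middleware only runs once registered. Assuming the module path used in this chapter (adjust to your project layout), placing it near the top of the stack measures as much of the request as possible:
# settings.py (sketch): register the performance middleware
MIDDLEWARE = [
    'monitoring.performance.PerformanceMiddleware',
    # ... Django's standard middleware ...
]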
# monitoring/prometheus.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from django.http import HttpResponse
import time
# Prometheus metrics
REQUEST_COUNT = Counter(
'django_requests_total',
'Total Django requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'django_request_duration_seconds',
'Django request duration',
['method', 'endpoint']
)
DATABASE_QUERIES = Histogram(
    'django_database_queries_per_request',  # the _total suffix is reserved for counters
    'Number of database queries per request',
    ['endpoint']
)
ACTIVE_USERS = Gauge(
'django_active_users',
'Number of active users'
)
CACHE_OPERATIONS = Counter(
'django_cache_operations_total',
'Cache operations',
['operation', 'result']
)
ERROR_COUNT = Counter(
'django_errors_total',
'Total errors',
['error_type', 'endpoint']
)
class PrometheusMiddleware:
"""Middleware to collect Prometheus metrics"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start_time = time.time()
# Count initial database queries
from django.db import connection
initial_queries = len(connection.queries)
        def endpoint_name():
            # resolver_match is only populated once URL resolution has run
            return request.resolver_match.url_name if request.resolver_match else 'unknown'
        try:
            response = self.get_response(request)
            # Record metrics
            REQUEST_COUNT.labels(
                method=request.method,
                endpoint=endpoint_name(),
                status=response.status_code
            ).inc()
            REQUEST_DURATION.labels(
                method=request.method,
                endpoint=endpoint_name()
            ).observe(time.time() - start_time)
            DATABASE_QUERIES.labels(
                endpoint=endpoint_name()
            ).observe(len(connection.queries) - initial_queries)
            return response
        except Exception as e:
            ERROR_COUNT.labels(
                error_type=type(e).__name__,
                endpoint=endpoint_name()
            ).inc()
            raise
def metrics_view(request):
"""Prometheus metrics endpoint"""
    return HttpResponse(generate_latest(), content_type=CONTENT_TYPE_LATEST)
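Prometheus needs a scrape target, so wire the view into the URLconf; /metrics is the conventional path, and in production you would typically restrict it to internal networks:
# urls.py (sketch)
from django.urls import path
from monitoring.prometheus import metrics_view

urlpatterns = [
    path('metrics/', metrics_view, name='prometheus-metrics'),
]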
# Custom metrics collector
class DjangoMetricsCollector:
"""Collect Django-specific metrics for Prometheus"""
def __init__(self):
self.user_gauge = Gauge('django_active_sessions', 'Active user sessions')
self.model_counts = {}
def collect_user_metrics(self):
"""Collect user-related metrics"""
from django.contrib.sessions.models import Session
from django.utils import timezone
active_sessions = Session.objects.filter(
expire_date__gte=timezone.now()
).count()
self.user_gauge.set(active_sessions)
def collect_model_metrics(self):
"""Collect model count metrics"""
from django.apps import apps
for model in apps.get_models():
model_name = f"{model._meta.app_label}_{model._meta.model_name}"
if model_name not in self.model_counts:
self.model_counts[model_name] = Gauge(
f'django_model_count_{model_name}',
f'Count of {model_name} objects'
)
try:
count = model.objects.count()
self.model_counts[model_name].set(count)
except Exception:
pass # Skip models that can't be counted
def collect_all_metrics(self):
"""Collect all custom metrics"""
self.collect_user_metrics()
self.collect_model_metrics()
# Initialize metrics collector
metrics_collector = DjangoMetricsCollector()
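Gauges such as the session and model counts go stale unless something refreshes them. One option, assuming Celery is available as elsewhere in this chapter, is a small periodic task:
# monitoring/tasks.py (sketch)
from celery import shared_task
from .prometheus import metrics_collector

@shared_task
def refresh_prometheus_gauges():
    metrics_collector.collect_all_metrics()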
# settings/logging.py
import os
import logging.config
# Structured logging configuration
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'json': {
'()': 'myproject.logging.formatters.JSONFormatter',
},
'structured': {
'()': 'myproject.logging.formatters.StructuredFormatter',
},
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse',
},
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
'sensitive_data_filter': {
'()': 'myproject.logging.filters.SensitiveDataFilter',
},
},
'handlers': {
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'json',
'filters': ['sensitive_data_filter'],
},
'file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/django.log',
'maxBytes': 1024*1024*15, # 15MB
'backupCount': 10,
'formatter': 'json',
'filters': ['sensitive_data_filter'],
},
'error_file': {
'level': 'ERROR',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/error.log',
'maxBytes': 1024*1024*15, # 15MB
'backupCount': 10,
'formatter': 'json',
'filters': ['sensitive_data_filter'],
},
'security_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/security.log',
'maxBytes': 1024*1024*15, # 15MB
'backupCount': 10,
'formatter': 'json',
},
'performance_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/performance.log',
'maxBytes': 1024*1024*15, # 15MB
'backupCount': 10,
'formatter': 'json',
},
'elasticsearch': {
'level': 'INFO',
'class': 'myproject.logging.handlers.ElasticsearchHandler',
'formatter': 'json',
'index': 'django-logs',
},
'syslog': {
'level': 'INFO',
'class': 'logging.handlers.SysLogHandler',
'address': '/dev/log',
'formatter': 'structured',
},
},
'root': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
'loggers': {
'django': {
'handlers': ['console', 'file'],
'level': 'INFO',
'propagate': False,
},
'django.request': {
'handlers': ['error_file', 'elasticsearch'],
'level': 'ERROR',
'propagate': False,
},
'django.security': {
'handlers': ['security_file', 'elasticsearch'],
'level': 'INFO',
'propagate': False,
},
'myproject.performance': {
'handlers': ['performance_file'],
'level': 'INFO',
'propagate': False,
},
'myproject.business': {
'handlers': ['file', 'elasticsearch'],
'level': 'INFO',
'propagate': False,
},
'celery': {
'handlers': ['file'],
'level': 'INFO',
'propagate': False,
},
},
}
# Environment-specific logging
if os.environ.get('ENVIRONMENT') == 'development':
LOGGING['handlers']['console']['level'] = 'DEBUG'
LOGGING['loggers']['django']['level'] = 'DEBUG'
elif os.environ.get('ENVIRONMENT') == 'production':
# Add CloudWatch logging in production
LOGGING['handlers']['cloudwatch'] = {
'level': 'INFO',
        'class': 'watchtower.CloudWatchLogHandler',
'log_group': 'django-app',
'stream_name': 'production',
'formatter': 'json',
}
LOGGING['root']['handlers'].append('cloudwatch')
# logging/formatters.py
import json
import logging
import traceback
from django.utils import timezone
class JSONFormatter(logging.Formatter):
"""JSON formatter for structured logging"""
def format(self, record):
log_entry = {
'timestamp': timezone.now().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
'process': record.process,
'thread': record.thread,
}
# Add exception info if present
if record.exc_info:
log_entry['exception'] = {
'type': record.exc_info[0].__name__,
'message': str(record.exc_info[1]),
'traceback': traceback.format_exception(*record.exc_info),
}
# Add extra fields from the log record
extra_fields = {}
for key, value in record.__dict__.items():
if key not in ['name', 'msg', 'args', 'levelname', 'levelno',
'pathname', 'filename', 'module', 'lineno',
'funcName', 'created', 'msecs', 'relativeCreated',
'thread', 'threadName', 'processName', 'process',
'getMessage', 'exc_info', 'exc_text', 'stack_info']:
extra_fields[key] = value
if extra_fields:
log_entry['extra'] = extra_fields
return json.dumps(log_entry, default=str)
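For reference, a call such as the one below emits a single JSON object per line, which shippers like Filebeat or Fluentd can forward without extra parsing (the field values shown are illustrative):
import logging
logger = logging.getLogger('myproject.business')
logger.info('Order placed', extra={'order_id': 42})
# -> {"timestamp": "2024-01-01T12:00:00+00:00", "level": "INFO",
#     "logger": "myproject.business", "message": "Order placed", ...,
#     "extra": {"order_id": 42}}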
class StructuredFormatter(logging.Formatter):
"""Structured formatter for syslog"""
def format(self, record):
structured_data = []
# Add Django-specific structured data
if hasattr(record, 'request'):
request = record.request
structured_data.append(
f'[request@django method="{request.method}" '
f'path="{request.path}" user="{getattr(request, "user", "anonymous")}"]'
)
if hasattr(record, 'response_time'):
structured_data.append(
f'[performance@django response_time="{record.response_time}"]'
)
structured_part = ''.join(structured_data)
base_message = super().format(record)
return f'{base_message} {structured_part}'
# logging/filters.py
import logging
import re
class SensitiveDataFilter(logging.Filter):
"""Filter sensitive data from log records"""
SENSITIVE_PATTERNS = [
(re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'password'),
(re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'token'),
(re.compile(r'key["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'key'),
(re.compile(r'secret["\']?\s*[:=]\s*["\']?([^"\'&\s]+)', re.IGNORECASE), 'secret'),
(re.compile(r'authorization:\s*([^\s]+)', re.IGNORECASE), 'authorization'),
(re.compile(r'(\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4})'), 'credit_card'),
(re.compile(r'(\d{3}-\d{2}-\d{4})'), 'ssn'),
]
def filter(self, record):
"""Filter sensitive data from log record"""
if hasattr(record, 'msg'):
message = str(record.msg)
for pattern, replacement in self.SENSITIVE_PATTERNS:
message = pattern.sub(f'[FILTERED_{replacement.upper()}]', message)
record.msg = message
# Filter extra fields
for key, value in record.__dict__.items():
if isinstance(value, str):
for pattern, replacement in self.SENSITIVE_PATTERNS:
if pattern.search(value):
setattr(record, key, f'[FILTERED_{replacement.upper()}]')
return True
# logging/handlers.py
import json
import logging
import requests
from datetime import datetime
from logging import Handler
from elasticsearch import Elasticsearch
class ElasticsearchHandler(Handler):
"""Custom handler for Elasticsearch logging"""
def __init__(self, index='django-logs', doc_type='log'):
super().__init__()
self.es = Elasticsearch([
{'host': 'localhost', 'port': 9200}
])
self.index = index
self.doc_type = doc_type
def emit(self, record):
"""Emit log record to Elasticsearch"""
try:
log_entry = json.loads(self.format(record))
            # record.created is a Unix timestamp, so convert it before date-formatting
            self.es.index(
                index=f"{self.index}-{datetime.fromtimestamp(record.created):%Y.%m.%d}",
                doc_type=self.doc_type,  # deprecated and ignored by Elasticsearch 7+
                body=log_entry
            )
except Exception:
self.handleError(record)
class SlackHandler(Handler):
"""Custom handler for Slack notifications"""
def __init__(self, webhook_url, channel='#alerts'):
super().__init__()
self.webhook_url = webhook_url
self.channel = channel
def emit(self, record):
"""Send log record to Slack"""
if record.levelno < logging.ERROR:
return
try:
message = {
'channel': self.channel,
'username': 'Django Logger',
'text': f'*{record.levelname}*: {record.getMessage()}',
'attachments': [
{
'color': 'danger' if record.levelno >= logging.ERROR else 'warning',
'fields': [
{'title': 'Module', 'value': record.module, 'short': True},
{'title': 'Function', 'value': record.funcName, 'short': True},
{'title': 'Line', 'value': str(record.lineno), 'short': True},
                            {'title': 'Time', 'value': datetime.fromtimestamp(record.created).isoformat(), 'short': True},
]
}
]
}
if record.exc_info:
message['attachments'][0]['fields'].append({
'title': 'Exception',
'value': f'```{self.format(record)}```',
'short': False
})
requests.post(self.webhook_url, json=message, timeout=5)
except Exception:
self.handleError(record)
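To actually route errors to Slack, add the handler to the LOGGING dict from earlier; a sketch assuming the webhook URL lives in an environment variable:
# settings addition (sketch): send ERROR-level records to Slack
LOGGING['handlers']['slack'] = {
    'level': 'ERROR',
    'class': 'myproject.logging.handlers.SlackHandler',
    'webhook_url': os.environ.get('SLACK_WEBHOOK_URL'),
    'channel': '#alerts',
}
LOGGING['loggers']['django.request']['handlers'].append('slack')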
# logging/business.py
import logging
from functools import wraps
from django.utils import timezone
# Business logic logger
business_logger = logging.getLogger('myproject.business')
def log_business_event(event_type, **kwargs):
"""Log business events with structured data"""
business_logger.info(
f"Business event: {event_type}",
extra={
'event_type': event_type,
'timestamp': timezone.now().isoformat(),
**kwargs
}
)
def log_user_action(action, user, **kwargs):
"""Log user actions"""
business_logger.info(
f"User action: {action}",
extra={
'action': action,
'user_id': user.id if user.is_authenticated else None,
'username': user.username if user.is_authenticated else 'anonymous',
'timestamp': timezone.now().isoformat(),
**kwargs
}
)
def log_performance_issue(operation, duration, threshold=1.0, **kwargs):
"""Log performance issues"""
if duration > threshold:
performance_logger = logging.getLogger('myproject.performance')
performance_logger.warning(
f"Slow operation: {operation}",
extra={
'operation': operation,
'duration': duration,
'threshold': threshold,
'timestamp': timezone.now().isoformat(),
**kwargs
}
)
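In use, wrap the suspect operation with a timer and pass the elapsed time; the report export here is a stand-in for any expensive call:
import time
from myproject.logging.business import log_performance_issue  # path assumed from the module header above

def export_report(queryset):
    start = time.monotonic()
    rows = list(queryset.values())  # stand-in for the expensive work
    log_performance_issue('report_export', time.monotonic() - start,
                          threshold=0.5, row_count=len(rows))
    return rows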
# Decorators for automatic logging
def log_function_call(logger=None, level=logging.INFO):
"""Decorator to log function calls"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
func_logger = logger or logging.getLogger(func.__module__)
func_logger.log(
level,
f"Calling {func.__name__}",
extra={
'function': func.__name__,
'module': func.__module__,
'args_count': len(args),
'kwargs_keys': list(kwargs.keys()),
}
)
try:
result = func(*args, **kwargs)
func_logger.log(
level,
f"Completed {func.__name__}",
extra={
'function': func.__name__,
'module': func.__module__,
'success': True,
}
)
return result
except Exception as e:
func_logger.error(
f"Error in {func.__name__}: {str(e)}",
extra={
'function': func.__name__,
'module': func.__module__,
'error': str(e),
'error_type': type(e).__name__,
},
exc_info=True
)
raise
return wrapper
return decorator
def log_model_changes(func):
    """Decorator for Model.save() that logs creates and field-level updates"""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        model_name = type(self).__name__
        if self.pk:
            # Update: the in-memory instance already holds the new values,
            # so fetch the current database row to diff against (one extra query)
            old_instance = type(self).objects.filter(pk=self.pk).first()
            result = func(self, *args, **kwargs)
            changes = {}
            if old_instance is not None:
                for field in self._meta.fields:
                    old_value = getattr(old_instance, field.name)
                    new_value = getattr(self, field.name)
                    if old_value != new_value:
                        changes[field.name] = {'old': old_value, 'new': new_value}
            if changes:
                business_logger.info(
                    f"Model updated: {model_name}",
                    extra={
                        'model': model_name,
                        'pk': self.pk,
                        'changes': changes,
                        'operation': 'update',
                    }
                )
        else:
            # Create operation
            result = func(self, *args, **kwargs)
            business_logger.info(
                f"Model created: {model_name}",
                extra={
                    'model': model_name,
                    'pk': self.pk,
                    'operation': 'create',
                }
            )
        return result
    return wrapper
# Usage examples
@log_function_call()
def process_payment(user, amount):
"""Process payment with automatic logging"""
log_business_event(
'payment_initiated',
user_id=user.id,
amount=amount,
currency='USD'
)
# Payment processing logic here
log_business_event(
'payment_completed',
user_id=user.id,
amount=amount,
currency='USD'
)
# models.py (example)
from django.db import models

class Order(models.Model):
    # Model fields here
    @log_model_changes
    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
# monitoring/alerts.py
import smtplib
import requests
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from django.conf import settings
from django.utils import timezone
from .performance import performance_monitor
class AlertManager:
"""Manage alerts for various system conditions"""
def __init__(self):
self.alert_channels = {
'email': self.send_email_alert,
'slack': self.send_slack_alert,
'webhook': self.send_webhook_alert,
}
self.alert_history = {}
self.cooldown_period = 300 # 5 minutes
def check_system_health(self):
"""Check system health and send alerts if needed"""
metrics = performance_monitor.get_performance_summary()
alerts = []
# Check various conditions
if metrics['system']['cpu_percent'] > 90:
alerts.append({
'level': 'critical',
'message': f"High CPU usage: {metrics['system']['cpu_percent']:.1f}%",
'metric': 'cpu_usage',
'value': metrics['system']['cpu_percent'],
})
if metrics['system']['memory_percent'] > 90:
alerts.append({
'level': 'critical',
'message': f"High memory usage: {metrics['system']['memory_percent']:.1f}%",
'metric': 'memory_usage',
'value': metrics['system']['memory_percent'],
})
if metrics['requests']['error_rate'] > 0.05: # 5% error rate
alerts.append({
'level': 'warning',
'message': f"High error rate: {metrics['requests']['error_rate']:.2%}",
'metric': 'error_rate',
'value': metrics['requests']['error_rate'],
})
if metrics['requests']['avg_response_time'] > 2.0: # 2 seconds
alerts.append({
'level': 'warning',
'message': f"Slow response time: {metrics['requests']['avg_response_time']:.2f}s",
'metric': 'response_time',
'value': metrics['requests']['avg_response_time'],
})
# Send alerts
for alert in alerts:
self.send_alert(alert)
def send_alert(self, alert):
"""Send alert through configured channels"""
alert_key = f"{alert['metric']}_{alert['level']}"
current_time = timezone.now().timestamp()
# Check cooldown period
if alert_key in self.alert_history:
last_sent = self.alert_history[alert_key]
if current_time - last_sent < self.cooldown_period:
return # Skip alert due to cooldown
        # Send through all configured channels; ALERT_CHANNELS (e.g. ['email',
        # 'slack']) is a project-specific setting assumed by this class
        for channel in settings.ALERT_CHANNELS:
try:
self.alert_channels[channel](alert)
except Exception as e:
logging.error(f"Failed to send alert via {channel}: {str(e)}")
# Update alert history
self.alert_history[alert_key] = current_time
def send_email_alert(self, alert):
"""Send email alert"""
msg = MIMEMultipart()
msg['From'] = settings.ALERT_EMAIL_FROM
msg['To'] = ', '.join(settings.ALERT_EMAIL_TO)
msg['Subject'] = f"[{alert['level'].upper()}] Django App Alert"
body = f"""
Alert Level: {alert['level'].upper()}
Message: {alert['message']}
Metric: {alert['metric']}
Value: {alert['value']}
Time: {timezone.now().isoformat()}
Please check the application immediately.
"""
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP(settings.EMAIL_HOST, settings.EMAIL_PORT)
if settings.EMAIL_USE_TLS:
server.starttls()
server.login(settings.EMAIL_HOST_USER, settings.EMAIL_HOST_PASSWORD)
server.send_message(msg)
server.quit()
def send_slack_alert(self, alert):
"""Send Slack alert"""
color = 'danger' if alert['level'] == 'critical' else 'warning'
payload = {
'channel': settings.SLACK_ALERT_CHANNEL,
'username': 'Django Monitor',
'text': f"*{alert['level'].upper()} Alert*",
'attachments': [
{
'color': color,
'fields': [
{'title': 'Message', 'value': alert['message'], 'short': False},
{'title': 'Metric', 'value': alert['metric'], 'short': True},
{'title': 'Value', 'value': str(alert['value']), 'short': True},
{'title': 'Time', 'value': timezone.now().isoformat(), 'short': True},
]
}
]
}
requests.post(settings.SLACK_WEBHOOK_URL, json=payload, timeout=10)
def send_webhook_alert(self, alert):
"""Send webhook alert"""
payload = {
'alert': alert,
'timestamp': timezone.now().isoformat(),
'service': 'django-app',
}
requests.post(settings.ALERT_WEBHOOK_URL, json=payload, timeout=10)
# Initialize alert manager
alert_manager = AlertManager()
# Celery task for periodic health checks
from celery import shared_task
@shared_task
def check_system_health():
"""Periodic system health check"""
alert_manager.check_system_health()
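Schedule the task with Celery beat so the check runs continuously; a sketch assuming the module path used above and a one-minute interval:
# settings addition (sketch)
CELERY_BEAT_SCHEDULE = {
    'check-system-health': {
        'task': 'monitoring.alerts.check_system_health',
        'schedule': 60.0,  # seconds
    },
}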
This monitoring and logging toolkit provides the building blocks for maintaining visibility into Django application performance, errors, and business metrics in production; tune the thresholds, channels, and retention settings above to match your own infrastructure.
Scaling and Load Balancing
Scaling Django applications requires strategic planning for handling increased traffic, data growth, and user demands. This chapter covers horizontal and vertical scaling strategies, load balancing configurations, auto-scaling implementations, and performance optimization techniques for high-traffic Django applications.
Backup Strategies
Comprehensive backup strategies are critical for Django applications to ensure data protection, disaster recovery, and business continuity. This chapter covers database backups, file system backups, automated backup procedures, disaster recovery planning, and backup testing strategies.