Database instrumentation provides visibility into your application's database performance, helping you identify bottlenecks, monitor query patterns, and optimize database operations. Understanding how to implement comprehensive monitoring enables proactive performance management.
# settings.py - Enhanced logging configuration
import logging.config

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    # Two formatters: a general-purpose one for the console, and a
    # query-oriented one that expects `duration` and `sql` to be supplied
    # via the log record's `extra` data.
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'query': {
            'format': '[{asctime}] {name} {levelname} - Duration: {duration}ms - {sql}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
        # NOTE(review): the logs/ directory must exist before startup or
        # RotatingFileHandler raises when logging is configured.
        'query_file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/queries.log',
            'maxBytes': 50 * 1024 * 1024,  # 50MB
            'backupCount': 5,
            'formatter': 'query',
        },
        'slow_query_file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/slow_queries.log',
            'maxBytes': 10 * 1024 * 1024,  # 10MB
            'backupCount': 3,
            'formatter': 'query',
        },
    },
    'loggers': {
        # django.db.backends emits each executed SQL statement at DEBUG
        # level (Django only records queries when DEBUG is True).
        'django.db.backends': {
            'handlers': ['query_file'],
            'level': 'DEBUG',
            'propagate': False,
        },
        'slow_queries': {
            'handlers': ['slow_query_file'],
            'level': 'WARNING',
            'propagate': False,
        },
    },
}
# Custom database instrumentation
import time
import logging
from django.db import connection
from django.conf import settings
class DatabaseInstrumentation:
    """Comprehensive database instrumentation.

    Records every query at DEBUG level, escalates queries slower than
    ``settings.SLOW_QUERY_THRESHOLD`` (seconds, default 0.1) to a
    dedicated slow-query logger, and aggregates pattern statistics from
    ``connection.queries`` (only populated when DEBUG is True).
    """

    def __init__(self):
        self.query_logger = logging.getLogger('django.db.backends')
        self.slow_query_logger = logging.getLogger('slow_queries')
        # Threshold in seconds; 0.1 == 100ms.
        self.slow_query_threshold = getattr(settings, 'SLOW_QUERY_THRESHOLD', 0.1)

    def log_query(self, query, duration, params=None):
        """Log one query, escalating slow ones to the slow-query log."""
        record = {
            'sql': query,
            'duration': f"{duration*1000:.2f}",
            'params': params,
        }
        if duration > self.slow_query_threshold:
            self.slow_query_logger.warning(
                f"Slow query detected - Duration: {duration*1000:.2f}ms - {query[:100]}...",
                extra=record,
            )
        # Every query — slow or not — is also recorded at DEBUG level.
        self.query_logger.debug(
            f"Query executed - Duration: {duration*1000:.2f}ms",
            extra=record,
        )

    def analyze_query_patterns(self):
        """Aggregate per-pattern timing statistics from connection.queries."""
        patterns = {}
        for entry in connection.queries:
            elapsed = float(entry['time'])
            key = self.extract_query_pattern(entry['sql'])
            stats = patterns.setdefault(key, {
                'count': 0,
                'total_time': 0,
                'max_time': 0,
                'min_time': float('inf'),
            })
            stats['count'] += 1
            stats['total_time'] += elapsed
            if elapsed > stats['max_time']:
                stats['max_time'] = elapsed
            if elapsed < stats['min_time']:
                stats['min_time'] = elapsed
        # Derive averages and flag patterns that are both frequent and slow.
        for stats in patterns.values():
            stats['avg_time'] = stats['total_time'] / stats['count']
            stats['is_problematic'] = (
                stats['count'] > 10 and stats['avg_time'] > self.slow_query_threshold
            )
        return patterns

    def extract_query_pattern(self, sql):
        """Normalize a SQL statement into a pattern by masking literal values."""
        import re
        masked = re.sub(r"'[^']*'", "'?'", sql)            # string literals
        masked = re.sub(r'\b\d+\b', '?', masked)           # numeric literals
        masked = re.sub(r'IN \([^)]+\)', 'IN (?)', masked)  # IN (...) value lists
        # Collapse all runs of whitespace to a single space.
        return ' '.join(masked.split())
# Query performance middleware
class QueryPerformanceMiddleware:
    """Middleware to track query performance per request.

    NOTE: relies on ``connection.queries``, which Django only populates
    when ``DEBUG`` is True (or query logging is forced on), so the
    analysis is a no-op in a default production configuration.
    """

    def __init__(self, get_response):
        self.get_response = get_response
        self.instrumentation = DatabaseInstrumentation()

    def __call__(self, request):
        # Reset the query log so the analysis only covers this request.
        connection.queries_log.clear()
        # perf_counter is monotonic — immune to wall-clock adjustments.
        start_time = time.perf_counter()
        response = self.get_response(request)
        end_time = time.perf_counter()
        self.analyze_request_performance(request, start_time, end_time, response)
        return response

    def analyze_request_performance(self, request, start_time, end_time, response=None):
        """Analyze performance metrics for the request.

        ``response`` is optional for backward compatibility; when given,
        its status code is stored alongside the metrics.
        """
        total_queries = len(connection.queries)
        total_query_time = sum(float(q['time']) for q in connection.queries)
        request_time = end_time - start_time
        # Guard against a zero-length interval (coarse clocks) before dividing.
        query_percentage = (
            (total_query_time / request_time) * 100 if request_time > 0 else 0.0
        )
        # Human-readable snapshot used for logging only.
        performance_data = {
            'path': request.path,
            'method': request.method,
            'query_count': total_queries,
            'query_time': f"{total_query_time*1000:.2f}ms",
            'request_time': f"{request_time*1000:.2f}ms",
            'query_percentage': f"{query_percentage:.1f}%",
        }
        # Detect N+1 queries
        n_plus_one_patterns = self.detect_n_plus_one()
        if n_plus_one_patterns:
            performance_data['n_plus_one_detected'] = n_plus_one_patterns
        # Log if performance is concerning
        if total_queries > 20 or total_query_time > 0.5 or request_time > 2.0:
            logging.getLogger('performance').warning(
                f"Performance concern for {request.path}",
                extra=performance_data
            )
        # Store *numeric* metrics shaped to match the PerformanceMetric model.
        # (Fix: previously the "...ms" strings and non-field keys were passed
        # to objects.create(), which always failed and was silently swallowed.)
        self.store_performance_metrics({
            'path': request.path,
            'method': request.method,
            'query_count': total_queries,
            'query_time': total_query_time,   # seconds (FloatField)
            'request_time': request_time,     # seconds (FloatField)
            'status_code': getattr(response, 'status_code', 0),
            'metadata': {
                'query_percentage': round(query_percentage, 1),
                'n_plus_one_detected': n_plus_one_patterns,
            },
        })

    def detect_n_plus_one(self):
        """Detect potential N+1 query patterns within the current request."""
        query_patterns = {}
        for query_data in connection.queries:
            pattern = self.instrumentation.extract_query_pattern(query_data['sql'])
            query_patterns[pattern] = query_patterns.get(pattern, 0) + 1
        # The same normalized pattern repeated more than 5 times in one
        # request is a strong N+1 signal.
        return {
            pattern: count
            for pattern, count in query_patterns.items()
            if count > 5
        }

    def store_performance_metrics(self, metrics):
        """Store metrics in cache (real-time view) and the DB (history)."""
        from django.core.cache import cache
        # Second-resolution key: concurrent requests within the same second
        # overwrite each other — acceptable for a rough real-time view.
        cache_key = f"perf_metrics_{int(time.time())}"
        cache.set(cache_key, metrics, timeout=3600)
        try:
            from .models import PerformanceMetric
            PerformanceMetric.objects.create(**metrics)
        except Exception:
            # Best-effort: never fail the request because metrics storage failed.
            pass
from django.db import connection
from django.core.management.base import BaseCommand
import time
import threading
class QueryMonitor:
    """Real-time query monitoring system.

    Polls ``connection.queries`` on a background daemon thread, so it
    only observes queries when Django records them (DEBUG=True).
    """

    def __init__(self):
        self.active_queries = {}
        self.query_stats = {
            'total_queries': 0,
            'slow_queries': 0,
            'total_time': 0,
            'avg_time': 0,
        }
        self.monitoring = False
        # Slow-query threshold in seconds. Callers (e.g. the management
        # command) may override this attribute. Fix: _process_query used
        # to hard-code 0.1 and silently ignore any configured value.
        self.slow_query_threshold = 0.1

    def start_monitoring(self):
        """Start real-time query monitoring on a daemon thread."""
        self.monitoring = True
        monitor_thread = threading.Thread(target=self._monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()

    def stop_monitoring(self):
        """Stop query monitoring (the loop exits on its next poll)."""
        self.monitoring = False

    def _monitor_loop(self):
        """Main monitoring loop: process queries appended since the last poll."""
        last_query_count = 0
        while self.monitoring:
            current_queries = connection.queries
            new_queries = current_queries[last_query_count:]
            for query_data in new_queries:
                self._process_query(query_data)
            last_query_count = len(current_queries)
            time.sleep(0.1)  # Check every 100ms

    def _process_query(self, query_data):
        """Update running statistics for one query and run slow/N+1 checks."""
        duration = float(query_data['time'])
        sql = query_data['sql']
        # Update statistics
        self.query_stats['total_queries'] += 1
        self.query_stats['total_time'] += duration
        self.query_stats['avg_time'] = (
            self.query_stats['total_time'] / self.query_stats['total_queries']
        )
        # Check for slow queries against the configurable threshold.
        if duration > self.slow_query_threshold:
            self.query_stats['slow_queries'] += 1
            self._handle_slow_query(sql, duration)
        # Check for problematic patterns
        self._check_query_patterns(sql, duration)

    def _handle_slow_query(self, sql, duration):
        """Log a slow query (truncated to 200 chars) and trigger alerting."""
        slow_query_data = {
            'sql': sql[:200] + '...' if len(sql) > 200 else sql,
            'duration': f"{duration*1000:.2f}ms",
            'timestamp': time.time(),
        }
        logging.getLogger('slow_queries').warning(
            f"Slow query detected: {duration*1000:.2f}ms",
            extra=slow_query_data
        )
        # Send alert if configured
        self._send_slow_query_alert(slow_query_data)

    def _check_query_patterns(self, sql, duration):
        """Track repeated SELECT...WHERE patterns to spot N+1 behavior."""
        if 'SELECT' in sql.upper() and 'WHERE' in sql.upper():
            pattern = self._extract_pattern(sql)
            if pattern in self.active_queries:
                self.active_queries[pattern]['count'] += 1
                # Alert if pattern repeats too frequently.
                # NOTE(review): fires on every repetition past 10, and
                # active_queries is never pruned — consider a cooldown.
                if self.active_queries[pattern]['count'] > 10:
                    self._handle_n_plus_one_detection(pattern)
            else:
                self.active_queries[pattern] = {
                    'count': 1,
                    'first_seen': time.time(),
                }

    def _extract_pattern(self, sql):
        """Extract a simplified query pattern for N+1 detection."""
        import re
        pattern = re.sub(r'\d+', '?', sql)
        pattern = re.sub(r"'[^']*'", "'?'", pattern)
        return pattern[:100]  # Limit pattern length

    def _handle_n_plus_one_detection(self, pattern):
        """Log a probable N+1 query pattern."""
        logging.getLogger('performance').error(
            f"Potential N+1 query detected: {pattern}",
            extra={'pattern': pattern, 'count': self.active_queries[pattern]['count']}
        )

    def _send_slow_query_alert(self, query_data):
        """Send alert for slow queries.

        Hook point — implementation depends on your alerting system
        (email, Slack, PagerDuty, ...).
        """
        pass

    def get_current_stats(self):
        """Return a snapshot copy of the current statistics."""
        return self.query_stats.copy()
# Management command for query monitoring
class Command(BaseCommand):
    """Management command to monitor database queries in real time."""

    help = 'Monitor database queries in real-time'

    def add_arguments(self, parser):
        parser.add_argument('--duration', type=int, default=60, help='Monitoring duration in seconds')
        parser.add_argument('--threshold', type=float, default=0.1, help='Slow query threshold in seconds')

    def handle(self, *args, **options):
        monitor = QueryMonitor()
        monitor.slow_query_threshold = options['threshold']
        self.stdout.write('Starting query monitoring...')
        monitor.start_monitoring()
        try:
            # Tick once per second for the requested duration; print a
            # summary on every 10th tick (including the first, at t=0).
            for elapsed in range(options['duration']):
                time.sleep(1)
                if elapsed % 10 != 0:
                    continue
                snapshot = monitor.get_current_stats()
                self.stdout.write(
                    f"Queries: {snapshot['total_queries']}, "
                    f"Slow: {snapshot['slow_queries']}, "
                    f"Avg: {snapshot['avg_time']*1000:.2f}ms"
                )
        except KeyboardInterrupt:
            self.stdout.write('Monitoring interrupted by user')
        finally:
            monitor.stop_monitoring()
            summary = monitor.get_current_stats()
            self.stdout.write('\n=== Final Statistics ===')
            self.stdout.write(f"Total queries: {summary['total_queries']}")
            self.stdout.write(f"Slow queries: {summary['slow_queries']}")
            self.stdout.write(f"Average time: {summary['avg_time']*1000:.2f}ms")
            self.stdout.write(f"Total time: {summary['total_time']:.2f}s")
# models.py - Performance metrics storage
from django.db import models
from django.contrib.auth.models import User
class PerformanceMetric(models.Model):
    """Per-request performance snapshot for historical analysis.

    One row is intended per monitored request; duration fields are
    stored in seconds as floats.
    """
    timestamp = models.DateTimeField(auto_now_add=True)
    path = models.CharField(max_length=255)
    method = models.CharField(max_length=10)
    # Query metrics
    query_count = models.PositiveIntegerField()
    query_time = models.FloatField()  # total SQL time, in seconds
    # Request metrics
    request_time = models.FloatField()  # wall-clock request time, in seconds
    response_size = models.PositiveIntegerField(null=True, blank=True)
    status_code = models.PositiveIntegerField()  # required (non-null)
    # User context — SET_NULL keeps metric rows when the user is deleted
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    user_agent = models.TextField(blank=True)
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    # Additional metadata — free-form JSON for values without a dedicated field
    metadata = models.JSONField(default=dict, blank=True)

    class Meta:
        # Indexes support time-window filtering by path plus range
        # scans on the numeric metrics used by the monitoring queries.
        indexes = [
            models.Index(fields=['timestamp', 'path']),
            models.Index(fields=['query_count']),
            models.Index(fields=['query_time']),
            models.Index(fields=['request_time']),
        ]
class SlowQuery(models.Model):
    """One recorded slow query, with context and analysis fields.

    Duration is stored in seconds.
    """
    timestamp = models.DateTimeField(auto_now_add=True)
    sql = models.TextField()
    duration = models.FloatField()  # in seconds
    # Query context — where/by whom the query was triggered
    path = models.CharField(max_length=255, blank=True)
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    # Query analysis — normalized pattern with literals masked,
    # plus the tables touched and the statement kind
    query_pattern = models.TextField()
    table_names = models.JSONField(default=list)
    operation_type = models.CharField(max_length=20)  # SELECT, INSERT, UPDATE, DELETE
    # Performance impact — row counts if the backend exposes them;
    # nullable because not all backends/queries provide these
    rows_examined = models.PositiveIntegerField(null=True, blank=True)
    rows_returned = models.PositiveIntegerField(null=True, blank=True)

    class Meta:
        # NOTE(review): an index on the TextField query_pattern may be
        # size-limited or unsupported on some backends — verify on the
        # target database.
        indexes = [
            models.Index(fields=['timestamp', 'duration']),
            models.Index(fields=['query_pattern']),
            models.Index(fields=['operation_type']),
        ]
class DatabaseMetrics:
    """Collect and analyze database performance metrics.

    The collectors are PostgreSQL-specific (they query the pg_stat_*
    statistics views); on any other backend they return empty dicts.
    """

    @staticmethod
    def collect_connection_metrics():
        """Collect database-level connection/transaction/IO counters.

        Returns a flat dict keyed by the column aliases below, plus
        ``locks_<mode>`` counts; empty on non-PostgreSQL backends.
        """
        metrics = {}
        # PostgreSQL specific metrics
        if connection.vendor == 'postgresql':
            with connection.cursor() as cursor:
                # Connection statistics for the current database only
                cursor.execute("""
                    SELECT
                        numbackends as active_connections,
                        xact_commit as transactions_committed,
                        xact_rollback as transactions_rolled_back,
                        blks_read as blocks_read,
                        blks_hit as blocks_hit,
                        tup_returned as tuples_returned,
                        tup_fetched as tuples_fetched,
                        tup_inserted as tuples_inserted,
                        tup_updated as tuples_updated,
                        tup_deleted as tuples_deleted
                    FROM pg_stat_database
                    WHERE datname = current_database()
                """)
                row = cursor.fetchone()
                if row:
                    # Map the SELECT aliases onto the metrics dict.
                    columns = [desc[0] for desc in cursor.description]
                    metrics.update(dict(zip(columns, row)))
                # Lock statistics, grouped by lock mode, for this database
                cursor.execute("""
                    SELECT
                        mode,
                        COUNT(*) as lock_count
                    FROM pg_locks
                    WHERE database = (SELECT oid FROM pg_database WHERE datname = current_database())
                    GROUP BY mode
                """)
                lock_stats = {}
                for mode, count in cursor.fetchall():
                    lock_stats[f'locks_{mode.lower()}'] = count
                metrics.update(lock_stats)
        return metrics

    @staticmethod
    def collect_table_metrics():
        """Collect per-table scan/write/vacuum statistics.

        Returns {table_name: row-dict}, ordered by total tuples read;
        empty on non-PostgreSQL backends.
        """
        table_metrics = {}
        if connection.vendor == 'postgresql':
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        schemaname,
                        relname as table_name,
                        seq_scan,
                        seq_tup_read,
                        idx_scan,
                        idx_tup_fetch,
                        n_tup_ins as inserts,
                        n_tup_upd as updates,
                        n_tup_del as deletes,
                        n_live_tup as live_tuples,
                        n_dead_tup as dead_tuples,
                        last_vacuum,
                        last_autovacuum,
                        last_analyze,
                        last_autoanalyze
                    FROM pg_stat_user_tables
                    ORDER BY seq_tup_read + idx_tup_fetch DESC
                """)
                columns = [desc[0] for desc in cursor.description]
                for row in cursor.fetchall():
                    table_data = dict(zip(columns, row))
                    table_name = table_data['table_name']
                    # NOTE(review): keyed by bare table name — tables with
                    # the same name in different schemas collide.
                    table_metrics[table_name] = table_data
        return table_metrics

    @staticmethod
    def collect_index_metrics():
        """Collect per-index usage metrics and on-disk size.

        Returns {index_name: row-dict}; empty on non-PostgreSQL backends.
        """
        index_metrics = {}
        if connection.vendor == 'postgresql':
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        schemaname,
                        relname as table_name,
                        indexrelname as index_name,
                        idx_scan as scans,
                        idx_tup_read as tuples_read,
                        idx_tup_fetch as tuples_fetched,
                        pg_size_pretty(pg_relation_size(indexrelid)) as size
                    FROM pg_stat_user_indexes
                    ORDER BY idx_scan DESC
                """)
                columns = [desc[0] for desc in cursor.description]
                for row in cursor.fetchall():
                    index_data = dict(zip(columns, row))
                    index_name = index_data['index_name']
                    index_metrics[index_name] = index_data
        return index_metrics

    @staticmethod
    def analyze_query_performance():
        """Analyze recorded SlowQuery rows and produce recommendations.

        Returns a dict with the top 20 slow queries of the last hour,
        the 10 most frequent patterns of the last 24 hours, and the
        generated recommendations.
        """
        analysis = {
            'slow_queries': [],
            'frequent_patterns': {},
            # NOTE(review): 'table_access_patterns' is declared but never
            # populated anywhere in this method.
            'table_access_patterns': {},
            'recommendations': []
        }
        # Analyze slow queries from the last hour
        from datetime import timedelta
        from django.utils import timezone
        recent_slow_queries = SlowQuery.objects.filter(
            timestamp__gte=timezone.now() - timedelta(hours=1)
        ).order_by('-duration')[:20]
        for slow_query in recent_slow_queries:
            # NOTE(review): '...' is appended unconditionally, even when
            # the SQL is shorter than 200 characters.
            analysis['slow_queries'].append({
                'sql': slow_query.sql[:200] + '...',
                'duration': slow_query.duration,
                'pattern': slow_query.query_pattern,
                'timestamp': slow_query.timestamp,
            })
        # Aggregate pattern statistics over the last 24 hours
        pattern_stats = SlowQuery.objects.filter(
            timestamp__gte=timezone.now() - timedelta(hours=24)
        ).values('query_pattern').annotate(
            count=models.Count('id'),
            avg_duration=models.Avg('duration'),
            max_duration=models.Max('duration')
        ).order_by('-count')[:10]
        for pattern_stat in pattern_stats:
            analysis['frequent_patterns'][pattern_stat['query_pattern']] = {
                'count': pattern_stat['count'],
                'avg_duration': pattern_stat['avg_duration'],
                'max_duration': pattern_stat['max_duration'],
            }
        # Generate recommendations from the collected statistics
        analysis['recommendations'] = DatabaseMetrics.generate_recommendations(analysis)
        return analysis

    @staticmethod
    def generate_recommendations(analysis):
        """Turn the analysis dict into a list of recommendation dicts.

        Each recommendation has type/severity/message/pattern/suggestion
        keys.
        """
        recommendations = []
        # Frequent AND slow patterns: likely missing index or bad query
        for pattern, stats in analysis['frequent_patterns'].items():
            if stats['count'] > 10 and stats['avg_duration'] > 0.1:
                recommendations.append({
                    'type': 'slow_query_pattern',
                    'severity': 'high',
                    'message': f"Query pattern executed {stats['count']} times with average duration {stats['avg_duration']:.3f}s",
                    'pattern': pattern,
                    'suggestion': 'Consider adding indexes or optimizing the query'
                })
        # Very frequent SELECT patterns: possible N+1 access from the ORM
        select_patterns = [p for p in analysis['frequent_patterns'].keys() if 'SELECT' in p.upper()]
        for pattern in select_patterns:
            stats = analysis['frequent_patterns'][pattern]
            if stats['count'] > 20:
                recommendations.append({
                    'type': 'n_plus_one',
                    'severity': 'medium',
                    'message': f"Potential N+1 query pattern detected ({stats['count']} executions)",
                    'pattern': pattern,
                    'suggestion': 'Use select_related() or prefetch_related() to optimize'
                })
        return recommendations
# Automated performance monitoring
class PerformanceMonitor:
    """Automated performance monitoring and alerting.

    Checks recent PerformanceMetric rows against configured thresholds
    and emits alerts through the 'performance_alerts' logger.
    """

    def __init__(self):
        # Units: seconds for durations, a plain count for query counts.
        self.thresholds = {
            'slow_query_duration': 0.5,  # 500ms
            'high_query_count': 50,      # 50 queries per request
            'long_request_time': 5.0,    # 5 seconds
        }

    def monitor_performance(self):
        """Run performance monitoring checks over the last 5 minutes.

        Returns:
            list[dict]: the alerts raised (empty when healthy).
        """
        # Fix: `timezone` and `timedelta` were referenced here without any
        # module-level import (they existed only as locals inside
        # DatabaseMetrics.analyze_query_performance) -> NameError at runtime.
        from datetime import timedelta
        from django.utils import timezone
        alerts = []
        recent_metrics = PerformanceMetric.objects.filter(
            timestamp__gte=timezone.now() - timedelta(minutes=5)
        )
        # Check for slow requests (single COUNT query instead of
        # exists() + two count() calls).
        slow_requests = recent_metrics.filter(
            request_time__gte=self.thresholds['long_request_time']
        )
        slow_count = slow_requests.count()
        if slow_count:
            alerts.append({
                'type': 'slow_requests',
                'count': slow_count,
                'message': f'{slow_count} slow requests detected in last 5 minutes'
            })
        # Check for requests issuing an excessive number of queries
        high_query_requests = recent_metrics.filter(
            query_count__gte=self.thresholds['high_query_count']
        )
        high_count = high_query_requests.count()
        if high_count:
            alerts.append({
                'type': 'high_query_count',
                'count': high_count,
                'message': f'{high_count} requests with high query counts detected'
            })
        # Check database connection health
        db_metrics = DatabaseMetrics.collect_connection_metrics()
        # Assumes max_connections is ~100 — TODO confirm against the server config.
        if db_metrics.get('active_connections', 0) > 80:
            alerts.append({
                'type': 'high_connections',
                'count': db_metrics['active_connections'],
                'message': f'High number of active connections: {db_metrics["active_connections"]}'
            })
        if alerts:
            self.send_alerts(alerts)
        return alerts

    def send_alerts(self, alerts):
        """Emit each alert through the 'performance_alerts' logger.

        Extend this to integrate email/Slack/etc. as needed.
        """
        for alert in alerts:
            logging.getLogger('performance_alerts').error(
                alert['message'],
                # Fix: 'message' is a reserved LogRecord attribute — passing
                # it inside `extra` raises KeyError, so strip it first.
                extra={k: v for k, v in alert.items() if k != 'message'}
            )

    def cleanup_old_metrics(self, days=30):
        """Delete metric and slow-query rows older than ``days`` days.

        Returns:
            dict: number of rows deleted from each table.
        """
        from datetime import timedelta
        from django.utils import timezone
        cutoff_date = timezone.now() - timedelta(days=days)
        # queryset.delete() returns (total_deleted, per-model dict);
        # keep only the total.
        deleted_metrics = PerformanceMetric.objects.filter(
            timestamp__lt=cutoff_date
        ).delete()[0]
        deleted_queries = SlowQuery.objects.filter(
            timestamp__lt=cutoff_date
        ).delete()[0]
        return {
            'deleted_metrics': deleted_metrics,
            'deleted_queries': deleted_queries
        }
# Management command for performance monitoring
class Command(BaseCommand):
    """Performance monitoring management command.

    NOTE(review): shares its class name with the query-monitoring command
    above; each is presumably intended to live in its own
    management/commands/ module — confirm before deploying.
    """

    help = 'Run performance monitoring and analysis'

    def add_arguments(self, parser):
        parser.add_argument('--analyze', action='store_true', help='Run performance analysis')
        parser.add_argument('--monitor', action='store_true', help='Run monitoring checks')
        parser.add_argument('--cleanup', action='store_true', help='Clean up old metrics')
        parser.add_argument('--report', action='store_true', help='Generate performance report')

    def handle(self, *args, **options):
        monitor = PerformanceMonitor()
        if options['analyze']:
            self._run_analysis()
        if options['monitor']:
            self._run_monitoring(monitor)
        if options['cleanup']:
            self._run_cleanup(monitor)
        if options['report']:
            self.stdout.write('Generating performance report...')
            self.generate_performance_report()

    def _run_analysis(self):
        """Run the query-performance analysis and print a summary."""
        self.stdout.write('Running performance analysis...')
        analysis = DatabaseMetrics.analyze_query_performance()
        self.stdout.write(f"Slow queries found: {len(analysis['slow_queries'])}")
        self.stdout.write(f"Query patterns analyzed: {len(analysis['frequent_patterns'])}")
        self.stdout.write(f"Recommendations: {len(analysis['recommendations'])}")
        for rec in analysis['recommendations']:
            self.stdout.write(f" {rec['severity'].upper()}: {rec['message']}")

    def _run_monitoring(self, monitor):
        """Run the threshold checks and print any alerts raised."""
        self.stdout.write('Running monitoring checks...')
        alerts = monitor.monitor_performance()
        if not alerts:
            self.stdout.write('No performance issues detected')
            return
        self.stdout.write(f"Alerts generated: {len(alerts)}")
        for alert in alerts:
            self.stdout.write(f" {alert['type']}: {alert['message']}")

    def _run_cleanup(self, monitor):
        """Purge old metric rows and report how many were removed."""
        self.stdout.write('Cleaning up old metrics...')
        removed = monitor.cleanup_old_metrics()
        self.stdout.write(f"Deleted {removed['deleted_metrics']} metrics")
        self.stdout.write(f"Deleted {removed['deleted_queries']} slow queries")

    def generate_performance_report(self):
        """Print a database performance report to stdout."""
        db_metrics = DatabaseMetrics.collect_connection_metrics()
        table_metrics = DatabaseMetrics.collect_table_metrics()
        index_metrics = DatabaseMetrics.collect_index_metrics()
        self.stdout.write('\n=== Database Performance Report ===')
        # Connection-level statistics ('N/A' on non-PostgreSQL backends).
        self.stdout.write("\nConnection Metrics:")
        self.stdout.write(f" Active connections: {db_metrics.get('active_connections', 'N/A')}")
        self.stdout.write(f" Transactions committed: {db_metrics.get('transactions_committed', 'N/A')}")
        self.stdout.write(f" Transactions rolled back: {db_metrics.get('transactions_rolled_back', 'N/A')}")
        # Tables ranked by total tuples read (sequential + index fetches).
        self.stdout.write("\nTop 5 Most Active Tables:")
        ranked_tables = sorted(
            table_metrics.items(),
            key=lambda item: (item[1]['seq_tup_read'] or 0) + (item[1]['idx_tup_fetch'] or 0),
            reverse=True,
        )
        for name, stats in ranked_tables[:5]:
            reads = (stats['seq_tup_read'] or 0) + (stats['idx_tup_fetch'] or 0)
            self.stdout.write(f" {name}: {reads:,} tuples read")
        # Indexes ranked by scan count.
        self.stdout.write("\nTop 5 Most Used Indexes:")
        ranked_indexes = sorted(
            index_metrics.items(),
            key=lambda item: item[1]['scans'] or 0,
            reverse=True,
        )
        for name, stats in ranked_indexes[:5]:
            self.stdout.write(f" {name}: {stats['scans']:,} scans")
Database instrumentation provides the visibility needed to maintain optimal performance as your application scales. By implementing comprehensive monitoring, logging, and alerting systems, you can proactively identify and resolve performance issues before they impact users.
Composite Primary Keys
Composite primary keys (also called compound keys) consist of multiple columns that together uniquely identify a record. While Django traditionally uses single-column auto-incrementing primary keys, Django 5.2 introduced native support for composite primary keys via `models.CompositePrimaryKey` (with some feature limitations).
Database Optimization
Database optimization is crucial for building scalable Django applications. Understanding how to identify performance bottlenecks, optimize queries, and implement efficient database patterns ensures your application can handle growth and maintain responsiveness.