Django supports using multiple databases in a single application, enabling you to separate different types of data, implement read/write splitting, or integrate with legacy systems. Understanding how to configure and use multiple databases effectively is crucial for scalable applications.
# settings.py
#
# Multi-database configuration: 'default' holds blog/core data, with
# dedicated databases for users, analytics, a legacy MySQL system, a
# local SQLite cache, and a read replica of 'default'.
#
# NOTE(review): credentials are hard-coded for illustration only; load
# them from the environment or a secrets manager in real deployments.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'main_app_db',
        'USER': 'postgres',
        'PASSWORD': 'password',
        'HOST': 'localhost',
        'PORT': '5432',
        # FIX: 'init_command' with SET sql_mode is a MySQL-only option;
        # the PostgreSQL backend passes OPTIONS straight to psycopg2,
        # which rejects unknown connection keywords.  Use valid libpq
        # keywords instead (connect_timeout is in seconds).
        'OPTIONS': {
            'connect_timeout': 10,
        },
    },
    'users_db': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'users_database',
        'USER': 'postgres',
        'PASSWORD': 'password',
        'HOST': 'users-db-server',
        'PORT': '5432',
    },
    'analytics_db': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'analytics_database',
        'USER': 'analytics_user',
        'PASSWORD': 'analytics_password',
        'HOST': 'analytics-server',
        'PORT': '5432',
        # FIX: PostgreSQL session settings are passed through libpq's
        # 'options' keyword ('init_command' is MySQL-only).  The space
        # in the value is backslash-escaped for libpq's option parser.
        'OPTIONS': {
            'options': '-c default_transaction_isolation=read\\ committed',
        },
    },
    'legacy_db': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'legacy_system',
        'USER': 'legacy_user',
        'PASSWORD': 'legacy_password',
        'HOST': 'legacy-server',
        'PORT': '3306',
        'OPTIONS': {
            'charset': 'utf8mb4',  # full 4-byte Unicode support
        },
    },
    'cache_db': {
        'ENGINE': 'django.db.backends.sqlite3',
        # NOTE(review): assumes BASE_DIR is defined earlier in settings.
        'NAME': BASE_DIR / 'cache.sqlite3',
    },
    'read_replica': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'main_app_db',
        'USER': 'readonly_user',
        'PASSWORD': 'readonly_password',
        'HOST': 'read-replica-server',
        'PORT': '5432',
        # FIX: 'default_transaction_isolation' is not a psycopg2
        # connection keyword by itself; it must be wrapped in '-c ...'.
        'OPTIONS': {
            'options': '-c default_transaction_isolation=read\\ committed',
        },
    },
}

# Database routing configuration.  Routers are consulted in list order;
# the first router returning a non-None answer wins.
DATABASE_ROUTERS = [
    'myapp.routers.DatabaseRouter',
    'myapp.routers.AnalyticsRouter',
    'myapp.routers.LegacyRouter',
]

# Connection pooling (only if the third-party django-db-pool package is
# installed; it swaps the engine while reusing the settings above).
DATABASES['default']['ENGINE'] = 'django_db_pool.backends.postgresql'
DATABASES['default']['POOL_OPTIONS'] = {
    'POOL_SIZE': 10,
    'MAX_OVERFLOW': 10,
    'RECYCLE': -1,  # -1 = never recycle pooled connections
}
# models.py
from django.db import models
class User(models.Model):
    """User model stored in users database"""
    username = models.CharField(max_length=150, unique=True)
    email = models.EmailField(unique=True)
    first_name = models.CharField(max_length=30)
    last_name = models.CharField(max_length=30)
    created_at = models.DateTimeField(auto_now_add=True)  # set once on insert

    class Meta:
        # Routed to 'users_db' via DatabaseRouter.route_app_labels.
        app_label = 'users'
class Post(models.Model):
    """Post model stored in the default database.

    The author lives in 'users_db' while posts live in 'default'.
    Django does not support foreign keys spanning databases, so the
    relation is stored as a plain integer ID and resolved manually
    (see the service-layer helpers).
    """
    title = models.CharField(max_length=200)
    content = models.TextField()
    # FIX: was ForeignKey(User, on_delete=CASCADE) — a cross-database
    # relation, which Django cannot enforce or join.  A plain indexed
    # integer keeps the same 'author_id' attribute every caller in this
    # file already uses.
    author_id = models.IntegerField(db_index=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        app_label = 'blog'
class AnalyticsEvent(models.Model):
    """Analytics model stored in analytics database"""
    event_type = models.CharField(max_length=50)
    # Plain integer rather than a ForeignKey: User rows live in a
    # different database, and Django has no cross-database FK support.
    user_id = models.IntegerField(null=True, blank=True)
    session_id = models.CharField(max_length=100)
    timestamp = models.DateTimeField(auto_now_add=True)
    data = models.JSONField(default=dict)  # arbitrary event payload

    class Meta:
        app_label = 'analytics'
        # Composite indexes for the common "events of type X over time"
        # and "activity for user Y over time" query shapes.
        indexes = [
            models.Index(fields=['event_type', 'timestamp']),
            models.Index(fields=['user_id', 'timestamp']),
        ]
class LegacyCustomer(models.Model):
    """Legacy customer model mapped onto the pre-existing MySQL table."""
    customer_id = models.AutoField(primary_key=True)
    customer_name = models.CharField(max_length=100)
    customer_email = models.CharField(max_length=100)
    created_date = models.DateTimeField()  # populated by the legacy system

    class Meta:
        app_label = 'legacy'
        db_table = 'customers'  # Existing table name
        managed = False  # Don't let Django create/alter/drop this table
class CacheEntry(models.Model):
    """Cache entries stored in SQLite for fast access"""
    key = models.CharField(max_length=255, unique=True)  # lookup key, e.g. 'user_<id>'
    value = models.TextField()  # JSON-serialized payload (see sync helpers below)
    expires_at = models.DateTimeField()  # entries past this moment are stale
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        app_label = 'cache'
# routers.py
class DatabaseRouter:
    """Route each app's models to its dedicated database alias."""

    # app_label -> database alias used for both reads and writes.
    route_app_labels = {
        'users': 'users_db',
        'blog': 'default',
        'analytics': 'analytics_db',
        'legacy': 'legacy_db',
        'cache': 'cache_db',
    }

    def db_for_read(self, model, **hints):
        """Return the alias to read from, or None for default routing."""
        return self.route_app_labels.get(model._meta.app_label)

    def db_for_write(self, model, **hints):
        """Return the alias to write to, or None for default routing."""
        return self.route_app_labels.get(model._meta.app_label)

    def allow_relation(self, obj1, obj2, **hints):
        """Allow relations inside one database, or between default/users_db."""
        compatible = {'default', 'users_db'}
        first_db, second_db = obj1._state.db, obj2._state.db
        if first_db in compatible and second_db in compatible:
            return True
        return first_db == second_db

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Pin each app's migrations to the database that owns its tables."""
        pinned = {
            'analytics': 'analytics_db',
            'users': 'users_db',
            'legacy': 'legacy_db',
            'cache': 'cache_db',
        }
        if app_label in pinned:
            return db == pinned[app_label]
        # Core/contrib apps and the blog live on the default database.
        if app_label in ('blog', 'contenttypes', 'auth', 'sessions', 'admin'):
            return db == 'default'
        # No opinion on anything else.
        return None
class ReadWriteSplitRouter:
    """Split traffic for core apps: reads hit the replica, writes the master."""

    _split_apps = frozenset({'blog', 'users'})

    def db_for_read(self, model, **hints):
        """Send blog/users reads to the read replica."""
        return 'read_replica' if model._meta.app_label in self._split_apps else None

    def db_for_write(self, model, **hints):
        """Send blog/users writes to the master database."""
        return 'default' if model._meta.app_label in self._split_apps else None

    def allow_relation(self, obj1, obj2, **hints):
        """Replica and master hold the same data, so all relations are fine."""
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Only the master ('default') ever receives migrations."""
        return db == 'default'
class AnalyticsRouter:
    """Keep the analytics app isolated on its own database."""

    def _is_analytics(self, model):
        # Single source of truth for "does this model belong to analytics".
        return model._meta.app_label == 'analytics'

    def db_for_read(self, model, **hints):
        """Analytics reads go to 'analytics_db'; no opinion otherwise."""
        return 'analytics_db' if self._is_analytics(model) else None

    def db_for_write(self, model, **hints):
        """Analytics writes go to 'analytics_db'; no opinion otherwise."""
        return 'analytics_db' if self._is_analytics(model) else None

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Analytics migrates only to analytics_db; nothing else migrates there."""
        if app_label == 'analytics':
            return db == 'analytics_db'
        if db == 'analytics_db':
            return False
        return None
class TimeBasedRouter:
    """Shard 'timeseries' data across per-month database aliases."""

    def db_for_read(self, model, **hints):
        """Read from the monthly shard matching the instance's created_at."""
        if model._meta.app_label != 'timeseries':
            return None
        instance = hints.get('instance')
        if instance and hasattr(instance, 'created_at'):
            # Alias format: timeseries_YYYY_MM
            return 'timeseries_' + instance.created_at.strftime('%Y_%m')
        # No instance hint available: defer to default routing.
        return None

    def db_for_write(self, model, **hints):
        """Write to the shard for the current (local-time) month."""
        if model._meta.app_label != 'timeseries':
            return None
        from datetime import datetime
        return 'timeseries_' + datetime.now().strftime('%Y_%m')
from django.db import connections
from myapp.models import User, Post, AnalyticsEvent
# Explicit database specification
def basic_multi_db_operations():
    """Create a user, a post, and an analytics event, each in its own DB.

    Returns (user, post).  NOTE(review): the three writes are not atomic
    as a group — there is no distributed transaction, so a failure
    partway through leaves earlier writes committed.
    """
    # Create user in users database
    user = User.objects.using('users_db').create(
        username='john_doe',
        email='john@example.com',
        first_name='John',
        last_name='Doe'
    )
    # Create post in default database
    post = Post.objects.using('default').create(
        title='My First Post',
        content='This is my first blog post.',
        author_id=user.id  # Reference by ID across databases
    )
    # Log analytics event for the post creation
    AnalyticsEvent.objects.using('analytics_db').create(
        event_type='post_created',
        user_id=user.id,
        session_id='session_123',
        data={'post_id': post.id, 'title': post.title}
    )
    return user, post
# Querying across databases
def cross_database_queries():
    """Query related data from the users, default, and analytics databases.

    NOTE(review): relies on module-level `timezone` and `timedelta`
    imports (django.utils.timezone / datetime.timedelta) that are not
    visible in this chunk — confirm they exist.
    """
    # Users created in the last 30 days, from the users database
    users = User.objects.using('users_db').filter(
        created_at__gte=timezone.now() - timedelta(days=30)
    )
    # IDs are materialized into a Python list because passing a queryset
    # from another database into __in is not supported cross-database.
    posts = Post.objects.using('default').filter(
        author_id__in=[user.id for user in users]
    )
    # Get analytics for these users
    analytics = AnalyticsEvent.objects.using('analytics_db').filter(
        user_id__in=[user.id for user in users],
        event_type='post_view'
    )
    return {
        'users': list(users),
        'posts': list(posts),
        'analytics': list(analytics)
    }
# Raw SQL across databases
def raw_sql_multi_db():
    """Execute raw SQL on specific databases.

    Returns (recent_users, user_activity) as raw row tuples for the last
    7 days.  NOTE(review): table names assume Django's default
    <app>_<model> naming (users_user, analytics_analyticsevent) —
    confirm against the actual migrations.
    """
    # Per-database connections from Django's connection registry
    users_conn = connections['users_db']
    analytics_conn = connections['analytics_db']
    # Query users database (parameterized — never interpolate values)
    with users_conn.cursor() as cursor:
        cursor.execute("""
            SELECT id, username, email, created_at
            FROM users_user
            WHERE created_at >= %s
        """, [timezone.now() - timedelta(days=7)])
        recent_users = cursor.fetchall()
    # Query analytics database for per-user event counts
    with analytics_conn.cursor() as cursor:
        cursor.execute("""
            SELECT user_id, COUNT(*) as event_count
            FROM analytics_analyticsevent
            WHERE timestamp >= %s
            GROUP BY user_id
        """, [timezone.now() - timedelta(days=7)])
        user_activity = cursor.fetchall()
    return recent_users, user_activity
class MultiDatabaseService:
    """Service class for complex multi-database operations"""

    @staticmethod
    def create_user_with_profile(user_data, profile_data):
        """Create user and profile across databases with error handling.

        Each write runs in its own per-database transaction; there is no
        distributed transaction spanning all three, so a late failure can
        leave earlier writes committed.
        NOTE(review): UserProfile and logger are not defined in this
        chunk — confirm they are imported/defined at module level.
        """
        from django.db import transaction
        try:
            # Create user in users database
            with transaction.atomic(using='users_db'):
                user = User.objects.using('users_db').create(**user_data)
            # Create profile in default database
            with transaction.atomic(using='default'):
                profile = UserProfile.objects.using('default').create(
                    user_id=user.id,  # Reference by ID
                    **profile_data
                )
            # Log creation event
            with transaction.atomic(using='analytics_db'):
                AnalyticsEvent.objects.using('analytics_db').create(
                    event_type='user_registered',
                    user_id=user.id,
                    data={'profile_created': True}
                )
            return user, profile
        except Exception as e:
            # Note: No distributed transaction support
            # Manual cleanup may be needed
            logger.error(f"User creation failed: {e}")
            raise

    @staticmethod
    def get_user_dashboard_data(user_id):
        """Aggregate dashboard data from multiple databases.

        Returns None when the user does not exist.
        NOTE(review): relies on module-level timezone/timedelta/Count
        imports that are not visible in this chunk.
        """
        # Get user info from the users database
        try:
            user = User.objects.using('users_db').get(id=user_id)
        except User.DoesNotExist:
            return None
        # Ten most recent posts by this user
        posts = Post.objects.using('default').filter(
            author_id=user_id
        ).order_by('-created_at')[:10]
        # Per-event-type counts over the last 30 days
        analytics_summary = AnalyticsEvent.objects.using('analytics_db').filter(
            user_id=user_id,
            timestamp__gte=timezone.now() - timedelta(days=30)
        ).values('event_type').annotate(
            count=Count('id')
        )
        # Get cached data (may be absent)
        cached_stats = CacheEntry.objects.using('cache_db').filter(
            key=f'user_stats_{user_id}'
        ).first()
        return {
            'user': user,
            'recent_posts': list(posts),
            'analytics': list(analytics_summary),
            'cached_stats': cached_stats.value if cached_stats else None
        }

    @staticmethod
    def sync_user_data():
        """Synchronize recently-updated user data into the cache database.

        NOTE(review): filters on User.updated_at, but the User model in
        this chunk defines no updated_at field — confirm against the
        real model.  Also relies on a module-level json import.
        """
        # Users changed within the last hour need re-syncing
        users_to_sync = User.objects.using('users_db').filter(
            updated_at__gte=timezone.now() - timedelta(hours=1)
        )
        for user in users_to_sync:
            # Update user cache (upsert keyed by 'user_<id>')
            cache_data = {
                'username': user.username,
                'email': user.email,
                'full_name': f'{user.first_name} {user.last_name}',
                'last_updated': user.updated_at.isoformat()
            }
            CacheEntry.objects.using('cache_db').update_or_create(
                key=f'user_{user.id}',
                defaults={
                    'value': json.dumps(cache_data),
                    'expires_at': timezone.now() + timedelta(hours=24)
                }
            )
            # Log sync event
            AnalyticsEvent.objects.using('analytics_db').create(
                event_type='user_synced',
                user_id=user.id,
                data={'sync_timestamp': timezone.now().isoformat()}
            )
# Database-specific managers
class MultiDatabaseManager(models.Manager):
    """Manager that pins all querysets to a specific database alias.

    With ``database_alias`` set, every queryset this manager produces is
    routed there via ``QuerySet.using``; with no alias it behaves
    exactly like a plain ``models.Manager``.
    """

    def __init__(self, database_alias=None):
        super().__init__()
        # Alias to route to, or None for normal router-based routing.
        self.database_alias = database_alias

    def get_queryset(self):
        """Return a queryset pinned to the configured database, if any."""
        queryset = super().get_queryset()
        if self.database_alias:
            queryset = queryset.using(self.database_alias)
        return queryset

    def create(self, **kwargs):
        """Create a row on the configured database.

        FIX: the previous implementation called ``super().using(...)``,
        which bypassed this class's ``get_queryset`` override; routing
        through ``self.get_queryset()`` applies the alias exactly once.
        """
        return self.get_queryset().create(**kwargs)
class AnalyticsEvent(models.Model):
    """Analytics event with a database-pinned alternate manager.

    NOTE(review): this redefines the AnalyticsEvent declared earlier in
    this file under the same app_label — Django does not allow two model
    classes with the same name in one app.  Treat the two definitions as
    alternative illustrative versions, not code to load together.
    """
    event_type = models.CharField(max_length=50)
    user_id = models.IntegerField(null=True, blank=True)
    timestamp = models.DateTimeField(auto_now_add=True)
    data = models.JSONField(default=dict)

    # Multiple managers for different databases
    objects = models.Manager()  # default manager, normal router-based routing
    analytics_objects = MultiDatabaseManager('analytics_db')  # pinned to analytics_db

    class Meta:
        app_label = 'analytics'

# Usage
# events = AnalyticsEvent.analytics_objects.filter(event_type='login')
# Custom migration operations
from django.db import migrations
class Migration(migrations.Migration):
    """Create a users_user email index, targeted at 'users_db' only.

    The ``target_db`` hint is delivered to routers' ``allow_migrate``
    (via **hints) so they can restrict where the RunSQL operation runs.
    """

    # FIX: CREATE INDEX CONCURRENTLY cannot run inside a transaction
    # block on PostgreSQL, and Django wraps migrations in a transaction
    # by default — the whole migration must be marked non-atomic.
    atomic = False

    dependencies = [
        ('myapp', '0001_initial'),
    ]

    operations = [
        migrations.RunSQL(
            "CREATE INDEX CONCURRENTLY idx_user_email ON users_user(email);",
            reverse_sql="DROP INDEX idx_user_email;",
            # This migration only runs on users_db (routers read the hint)
            hints={'target_db': 'users_db'}
        ),
    ]
# Custom migration command
from django.core.management.base import BaseCommand
from django.core.management import call_command
class Command(BaseCommand):
    """Migrate specific databases"""
    help = 'Run migrations on specific databases'

    def add_arguments(self, parser):
        parser.add_argument('--database', type=str, help='Database to migrate')
        parser.add_argument('--app', type=str, help='App to migrate')

    def handle(self, *args, **options):
        """Migrate one app on one database, or every database when unspecified."""
        database = options.get('database')
        app = options.get('app')
        if database and app:
            self.stdout.write(f'Migrating {app} on {database}...')
            call_command('migrate', app, database=database, verbosity=2)
        else:
            # Migrate all databases.
            # NOTE(review): legacy_db (managed=False) and cache_db are
            # absent from this list — confirm that is intentional.
            databases = ['default', 'users_db', 'analytics_db']
            for db in databases:
                self.stdout.write(f'Migrating {db}...')
                call_command('migrate', database=db, verbosity=1)
# Database-specific migration runner
def run_multi_database_migrations():
    """Run each app's migrations on the database it is routed to."""
    # Maps database alias -> apps whose tables live there; must stay in
    # sync with DatabaseRouter.route_app_labels.
    migration_map = {
        'default': ['blog', 'contenttypes', 'auth', 'sessions'],
        'users_db': ['users'],
        'analytics_db': ['analytics'],
        'legacy_db': [],  # No migrations for legacy (managed=False tables)
    }
    for database, apps in migration_map.items():
        for app in apps:
            call_command('migrate', app, database=database)
# Connection management
class DatabaseConnectionManager:
    """Manage database connections efficiently"""

    @staticmethod
    def get_connection_info():
        """Return vendor, usability, and query stats for every alias.

        NOTE(review): conn.queries is only populated when DEBUG is True;
        queries_count will be 0 in production.
        """
        connection_info = {}
        for alias in connections:
            conn = connections[alias]
            connection_info[alias] = {
                'vendor': conn.vendor,
                'is_usable': conn.is_usable(),
                'queries_count': len(conn.queries),
                'settings': {
                    'NAME': conn.settings_dict.get('NAME'),
                    'HOST': conn.settings_dict.get('HOST'),
                    'PORT': conn.settings_dict.get('PORT'),
                }
            }
        return connection_info

    @staticmethod
    def close_old_connections():
        """Close stale/unusable connections (delegates to Django's helper)."""
        from django.db import close_old_connections
        close_old_connections()

    @staticmethod
    def reset_queries():
        """Clear the recorded query log on every connection."""
        for conn in connections.all():
            conn.queries_log.clear()
# Read/Write splitting with load balancing
class LoadBalancedRouter:
    """Round-robin read traffic across the configured replicas."""

    read_replicas = ['read_replica_1', 'read_replica_2', 'read_replica_3']

    def __init__(self):
        # Rotating cursor into read_replicas; advanced on every routed read.
        self.replica_index = 0

    def db_for_read(self, model, **hints):
        """Cycle through the replicas for blog/users reads."""
        if model._meta.app_label not in ('blog', 'users'):
            return None
        chosen = self.read_replicas[self.replica_index % len(self.read_replicas)]
        self.replica_index += 1
        return chosen

    def db_for_write(self, model, **hints):
        """Writes for blog/users always land on the master ('default')."""
        return 'default' if model._meta.app_label in ('blog', 'users') else None
# Caching layer for cross-database queries
class CrossDatabaseCache:
    """Cache for expensive cross-database operations"""

    @staticmethod
    def get_user_with_stats(user_id, cache_timeout=300):
        """Get user info plus post/activity counts, cached.

        Results are cached for cache_timeout seconds (default 300).
        Raises User.DoesNotExist for an unknown user id.
        """
        from django.core.cache import cache
        cache_key = f'user_stats_{user_id}'
        cached_data = cache.get(cache_key)
        if cached_data:
            return cached_data
        # Cache miss: aggregate fresh data from the three databases.
        user = User.objects.using('users_db').get(id=user_id)
        post_count = Post.objects.using('default').filter(
            author_id=user_id
        ).count()
        recent_activity = AnalyticsEvent.objects.using('analytics_db').filter(
            user_id=user_id,
            timestamp__gte=timezone.now() - timedelta(days=7)
        ).count()
        user_data = {
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email,
            },
            'stats': {
                'post_count': post_count,
                'recent_activity': recent_activity,
            }
        }
        cache.set(cache_key, user_data, cache_timeout)
        return user_data
# Bulk operations across databases
class BulkMultiDatabaseOperations:
    """Efficient bulk operations across databases"""

    @staticmethod
    def bulk_create_analytics_events(events_data):
        """Bulk-insert events from dicts with 'type', 'user_id', 'data' keys.

        NOTE(review): session_id is never populated although the model
        declares it without a default — confirm the column is nullable
        or supply a value here.
        """
        events = [
            AnalyticsEvent(
                event_type=event['type'],
                user_id=event['user_id'],
                data=event['data']
            )
            for event in events_data
        ]
        # batch_size bounds the size of each generated INSERT statement.
        return AnalyticsEvent.objects.using('analytics_db').bulk_create(
            events,
            batch_size=1000
        )

    @staticmethod
    def sync_user_cache_bulk(user_ids):
        """Refresh cache rows for the given user ids from the users DB.

        NOTE(review): the delete-then-create below is not atomic;
        concurrent readers may briefly observe missing cache rows.
        Relies on module-level json/timezone/timedelta imports not
        visible in this chunk.
        """
        users = User.objects.using('users_db').filter(id__in=user_ids)
        cache_entries = []
        for user in users:
            cache_entries.append(
                CacheEntry(
                    key=f'user_{user.id}',
                    value=json.dumps({
                        'username': user.username,
                        'email': user.email,
                    }),
                    expires_at=timezone.now() + timedelta(hours=24)
                )
            )
        # Use upsert if available, otherwise delete and create
        CacheEntry.objects.using('cache_db').filter(
            key__in=[f'user_{uid}' for uid in user_ids]
        ).delete()
        return CacheEntry.objects.using('cache_db').bulk_create(cache_entries)
class DatabaseHealthMonitor:
    """Monitor health of multiple databases"""

    @staticmethod
    def check_all_databases():
        """Run SELECT 1 on every alias; report 'healthy'/'unhealthy' per alias."""
        health_status = {}
        for alias in connections:
            try:
                conn = connections[alias]
                # Test the connection with a trivial round-trip query.
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    cursor.fetchone()
                health_status[alias] = {
                    'status': 'healthy',
                    'response_time': None,  # Could measure actual response time
                    'connection_count': None,  # Database-specific query needed
                }
            except Exception as e:
                health_status[alias] = {
                    'status': 'unhealthy',
                    'error': str(e),
                }
        return health_status

    @staticmethod
    def get_database_sizes():
        """Get database sizes (PostgreSQL-only: uses pg_database_size)."""
        sizes = {}
        for alias in ['default', 'users_db', 'analytics_db']:
            try:
                with connections[alias].cursor() as cursor:
                    cursor.execute("""
                        SELECT pg_size_pretty(pg_database_size(current_database()))
                    """)
                    size = cursor.fetchone()[0]
                    sizes[alias] = size
            except Exception as e:
                sizes[alias] = f"Error: {e}"
        return sizes

    @staticmethod
    def get_slow_queries(database='default', limit=10):
        """Get slow queries from PostgreSQL.

        Requires the pg_stat_statements extension to be installed and
        enabled on the target database; raises otherwise.
        """
        with connections[database].cursor() as cursor:
            cursor.execute("""
                SELECT
                    query,
                    calls,
                    total_time,
                    mean_time,
                    rows
                FROM pg_stat_statements
                ORDER BY total_time DESC
                LIMIT %s
            """, [limit])
            # Convert raw rows into dicts keyed by column name.
            columns = [col[0] for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
# Maintenance tasks
class DatabaseMaintenanceTasks:
    """Maintenance tasks for multiple databases"""

    @staticmethod
    def cleanup_old_analytics_data(days=90):
        """Delete analytics events older than `days`; return the deleted count."""
        cutoff_date = timezone.now() - timedelta(days=days)
        # .delete() returns (total_count, per-model dict); keep the total.
        deleted_count = AnalyticsEvent.objects.using('analytics_db').filter(
            timestamp__lt=cutoff_date
        ).delete()[0]
        return deleted_count

    @staticmethod
    def cleanup_expired_cache_entries():
        """Delete cache rows whose expires_at is in the past; return the count."""
        deleted_count = CacheEntry.objects.using('cache_db').filter(
            expires_at__lt=timezone.now()
        ).delete()[0]
        return deleted_count

    @staticmethod
    def vacuum_databases():
        """Run VACUUM ANALYZE on each PostgreSQL alias; report per-alias result."""
        results = {}
        for alias in ['default', 'users_db', 'analytics_db']:
            try:
                with connections[alias].cursor() as cursor:
                    cursor.execute("VACUUM ANALYZE")
                results[alias] = "Success"
            except Exception as e:
                results[alias] = f"Error: {e}"
        return results
# Management command for database operations
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    """Database maintenance command"""
    help = 'Perform database maintenance tasks'

    def add_arguments(self, parser):
        # Each flag selects an independent task; any combination may be
        # passed in a single invocation.
        parser.add_argument('--cleanup-analytics', action='store_true')
        parser.add_argument('--cleanup-cache', action='store_true')
        parser.add_argument('--vacuum', action='store_true')
        parser.add_argument('--health-check', action='store_true')

    def handle(self, *args, **options):
        """Dispatch to the selected maintenance tasks and print results."""
        if options['cleanup_analytics']:
            count = DatabaseMaintenanceTasks.cleanup_old_analytics_data()
            self.stdout.write(f'Cleaned up {count} analytics records')
        if options['cleanup_cache']:
            count = DatabaseMaintenanceTasks.cleanup_expired_cache_entries()
            self.stdout.write(f'Cleaned up {count} cache entries')
        if options['vacuum']:
            results = DatabaseMaintenanceTasks.vacuum_databases()
            for db, result in results.items():
                self.stdout.write(f'{db}: {result}')
        if options['health_check']:
            health = DatabaseHealthMonitor.check_all_databases()
            for db, status in health.items():
                self.stdout.write(f'{db}: {status["status"]}')
Multiple database support in Django enables you to build scalable, distributed applications while maintaining clean separation of concerns. Proper routing, monitoring, and maintenance are essential for managing complex multi-database architectures effectively.
Transactions
Database transactions ensure data consistency and integrity by grouping multiple database operations into atomic units. Django provides comprehensive transaction management tools that help you maintain data consistency even in complex scenarios.
Tablespaces
Tablespaces provide a way to define locations in the file system where database objects can be stored. While primarily a PostgreSQL and Oracle feature, understanding tablespaces helps optimize database performance by strategically placing data on different storage devices.