Models and Databases

Multiple Databases

Django supports using multiple databases in a single application, enabling you to separate different types of data, implement read/write splitting, or integrate with legacy systems. Understanding how to configure and use multiple databases effectively is crucial for scalable applications.

Database Configuration

Settings Configuration

# settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'main_app_db',
        'USER': 'postgres',
        'PASSWORD': 'password',
        'HOST': 'localhost',
        'PORT': '5432',
        # NOTE: 'init_command' is a MySQL-only option and is invalid for
        # the PostgreSQL backend, so no OPTIONS are set here; strict SQL
        # mode belongs on the MySQL 'legacy_db' alias below.
    },
    'users_db': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'users_database',
        'USER': 'postgres',
        'PASSWORD': 'password',
        'HOST': 'users-db-server',
        'PORT': '5432',
    },
    'analytics_db': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'analytics_database',
        'USER': 'analytics_user',
        'PASSWORD': 'analytics_password',
        'HOST': 'analytics-server',
        'PORT': '5432',
        'OPTIONS': {
            # PostgreSQL server settings are passed via libpq's "options"
            # flag ('init_command' is MySQL-only and would be rejected).
            'options': '-c default_transaction_isolation=read\\ committed',
        },
    },
    'legacy_db': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'legacy_system',
        'USER': 'legacy_user',
        'PASSWORD': 'legacy_password',
        'HOST': 'legacy-server',
        'PORT': '3306',
        'OPTIONS': {
            'charset': 'utf8mb4',
            # Strict mode is a MySQL concern; it was previously (and
            # incorrectly) configured on the PostgreSQL 'default' alias.
            'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
        },
    },
    'cache_db': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'cache.sqlite3',
    },
    'read_replica': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'main_app_db',
        'USER': 'readonly_user',
        'PASSWORD': 'readonly_password',
        'HOST': 'read-replica-server',
        'PORT': '5432',
        'OPTIONS': {
            # Same libpq mechanism as analytics_db: arbitrary keys such as
            # 'default_transaction_isolation' are not valid psycopg kwargs.
            'options': '-c default_transaction_isolation=read\\ committed',
        },
    }
}

# Database routing configuration (consulted in order; first non-None wins)
DATABASE_ROUTERS = [
    'myapp.routers.DatabaseRouter',
    'myapp.routers.AnalyticsRouter',
    'myapp.routers.LegacyRouter',
]

# Connection pooling (requires the third-party django-db-pool package)
DATABASES['default']['ENGINE'] = 'django_db_pool.backends.postgresql'
DATABASES['default']['POOL_OPTIONS'] = {
    'POOL_SIZE': 10,
    'MAX_OVERFLOW': 10,
    'RECYCLE': -1,
}

Model Database Assignment

# models.py
from django.db import models

class User(models.Model):
    """User model stored in the dedicated users database.

    Routed to the 'users_db' alias by DatabaseRouter via the 'users'
    app label.
    NOTE(review): sync code elsewhere in this file reads user.updated_at,
    which this model does not define — confirm against the real model.
    """
    username = models.CharField(max_length=150, unique=True)
    email = models.EmailField(unique=True)
    first_name = models.CharField(max_length=30)
    last_name = models.CharField(max_length=30)
    created_at = models.DateTimeField(auto_now_add=True)  # set once on insert
    
    class Meta:
        app_label = 'users'

class Post(models.Model):
    """Post model stored in the default database.

    Django does not support foreign keys across databases, and the router
    places User on 'users_db' while posts live on 'default'. The author is
    therefore referenced by a plain integer id (all query examples in this
    file already filter on author_id); resolve it manually via
    User.objects.using('users_db').
    """
    title = models.CharField(max_length=200)
    content = models.TextField()
    # Raw id reference instead of ForeignKey(User): cross-db FKs would
    # break both relational integrity and JOIN-based queries.
    author_id = models.IntegerField(db_index=True)
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        app_label = 'blog'

class AnalyticsEvent(models.Model):
    """Analytics event stored in the analytics database.

    Routed to 'analytics_db'. The user is referenced by a plain integer
    id because cross-database foreign keys are unsupported.
    """
    event_type = models.CharField(max_length=50)
    user_id = models.IntegerField(null=True, blank=True)  # None for anonymous events
    session_id = models.CharField(max_length=100)
    timestamp = models.DateTimeField(auto_now_add=True)
    data = models.JSONField(default=dict)  # arbitrary per-event payload
    
    class Meta:
        app_label = 'analytics'
        # Composite indexes backing the common time-window queries
        # (per-type and per-user event lookups by recency).
        indexes = [
            models.Index(fields=['event_type', 'timestamp']),
            models.Index(fields=['user_id', 'timestamp']),
        ]

class LegacyCustomer(models.Model):
    """Customer record mapped onto the pre-existing legacy MySQL table.

    managed=False keeps Django's migration machinery away from the
    existing schema; Django only reads/writes the table as-is.
    """
    customer_id = models.AutoField(primary_key=True)
    customer_name = models.CharField(max_length=100)
    customer_email = models.CharField(max_length=100)  # legacy schema: plain text, no EmailField validation
    created_date = models.DateTimeField()  # populated by the legacy system, not auto_now_add
    
    class Meta:
        app_label = 'legacy'
        db_table = 'customers'  # Existing table name
        managed = False  # Don't let Django manage this table

class CacheEntry(models.Model):
    """Cache entries stored in SQLite for fast access.

    value holds a JSON-serialized payload (see the sync helpers that
    write it with json.dumps); expiry is enforced by the cleanup task,
    not by the database itself.
    """
    key = models.CharField(max_length=255, unique=True)
    value = models.TextField()  # JSON-serialized payload
    expires_at = models.DateTimeField()
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        app_label = 'cache'

Database Routing

Custom Database Router

# routers.py
class DatabaseRouter:
    """Route each app's models to its dedicated database alias."""

    # App label -> database alias; apps not listed fall through to the
    # next router (by returning None).
    route_app_labels = {
        'users': 'users_db',
        'blog': 'default',
        'analytics': 'analytics_db',
        'legacy': 'legacy_db',
        'cache': 'cache_db',
    }

    def _alias_for(self, model):
        """Return the configured alias for the model's app, or None."""
        return self.route_app_labels.get(model._meta.app_label)

    def db_for_read(self, model, **hints):
        """Suggest the database to read from."""
        return self._alias_for(model)

    def db_for_write(self, model, **hints):
        """Suggest the database to write to."""
        return self._alias_for(model)

    def allow_relation(self, obj1, obj2, **hints):
        """Permit relations within one database, or between default/users_db."""
        compatible = {'default', 'users_db'}
        if obj1._state.db in compatible and obj2._state.db in compatible:
            return True
        # Otherwise only same-database relations are allowed.
        return obj1._state.db == obj2._state.db

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Pin each routed app's migrations to its own database."""
        pinned = {
            'analytics': 'analytics_db',
            'users': 'users_db',
            'legacy': 'legacy_db',
            'cache': 'cache_db',
        }
        if app_label in pinned:
            return db == pinned[app_label]
        # Core/blog apps migrate only on the default database.
        if app_label in ('blog', 'contenttypes', 'auth', 'sessions', 'admin'):
            return db == 'default'
        # No opinion about anything else.
        return None

class ReadWriteSplitRouter:
    """Send reads for blog/users to the replica, writes to the master."""

    # Apps participating in the read/write split.
    routed_apps = ('blog', 'users')

    def db_for_read(self, model, **hints):
        """Reads for the routed apps hit the read replica."""
        if model._meta.app_label in self.routed_apps:
            return 'read_replica'
        return None

    def db_for_write(self, model, **hints):
        """Writes for the routed apps always target the master."""
        if model._meta.app_label in self.routed_apps:
            return 'default'
        return None

    def allow_relation(self, obj1, obj2, **hints):
        """Relations between any two objects are permitted."""
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Only the master database receives migrations."""
        return db == 'default'

class AnalyticsRouter:
    """Keep all analytics traffic and migrations on analytics_db."""

    def _is_analytics(self, model):
        """True when the model belongs to the analytics app."""
        return model._meta.app_label == 'analytics'

    def db_for_read(self, model, **hints):
        """Analytics models read from analytics_db; no opinion otherwise."""
        return 'analytics_db' if self._is_analytics(model) else None

    def db_for_write(self, model, **hints):
        """Analytics models write to analytics_db; no opinion otherwise."""
        return 'analytics_db' if self._is_analytics(model) else None

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Analytics migrates only on analytics_db; nothing else lands there."""
        if app_label == 'analytics':
            return db == 'analytics_db'
        if db == 'analytics_db':
            return False
        return None

class TimeBasedRouter:
    """Route 'timeseries' models to per-month databases (timeseries_YYYY_MM)."""

    def db_for_read(self, model, **hints):
        """Pick the monthly database matching the hinted instance's created_at."""
        if model._meta.app_label != 'timeseries':
            return None
        instance = hints.get('instance')
        if instance and hasattr(instance, 'created_at'):
            # The month of the row's timestamp determines its home database.
            return 'timeseries_{}'.format(instance.created_at.strftime('%Y_%m'))
        return None

    def db_for_write(self, model, **hints):
        """New rows always go to the current month's database."""
        if model._meta.app_label != 'timeseries':
            return None
        from datetime import datetime
        return 'timeseries_{}'.format(datetime.now().strftime('%Y_%m'))

Working with Multiple Databases

Basic Database Operations

from django.db import connections
from myapp.models import User, Post, AnalyticsEvent

# Explicit database specification
def basic_multi_db_operations():
    """Create related records in three databases, linked by raw ids.

    Cross-database foreign keys are not supported, so the post and the
    analytics event reference the user by integer id only.
    """
    users = User.objects.using('users_db')
    posts = Post.objects.using('default')
    events = AnalyticsEvent.objects.using('analytics_db')

    # The user lives in the dedicated users database.
    new_user = users.create(
        username='john_doe',
        email='john@example.com',
        first_name='John',
        last_name='Doe',
    )

    # The post lives in the default database and points at the user by id.
    new_post = posts.create(
        title='My First Post',
        content='This is my first blog post.',
        author_id=new_user.id,
    )

    # Record the creation in the analytics database.
    events.create(
        event_type='post_created',
        user_id=new_user.id,
        session_id='session_123',
        data={'post_id': new_post.id, 'title': new_post.title},
    )

    return new_user, new_post

# Querying across databases
def cross_database_queries():
    """Collect last-30-day users plus their posts and view events.

    The original version iterated the lazy `users` queryset in two
    separate list comprehensions, re-running the users_db query for each
    dependent filter. Here it is evaluated once and the ids are reused.
    """
    users = list(User.objects.using('users_db').filter(
        created_at__gte=timezone.now() - timedelta(days=30)
    ))
    user_ids = [user.id for user in users]

    # Posts authored by those users (default database).
    posts = Post.objects.using('default').filter(author_id__in=user_ids)

    # Their post-view events (analytics database).
    analytics = AnalyticsEvent.objects.using('analytics_db').filter(
        user_id__in=user_ids,
        event_type='post_view'
    )

    return {
        'users': users,
        'posts': list(posts),
        'analytics': list(analytics)
    }

# Raw SQL across databases
def raw_sql_multi_db():
    """Run raw SQL directly against the users and analytics databases."""

    # Recent signups, straight from the users database.
    with connections['users_db'].cursor() as cursor:
        cursor.execute(
            """
            SELECT id, username, email, created_at
            FROM users_user
            WHERE created_at >= %s
        """,
            [timezone.now() - timedelta(days=7)],
        )
        recent_users = cursor.fetchall()

    # Per-user event counts over the same window, from analytics.
    with connections['analytics_db'].cursor() as cursor:
        cursor.execute(
            """
            SELECT user_id, COUNT(*) as event_count
            FROM analytics_analyticsevent
            WHERE timestamp >= %s
            GROUP BY user_id
        """,
            [timezone.now() - timedelta(days=7)],
        )
        user_activity = cursor.fetchall()

    return recent_users, user_activity

Advanced Multi-Database Patterns

class MultiDatabaseService:
    """Service class for complex multi-database operations.

    Relies on module-level names (UserProfile, logger, json, timezone,
    timedelta, Count) assumed to be imported elsewhere in the real file.
    """
    
    @staticmethod
    def create_user_with_profile(user_data, profile_data):
        """Create user and profile across databases with error handling.

        Each write runs in its own per-database transaction; Django has
        no distributed transactions, so a failure partway leaves the
        earlier commits in place (see the except clause).
        """
        
        from django.db import transaction
        
        try:
            # Create user in users database
            with transaction.atomic(using='users_db'):
                user = User.objects.using('users_db').create(**user_data)
            
            # Create profile in default database, linked by raw id because
            # cross-database foreign keys are unsupported.
            with transaction.atomic(using='default'):
                profile = UserProfile.objects.using('default').create(
                    user_id=user.id,  # Reference by ID
                    **profile_data
                )
            
            # Log creation event
            with transaction.atomic(using='analytics_db'):
                AnalyticsEvent.objects.using('analytics_db').create(
                    event_type='user_registered',
                    user_id=user.id,
                    data={'profile_created': True}
                )
            
            return user, profile
            
        except Exception as e:
            # Note: No distributed transaction support
            # Manual cleanup may be needed
            logger.error(f"User creation failed: {e}")
            raise
    
    @staticmethod
    def get_user_dashboard_data(user_id):
        """Aggregate data from multiple databases for dashboard.

        Returns None when the user does not exist; otherwise a dict with
        the user, their ten newest posts, a 30-day per-event-type count,
        and any cached stats blob.
        """
        
        # Get user info
        try:
            user = User.objects.using('users_db').get(id=user_id)
        except User.DoesNotExist:
            return None
        
        # Get user posts (newest first, capped at ten)
        posts = Post.objects.using('default').filter(
            author_id=user_id
        ).order_by('-created_at')[:10]
        
        # Get analytics summary: event count per type over the last 30 days
        analytics_summary = AnalyticsEvent.objects.using('analytics_db').filter(
            user_id=user_id,
            timestamp__gte=timezone.now() - timedelta(days=30)
        ).values('event_type').annotate(
            count=Count('id')
        )
        
        # Get cached data
        cached_stats = CacheEntry.objects.using('cache_db').filter(
            key=f'user_stats_{user_id}'
        ).first()
        
        return {
            'user': user,
            'recent_posts': list(posts),
            'analytics': list(analytics_summary),
            'cached_stats': cached_stats.value if cached_stats else None
        }
    
    @staticmethod
    def sync_user_data():
        """Synchronize user data between databases.

        Copies recently-changed users into the cache database and logs a
        sync event per user.
        """
        
        # Get users that need syncing.
        # NOTE(review): filters on updated_at, but the User model shown
        # earlier defines only created_at — confirm the real model has it.
        users_to_sync = User.objects.using('users_db').filter(
            updated_at__gte=timezone.now() - timedelta(hours=1)
        )
        
        for user in users_to_sync:
            # Update user cache
            cache_data = {
                'username': user.username,
                'email': user.email,
                'full_name': f'{user.first_name} {user.last_name}',
                'last_updated': user.updated_at.isoformat()
            }
            
            # Upsert keyed on 'user_<id>' with a 24h TTL.
            CacheEntry.objects.using('cache_db').update_or_create(
                key=f'user_{user.id}',
                defaults={
                    'value': json.dumps(cache_data),
                    'expires_at': timezone.now() + timedelta(hours=24)
                }
            )
            
            # Log sync event
            AnalyticsEvent.objects.using('analytics_db').create(
                event_type='user_synced',
                user_id=user.id,
                data={'sync_timestamp': timezone.now().isoformat()}
            )

# Database-specific managers
class MultiDatabaseManager(models.Manager):
    """Manager bound to a fixed database alias.

    All queryset access funnels through get_queryset(), which applies
    .using(alias) when one is configured. create() therefore delegates
    to the queryset; the original super().using(...).create(...) chain
    re-routed through the manager and was both redundant and fragile.
    """
    
    def __init__(self, database_alias=None):
        super().__init__()
        # Alias to pin queries to; None falls back to normal routing.
        self.database_alias = database_alias
    
    def get_queryset(self):
        """Return a queryset pinned to the configured database, if any."""
        queryset = super().get_queryset()
        if self.database_alias:
            queryset = queryset.using(self.database_alias)
        return queryset
    
    def create(self, **kwargs):
        """Create on the configured database via the routed queryset."""
        return self.get_queryset().create(**kwargs)

class AnalyticsEvent(models.Model):
    # NOTE(review): this redefines the AnalyticsEvent declared earlier
    # with fewer fields (no session_id, no indexes), purely to illustrate
    # multiple managers; a real app may declare each model only once.
    event_type = models.CharField(max_length=50)
    user_id = models.IntegerField(null=True, blank=True)
    timestamp = models.DateTimeField(auto_now_add=True)
    data = models.JSONField(default=dict)
    
    # Multiple managers for different databases
    objects = models.Manager()  # default manager: normal router behavior
    analytics_objects = MultiDatabaseManager('analytics_db')  # pinned alias
    
    class Meta:
        app_label = 'analytics'

# Usage
# events = AnalyticsEvent.analytics_objects.filter(event_type='login')

Database Migrations

Managing Migrations Across Databases

# Custom migration operations
from django.db import migrations

class Migration(migrations.Migration):
    """Migration whose SQL is intended for a specific database.

    CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so
    the migration must opt out of Django's default atomic wrapping.
    """
    
    # Required for CONCURRENTLY: PostgreSQL raises
    # "CREATE INDEX CONCURRENTLY cannot run inside a transaction block"
    # if the migration is executed atomically (the default).
    atomic = False
    
    dependencies = [
        ('myapp', '0001_initial'),
    ]
    
    operations = [
        migrations.RunSQL(
            "CREATE INDEX CONCURRENTLY idx_user_email ON users_user(email);",
            # Drop concurrently too, to avoid locking the table on rollback.
            reverse_sql="DROP INDEX CONCURRENTLY idx_user_email;",
            # Routers can inspect this hint in allow_migrate() to restrict
            # the operation to users_db.
            hints={'target_db': 'users_db'}
        ),
    ]

# Custom migration command
from django.core.management.base import BaseCommand
from django.core.management import call_command

class Command(BaseCommand):
    """Run migrations on one app/database pair, or on every known database."""
    
    help = 'Run migrations on specific databases'
    
    def add_arguments(self, parser):
        parser.add_argument('--database', type=str, help='Database to migrate')
        parser.add_argument('--app', type=str, help='App to migrate')
    
    def handle(self, *args, **options):
        database = options.get('database')
        app = options.get('app')
        
        if database and app:
            # Targeted migration of a single app on a single database.
            self.stdout.write(f'Migrating {app} on {database}...')
            call_command('migrate', app, database=database, verbosity=2)
            return
        
        # No explicit target: walk every configured database in turn.
        for db in ('default', 'users_db', 'analytics_db'):
            self.stdout.write(f'Migrating {db}...')
            call_command('migrate', database=db, verbosity=1)

# Database-specific migration runner
def run_multi_database_migrations():
    """Apply each app's migrations on the database it is routed to."""
    
    migration_map = {
        'default': ('blog', 'contenttypes', 'auth', 'sessions'),
        'users_db': ('users',),
        'analytics_db': ('analytics',),
        'legacy_db': (),  # Legacy schema is unmanaged: nothing to migrate
    }
    
    for database, apps in migration_map.items():
        for app in apps:
            call_command('migrate', app, database=database)

Performance Optimization

Connection Pooling and Optimization

# Connection management
class DatabaseConnectionManager:
    """Utility helpers around Django's connection handler."""
    
    @staticmethod
    def get_connection_info():
        """Summarize vendor, usability and key settings for every alias."""
        info = {}
        for alias in connections:
            connection = connections[alias]
            settings_dict = connection.settings_dict
            info[alias] = {
                'vendor': connection.vendor,
                'is_usable': connection.is_usable(),
                # Populated only when query logging (DEBUG) is active.
                'queries_count': len(connection.queries),
                'settings': {
                    key: settings_dict.get(key)
                    for key in ('NAME', 'HOST', 'PORT')
                },
            }
        return info
    
    @staticmethod
    def close_old_connections():
        """Close connections that are unusable or exceeded CONN_MAX_AGE."""
        from django.db import close_old_connections
        close_old_connections()
    
    @staticmethod
    def reset_queries():
        """Clear the per-connection debug query logs."""
        for connection in connections.all():
            connection.queries_log.clear()

# Read/Write splitting with load balancing
class LoadBalancedRouter:
    """Round-robin reads across the replicas; send every write to master."""
    
    read_replicas = ['read_replica_1', 'read_replica_2', 'read_replica_3']
    
    def __init__(self):
        # Monotonic counter driving the round-robin selection.
        # NOTE(review): incremented without a lock — concurrent requests may
        # skew the rotation slightly, which is harmless for load balancing.
        self.replica_index = 0
    
    def db_for_read(self, model, **hints):
        """Hand out replicas in rotation for blog/users reads."""
        if model._meta.app_label not in ('blog', 'users'):
            return None
        chosen = self.read_replicas[self.replica_index % len(self.read_replicas)]
        self.replica_index += 1
        return chosen
    
    def db_for_write(self, model, **hints):
        """Writes for blog/users always target the master database."""
        if model._meta.app_label not in ('blog', 'users'):
            return None
        return 'default'

# Caching layer for cross-database queries
class CrossDatabaseCache:
    """Cache for expensive cross-database operations."""
    
    @staticmethod
    def get_user_with_stats(user_id, cache_timeout=300):
        """Return user info plus activity stats, memoized in the cache."""
        
        from django.core.cache import cache
        
        cache_key = f'user_stats_{user_id}'
        hit = cache.get(cache_key)
        if hit:
            return hit
        
        # Cache miss: assemble the payload from three databases.
        user = User.objects.using('users_db').get(id=user_id)
        
        payload = {
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email,
            },
            'stats': {
                # Lifetime post count from the default database.
                'post_count': Post.objects.using('default').filter(
                    author_id=user_id
                ).count(),
                # Event count over the trailing week from analytics.
                'recent_activity': AnalyticsEvent.objects.using('analytics_db').filter(
                    user_id=user_id,
                    timestamp__gte=timezone.now() - timedelta(days=7)
                ).count(),
            }
        }
        
        cache.set(cache_key, payload, cache_timeout)
        return payload

# Bulk operations across databases
class BulkMultiDatabaseOperations:
    """Efficient bulk operations across databases."""
    
    @staticmethod
    def bulk_create_analytics_events(events_data):
        """Insert many analytics events in batches of 1000."""
        
        rows = [
            AnalyticsEvent(
                event_type=item['type'],
                user_id=item['user_id'],
                data=item['data']
            )
            for item in events_data
        ]
        
        return AnalyticsEvent.objects.using('analytics_db').bulk_create(
            rows,
            batch_size=1000
        )
    
    @staticmethod
    def sync_user_cache_bulk(user_ids):
        """Rebuild cache entries for the given users from users_db."""
        
        entries = [
            CacheEntry(
                key=f'user_{user.id}',
                value=json.dumps({
                    'username': user.username,
                    'email': user.email,
                }),
                expires_at=timezone.now() + timedelta(hours=24)
            )
            for user in User.objects.using('users_db').filter(id__in=user_ids)
        ]
        
        # No portable upsert here: drop the stale rows, then re-insert.
        stale_keys = [f'user_{uid}' for uid in user_ids]
        CacheEntry.objects.using('cache_db').filter(key__in=stale_keys).delete()
        
        return CacheEntry.objects.using('cache_db').bulk_create(entries)

Monitoring and Maintenance

Database Health Monitoring

class DatabaseHealthMonitor:
    """Monitor health of multiple databases."""
    
    @staticmethod
    def check_all_databases():
        """Check health of all configured databases.

        Returns a dict keyed by connection alias; healthy entries carry
        placeholder metric fields, unhealthy ones the error string.
        """
        
        health_status = {}
        
        for alias in connections:
            try:
                conn = connections[alias]
                
                # Cheapest possible liveness probe.
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    cursor.fetchone()
                
                health_status[alias] = {
                    'status': 'healthy',
                    'response_time': None,  # Could measure actual response time
                    'connection_count': None,  # Database-specific query needed
                }
                
            except Exception as e:
                health_status[alias] = {
                    'status': 'unhealthy',
                    'error': str(e),
                }
        
        return health_status
    
    @staticmethod
    def get_database_sizes():
        """Get database sizes (PostgreSQL example).

        Only the PostgreSQL-backed aliases are queried; pg_size_pretty /
        pg_database_size are PostgreSQL-specific functions.
        """
        
        sizes = {}
        
        for alias in ['default', 'users_db', 'analytics_db']:
            try:
                with connections[alias].cursor() as cursor:
                    cursor.execute("""
                        SELECT pg_size_pretty(pg_database_size(current_database()))
                    """)
                    size = cursor.fetchone()[0]
                    sizes[alias] = size
                    
            except Exception as e:
                sizes[alias] = f"Error: {e}"
        
        return sizes
    
    @staticmethod
    def get_slow_queries(database='default', limit=10):
        """Get slow queries from PostgreSQL.

        Requires the pg_stat_statements extension to be installed.
        NOTE(review): PostgreSQL 13 renamed total_time/mean_time to
        total_exec_time/mean_exec_time — adjust for the server version.
        """
        
        with connections[database].cursor() as cursor:
            cursor.execute("""
                SELECT 
                    query,
                    calls,
                    total_time,
                    mean_time,
                    rows
                FROM pg_stat_statements
                ORDER BY total_time DESC
                LIMIT %s
            """, [limit])
            
            # Zip column names onto each row to return a list of dicts.
            columns = [col[0] for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]

# Maintenance tasks
class DatabaseMaintenanceTasks:
    """Maintenance tasks for multiple databases."""
    
    @staticmethod
    def cleanup_old_analytics_data(days=90):
        """Delete analytics events older than `days`; return rows removed."""
        
        cutoff = timezone.now() - timedelta(days=days)
        removed, _ = AnalyticsEvent.objects.using('analytics_db').filter(
            timestamp__lt=cutoff
        ).delete()
        return removed
    
    @staticmethod
    def cleanup_expired_cache_entries():
        """Delete cache rows whose expiry has passed; return rows removed."""
        
        removed, _ = CacheEntry.objects.using('cache_db').filter(
            expires_at__lt=timezone.now()
        ).delete()
        return removed
    
    @staticmethod
    def vacuum_databases():
        """Run VACUUM ANALYZE on each PostgreSQL alias; report per database."""
        
        results = {}
        for alias in ('default', 'users_db', 'analytics_db'):
            try:
                with connections[alias].cursor() as cursor:
                    cursor.execute("VACUUM ANALYZE")
            except Exception as e:
                results[alias] = f"Error: {e}"
            else:
                results[alias] = "Success"
        return results

# Management command for database operations
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    """Dispatch the selected database maintenance tasks."""
    
    help = 'Perform database maintenance tasks'
    
    def add_arguments(self, parser):
        parser.add_argument('--cleanup-analytics', action='store_true')
        parser.add_argument('--cleanup-cache', action='store_true')
        parser.add_argument('--vacuum', action='store_true')
        parser.add_argument('--health-check', action='store_true')
    
    def handle(self, *args, **options):
        # Each flag triggers an independent task; several may be combined.
        if options['cleanup_analytics']:
            removed = DatabaseMaintenanceTasks.cleanup_old_analytics_data()
            self.stdout.write(f'Cleaned up {removed} analytics records')
        
        if options['cleanup_cache']:
            removed = DatabaseMaintenanceTasks.cleanup_expired_cache_entries()
            self.stdout.write(f'Cleaned up {removed} cache entries')
        
        if options['vacuum']:
            for db, outcome in DatabaseMaintenanceTasks.vacuum_databases().items():
                self.stdout.write(f'{db}: {outcome}')
        
        if options['health_check']:
            for db, info in DatabaseHealthMonitor.check_all_databases().items():
                self.stdout.write(f'{db}: {info["status"]}')

Multiple database support in Django enables you to build scalable, distributed applications while maintaining clean separation of concerns. Proper routing, monitoring, and maintenance are essential for managing complex multi-database architectures effectively.