Microservices with Django

Cloud-native Data Processing with MongoDB

MongoDB is an excellent choice for microservices due to its flexible schema, horizontal scaling capabilities, and cloud-native features. This section explores how to integrate MongoDB with Django microservices for efficient data processing and storage.

Why MongoDB for Microservices?

Advantages of MongoDB in Microservices

  1. Schema Flexibility: Each service can evolve its data model independently
  2. Horizontal Scaling: Built-in sharding for handling large datasets
  3. Document-based: Natural fit for JSON APIs
  4. Aggregation Pipeline: Powerful data processing capabilities
  5. Change Streams: Real-time data synchronization
  6. Cloud Integration: Native support for cloud deployments
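
Schema flexibility cuts both ways: a service must tolerate documents written under older shapes of its own schema. A minimal sketch of that idea (the field names and document shapes here are hypothetical, chosen only for illustration):

```python
# Two generations of documents in the same collection: an early version
# of the service stored a flat full_name; a later version split it.
OLD_DOC = {"_id": "u1", "full_name": "John Doe"}
NEW_DOC = {"_id": "u2", "first_name": "Jane", "last_name": "Roe"}

def normalize_user(doc):
    """Return a uniform shape regardless of which schema version wrote the doc."""
    if "full_name" in doc:  # legacy shape
        first, _, last = doc["full_name"].partition(" ")
        return {"_id": doc["_id"], "first_name": first, "last_name": last}
    return {
        "_id": doc["_id"],
        "first_name": doc.get("first_name", ""),
        "last_name": doc.get("last_name", ""),
    }
```

Readers like this let each deployment of a service roll forward (or back) without a lockstep migration across the fleet.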

MongoDB vs PostgreSQL for Microservices

# PostgreSQL approach - rigid schema
class User(models.Model):
    username = models.CharField(max_length=150)
    email = models.EmailField()
    profile_data = models.JSONField()  # JSON blob bolted onto a rigid row schema

# MongoDB approach - flexible documents
{
    "_id": ObjectId("..."),
    "username": "john_doe",
    "email": "john@example.com",
    "profile": {
        "bio": "Software developer",
        "preferences": {
            "theme": "dark",
            "notifications": {
                "email": True,
                "push": False
            }
        },
        "social_links": [
            {"platform": "github", "url": "https://github.com/johndoe"},
            {"platform": "linkedin", "url": "https://linkedin.com/in/johndoe"}
        ]
    },
    "created_at": ISODate("2023-01-01T00:00:00Z"),
    "last_login": ISODate("2023-12-01T10:30:00Z")
}

Setting Up MongoDB with Django

1. Installation and Configuration

# Install MongoDB drivers and helpers
pip install pymongo mongoengine django-rest-framework-mongoengine django-cors-headers

# Optional: Djongo, if you want to keep using the Django ORM on MongoDB
pip install djongo

# settings.py
import os
from decouple import config

# MongoDB Configuration with MongoEngine
import mongoengine

MONGODB_SETTINGS = {
    'db': config('MONGODB_DB', default='user_service_db'),
    'host': config('MONGODB_HOST', default='localhost'),
    'port': config('MONGODB_PORT', default=27017, cast=int),
    'username': config('MONGODB_USERNAME', default=''),
    'password': config('MONGODB_PASSWORD', default=''),
    'authentication_source': config('MONGODB_AUTH_SOURCE', default='admin'),
    'connect': False,  # Important for multiprocessing
}

# Connect to MongoDB
mongoengine.connect(**MONGODB_SETTINGS)

# Alternative: Djongo, which keeps Django's ORM on top of MongoDB
DATABASES = {
    'default': {
        'ENGINE': 'djongo',
        'NAME': config('MONGODB_DB', default='user_service_db'),
        'CLIENT': {
            'host': config('MONGODB_URI', default='mongodb://localhost:27017'),
            'username': config('MONGODB_USERNAME', default=''),
            'password': config('MONGODB_PASSWORD', default=''),
            'authSource': config('MONGODB_AUTH_SOURCE', default='admin'),
            'authMechanism': 'SCRAM-SHA-1',
        }
    }
}

2. MongoDB Models with MongoEngine

# models.py
from mongoengine import Document, EmbeddedDocument, fields
from datetime import datetime
import uuid

class Address(EmbeddedDocument):
    """Embedded document for user addresses"""
    street = fields.StringField(max_length=200, required=True)
    city = fields.StringField(max_length=100, required=True)
    state = fields.StringField(max_length=50)
    postal_code = fields.StringField(max_length=20)
    country = fields.StringField(max_length=50, required=True)
    is_primary = fields.BooleanField(default=False)

class SocialLink(EmbeddedDocument):
    """Embedded document for social media links"""
    platform = fields.StringField(max_length=50, required=True)
    url = fields.URLField(required=True)
    verified = fields.BooleanField(default=False)

class UserPreferences(EmbeddedDocument):
    """User preferences embedded document"""
    theme = fields.StringField(max_length=20, default='light', choices=['light', 'dark'])
    language = fields.StringField(max_length=10, default='en')
    timezone = fields.StringField(max_length=50, default='UTC')
    notifications = fields.DictField(default=lambda: {
        'email': True,
        'push': True,
        'sms': False
    })

class User(Document):
    """User document model"""
    # Use UUID as string for consistency with other services
    user_id = fields.StringField(primary_key=True, default=lambda: str(uuid.uuid4()))
    username = fields.StringField(max_length=150, required=True, unique=True)
    email = fields.EmailField(required=True, unique=True)
    password_hash = fields.StringField(required=True)
    
    # Profile information
    first_name = fields.StringField(max_length=100)
    last_name = fields.StringField(max_length=100)
    bio = fields.StringField(max_length=500)
    avatar_url = fields.URLField()
    date_of_birth = fields.DateTimeField()
    
    # Embedded documents
    addresses = fields.ListField(fields.EmbeddedDocumentField(Address))
    social_links = fields.ListField(fields.EmbeddedDocumentField(SocialLink))
    preferences = fields.EmbeddedDocumentField(UserPreferences, default=UserPreferences)
    
    # Status and metadata
    is_active = fields.BooleanField(default=True)
    is_verified = fields.BooleanField(default=False)
    verification_token = fields.StringField()
    
    # Timestamps
    created_at = fields.DateTimeField(default=datetime.utcnow)
    updated_at = fields.DateTimeField(default=datetime.utcnow)
    last_login = fields.DateTimeField()
    
    # Indexing
    meta = {
        'collection': 'users',
        'indexes': [
            'email',
            'username',
            'created_at',
            ('email', 'is_active'),
            {
                'fields': ['$username', '$email', '$bio'],
                'default_language': 'english',
                'weights': {'username': 10, 'email': 5, 'bio': 2}
            }
        ]
    }
    
    def save(self, *args, **kwargs):
        """Override save to update timestamp"""
        self.updated_at = datetime.utcnow()
        return super().save(*args, **kwargs)
    
    def to_dict(self):
        """Convert document to dictionary"""
        return {
            'user_id': self.user_id,
            'username': self.username,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'bio': self.bio,
            'avatar_url': self.avatar_url,
            'addresses': [addr.to_mongo().to_dict() for addr in self.addresses],
            'social_links': [link.to_mongo().to_dict() for link in self.social_links],
            'preferences': self.preferences.to_mongo().to_dict() if self.preferences else {},
            'is_active': self.is_active,
            'is_verified': self.is_verified,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
            'last_login': self.last_login.isoformat() if self.last_login else None,
        }

class UserSession(Document):
    """User session tracking"""
    session_id = fields.StringField(primary_key=True, default=lambda: str(uuid.uuid4()))
    user = fields.ReferenceField(User, required=True)
    device_info = fields.DictField()
    ip_address = fields.StringField()
    user_agent = fields.StringField()
    created_at = fields.DateTimeField(default=datetime.utcnow)
    expires_at = fields.DateTimeField()
    is_active = fields.BooleanField(default=True)
    
    meta = {
        'collection': 'user_sessions',
        'indexes': [
            'user',
            'created_at',
            'expires_at',
            ('user', 'is_active')
        ]
    }

class UserActivity(Document):
    """User activity logging"""
    activity_id = fields.StringField(primary_key=True, default=lambda: str(uuid.uuid4()))
    user = fields.ReferenceField(User, required=True)
    action = fields.StringField(required=True)
    resource = fields.StringField()
    metadata = fields.DictField()
    ip_address = fields.StringField()
    user_agent = fields.StringField()
    timestamp = fields.DateTimeField(default=datetime.utcnow)
    
    meta = {
        'collection': 'user_activities',
        'indexes': [
            'user',
            'timestamp',
            'action',
            ('user', 'timestamp'),
            ('user', 'action')
        ]
    }
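
UserSession carries an expires_at field, but nothing in the model enforces expiry. One common option (not shown above, so treat the helper names as assumptions) is a MongoDB TTL index on that field, paired with a helper that computes the timestamp:

```python
from datetime import datetime, timedelta, timezone

# TTL index spec for user_sessions: MongoDB's background TTL monitor
# deletes a document once expires_at is this many seconds in the past.
SESSION_TTL_INDEX = {"keys": [("expires_at", 1)], "expireAfterSeconds": 0}

def session_expiry(hours=24, now=None):
    """Compute the expires_at value for a new session."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(hours=hours)
```

Applied with PyMongo this would be roughly `UserSession._get_collection().create_index(SESSION_TTL_INDEX["keys"], expireAfterSeconds=0)`; the TTL monitor runs about once a minute, so expiry is eventual rather than instantaneous.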

3. MongoDB Serializers

# serializers.py
from rest_framework import serializers
from rest_framework_mongoengine import serializers as mongo_serializers
from .models import User, Address, SocialLink, UserPreferences

class AddressSerializer(serializers.Serializer):
    street = serializers.CharField(max_length=200)
    city = serializers.CharField(max_length=100)
    state = serializers.CharField(max_length=50, required=False)
    postal_code = serializers.CharField(max_length=20, required=False)
    country = serializers.CharField(max_length=50)
    is_primary = serializers.BooleanField(default=False)

class SocialLinkSerializer(serializers.Serializer):
    platform = serializers.CharField(max_length=50)
    url = serializers.URLField()
    verified = serializers.BooleanField(default=False)

class UserPreferencesSerializer(serializers.Serializer):
    theme = serializers.ChoiceField(choices=['light', 'dark'], default='light')
    language = serializers.CharField(max_length=10, default='en')
    timezone = serializers.CharField(max_length=50, default='UTC')
    notifications = serializers.DictField(default={
        'email': True,
        'push': True,
        'sms': False
    })

class UserSerializer(mongo_serializers.DocumentSerializer):
    addresses = AddressSerializer(many=True, required=False)
    social_links = SocialLinkSerializer(many=True, required=False)
    preferences = UserPreferencesSerializer(required=False)
    password = serializers.CharField(write_only=True)
    
    class Meta:
        model = User
        fields = [
            'user_id', 'username', 'email', 'first_name', 'last_name',
            'bio', 'avatar_url', 'date_of_birth', 'addresses', 'social_links',
            'preferences', 'is_active', 'is_verified', 'created_at',
            'updated_at', 'last_login', 'password'
        ]
        read_only_fields = ['user_id', 'created_at', 'updated_at']
    
    def create(self, validated_data):
        password = validated_data.pop('password')
        user = User(**validated_data)
        user.password_hash = self.hash_password(password)
        user.save()
        return user
    
    def update(self, instance, validated_data):
        password = validated_data.pop('password', None)
        if password:
            instance.password_hash = self.hash_password(password)
        
        for attr, value in validated_data.items():
            setattr(instance, attr, value)
        
        instance.save()
        return instance
    
    def hash_password(self, password):
        from django.contrib.auth.hashers import make_password
        return make_password(password)

class UserListSerializer(serializers.Serializer):
    """Lightweight serializer for user lists"""
    user_id = serializers.CharField()
    username = serializers.CharField()
    email = serializers.EmailField()
    first_name = serializers.CharField()
    last_name = serializers.CharField()
    avatar_url = serializers.URLField()
    is_active = serializers.BooleanField()
    created_at = serializers.DateTimeField()

4. MongoDB Views and Queries

# views.py
from rest_framework import status
from rest_framework.decorators import api_view, action
from rest_framework.response import Response
from rest_framework_mongoengine import viewsets
from mongoengine.queryset.visitor import Q
from mongoengine import DoesNotExist
from datetime import datetime, timedelta
from .models import User, UserActivity
from .serializers import UserSerializer, UserListSerializer

class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    
    def get_serializer_class(self):
        if self.action == 'list':
            return UserListSerializer
        return UserSerializer
    
    def get_queryset(self):
        queryset = User.objects.all()
        
        # Filtering
        is_active = self.request.query_params.get('is_active')
        if is_active is not None:
            queryset = queryset.filter(is_active=is_active.lower() == 'true')
        
        # Search
        search = self.request.query_params.get('search')
        if search:
            queryset = queryset.filter(
                Q(username__icontains=search) |
                Q(email__icontains=search) |
                Q(first_name__icontains=search) |
                Q(last_name__icontains=search)
            )
        
        # Date range filtering
        created_after = self.request.query_params.get('created_after')
        if created_after:
            try:
                date = datetime.fromisoformat(created_after)
                queryset = queryset.filter(created_at__gte=date)
            except ValueError:
                pass
        
        return queryset.order_by('-created_at')
    
    @action(detail=True, methods=['post'])
    def add_address(self, request, pk=None):
        """Add address to user"""
        try:
            user = User.objects.get(user_id=pk)
            address_data = request.data
            
            # Validate address data
            from .models import Address
            address = Address(**address_data)
            address.validate()
            
            # If this is primary address, unset others
            if address_data.get('is_primary', False):
                for addr in user.addresses:
                    addr.is_primary = False
            
            user.addresses.append(address)
            user.save()
            
            return Response({'message': 'Address added successfully'})
        
        except DoesNotExist:
            return Response({'error': 'User not found'}, status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)
    
    @action(detail=True, methods=['get'])
    def activity_summary(self, request, pk=None):
        """Get user activity summary"""
        try:
            user = User.objects.get(user_id=pk)
            
            # Aggregation pipeline for activity summary
            pipeline = [
                {'$match': {'user': user.pk}},
                {'$group': {
                    '_id': '$action',
                    'count': {'$sum': 1},
                    'last_activity': {'$max': '$timestamp'}
                }},
                {'$sort': {'count': -1}}
            ]
            
            activities = list(UserActivity.objects.aggregate(pipeline))
            
            return Response({
                'user_id': user.user_id,
                'activity_summary': activities,
                'total_activities': UserActivity.objects.filter(user=user).count()
            })
        
        except DoesNotExist:
            return Response({'error': 'User not found'}, status=status.HTTP_404_NOT_FOUND)
    
    @action(detail=False, methods=['get'])
    def analytics(self, request):
        """User analytics using MongoDB aggregation"""
        pipeline = [
            # Match active users
            {'$match': {'is_active': True}},
            
            # Group by creation month
            {'$group': {
                '_id': {
                    'year': {'$year': '$created_at'},
                    'month': {'$month': '$created_at'}
                },
                'user_count': {'$sum': 1},
                'verified_count': {
                    '$sum': {'$cond': [{'$eq': ['$is_verified', True]}, 1, 0]}
                }
            }},
            
            # Sort by date
            {'$sort': {'_id.year': -1, '_id.month': -1}},
            
            # Limit to last 12 months
            {'$limit': 12}
        ]
        
        analytics_data = list(User.objects.aggregate(pipeline))
        
        return Response({
            'monthly_registrations': analytics_data,
            'total_users': User.objects.count(),
            'active_users': User.objects.filter(is_active=True).count(),
            'verified_users': User.objects.filter(is_verified=True).count()
        })

@api_view(['GET'])
def user_search(request):
    """Advanced user search with text indexing"""
    query = request.GET.get('q', '')
    if not query:
        return Response({'error': 'Query parameter q is required'}, 
                       status=status.HTTP_400_BAD_REQUEST)
    
    # Use MongoDB text search
    users = User.objects.search_text(query).order_by('$text_score')
    
    # Limit results
    limit = min(int(request.GET.get('limit', 20)), 100)
    users = users[:limit]
    
    serializer = UserListSerializer(users, many=True)
    return Response({
        'query': query,
        'results': serializer.data,
        'count': len(serializer.data)
    })

Advanced MongoDB Features

1. Aggregation Pipelines

# aggregation.py
from datetime import datetime, timedelta

from .models import User

class UserAnalytics:
    """Advanced analytics using MongoDB aggregation"""
    
    @staticmethod
    def user_engagement_report():
        """Generate user engagement report"""
        pipeline = [
            # Match users from last 30 days
            {'$match': {
                'created_at': {'$gte': datetime.utcnow() - timedelta(days=30)}
            }},
            
            # Lookup user activities
            {'$lookup': {
                'from': 'user_activities',
                'localField': '_id',
                'foreignField': 'user',
                'as': 'activities'
            }},
            
            # Add computed fields
            {'$addFields': {
                'activity_count': {'$size': '$activities'},
                'last_activity': {'$max': '$activities.timestamp'},
                'unique_actions': {'$size': {'$setUnion': ['$activities.action', []]}}
            }},
            
            # Group by engagement level
            {'$group': {
                '_id': {
                    '$switch': {
                        'branches': [
                            {'case': {'$gte': ['$activity_count', 50]}, 'then': 'high'},
                            {'case': {'$gte': ['$activity_count', 10]}, 'then': 'medium'},
                            {'case': {'$gt': ['$activity_count', 0]}, 'then': 'low'}
                        ],
                        'default': 'inactive'
                    }
                },
                'user_count': {'$sum': 1},
                'avg_activities': {'$avg': '$activity_count'},
                'avg_unique_actions': {'$avg': '$unique_actions'}
            }},
            
            {'$sort': {'user_count': -1}}
        ]
        
        return list(User.objects.aggregate(pipeline))
    
    @staticmethod
    def geographic_distribution():
        """Analyze user geographic distribution"""
        pipeline = [
            # Unwind addresses array
            {'$unwind': '$addresses'},
            
            # Match primary addresses only
            {'$match': {'addresses.is_primary': True}},
            
            # Group by country and state
            {'$group': {
                '_id': {
                    'country': '$addresses.country',
                    'state': '$addresses.state'
                },
                'user_count': {'$sum': 1}
            }},
            
            # Group by country
            {'$group': {
                '_id': '$_id.country',
                'total_users': {'$sum': '$user_count'},
                'states': {
                    '$push': {
                        'state': '$_id.state',
                        'user_count': '$user_count'
                    }
                }
            }},
            
            {'$sort': {'total_users': -1}}
        ]
        
        return list(User.objects.aggregate(pipeline))
    
    @staticmethod
    def user_preferences_analysis():
        """Analyze user preferences"""
        pipeline = [
            # Match users with preferences
            {'$match': {'preferences': {'$exists': True}}},
            
            # Group by theme preference
            {'$group': {
                '_id': '$preferences.theme',
                'count': {'$sum': 1},
                'languages': {'$addToSet': '$preferences.language'},
                'avg_notifications': {
                    '$avg': {
                        '$add': [
                            {'$cond': [{'$eq': ['$preferences.notifications.email', True]}, 1, 0]},
                            {'$cond': [{'$eq': ['$preferences.notifications.push', True]}, 1, 0]},
                            {'$cond': [{'$eq': ['$preferences.notifications.sms', True]}, 1, 0]}
                        ]
                    }
                }
            }},
            
            {'$sort': {'count': -1}}
        ]
        
        return list(User.objects.aggregate(pipeline))

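The $switch stage above buckets users by activity count. The same thresholds in plain Python, handy for unit-testing the boundaries before trusting them inside a pipeline:

```python
def engagement_tier(activity_count):
    """Mirror of the $switch branches in user_engagement_report()."""
    if activity_count >= 50:
        return 'high'
    if activity_count >= 10:
        return 'medium'
    if activity_count > 0:
        return 'low'
    return 'inactive'
```
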
2. Change Streams for Real-time Updates

# change_streams.py
# Note: change streams require MongoDB to run as a replica set.
import threading
from datetime import datetime

from pymongo import MongoClient
from .messaging import publish_user_event

class UserChangeStreamHandler:
    """Handle MongoDB change streams for real-time updates"""
    
    def __init__(self):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client.user_service_db
        self.collection = self.db.users
        self.running = False
    
    def start_watching(self):
        """Start watching for changes"""
        self.running = True
        thread = threading.Thread(target=self._watch_changes)
        thread.daemon = True
        thread.start()
    
    def stop_watching(self):
        """Stop watching for changes"""
        self.running = False
    
    def _watch_changes(self):
        """Watch for changes in user collection"""
        try:
            with self.collection.watch() as stream:
                for change in stream:
                    if not self.running:
                        break
                    
                    self._handle_change(change)
        except Exception as e:
            print(f"Change stream error: {e}")
    
    def _handle_change(self, change):
        """Handle individual change events"""
        operation_type = change['operationType']
        
        if operation_type == 'insert':
            self._handle_user_created(change)
        elif operation_type == 'update':
            self._handle_user_updated(change)
        elif operation_type == 'delete':
            self._handle_user_deleted(change)
    
    def _handle_user_created(self, change):
        """Handle user creation"""
        user_data = change['fullDocument']
        
        # Publish event to message queue
        publish_user_event('user.created', {
            'user_id': user_data['_id'],
            'username': user_data['username'],
            'email': user_data['email'],
            'timestamp': user_data['created_at']
        })
        
        print(f"User created: {user_data['username']}")
    
    def _handle_user_updated(self, change):
        """Handle user updates"""
        user_id = change['documentKey']['_id']
        updated_fields = change.get('updateDescription', {}).get('updatedFields', {})
        
        # Publish event for significant updates
        if any(field in updated_fields for field in ['email', 'is_verified', 'is_active']):
            publish_user_event('user.updated', {
                'user_id': user_id,
                'updated_fields': list(updated_fields.keys()),
                'timestamp': datetime.utcnow().isoformat()
            })
        
        print(f"User updated: {user_id}")
    
    def _handle_user_deleted(self, change):
        """Handle user deletion"""
        user_id = change['documentKey']['_id']
        
        publish_user_event('user.deleted', {
            'user_id': user_id,
            'timestamp': datetime.utcnow().isoformat()
        })
        
        print(f"User deleted: {user_id}")

# Start change stream handler (in Django, e.g. from AppConfig.ready())
change_handler = UserChangeStreamHandler()
change_handler.start_watching()
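
The handler imports publish_user_event from a messaging module that isn't shown in this section. A minimal in-process sketch of what it assumes (production code would publish to a real broker such as RabbitMQ or Kafka instead):

```python
# messaging.py (sketch): an in-process pub/sub registry standing in for
# a real message broker.
_subscribers = {}

def subscribe(event_type, callback):
    """Register a callback for an event type."""
    _subscribers.setdefault(event_type, []).append(callback)

def publish_user_event(event_type, payload):
    """Deliver payload to every callback registered for event_type."""
    for callback in _subscribers.get(event_type, []):
        callback(payload)
```

For example, `subscribe('user.created', handle_welcome_email)` would wire a consumer; swapping this module for a broker-backed one leaves the change stream handler untouched.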

3. MongoDB Sharding Configuration

# sharding.py
from pymongo import MongoClient

class MongoDBShardingSetup:
    """Setup MongoDB sharding for horizontal scaling"""
    
    def __init__(self, config_servers, shard_servers, mongos_servers):
        self.config_servers = config_servers
        self.shard_servers = shard_servers
        self.mongos_servers = mongos_servers
    
    def setup_sharding(self):
        """Setup sharding configuration"""
        # Connect to mongos
        client = MongoClient(self.mongos_servers[0])
        admin_db = client.admin
        
        # Add shards
        for shard in self.shard_servers:
            try:
                admin_db.command('addShard', shard)
                print(f"Added shard: {shard}")
            except Exception as e:
                print(f"Error adding shard {shard}: {e}")
        
        # Enable sharding for database
        admin_db.command('enableSharding', 'user_service_db')
        
        # Shard collections
        self._shard_users_collection(admin_db)
        self._shard_activities_collection(admin_db)
    
    def _shard_users_collection(self, admin_db):
        """Shard users collection by user_id"""
        admin_db.command({
            'shardCollection': 'user_service_db.users',
            'key': {'user_id': 1}
        })
        print("Sharded users collection")
    
    def _shard_activities_collection(self, admin_db):
        """Shard activities collection by user and timestamp"""
        admin_db.command({
            'shardCollection': 'user_service_db.user_activities',
            'key': {'user': 1, 'timestamp': 1}
        })
        print("Sharded user_activities collection")

# Usage
sharding_setup = MongoDBShardingSetup(
    config_servers=['config1:27019', 'config2:27019', 'config3:27019'],
    shard_servers=['shard1/shard1a:27018,shard1b:27018', 'shard2/shard2a:27018,shard2b:27018'],
    mongos_servers=['mongos1:27017', 'mongos2:27017']
)
sharding_setup.setup_sharding()
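
A ranged key on user_id works here because the UUIDs are effectively random, so writes already spread across shards. With a monotonically increasing key (timestamps, counters) you would hash it instead; a small sketch of building that command (collection names assumed from the setup above):

```python
def hashed_shard_command(namespace, field):
    """Build the shardCollection command for a hashed shard key."""
    return {'shardCollection': namespace, 'key': {field: 'hashed'}}
```

This dict would be passed to `admin_db.command(...)` just like the ranged variants in MongoDBShardingSetup.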

4. Performance Optimization

# optimization.py
from datetime import datetime, timedelta

from pymongo import IndexModel, TEXT, ASCENDING, DESCENDING
from .models import User, UserActivity

class MongoDBOptimizer:
    """MongoDB performance optimization utilities"""
    
    @staticmethod
    def create_indexes():
        """Create optimized indexes"""
        # User collection indexes
        User.ensure_indexes()
        
        # Additional compound indexes
        User._get_collection().create_indexes([
            IndexModel([('email', ASCENDING), ('is_active', ASCENDING)]),
            IndexModel([('created_at', DESCENDING), ('is_verified', ASCENDING)]),
            IndexModel([('username', TEXT), ('email', TEXT), ('bio', TEXT)]),
        ])
        
        # Activity collection indexes
        UserActivity._get_collection().create_indexes([
            IndexModel([('user', ASCENDING), ('timestamp', DESCENDING)]),
            IndexModel([('action', ASCENDING), ('timestamp', DESCENDING)]),
            IndexModel([('timestamp', DESCENDING)]),
        ])
        
        print("Indexes created successfully")
    
    @staticmethod
    def analyze_query_performance():
        """Analyze query performance"""
        # Explain query execution
        explain_result = User.objects.filter(
            is_active=True,
            created_at__gte=datetime.utcnow() - timedelta(days=30)
        ).explain()
        
        print("Query execution stats:")
        print(f"Execution time: {explain_result['executionStats']['executionTimeMillis']}ms")
        print(f"Documents examined: {explain_result['executionStats']['totalDocsExamined']}")
        print(f"Documents returned: {explain_result['executionStats']['totalDocsReturned']}")
        
        return explain_result
    
    @staticmethod
    def optimize_aggregation_pipeline(pipeline):
        """Hoist leading $match stages so indexes can be used early.

        A $match that follows $group, $project, $unwind, or $lookup may
        reference computed fields, so only stages seen before such a
        barrier are safe to move.
        """
        hoisted, rest = [], []
        barrier_seen = False
        for stage in pipeline:
            if '$match' in stage and not barrier_seen:
                hoisted.append(stage)
            else:
                rest.append(stage)
            if any(op in stage for op in ('$group', '$project', '$unwind', '$lookup')):
                barrier_seen = True
        return hoisted + rest

Docker Configuration for MongoDB

# docker-compose.yml - MongoDB services
version: '3.8'

services:
  # MongoDB Replica Set
  mongo1:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    volumes:
      - mongo1_data:/data/db
      - ./scripts/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js
    ports:
      - "27017:27017"
    networks:
      - microservices-network

  mongo2:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    volumes:
      - mongo2_data:/data/db
    ports:
      - "27018:27017"
    networks:
      - microservices-network

  mongo3:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    volumes:
      - mongo3_data:/data/db
    ports:
      - "27019:27017"
    networks:
      - microservices-network

  # MongoDB Express for administration
  mongo-express:
    image: mongo-express:latest
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: admin
      ME_CONFIG_MONGODB_ADMINPASSWORD: password
      ME_CONFIG_MONGODB_SERVER: mongo1
      ME_CONFIG_BASICAUTH_USERNAME: admin
      ME_CONFIG_BASICAUTH_PASSWORD: password
    ports:
      - "8081:8081"
    depends_on:
      - mongo1
    networks:
      - microservices-network

volumes:
  mongo1_data:
  mongo2_data:
  mongo3_data:

networks:
  microservices-network:
    driver: bridge

// scripts/mongo-init.js
// Initialize replica set
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "mongo1:27017", priority: 2 },
    { _id: 1, host: "mongo2:27017", priority: 1 },
    { _id: 2, host: "mongo3:27017", priority: 1 }
  ]
});

// Create databases and users
use('user_service_db');
db.createUser({
  user: 'user_service',
  pwd: 'user_service_password',
  roles: [{ role: 'readWrite', db: 'user_service_db' }]
});

use('product_service_db');
db.createUser({
  user: 'product_service',
  pwd: 'product_service_password',
  roles: [{ role: 'readWrite', db: 'product_service_db' }]
});

Summary

MongoDB provides excellent support for microservices with its flexible schema, powerful aggregation framework, and cloud-native features. Key benefits include:

  • Flexible Schema: Easy evolution of data models
  • Horizontal Scaling: Built-in sharding capabilities
  • Real-time Updates: Change streams for event-driven architecture
  • Rich Queries: Powerful aggregation pipelines
  • High Availability: Replica sets for fault tolerance

The combination of Django and MongoDB creates a robust foundation for microservices that can scale and adapt to changing requirements. In the next section, we'll explore creating RESTful APIs for microservices communication.