MongoDB is an excellent choice for microservices due to its flexible schema, horizontal scaling capabilities, and cloud-native features. This section explores how to integrate MongoDB with Django microservices for efficient data processing and storage.
# PostgreSQL approach - rigid schema
class User(models.Model):
    """Relational counterpart of the user record, with a fixed set of columns."""

    username = models.CharField(max_length=150)
    email = models.EmailField()
    # Anything without its own column is kept in this single JSON field.
    profile_data = models.JSONField()  # Limited JSON support
# MongoDB approach - flexible documents
{
"_id": ObjectId("..."),
"username": "john_doe",
"email": "john@example.com",
"profile": {
"bio": "Software developer",
"preferences": {
"theme": "dark",
"notifications": {
"email": True,
"push": False
}
},
"social_links": [
{"platform": "github", "url": "https://github.com/johndoe"},
{"platform": "linkedin", "url": "https://linkedin.com/in/johndoe"}
]
},
"created_at": ISODate("2023-01-01T00:00:00Z"),
"last_login": ISODate("2023-12-01T10:30:00Z")
}
# Install the Python MongoDB packages (drivers and integrations, not the MongoDB server itself)
pip install pymongo djongo mongoengine django-cors-headers
# Alternative: Use MongoEngine (recommended)
pip install mongoengine
# settings.py
import os
from decouple import config
# MongoDB Configuration with MongoEngine
import mongoengine

# Keyword arguments handed to mongoengine.connect() below. Every value is read
# through decouple's config(), falling back to the default given here when the
# setting is not provided.
MONGODB_SETTINGS = {
    'db': config('MONGODB_DB', default='user_service_db'),
    'host': config('MONGODB_HOST', default='localhost'),
    'port': config('MONGODB_PORT', default=27017, cast=int),
    'username': config('MONGODB_USERNAME', default=''),
    'password': config('MONGODB_PASSWORD', default=''),
    'authentication_source': config('MONGODB_AUTH_SOURCE', default='admin'),
    'connect': False, # Important for multiprocessing
}
# Connect to MongoDB
# This call sits at module level, so it runs when the settings module is imported.
mongoengine.connect(**MONGODB_SETTINGS)
# Alternative: Djongo, a Django database engine that talks to MongoDB
# (this configures the Django ORM backend, not a direct PyMongo connection)
DATABASES = {
    'default': {
        'ENGINE': 'djongo',
        'NAME': config('MONGODB_DB', default='user_service_db'),
        # Options under CLIENT describe how to reach and authenticate against
        # the MongoDB deployment.
        'CLIENT': {
            'host': config('MONGODB_URI', default='mongodb://localhost:27017'),
            'username': config('MONGODB_USERNAME', default=''),
            'password': config('MONGODB_PASSWORD', default=''),
            'authSource': config('MONGODB_AUTH_SOURCE', default='admin'),
            # Read from configuration so a deployment can pick another
            # mechanism (for example SCRAM-SHA-256) without editing this file;
            # the default keeps the previously hard-coded value.
            'authMechanism': config('MONGODB_AUTH_MECHANISM', default='SCRAM-SHA-1'),
        }
    }
}
# models.py
from mongoengine import Document, EmbeddedDocument, fields
from datetime import datetime
import uuid
class Address(EmbeddedDocument):
    """Postal address kept inline inside the document that owns it."""

    street = fields.StringField(required=True, max_length=200)
    city = fields.StringField(required=True, max_length=100)
    state = fields.StringField(max_length=50)
    postal_code = fields.StringField(max_length=20)
    country = fields.StringField(required=True, max_length=50)
    # Flags the address treated as the main one; False unless set.
    is_primary = fields.BooleanField(default=False)
class SocialLink(EmbeddedDocument):
    """A single social-media profile link kept inline in its parent document."""

    platform = fields.StringField(required=True, max_length=50)
    url = fields.URLField(required=True)
    # False until something sets it; nothing in this class changes it.
    verified = fields.BooleanField(default=False)
class UserPreferences(EmbeddedDocument):
    """User preferences embedded document.

    Stores display settings (theme, language, timezone) and a per-channel
    notification map inline in the owning document.
    """

    theme = fields.StringField(max_length=20, default='light', choices=['light', 'dark'])
    language = fields.StringField(max_length=10, default='en')
    timezone = fields.StringField(max_length=50, default='UTC')
    # The default is a callable so that every document gets its own dict.
    # A dict literal here would be a single object reused as the default for
    # every instance, which is the usual mutable-default hazard.
    notifications = fields.DictField(default=lambda: {
        'email': True,
        'push': True,
        'sms': False,
    })
class User(Document):
    """Account document for the user service, stored in the ``users`` collection."""

    # String UUID primary key, kept as a string for consistency with other services.
    user_id = fields.StringField(primary_key=True, default=lambda: str(uuid.uuid4()))
    username = fields.StringField(required=True, unique=True, max_length=150)
    email = fields.EmailField(required=True, unique=True)
    password_hash = fields.StringField(required=True)

    # Optional profile details
    first_name = fields.StringField(max_length=100)
    last_name = fields.StringField(max_length=100)
    bio = fields.StringField(max_length=500)
    avatar_url = fields.URLField()
    date_of_birth = fields.DateTimeField()

    # Nested sub-documents stored inside the user document
    addresses = fields.ListField(fields.EmbeddedDocumentField(Address))
    social_links = fields.ListField(fields.EmbeddedDocumentField(SocialLink))
    preferences = fields.EmbeddedDocumentField(UserPreferences, default=UserPreferences)

    # Account state
    is_active = fields.BooleanField(default=True)
    is_verified = fields.BooleanField(default=False)
    verification_token = fields.StringField()

    # Audit timestamps; the callables run when a default is needed
    created_at = fields.DateTimeField(default=datetime.utcnow)
    updated_at = fields.DateTimeField(default=datetime.utcnow)
    last_login = fields.DateTimeField()

    # Collection name and index declarations
    meta = {
        'collection': 'users',
        'indexes': [
            'email',
            'username',
            'created_at',
            ('email', 'is_active'),
            # Weighted text index over username, email and bio
            {
                'fields': ['$username', '$email', '$bio'],
                'default_language': 'english',
                'weights': {'username': 10, 'email': 5, 'bio': 2}
            }
        ]
    }

    def save(self, *args, **kwargs):
        """Stamp ``updated_at`` with the current UTC time, then delegate to the parent save."""
        self.updated_at = datetime.utcnow()
        return super().save(*args, **kwargs)

    def to_dict(self):
        """Return the user as a plain dictionary.

        Embedded documents are converted to dicts and timestamps to ISO-8601
        strings (``None`` when unset). ``password_hash``, ``verification_token``
        and ``date_of_birth`` are not part of the result.
        """
        def as_plain(embedded):
            return embedded.to_mongo().to_dict()

        def as_iso(moment):
            return moment.isoformat() if moment else None

        preferences = as_plain(self.preferences) if self.preferences else {}
        return {
            'user_id': self.user_id,
            'username': self.username,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'bio': self.bio,
            'avatar_url': self.avatar_url,
            'addresses': [as_plain(addr) for addr in self.addresses],
            'social_links': [as_plain(link) for link in self.social_links],
            'preferences': preferences,
            'is_active': self.is_active,
            'is_verified': self.is_verified,
            'created_at': as_iso(self.created_at),
            'updated_at': as_iso(self.updated_at),
            'last_login': as_iso(self.last_login),
        }
class UserSession(Document):
    """One login session of a user, stored in the ``user_sessions`` collection."""

    # String UUID primary key generated per session.
    session_id = fields.StringField(primary_key=True, default=lambda: str(uuid.uuid4()))
    user = fields.ReferenceField(User, required=True)

    # Client details recorded with the session
    device_info = fields.DictField()
    ip_address = fields.StringField()
    user_agent = fields.StringField()

    # Lifetime bookkeeping; expires_at has no default and must be set by the caller
    created_at = fields.DateTimeField(default=datetime.utcnow)
    expires_at = fields.DateTimeField()
    is_active = fields.BooleanField(default=True)

    meta = {
        'collection': 'user_sessions',
        'indexes': [
            'user',
            'created_at',
            'expires_at',
            ('user', 'is_active')
        ]
    }
class UserActivity(Document):
    """Log entry for one action by a user, stored in ``user_activities``."""

    # String UUID primary key generated per entry.
    activity_id = fields.StringField(primary_key=True, default=lambda: str(uuid.uuid4()))
    user = fields.ReferenceField(User, required=True)

    # What happened, and optionally to which resource
    action = fields.StringField(required=True)
    resource = fields.StringField()
    metadata = fields.DictField()

    # Request context
    ip_address = fields.StringField()
    user_agent = fields.StringField()
    timestamp = fields.DateTimeField(default=datetime.utcnow)

    meta = {
        'collection': 'user_activities',
        'indexes': [
            'user',
            'timestamp',
            'action',
            ('user', 'timestamp'),
            ('user', 'action')
        ]
    }
# serializers.py
from rest_framework import serializers
from rest_framework_mongoengine import serializers as mongo_serializers
from .models import User, Address, SocialLink, UserPreferences
class AddressSerializer(serializers.Serializer):
    """Validates address payloads; street, city and country are mandatory."""

    street = serializers.CharField(max_length=200)
    city = serializers.CharField(max_length=100)
    state = serializers.CharField(required=False, max_length=50)
    postal_code = serializers.CharField(required=False, max_length=20)
    country = serializers.CharField(max_length=50)
    is_primary = serializers.BooleanField(default=False)
class SocialLinkSerializer(serializers.Serializer):
    """Validates one social-media link (platform name plus URL)."""

    platform = serializers.CharField(max_length=50)
    url = serializers.URLField()
    verified = serializers.BooleanField(default=False)
class UserPreferencesSerializer(serializers.Serializer):
    """Validates the user preference payload (theme, language, timezone, notifications)."""

    theme = serializers.ChoiceField(choices=['light', 'dark'], default='light')
    language = serializers.CharField(max_length=10, default='en')
    timezone = serializers.CharField(max_length=50, default='UTC')
    # The default is a callable so each use builds a new dict. A dict literal
    # would be one object handed out to every request that omits the field,
    # so a mutation made while handling one request could leak into later ones.
    notifications = serializers.DictField(default=lambda: {
        'email': True,
        'push': True,
        'sms': False,
    })
class UserSerializer(mongo_serializers.DocumentSerializer):
    """Read/write serializer for ``User`` documents.

    Accepts a write-only ``password`` that is hashed into ``password_hash``,
    and nested ``addresses``, ``social_links`` and ``preferences`` payloads
    that are converted into embedded documents before they reach the model.
    """

    addresses = AddressSerializer(many=True, required=False)
    social_links = SocialLinkSerializer(many=True, required=False)
    preferences = UserPreferencesSerializer(required=False)
    password = serializers.CharField(write_only=True)

    class Meta:
        model = User
        fields = [
            'user_id', 'username', 'email', 'first_name', 'last_name',
            'bio', 'avatar_url', 'date_of_birth', 'addresses', 'social_links',
            'preferences', 'is_active', 'is_verified', 'created_at',
            'updated_at', 'last_login', 'password'
        ]
        # NOTE(review): is_active, is_verified and last_login stay writable
        # here, so any client allowed to use this serializer can set them.
        # Confirm that is intended before exposing it to end users.
        read_only_fields = ['user_id', 'created_at', 'updated_at']

    @staticmethod
    def _coerce_embedded(validated_data):
        """Replace nested dict payloads with embedded-document instances, in place.

        The nested serializers yield plain mappings, but the model's embedded
        fields only accept ``Address`` / ``SocialLink`` / ``UserPreferences``
        instances when the document is validated on save.
        """
        list_fields = (('addresses', Address), ('social_links', SocialLink))
        for name, document_cls in list_fields:
            items = validated_data.get(name)
            if items is not None:
                validated_data[name] = [
                    item if isinstance(item, document_cls) else document_cls(**item)
                    for item in items
                ]

        prefs = validated_data.get('preferences')
        if prefs is not None and not isinstance(prefs, UserPreferences):
            validated_data['preferences'] = UserPreferences(**prefs)
        return validated_data

    def create(self, validated_data):
        """Create and save a user, storing only the hash of the supplied password."""
        password = validated_data.pop('password')
        user = User(**self._coerce_embedded(validated_data))
        user.password_hash = self.hash_password(password)
        user.save()
        return user

    def update(self, instance, validated_data):
        """Apply the validated fields to ``instance`` and save it.

        The password hash is replaced only when a non-empty password is given.
        """
        password = validated_data.pop('password', None)
        if password:
            instance.password_hash = self.hash_password(password)
        for attr, value in self._coerce_embedded(validated_data).items():
            setattr(instance, attr, value)
        instance.save()
        return instance

    def hash_password(self, password):
        """Return the hash of ``password`` produced by Django's ``make_password``."""
        from django.contrib.auth.hashers import make_password
        return make_password(password)
class UserListSerializer(serializers.Serializer):
    """Compact user representation: identity, name, avatar and status only."""

    # Identity
    user_id = serializers.CharField()
    username = serializers.CharField()
    email = serializers.EmailField()

    # Display details
    first_name = serializers.CharField()
    last_name = serializers.CharField()
    avatar_url = serializers.URLField()

    # Status
    is_active = serializers.BooleanField()
    created_at = serializers.DateTimeField()
# views.py
from rest_framework import status
from rest_framework.decorators import api_view, action
from rest_framework.response import Response
from rest_framework_mongoengine import viewsets
from mongoengine.queryset.visitor import Q
from mongoengine import DoesNotExist
from datetime import datetime, timedelta
from .models import User, UserActivity
from .serializers import UserSerializer, UserListSerializer
class UserViewSet(viewsets.ModelViewSet):
    """User endpoints: the standard viewset actions plus address, activity and analytics actions."""

    queryset = User.objects.all()
    serializer_class = UserSerializer

    def get_serializer_class(self):
        """Use the lightweight list serializer for ``list``, the full one otherwise."""
        if self.action == 'list':
            return UserListSerializer
        return UserSerializer

    def get_queryset(self):
        """Build the user queryset from the optional query parameters.

        Supported parameters: ``is_active`` (``true`` in any case, anything else
        is false), ``search`` (case-insensitive substring match on username,
        email, first and last name) and ``created_after`` (ISO-8601 date).
        Results are ordered newest first.
        """
        queryset = User.objects.all()

        # Filtering
        is_active = self.request.query_params.get('is_active')
        if is_active is not None:
            queryset = queryset.filter(is_active=is_active.lower() == 'true')

        # Search
        search = self.request.query_params.get('search')
        if search:
            queryset = queryset.filter(
                Q(username__icontains=search) |
                Q(email__icontains=search) |
                Q(first_name__icontains=search) |
                Q(last_name__icontains=search)
            )

        # Date range filtering
        created_after = self.request.query_params.get('created_after')
        if created_after:
            try:
                date = datetime.fromisoformat(created_after)
            except ValueError:
                # An unparseable value is ignored: the date filter is simply
                # not applied and the request still succeeds.
                pass
            else:
                queryset = queryset.filter(created_at__gte=date)

        return queryset.order_by('-created_at')

    @action(detail=True, methods=['post'])
    def add_address(self, request, pk=None):
        """Append an address, built from the request body, to the user.

        Returns 404 when the user does not exist and 400 when the payload is
        not a valid address. When the new address is primary, every existing
        address is demoted first.
        """
        # Local imports, following the existing in-method import of Address.
        from mongoengine.errors import FieldDoesNotExist, ValidationError
        from .models import Address

        try:
            user = User.objects.get(user_id=pk)
        except DoesNotExist:
            return Response({'error': 'User not found'}, status=status.HTTP_404_NOT_FOUND)

        # Only payload problems are answered with 400. Failures while saving
        # propagate instead of being reported as a bad request together with
        # their internal message.
        try:
            address = Address(**request.data)
            address.validate()
        except (FieldDoesNotExist, ValidationError, TypeError) as e:
            return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

        # If this is primary address, unset others
        if address.is_primary:
            for addr in user.addresses:
                addr.is_primary = False

        user.addresses.append(address)
        user.save()
        return Response({'message': 'Address added successfully'})

    @action(detail=True, methods=['get'])
    def activity_summary(self, request, pk=None):
        """Return per-action counts and the latest timestamp for one user's activities."""
        try:
            user = User.objects.get(user_id=pk)
        except DoesNotExist:
            return Response({'error': 'User not found'}, status=status.HTTP_404_NOT_FOUND)

        # Aggregation pipeline for activity summary
        pipeline = [
            {'$match': {'user': user.pk}},
            {'$group': {
                '_id': '$action',
                'count': {'$sum': 1},
                'last_activity': {'$max': '$timestamp'}
            }},
            {'$sort': {'count': -1}}
        ]
        activities = list(UserActivity.objects.aggregate(pipeline))
        return Response({
            'user_id': user.user_id,
            'activity_summary': activities,
            'total_activities': UserActivity.objects.filter(user=user).count()
        })

    @action(detail=False, methods=['get'])
    def analytics(self, request):
        """Return monthly registration figures for active users plus overall totals."""
        pipeline = [
            # Match active users
            {'$match': {'is_active': True}},
            # Group by creation month
            {'$group': {
                '_id': {
                    'year': {'$year': '$created_at'},
                    'month': {'$month': '$created_at'}
                },
                'user_count': {'$sum': 1},
                'verified_count': {
                    '$sum': {'$cond': [{'$eq': ['$is_verified', True]}, 1, 0]}
                }
            }},
            # Sort by date
            {'$sort': {'_id.year': -1, '_id.month': -1}},
            # Keep the 12 most recent months that have registrations
            {'$limit': 12}
        ]
        analytics_data = list(User.objects.aggregate(pipeline))
        return Response({
            'monthly_registrations': analytics_data,
            'total_users': User.objects.count(),
            'active_users': User.objects.filter(is_active=True).count(),
            'verified_users': User.objects.filter(is_verified=True).count()
        })
@api_view(['GET'])
def user_search(request):
    """Full-text user search ordered by text score.

    Query parameters:
        q: search terms (required).
        limit: maximum number of results, default 20, capped at 100.

    Returns 400 when ``q`` is missing or ``limit`` is not a non-negative integer.
    """
    query = request.GET.get('q', '')
    if not query:
        return Response({'error': 'Query parameter q is required'},
                        status=status.HTTP_400_BAD_REQUEST)

    # Validate limit before touching the database: a non-numeric value used to
    # raise ValueError, and a negative one would reach the queryset slice.
    try:
        requested_limit = int(request.GET.get('limit', 20))
    except (TypeError, ValueError):
        requested_limit = -1
    if requested_limit < 0:
        return Response({'error': 'Query parameter limit must be a non-negative integer'},
                        status=status.HTTP_400_BAD_REQUEST)
    limit = min(requested_limit, 100)

    # Use MongoDB text search
    users = User.objects.search_text(query).order_by('$text_score')
    users = users[:limit]

    serializer = UserListSerializer(users, many=True)
    return Response({
        'query': query,
        'results': serializer.data,
        'count': len(serializer.data)
    })
# aggregation.py
from mongoengine import Document
from datetime import datetime, timedelta
class UserAnalytics:
    """Reporting helpers that run aggregation pipelines on the User collection."""

    @staticmethod
    def user_engagement_report():
        """Group users created in the last 30 days by how many activities they logged.

        Buckets are ``high`` (50 or more), ``medium`` (10 or more), ``low``
        (at least one) and ``inactive``; each bucket reports its user count and
        the averages of activity count and distinct actions.
        """
        window_start = datetime.utcnow() - timedelta(days=30)

        recent_users = {'$match': {'created_at': {'$gte': window_start}}}
        # Attach each user's activity documents as an ``activities`` array.
        with_activities = {'$lookup': {
            'from': 'user_activities',
            'localField': '_id',
            'foreignField': 'user',
            'as': 'activities'
        }}
        with_metrics = {'$addFields': {
            'activity_count': {'$size': '$activities'},
            'last_activity': {'$max': '$activities.timestamp'},
            'unique_actions': {'$size': {'$setUnion': '$activities.action'}}
        }}
        engagement_level = {'$switch': {
            'branches': [
                {'case': {'$gte': ['$activity_count', 50]}, 'then': 'high'},
                {'case': {'$gte': ['$activity_count', 10]}, 'then': 'medium'},
                {'case': {'$gt': ['$activity_count', 0]}, 'then': 'low'}
            ],
            'default': 'inactive'
        }}
        per_level = {'$group': {
            '_id': engagement_level,
            'user_count': {'$sum': 1},
            'avg_activities': {'$avg': '$activity_count'},
            'avg_unique_actions': {'$avg': '$unique_actions'}
        }}
        largest_first = {'$sort': {'user_count': -1}}

        stages = [recent_users, with_activities, with_metrics, per_level, largest_first]
        return list(User.objects.aggregate(stages))

    @staticmethod
    def geographic_distribution():
        """Count users per country, with a per-state breakdown, from primary addresses."""
        one_row_per_address = {'$unwind': '$addresses'}
        primary_only = {'$match': {'addresses.is_primary': True}}
        per_country_state = {'$group': {
            '_id': {
                'country': '$addresses.country',
                'state': '$addresses.state'
            },
            'user_count': {'$sum': 1}
        }}
        # Roll the state rows up into one document per country.
        per_country = {'$group': {
            '_id': '$_id.country',
            'total_users': {'$sum': '$user_count'},
            'states': {
                '$push': {
                    'state': '$_id.state',
                    'user_count': '$user_count'
                }
            }
        }}
        largest_first = {'$sort': {'total_users': -1}}

        stages = [one_row_per_address, primary_only, per_country_state,
                  per_country, largest_first]
        return list(User.objects.aggregate(stages))

    @staticmethod
    def user_preferences_analysis():
        """Summarise users per theme: count, languages seen, average enabled notification channels."""
        # One 0/1 term per notification channel; their sum is the number of
        # channels a user has switched on.
        enabled_flags = [
            {'$cond': [{'$eq': [f'$preferences.notifications.{channel}', True]}, 1, 0]}
            for channel in ('email', 'push', 'sms')
        ]

        has_preferences = {'$match': {'preferences': {'$exists': True}}}
        per_theme = {'$group': {
            '_id': '$preferences.theme',
            'count': {'$sum': 1},
            'languages': {'$addToSet': '$preferences.language'},
            'avg_notifications': {'$avg': {'$add': enabled_flags}}
        }}
        most_common_first = {'$sort': {'count': -1}}

        stages = [has_preferences, per_theme, most_common_first]
        return list(User.objects.aggregate(stages))
# change_streams.py
import threading
from mongoengine import connect
from pymongo import MongoClient
from .models import User
from .messaging import publish_user_event
class UserChangeStreamHandler:
    """Watch the users collection and publish an event for each insert, update and delete.

    Args:
        uri: MongoDB connection string.
        db_name: database that holds the watched collection.
        collection_name: collection to watch.
        poll_interval: seconds to wait after an empty poll before polling again.

    The defaults reproduce the previously hard-coded connection, so calling the
    constructor without arguments behaves as before.
    """

    # Updated fields that are worth announcing to other services.
    SIGNIFICANT_FIELDS = ('email', 'is_verified', 'is_active')

    def __init__(self, uri='mongodb://localhost:27017/', db_name='user_service_db',
                 collection_name='users', poll_interval=1.0):
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        self.poll_interval = poll_interval
        self.running = False

    def start_watching(self):
        """Start watching for changes on a background daemon thread."""
        self.running = True
        thread = threading.Thread(target=self._watch_changes, daemon=True)
        thread.start()

    def stop_watching(self):
        """Ask the watcher loop to stop; it exits after its current poll."""
        self.running = False

    def _watch_changes(self):
        """Poll the change stream until ``running`` is cleared or the stream closes.

        ``try_next`` returns ``None`` when no change is available, so the stop
        flag is re-checked on every poll. Iterating the stream directly would
        block until the next change and ignore ``stop_watching`` until then.
        """
        import time  # local import keeps this snippet self-contained

        try:
            with self.collection.watch() as stream:
                while self.running and stream.alive:
                    change = stream.try_next()
                    if change is None:
                        # Nothing new: pause so the server is not flooded with polls.
                        time.sleep(self.poll_interval)
                        continue
                    self._handle_change(change)
        except Exception as e:
            print(f"Change stream error: {e}")

    def _handle_change(self, change):
        """Route a change event to its handler; other operation types are ignored."""
        handlers = {
            'insert': self._handle_user_created,
            'update': self._handle_user_updated,
            'delete': self._handle_user_deleted,
        }
        handler = handlers.get(change['operationType'])
        if handler is not None:
            handler(change)

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as an ISO-8601 string."""
        from datetime import datetime  # local import keeps this snippet self-contained
        return datetime.utcnow().isoformat()

    def _handle_user_created(self, change):
        """Publish ``user.created`` with the new user's id, username, email and creation time."""
        user_data = change['fullDocument']
        created_at = user_data.get('created_at')
        # Send the timestamp as an ISO string, like the update and delete
        # events do, instead of a raw datetime object.
        if hasattr(created_at, 'isoformat'):
            created_at = created_at.isoformat()

        # Publish event to message queue
        publish_user_event('user.created', {
            'user_id': user_data['_id'],
            'username': user_data['username'],
            'email': user_data['email'],
            'timestamp': created_at
        })
        print(f"User created: {user_data['username']}")

    def _handle_user_updated(self, change):
        """Publish ``user.updated`` when one of the significant fields changed."""
        user_id = change['documentKey']['_id']
        updated_fields = change.get('updateDescription', {}).get('updatedFields', {})

        # Publish event for significant updates
        if any(field in updated_fields for field in self.SIGNIFICANT_FIELDS):
            publish_user_event('user.updated', {
                'user_id': user_id,
                'updated_fields': list(updated_fields.keys()),
                'timestamp': self._utc_timestamp()
            })
        print(f"User updated: {user_id}")

    def _handle_user_deleted(self, change):
        """Publish ``user.deleted`` for the removed document's id."""
        user_id = change['documentKey']['_id']
        publish_user_event('user.deleted', {
            'user_id': user_id,
            'timestamp': self._utc_timestamp()
        })
        print(f"User deleted: {user_id}")
# Start change stream handler
# These statements are at module level, so they run on import: constructing the
# handler creates its MongoClient, and start_watching() launches the daemon
# thread that consumes the change stream.
change_handler = UserChangeStreamHandler()
change_handler.start_watching()
# sharding.py
from pymongo import MongoClient
class MongoDBShardingSetup:
    """Setup MongoDB sharding for horizontal scaling.

    Args:
        config_servers: config server addresses (stored, not used by this class).
        shard_servers: shard connection strings passed to ``addShard``.
        mongos_servers: mongos router addresses; the first one is used.
    """

    def __init__(self, config_servers, shard_servers, mongos_servers):
        self.config_servers = config_servers
        self.shard_servers = shard_servers
        self.mongos_servers = mongos_servers

    def setup_sharding(self):
        """Register the shards, enable sharding on the database and shard both collections."""
        # Connect to mongos
        client = MongoClient(self.mongos_servers[0])
        try:
            admin_db = client.admin

            # Add shards. A failure is reported and the remaining shards are
            # still attempted.
            for shard in self.shard_servers:
                try:
                    admin_db.command('addShard', shard)
                    print(f"Added shard: {shard}")
                except Exception as e:
                    print(f"Error adding shard {shard}: {e}")

            # Enable sharding for database
            admin_db.command('enableSharding', 'user_service_db')

            # Shard collections
            self._shard_users_collection(admin_db)
            self._shard_activities_collection(admin_db)
        finally:
            # Release the connection whether or not the setup succeeded.
            client.close()

    def _shard_users_collection(self, admin_db):
        """Shard the users collection by user id.

        The User model declares ``user_id`` as its primary key, so the value is
        stored in the ``_id`` field (the change-stream handler reads it from
        ``_id`` as well). The documents have no ``user_id`` field to shard on.

        NOTE(review): the collection also declares unique indexes on email and
        username; confirm they are compatible with this shard key.
        """
        admin_db.command({
            'shardCollection': 'user_service_db.users',
            'key': {'_id': 1}
        })
        print("Sharded users collection")

    def _shard_activities_collection(self, admin_db):
        """Shard activities collection by user and timestamp."""
        admin_db.command({
            'shardCollection': 'user_service_db.user_activities',
            'key': {'user': 1, 'timestamp': 1}
        })
        print("Sharded user_activities collection")
# Usage
# Only the helper object is built here; setup_sharding() is not called.
# The shard strings are presumably in "<replica set name>/<member hosts>" form
# (TODO confirm against the deployment).
sharding_setup = MongoDBShardingSetup(
    config_servers=['config1:27019', 'config2:27019', 'config3:27019'],
    shard_servers=['shard1/shard1a:27018,shard1b:27018', 'shard2/shard2a:27018,shard2b:27018'],
    mongos_servers=['mongos1:27017', 'mongos2:27017']
)
# optimization.py
from mongoengine import connect
from pymongo import IndexModel, TEXT, ASCENDING, DESCENDING
from .models import User, UserActivity
class MongoDBOptimizer:
    """MongoDB performance optimization utilities"""

    @staticmethod
    def create_indexes():
        """Create the model-declared indexes plus extra compound indexes.

        The text index over username, email and bio is already declared, with
        weights, in the User model's ``meta`` and is created by
        ``User.ensure_indexes()``. It is not created again here because a
        collection can hold only one text index, and an unweighted duplicate
        conflicts with the declared one.
        """
        # User collection indexes
        User.ensure_indexes()

        # Additional compound indexes
        User._get_collection().create_indexes([
            IndexModel([('email', ASCENDING), ('is_active', ASCENDING)]),
            IndexModel([('created_at', DESCENDING), ('is_verified', ASCENDING)]),
        ])

        # Activity collection indexes
        UserActivity._get_collection().create_indexes([
            IndexModel([('user', ASCENDING), ('timestamp', DESCENDING)]),
            IndexModel([('action', ASCENDING), ('timestamp', DESCENDING)]),
            IndexModel([('timestamp', DESCENDING)]),
        ])
        print("Indexes created successfully")

    @staticmethod
    def analyze_query_performance():
        """Explain a sample query (active users created in the last 30 days).

        Prints the execution statistics and returns the full explain document.
        """
        from datetime import datetime, timedelta  # local import keeps this snippet self-contained

        # Explain query execution
        explain_result = User.objects.filter(
            is_active=True,
            created_at__gte=datetime.utcnow() - timedelta(days=30)
        ).explain()

        stats = explain_result['executionStats']
        print("Query execution stats:")
        print(f"Execution time: {stats['executionTimeMillis']}ms")
        print(f"Documents examined: {stats['totalDocsExamined']}")
        # The number of returned documents is reported as ``nReturned``;
        # executionStats has no ``totalDocsReturned`` key.
        print(f"Documents returned: {stats['nReturned']}")
        return explain_result

    @staticmethod
    def optimize_aggregation_pipeline(pipeline):
        """Return a copy of ``pipeline`` with ``$match`` stages moved ahead of ``$sort`` stages.

        A ``$match`` is only moved across the ``$sort`` stages directly in
        front of it, because sorting does not change which documents exist or
        what they contain. It is never moved across any other stage (for
        example ``$unwind``, ``$group``, ``$lookup``, ``$addFields``, ``$limit``
        or ``$skip``), since filtering earlier there can change the result.
        The input list is not modified.
        """
        optimized_pipeline = []
        for stage in pipeline:
            if '$match' not in stage:
                optimized_pipeline.append(stage)
                continue

            # Walk back over the trailing run of $sort stages and insert the
            # filter in front of them.
            insert_at = len(optimized_pipeline)
            while insert_at > 0 and '$sort' in optimized_pipeline[insert_at - 1]:
                insert_at -= 1
            optimized_pipeline.insert(insert_at, stage)
        return optimized_pipeline
# docker-compose.yml - MongoDB services
# The credentials below are placeholders for local development only; supply
# real values through an env file or a secrets mechanism elsewhere.
version: '3.8'

services:
  # MongoDB Replica Set
  # NOTE(review): all three members enable root credentials and --replSet but
  # no shared key file is configured; confirm whether internal authentication
  # between members (a keyFile) is required for this set to form.
  mongo1:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    volumes:
      - mongo1_data:/data/db
      # Init script mounted only on mongo1
      - ./scripts/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js
    ports:
      - "27017:27017"
    networks:
      - microservices-network

  mongo2:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    volumes:
      - mongo2_data:/data/db
    ports:
      # Host port 27018 maps to the container's 27017
      - "27018:27017"
    networks:
      - microservices-network

  mongo3:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    volumes:
      - mongo3_data:/data/db
    ports:
      # Host port 27019 maps to the container's 27017
      - "27019:27017"
    networks:
      - microservices-network

  # MongoDB Express for administration
  # NOTE(review): the image tag is "latest"; consider pinning a version.
  mongo-express:
    image: mongo-express:latest
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: admin
      ME_CONFIG_MONGODB_ADMINPASSWORD: password
      ME_CONFIG_MONGODB_SERVER: mongo1
      ME_CONFIG_BASICAUTH_USERNAME: admin
      ME_CONFIG_BASICAUTH_PASSWORD: password
    ports:
      - "8081:8081"
    depends_on:
      - mongo1
    networks:
      - microservices-network

volumes:
  mongo1_data:
  mongo2_data:
  mongo3_data:

networks:
  microservices-network:
    driver: bridge
// scripts/mongo-init.js
// Initialize replica set
// Declares the three-member set "rs0"; mongo1 is given priority 2 and the
// other two members priority 1.
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "mongo1:27017", priority: 2 },
    { _id: 1, host: "mongo2:27017", priority: 1 },
    { _id: 2, host: "mongo3:27017", priority: 1 }
  ]
});
// Create databases and users
// One readWrite user per service database. The passwords are hard-coded in
// this script.
// NOTE(review): createUser runs immediately after rs.initiate(); confirm the
// node has become primary by then, otherwise these writes may be rejected.
use('user_service_db');
db.createUser({
  user: 'user_service',
  pwd: 'user_service_password',
  roles: [{ role: 'readWrite', db: 'user_service_db' }]
});
use('product_service_db');
db.createUser({
  user: 'product_service',
  pwd: 'product_service_password',
  roles: [{ role: 'readWrite', db: 'product_service_db' }]
});
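MongoDB provides excellent support for microservices through its flexible schema, powerful aggregation framework, and cloud-native features. The key benefits covered in this section are flexible document modeling with embedded documents, aggregation pipelines for analytics, change streams for real-time events, and sharding and replica sets for horizontal scaling and availability.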
The combination of Django and MongoDB creates a robust foundation for microservices that can scale and adapt to changing requirements. In the next section, we'll explore creating RESTful APIs for microservices communication.
Setting Up the Development and Runtime Environment
Setting up a proper development and runtime environment is crucial for successful microservices development with Django. This section covers everything from local development setup to production-ready deployment configurations.
Creating RESTful APIs for Microservices
RESTful APIs are the backbone of microservices communication. This section covers designing, implementing, and optimizing REST APIs using Django REST Framework for microservices architecture.