RESTful APIs are the backbone of microservices communication. This section covers designing, implementing, and optimizing REST APIs using Django REST Framework for microservices architecture.
Design URLs around resources, not actions:
# Good - Resource-based URLs
GET /api/v1/users/ # Get all users
GET /api/v1/users/{id}/ # Get specific user
POST /api/v1/users/ # Create user
PUT /api/v1/users/{id}/ # Update user
DELETE /api/v1/users/{id}/ # Delete user
# Nested resources
GET /api/v1/users/{id}/orders/ # Get user's orders
POST /api/v1/users/{id}/orders/ # Create order for user
# Bad - Action-based URLs
POST /api/v1/createUser/
GET /api/v1/getUserById/{id}/
POST /api/v1/deleteUser/{id}/
# views.py
from rest_framework import status
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
class UserViewSet(ModelViewSet):
"""RESTful user management with proper HTTP methods"""
def create(self, request):
"""POST /api/v1/users/ - Create new user"""
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
return Response(
serializer.data,
status=status.HTTP_201_CREATED,
headers={'Location': f'/api/v1/users/{user.id}/'}
)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def retrieve(self, request, pk=None):
"""GET /api/v1/users/{id}/ - Get specific user"""
try:
user = self.get_object()
serializer = self.get_serializer(user)
return Response(serializer.data, status=status.HTTP_200_OK)
except User.DoesNotExist:
return Response(
{'error': 'User not found'},
status=status.HTTP_404_NOT_FOUND
)
def update(self, request, pk=None):
"""PUT /api/v1/users/{id}/ - Full update"""
user = self.get_object()
serializer = self.get_serializer(user, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def partial_update(self, request, pk=None):
"""PATCH /api/v1/users/{id}/ - Partial update"""
user = self.get_object()
serializer = self.get_serializer(user, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def destroy(self, request, pk=None):
"""DELETE /api/v1/users/{id}/ - Delete user"""
user = self.get_object()
user.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
# urls.py - URL-based versioning
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views
# Version 1
router_v1 = DefaultRouter()
router_v1.register(r'users', views.UserViewSetV1)
router_v1.register(r'products', views.ProductViewSetV1)
# Version 2
router_v2 = DefaultRouter()
router_v2.register(r'users', views.UserViewSetV2)
router_v2.register(r'products', views.ProductViewSetV2)
urlpatterns = [
path('api/v1/', include(router_v1.urls)),
path('api/v2/', include(router_v2.urls)),
]
# Alternative: Header-based versioning
# settings.py
REST_FRAMEWORK = {
'DEFAULT_VERSIONING_CLASS': 'rest_framework.versioning.AcceptHeaderVersioning',
'DEFAULT_VERSION': 'v1',
'ALLOWED_VERSIONS': ['v1', 'v2'],
'VERSION_PARAM': 'version',
}
# Usage in views
class UserViewSet(ModelViewSet):
def get_serializer_class(self):
if self.request.version == 'v2':
return UserSerializerV2
return UserSerializerV1
# filters.py
import django_filters
from django_filters import rest_framework as filters
from .models import User, Product
class UserFilter(filters.FilterSet):
"""Advanced filtering for users"""
username = filters.CharFilter(lookup_expr='icontains')
email = filters.CharFilter(lookup_expr='icontains')
is_active = filters.BooleanFilter()
created_after = filters.DateTimeFilter(field_name='created_at', lookup_expr='gte')
created_before = filters.DateTimeFilter(field_name='created_at', lookup_expr='lte')
age_min = filters.NumberFilter(method='filter_age_min')
age_max = filters.NumberFilter(method='filter_age_max')
class Meta:
model = User
fields = ['username', 'email', 'is_active', 'created_after', 'created_before']
def filter_age_min(self, queryset, name, value):
from datetime import date, timedelta
max_birth_date = date.today() - timedelta(days=value * 365)
return queryset.filter(date_of_birth__lte=max_birth_date)
def filter_age_max(self, queryset, name, value):
from datetime import date, timedelta
min_birth_date = date.today() - timedelta(days=value * 365)
return queryset.filter(date_of_birth__gte=min_birth_date)
class ProductFilter(filters.FilterSet):
"""Product filtering with price ranges"""
name = filters.CharFilter(lookup_expr='icontains')
category = filters.CharFilter(field_name='category__name', lookup_expr='icontains')
price_min = filters.NumberFilter(field_name='price', lookup_expr='gte')
price_max = filters.NumberFilter(field_name='price', lookup_expr='lte')
in_stock = filters.BooleanFilter(method='filter_in_stock')
class Meta:
model = Product
fields = ['name', 'category', 'price_min', 'price_max', 'in_stock']
def filter_in_stock(self, queryset, name, value):
if value:
return queryset.filter(inventory_count__gt=0)
return queryset.filter(inventory_count=0)
# views.py
from rest_framework import filters
from django_filters.rest_framework import DjangoFilterBackend
class UserViewSet(ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
filter_backends = [
DjangoFilterBackend,
filters.SearchFilter,
filters.OrderingFilter
]
filterset_class = UserFilter
search_fields = ['username', 'email', 'first_name', 'last_name']
ordering_fields = ['created_at', 'username', 'email']
ordering = ['-created_at']
def get_queryset(self):
queryset = super().get_queryset()
# Custom filtering logic
location = self.request.query_params.get('location')
if location:
queryset = queryset.filter(
addresses__city__icontains=location
).distinct()
return queryset
# Usage examples:
# GET /api/v1/users/?username__icontains=john&is_active=true
# GET /api/v1/users/?search=john&ordering=-created_at
# GET /api/v1/users/?age_min=18&age_max=65&location=new york
# pagination.py
from rest_framework.pagination import PageNumberPagination, LimitOffsetPagination
from rest_framework.response import Response
class CustomPageNumberPagination(PageNumberPagination):
page_size = 20
page_size_query_param = 'page_size'
max_page_size = 100
def get_paginated_response(self, data):
return Response({
'pagination': {
'count': self.page.paginator.count,
'next': self.get_next_link(),
'previous': self.get_previous_link(),
'page_size': self.page_size,
'total_pages': self.page.paginator.num_pages,
'current_page': self.page.number,
},
'results': data
})
class CursorPagination(PageNumberPagination):
"""Cursor-based pagination for better performance on large datasets"""
page_size = 20
ordering = '-created_at'
cursor_query_param = 'cursor'
page_size_query_param = 'page_size'
def get_paginated_response(self, data):
return Response({
'next': self.get_next_link(),
'previous': self.get_previous_link(),
'results': data
})
# settings.py
REST_FRAMEWORK = {
'DEFAULT_PAGINATION_CLASS': 'myapp.pagination.CustomPageNumberPagination',
'PAGE_SIZE': 20
}
# views.py
class UserViewSet(ModelViewSet):
pagination_class = CustomPageNumberPagination
def list(self, request):
"""Custom list with metadata"""
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
response = self.get_paginated_response(serializer.data)
# Add metadata
response.data['metadata'] = {
'total_active_users': User.objects.filter(is_active=True).count(),
'total_verified_users': User.objects.filter(is_verified=True).count(),
'generated_at': timezone.now().isoformat()
}
return response
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
# serializers.py
from rest_framework import serializers
from django.contrib.auth import get_user_model
from .models import User, Profile, Order, Product
User = get_user_model()
class DynamicFieldsModelSerializer(serializers.ModelSerializer):
"""Serializer that allows dynamic field selection"""
def __init__(self, *args, **kwargs):
# Extract fields parameter
fields = kwargs.pop('fields', None)
exclude = kwargs.pop('exclude', None)
super().__init__(*args, **kwargs)
if fields is not None:
# Drop any fields not specified in the `fields` argument
allowed = set(fields)
existing = set(self.fields)
for field_name in existing - allowed:
self.fields.pop(field_name)
if exclude is not None:
# Remove fields specified in exclude
for field_name in exclude:
self.fields.pop(field_name, None)
class UserSerializer(DynamicFieldsModelSerializer):
"""Optimized user serializer with dynamic fields"""
full_name = serializers.SerializerMethodField()
orders_count = serializers.SerializerMethodField()
profile = serializers.SerializerMethodField()
class Meta:
model = User
fields = [
'id', 'username', 'email', 'first_name', 'last_name',
'full_name', 'is_active', 'date_joined', 'orders_count', 'profile'
]
def get_full_name(self, obj):
return f"{obj.first_name} {obj.last_name}".strip()
def get_orders_count(self, obj):
# Use prefetch_related to avoid N+1 queries
if hasattr(obj, 'orders_count'):
return obj.orders_count
return obj.orders.count()
def get_profile(self, obj):
try:
profile = obj.profile
return {
'bio': profile.bio,
'avatar_url': profile.avatar_url,
'location': profile.location
}
except Profile.DoesNotExist:
return None
class UserListSerializer(serializers.ModelSerializer):
"""Lightweight serializer for list views"""
full_name = serializers.SerializerMethodField()
class Meta:
model = User
fields = ['id', 'username', 'email', 'full_name', 'is_active', 'date_joined']
def get_full_name(self, obj):
return f"{obj.first_name} {obj.last_name}".strip()
class UserDetailSerializer(UserSerializer):
"""Detailed serializer with related data"""
recent_orders = serializers.SerializerMethodField()
class Meta(UserSerializer.Meta):
fields = UserSerializer.Meta.fields + ['recent_orders']
def get_recent_orders(self, obj):
recent_orders = obj.orders.order_by('-created_at')[:5]
return OrderSerializer(recent_orders, many=True, context=self.context).data
# views.py
class UserViewSet(ModelViewSet):
queryset = User.objects.all()
def get_serializer_class(self):
if self.action == 'list':
return UserListSerializer
elif self.action == 'retrieve':
return UserDetailSerializer
return UserSerializer
def get_queryset(self):
queryset = super().get_queryset()
# Optimize queries based on action
if self.action == 'list':
# Only select needed fields for list view
queryset = queryset.only(
'id', 'username', 'email', 'first_name', 'last_name',
'is_active', 'date_joined'
)
# Add annotation for orders count
queryset = queryset.annotate(
orders_count=Count('orders')
)
elif self.action == 'retrieve':
# Prefetch related data for detail view
queryset = queryset.select_related('profile').prefetch_related(
Prefetch('orders', queryset=Order.objects.order_by('-created_at')[:5])
)
return queryset
def get_serializer(self, *args, **kwargs):
"""Support dynamic field selection"""
fields = self.request.query_params.get('fields')
if fields:
kwargs['fields'] = fields.split(',')
exclude = self.request.query_params.get('exclude')
if exclude:
kwargs['exclude'] = exclude.split(',')
return super().get_serializer(*args, **kwargs)
# Usage examples:
# GET /api/v1/users/?fields=id,username,email
# GET /api/v1/users/?exclude=orders_count,profile
# schema.py
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiExample
from drf_spectacular.types import OpenApiTypes
from rest_framework import status
class UserViewSet(ModelViewSet):
@extend_schema(
summary="List users",
description="Retrieve a paginated list of users with filtering and search capabilities",
parameters=[
OpenApiParameter(
name='search',
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
description='Search in username, email, first_name, last_name'
),
OpenApiParameter(
name='is_active',
type=OpenApiTypes.BOOL,
location=OpenApiParameter.QUERY,
description='Filter by active status'
),
OpenApiParameter(
name='created_after',
type=OpenApiTypes.DATETIME,
location=OpenApiParameter.QUERY,
description='Filter users created after this date'
),
OpenApiParameter(
name='fields',
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
description='Comma-separated list of fields to include'
),
],
responses={
200: UserListSerializer(many=True),
400: OpenApiExample(
'Bad Request',
value={'error': 'Invalid parameters'},
response_only=True
)
},
tags=['Users']
)
def list(self, request):
return super().list(request)
@extend_schema(
summary="Create user",
description="Create a new user account",
request=UserSerializer,
responses={
201: UserSerializer,
400: OpenApiExample(
'Validation Error',
value={
'username': ['This field is required.'],
'email': ['Enter a valid email address.']
},
response_only=True
)
},
examples=[
OpenApiExample(
'User Creation',
value={
'username': 'johndoe',
'email': 'john@example.com',
'first_name': 'John',
'last_name': 'Doe',
'password': 'securepassword123'
},
request_only=True
)
],
tags=['Users']
)
def create(self, request):
return super().create(request)
# settings.py
INSTALLED_APPS = [
# ...
'drf_spectacular',
]
REST_FRAMEWORK = {
'DEFAULT_SCHEMA_CLASS': 'drf_spectacular.openapi.AutoSchema',
}
SPECTACULAR_SETTINGS = {
'TITLE': 'Microservices API',
'DESCRIPTION': 'RESTful APIs for Django Microservices',
'VERSION': '1.0.0',
'SERVE_INCLUDE_SCHEMA': False,
'COMPONENT_SPLIT_REQUEST': True,
'SCHEMA_PATH_PREFIX': '/api/v[0-9]',
}
# urls.py
from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView
urlpatterns = [
path('api/schema/', SpectacularAPIView.as_view(), name='schema'),
path('api/docs/', SpectacularSwaggerView.as_view(url_name='schema'), name='swagger-ui'),
]
# service_client.py
import requests
import logging
from django.conf import settings
from django.core.cache import cache
from typing import Optional, Dict, Any
import json
logger = logging.getLogger(__name__)
class ServiceClient:
"""HTTP client for inter-service communication"""
def __init__(self, service_name: str, base_url: str = None):
self.service_name = service_name
self.base_url = base_url or self._discover_service(service_name)
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'X-Service-Token': settings.SERVICE_SECRET_TOKEN,
'User-Agent': f'{settings.SERVICE_NAME}/1.0'
})
def _discover_service(self, service_name: str) -> str:
"""Discover service URL from service registry"""
# Try cache first
cache_key = f"service_url:{service_name}"
url = cache.get(cache_key)
if not url:
# Discover from Consul or other service registry
from .service_discovery import ConsulServiceDiscovery
discovery = ConsulServiceDiscovery()
url = discovery.discover_service(service_name)
if url:
cache.set(cache_key, url, timeout=300) # Cache for 5 minutes
return url or f"http://{service_name}:8000"
def get(self, endpoint: str, params: Dict = None, timeout: int = 30) -> Optional[Dict]:
"""GET request to service"""
return self._request('GET', endpoint, params=params, timeout=timeout)
def post(self, endpoint: str, data: Dict = None, timeout: int = 30) -> Optional[Dict]:
"""POST request to service"""
return self._request('POST', endpoint, json=data, timeout=timeout)
def put(self, endpoint: str, data: Dict = None, timeout: int = 30) -> Optional[Dict]:
"""PUT request to service"""
return self._request('PUT', endpoint, json=data, timeout=timeout)
def delete(self, endpoint: str, timeout: int = 30) -> bool:
"""DELETE request to service"""
response = self._request('DELETE', endpoint, timeout=timeout)
return response is not None
def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict]:
"""Make HTTP request with error handling"""
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
if response.status_code == 204: # No Content
return {}
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Service call failed: {method} {url} - {e}")
return None
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON response from {url}: {e}")
return None
# Usage in views
class OrderViewSet(ModelViewSet):
def create(self, request):
"""Create order with user and product validation"""
user_id = request.data.get('user_id')
product_ids = request.data.get('product_ids', [])
# Validate user exists
user_client = ServiceClient('user-service')
user_data = user_client.get(f'api/v1/users/{user_id}/')
if not user_data:
return Response(
{'error': 'User not found'},
status=status.HTTP_400_BAD_REQUEST
)
# Validate products exist and get pricing
product_client = ServiceClient('product-service')
products_data = []
total_amount = 0
for product_id in product_ids:
product_data = product_client.get(f'api/v1/products/{product_id}/')
if not product_data:
return Response(
{'error': f'Product {product_id} not found'},
status=status.HTTP_400_BAD_REQUEST
)
products_data.append(product_data)
total_amount += float(product_data['price'])
# Create order
order_data = {
'user_id': user_id,
'user_email': user_data['email'],
'products': products_data,
'total_amount': total_amount,
'status': 'pending'
}
serializer = self.get_serializer(data=order_data)
if serializer.is_valid():
order = serializer.save()
# Notify other services
self._notify_order_created(order, user_data, products_data)
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def _notify_order_created(self, order, user_data, products_data):
"""Notify other services about order creation"""
# Update inventory
product_client = ServiceClient('product-service')
for product in products_data:
product_client.post(f'api/v1/products/{product["id"]}/reserve/', {
'quantity': 1,
'order_id': order.id
})
# Send notification
notification_client = ServiceClient('notification-service')
notification_client.post('api/v1/notifications/', {
'user_id': user_data['id'],
'type': 'order_created',
'message': f'Your order #{order.id} has been created',
'metadata': {'order_id': order.id}
})
# circuit_breaker.py
import time
import threading
from enum import Enum
from typing import Callable, Any
from functools import wraps
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""Circuit breaker for service calls"""
def __init__(self, failure_threshold: int = 5, timeout: int = 60, expected_exception: Exception = Exception):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self.lock = threading.Lock()
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
return self.call(func, *args, **kwargs)
return wrapper
def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
with self.lock:
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt reset"""
return (
self.last_failure_time and
time.time() - self.last_failure_time >= self.timeout
)
def _on_success(self):
"""Handle successful call"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failed call"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage
class ResilientServiceClient(ServiceClient):
"""Service client with circuit breaker"""
def __init__(self, service_name: str, base_url: str = None):
super().__init__(service_name, base_url)
self.circuit_breaker = CircuitBreaker(
failure_threshold=3,
timeout=30,
expected_exception=requests.RequestException
)
@property
def get(self):
return self.circuit_breaker(super().get)
@property
def post(self):
return self.circuit_breaker(super().post)
@property
def put(self):
return self.circuit_breaker(super().put)
@property
def delete(self):
return self.circuit_breaker(super().delete)
# throttling.py
from rest_framework.throttling import UserRateThrottle, AnonRateThrottle
from django.core.cache import cache
import time
class ServiceRateThrottle(UserRateThrottle):
"""Rate limiting for inter-service communication"""
scope = 'service'
def get_cache_key(self, request, view):
# Use service token for identification
service_token = request.META.get('HTTP_X_SERVICE_TOKEN')
if service_token:
return f"throttle_service_{service_token}"
return super().get_cache_key(request, view)
class BurstRateThrottle(UserRateThrottle):
"""Allow burst requests with sustained rate limiting"""
scope = 'burst'
def allow_request(self, request, view):
# Allow burst of requests, then apply sustained rate
cache_key = self.get_cache_key(request, view)
if not cache_key:
return True
now = time.time()
burst_key = f"{cache_key}_burst"
sustained_key = f"{cache_key}_sustained"
# Check burst limit (e.g., 100 requests in 1 minute)
burst_requests = cache.get(burst_key, [])
burst_requests = [req_time for req_time in burst_requests if now - req_time < 60]
if len(burst_requests) >= 100:
return False
# Check sustained limit (e.g., 1000 requests per hour)
sustained_requests = cache.get(sustained_key, [])
sustained_requests = [req_time for req_time in sustained_requests if now - req_time < 3600]
if len(sustained_requests) >= 1000:
return False
# Update counters
burst_requests.append(now)
sustained_requests.append(now)
cache.set(burst_key, burst_requests, 60)
cache.set(sustained_key, sustained_requests, 3600)
return True
# settings.py
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': [
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle',
'myapp.throttling.ServiceRateThrottle',
],
'DEFAULT_THROTTLE_RATES': {
'anon': '100/hour',
'user': '1000/hour',
'service': '10000/hour',
'burst': '100/min',
}
}
# views.py
from rest_framework.throttling import UserRateThrottle
class UserViewSet(ModelViewSet):
throttle_classes = [BurstRateThrottle, ServiceRateThrottle]
throttle_scope = 'user'
def get_throttles(self):
"""Apply different throttling based on request type"""
if self.request.META.get('HTTP_X_SERVICE_TOKEN'):
# Inter-service requests get higher limits
return [ServiceRateThrottle()]
return super().get_throttles()
# tests/test_api.py
from rest_framework.test import APITestCase
from rest_framework import status
from django.contrib.auth import get_user_model
from unittest.mock import patch, Mock
User = get_user_model()
class UserAPITestCase(APITestCase):
"""Test user API endpoints"""
def setUp(self):
self.user = User.objects.create_user(
username='testuser',
email='test@example.com',
password='testpass123'
)
self.client.force_authenticate(user=self.user)
def test_list_users(self):
"""Test user list endpoint"""
response = self.client.get('/api/v1/users/')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('results', response.data)
self.assertIn('pagination', response.data)
def test_create_user(self):
"""Test user creation"""
data = {
'username': 'newuser',
'email': 'new@example.com',
'password': 'newpass123',
'first_name': 'New',
'last_name': 'User'
}
response = self.client.post('/api/v1/users/', data)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(response.data['username'], 'newuser')
def test_user_filtering(self):
"""Test user filtering"""
response = self.client.get('/api/v1/users/?is_active=true&search=test')
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_dynamic_fields(self):
"""Test dynamic field selection"""
response = self.client.get('/api/v1/users/?fields=id,username,email')
self.assertEqual(response.status_code, status.HTTP_200_OK)
user_data = response.data['results'][0]
self.assertIn('id', user_data)
self.assertIn('username', user_data)
self.assertIn('email', user_data)
self.assertNotIn('first_name', user_data)
@patch('myapp.service_client.ServiceClient.get')
def test_inter_service_communication(self, mock_service_get):
"""Test inter-service API calls"""
# Mock service response
mock_service_get.return_value = {
'id': 1,
'name': 'Test Product',
'price': 99.99
}
# Test endpoint that calls another service
response = self.client.post('/api/v1/orders/', {
'user_id': self.user.id,
'product_ids': [1]
})
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
mock_service_get.assert_called_once()
def test_rate_limiting(self):
"""Test API rate limiting"""
# Make multiple requests to trigger rate limiting
for i in range(105): # Exceed the limit
response = self.client.get('/api/v1/users/')
if i < 100:
self.assertEqual(response.status_code, status.HTTP_200_OK)
else:
self.assertEqual(response.status_code, status.HTTP_429_TOO_MANY_REQUESTS)
RESTful APIs are essential for microservices communication. Key principles include:
Well-designed APIs enable loose coupling between services while maintaining clear contracts for communication. In the next section, we'll explore orchestrating microservices with Celery and RabbitMQ.
Cloud-native Data Processing with MongoDB
MongoDB is an excellent choice for microservices due to its flexible schema, horizontal scaling capabilities, and cloud-native features. This section explores how to integrate MongoDB with Django microservices for efficient data processing and storage.
Orchestrating Microservices with Celery and RabbitMQ
Microservices architecture often requires asynchronous task processing and inter-service communication. Celery, combined with RabbitMQ as a message broker, provides a robust solution for orchestrating tasks across your Django microservices ecosystem.