Advanced and Expert Topics

Integrating Microservices

Microservices architecture breaks down monolithic applications into smaller, independent services that communicate over well-defined APIs. This guide covers strategies for integrating Django applications with microservices, including service communication patterns, data consistency, and deployment considerations.

Integrating Microservices

Microservices architecture breaks down monolithic applications into smaller, independent services that communicate over well-defined APIs. This guide covers strategies for integrating Django applications with microservices, including service communication patterns, data consistency, and deployment considerations.

Microservices Architecture Patterns

Service Communication

# services/base.py
import requests
import logging
from typing import Dict, Any, Optional
from django.conf import settings
from django.core.cache import cache
import json
import time

logger = logging.getLogger(__name__)

class ServiceClient:
    """Base HTTP client for calling a single microservice.

    Wraps a ``requests.Session`` configured with optional bearer-token
    authentication and automatic retries, and translates transport failures
    into the ``ServiceError`` exception hierarchy.
    """

    def __init__(self, service_name: str, base_url: str, timeout: int = 30):
        """Create a client bound to one service.

        Args:
            service_name: Logical service name; used in log and error messages
                and to look up the ``<SERVICE_NAME>_API_KEY`` setting.
            base_url: Root URL of the service; a trailing slash is stripped.
            timeout: Default timeout passed to every request unless the caller
                supplies its own ``timeout`` keyword.
        """
        self.service_name = service_name
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()

        # Configure authentication
        self._configure_auth()

        # Configure retries
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def _configure_auth(self):
        """Attach a bearer token to the session when an API key is configured."""
        api_key = getattr(settings, f'{self.service_name.upper()}_API_KEY', None)
        if api_key:
            self.session.headers.update({'Authorization': f'Bearer {api_key}'})

    def get(self, endpoint: str, params: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        """Make GET request to service; ``params`` becomes the query string."""
        return self._request('GET', endpoint, params=params, **kwargs)

    def post(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        """Make POST request to service; ``data`` is sent as the JSON body."""
        return self._request('POST', endpoint, json=data, **kwargs)

    def put(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        """Make PUT request to service; ``data`` is sent as the JSON body."""
        return self._request('PUT', endpoint, json=data, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make DELETE request to service."""
        return self._request('DELETE', endpoint, **kwargs)

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make HTTP request with error handling.

        Args:
            method: HTTP verb.
            endpoint: Path relative to ``base_url``; a leading slash is ignored.
            **kwargs: Forwarded to ``Session.request``. ``timeout`` defaults to
                the client's timeout; ``headers`` is copied before the tracing
                header is added.

        Returns:
            The decoded JSON body when the response content type starts with
            ``application/json``; otherwise a dict with the raw text under
            ``'content'`` and the status under ``'status_code'``.

        Raises:
            ServiceTimeoutError: The request timed out.
            ServiceConnectionError: The service could not be reached.
            ServiceHTTPError: The service answered with an error status.
            ServiceError: Any other failure while performing the request.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Add default timeout
        kwargs.setdefault('timeout', self.timeout)

        # Add request ID for tracing. Work on a copy so the caller's dict is
        # never mutated (it may be shared between calls), and tolerate an
        # explicit ``headers=None``.
        headers = dict(kwargs.get('headers') or {})
        headers['X-Request-ID'] = self._generate_request_id()
        kwargs['headers'] = headers

        try:
            logger.info(f"Making {method} request to {url}")
            start_time = time.time()

            response = self.session.request(method, url, **kwargs)

            duration = time.time() - start_time
            logger.info(f"Request completed in {duration:.3f}s with status {response.status_code}")

            response.raise_for_status()

            # Handle different content types
            if response.headers.get('content-type', '').startswith('application/json'):
                return response.json()
            return {'content': response.text, 'status_code': response.status_code}

        # Timeout is checked before ConnectionError because a connect timeout
        # is an instance of both.
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout calling {self.service_name} service: {url}")
            raise ServiceTimeoutError(f"Timeout calling {self.service_name}") from e

        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection error calling {self.service_name} service: {url}")
            raise ServiceConnectionError(f"Cannot connect to {self.service_name}") from e

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP error calling {self.service_name}: {e}")
            raise ServiceHTTPError(f"HTTP error from {self.service_name}: {e}") from e

        except Exception as e:
            logger.error(f"Unexpected error calling {self.service_name}: {e}")
            raise ServiceError(f"Error calling {self.service_name}: {e}") from e

    def _generate_request_id(self) -> str:
        """Generate unique request ID (a random UUID4 string) for tracing."""
        import uuid
        return str(uuid.uuid4())

# Custom exceptions
class ServiceError(Exception):
    """Root of the exception hierarchy raised by service clients."""


class ServiceTimeoutError(ServiceError):
    """Raised when a call to a service times out."""


class ServiceConnectionError(ServiceError):
    """Raised when a service cannot be reached."""


class ServiceHTTPError(ServiceError):
    """Raised when a service responds with an HTTP error status."""

# Specific service clients
class UserServiceClient(ServiceClient):
    """Client for User microservice.

    Connection details are read from the ``USER_SERVICE_URL`` and
    ``USER_SERVICE_TIMEOUT`` settings. Single-user lookups are cached.
    """

    def __init__(self):
        super().__init__(
            service_name='user_service',
            base_url=settings.USER_SERVICE_URL,
            timeout=settings.USER_SERVICE_TIMEOUT
        )

    def get_user(self, user_id: int) -> Dict[str, Any]:
        """Get user by ID, serving from the cache when possible.

        A fetched user is cached for 5 minutes under
        ``user_service:user:<user_id>``.
        """
        cache_key = f"user_service:user:{user_id}"
        cached_user = cache.get(cache_key)

        # ``cache.get`` returns None on a miss; compare against None so a
        # cached-but-falsy payload (e.g. an empty dict) still counts as a hit
        # instead of triggering a remote call on every lookup.
        if cached_user is not None:
            return cached_user

        user_data = self.get(f'users/{user_id}')

        # Cache for 5 minutes
        cache.set(cache_key, user_data, 300)

        return user_data

    def create_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create new user and return the service's response."""
        result = self.post('users', user_data)

        # Invalidate related caches
        self._invalidate_user_caches()

        return result

    def update_user(self, user_id: int, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Update user and drop that user's cached entry."""
        result = self.put(f'users/{user_id}', user_data)

        # Invalidate user cache so the next get_user() re-fetches fresh data
        cache_key = f"user_service:user:{user_id}"
        cache.delete(cache_key)

        return result

    def search_users(self, query: str, limit: int = 20) -> Dict[str, Any]:
        """Search users; ``query`` and ``limit`` are sent as ``q`` and ``limit``."""
        return self.get('users/search', params={'q': query, 'limit': limit})

    def _invalidate_user_caches(self):
        """Invalidate user-related caches (currently a no-op placeholder)."""
        # This would need a more sophisticated cache invalidation strategy
        pass

class OrderServiceClient(ServiceClient):
    """Client for Order microservice.

    Connection details are read from the ``ORDER_SERVICE_URL`` and
    ``ORDER_SERVICE_TIMEOUT`` settings.
    """

    def __init__(self):
        super().__init__(
            service_name='order_service',
            base_url=settings.ORDER_SERVICE_URL,
            timeout=settings.ORDER_SERVICE_TIMEOUT
        )

    def create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create new order from ``order_data`` (sent as the JSON body)."""
        return self.post('orders', order_data)

    def get_order(self, order_id: str) -> Dict[str, Any]:
        """Get order by ID."""
        return self.get(f'orders/{order_id}')

    def get_user_orders(self, user_id: int, limit: int = 10) -> Dict[str, Any]:
        """Get orders for user, asking for at most ``limit`` entries."""
        return self.get(f'users/{user_id}/orders', params={'limit': limit})

    def update_order_status(self, order_id: str, status: str) -> Dict[str, Any]:
        """Update order status."""
        return self.put(f'orders/{order_id}/status', {'status': status})

    def cancel_order(self, order_id: str) -> Dict[str, Any]:
        """Cancel an order.

        Provided because the order-processing saga's compensation step calls
        ``cancel_order`` on this client; without it that rollback fails with
        an AttributeError.
        """
        # NOTE(review): assumes the order service models cancellation as the
        # status value 'cancelled' - confirm against the service's API.
        return self.update_order_status(order_id, 'cancelled')

class PaymentServiceClient(ServiceClient):
    """Client for Payment microservice.

    Connection details are read from the ``PAYMENT_SERVICE_URL`` and
    ``PAYMENT_SERVICE_TIMEOUT`` settings.
    """

    def __init__(self):
        super().__init__(
            service_name='payment_service',
            base_url=settings.PAYMENT_SERVICE_URL,
            timeout=settings.PAYMENT_SERVICE_TIMEOUT
        )

    def process_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process payment described by ``payment_data`` (sent as the JSON body)."""
        return self.post('payments', payment_data)

    def get_payment(self, payment_id: str) -> Dict[str, Any]:
        """Get payment by ID."""
        return self.get(f'payments/{payment_id}')

    def refund_payment(self, payment_id: str, amount: Optional[float] = None) -> Dict[str, Any]:
        """Refund payment.

        Args:
            payment_id: Identifier of the payment to refund.
            amount: Amount to refund. When omitted, the request body carries
                no amount and the service decides what to refund.
        """
        data = {}
        # Compare against None, not truthiness: an explicit amount of 0 must
        # be forwarded as-is rather than silently turned into the "no amount
        # given" request.
        if amount is not None:
            data['amount'] = amount
        return self.post(f'payments/{payment_id}/refund', data)

Service Registry and Discovery

# services/registry.py
import consul
import logging
from typing import Dict, List, Optional
from django.conf import settings
from django.core.cache import cache

logger = logging.getLogger(__name__)

class ServiceRegistry:
    """Service registry using Consul.

    The Consul agent location comes from the ``CONSUL_HOST`` and
    ``CONSUL_PORT`` settings (defaults: ``localhost`` and ``8500``).
    """

    def __init__(self):
        self.consul = consul.Consul(
            host=getattr(settings, 'CONSUL_HOST', 'localhost'),
            port=getattr(settings, 'CONSUL_PORT', 8500)
        )

    def register_service(self, name: str, address: str, port: int,
                        health_check_url: Optional[str] = None,
                        tags: Optional[List[str]] = None):
        """Register service with registry.

        The service ID is ``<name>-<address>-<port>``. When
        ``health_check_url`` is given, an HTTP check (10s interval, 5s
        timeout) is attached. Registration is best-effort: failures are
        logged, not raised.
        """
        service_id = f"{name}-{address}-{port}"

        service_config = {
            'name': name,
            'service_id': service_id,
            'address': address,
            'port': port,
            'tags': tags or [],
        }

        if health_check_url:
            service_config['check'] = {
                'http': health_check_url,
                'interval': '10s',
                'timeout': '5s',
            }

        try:
            self.consul.agent.service.register(**service_config)
            logger.info(f"Registered service {name} at {address}:{port}")
        except Exception as e:
            logger.error(f"Failed to register service {name}: {e}")

    def deregister_service(self, service_id: str):
        """Deregister service from registry; failures are logged, not raised."""
        try:
            self.consul.agent.service.deregister(service_id)
            logger.info(f"Deregistered service {service_id}")
        except Exception as e:
            logger.error(f"Failed to deregister service {service_id}: {e}")

    def discover_service(self, service_name: str) -> Optional[Dict[str, Any]]:
        """Discover a healthy instance of ``service_name``.

        Returns:
            A dict with ``address``, ``port`` and ``tags`` of the first
            passing instance, or None when there is none or the lookup fails.
            A successful lookup is cached for 30 seconds under
            ``service_discovery:<service_name>``.
        """
        cache_key = f"service_discovery:{service_name}"
        cached_result = cache.get(cache_key)

        if cached_result:
            return cached_result

        try:
            _, services = self.consul.health.service(service_name, passing=True)

            if not services:
                logger.warning(f"No healthy instances found for service {service_name}")
                return None

            # Return first healthy instance
            entry = services[0]
            service = entry['Service']
            result = {
                # Consul leaves Service.Address empty when the service was
                # registered without an explicit address; the node's address
                # is the one to use in that case.
                'address': service['Address'] or entry['Node']['Address'],
                'port': service['Port'],
                'tags': service['Tags'],
            }

            # Cache for 30 seconds
            cache.set(cache_key, result, 30)

            return result

        except Exception as e:
            logger.error(f"Failed to discover service {service_name}: {e}")
            return None

    def get_service_url(self, service_name: str) -> Optional[str]:
        """Get the ``http://address:port`` URL of a discovered instance, or None."""
        service_info = self.discover_service(service_name)
        if service_info:
            return f"http://{service_info['address']}:{service_info['port']}"
        return None

# Service discovery integration
class DiscoveryServiceClient(ServiceClient):
    """Service client with automatic discovery.

    The base URL is resolved through ``ServiceRegistry`` at construction time
    and re-resolved once when a request fails with a connection error.
    """

    def __init__(self, service_name: str, timeout: int = 30):
        """Resolve ``service_name`` and build the client.

        Raises:
            ServiceConnectionError: No instance of the service was found.
        """
        self.registry = ServiceRegistry()

        # Discover service URL
        base_url = self.registry.get_service_url(service_name)
        if not base_url:
            raise ServiceConnectionError(f"Cannot discover service {service_name}")

        super().__init__(service_name, base_url, timeout)

    def _request(self, method: str, endpoint: str, **kwargs):
        """Override request to handle service discovery failures.

        On a connection error the service is looked up again; if it now
        resolves to a different URL the request is retried once against it,
        otherwise the original error propagates.
        """
        try:
            return super()._request(method, endpoint, **kwargs)
        except ServiceConnectionError:
            # Try to rediscover service
            logger.info(f"Rediscovering service {self.service_name}")

            # The registry caches discovery results for a short time; drop
            # the entry first, otherwise the lookup below just hands back the
            # instance that failed and the retry never happens.
            cache.delete(f"service_discovery:{self.service_name}")
            new_url = self.registry.get_service_url(self.service_name)

            if new_url and new_url != self.base_url:
                self.base_url = new_url
                return super()._request(method, endpoint, **kwargs)

            raise

# Django integration
class ServiceRegistryMiddleware:
    """Django middleware that announces this application to the registry.

    Registration is deferred until the first request passes through.
    """

    def __init__(self, get_response):
        self.get_response = get_response
        self.registry = ServiceRegistry()
        self.registered = False

    def __call__(self, request):
        # Fast path: registration has already been attempted.
        if self.registered:
            return self.get_response(request)

        # First request: register, remember that we did, then serve it.
        self._register_service()
        self.registered = True
        return self.get_response(request)

    def _register_service(self):
        """Register this Django service using the SERVICE_* settings."""
        name = getattr(settings, 'SERVICE_NAME', 'django-app')
        host = getattr(settings, 'SERVICE_HOST', 'localhost')
        port = getattr(settings, 'SERVICE_PORT', 8000)

        registration = {
            'name': name,
            'address': host,
            'port': port,
            # The registry polls this URL to decide whether we are healthy.
            'health_check_url': f"http://{host}:{port}/health/",
            'tags': ['django', 'web'],
        }
        self.registry.register_service(**registration)

Event-Driven Communication

# services/events.py
import json
import logging
from typing import Dict, Any, Callable
from django.conf import settings
import pika
import redis

logger = logging.getLogger(__name__)

class EventBus:
    """Interface that concrete event bus backends implement."""

    def publish(self, event_type: str, data: Dict[str, Any], routing_key: str = None):
        """Send an event; backends must override this."""
        raise NotImplementedError

    def subscribe(self, event_type: str, handler: Callable, routing_key: str = None):
        """Register ``handler`` for an event; backends must override this."""
        raise NotImplementedError

class RabbitMQEventBus(EventBus):
    """RabbitMQ-based event bus.

    Events are published to a durable topic exchange. Connection parameters
    and the exchange name come from the ``RABBITMQ_*`` settings.
    """

    def __init__(self):
        self.connection_params = pika.ConnectionParameters(
            host=getattr(settings, 'RABBITMQ_HOST', 'localhost'),
            port=getattr(settings, 'RABBITMQ_PORT', 5672),
            virtual_host=getattr(settings, 'RABBITMQ_VHOST', '/'),
            credentials=pika.PlainCredentials(
                getattr(settings, 'RABBITMQ_USER', 'guest'),
                getattr(settings, 'RABBITMQ_PASSWORD', 'guest')
            )
        )
        self.exchange_name = getattr(settings, 'RABBITMQ_EXCHANGE', 'events')

    def publish(self, event_type: str, data: Dict[str, Any], routing_key: str = None):
        """Publish event to RabbitMQ.

        Opens a connection, publishes one persistent JSON message and closes
        the connection again, also when publishing fails.

        Args:
            event_type: Event name; also the routing key when none is given.
            data: JSON-serialisable payload stored under ``'data'``.
            routing_key: Optional explicit routing key.

        Raises:
            Exception: Whatever the broker or serialisation raised; it is
                logged and re-raised unchanged.
        """
        effective_key = routing_key or event_type
        try:
            connection = pika.BlockingConnection(self.connection_params)
            try:
                channel = connection.channel()

                # Declare exchange
                channel.exchange_declare(
                    exchange=self.exchange_name,
                    exchange_type='topic',
                    durable=True
                )

                # Prepare message
                message = {
                    'event_type': event_type,
                    'data': data,
                    'timestamp': time.time(),
                    'service': getattr(settings, 'SERVICE_NAME', 'django-app')
                }

                # Publish message
                channel.basic_publish(
                    exchange=self.exchange_name,
                    routing_key=effective_key,
                    body=json.dumps(message),
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # Make message persistent
                        content_type='application/json'
                    )
                )
            finally:
                # Release the connection even when declare/publish raised;
                # skip if the broker already closed it.
                if connection.is_open:
                    connection.close()

            logger.info(f"Published event {event_type} with routing key {effective_key}")

        except Exception as e:
            logger.error(f"Failed to publish event {event_type}: {e}")
            raise

    def subscribe(self, event_type: str, handler: Callable, routing_key: str = None):
        """Subscribe to events and consume them until interrupted.

        Declares a durable queue named ``<SERVICE_NAME>_<event_type>``, binds
        it to the exchange and then blocks in the consume loop. Each message
        is decoded from JSON and passed to ``handler``; it is acknowledged on
        success and rejected without requeue when ``handler`` raises.

        Raises:
            Exception: Any setup or consume-loop failure; logged and re-raised.
        """
        try:
            connection = pika.BlockingConnection(self.connection_params)
            try:
                channel = connection.channel()

                # Declare exchange
                channel.exchange_declare(
                    exchange=self.exchange_name,
                    exchange_type='topic',
                    durable=True
                )

                # Declare queue
                queue_name = f"{getattr(settings, 'SERVICE_NAME', 'django-app')}_{event_type}"
                channel.queue_declare(queue=queue_name, durable=True)

                # Bind queue to exchange
                channel.queue_bind(
                    exchange=self.exchange_name,
                    queue=queue_name,
                    routing_key=routing_key or event_type
                )

                # Set up consumer
                def callback(ch, method, properties, body):
                    try:
                        message = json.loads(body)
                        handler(message)
                        ch.basic_ack(delivery_tag=method.delivery_tag)
                    except Exception as e:
                        logger.error(f"Error processing event {event_type}: {e}")
                        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

                channel.basic_consume(queue=queue_name, on_message_callback=callback)

                logger.info(f"Subscribed to event {event_type}")
                channel.start_consuming()
            finally:
                # The consume loop only ends on error or interruption; make
                # sure the connection does not outlive it.
                if connection.is_open:
                    connection.close()

        except Exception as e:
            logger.error(f"Failed to subscribe to event {event_type}: {e}")
            raise

class RedisEventBus(EventBus):
    """Redis-based event bus using pub/sub.

    The Redis location comes from the ``REDIS_HOST``, ``REDIS_PORT`` and
    ``REDIS_EVENTS_DB`` settings.
    """

    def __init__(self):
        self.redis_client = redis.Redis(
            host=getattr(settings, 'REDIS_HOST', 'localhost'),
            port=getattr(settings, 'REDIS_PORT', 6379),
            db=getattr(settings, 'REDIS_EVENTS_DB', 0),
            decode_responses=True
        )

    def publish(self, event_type: str, data: Dict[str, Any], routing_key: str = None):
        """Publish event to Redis.

        The JSON message is published on the channel named by ``routing_key``,
        or by ``event_type`` when no routing key is given.

        Raises:
            Exception: Any serialisation or Redis error; logged and re-raised.
        """
        try:
            message = {
                'event_type': event_type,
                'data': data,
                'timestamp': time.time(),
                'service': getattr(settings, 'SERVICE_NAME', 'django-app')
            }

            channel = routing_key or event_type
            self.redis_client.publish(channel, json.dumps(message))

            logger.info(f"Published event {event_type} to channel {channel}")

        except Exception as e:
            logger.error(f"Failed to publish event {event_type}: {e}")
            raise

    def subscribe(self, event_type: str, handler: Callable, routing_key: str = None):
        """Subscribe to events and dispatch them to ``handler`` until interrupted.

        Blocks in the listen loop. Errors raised while decoding or handling a
        single message are logged and do not stop the loop.

        Raises:
            Exception: Any subscription or connection error; logged and
                re-raised.
        """
        try:
            pubsub = self.redis_client.pubsub()
            try:
                channel = routing_key or event_type
                pubsub.subscribe(channel)

                logger.info(f"Subscribed to event {event_type} on channel {channel}")

                for message in pubsub.listen():
                    # listen() also yields subscribe confirmations; only
                    # 'message' entries carry a published payload.
                    if message['type'] == 'message':
                        try:
                            event_data = json.loads(message['data'])
                            handler(event_data)
                        except Exception as e:
                            logger.error(f"Error processing event {event_type}: {e}")
            finally:
                # Release the pub/sub connection when the loop ends for any
                # reason instead of leaving it to garbage collection.
                pubsub.close()

        except Exception as e:
            logger.error(f"Failed to subscribe to event {event_type}: {e}")
            raise

# Event handlers
class EventHandler:
    """Reacts to events received from an event bus."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus

    def handle_user_created(self, event_data: Dict[str, Any]):
        """React to a ``user.created`` event: local profile, then welcome mail."""
        payload = event_data['data']
        logger.info(f"User created: {payload['user_id']}")

        # Local follow-up work, in this order.
        for action in (self._create_user_profile, self._send_welcome_email):
            action(payload)

    def handle_order_created(self, event_data: Dict[str, Any]):
        """React to an ``order.created`` event: statistics, then inventory."""
        payload = event_data['data']
        logger.info(f"Order created: {payload['order_id']}")

        # Statistics are updated before the ordered items are looked at.
        self._update_order_statistics(payload)
        ordered_items = payload['items']
        self._check_inventory_levels(ordered_items)

    def _create_user_profile(self, user_data: Dict[str, Any]):
        """Ensure a local ``UserProfile`` row exists for the user."""
        from myapp.models import UserProfile

        profile_user_id = user_data['user_id']
        # Only applied when the row has to be created.
        initial_values = {
            'email': user_data['email'],
            'created_from_service': True
        }
        UserProfile.objects.get_or_create(user_id=profile_user_id, defaults=initial_values)

    def _send_welcome_email(self, user_data: Dict[str, Any]):
        """Placeholder for sending the welcome email."""
        # Implementation would send email
        return None

    def _update_order_statistics(self, order_data: Dict[str, Any]):
        """Placeholder for updating order statistics."""
        # Implementation would update statistics
        return None

    def _check_inventory_levels(self, items: List[Dict[str, Any]]):
        """Placeholder for checking inventory levels."""
        # Implementation would check inventory
        return None

# Django integration
def setup_event_handlers():
    """Subscribe the event handlers and consume events until interrupted.

    ``subscribe()`` blocks inside its consume loop, so calling it twice in
    sequence would never reach the second subscription. Each subscription
    therefore runs in its own thread, and this function blocks until those
    threads end.
    """
    import threading

    event_bus = RabbitMQEventBus()  # or RedisEventBus()
    handler = EventHandler(event_bus)

    subscriptions = {
        'user.created': handler.handle_user_created,
        'order.created': handler.handle_order_created,
    }

    # Subscribe to events. Daemon threads let the process exit when the main
    # thread is interrupted (e.g. Ctrl-C) while consumers are still blocked.
    consumers = []
    for event_type, callback in subscriptions.items():
        consumer = threading.Thread(
            target=event_bus.subscribe,
            args=(event_type, callback),
            name=f"event-consumer-{event_type}",
            daemon=True,
        )
        consumer.start()
        consumers.append(consumer)

    # Keep the caller blocked, as the original single subscribe() call did.
    for consumer in consumers:
        consumer.join()

# Management command to start event consumer
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    """Management command that starts the event consumer."""
    
    help = 'Start consuming events from message bus'
    
    def handle(self, *args, **options):
        """Write a start-up notice, then hand control to ``setup_event_handlers``."""
        self.stdout.write("Starting event consumer...")
        # Sets up the subscriptions; see setup_event_handlers for details.
        setup_event_handlers()

Data Consistency Patterns

Saga Pattern Implementation

# services/saga.py
import logging
from typing import Dict, Any, List, Callable
from enum import Enum
from django.db import models, transaction
import json

logger = logging.getLogger(__name__)

class SagaStatus(models.TextChoices):
    """Lifecycle states stored in ``SagaExecution.status``."""

    # Model default for a row created without an explicit status.
    PENDING = 'pending', 'Pending'
    # Set when Saga.execute creates the execution record.
    RUNNING = 'running', 'Running'
    # Every step finished without raising.
    COMPLETED = 'completed', 'Completed'
    # A step raised; recorded just before compensation starts.
    FAILED = 'failed', 'Failed'
    # Compensation of the completed steps is in progress.
    COMPENSATING = 'compensating', 'Compensating'
    # Compensation has finished.
    COMPENSATED = 'compensated', 'Compensated'

class SagaExecution(models.Model):
    """Model to track saga execution (one row per saga run, keyed by ``saga_id``)."""
    
    # Caller-supplied identifier; unique, so it can be used to look a run up again.
    saga_id = models.CharField(max_length=100, unique=True)
    # Name of the saga definition that owns this run.
    saga_type = models.CharField(max_length=100)
    status = models.CharField(max_length=20, choices=SagaStatus.choices, default=SagaStatus.PENDING)
    # Index of the next step to run; the orchestrator advances it after each successful step.
    current_step = models.IntegerField(default=0)
    # Saga context as saved after the most recent successful step.
    context_data = models.JSONField(default=dict)
    # Text of the exception raised by the failing step, if any.
    error_message = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    class Meta:
        indexes = [
            models.Index(fields=['saga_type', 'status']),
            models.Index(fields=['created_at']),
        ]

class SagaStep:
    """One unit of saga work: an action plus an optional undo callable."""

    def __init__(self, name: str, action: Callable, compensation: Callable = None):
        self.name = name
        self.action = action
        self.compensation = compensation

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Run the action and return whatever context it produces."""
        run = self.action
        return run(context)

    def compensate(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Undo the step; without a compensation the context passes through."""
        undo = self.compensation
        if not undo:
            return context
        return undo(context)

class Saga:
    """Saga orchestrator.

    Runs an ordered list of ``SagaStep`` objects, saving progress to a
    ``SagaExecution`` row after every successful step, and compensates the
    completed steps in reverse order when a step raises.
    """

    def __init__(self, saga_type: str, steps: List[SagaStep]):
        self.saga_type = saga_type
        self.steps = steps

    def execute(self, saga_id: str, initial_context: Dict[str, Any]) -> bool:
        """Execute saga, resuming from the recorded step if it was started before.

        Args:
            saga_id: Unique identifier of this run.
            initial_context: Context used when the run is created; ignored
                when a record for ``saga_id`` already exists.

        Returns:
            True when every step has completed (now or in an earlier run).
            False when a step failed and compensation ran, when an earlier
            run was already compensated, or when an unexpected error occurred.
        """
        try:
            # Create or get saga execution record
            saga_execution, created = SagaExecution.objects.get_or_create(
                saga_id=saga_id,
                defaults={
                    'saga_type': self.saga_type,
                    'context_data': initial_context,
                    'status': SagaStatus.RUNNING
                }
            )

            if not created:
                if saga_execution.status == SagaStatus.COMPLETED:
                    logger.info(f"Saga {saga_id} already completed")
                    return True
                if saga_execution.status == SagaStatus.COMPENSATED:
                    # A compensated saga failed and was rolled back; report
                    # that outcome instead of claiming success, and do not
                    # run its steps again.
                    logger.info(f"Saga {saga_id} was already compensated")
                    return False

            context = saga_execution.context_data.copy()

            # Execute steps from current position
            for i in range(saga_execution.current_step, len(self.steps)):
                step = self.steps[i]

                try:
                    logger.info(f"Executing step {i}: {step.name} for saga {saga_id}")
                    context = step.execute(context)

                    # Update saga execution
                    saga_execution.current_step = i + 1
                    saga_execution.context_data = context
                    saga_execution.save()

                except Exception as e:
                    logger.error(f"Step {i} failed for saga {saga_id}: {e}")

                    # Start compensation
                    saga_execution.status = SagaStatus.FAILED
                    saga_execution.error_message = str(e)
                    saga_execution.save()

                    # Only steps before the failing one completed, so
                    # compensation starts at i - 1.
                    self._compensate(saga_execution, i - 1)
                    return False

            # All steps completed successfully
            saga_execution.status = SagaStatus.COMPLETED
            saga_execution.save()

            logger.info(f"Saga {saga_id} completed successfully")
            return True

        except Exception as e:
            logger.error(f"Saga {saga_id} execution failed: {e}")
            return False

    def _compensate(self, saga_execution: SagaExecution, last_completed_step: int):
        """Compensate completed steps in reverse order.

        Starts from the last saved context. A failing compensation is logged
        and the remaining ones still run; the record ends as COMPENSATED
        either way.
        """
        saga_execution.status = SagaStatus.COMPENSATING
        saga_execution.save()

        context = saga_execution.context_data.copy()

        # Compensate steps in reverse order
        for i in range(last_completed_step, -1, -1):
            step = self.steps[i]

            try:
                logger.info(f"Compensating step {i}: {step.name} for saga {saga_execution.saga_id}")
                context = step.compensate(context)

            except Exception as e:
                logger.error(f"Compensation failed for step {i} in saga {saga_execution.saga_id}: {e}")
                # Continue with other compensations

        saga_execution.status = SagaStatus.COMPENSATED
        saga_execution.context_data = context
        saga_execution.save()

# Example: Order processing saga
class OrderProcessingSaga:
    """Saga for processing orders across multiple services.

    Steps, in order: validate the user, reserve inventory, take payment,
    create the order, send the confirmation. Every step takes the saga
    context dict and returns it, possibly with added keys.
    """

    def __init__(self):
        self.user_service = UserServiceClient()
        # NOTE(review): InventoryServiceClient is not defined in this section;
        # presumably provided elsewhere - confirm it is importable here.
        self.inventory_service = InventoryServiceClient()
        self.payment_service = PaymentServiceClient()
        self.order_service = OrderServiceClient()

        self.saga = Saga('order_processing', [
            SagaStep('validate_user', self.validate_user, self.compensate_user_validation),
            SagaStep('reserve_inventory', self.reserve_inventory, self.release_inventory),
            SagaStep('process_payment', self.process_payment, self.refund_payment),
            SagaStep('create_order', self.create_order, self.cancel_order),
            SagaStep('send_confirmation', self.send_confirmation, None),
        ])

    def process_order(self, order_data: Dict[str, Any]) -> bool:
        """Process order using saga pattern.

        ``order_data`` becomes the initial context; the saga ID is
        ``order_<order_id>``. Returns the saga's success flag.
        """
        saga_id = f"order_{order_data['order_id']}"
        return self.saga.execute(saga_id, order_data)

    def validate_user(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Validate user exists and is active; stores the user under ``user_data``.

        Raises:
            Exception: The user's ``is_active`` flag is missing or falsy.
        """
        user_id = context['user_id']
        user_data = self.user_service.get_user(user_id)

        if not user_data.get('is_active'):
            raise Exception(f"User {user_id} is not active")

        context['user_data'] = user_data
        return context

    def compensate_user_validation(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """No compensation needed for user validation"""
        return context

    def reserve_inventory(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Reserve inventory for order items; stores IDs under ``reservation_ids``.

        If reserving any item fails, the reservations already made in this
        call are released before the error propagates.
        """
        items = context['items']
        reservation_ids = []

        try:
            for item in items:
                reservation = self.inventory_service.reserve_item(
                    item['product_id'],
                    item['quantity']
                )
                reservation_ids.append(reservation['reservation_id'])
        except Exception:
            # The orchestrator only compensates steps that completed, and the
            # IDs collected so far never reach the saved context, so a partial
            # failure here would otherwise leak the earlier reservations.
            self.release_inventory({'reservation_ids': reservation_ids})
            raise

        context['reservation_ids'] = reservation_ids
        return context

    def release_inventory(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Release reserved inventory; individual failures are logged and skipped."""
        reservation_ids = context.get('reservation_ids', [])

        for reservation_id in reservation_ids:
            try:
                self.inventory_service.release_reservation(reservation_id)
            except Exception as e:
                logger.error(f"Failed to release reservation {reservation_id}: {e}")

        return context

    def process_payment(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Process payment; stores the resulting ID under ``payment_id``."""
        payment_data = {
            'user_id': context['user_id'],
            'amount': context['total_amount'],
            'payment_method': context['payment_method']
        }

        payment_result = self.payment_service.process_payment(payment_data)
        context['payment_id'] = payment_result['payment_id']

        return context

    def refund_payment(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Refund payment if one was taken; a failed refund is logged, not raised."""
        payment_id = context.get('payment_id')

        if payment_id:
            try:
                self.payment_service.refund_payment(payment_id)
            except Exception as e:
                logger.error(f"Failed to refund payment {payment_id}: {e}")

        return context

    def create_order(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Create order record and flag the context with ``order_created``."""
        order_data = {
            'order_id': context['order_id'],
            'user_id': context['user_id'],
            'items': context['items'],
            'total_amount': context['total_amount'],
            'payment_id': context['payment_id']
        }

        # The service's response is not needed; only success matters here.
        self.order_service.create_order(order_data)
        context['order_created'] = True

        return context

    def cancel_order(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Cancel created order; a failed cancellation is logged, not raised."""
        if context.get('order_created'):
            try:
                # NOTE(review): relies on the order client exposing
                # cancel_order(order_id) - confirm the client provides it.
                self.order_service.cancel_order(context['order_id'])
            except Exception as e:
                logger.error(f"Failed to cancel order {context['order_id']}: {e}")

        return context

    def send_confirmation(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Send order confirmation (currently only logs the success)."""
        # Send confirmation email/notification
        logger.info(f"Order {context['order_id']} processed successfully")
        return context

# Usage
def process_order_request(order_data):
    """Run the order-processing saga for ``order_data`` and report the outcome.

    Logs the result and returns the saga's success flag.
    """
    succeeded = OrderProcessingSaga().process_order(order_data)

    if not succeeded:
        logger.error(f"Order {order_data['order_id']} processing failed")
    else:
        logger.info(f"Order {order_data['order_id']} processed successfully")

    return succeeded

Microservices integration requires careful consideration of communication patterns, data consistency, and failure handling. The key is choosing the right patterns for your specific use case while maintaining system reliability and performance. Start with simple synchronous communication and evolve to more sophisticated patterns like event sourcing and saga orchestration as your system grows in complexity.