Microservices with Django

Securing Microservices

Security in microservices architecture requires a comprehensive approach covering authentication, authorization, network security, data protection, and monitoring. This section explores security best practices and implementation strategies for Django microservices.

Securing Microservices

Security in microservices architecture requires a comprehensive approach covering authentication, authorization, network security, data protection, and monitoring. This section explores security best practices and implementation strategies for Django microservices.

Security Challenges in Microservices

Key Security Concerns

  1. Distributed Authentication: Managing user identity across services
  2. Service-to-Service Communication: Securing inter-service calls
  3. Network Security: Protecting communication channels
  4. Data Protection: Encrypting sensitive data
  5. API Security: Securing external interfaces
  6. Monitoring: Detecting security threats

Authentication and Authorization

1. JWT-Based Authentication

# authentication.py
import jwt
from datetime import datetime, timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

User = get_user_model()

class JWTAuthentication(BaseAuthentication):
    """JWT token authentication"""
    
    def authenticate(self, request):
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if not auth_header or not auth_header.startswith('Bearer '):
            return None
        
        token = auth_header.split(' ')[1]
        
        try:
            payload = jwt.decode(
                token, 
                settings.JWT_SECRET_KEY, 
                algorithms=[settings.JWT_ALGORITHM]
            )
            
            user_id = payload.get('user_id')
            if not user_id:
                raise AuthenticationFailed('Invalid token payload')
            
            user = User.objects.get(id=user_id)
            if not user.is_active:
                raise AuthenticationFailed('User account is disabled')
            
            return (user, token)
            
        except jwt.ExpiredSignatureError:
            raise AuthenticationFailed('Token has expired')
        except jwt.InvalidTokenError:
            raise AuthenticationFailed('Invalid token')
        except User.DoesNotExist:
            raise AuthenticationFailed('User not found')

class JWTTokenGenerator:
    """Generate and validate JWT tokens"""
    
    @staticmethod
    def generate_access_token(user):
        """Generate access token"""
        payload = {
            'user_id': user.id,
            'username': user.username,
            'email': user.email,
            'exp': datetime.utcnow() + timedelta(minutes=settings.JWT_ACCESS_TOKEN_LIFETIME),
            'iat': datetime.utcnow(),
            'type': 'access'
        }
        
        return jwt.encode(
            payload, 
            settings.JWT_SECRET_KEY, 
            algorithm=settings.JWT_ALGORITHM
        )
    
    @staticmethod
    def generate_refresh_token(user):
        """Generate refresh token"""
        payload = {
            'user_id': user.id,
            'exp': datetime.utcnow() + timedelta(days=settings.JWT_REFRESH_TOKEN_LIFETIME),
            'iat': datetime.utcnow(),
            'type': 'refresh'
        }
        
        return jwt.encode(
            payload, 
            settings.JWT_SECRET_KEY, 
            algorithm=settings.JWT_ALGORITHM
        )
    
    @staticmethod
    def verify_token(token, token_type='access'):
        """Verify token and return payload"""
        try:
            payload = jwt.decode(
                token, 
                settings.JWT_SECRET_KEY, 
                algorithms=[settings.JWT_ALGORITHM]
            )
            
            if payload.get('type') != token_type:
                raise jwt.InvalidTokenError('Invalid token type')
            
            return payload
            
        except jwt.ExpiredSignatureError:
            raise AuthenticationFailed('Token has expired')
        except jwt.InvalidTokenError:
            raise AuthenticationFailed('Invalid token')

# views.py
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status
from django.contrib.auth import authenticate

@api_view(['POST'])
@permission_classes([AllowAny])
def login(request):
    """User login endpoint"""
    username = request.data.get('username')
    password = request.data.get('password')
    
    if not username or not password:
        return Response(
            {'error': 'Username and password required'}, 
            status=status.HTTP_400_BAD_REQUEST
        )
    
    user = authenticate(username=username, password=password)
    if not user:
        return Response(
            {'error': 'Invalid credentials'}, 
            status=status.HTTP_401_UNAUTHORIZED
        )
    
    access_token = JWTTokenGenerator.generate_access_token(user)
    refresh_token = JWTTokenGenerator.generate_refresh_token(user)
    
    return Response({
        'access_token': access_token,
        'refresh_token': refresh_token,
        'user': {
            'id': user.id,
            'username': user.username,
            'email': user.email
        }
    })

@api_view(['POST'])
@permission_classes([AllowAny])
def refresh_token(request):
    """Refresh access token"""
    refresh_token = request.data.get('refresh_token')
    
    if not refresh_token:
        return Response(
            {'error': 'Refresh token required'}, 
            status=status.HTTP_400_BAD_REQUEST
        )
    
    try:
        payload = JWTTokenGenerator.verify_token(refresh_token, 'refresh')
        user = User.objects.get(id=payload['user_id'])
        
        new_access_token = JWTTokenGenerator.generate_access_token(user)
        
        return Response({
            'access_token': new_access_token
        })
        
    except (AuthenticationFailed, User.DoesNotExist) as e:
        return Response(
            {'error': str(e)}, 
            status=status.HTTP_401_UNAUTHORIZED
        )

2. OAuth 2.0 Integration

# oauth.py
import requests
from django.conf import settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

class OAuth2Authentication(BaseAuthentication):
    """OAuth 2.0 token authentication"""
    
    def authenticate(self, request):
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if not auth_header or not auth_header.startswith('Bearer '):
            return None
        
        token = auth_header.split(' ')[1]
        
        # Validate token with OAuth provider
        user_info = self.validate_token(token)
        if not user_info:
            raise AuthenticationFailed('Invalid OAuth token')
        
        # Get or create user
        user = self.get_or_create_user(user_info)
        
        return (user, token)
    
    def validate_token(self, token):
        """Validate token with OAuth provider"""
        try:
            response = requests.get(
                settings.OAUTH_USERINFO_URL,
                headers={'Authorization': f'Bearer {token}'},
                timeout=10
            )
            
            if response.status_code == 200:
                return response.json()
            
            return None
            
        except requests.RequestException:
            return None
    
    def get_or_create_user(self, user_info):
        """Get or create user from OAuth info"""
        email = user_info.get('email')
        if not email:
            raise AuthenticationFailed('Email not provided by OAuth provider')
        
        user, created = User.objects.get_or_create(
            email=email,
            defaults={
                'username': user_info.get('preferred_username', email),
                'first_name': user_info.get('given_name', ''),
                'last_name': user_info.get('family_name', ''),
                'is_active': True
            }
        )
        
        return user

3. Role-Based Access Control (RBAC)

# permissions.py
from rest_framework.permissions import BasePermission
from django.contrib.auth.models import Group

class RoleBasedPermission(BasePermission):
    """Role-based permission system"""
    
    required_roles = []
    
    def has_permission(self, request, view):
        if not request.user or not request.user.is_authenticated:
            return False
        
        if not self.required_roles:
            return True
        
        user_roles = set(request.user.groups.values_list('name', flat=True))
        required_roles = set(self.required_roles)
        
        return bool(user_roles.intersection(required_roles))

class AdminPermission(RoleBasedPermission):
    required_roles = ['admin']

class ManagerPermission(RoleBasedPermission):
    required_roles = ['admin', 'manager']

class UserPermission(RoleBasedPermission):
    required_roles = ['admin', 'manager', 'user']

# Custom permissions for specific actions
class IsOwnerOrAdmin(BasePermission):
    """Allow access to owners or admins"""
    
    def has_object_permission(self, request, view, obj):
        # Admin can access everything
        if request.user.groups.filter(name='admin').exists():
            return True
        
        # Owner can access their own objects
        if hasattr(obj, 'user'):
            return obj.user == request.user
        
        return False

# Usage in views
from rest_framework.decorators import permission_classes

class UserViewSet(ModelViewSet):
    permission_classes = [UserPermission]
    
    def get_permissions(self):
        """Apply different permissions based on action"""
        if self.action == 'create':
            permission_classes = [AllowAny]
        elif self.action in ['update', 'partial_update', 'destroy']:
            permission_classes = [IsOwnerOrAdmin]
        else:
            permission_classes = [UserPermission]
        
        return [permission() for permission in permission_classes]

Service-to-Service Security

1. Service Authentication

# service_auth.py
import hmac
import hashlib
import time
from django.conf import settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

class ServiceTokenAuthentication(BaseAuthentication):
    """Service-to-service token authentication"""
    
    def authenticate(self, request):
        service_token = request.META.get('HTTP_X_SERVICE_TOKEN')
        service_name = request.META.get('HTTP_X_SERVICE_NAME')
        timestamp = request.META.get('HTTP_X_TIMESTAMP')
        signature = request.META.get('HTTP_X_SIGNATURE')
        
        if not all([service_token, service_name, timestamp, signature]):
            return None
        
        # Verify timestamp (prevent replay attacks)
        try:
            request_time = float(timestamp)
            current_time = time.time()
            
            if abs(current_time - request_time) > 300:  # 5 minutes
                raise AuthenticationFailed('Request timestamp too old')
        except ValueError:
            raise AuthenticationFailed('Invalid timestamp')
        
        # Verify service token
        if service_token != settings.SERVICE_TOKENS.get(service_name):
            raise AuthenticationFailed('Invalid service token')
        
        # Verify signature
        expected_signature = self.generate_signature(
            service_name, timestamp, service_token
        )
        
        if not hmac.compare_digest(signature, expected_signature):
            raise AuthenticationFailed('Invalid signature')
        
        # Create service user object
        service_user = ServiceUser(service_name)
        
        return (service_user, service_token)
    
    def generate_signature(self, service_name, timestamp, service_token):
        """Generate HMAC signature"""
        message = f"{service_name}:{timestamp}:{service_token}"
        
        return hmac.new(
            settings.SERVICE_SECRET_KEY.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

class ServiceUser:
    """Represents a service user for authentication"""
    
    def __init__(self, service_name):
        self.service_name = service_name
        self.is_authenticated = True
        self.is_service = True
    
    @property
    def is_anonymous(self):
        return False

class ServiceClient:
    """Client for making authenticated service calls"""
    
    def __init__(self, service_name):
        self.service_name = service_name
        self.service_token = settings.SERVICE_TOKENS[service_name]
    
    def make_request(self, method, url, **kwargs):
        """Make authenticated request to another service"""
        timestamp = str(time.time())
        signature = self.generate_signature(timestamp)
        
        headers = kwargs.get('headers', {})
        headers.update({
            'X-Service-Token': self.service_token,
            'X-Service-Name': settings.SERVICE_NAME,
            'X-Timestamp': timestamp,
            'X-Signature': signature
        })
        kwargs['headers'] = headers
        
        return requests.request(method, url, **kwargs)
    
    def generate_signature(self, timestamp):
        """Generate request signature"""
        message = f"{settings.SERVICE_NAME}:{timestamp}:{self.service_token}"
        
        return hmac.new(
            settings.SERVICE_SECRET_KEY.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

2. mTLS (Mutual TLS)

# mtls.py
import ssl
import requests
from django.conf import settings

class MTLSClient:
    """Client for mTLS communication"""
    
    def __init__(self):
        self.cert_file = settings.MTLS_CERT_FILE
        self.key_file = settings.MTLS_KEY_FILE
        self.ca_file = settings.MTLS_CA_FILE
    
    def make_request(self, method, url, **kwargs):
        """Make mTLS authenticated request"""
        # Configure SSL context
        ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        ssl_context.load_cert_chain(self.cert_file, self.key_file)
        ssl_context.load_verify_locations(self.ca_file)
        ssl_context.check_hostname = True
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        
        # Create session with SSL context
        session = requests.Session()
        session.mount('https://', requests.adapters.HTTPAdapter())
        
        # Make request with client certificate
        kwargs['cert'] = (self.cert_file, self.key_file)
        kwargs['verify'] = self.ca_file
        
        return session.request(method, url, **kwargs)

# Django middleware for mTLS
class MTLSMiddleware:
    """Middleware to verify client certificates"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        # Verify client certificate
        if not self.verify_client_cert(request):
            return HttpResponseForbidden('Invalid client certificate')
        
        response = self.get_response(request)
        return response
    
    def verify_client_cert(self, request):
        """Verify client certificate"""
        # In production, this would be handled by the web server
        # This is a simplified example
        
        cert_header = request.META.get('HTTP_X_CLIENT_CERT')
        if not cert_header:
            return False
        
        # Verify certificate against CA
        # Implementation depends on your certificate infrastructure
        
        return True

API Security

1. Rate Limiting and Throttling

# throttling.py
from rest_framework.throttling import UserRateThrottle, AnonRateThrottle
from django.core.cache import cache
import time

class BurstRateThrottle(UserRateThrottle):
    """Burst rate limiting with sustained rate"""
    
    scope = 'burst'
    
    def allow_request(self, request, view):
        if self.rate is None:
            return True
        
        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True
        
        self.history = self.cache.get(self.key, [])
        self.now = self.timer()
        
        # Drop any requests from the history which have now passed the throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        
        return self.throttle_success()

class IPRateThrottle(AnonRateThrottle):
    """IP-based rate limiting"""
    
    def get_cache_key(self, request, view):
        if request.user.is_authenticated:
            return None  # Only throttle anonymous requests
        
        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }

class DynamicRateThrottle(UserRateThrottle):
    """Dynamic rate limiting based on user tier"""
    
    def get_rate(self):
        """Get rate based on user tier"""
        if not hasattr(self, 'request') or not self.request.user.is_authenticated:
            return '100/hour'
        
        user = self.request.user
        
        if user.groups.filter(name='premium').exists():
            return '10000/hour'
        elif user.groups.filter(name='pro').exists():
            return '5000/hour'
        else:
            return '1000/hour'

# Usage in views
class UserViewSet(ModelViewSet):
    throttle_classes = [BurstRateThrottle, IPRateThrottle, DynamicRateThrottle]
    throttle_scope = 'user'

2. Input Validation and Sanitization

# validation.py
from rest_framework import serializers
from django.core.validators import RegexValidator
import bleach
import re

class SecureSerializer(serializers.ModelSerializer):
    """Base serializer with security features"""
    
    def validate(self, data):
        """Apply security validations"""
        # Sanitize HTML content
        for field_name, value in data.items():
            if isinstance(value, str):
                data[field_name] = self.sanitize_html(value)
        
        # Check for SQL injection patterns
        self.check_sql_injection(data)
        
        # Check for XSS patterns
        self.check_xss_patterns(data)
        
        return data
    
    def sanitize_html(self, value):
        """Sanitize HTML content"""
        allowed_tags = ['p', 'br', 'strong', 'em', 'ul', 'ol', 'li']
        allowed_attributes = {}
        
        return bleach.clean(
            value,
            tags=allowed_tags,
            attributes=allowed_attributes,
            strip=True
        )
    
    def check_sql_injection(self, data):
        """Check for SQL injection patterns"""
        sql_patterns = [
            r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)',
            r'(\b(UNION|OR|AND)\b.*\b(SELECT|INSERT|UPDATE|DELETE)\b)',
            r'(--|#|/\*|\*/)',
            r'(\bEXEC\b|\bEXECUTE\b)',
        ]
        
        for field_name, value in data.items():
            if isinstance(value, str):
                for pattern in sql_patterns:
                    if re.search(pattern, value, re.IGNORECASE):
                        raise serializers.ValidationError(
                            f'Potential SQL injection detected in {field_name}'
                        )
    
    def check_xss_patterns(self, data):
        """Check for XSS patterns"""
        xss_patterns = [
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'on\w+\s*=',
            r'<iframe[^>]*>.*?</iframe>',
        ]
        
        for field_name, value in data.items():
            if isinstance(value, str):
                for pattern in xss_patterns:
                    if re.search(pattern, value, re.IGNORECASE):
                        raise serializers.ValidationError(
                            f'Potential XSS detected in {field_name}'
                        )

class UserSerializer(SecureSerializer):
    """Secure user serializer"""
    
    username = serializers.CharField(
        max_length=150,
        validators=[
            RegexValidator(
                regex=r'^[a-zA-Z0-9_-]+$',
                message='Username can only contain letters, numbers, underscores, and hyphens'
            )
        ]
    )
    
    email = serializers.EmailField()
    
    password = serializers.CharField(
        write_only=True,
        min_length=8,
        validators=[
            RegexValidator(
                regex=r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]',
                message='Password must contain at least one uppercase letter, one lowercase letter, one digit, and one special character'
            )
        ]
    )
    
    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'first_name', 'last_name', 'password']
        read_only_fields = ['id']

Data Protection

1. Encryption at Rest

# encryption.py
from cryptography.fernet import Fernet
from django.conf import settings
from django.db import models
import base64

class EncryptedField(models.TextField):
    """Encrypted database field"""
    
    def __init__(self, *args, **kwargs):
        self.cipher_suite = Fernet(settings.ENCRYPTION_KEY.encode())
        super().__init__(*args, **kwargs)
    
    def from_db_value(self, value, expression, connection):
        if value is None:
            return value
        
        try:
            # Decrypt the value
            decrypted = self.cipher_suite.decrypt(value.encode())
            return decrypted.decode()
        except Exception:
            return value
    
    def to_python(self, value):
        if isinstance(value, str):
            return value
        
        if value is None:
            return value
        
        return str(value)
    
    def get_prep_value(self, value):
        if value is None:
            return value
        
        # Encrypt the value
        encrypted = self.cipher_suite.encrypt(value.encode())
        return encrypted.decode()

# Usage in models
class UserProfile(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    bio = models.TextField()
    phone_number = EncryptedField()  # Encrypted field
    ssn = EncryptedField()  # Encrypted field
    
    class Meta:
        db_table = 'user_profiles'

# Encryption utilities
class DataEncryption:
    """Data encryption utilities"""
    
    def __init__(self):
        self.cipher_suite = Fernet(settings.ENCRYPTION_KEY.encode())
    
    def encrypt_data(self, data):
        """Encrypt sensitive data"""
        if isinstance(data, str):
            data = data.encode()
        
        encrypted = self.cipher_suite.encrypt(data)
        return base64.b64encode(encrypted).decode()
    
    def decrypt_data(self, encrypted_data):
        """Decrypt sensitive data"""
        try:
            decoded = base64.b64decode(encrypted_data.encode())
            decrypted = self.cipher_suite.decrypt(decoded)
            return decrypted.decode()
        except Exception as e:
            raise ValueError(f"Decryption failed: {e}")
    
    def encrypt_file(self, file_path, output_path):
        """Encrypt file"""
        with open(file_path, 'rb') as file:
            file_data = file.read()
        
        encrypted_data = self.cipher_suite.encrypt(file_data)
        
        with open(output_path, 'wb') as file:
            file.write(encrypted_data)
    
    def decrypt_file(self, encrypted_file_path, output_path):
        """Decrypt file"""
        with open(encrypted_file_path, 'rb') as file:
            encrypted_data = file.read()
        
        decrypted_data = self.cipher_suite.decrypt(encrypted_data)
        
        with open(output_path, 'wb') as file:
            file.write(decrypted_data)

2. Secrets Management

# secrets_manager.py
import boto3
import json
from django.conf import settings
from django.core.cache import cache

class SecretsManager:
    """AWS Secrets Manager integration"""
    
    def __init__(self):
        self.client = boto3.client(
            'secretsmanager',
            region_name=settings.AWS_REGION
        )
    
    def get_secret(self, secret_name, cache_timeout=300):
        """Get secret with caching"""
        cache_key = f"secret:{secret_name}"
        secret_value = cache.get(cache_key)
        
        if secret_value is None:
            try:
                response = self.client.get_secret_value(SecretId=secret_name)
                secret_value = json.loads(response['SecretString'])
                cache.set(cache_key, secret_value, timeout=cache_timeout)
            except Exception as e:
                raise ValueError(f"Failed to retrieve secret {secret_name}: {e}")
        
        return secret_value
    
    def update_secret(self, secret_name, secret_value):
        """Update secret"""
        try:
            self.client.update_secret(
                SecretId=secret_name,
                SecretString=json.dumps(secret_value)
            )
            
            # Invalidate cache
            cache_key = f"secret:{secret_name}"
            cache.delete(cache_key)
            
        except Exception as e:
            raise ValueError(f"Failed to update secret {secret_name}: {e}")

# Kubernetes secrets integration
class KubernetesSecretsManager:
    """Kubernetes secrets integration"""
    
    def __init__(self):
        from kubernetes import client, config
        
        try:
            config.load_incluster_config()
        except:
            config.load_kube_config()
        
        self.v1 = client.CoreV1Api()
    
    def get_secret(self, secret_name, namespace='default'):
        """Get Kubernetes secret"""
        try:
            secret = self.v1.read_namespaced_secret(
                name=secret_name,
                namespace=namespace
            )
            
            # Decode base64 encoded values
            decoded_data = {}
            for key, value in secret.data.items():
                decoded_data[key] = base64.b64decode(value).decode()
            
            return decoded_data
            
        except Exception as e:
            raise ValueError(f"Failed to retrieve secret {secret_name}: {e}")

# Environment-based secrets
class EnvironmentSecretsManager:
    """Environment variable based secrets"""
    
    @staticmethod
    def get_secret(secret_name):
        """Get secret from environment variables"""
        import os
        
        secret_value = os.getenv(secret_name)
        if secret_value is None:
            raise ValueError(f"Secret {secret_name} not found in environment")
        
        try:
            # Try to parse as JSON
            return json.loads(secret_value)
        except json.JSONDecodeError:
            # Return as string if not JSON
            return secret_value

Network Security

1. Network Policies

# k8s/network-policies.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: microservices-network-policy
  namespace: microservices
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  
  ingress:
  # Allow ingress from API Gateway
  - from:
    - namespaceSelector:
        matchLabels:
          name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8000
  
  # Allow inter-service communication
  - from:
    - podSelector:
        matchLabels:
          app: user-service
    - podSelector:
        matchLabels:
          app: product-service
    - podSelector:
        matchLabels:
          app: order-service
    ports:
    - protocol: TCP
      port: 8000
  
  egress:
  # Allow egress to databases
  - to:
    - podSelector:
        matchLabels:
          app: postgresql
    ports:
    - protocol: TCP
      port: 5432
  
  # Allow egress to Redis
  - to:
    - podSelector:
        matchLabels:
          app: redis
    ports:
    - protocol: TCP
      port: 6379
  
  # Allow DNS resolution
  - to: []
    ports:
    - protocol: UDP
      port: 53
  
  # Allow HTTPS egress for external APIs
  - to: []
    ports:
    - protocol: TCP
      port: 443

---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: database-network-policy
  namespace: microservices
spec:
  podSelector:
    matchLabels:
      app: postgresql
  policyTypes:
  - Ingress
  
  ingress:
  # Only allow access from microservices
  - from:
    - podSelector:
        matchLabels:
          app: user-service
    - podSelector:
        matchLabels:
          app: product-service
    - podSelector:
        matchLabels:
          app: order-service
    ports:
    - protocol: TCP
      port: 5432

2. Service Mesh Security (Istio)

# istio/security-policies.yaml
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: microservices
spec:
  mtls:
    mode: STRICT

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: user-service-authz
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: user-service
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/microservices/sa/order-service"]
    - source:
        principals: ["cluster.local/ns/microservices/sa/api-gateway"]
    to:
    - operation:
        methods: ["GET", "POST", "PUT", "DELETE"]
        paths: ["/api/v1/users/*"]

---
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: user-service
  jwtRules:
  - issuer: "https://auth.example.com"
    jwksUri: "https://auth.example.com/.well-known/jwks.json"
    audiences:
    - "microservices-api"

Security Monitoring

1. Security Logging

# security_logging.py
import logging
import json
from django.utils.deprecation import MiddlewareMixin
from django.http import HttpResponseForbidden
import time

security_logger = logging.getLogger('security')

class SecurityLoggingMiddleware(MiddlewareMixin):
    """Log security events"""
    
    def __init__(self, get_response):
        self.get_response = get_response
        self.suspicious_patterns = [
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)',
            r'(--|#|/\*|\*/)',
        ]
    
    def __call__(self, request):
        start_time = time.time()
        
        # Check for suspicious patterns
        if self.detect_suspicious_activity(request):
            self.log_security_event(request, 'SUSPICIOUS_REQUEST', {
                'user_agent': request.META.get('HTTP_USER_AGENT'),
                'ip_address': self.get_client_ip(request),
                'path': request.path,
                'method': request.method
            })
            
            return HttpResponseForbidden('Suspicious activity detected')
        
        response = self.get_response(request)
        
        # Log authentication failures
        if response.status_code == 401:
            self.log_security_event(request, 'AUTH_FAILURE', {
                'path': request.path,
                'method': request.method,
                'ip_address': self.get_client_ip(request)
            })
        
        # Log access to sensitive endpoints
        if self.is_sensitive_endpoint(request.path):
            self.log_security_event(request, 'SENSITIVE_ACCESS', {
                'user': str(request.user) if request.user.is_authenticated else 'anonymous',
                'path': request.path,
                'method': request.method,
                'ip_address': self.get_client_ip(request),
                'response_time': time.time() - start_time
            })
        
        return response
    
    def detect_suspicious_activity(self, request):
        """Detect suspicious request patterns"""
        # Check query parameters
        for key, value in request.GET.items():
            if self.contains_suspicious_pattern(value):
                return True
        
        # Check POST data
        if hasattr(request, 'body') and request.body:
            try:
                body_str = request.body.decode('utf-8')
                if self.contains_suspicious_pattern(body_str):
                    return True
            except UnicodeDecodeError:
                pass
        
        return False
    
    def contains_suspicious_pattern(self, text):
        """Check if text contains suspicious patterns"""
        import re
        
        for pattern in self.suspicious_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return True
        
        return False
    
    def is_sensitive_endpoint(self, path):
        """Check if endpoint is sensitive"""
        sensitive_patterns = [
            r'/admin/',
            r'/api/.*/users/',
            r'/api/.*/auth/',
            r'/api/.*/payments/',
        ]
        
        import re
        for pattern in sensitive_patterns:
            if re.search(pattern, path):
                return True
        
        return False
    
    def get_client_ip(self, request):
        """Get client IP address"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip
    
    def log_security_event(self, request, event_type, details):
        """Log security event"""
        event_data = {
            'timestamp': time.time(),
            'event_type': event_type,
            'service': 'user-service',
            'details': details
        }
        
        security_logger.warning(json.dumps(event_data))

# Intrusion detection
class IntrusionDetectionSystem:
    """Simple intrusion detection system"""
    
    def __init__(self):
        self.failed_attempts = {}
        self.blocked_ips = set()
    
    def record_failed_attempt(self, ip_address):
        """Record failed authentication attempt"""
        if ip_address not in self.failed_attempts:
            self.failed_attempts[ip_address] = []
        
        self.failed_attempts[ip_address].append(time.time())
        
        # Remove old attempts (older than 1 hour)
        cutoff_time = time.time() - 3600
        self.failed_attempts[ip_address] = [
            attempt for attempt in self.failed_attempts[ip_address]
            if attempt > cutoff_time
        ]
        
        # Block IP if too many failed attempts
        if len(self.failed_attempts[ip_address]) >= 5:
            self.blocked_ips.add(ip_address)
            
            security_logger.critical(json.dumps({
                'event_type': 'IP_BLOCKED',
                'ip_address': ip_address,
                'failed_attempts': len(self.failed_attempts[ip_address]),
                'timestamp': time.time()
            }))
    
    def is_blocked(self, ip_address):
        """Check if IP is blocked"""
        return ip_address in self.blocked_ips
    
    def unblock_ip(self, ip_address):
        """Unblock IP address"""
        self.blocked_ips.discard(ip_address)
        if ip_address in self.failed_attempts:
            del self.failed_attempts[ip_address]

ids = IntrusionDetectionSystem()

2. Security Metrics

# security_metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Security metrics
SECURITY_EVENTS = Counter(
    'security_events_total',
    'Total security events',
    ['event_type', 'service']
)

AUTH_FAILURES = Counter(
    'auth_failures_total',
    'Total authentication failures',
    ['service', 'reason']
)

BLOCKED_REQUESTS = Counter(
    'blocked_requests_total',
    'Total blocked requests',
    ['service', 'reason']
)

ACTIVE_SESSIONS = Gauge(
    'active_sessions',
    'Number of active user sessions',
    ['service']
)

class SecurityMetricsCollector:
    """Collect security metrics"""
    
    @staticmethod
    def record_security_event(event_type, service='user-service'):
        """Record security event"""
        SECURITY_EVENTS.labels(event_type=event_type, service=service).inc()
    
    @staticmethod
    def record_auth_failure(reason, service='user-service'):
        """Record authentication failure"""
        AUTH_FAILURES.labels(service=service, reason=reason).inc()
    
    @staticmethod
    def record_blocked_request(reason, service='user-service'):
        """Record blocked request"""
        BLOCKED_REQUESTS.labels(service=service, reason=reason).inc()
    
    @staticmethod
    def update_active_sessions(count, service='user-service'):
        """Update active sessions count"""
        ACTIVE_SESSIONS.labels(service=service).set(count)

# Usage in middleware
class SecurityMetricsMiddleware(MiddlewareMixin):
    """Collect security metrics"""
    
    def __call__(self, request):
        response = self.get_response(request)
        
        # Record metrics based on response
        if response.status_code == 401:
            SecurityMetricsCollector.record_auth_failure('invalid_credentials')
        elif response.status_code == 403:
            SecurityMetricsCollector.record_blocked_request('forbidden')
        elif response.status_code == 429:
            SecurityMetricsCollector.record_blocked_request('rate_limit')
        
        return response

Summary

Securing microservices requires a multi-layered approach:

  • Authentication: JWT tokens, OAuth 2.0, service authentication
  • Authorization: Role-based access control, fine-grained permissions
  • Network Security: Network policies, service mesh, mTLS
  • Data Protection: Encryption at rest and in transit, secrets management
  • API Security: Input validation, rate limiting, HTTPS
  • Monitoring: Security logging, intrusion detection, metrics

A comprehensive security strategy protects against various threats while maintaining system performance and usability. In the next section, we'll explore performance optimization with caching.