Security in microservices architecture requires a comprehensive approach covering authentication, authorization, network security, data protection, and monitoring. This section explores security best practices and implementation strategies for Django microservices.
# authentication.py
import jwt
from datetime import datetime, timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
User = get_user_model()
class JWTAuthentication(BaseAuthentication):
    """Authenticate a request from a JWT sent as ``Authorization: Bearer <token>``.

    The token is verified with ``settings.JWT_SECRET_KEY`` and
    ``settings.JWT_ALGORITHM``. Only tokens whose ``type`` claim is
    ``'access'`` are accepted, so a long-lived refresh token cannot be used
    to call the API directly.
    """

    def authenticate_header(self, request):
        """Return the ``WWW-Authenticate`` challenge value.

        Without this override DRF answers failed authentication with
        403 Forbidden instead of 401 Unauthorized.
        """
        return 'Bearer'

    def authenticate(self, request):
        """Resolve the request's bearer token to an active user.

        Returns:
            ``None`` when no bearer header is present (lets other
            authenticators run), otherwise a ``(user, token)`` tuple.

        Raises:
            AuthenticationFailed: the token is expired, malformed, of the
                wrong type, or refers to a missing or disabled user.
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if not auth_header or not auth_header.startswith('Bearer '):
            return None

        token = auth_header[len('Bearer '):].strip()

        try:
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=[settings.JWT_ALGORITHM],
            )
        except jwt.ExpiredSignatureError:
            raise AuthenticationFailed('Token has expired')
        except jwt.InvalidTokenError:
            raise AuthenticationFailed('Invalid token')

        # Refresh tokens are signed with the same key; reject them here so
        # they can only be exchanged at the refresh endpoint.
        if payload.get('type') != 'access':
            raise AuthenticationFailed('Invalid token')

        user_id = payload.get('user_id')
        if not user_id:
            raise AuthenticationFailed('Invalid token payload')

        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            raise AuthenticationFailed('User not found')

        if not user.is_active:
            raise AuthenticationFailed('User account is disabled')

        return (user, token)
class JWTTokenGenerator:
    """Issue and verify the access/refresh JWTs used by the auth endpoints."""

    @staticmethod
    def _encode(claims, lifetime, token_type):
        """Sign ``claims`` plus ``exp``/``iat``/``type`` and return the token.

        A single timezone-aware timestamp is used for both ``iat`` and
        ``exp`` so the two claims cannot drift apart, and the deprecated
        naive ``datetime.utcnow()`` is avoided.
        """
        from datetime import timezone

        issued_at = datetime.now(timezone.utc)
        payload = {
            **claims,
            'exp': issued_at + lifetime,
            'iat': issued_at,
            'type': token_type,
        }
        return jwt.encode(
            payload,
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM,
        )

    @staticmethod
    def generate_access_token(user):
        """Return an access token for ``user``.

        Lifetime is ``settings.JWT_ACCESS_TOKEN_LIFETIME`` minutes.
        """
        return JWTTokenGenerator._encode(
            {
                'user_id': user.id,
                'username': user.username,
                'email': user.email,
            },
            timedelta(minutes=settings.JWT_ACCESS_TOKEN_LIFETIME),
            'access',
        )

    @staticmethod
    def generate_refresh_token(user):
        """Return a refresh token for ``user``.

        Lifetime is ``settings.JWT_REFRESH_TOKEN_LIFETIME`` days.
        """
        return JWTTokenGenerator._encode(
            {'user_id': user.id},
            timedelta(days=settings.JWT_REFRESH_TOKEN_LIFETIME),
            'refresh',
        )

    @staticmethod
    def verify_token(token, token_type='access'):
        """Decode ``token`` and return its payload.

        Args:
            token: Encoded JWT string.
            token_type: Required value of the ``type`` claim.

        Raises:
            AuthenticationFailed: the token is expired, invalid, or its
                ``type`` claim differs from ``token_type``.
        """
        try:
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=[settings.JWT_ALGORITHM],
            )
        except jwt.ExpiredSignatureError:
            raise AuthenticationFailed('Token has expired')
        except jwt.InvalidTokenError:
            raise AuthenticationFailed('Invalid token')

        if payload.get('type') != token_type:
            raise AuthenticationFailed('Invalid token')
        return payload
# views.py
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status
from django.contrib.auth import authenticate
@api_view(['POST'])
@permission_classes([AllowAny])
def login(request):
    """Exchange a username/password pair for an access and a refresh token.

    Responds 400 when either credential is missing, 401 when
    ``authenticate`` rejects them, otherwise 200 with both tokens and a
    short user summary.
    """
    credentials = {
        'username': request.data.get('username'),
        'password': request.data.get('password'),
    }

    # Both fields must be present and non-empty.
    if not all(credentials.values()):
        return Response(
            {'error': 'Username and password required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    user = authenticate(**credentials)
    if not user:
        return Response(
            {'error': 'Invalid credentials'},
            status=status.HTTP_401_UNAUTHORIZED
        )

    body = {
        'access_token': JWTTokenGenerator.generate_access_token(user),
        'refresh_token': JWTTokenGenerator.generate_refresh_token(user),
        'user': {
            'id': user.id,
            'username': user.username,
            'email': user.email
        }
    }
    return Response(body)
@api_view(['POST'])
@permission_classes([AllowAny])
def refresh_token(request):
    """Issue a new access token in exchange for a valid refresh token.

    Responds 400 when ``refresh_token`` is missing from the body, and 401
    when the token is expired or invalid, is not a refresh token, or
    belongs to a missing or disabled user.
    """
    # Named `token` so the local does not shadow this view function.
    token = request.data.get('refresh_token')

    if not token:
        return Response(
            {'error': 'Refresh token required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    try:
        payload = JWTTokenGenerator.verify_token(token, 'refresh')
        # .get(): a payload without user_id becomes a 401 (DoesNotExist)
        # rather than an unhandled KeyError.
        user = User.objects.get(id=payload.get('user_id'))

        # A refresh token issued before the account was disabled must not
        # keep minting access tokens.
        if not user.is_active:
            raise AuthenticationFailed('User account is disabled')
    except (AuthenticationFailed, User.DoesNotExist) as e:
        return Response(
            {'error': str(e)},
            status=status.HTTP_401_UNAUTHORIZED
        )

    return Response({
        'access_token': JWTTokenGenerator.generate_access_token(user)
    })
# oauth.py
import requests
from django.conf import settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class OAuth2Authentication(BaseAuthentication):
    """Authenticate bearer tokens by asking the OAuth provider's userinfo endpoint."""

    def authenticate_header(self, request):
        """Return the ``WWW-Authenticate`` value so DRF answers 401, not 403."""
        return 'Bearer'

    def authenticate(self, request):
        """Validate the bearer token remotely and map it to a local user.

        Returns:
            ``None`` when no bearer header is present, otherwise
            ``(user, token)``.

        Raises:
            AuthenticationFailed: the provider rejects the token, supplies
                no email, or the matching local account is disabled.
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if not auth_header or not auth_header.startswith('Bearer '):
            return None

        token = auth_header[len('Bearer '):].strip()

        user_info = self.validate_token(token)
        if not user_info:
            raise AuthenticationFailed('Invalid OAuth token')

        user = self.get_or_create_user(user_info)

        # Same rule as JWTAuthentication: a locally disabled account must
        # not get in just because the provider still vouches for it.
        if not user.is_active:
            raise AuthenticationFailed('User account is disabled')

        return (user, token)

    def validate_token(self, token):
        """Return the provider's userinfo dict for ``token``, or ``None``.

        ``None`` is returned for any non-200 response, network error, or a
        body that is not a JSON object.
        """
        try:
            response = requests.get(
                settings.OAUTH_USERINFO_URL,
                headers={'Authorization': f'Bearer {token}'},
                timeout=10
            )
            if response.status_code != 200:
                return None
            user_info = response.json()
        except (requests.RequestException, ValueError):
            # ValueError covers a 200 response whose body is not JSON.
            return None

        return user_info if isinstance(user_info, dict) else None

    def get_or_create_user(self, user_info):
        """Return the local user for the provider's email, creating one if needed.

        Raises:
            AuthenticationFailed: ``user_info`` carries no email.
        """
        # NOTE(review): the email is trusted as-is; if the provider exposes
        # an `email_verified` claim it should probably be required - confirm.
        email = user_info.get('email')
        if not email:
            raise AuthenticationFailed('Email not provided by OAuth provider')

        user, created = User.objects.get_or_create(
            email=email,
            defaults={
                'username': user_info.get('preferred_username', email),
                'first_name': user_info.get('given_name', ''),
                'last_name': user_info.get('family_name', ''),
                'is_active': True
            }
        )
        return user
# permissions.py
from rest_framework.permissions import BasePermission
from django.contrib.auth.models import Group
class RoleBasedPermission(BasePermission):
    """Grant access when the user belongs to at least one required group.

    Subclasses list acceptable group names in ``required_roles``; an empty
    list means any authenticated user is allowed.
    """

    required_roles = []

    def has_permission(self, request, view):
        """Return True for authenticated users holding one of ``required_roles``."""
        user = request.user
        if not (user and user.is_authenticated):
            return False

        if not self.required_roles:
            return True

        held_roles = set(user.groups.values_list('name', flat=True))
        return not held_roles.isdisjoint(set(self.required_roles))
class AdminPermission(RoleBasedPermission):
    # Only members of the 'admin' group pass.
    required_roles = ['admin']


class ManagerPermission(RoleBasedPermission):
    # Members of 'admin' or 'manager' pass.
    required_roles = ['admin', 'manager']


class UserPermission(RoleBasedPermission):
    # Members of any of the three groups pass; an authenticated user in
    # none of them is rejected.
    required_roles = ['admin', 'manager', 'user']
# Custom permissions for specific actions
class IsOwnerOrAdmin(BasePermission):
    """Object-level permission: members of 'admin', or the object's ``user``."""

    def has_object_permission(self, request, view, obj):
        """Return True for admins and for the user referenced by ``obj.user``."""
        requester = request.user

        # Admin group membership short-circuits the ownership test.
        if requester.groups.filter(name='admin').exists():
            return True

        # Objects without a `user` attribute have no owner to match.
        if not hasattr(obj, 'user'):
            return False

        return obj.user == requester
# Usage in views
from rest_framework.decorators import permission_classes
class UserViewSet(ModelViewSet):
    """User CRUD endpoints with per-action permission rules."""

    permission_classes = [UserPermission]

    def get_permissions(self):
        """Instantiate the permission that applies to the current action.

        ``create`` is open to anyone, the mutating object actions require
        the owner or an admin, and every other action uses ``UserPermission``.
        """
        per_action = {
            'create': AllowAny,
            'update': IsOwnerOrAdmin,
            'partial_update': IsOwnerOrAdmin,
            'destroy': IsOwnerOrAdmin,
        }
        selected = [per_action.get(self.action, UserPermission)]
        return [permission() for permission in selected]
# service_auth.py
import hmac
import hashlib
import time
from django.conf import settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class ServiceTokenAuthentication(BaseAuthentication):
    """Authenticate service-to-service calls via token, timestamp and HMAC headers.

    Reads ``X-Service-Token``, ``X-Service-Name``, ``X-Timestamp`` and
    ``X-Signature``. The signature covers name, timestamp and token only.
    """

    # Maximum accepted difference, in seconds, between the request
    # timestamp and this host's clock.
    max_clock_skew = 300

    def authenticate(self, request):
        """Return ``(ServiceUser, token)`` for a correctly signed request.

        Returns ``None`` when any of the four headers is absent, so other
        authenticators can run.

        Raises:
            AuthenticationFailed: the timestamp is unparsable or outside
                the allowed window, or the token or signature is wrong.
        """
        service_token = request.META.get('HTTP_X_SERVICE_TOKEN')
        service_name = request.META.get('HTTP_X_SERVICE_NAME')
        timestamp = request.META.get('HTTP_X_TIMESTAMP')
        signature = request.META.get('HTTP_X_SIGNATURE')

        if not all([service_token, service_name, timestamp, signature]):
            return None

        try:
            request_time = float(timestamp)
        except ValueError:
            raise AuthenticationFailed('Invalid timestamp')

        # Written as `not (<=)` instead of `>`: float('nan') compares False
        # both ways, so a "nan" timestamp is rejected here.
        if not abs(time.time() - request_time) <= self.max_clock_skew:
            raise AuthenticationFailed('Request timestamp too old')

        # Constant-time comparison avoids leaking the token through timing.
        # Operands are bytes because compare_digest rejects non-ASCII str.
        expected_token = settings.SERVICE_TOKENS.get(service_name)
        if expected_token is None or not hmac.compare_digest(
            service_token.encode(), expected_token.encode()
        ):
            raise AuthenticationFailed('Invalid service token')

        expected_signature = self.generate_signature(
            service_name, timestamp, service_token
        )
        if not hmac.compare_digest(
            signature.encode(), expected_signature.encode()
        ):
            raise AuthenticationFailed('Invalid signature')

        return (ServiceUser(service_name), service_token)

    def generate_signature(self, service_name, timestamp, service_token):
        """Return the hex HMAC-SHA256 of ``name:timestamp:token``.

        The key is ``settings.SERVICE_SECRET_KEY``.
        """
        message = f"{service_name}:{timestamp}:{service_token}"
        return hmac.new(
            settings.SERVICE_SECRET_KEY.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
class ServiceUser:
    """Stand-in for a user object when the caller is another service.

    Exposes the flags that permission and logging code reads
    (``is_authenticated``, ``is_anonymous``) plus ``is_service``.
    """

    def __init__(self, service_name):
        self.service_name = service_name
        self.is_authenticated = True
        self.is_service = True

    @property
    def is_anonymous(self):
        """Always False: a service principal is never anonymous."""
        return False

    def __str__(self):
        # Makes `str(request.user)` in log lines show the service name
        # instead of an object address.
        return str(self.service_name)

    def __repr__(self):
        return f'{type(self).__name__}(service_name={self.service_name!r})'
class ServiceClient:
    """HTTP client that adds the service-auth headers to outgoing calls."""

    # Seconds to wait when the caller passes no `timeout`; without one
    # `requests` can block indefinitely.
    default_timeout = 10

    def __init__(self, service_name):
        # NOTE(review): the token is looked up by `service_name`, while the
        # X-Service-Name header sent below is settings.SERVICE_NAME. Confirm
        # which key the receiving side expects in SERVICE_TOKENS.
        self.service_name = service_name
        self.service_token = settings.SERVICE_TOKENS[service_name]

    def make_request(self, method, url, **kwargs):
        """Send ``method`` to ``url`` with signed service-auth headers.

        Args:
            method: HTTP method name.
            url: Target URL.
            **kwargs: Passed to ``requests.request``. ``headers`` is copied,
                never mutated; ``timeout`` defaults to ``default_timeout``.

        Returns:
            The ``requests`` response object.
        """
        timestamp = str(time.time())
        signature = self.generate_signature(timestamp)

        # Copy so the caller's dict is untouched; `or {}` tolerates an
        # explicit headers=None.
        headers = dict(kwargs.get('headers') or {})
        headers.update({
            'X-Service-Token': self.service_token,
            'X-Service-Name': settings.SERVICE_NAME,
            'X-Timestamp': timestamp,
            'X-Signature': signature
        })
        kwargs['headers'] = headers
        kwargs.setdefault('timeout', self.default_timeout)

        return requests.request(method, url, **kwargs)

    def generate_signature(self, timestamp):
        """Return the hex HMAC-SHA256 of ``SERVICE_NAME:timestamp:token``."""
        message = f"{settings.SERVICE_NAME}:{timestamp}:{self.service_token}"
        return hmac.new(
            settings.SERVICE_SECRET_KEY.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
# mtls.py
import ssl
import requests
from django.conf import settings
class MTLSClient:
    """HTTP client that presents a client certificate (mutual TLS)."""

    def __init__(self):
        self.cert_file = settings.MTLS_CERT_FILE
        self.key_file = settings.MTLS_KEY_FILE
        self.ca_file = settings.MTLS_CA_FILE

    def make_request(self, method, url, **kwargs):
        """Send a request with the configured client certificate.

        The server is verified against ``MTLS_CA_FILE``. Any ``cert`` or
        ``verify`` supplied by the caller is overridden.

        Returns:
            The ``requests`` response object.
        """
        # The previous version also built an ssl.SSLContext but never
        # attached it to the session; `cert` and `verify` are what
        # `requests` actually uses, so the dead setup is removed.
        kwargs['cert'] = (self.cert_file, self.key_file)
        kwargs['verify'] = self.ca_file

        # Context manager so the session's connection pool is released.
        with requests.Session() as session:
            return session.request(method, url, **kwargs)
# Django middleware for mTLS
class MTLSMiddleware:
    """Reject requests that do not carry a client-certificate header."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        """Pass the request on when the certificate check succeeds, else 403."""
        if self.verify_client_cert(request):
            return self.get_response(request)
        return HttpResponseForbidden('Invalid client certificate')

    def verify_client_cert(self, request):
        """Return True when an ``X-Client-Cert`` header is present and non-empty.

        Simplified example: only the header's presence is checked here. In
        production the web server handles certificate verification, and the
        check against the CA depends on the certificate infrastructure.
        """
        return bool(request.META.get('HTTP_X_CLIENT_CERT'))
# throttling.py
from rest_framework.throttling import UserRateThrottle, AnonRateThrottle
from django.core.cache import cache
import time
class BurstRateThrottle(UserRateThrottle):
    """Per-user throttle configured under the 'burst' scope."""

    scope = 'burst'

    def allow_request(self, request, view):
        """Return True when the request is within the configured rate.

        Requests are never throttled when no rate is configured or no
        cache key can be derived.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # Discard timestamps that fell out of the throttle window; the
        # oldest entries sit at the end of the list.
        window_start = self.now - self.duration
        while self.history and self.history[-1] <= window_start:
            self.history.pop()

        limit_reached = len(self.history) >= self.num_requests
        return self.throttle_failure() if limit_reached else self.throttle_success()
class IPRateThrottle(AnonRateThrottle):
    """Throttle unauthenticated requests keyed on the client identifier."""

    def get_cache_key(self, request, view):
        """Return the cache key for anonymous callers, or None to skip throttling."""
        if request.user.is_authenticated:
            # Authenticated users are left to the other throttles.
            return None

        key_parts = {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }
        return self.cache_format % key_parts
class DynamicRateThrottle(UserRateThrottle):
    """Per-user throttle whose limit depends on the user's tier group."""

    def allow_request(self, request, view):
        """Resolve the caller's tier rate, then apply the normal throttle check.

        The base class computes the rate once in ``__init__``, before any
        request exists. Without this override ``self.request`` is never set
        and every caller gets the anonymous rate.
        """
        self.request = request
        self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)
        return super().allow_request(request, view)

    def get_rate(self):
        """Return the rate string for the current request's user.

        '100/hour' when no request is bound yet or the user is
        unauthenticated; otherwise the 'premium', 'pro' or standard rate,
        selected by group membership.
        """
        request = getattr(self, 'request', None)
        user = getattr(request, 'user', None)
        if user is None or not user.is_authenticated:
            return '100/hour'

        if user.groups.filter(name='premium').exists():
            return '10000/hour'
        elif user.groups.filter(name='pro').exists():
            return '5000/hour'
        else:
            return '1000/hour'
# Usage in views
class UserViewSet(ModelViewSet):
    # All three throttles are attached to every action of this viewset.
    throttle_classes = [BurstRateThrottle, IPRateThrottle, DynamicRateThrottle]
    # NOTE(review): none of the throttles above visibly reads
    # `throttle_scope`; presumably it only matters to DRF's
    # ScopedRateThrottle - confirm whether this line is needed.
    throttle_scope = 'user'
# validation.py
from rest_framework import serializers
from django.core.validators import RegexValidator
import bleach
import re
class SecureSerializer(serializers.ModelSerializer):
    """Base serializer that sanitises and screens every string field.

    ``validate`` strips disallowed HTML from string values, then rejects
    any value that still matches one of the SQL or XSS patterns below.
    """

    # Patterns are compiled once at class creation rather than rebuilt on
    # every validation call.
    _SQL_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)',
            r'(\b(UNION|OR|AND)\b.*\b(SELECT|INSERT|UPDATE|DELETE)\b)',
            r'(--|#|/\*|\*/)',
            r'(\bEXEC\b|\bEXECUTE\b)',
        )
    )
    _XSS_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'on\w+\s*=',
            r'<iframe[^>]*>.*?</iframe>',
        )
    )

    def validate(self, data):
        """Sanitise string values in ``data`` in place and screen them.

        Raises:
            serializers.ValidationError: a value matches an SQL or XSS pattern.
        """
        # NOTE(review): this also rewrites write-only values such as
        # passwords (bleach escapes '&'), and the SQL patterns reject
        # ordinary text containing words like "update" or the '#'
        # character. Confirm that is intended for every subclass.
        for field_name, value in list(data.items()):
            if isinstance(value, str):
                data[field_name] = self.sanitize_html(value)

        self.check_sql_injection(data)
        self.check_xss_patterns(data)
        return data

    def sanitize_html(self, value):
        """Return ``value`` with all but a few formatting tags stripped."""
        allowed_tags = ['p', 'br', 'strong', 'em', 'ul', 'ol', 'li']
        allowed_attributes = {}

        return bleach.clean(
            value,
            tags=allowed_tags,
            attributes=allowed_attributes,
            strip=True
        )

    def _reject_matches(self, data, patterns, label):
        """Raise ValidationError naming the first string field matching ``patterns``."""
        for field_name, value in data.items():
            if not isinstance(value, str):
                continue
            if any(pattern.search(value) for pattern in patterns):
                raise serializers.ValidationError(
                    f'Potential {label} detected in {field_name}'
                )

    def check_sql_injection(self, data):
        """Raise ValidationError when a string value matches an SQL pattern."""
        self._reject_matches(data, self._SQL_PATTERNS, 'SQL injection')

    def check_xss_patterns(self, data):
        """Raise ValidationError when a string value matches an XSS pattern."""
        self._reject_matches(data, self._XSS_PATTERNS, 'XSS')
class UserSerializer(SecureSerializer):
    """User serializer with username/password rules and hashed password storage."""

    username = serializers.CharField(
        max_length=150,
        validators=[
            RegexValidator(
                regex=r'^[a-zA-Z0-9_-]+$',
                message='Username can only contain letters, numbers, underscores, and hyphens'
            )
        ]
    )

    email = serializers.EmailField()

    password = serializers.CharField(
        write_only=True,
        min_length=8,
        validators=[
            RegexValidator(
                # The four lookaheads enforce exactly what the message
                # states. The former trailing character class only
                # constrained the first character, so it was dropped.
                regex=r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])',
                message='Password must contain at least one uppercase letter, one lowercase letter, one digit, and one special character'
            )
        ]
    )

    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'first_name', 'last_name', 'password']
        read_only_fields = ['id']

    def create(self, validated_data):
        """Create the user, storing the password hashed via ``set_password``.

        The default ``ModelSerializer.create`` would write the raw password
        string straight into the password column.
        """
        password = validated_data.pop('password')
        user = User(**validated_data)
        user.set_password(password)
        user.save()
        return user

    def update(self, instance, validated_data):
        """Update the user, hashing a new password when one is supplied."""
        password = validated_data.pop('password', None)
        instance = super().update(instance, validated_data)
        if password is not None:
            instance.set_password(password)
            instance.save(update_fields=['password'])
        return instance
# encryption.py
from cryptography.fernet import Fernet
from django.conf import settings
from django.db import models
import base64
class EncryptedField(models.TextField):
    """Text field whose value is Fernet-encrypted in the database.

    The key is ``settings.ENCRYPTION_KEY``. Encryption is applied in
    ``get_prep_value`` and reversed in ``from_db_value``.
    """

    def __init__(self, *args, **kwargs):
        self.cipher_suite = Fernet(settings.ENCRYPTION_KEY.encode())
        super().__init__(*args, **kwargs)

    def from_db_value(self, value, expression, connection):
        """Decrypt a value loaded from the database.

        Values that are not valid Fernet tokens for the current key are
        returned unchanged (best effort, as before).
        """
        from cryptography.fernet import InvalidToken

        if value is None:
            return value

        try:
            return self.cipher_suite.decrypt(value.encode()).decode()
        except (InvalidToken, UnicodeError):
            # Only "not decryptable" is swallowed; programming errors are
            # no longer hidden behind a blanket `except Exception`.
            return value

    def to_python(self, value):
        """Coerce ``value`` to ``str``, passing ``None`` through."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    def get_prep_value(self, value):
        """Encrypt ``value`` for storage; ``None`` is stored as NULL."""
        if value is None:
            return value

        # Mirror to_python(): non-string values are stringified instead of
        # failing on a missing .encode().
        plaintext = value if isinstance(value, str) else str(value)
        return self.cipher_suite.encrypt(plaintext.encode()).decode()
# Usage in models
class UserProfile(models.Model):
    # One profile per user; deleted together with the user (CASCADE).
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    # Stored as plain text.
    bio = models.TextField()
    # Stored encrypted through EncryptedField.
    phone_number = EncryptedField()  # Encrypted field
    ssn = EncryptedField()  # Encrypted field

    class Meta:
        # Explicit table name instead of Django's generated default.
        db_table = 'user_profiles'
# Encryption utilities
class DataEncryption:
    """Fernet-based helpers for encrypting strings and files.

    The key is ``settings.ENCRYPTION_KEY``.
    """

    def __init__(self):
        self.cipher_suite = Fernet(settings.ENCRYPTION_KEY.encode())

    def encrypt_data(self, data):
        """Encrypt ``data`` (str or bytes) and return base64 text."""
        if isinstance(data, str):
            data = data.encode()

        encrypted = self.cipher_suite.encrypt(data)
        return base64.b64encode(encrypted).decode()

    def decrypt_data(self, encrypted_data):
        """Reverse ``encrypt_data`` and return the plaintext as ``str``.

        Args:
            encrypted_data: Output of ``encrypt_data``, as str or bytes.

        Raises:
            ValueError: decoding, decryption or UTF-8 decoding failed; the
                underlying error is attached as ``__cause__``.
        """
        try:
            if isinstance(encrypted_data, str):
                encrypted_data = encrypted_data.encode()
            decoded = base64.b64decode(encrypted_data)
            decrypted = self.cipher_suite.decrypt(decoded)
            return decrypted.decode()
        except Exception as e:
            raise ValueError(f"Decryption failed: {e}") from e

    def encrypt_file(self, file_path, output_path):
        """Encrypt the file at ``file_path`` into ``output_path``.

        The whole file is read into memory.
        """
        with open(file_path, 'rb') as source:
            encrypted_data = self.cipher_suite.encrypt(source.read())

        with open(output_path, 'wb') as target:
            target.write(encrypted_data)

    def decrypt_file(self, encrypted_file_path, output_path):
        """Decrypt the file at ``encrypted_file_path`` into ``output_path``."""
        with open(encrypted_file_path, 'rb') as source:
            decrypted_data = self.cipher_suite.decrypt(source.read())

        with open(output_path, 'wb') as target:
            target.write(decrypted_data)
# secrets_manager.py
import boto3
import json
from django.conf import settings
from django.core.cache import cache
class SecretsManager:
    """Read and update AWS Secrets Manager secrets with a Django-cache layer."""

    def __init__(self):
        self.client = boto3.client(
            'secretsmanager',
            region_name=settings.AWS_REGION
        )

    def get_secret(self, secret_name, cache_timeout=300):
        """Return the JSON-decoded secret, served from cache when present.

        Raises:
            ValueError: fetching, decoding or caching the secret failed.
        """
        cache_key = f"secret:{secret_name}"

        cached = cache.get(cache_key)
        if cached is not None:
            return cached

        try:
            response = self.client.get_secret_value(SecretId=secret_name)
            fetched = json.loads(response['SecretString'])
            cache.set(cache_key, fetched, timeout=cache_timeout)
        except Exception as e:
            raise ValueError(f"Failed to retrieve secret {secret_name}: {e}")

        return fetched

    def update_secret(self, secret_name, secret_value):
        """Store ``secret_value`` as JSON and drop the cached copy.

        Raises:
            ValueError: the update or the cache invalidation failed.
        """
        try:
            serialized = json.dumps(secret_value)
            self.client.update_secret(
                SecretId=secret_name,
                SecretString=serialized
            )
            # Next read must fetch the new value.
            cache.delete(f"secret:{secret_name}")
        except Exception as e:
            raise ValueError(f"Failed to update secret {secret_name}: {e}")
# Kubernetes secrets integration
class KubernetesSecretsManager:
    """Read Kubernetes ``Secret`` objects through the official Python client."""

    def __init__(self):
        from kubernetes import client, config

        try:
            config.load_incluster_config()
        except config.ConfigException:
            # Not running inside a pod: fall back to the local kubeconfig.
            # The former bare `except:` would also have swallowed
            # KeyboardInterrupt and SystemExit.
            config.load_kube_config()

        self.v1 = client.CoreV1Api()

    def get_secret(self, secret_name, namespace='default'):
        """Return the secret's entries as a ``{key: decoded string}`` dict.

        A secret with no data yields an empty dict.

        Raises:
            ValueError: the secret cannot be read or a value cannot be decoded.
        """
        try:
            secret = self.v1.read_namespaced_secret(
                name=secret_name,
                namespace=namespace
            )
            # `data` is None for a secret without entries.
            entries = secret.data or {}
            return {
                key: base64.b64decode(value).decode()
                for key, value in entries.items()
            }
        except Exception as e:
            raise ValueError(f"Failed to retrieve secret {secret_name}: {e}") from e
# Environment-based secrets
class EnvironmentSecretsManager:
    """Look up secrets in the process environment."""

    @staticmethod
    def get_secret(secret_name):
        """Return the environment variable ``secret_name``.

        The value is JSON-decoded when it parses as JSON, otherwise it is
        returned as the raw string.

        Raises:
            ValueError: the variable is not set.
        """
        import os

        raw = os.environ.get(secret_name)
        if raw is None:
            raise ValueError(f"Secret {secret_name} not found in environment")

        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            # Not JSON: hand back the plain string.
            return raw
        return parsed
# k8s/network-policies.yaml
# Default ingress/egress rules for every pod in the `microservices`
# namespace (empty podSelector).
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: microservices-network-policy
  namespace: microservices
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  # Allow ingress from API Gateway
  - from:
    - namespaceSelector:
        matchLabels:
          name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8000
  # Allow inter-service communication
  - from:
    - podSelector:
        matchLabels:
          app: user-service
    - podSelector:
        matchLabels:
          app: product-service
    - podSelector:
        matchLabels:
          app: order-service
    ports:
    - protocol: TCP
      port: 8000
  egress:
  # Allow egress to databases
  - to:
    - podSelector:
        matchLabels:
          app: postgresql
    ports:
    - protocol: TCP
      port: 5432
  # Allow egress to Redis
  - to:
    - podSelector:
        matchLabels:
          app: redis
    ports:
    - protocol: TCP
      port: 6379
  # Allow DNS resolution. TCP is allowed as well as UDP because resolvers
  # fall back to TCP for responses too large for a UDP datagram.
  - to: []
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53
  # Allow HTTPS egress for external APIs
  - to: []
    ports:
    - protocol: TCP
      port: 443
---
# Ingress rules for pods labelled app=postgresql in the `microservices`
# namespace: only the three service pods listed below may connect, and
# only on TCP 5432.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: database-network-policy
  namespace: microservices
spec:
  podSelector:
    matchLabels:
      app: postgresql
  policyTypes:
  - Ingress
  ingress:
  # Only allow access from microservices
  - from:
    - podSelector:
        matchLabels:
          app: user-service
    - podSelector:
        matchLabels:
          app: product-service
    - podSelector:
        matchLabels:
          app: order-service
    ports:
    - protocol: TCP
      port: 5432
# istio/security-policies.yaml
# Namespace-wide peer authentication for `microservices`: mTLS mode is
# STRICT.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: microservices
spec:
  mtls:
    mode: STRICT
---
# Authorization rule for workloads labelled app=user-service: the
# order-service and api-gateway service accounts may call the listed
# methods on /api/v1/users/*.
# NOTE(review): no `action` is set; presumably this relies on the default
# ALLOW action - confirm.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: user-service-authz
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: user-service
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/microservices/sa/order-service"]
    - source:
        principals: ["cluster.local/ns/microservices/sa/api-gateway"]
    to:
    - operation:
        methods: ["GET", "POST", "PUT", "DELETE"]
        paths: ["/api/v1/users/*"]
---
# JWT validation for workloads labelled app=user-service: tokens must be
# issued by auth.example.com, verifiable with its JWKS, and carry the
# "microservices-api" audience.
# NOTE(review): a RequestAuthentication presumably only rejects invalid
# tokens; requests with no token may still pass unless an
# AuthorizationPolicy requires a request principal - confirm.
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: user-service
  jwtRules:
  - issuer: "https://auth.example.com"
    jwksUri: "https://auth.example.com/.well-known/jwks.json"
    audiences:
    - "microservices-api"
# security_logging.py
import logging
import json
from django.utils.deprecation import MiddlewareMixin
from django.http import HttpResponseForbidden
import time
security_logger = logging.getLogger('security')
class SecurityLoggingMiddleware(MiddlewareMixin):
    """Block requests matching suspicious patterns and log security events.

    Events are written as JSON strings to the ``security`` logger at
    WARNING level.
    """

    def __init__(self, get_response):
        # Let MiddlewareMixin run its own setup (it stores get_response)
        # instead of bypassing it.
        super().__init__(get_response)
        self.suspicious_patterns = [
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)',
            r'(--|#|/\*|\*/)',
        ]

    def __call__(self, request):
        """Screen the request, run the view, then log 401s and sensitive access.

        Returns a 403 response without calling the view when the query
        string or body matches a suspicious pattern.
        """
        start_time = time.time()

        if self.detect_suspicious_activity(request):
            self.log_security_event(request, 'SUSPICIOUS_REQUEST', {
                'user_agent': request.META.get('HTTP_USER_AGENT'),
                'ip_address': self.get_client_ip(request),
                'path': request.path,
                'method': request.method
            })
            return HttpResponseForbidden('Suspicious activity detected')

        response = self.get_response(request)

        if response.status_code == 401:
            self.log_security_event(request, 'AUTH_FAILURE', {
                'path': request.path,
                'method': request.method,
                'ip_address': self.get_client_ip(request)
            })

        if self.is_sensitive_endpoint(request.path):
            # `request.user` is absent when no authentication middleware
            # ran earlier; fall back to 'anonymous' instead of raising.
            # NOTE(review): this is the Django-level user; users
            # authenticated only by DRF authenticators presumably still
            # appear as anonymous here - confirm.
            user = getattr(request, 'user', None)
            is_known = user is not None and user.is_authenticated
            self.log_security_event(request, 'SENSITIVE_ACCESS', {
                'user': str(user) if is_known else 'anonymous',
                'path': request.path,
                'method': request.method,
                'ip_address': self.get_client_ip(request),
                'response_time': time.time() - start_time
            })

        return response

    def detect_suspicious_activity(self, request):
        """Return True when a query value or the UTF-8 body matches a pattern."""
        if any(
            self.contains_suspicious_pattern(value)
            for value in request.GET.values()
        ):
            return True

        if hasattr(request, 'body') and request.body:
            try:
                body_str = request.body.decode('utf-8')
            except UnicodeDecodeError:
                # Binary bodies are not screened.
                return False
            return self.contains_suspicious_pattern(body_str)

        return False

    def contains_suspicious_pattern(self, text):
        """Return True when ``text`` matches any suspicious pattern (case-insensitive)."""
        import re

        return any(
            re.search(pattern, text, re.IGNORECASE)
            for pattern in self.suspicious_patterns
        )

    def is_sensitive_endpoint(self, path):
        """Return True for admin, users, auth and payments paths."""
        import re

        sensitive_patterns = [
            r'/admin/',
            r'/api/.*/users/',
            r'/api/.*/auth/',
            r'/api/.*/payments/',
        ]
        return any(re.search(pattern, path) for pattern in sensitive_patterns)

    def get_client_ip(self, request):
        """Return the first ``X-Forwarded-For`` entry, else ``REMOTE_ADDR``."""
        # NOTE(review): X-Forwarded-For is client-controlled unless a
        # trusted proxy overwrites it - confirm the deployment does.
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # strip(): entries are conventionally separated by ", ".
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

    def log_security_event(self, request, event_type, details):
        """Write one JSON event record to the security logger."""
        event_data = {
            'timestamp': time.time(),
            'event_type': event_type,
            'service': 'user-service',
            'details': details
        }
        security_logger.warning(json.dumps(event_data))
# Intrusion detection
class IntrusionDetectionSystem:
    """In-memory tracker that blocks IPs after repeated failed logins."""

    def __init__(self):
        self.failed_attempts = {}
        self.blocked_ips = set()

    def record_failed_attempt(self, ip_address):
        """Register a failed attempt for ``ip_address``.

        Attempts older than one hour are dropped; once five or more remain
        the IP is blocked and an ``IP_BLOCKED`` event is logged.
        """
        history = self.failed_attempts.setdefault(ip_address, [])
        history.append(time.time())

        # Keep only the attempts made within the last hour.
        cutoff_time = time.time() - 3600
        recent = [stamp for stamp in history if stamp > cutoff_time]
        self.failed_attempts[ip_address] = recent

        if len(recent) < 5:
            return

        self.blocked_ips.add(ip_address)
        event = {
            'event_type': 'IP_BLOCKED',
            'ip_address': ip_address,
            'failed_attempts': len(recent),
            'timestamp': time.time()
        }
        security_logger.critical(json.dumps(event))

    def is_blocked(self, ip_address):
        """Return True when ``ip_address`` is currently blocked."""
        return ip_address in self.blocked_ips

    def unblock_ip(self, ip_address):
        """Lift the block on ``ip_address`` and forget its failed attempts."""
        self.blocked_ips.discard(ip_address)
        self.failed_attempts.pop(ip_address, None)


# Module-level shared instance.
ids = IntrusionDetectionSystem()
# security_metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
# Security metrics
# Counter of security events, labelled by event type and service.
SECURITY_EVENTS = Counter(
    'security_events_total',
    'Total security events',
    ['event_type', 'service']
)

# Counter of authentication failures, labelled by service and reason.
AUTH_FAILURES = Counter(
    'auth_failures_total',
    'Total authentication failures',
    ['service', 'reason']
)

# Counter of blocked requests, labelled by service and reason.
BLOCKED_REQUESTS = Counter(
    'blocked_requests_total',
    'Total blocked requests',
    ['service', 'reason']
)

# Gauge holding the number of active user sessions per service.
ACTIVE_SESSIONS = Gauge(
    'active_sessions',
    'Number of active user sessions',
    ['service']
)
class SecurityMetricsCollector:
    """Thin helpers that update the module's Prometheus security metrics."""

    @staticmethod
    def record_security_event(event_type, service='user-service'):
        """Increment ``SECURITY_EVENTS`` for the given event type and service."""
        counter = SECURITY_EVENTS.labels(event_type=event_type, service=service)
        counter.inc()

    @staticmethod
    def record_auth_failure(reason, service='user-service'):
        """Increment ``AUTH_FAILURES`` for the given reason and service."""
        counter = AUTH_FAILURES.labels(service=service, reason=reason)
        counter.inc()

    @staticmethod
    def record_blocked_request(reason, service='user-service'):
        """Increment ``BLOCKED_REQUESTS`` for the given reason and service."""
        counter = BLOCKED_REQUESTS.labels(service=service, reason=reason)
        counter.inc()

    @staticmethod
    def update_active_sessions(count, service='user-service'):
        """Set the ``ACTIVE_SESSIONS`` gauge for ``service`` to ``count``."""
        gauge = ACTIVE_SESSIONS.labels(service=service)
        gauge.set(count)
# Usage in middleware
class SecurityMetricsMiddleware(MiddlewareMixin):
    """Translate 401/403/429 responses into security metric increments."""

    def __call__(self, request):
        """Run the view, record a metric for its status code, return the response."""
        response = self.get_response(request)

        # Status code -> (recorder, reason label); other codes record nothing.
        recorders = {
            401: (SecurityMetricsCollector.record_auth_failure, 'invalid_credentials'),
            403: (SecurityMetricsCollector.record_blocked_request, 'forbidden'),
            429: (SecurityMetricsCollector.record_blocked_request, 'rate_limit'),
        }
        match = recorders.get(response.status_code)
        if match is not None:
            record, reason = match
            record(reason)

        return response
Securing microservices requires a multi-layered approach that spans authentication, authorization, network security, data protection, and monitoring.
A comprehensive security strategy protects against various threats while maintaining system performance and usability. In the next section, we'll explore performance optimization with caching.
Deploying Microservices
Deploying microservices requires careful orchestration of multiple services, their dependencies, and infrastructure components. This chapter covers deployment strategies, containerization, orchestration platforms, and best practices for production-ready Django microservices.
Improving Microservices Performance with Caching
Performance optimization in microservices requires a multi-layered approach to caching, from application-level caching to distributed caching strategies. This chapter covers comprehensive caching techniques, performance monitoring, and optimization strategies for Django microservices.