Advanced security hardening goes beyond Django's built-in security features to implement enterprise-grade security measures. This comprehensive guide covers advanced authentication, authorization patterns, security monitoring, compliance frameworks, and defense-in-depth strategies for production Django applications.
# authentication/mfa.py
import pyotp
import qrcode
from io import BytesIO
import base64
from django.contrib.auth.models import User
from django.db import models
from django.utils import timezone
from django.core.cache import cache
import secrets
import hashlib
class MFADevice(models.Model):
    """A second authentication factor registered to a user.

    Each row is one factor: a TOTP authenticator, an SMS or e-mail
    destination for one-time codes, or a list of single-use backup codes.
    """

    DEVICE_TYPES = [
        ('totp', 'Time-based OTP'),
        ('sms', 'SMS'),
        ('email', 'Email'),
        ('backup', 'Backup Codes'),
    ]

    # Seconds a code sent by SMS/e-mail stays valid in the cache.
    SENT_TOKEN_TTL = 300
    # Seconds an accepted TOTP code is remembered to block replay. With
    # valid_window=1 a code is accepted during three 30-second steps, so the
    # marker must outlive 90 seconds.
    TOTP_REPLAY_TTL = 90

    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='mfa_devices')
    name = models.CharField(max_length=100)
    device_type = models.CharField(max_length=10, choices=DEVICE_TYPES)
    secret_key = models.CharField(max_length=32, blank=True)
    phone_number = models.CharField(max_length=20, blank=True)
    email = models.EmailField(blank=True)
    backup_codes = models.JSONField(default=list, blank=True)
    is_active = models.BooleanField(default=True)
    last_used = models.DateTimeField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ['user', 'name']

    def generate_secret_key(self):
        """Create and persist a new base32 TOTP secret (TOTP devices only)."""
        if self.device_type == 'totp':
            self.secret_key = pyotp.random_base32()
            self.save()

    def get_qr_code(self):
        """Return the TOTP provisioning QR code as a base64-encoded PNG.

        Returns None when the device is not a TOTP device or has no secret.
        """
        if self.device_type != 'totp' or not self.secret_key:
            return None
        totp = pyotp.TOTP(self.secret_key)
        provisioning_uri = totp.provisioning_uri(
            name=self.user.email,
            issuer_name="MyApp"
        )
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        return base64.b64encode(buffer.getvalue()).decode()

    def verify_token(self, token):
        """Check ``token`` against this device; inactive devices always fail."""
        if not self.is_active:
            return False
        if self.device_type == 'totp':
            return self._verify_totp_token(token)
        elif self.device_type == 'backup':
            return self._verify_backup_code(token)
        elif self.device_type in ['sms', 'email']:
            return self._verify_sent_token(token)
        return False

    def _verify_totp_token(self, token):
        """Verify a TOTP code, allowing one step of clock drift either way."""
        # A device whose secret was never generated must not verify anything:
        # an empty key still produces a deterministic, computable code.
        if not self.secret_key or not token:
            return False
        totp = pyotp.TOTP(self.secret_key)
        # valid_window=1 already covers the previous, current and next step.
        if not totp.verify(token, valid_window=1):
            return False
        # cache.add only stores when the key is absent, so checking and
        # marking the code as used happen in one step.
        cache_key = f"mfa_used_token:{self.id}:{token}"
        if not cache.add(cache_key, True, self.TOTP_REPLAY_TTL):
            return False
        self.last_used = timezone.now()
        self.save()
        return True

    def _verify_backup_code(self, code):
        """Accept ``code`` if it is an unused backup code, and consume it."""
        if code in self.backup_codes:
            # Backup codes are single-use.
            self.backup_codes.remove(code)
            self.last_used = timezone.now()
            self.save()
            return True
        return False

    def _verify_sent_token(self, token):
        """Verify a code previously stored in the cache by ``send_token``."""
        cache_key = f"mfa_sent_token:{self.id}"
        stored_token = cache.get(cache_key)
        if not stored_token or not token:
            return False
        # Constant-time comparison; encoding to bytes lets compare_digest
        # accept non-ASCII or non-string input instead of raising TypeError.
        if not secrets.compare_digest(str(stored_token).encode(), str(token).encode()):
            return False
        cache.delete(cache_key)
        self.last_used = timezone.now()
        self.save()
        return True

    def generate_backup_codes(self, count=10):
        """Replace the stored backup codes with ``count`` new ones and return them."""
        codes = [secrets.token_hex(4).upper() for _ in range(count)]
        self.backup_codes = codes
        self.save()
        return codes

    def send_token(self):
        """Generate a 6-digit code, cache it, and send it by SMS or e-mail.

        Returns False for device types other than 'sms' and 'email';
        otherwise returns the result of the matching send helper.
        """
        if self.device_type not in ['sms', 'email']:
            return False
        token_str = f"{secrets.randbelow(1000000):06d}"
        cache.set(f"mfa_sent_token:{self.id}", token_str, self.SENT_TOKEN_TTL)
        if self.device_type == 'sms':
            return self._send_sms_token(token_str)
        return self._send_email_token(token_str)

    def _send_sms_token(self, token):
        """Send the code by SMS.

        NOTE(review): placeholder — nothing is sent yet, but True is returned.
        """
        return True

    def _send_email_token(self, token):
        """E-mail the code to the device address, or to the user's address."""
        from django.core.mail import send_mail
        send_mail(
            'Your verification code',
            f'Your verification code is: {token}',
            'noreply@myapp.com',
            [self.email or self.user.email],
            fail_silently=False,
        )
        return True
class MFABackend:
    """Authentication backend that adds a second-factor check to password login."""

    # Seconds a password-verified but MFA-pending login is remembered.
    PENDING_TTL = 300

    def authenticate(self, request, username=None, password=None, mfa_token=None, **kwargs):
        """Return the user when the password and, if required, the MFA token are valid.

        Returns None when the password is wrong, when MFA is required but no
        token was supplied (a pending marker is cached), or when the token
        does not verify.
        """
        # Calling django.contrib.auth.authenticate() here would loop over all
        # configured backends, including this one, and recurse without end.
        # Check the password with ModelBackend directly instead.
        from django.contrib.auth.backends import ModelBackend

        user = ModelBackend().authenticate(request, username=username, password=password)
        if user is None:
            return None
        if not self.requires_mfa(user):
            return user
        pending_key = f"mfa_pending:{username}"
        if not mfa_token:
            # Remember that the password step succeeded.
            cache.set(pending_key, user.id, self.PENDING_TTL)
            return None
        if self.verify_mfa_token(user, mfa_token):
            cache.delete(pending_key)
            return user
        return None

    def requires_mfa(self, user):
        """Return True when the user has at least one active MFA device."""
        return user.mfa_devices.filter(is_active=True).exists()

    def verify_mfa_token(self, user, token):
        """Return True when any active device of ``user`` accepts ``token``."""
        return any(
            device.verify_token(token)
            for device in user.mfa_devices.filter(is_active=True)
        )

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or None if absent."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
# MFA middleware
class MFAMiddleware:
    """Send signed-in users with active MFA devices to the verification page.

    A request passes through when the user is anonymous, has no active MFA
    device, already has ``mfa_verified`` set in the session, or is already
    under ``/mfa/``.
    """

    def __init__(self, get_response):
        self.mfa_backend = MFABackend()
        self.get_response = get_response

    def __call__(self, request):
        user = request.user
        # Evaluated left to right so the device lookup only runs for
        # authenticated users.
        needs_challenge = (
            user.is_authenticated
            and self.mfa_backend.requires_mfa(user)
            and not self.is_mfa_verified(request)
        )
        if needs_challenge and not request.path.startswith('/mfa/'):
            from django.shortcuts import redirect
            return redirect('mfa:verify')
        return self.get_response(request)

    def is_mfa_verified(self, request):
        """Return the session's ``mfa_verified`` flag (False when unset)."""
        session = request.session
        return session.get('mfa_verified', False)
# authorization/rbac.py
from django.db import models
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
class Role(models.Model):
    """A named role that can be assigned to users and linked to permissions."""

    # Unique human-readable identifier of the role.
    name = models.CharField(max_length=100, unique=True)
    description = models.TextField(blank=True)
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Use the role name as its display text."""
        return self.name
class Permission(models.Model):
    """A custom permission, optionally tied to one content type."""

    name = models.CharField(max_length=100)
    codename = models.CharField(max_length=100)
    description = models.TextField(blank=True)
    # Left NULL when the permission is not scoped to a content type.
    content_type = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        null=True,
        blank=True
    )

    class Meta:
        unique_together = ['codename', 'content_type']

    def __str__(self):
        """Return ``"<content type> | <name>"`` when scoped, else the name."""
        scope = self.content_type
        return f"{scope.name} | {self.name}" if scope else self.name
class RolePermission(models.Model):
    """Link between a role and a permission, optionally limited to one object."""

    role = models.ForeignKey(Role, on_delete=models.CASCADE)
    permission = models.ForeignKey(Permission, on_delete=models.CASCADE)
    # NULL means the permission applies to every object; otherwise it is the
    # primary key of the single object the grant is limited to.
    object_id = models.PositiveIntegerField(null=True, blank=True)

    class Meta:
        # NOTE(review): most databases treat NULL object_id values as distinct,
        # so this does not stop duplicate unscoped rows — confirm if that matters.
        unique_together = ['role', 'permission', 'object_id']

    @property
    def content_object(self):
        """Return the object this grant is scoped to, or None.

        GenericForeignKey needs its content-type field on this model and
        cannot follow ``permission__content_type``, so the target is resolved
        here through the permission's content type.
        """
        if self.object_id is None:
            return None
        content_type = self.permission.content_type
        if content_type is None:
            return None
        model_class = content_type.model_class()
        if model_class is None:
            return None
        return model_class._base_manager.filter(pk=self.object_id).first()
class UserRole(models.Model):
    """Assignment of a role to a user, optionally object-scoped and time-limited."""

    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='user_roles')
    role = models.ForeignKey(Role, on_delete=models.CASCADE)

    # Generic relation to the object the role is limited to; both parts are
    # NULL when the assignment is not object-scoped.
    content_type = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        null=True,
        blank=True
    )
    object_id = models.PositiveIntegerField(null=True, blank=True)
    content_object = GenericForeignKey('content_type', 'object_id')

    # Optional validity window; a NULL bound leaves that side open.
    valid_from = models.DateTimeField(null=True, blank=True)
    valid_until = models.DateTimeField(null=True, blank=True)

    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ['user', 'role', 'content_type', 'object_id']

    def is_valid(self):
        """Return True when the assignment is active and inside its time window."""
        if not self.is_active:
            return False
        current = timezone.now()
        if self.valid_from and current < self.valid_from:
            return False
        return not (self.valid_until and current > self.valid_until)
class RBACBackend:
    """Authorization backend that answers permission checks from role assignments."""

    def authenticate(self, request, username=None, password=None, **kwargs):
        """Always return None; this backend only handles authorization."""
        return None

    def has_perm(self, user_obj, perm, obj=None):
        """Return True when any currently valid role of the user grants ``perm``.

        ``perm`` is either ``"app_label.codename"`` or a bare codename.
        Inactive users never have permissions.
        """
        if not user_obj.is_active:
            return False
        head, dot, tail = perm.partition('.')
        app_label, codename = (head, tail) if dot else (None, perm)
        return any(
            self.role_has_permission(role, codename, obj, app_label)
            for role in self.get_user_roles(user_obj, obj)
        )

    def get_user_roles(self, user, obj=None):
        """Return the roles currently assigned to ``user``.

        Without ``obj`` only unscoped assignments count. With ``obj``,
        unscoped assignments and assignments scoped to that exact object
        count. Inactive assignments, inactive roles and assignments outside
        their validity window are excluded.
        """
        # role__is_active: a disabled role must stop granting access even
        # while assignments to it still exist.
        queryset = user.user_roles.filter(is_active=True, role__is_active=True)
        # Compare with None: a model instance may define its own truthiness.
        if obj is not None:
            content_type = ContentType.objects.get_for_model(obj)
            queryset = queryset.filter(
                models.Q(content_type__isnull=True) |
                models.Q(content_type=content_type, object_id=obj.pk)
            )
        else:
            queryset = queryset.filter(content_type__isnull=True)
        now = timezone.now()
        queryset = queryset.filter(
            models.Q(valid_from__isnull=True) | models.Q(valid_from__lte=now),
            models.Q(valid_until__isnull=True) | models.Q(valid_until__gte=now)
        )
        return [assignment.role for assignment in queryset.select_related('role')]

    def role_has_permission(self, role, codename, obj=None, app_label=None):
        """Return True when ``role`` holds ``codename`` for ``obj`` or globally."""
        if not role.is_active:
            return False
        queryset = RolePermission.objects.filter(
            role=role,
            permission__codename=codename
        )
        if app_label:
            queryset = queryset.filter(
                permission__content_type__app_label=app_label
            )
        if obj is not None:
            content_type = ContentType.objects.get_for_model(obj)
            queryset = queryset.filter(
                models.Q(object_id__isnull=True) |
                models.Q(object_id=obj.pk),
                permission__content_type=content_type
            )
        else:
            # A grant limited to one object must not satisfy a global check.
            queryset = queryset.filter(object_id__isnull=True)
        return queryset.exists()

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or None if absent."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
# Permission decorators
from functools import wraps
from django.core.exceptions import PermissionDenied
def require_permission(permission, obj_func=None):
    """Build a view decorator that raises PermissionDenied without ``permission``.

    When ``obj_func`` is given it is called with the view's arguments, and
    its result is passed to ``user.has_perm`` as the object to check.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            user = request.user
            if not user.is_authenticated:
                raise PermissionDenied("Authentication required")
            target = obj_func(request, *args, **kwargs) if obj_func else None
            if user.has_perm(permission, target):
                return view_func(request, *args, **kwargs)
            raise PermissionDenied(f"Permission {permission} required")
        return wrapper
    return decorator
def require_role(role_name, obj_func=None):
    """Build a view decorator that raises PermissionDenied without role ``role_name``.

    When ``obj_func`` is given it is called with the view's arguments, and
    its result is passed to ``RBACBackend.get_user_roles`` as the object.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            if not request.user.is_authenticated:
                raise PermissionDenied("Authentication required")
            target = obj_func(request, *args, **kwargs) if obj_func else None
            held_roles = RBACBackend().get_user_roles(request.user, target)
            for held in held_roles:
                if held.name == role_name:
                    return view_func(request, *args, **kwargs)
            raise PermissionDenied(f"Role {role_name} required")
        return wrapper
    return decorator
# Usage examples
@require_permission('myapp.view_sensitive_data')
def sensitive_view(request):
    """Render ``sensitive.html`` for users holding ``myapp.view_sensitive_data``."""
    template_name = 'sensitive.html'
    return render(request, template_name)
@require_role(
    'manager',
    lambda req, **kw: get_object_or_404(Project, pk=kw['project_id']),
)
def manage_project(request, project_id):
    """Render the management page for a project the user holds 'manager' on."""
    context = {'project': get_object_or_404(Project, pk=project_id)}
    return render(request, 'manage_project.html', context)
# security/logging.py
import logging
import json
from django.utils.deprecation import MiddlewareMixin
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.utils import timezone
from django.db import models
import hashlib
class SecurityEvent(models.Model):
    """One recorded security event, with the request context it came from."""

    EVENT_TYPES = [
        ('login_success', 'Login Success'),
        ('login_failed', 'Login Failed'),
        ('logout', 'Logout'),
        ('permission_denied', 'Permission Denied'),
        ('suspicious_activity', 'Suspicious Activity'),
        ('data_access', 'Data Access'),
        ('data_modification', 'Data Modification'),
        ('admin_action', 'Admin Action'),
        ('mfa_success', 'MFA Success'),
        ('mfa_failed', 'MFA Failed'),
    ]
    SEVERITY_LEVELS = [
        ('low', 'Low'),
        ('medium', 'Medium'),
        ('high', 'High'),
        ('critical', 'Critical'),
    ]

    # Classification of the event.
    event_type = models.CharField(max_length=50, choices=EVENT_TYPES)
    severity = models.CharField(max_length=10, choices=SEVERITY_LEVELS, default='medium')

    # Who and where; the user link is cleared if the user is deleted.
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    ip_address = models.GenericIPAddressField()
    user_agent = models.TextField(blank=True)
    session_key = models.CharField(max_length=40, blank=True)

    # What happened.
    description = models.TextField()
    additional_data = models.JSONField(default=dict, blank=True)

    # The HTTP request that produced the event.
    request_method = models.CharField(max_length=10, blank=True)
    request_path = models.TextField(blank=True)
    request_params = models.JSONField(default=dict, blank=True)

    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [
            models.Index(fields=['event_type', 'timestamp']),
            models.Index(fields=['user', 'timestamp']),
            models.Index(fields=['ip_address', 'timestamp']),
            models.Index(fields=['severity', 'timestamp']),
        ]

    def __str__(self):
        """Return ``"<event type> - <user> - <timestamp>"``."""
        parts = (self.event_type, self.user, self.timestamp)
        return " - ".join(str(part) for part in parts)
class SecurityLogger:
    """Record security events in the database and the 'security' logger."""

    # Address stored when no valid client address is available.
    FALLBACK_IP = '127.0.0.1'

    def __init__(self):
        self.logger = logging.getLogger('security')

    def log_event(self, event_type, request=None, user=None, severity='medium',
                  description='', additional_data=None):
        """Persist a SecurityEvent, log it as JSON, and alert on 'critical'.

        Request details (IP, user agent, session key, method, path, query
        parameters) come from ``request`` when given. When ``user`` is
        omitted, the request's authenticated user is used.
        Returns the created SecurityEvent.
        """
        additional_data = additional_data or {}
        has_request = request is not None

        # The IP and session-key columns are NOT NULL, so never pass None.
        ip_address = (self._get_client_ip(request) if has_request else None) or self.FALLBACK_IP
        user_agent = request.META.get('HTTP_USER_AGENT', '') if has_request else ''
        session = getattr(request, 'session', None)
        session_key = (session.session_key if session is not None else None) or ''
        request_method = request.method if has_request else ''
        request_path = request.path if has_request else ''
        request_params = dict(request.GET) if has_request else {}

        if not user and has_request:
            request_user = getattr(request, 'user', None)
            if request_user is not None and request_user.is_authenticated:
                user = request_user
        # An anonymous user object cannot be stored in the foreign key.
        if user is not None and not getattr(user, 'is_authenticated', False):
            user = None

        event = SecurityEvent.objects.create(
            event_type=event_type,
            severity=severity,
            user=user,
            ip_address=ip_address,
            user_agent=user_agent,
            session_key=session_key,
            description=description,
            additional_data=additional_data,
            request_method=request_method,
            request_path=request_path,
            request_params=request_params
        )

        log_data = {
            'event_id': event.id,
            'event_type': event_type,
            'severity': severity,
            'user_id': user.id if user else None,
            'username': user.username if user else None,
            'ip_address': ip_address,
            'timestamp': timezone.now().isoformat(),
            'description': description,
            'additional_data': additional_data
        }
        # default=str keeps logging from failing on non-JSON values.
        payload = json.dumps(log_data, default=str)
        if severity in ['high', 'critical']:
            self.logger.critical(payload)
        elif severity == 'medium':
            self.logger.warning(payload)
        else:
            self.logger.info(payload)

        if severity == 'critical':
            try:
                self._send_security_alert(event)
            except Exception:
                # The event is already stored; a mail failure must not
                # break the request being logged.
                self.logger.exception("Failed to send security alert for event %s", event.id)
        return event

    def _get_client_ip(self, request):
        """Return the client IP as a string, or None when none is valid.

        Uses the first X-Forwarded-For entry when it parses as an IP
        address, otherwise REMOTE_ADDR.
        NOTE(review): X-Forwarded-For is client-controlled unless a trusted
        proxy overwrites it — confirm the deployment does so.
        """
        import ipaddress

        if request is None:
            return None
        meta = request.META
        forwarded = meta.get('HTTP_X_FORWARDED_FOR') or ''
        candidates = (forwarded.split(',')[0].strip(), (meta.get('REMOTE_ADDR') or '').strip())
        for candidate in candidates:
            try:
                ipaddress.ip_address(candidate)
            except ValueError:
                continue
            return candidate
        return None

    def _send_security_alert(self, event):
        """E-mail the event details to ``settings.SECURITY_ALERT_EMAILS``, if set."""
        from django.core.mail import send_mail
        from django.conf import settings

        subject = f"CRITICAL Security Event: {event.event_type}"
        lines = [
            "Critical security event detected:",
            f"Event Type: {event.event_type}",
            f"User: {event.user}",
            f"IP Address: {event.ip_address}",
            f"Timestamp: {event.timestamp}",
            f"Description: {event.description}",
            f"Additional Data: {json.dumps(event.additional_data, indent=2)}",
        ]
        message = "\n" + "\n".join(lines) + "\n"
        security_emails = getattr(settings, 'SECURITY_ALERT_EMAILS', [])
        if security_emails:
            send_mail(
                subject,
                message,
                settings.DEFAULT_FROM_EMAIL,
                security_emails,
                fail_silently=False
            )
# Global security logger: one module-level SecurityLogger instance
security_logger = SecurityLogger()
# Security monitoring middleware
class SecurityMonitoringMiddleware(MiddlewareMixin):
    """Log suspicious query strings, request floods and 403 responses.

    Uses the ``process_request`` / ``process_response`` hooks so that
    MiddlewareMixin supplies ``__call__`` for both sync and async stacks.
    """

    # Requests allowed per client IP within RATE_WINDOW seconds.
    RATE_LIMIT = 100
    RATE_WINDOW = 60

    def __init__(self, get_response):
        super().__init__(get_response)
        self.suspicious_patterns = [
            'union select',
            'drop table',
            '<script',
            'javascript:',
            '../../../',
            'cmd.exe',
            '/etc/passwd',
        ]

    def process_request(self, request):
        """Inspect the incoming request; never short-circuits it."""
        self._check_suspicious_activity(request)

    def process_response(self, request, response):
        """Record a 'permission_denied' event for 403 responses."""
        if response.status_code == 403:
            security_logger.log_event(
                'permission_denied',
                request=request,
                severity='medium',
                description=f"Permission denied for {request.path}"
            )
        return response

    def _check_suspicious_activity(self, request):
        """Log the first suspicious pattern in the query string, then rate-check."""
        from urllib.parse import unquote_plus

        # QUERY_STRING is percent-encoded ("union%20select", "union+select");
        # decode it first or patterns with spaces and '<' never match.
        query_string = unquote_plus(request.META.get('QUERY_STRING', '')).lower()
        for pattern in self.suspicious_patterns:
            if pattern in query_string:
                security_logger.log_event(
                    'suspicious_activity',
                    request=request,
                    severity='high',
                    description=f"Suspicious pattern detected: {pattern}",
                    additional_data={'pattern': pattern, 'query_string': query_string}
                )
                break
        self._check_rate_limiting(request)

    def _check_rate_limiting(self, request):
        """Count requests per client IP and log once when the limit is crossed."""
        from django.core.cache import cache

        ip_address = security_logger._get_client_ip(request)
        cache_key = f"request_count:{ip_address}"
        # add() starts the window only if none is running, and incr() leaves
        # the timeout alone, so the window is fixed and the count is atomic.
        cache.add(cache_key, 0, self.RATE_WINDOW)
        try:
            request_count = cache.incr(cache_key)
        except ValueError:
            # The key expired between add() and incr(); start a new window.
            cache.set(cache_key, 1, self.RATE_WINDOW)
            request_count = 1
        # Log only on the request that crosses the limit, so a flood does
        # not also produce one database row per request.
        if request_count == self.RATE_LIMIT + 1:
            security_logger.log_event(
                'suspicious_activity',
                request=request,
                severity='high',
                description=f"Excessive requests from IP: {ip_address}",
                additional_data={'request_count': request_count}
            )
# Signal handlers for security events
@receiver(user_logged_in)
def log_user_login(sender, request, user, **kwargs):
    """Record a low-severity 'login_success' event for the user who signed in."""
    message = f"User {user.username} logged in successfully"
    security_logger.log_event(
        'login_success',
        request=request,
        user=user,
        severity='low',
        description=message,
    )
@receiver(user_login_failed)
def log_failed_login(sender, credentials, request=None, **kwargs):
    """Record a failed login and flag repeated failures from one IP.

    Logs a 'login_failed' event every time. When a request is available,
    failures are also counted per client IP over five minutes, and a
    high-severity 'suspicious_activity' event is logged once the count
    exceeds five.
    """
    from django.core.cache import cache

    username = credentials.get('username', 'unknown')
    security_logger.log_event(
        'login_failed',
        request=request,
        severity='medium',
        description=f"Failed login attempt for username: {username}",
        additional_data={'username': username}
    )
    # The signal may carry no request; without one there is no client
    # address to count against.
    if request is None:
        return

    ip_address = security_logger._get_client_ip(request)
    cache_key = f"failed_login:{ip_address}"
    # add() starts the window only if none is running, and incr() leaves the
    # timeout alone, so the window is fixed and the count is atomic.
    cache.add(cache_key, 0, 300)
    try:
        failed_count = cache.incr(cache_key)
    except ValueError:
        # The key expired between add() and incr(); start a new window.
        cache.set(cache_key, 1, 300)
        failed_count = 1
    if failed_count > 5:
        security_logger.log_event(
            'suspicious_activity',
            request=request,
            severity='high',
            description=f"Brute force attack detected from IP: {ip_address}",
            additional_data={'failed_attempts': failed_count}
        )
@receiver(user_logged_out)
def log_user_logout(sender, request, user, **kwargs):
    """Record a low-severity 'logout' event.

    ``user`` is None when the session that logged out was not authenticated,
    so the description falls back to a generic message in that case.
    """
    if user is not None:
        description = f"User {user.username} logged out"
    else:
        description = "Unauthenticated session logged out"
    security_logger.log_event(
        'logout',
        request=request,
        user=user,
        severity='low',
        description=description
    )
# compliance/gdpr.py
from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone
import json
class DataProcessingPurpose(models.Model):
    """A purpose for which personal data is processed, with its legal basis."""

    LEGAL_BASIS_CHOICES = [
        ('consent', 'Consent'),
        ('contract', 'Contract'),
        ('legal_obligation', 'Legal Obligation'),
        ('vital_interests', 'Vital Interests'),
        ('public_task', 'Public Task'),
        ('legitimate_interests', 'Legitimate Interests'),
    ]

    name = models.CharField(max_length=100, unique=True)
    description = models.TextField()
    legal_basis = models.CharField(max_length=50, choices=LEGAL_BASIS_CHOICES)
    retention_period = models.IntegerField(help_text="Retention period in days")
    is_active = models.BooleanField(default=True)

    def __str__(self):
        """Use the purpose name as its display text."""
        return self.name
class ConsentRecord(models.Model):
    """A user's consent decision for one processing purpose, with withdrawal data."""

    CONSENT_METHODS = [
        ('web_form', 'Web Form'),
        ('email', 'Email'),
        ('phone', 'Phone'),
        ('paper', 'Paper'),
    ]

    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='consent_records')
    purpose = models.ForeignKey(DataProcessingPurpose, on_delete=models.CASCADE)
    consent_given = models.BooleanField()
    consent_date = models.DateTimeField(auto_now_add=True)
    consent_method = models.CharField(max_length=50, choices=CONSENT_METHODS)
    # Context captured when the decision was recorded.
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    user_agent = models.TextField(blank=True)
    # Filled in by withdraw_consent().
    withdrawn_date = models.DateTimeField(null=True, blank=True)
    withdrawal_method = models.CharField(max_length=50, blank=True)

    class Meta:
        unique_together = ['user', 'purpose']
        indexes = [
            models.Index(fields=['user', 'consent_given']),
            models.Index(fields=['purpose', 'consent_given']),
        ]

    def withdraw_consent(self, method='web_form'):
        """Mark consent as withdrawn now via ``method`` and save the record."""
        self.withdrawal_method = method
        self.withdrawn_date = timezone.now()
        self.consent_given = False
        self.save()
class DataExportRequest(models.Model):
    """A user's request for an export of their data, and its outcome."""

    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    request_date = models.DateTimeField(auto_now_add=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    export_file = models.FileField(upload_to='gdpr_exports/', blank=True)
    completion_date = models.DateTimeField(null=True, blank=True)
    error_message = models.TextField(blank=True)

    def generate_export(self):
        """Build the JSON export, attach it to ``export_file`` and update status.

        Status moves to 'processing', then 'completed'. On any error it
        becomes 'failed' and the error text is stored in ``error_message``.
        """
        import json
        from django.core.files.base import ContentFile

        try:
            self.status = 'processing'
            self.save()
            user_data = self._collect_user_data()
            # Serialise in memory: no temporary file on disk means nothing
            # holding personal data is left behind if a later step fails.
            payload = json.dumps(user_data, indent=2, default=str)
            stamp = timezone.now().strftime("%Y%m%d_%H%M%S")
            filename = f'user_{self.user.id}_export_{stamp}.json'
            self.export_file.save(filename, ContentFile(payload.encode('utf-8')), save=False)
            self.status = 'completed'
            self.completion_date = timezone.now()
            self.save()
        except Exception as e:
            self.status = 'failed'
            self.error_message = str(e)
            self.save()

    def _collect_user_data(self):
        """Return a dict of the user's profile, consents, events and related rows.

        'related_data' holds the non-relational field values of every
        installed model (other than User) that has a ``user`` attribute and
        rows for this user. Models that fail are skipped and logged.
        NOTE(review): this includes every such model, e.g. stored MFA
        secrets — confirm which models should be exported.
        """
        import logging
        from django.apps import apps

        user = self.user
        user_data = {
            'user_info': {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'first_name': user.first_name,
                'last_name': user.last_name,
                'date_joined': user.date_joined,
                'last_login': user.last_login,
            },
            'consent_records': [
                {
                    'purpose': consent.purpose.name,
                    'consent_given': consent.consent_given,
                    'consent_date': consent.consent_date,
                    'withdrawn_date': consent.withdrawn_date,
                }
                for consent in user.consent_records.all()
            ],
            'security_events': [
                {
                    'event_type': event.event_type,
                    'timestamp': event.timestamp,
                    'ip_address': event.ip_address,
                    'description': event.description,
                }
                for event in SecurityEvent.objects.filter(user=user)
            ],
            'related_data': {}
        }

        for model in apps.get_models():
            if not hasattr(model, 'user') or model == User:
                continue
            # Skip relations: a foreign key's field name is e.g. 'user', not
            # 'user_id', so a name-suffix test never excludes it.
            field_names = [
                field.name for field in model._meta.fields if not field.is_relation
            ]
            try:
                rows = [
                    {name: getattr(obj, name) for name in field_names}
                    for obj in model.objects.filter(user=user)
                ]
            except Exception:
                # Best effort: one broken model must not abort the export,
                # but the omission should be visible.
                logging.getLogger(__name__).warning(
                    "Skipping %s in data export", model._meta.label_lower, exc_info=True
                )
                continue
            if rows:
                user_data['related_data'][model._meta.label_lower] = rows
        return user_data
class DataDeletionRequest(models.Model):
    """A user's request to have their data erased, and its outcome."""

    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
        ('rejected', 'Rejected'),
    ]

    # Field names overwritten on related models during anonymisation.
    PERSONAL_FIELD_NAMES = ('email', 'phone', 'address', 'name')

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    request_date = models.DateTimeField(auto_now_add=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    reason = models.TextField(blank=True)
    completion_date = models.DateTimeField(null=True, blank=True)
    rejection_reason = models.TextField(blank=True)

    def process_deletion(self):
        """Anonymise the user's data and record the outcome in ``status``.

        Status becomes 'rejected' when deletion is not allowed, 'completed'
        on success, and 'failed' (with the exception logged) on any error.
        """
        import logging

        try:
            self.status = 'processing'
            self.save()
            if not self._can_delete_user_data():
                self.status = 'rejected'
                self.rejection_reason = "User data cannot be deleted due to legal obligations"
                self.save()
                return
            self._anonymize_user_data()
            self.status = 'completed'
            self.completion_date = timezone.now()
            self.save()
        except Exception:
            # Keep a record of why the request failed instead of dropping it.
            logging.getLogger(__name__).exception(
                "Data deletion request %s failed", self.pk
            )
            self.status = 'failed'
            self.save()

    def _can_delete_user_data(self):
        """Return whether deletion is allowed; currently always True.

        Placeholder for checks such as financial records or legal disputes.
        """
        return True

    def _anonymize_user_data(self):
        """Overwrite personal fields on the user and on related rows.

        The user record gets placeholder values, is deactivated, and its
        password is made unusable. On every installed model with a ``user``
        attribute, fields named in PERSONAL_FIELD_NAMES are replaced with
        ``[DELETED_<id>]``. Models that fail are skipped and logged.
        """
        import logging
        from django.apps import apps

        user = self.user
        user.username = f"deleted_user_{user.id}"
        user.email = f"deleted_{user.id}@example.com"
        user.first_name = ""
        user.last_name = ""
        user.is_active = False
        # Drop the stored password hash along with the other personal data.
        user.set_unusable_password()
        user.save()

        for model in apps.get_models():
            if not hasattr(model, 'user'):
                continue
            targets = [
                field.name
                for field in model._meta.fields
                if field.name in self.PERSONAL_FIELD_NAMES
            ]
            if not targets:
                # Nothing to overwrite on this model.
                continue
            try:
                for obj in model.objects.filter(user=user):
                    for field_name in targets:
                        setattr(obj, field_name, f"[DELETED_{obj.id}]")
                    obj.save()
            except Exception:
                # Best effort per model, but leave a trace so data that
                # was not anonymised can be followed up.
                logging.getLogger(__name__).exception(
                    "Could not anonymise %s for user %s", model._meta.label_lower, user.id
                )
# GDPR compliance utilities
class GDPRCompliance:
    """Static helpers for checking and recording consent and retention dates."""

    @staticmethod
    def check_consent(user, purpose_name):
        """Return True when the user has given, and not withdrawn, consent.

        Unknown purposes and missing consent records give False.
        """
        return ConsentRecord.objects.filter(
            user=user,
            purpose__name=purpose_name,
            consent_given=True,
            withdrawn_date__isnull=True,
        ).exists()

    @staticmethod
    def record_consent(user, purpose_name, consent_given, method='web_form',
                       ip_address=None, user_agent=''):
        """Create or update the user's consent record for a purpose.

        Any earlier withdrawal is cleared. Returns the ConsentRecord.
        Raises DataProcessingPurpose.DoesNotExist for an unknown purpose.
        """
        purpose = DataProcessingPurpose.objects.get(name=purpose_name)
        consent, _created = ConsentRecord.objects.update_or_create(
            user=user,
            purpose=purpose,
            defaults={
                'consent_given': consent_given,
                # auto_now_add only fills this on creation; refresh it on
                # update so retention is counted from the latest decision.
                'consent_date': timezone.now(),
                'consent_method': method,
                'ip_address': ip_address,
                'user_agent': user_agent,
                'withdrawn_date': None,
                'withdrawal_method': '',
            }
        )
        return consent

    @staticmethod
    def get_data_retention_date(user, purpose_name):
        """Return the date after which data for this purpose should be deleted.

        That is the withdrawal date when consent was withdrawn, otherwise
        the consent date plus the purpose's retention period in days.
        Returns None when the purpose or consent record does not exist.
        """
        from datetime import timedelta

        try:
            purpose = DataProcessingPurpose.objects.get(name=purpose_name)
            consent = ConsentRecord.objects.get(user=user, purpose=purpose)
        except (DataProcessingPurpose.DoesNotExist, ConsentRecord.DoesNotExist):
            return None
        if consent.withdrawn_date:
            # Withdrawn consent means the data is due for deletion at once.
            return consent.withdrawn_date
        return consent.consent_date + timedelta(days=purpose.retention_period)
Advanced security hardening requires a comprehensive approach that includes strong authentication, granular authorization, continuous monitoring, and compliance with regulatory requirements. The key is implementing defense-in-depth strategies while maintaining usability and performance. Regular security audits, penetration testing, and staying updated with the latest security best practices are essential for maintaining a secure Django application.
Integrating Microservices
Microservices architecture breaks down monolithic applications into smaller, independent services that communicate over well-defined APIs. This guide covers strategies for integrating Django applications with microservices, including service communication patterns, data consistency, and deployment considerations.
Logging in Django
Logging is essential for monitoring, debugging, and maintaining Django applications. Django's logging framework, built on Python's logging module, provides flexible and powerful tools for capturing and managing application logs across development and production environments.