Asynchronous Django

Asynchronous Support

Django's asynchronous support enables building high-performance applications that can handle concurrent operations efficiently. This chapter covers async views, async safety considerations, and adapter functions that bridge synchronous and asynchronous code.

Asynchronous Support

Django's asynchronous support enables building high-performance applications that can handle concurrent operations efficiently. This chapter covers async views, async safety considerations, and adapter functions that bridge synchronous and asynchronous code.

Async Views

Understanding Async Views

Async views allow Django to handle multiple requests concurrently without blocking threads. They're particularly beneficial for I/O-bound operations like API calls, file operations, or database queries.

# Basic async view patterns
import asyncio
import aiohttp
from django.http import JsonResponse
from django.shortcuts import aget_object_or_404
from asgiref.sync import sync_to_async
from .models import Post, User

async def simple_async_view(request):
    """Basic async view example."""
    # Simulate async I/O operation
    await asyncio.sleep(0.1)
    
    return JsonResponse({
        'message': 'Hello from async view',
        'timestamp': asyncio.get_event_loop().time()
    })

async def async_database_view(request):
    """Async view with database operations."""
    # Using Django's async ORM methods (Django 4.1+)
    posts_count = await Post.objects.acount()
    
    # Async iteration over queryset
    recent_posts = []
    async for post in Post.objects.filter(published=True)[:5]:
        recent_posts.append({
            'id': post.id,
            'title': post.title,
            'created_at': post.created_at.isoformat()
        })
    
    return JsonResponse({
        'total_posts': posts_count,
        'recent_posts': recent_posts
    })

async def async_external_api_view(request):
    """Async view calling external APIs."""
    async with aiohttp.ClientSession() as session:
        # Multiple concurrent API calls
        tasks = [
            fetch_weather_data(session, 'New York'),
            fetch_weather_data(session, 'London'),
            fetch_weather_data(session, 'Tokyo')
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        weather_data = {}
        for i, city in enumerate(['New York', 'London', 'Tokyo']):
            if isinstance(results[i], Exception):
                weather_data[city] = {'error': str(results[i])}
            else:
                weather_data[city] = results[i]
        
        return JsonResponse({'weather': weather_data})

async def fetch_weather_data(session, city):
    """Fetch weather data for a city."""
    url = f"https://api.weather.com/v1/current?city={city}"
    
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.json()
            else:
                return {'error': f'HTTP {response.status}'}
    except Exception as e:
        return {'error': str(e)}

Advanced Async View Patterns

# Advanced async view implementations
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
import json

class AsyncBaseView(View):
    """Base class for async views with proper dispatch."""
    
    async def dispatch(self, request, *args, **kwargs):
        """Async dispatch method."""
        # Get the handler method
        handler = getattr(self, request.method.lower(), self.http_method_not_allowed)
        
        # Check if handler is async
        if asyncio.iscoroutinefunction(handler):
            return await handler(request, *args, **kwargs)
        else:
            # Handle sync methods in async context
            return await sync_to_async(handler)(request, *args, **kwargs)

class AsyncAPIView(AsyncBaseView):
    """Generic async API view."""
    
    async def get(self, request, *args, **kwargs):
        """Handle GET requests asynchronously."""
        try:
            data = await self.get_data(request, *args, **kwargs)
            return JsonResponse(data)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
    
    async def post(self, request, *args, **kwargs):
        """Handle POST requests asynchronously."""
        try:
            # Read request body asynchronously
            body = await request.aread()
            data = json.loads(body) if body else {}
            
            result = await self.create_data(request, data, *args, **kwargs)
            return JsonResponse(result, status=201)
        
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
    
    async def put(self, request, *args, **kwargs):
        """Handle PUT requests asynchronously."""
        try:
            body = await request.aread()
            data = json.loads(body) if body else {}
            
            result = await self.update_data(request, data, *args, **kwargs)
            return JsonResponse(result)
        
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
    
    async def delete(self, request, *args, **kwargs):
        """Handle DELETE requests asynchronously."""
        try:
            await self.delete_data(request, *args, **kwargs)
            return JsonResponse({'message': 'Deleted successfully'}, status=204)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
    
    # Override these methods in subclasses
    async def get_data(self, request, *args, **kwargs):
        """Override to implement GET logic."""
        raise NotImplementedError
    
    async def create_data(self, request, data, *args, **kwargs):
        """Override to implement POST logic."""
        raise NotImplementedError
    
    async def update_data(self, request, data, *args, **kwargs):
        """Override to implement PUT logic."""
        raise NotImplementedError
    
    async def delete_data(self, request, *args, **kwargs):
        """Override to implement DELETE logic."""
        raise NotImplementedError

@method_decorator(csrf_exempt, name='dispatch')
class PostAPIView(AsyncAPIView):
    """Async API view for posts."""
    
    async def get_data(self, request, *args, **kwargs):
        """Get posts data."""
        post_id = kwargs.get('post_id')
        
        if post_id:
            # Get single post
            post = await aget_object_or_404(Post, id=post_id, published=True)
            return {
                'id': post.id,
                'title': post.title,
                'content': post.content,
                'created_at': post.created_at.isoformat()
            }
        else:
            # Get list of posts
            posts = []
            async for post in Post.objects.filter(published=True)[:10]:
                posts.append({
                    'id': post.id,
                    'title': post.title,
                    'created_at': post.created_at.isoformat()
                })
            
            return {'posts': posts}
    
    async def create_data(self, request, data, *args, **kwargs):
        """Create new post."""
        # Use sync_to_async for complex database operations
        @sync_to_async
        def create_post():
            return Post.objects.create(
                title=data['title'],
                content=data['content'],
                author_id=data['author_id'],
                published=data.get('published', False)
            )
        
        post = await create_post()
        
        return {
            'id': post.id,
            'title': post.title,
            'message': 'Post created successfully'
        }
    
    async def update_data(self, request, data, *args, **kwargs):
        """Update existing post."""
        post_id = kwargs.get('post_id')
        
        @sync_to_async
        def update_post():
            post = Post.objects.get(id=post_id)
            post.title = data.get('title', post.title)
            post.content = data.get('content', post.content)
            post.published = data.get('published', post.published)
            post.save()
            return post
        
        post = await update_post()
        
        return {
            'id': post.id,
            'title': post.title,
            'message': 'Post updated successfully'
        }
    
    async def delete_data(self, request, *args, **kwargs):
        """Delete post."""
        post_id = kwargs.get('post_id')
        
        @sync_to_async
        def delete_post():
            post = Post.objects.get(id=post_id)
            post.delete()
        
        await delete_post()

Async Middleware

# middleware/async_middleware.py
import asyncio
import time
from django.utils.deprecation import MiddlewareMixin

class AsyncTimingMiddleware:
    """Async middleware for request timing."""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    async def __call__(self, request):
        """Process request asynchronously."""
        start_time = time.time()
        
        # Process request
        response = await self.get_response(request)
        
        # Calculate processing time
        processing_time = time.time() - start_time
        
        # Add timing header
        response['X-Processing-Time'] = f"{processing_time:.4f}s"
        
        return response

class AsyncLoggingMiddleware:
    """Async middleware for request logging."""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    async def __call__(self, request):
        """Log requests asynchronously."""
        # Log request start
        await self.log_request_start(request)
        
        # Process request
        response = await self.get_response(request)
        
        # Log request completion
        await self.log_request_end(request, response)
        
        return response
    
    async def log_request_start(self, request):
        """Log request start asynchronously."""
        # Use sync_to_async for database logging
        @sync_to_async
        def log_to_db():
            # Log to database or external service
            print(f"Request started: {request.method} {request.path}")
        
        await log_to_db()
    
    async def log_request_end(self, request, response):
        """Log request completion asynchronously."""
        @sync_to_async
        def log_to_db():
            print(f"Request completed: {request.method} {request.path} - {response.status_code}")
        
        await log_to_db()

class AsyncRateLimitMiddleware:
    """Async rate limiting middleware."""
    
    def __init__(self, get_response):
        self.get_response = get_response
        self.rate_limits = {}  # In production, use Redis or similar
    
    async def __call__(self, request):
        """Apply rate limiting asynchronously."""
        client_ip = self.get_client_ip(request)
        
        # Check rate limit
        if await self.is_rate_limited(client_ip):
            from django.http import JsonResponse
            return JsonResponse(
                {'error': 'Rate limit exceeded'}, 
                status=429
            )
        
        # Process request
        response = await self.get_response(request)
        
        # Update rate limit counter
        await self.update_rate_limit(client_ip)
        
        return response
    
    def get_client_ip(self, request):
        """Get client IP address."""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0]
        return request.META.get('REMOTE_ADDR')
    
    async def is_rate_limited(self, client_ip):
        """Check if client is rate limited."""
        # Simulate async rate limit check
        await asyncio.sleep(0.001)
        
        current_time = time.time()
        window_start = current_time - 60  # 1-minute window
        
        # Clean old entries
        if client_ip in self.rate_limits:
            self.rate_limits[client_ip] = [
                timestamp for timestamp in self.rate_limits[client_ip]
                if timestamp > window_start
            ]
        
        # Check current count
        current_count = len(self.rate_limits.get(client_ip, []))
        return current_count >= 100  # 100 requests per minute
    
    async def update_rate_limit(self, client_ip):
        """Update rate limit counter."""
        await asyncio.sleep(0.001)  # Simulate async operation
        
        current_time = time.time()
        
        if client_ip not in self.rate_limits:
            self.rate_limits[client_ip] = []
        
        self.rate_limits[client_ip].append(current_time)

Async Safety

Understanding Async Safety

Async safety refers to the ability of code to work correctly in asynchronous contexts without causing race conditions, deadlocks, or other concurrency issues.

# Thread-safe vs Async-safe patterns
import asyncio
import threading
from django.core.cache import cache
from asgiref.sync import sync_to_async

# NOT async-safe: Global state without protection
counter = 0

async def unsafe_increment():
    """NOT async-safe - race condition possible."""
    global counter
    current = counter
    await asyncio.sleep(0.001)  # Simulate async operation
    counter = current + 1

# Async-safe: Using asyncio.Lock
async_lock = asyncio.Lock()
safe_counter = 0

async def safe_increment():
    """Async-safe increment with lock."""
    global safe_counter
    
    async with async_lock:
        current = safe_counter
        await asyncio.sleep(0.001)
        safe_counter = current + 1

# Async-safe: Using local state
async def local_state_example():
    """Async-safe using local variables."""
    local_counter = 0
    
    async def increment():
        nonlocal local_counter
        local_counter += 1
    
    # Multiple concurrent calls are safe
    await asyncio.gather(*[increment() for _ in range(100)])
    return local_counter

# Database operations safety
class AsyncSafeUserService:
    """Async-safe user service."""
    
    def __init__(self):
        self._lock = asyncio.Lock()
    
    async def create_user_safely(self, username, email):
        """Create user with async safety."""
        async with self._lock:
            # Check if user exists
            exists = await sync_to_async(
                User.objects.filter(username=username).exists
            )()
            
            if exists:
                raise ValueError(f"User {username} already exists")
            
            # Create user
            user = await sync_to_async(User.objects.create)(
                username=username,
                email=email
            )
            
            return user
    
    async def update_user_profile(self, user_id, **kwargs):
        """Update user profile safely."""
        @sync_to_async
        def update_profile():
            try:
                user = User.objects.select_for_update().get(id=user_id)
                
                for field, value in kwargs.items():
                    if hasattr(user, field):
                        setattr(user, field, value)
                
                user.save()
                return user
            except User.DoesNotExist:
                return None
        
        return await update_profile()

Async-Safe Patterns

# Async-safe singleton pattern
class AsyncSafeSingleton:
    """Async-safe singleton implementation."""
    
    _instance = None
    _lock = asyncio.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    @classmethod
    async def get_instance(cls):
        """Get singleton instance safely."""
        if cls._instance is None:
            async with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
                    await cls._instance.initialize()
        
        return cls._instance
    
    async def initialize(self):
        """Initialize singleton asynchronously."""
        # Async initialization code
        await asyncio.sleep(0.1)
        self.initialized = True

# Async-safe connection pool
class AsyncConnectionPool:
    """Async-safe connection pool."""
    
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.connections = asyncio.Queue(maxsize=max_connections)
        self.created_connections = 0
        self._lock = asyncio.Lock()
    
    async def get_connection(self):
        """Get connection from pool."""
        try:
            # Try to get existing connection
            connection = self.connections.get_nowait()
            return connection
        except asyncio.QueueEmpty:
            # Create new connection if under limit
            async with self._lock:
                if self.created_connections < self.max_connections:
                    connection = await self.create_connection()
                    self.created_connections += 1
                    return connection
            
            # Wait for available connection
            return await self.connections.get()
    
    async def return_connection(self, connection):
        """Return connection to pool."""
        if connection.is_healthy():
            await self.connections.put(connection)
        else:
            # Replace unhealthy connection
            async with self._lock:
                self.created_connections -= 1
    
    async def create_connection(self):
        """Create new connection."""
        # Simulate connection creation
        await asyncio.sleep(0.1)
        return MockConnection()

class MockConnection:
    """Mock connection for example."""
    
    def __init__(self):
        self.healthy = True
    
    def is_healthy(self):
        return self.healthy

# Async-safe caching
class AsyncSafeCache:
    """Async-safe cache implementation."""
    
    def __init__(self):
        self._cache = {}
        self._locks = {}
        self._main_lock = asyncio.Lock()
    
    async def get(self, key):
        """Get value from cache."""
        return self._cache.get(key)
    
    async def set(self, key, value, ttl=None):
        """Set value in cache with async safety."""
        # Get or create lock for this key
        async with self._main_lock:
            if key not in self._locks:
                self._locks[key] = asyncio.Lock()
            key_lock = self._locks[key]
        
        # Set value with key-specific lock
        async with key_lock:
            self._cache[key] = {
                'value': value,
                'expires': time.time() + ttl if ttl else None
            }
    
    async def get_or_set(self, key, factory_func, ttl=None):
        """Get value or set using factory function."""
        # Check if value exists and is not expired
        cached = await self.get(key)
        if cached and (not cached['expires'] or cached['expires'] > time.time()):
            return cached['value']
        
        # Get or create lock for this key
        async with self._main_lock:
            if key not in self._locks:
                self._locks[key] = asyncio.Lock()
            key_lock = self._locks[key]
        
        # Double-check pattern
        async with key_lock:
            cached = await self.get(key)
            if cached and (not cached['expires'] or cached['expires'] > time.time()):
                return cached['value']
            
            # Generate new value
            if asyncio.iscoroutinefunction(factory_func):
                value = await factory_func()
            else:
                value = factory_func()
            
            await self.set(key, value, ttl)
            return value

Avoiding Common Async Pitfalls

# Common async pitfalls and solutions
import asyncio
from django.db import transaction
from asgiref.sync import sync_to_async

# PITFALL 1: Blocking operations in async code
async def bad_blocking_example():
    """BAD: Blocking operation in async function."""
    import time
    time.sleep(1)  # This blocks the entire event loop!
    return "Done"

async def good_non_blocking_example():
    """GOOD: Non-blocking equivalent."""
    await asyncio.sleep(1)  # This doesn't block other coroutines
    return "Done"

# PITFALL 2: Not awaiting coroutines
async def bad_not_awaiting():
    """BAD: Not awaiting coroutines."""
    result = asyncio.sleep(1)  # Returns coroutine object, not result
    return result  # This returns a coroutine, not "Done"

async def good_awaiting():
    """GOOD: Properly awaiting coroutines."""
    await asyncio.sleep(1)
    return "Done"

# PITFALL 3: Using sync database operations incorrectly
async def bad_sync_db_usage():
    """BAD: Sync database operations in async context."""
    # This can cause database connection issues
    users = User.objects.all()  # Sync operation
    return list(users)

async def good_async_db_usage():
    """GOOD: Proper async database usage."""
    # Option 1: Use async ORM methods
    users = []
    async for user in User.objects.all():
        users.append(user)
    
    return users

async def good_sync_to_async_usage():
    """GOOD: Using sync_to_async wrapper."""
    @sync_to_async
    def get_users():
        return list(User.objects.all())
    
    users = await get_users()
    return users

# PITFALL 4: Incorrect transaction handling
async def bad_transaction_usage():
    """BAD: Incorrect async transaction usage."""
    with transaction.atomic():  # This won't work properly in async
        user = await sync_to_async(User.objects.create)(username='test')
        # More operations...

async def good_transaction_usage():
    """GOOD: Proper async transaction handling."""
    @sync_to_async
    def create_user_with_transaction():
        with transaction.atomic():
            user = User.objects.create(username='test')
            # More operations...
            return user
    
    user = await create_user_with_transaction()
    return user

# PITFALL 5: Shared mutable state without protection
class BadAsyncCounter:
    """BAD: Unprotected shared state."""
    
    def __init__(self):
        self.count = 0
    
    async def increment(self):
        current = self.count
        await asyncio.sleep(0.001)  # Simulate async work
        self.count = current + 1  # Race condition!

class GoodAsyncCounter:
    """GOOD: Protected shared state."""
    
    def __init__(self):
        self.count = 0
        self._lock = asyncio.Lock()
    
    async def increment(self):
        async with self._lock:
            current = self.count
            await asyncio.sleep(0.001)
            self.count = current + 1

# PITFALL 6: Not handling exceptions properly
async def bad_exception_handling():
    """BAD: Not handling async exceptions."""
    tasks = [
        risky_async_operation(i) for i in range(10)
    ]
    
    # If any task fails, all results are lost
    results = await asyncio.gather(*tasks)
    return results

async def good_exception_handling():
    """GOOD: Proper async exception handling."""
    tasks = [
        risky_async_operation(i) for i in range(10)
    ]
    
    # Handle exceptions gracefully
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    successful_results = []
    errors = []
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            errors.append({'index': i, 'error': str(result)})
        else:
            successful_results.append(result)
    
    return {
        'successful': successful_results,
        'errors': errors
    }

async def risky_async_operation(value):
    """Simulate risky async operation."""
    await asyncio.sleep(0.1)
    if value % 3 == 0:
        raise ValueError(f"Value {value} is divisible by 3")
    return value * 2

Async Adapter Functions

sync_to_async and async_to_sync

# Using Django's async adapters
from asgiref.sync import sync_to_async, async_to_sync
from django.contrib.auth.models import User
from django.db import transaction
import asyncio

# Basic sync_to_async usage
@sync_to_async
def get_user_by_username(username):
    """Convert sync function to async."""
    return User.objects.get(username=username)

async def async_user_lookup():
    """Use sync function in async context."""
    user = await get_user_by_username('admin')
    return user

# Method conversion
class UserService:
    """Service with both sync and async methods."""
    
    def get_user_sync(self, user_id):
        """Synchronous user retrieval."""
        return User.objects.get(id=user_id)
    
    # Convert method to async
    async def get_user_async(self, user_id):
        """Asynchronous user retrieval."""
        get_user = sync_to_async(self.get_user_sync)
        return await get_user(user_id)
    
    # Alternative: Direct conversion
    def __init__(self):
        self.get_user_async_alt = sync_to_async(self.get_user_sync)

# Complex database operations
class AsyncUserManager:
    """Async user manager with complex operations."""
    
    @sync_to_async
    def create_user_with_profile(self, username, email, profile_data):
        """Create user with profile in transaction."""
        with transaction.atomic():
            user = User.objects.create(
                username=username,
                email=email
            )
            
            # Create related profile
            from .models import UserProfile
            UserProfile.objects.create(
                user=user,
                **profile_data
            )
            
            return user
    
    @sync_to_async
    def bulk_update_users(self, user_updates):
        """Bulk update users."""
        users_to_update = []
        
        for user_id, updates in user_updates.items():
            try:
                user = User.objects.get(id=user_id)
                for field, value in updates.items():
                    setattr(user, field, value)
                users_to_update.append(user)
            except User.DoesNotExist:
                continue
        
        User.objects.bulk_update(
            users_to_update,
            ['email', 'first_name', 'last_name']
        )
        
        return len(users_to_update)
    
    async def process_user_batch(self, user_data_list):
        """Process multiple users concurrently."""
        tasks = []
        
        for user_data in user_data_list:
            task = self.create_user_with_profile(
                user_data['username'],
                user_data['email'],
                user_data['profile']
            )
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        successful = []
        failed = []
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failed.append({
                    'data': user_data_list[i],
                    'error': str(result)
                })
            else:
                successful.append(result)
        
        return {
            'successful': successful,
            'failed': failed
        }

# Using async_to_sync (less common in views)
def sync_view_calling_async(request):
    """Sync view that needs to call async function."""
    
    async def fetch_external_data():
        """Async function to fetch external data."""
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com/data') as response:
                return await response.json()
    
    # Convert async function to sync
    sync_fetch = async_to_sync(fetch_external_data)
    data = sync_fetch()
    
    return JsonResponse(data)

Custom Adapter Patterns

# Custom async adapters and decorators
import functools
from typing import Callable, Any
from asgiref.sync import sync_to_async

def async_method(func: Callable) -> Callable:
    """Decorator to convert sync method to async."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        sync_func = sync_to_async(func)
        return await sync_func(*args, **kwargs)
    
    return wrapper

def cached_async_method(cache_key_func: Callable = None, ttl: int = 300):
    """Decorator for caching async method results."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key
            if cache_key_func:
                cache_key = cache_key_func(*args, **kwargs)
            else:
                cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            # Try to get from cache
            from django.core.cache import cache
            cached_result = await sync_to_async(cache.get)(cache_key)
            
            if cached_result is not None:
                return cached_result
            
            # Execute function
            result = await func(*args, **kwargs)
            
            # Cache result
            await sync_to_async(cache.set)(cache_key, result, ttl)
            
            return result
        
        return wrapper
    return decorator

class AsyncAdapter:
    """Generic async adapter for sync classes."""
    
    def __init__(self, sync_class):
        self.sync_class = sync_class
        self._instance = None
    
    async def __aenter__(self):
        """Async context manager entry."""
        create_instance = sync_to_async(self.sync_class)
        self._instance = await create_instance()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if hasattr(self._instance, 'close'):
            close_method = sync_to_async(self._instance.close)
            await close_method()
    
    def __getattr__(self, name):
        """Convert sync methods to async on demand."""
        if self._instance is None:
            raise RuntimeError("Adapter not initialized. Use async with statement.")
        
        attr = getattr(self._instance, name)
        
        if callable(attr):
            return sync_to_async(attr)
        else:
            return attr

# Usage examples
class SyncFileProcessor:
    """Sync file processor."""
    
    def __init__(self, file_path):
        self.file_path = file_path
        self.file = open(file_path, 'r')
    
    def process_line(self, line):
        """Process a single line."""
        return line.strip().upper()
    
    def process_all(self):
        """Process all lines."""
        return [self.process_line(line) for line in self.file]
    
    def close(self):
        """Close file."""
        self.file.close()

async def async_file_processing_example():
    """Example using async adapter."""
    async with AsyncAdapter(lambda: SyncFileProcessor('data.txt')) as processor:
        # All methods are now async
        all_lines = await processor.process_all()
        return all_lines

# Batch async adapter
class BatchAsyncAdapter:
    """Adapter for batching sync operations."""
    
    def __init__(self, sync_func, batch_size=10):
        self.sync_func = sync_func
        self.batch_size = batch_size
    
    async def process_batch(self, items):
        """Process items in batches asynchronously."""
        results = []
        
        for i in range(0, len(items), self.batch_size):
            batch = items[i:i + self.batch_size]
            
            # Process batch synchronously (wrapped in async)
            batch_processor = sync_to_async(self._process_sync_batch)
            batch_results = await batch_processor(batch)
            
            results.extend(batch_results)
            
            # Yield control to event loop between batches
            await asyncio.sleep(0)
        
        return results
    
    def _process_sync_batch(self, batch):
        """Process batch synchronously."""
        return [self.sync_func(item) for item in batch]

# Usage
def expensive_sync_operation(item):
    """Simulate expensive sync operation."""
    import time
    time.sleep(0.1)  # Simulate work
    return item * 2

async def batch_processing_example():
    """Example of batch processing."""
    items = list(range(100))
    
    adapter = BatchAsyncAdapter(expensive_sync_operation, batch_size=10)
    results = await adapter.process_batch(items)
    
    return results

# Retry async adapter
class RetryAsyncAdapter:
    """Adapter that adds retry logic to async operations."""
    
    def __init__(self, max_retries=3, delay=1.0, backoff=2.0):
        self.max_retries = max_retries
        self.delay = delay
        self.backoff = backoff
    
    async def execute_with_retry(self, async_func, *args, **kwargs):
        """Execute async function with retry logic."""
        last_exception = None
        current_delay = self.delay
        
        for attempt in range(self.max_retries + 1):
            try:
                return await async_func(*args, **kwargs)
            
            except Exception as e:
                last_exception = e
                
                if attempt < self.max_retries:
                    await asyncio.sleep(current_delay)
                    current_delay *= self.backoff
                else:
                    raise last_exception

# Usage
async def unreliable_async_operation():
    """Simulate unreliable async operation."""
    import random
    
    if random.random() < 0.7:  # 70% chance of failure
        raise Exception("Random failure")
    
    return "Success!"

async def retry_example():
    """Example using retry adapter."""
    retry_adapter = RetryAsyncAdapter(max_retries=5, delay=0.5)
    
    try:
        result = await retry_adapter.execute_with_retry(unreliable_async_operation)
        return result
    except Exception as e:
        return f"Failed after retries: {e}"

Performance Optimization with Adapters

# Performance-optimized async adapters
import asyncio
from concurrent.futures import ThreadPoolExecutor
from asgiref.sync import sync_to_async

class ThreadPoolAsyncAdapter:
    """Adapter using thread pool for CPU-intensive sync operations."""
    
    def __init__(self, max_workers=None):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    async def run_in_thread(self, sync_func, *args, **kwargs):
        """Run sync function in thread pool."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: sync_func(*args, **kwargs)
        )
    
    async def close(self):
        """Close thread pool."""
        self.executor.shutdown(wait=True)

# CPU-intensive operations
def cpu_intensive_task(n):
    """Simulate CPU-intensive task."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

async def optimized_cpu_task_example():
    """Example using thread pool adapter."""
    adapter = ThreadPoolAsyncAdapter(max_workers=4)
    
    try:
        # Run multiple CPU-intensive tasks concurrently
        tasks = [
            adapter.run_in_thread(cpu_intensive_task, 100000)
            for _ in range(10)
        ]
        
        results = await asyncio.gather(*tasks)
        return sum(results)
    
    finally:
        await adapter.close()

# Connection pooling adapter
class AsyncConnectionPoolAdapter:
    """Adapter with connection pooling for database operations."""
    
    def __init__(self, pool_size=10):
        self.pool_size = pool_size
        self.semaphore = asyncio.Semaphore(pool_size)
    
    async def execute_with_pool(self, db_operation, *args, **kwargs):
        """Execute database operation with connection pooling."""
        async with self.semaphore:
            # Convert sync DB operation to async
            async_operation = sync_to_async(db_operation)
            return await async_operation(*args, **kwargs)

# Usage
def complex_db_query():
    """Complex database query."""
    from django.db import connection
    
    with connection.cursor() as cursor:
        cursor.execute("""
            SELECT u.username, COUNT(p.id) as post_count
            FROM auth_user u
            LEFT JOIN blog_post p ON u.id = p.author_id
            GROUP BY u.id, u.username
            ORDER BY post_count DESC
            LIMIT 10
        """)
        return cursor.fetchall()

async def pooled_db_example():
    """Example using connection pool adapter."""
    adapter = AsyncConnectionPoolAdapter(pool_size=5)
    
    # Execute multiple queries concurrently with pooling
    tasks = [
        adapter.execute_with_pool(complex_db_query)
        for _ in range(20)
    ]
    
    results = await asyncio.gather(*tasks)
    return results[0]  # Return first result

# Caching adapter with async support
class AsyncCachingAdapter:
    """Adapter that adds caching to async operations."""
    
    def __init__(self):
        self.cache = {}
        self.locks = {}
        self.main_lock = asyncio.Lock()
    
    async def cached_execute(self, cache_key, async_func, *args, **kwargs):
        """Execute function with caching."""
        # Check cache first
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # Get or create lock for this cache key
        async with self.main_lock:
            if cache_key not in self.locks:
                self.locks[cache_key] = asyncio.Lock()
            key_lock = self.locks[cache_key]
        
        # Execute with key-specific lock (prevents duplicate work)
        async with key_lock:
            # Double-check cache
            if cache_key in self.cache:
                return self.cache[cache_key]
            
            # Execute function
            result = await async_func(*args, **kwargs)
            
            # Cache result
            self.cache[cache_key] = result
            
            return result
    
    def clear_cache(self, pattern=None):
        """Clear cache entries."""
        if pattern:
            keys_to_remove = [
                key for key in self.cache.keys()
                if pattern in key
            ]
            for key in keys_to_remove:
                del self.cache[key]
        else:
            self.cache.clear()

async def cached_operation_example():
    """Example using caching adapter."""
    cache_adapter = AsyncCachingAdapter()
    
    async def expensive_operation(value):
        """Simulate expensive async operation."""
        await asyncio.sleep(1)
        return value * 2
    
    # First call - will execute function
    result1 = await cache_adapter.cached_execute(
        'expensive_op_5',
        expensive_operation,
        5
    )
    
    # Second call - will use cached result
    result2 = await cache_adapter.cached_execute(
        'expensive_op_5',
        expensive_operation,
        5
    )
    
    return result1, result2  # Both should be 10

Django's asynchronous support enables building high-performance applications that can handle thousands of concurrent requests efficiently. The key principles are understanding when to use async (I/O-bound operations), ensuring async safety through proper synchronization, and using adapter functions to bridge sync and async code effectively. Start with simple async views and gradually implement more sophisticated patterns as your application's concurrency requirements grow.