Django's asynchronous support enables building high-performance applications that can handle concurrent operations efficiently. This chapter covers async views, async safety considerations, and adapter functions that bridge synchronous and asynchronous code.
Async views allow Django to handle multiple requests concurrently without blocking threads. They're particularly beneficial for I/O-bound operations like API calls, file operations, or database queries.
# Basic async view patterns
import asyncio
import aiohttp
from django.http import JsonResponse
from django.shortcuts import aget_object_or_404
from asgiref.sync import sync_to_async
from .models import Post, User
async def simple_async_view(request):
    """Basic async view: awaits a non-blocking sleep, then returns JSON.

    Args:
        request: Django HttpRequest.

    Returns:
        JsonResponse with a greeting and the event-loop clock time.
    """
    # Simulate async I/O without blocking the event loop.
    await asyncio.sleep(0.1)
    return JsonResponse({
        'message': 'Hello from async view',
        # get_running_loop() is the supported call inside a coroutine;
        # asyncio.get_event_loop() is deprecated here since Python 3.10.
        'timestamp': asyncio.get_running_loop().time()
    })
async def async_database_view(request):
    """Async view that reads post data via Django's async ORM APIs."""
    # Total number of posts (Django 4.1+ async aggregate).
    total = await Post.objects.acount()

    # Five most recent published posts, gathered via async iteration.
    published = Post.objects.filter(published=True)[:5]
    recent = [
        {
            'id': post.id,
            'title': post.title,
            'created_at': post.created_at.isoformat(),
        }
        async for post in published
    ]

    return JsonResponse({
        'total_posts': total,
        'recent_posts': recent
    })
async def async_external_api_view(request):
    """Async view fanning out concurrent weather API requests."""
    cities = ['New York', 'London', 'Tokyo']
    async with aiohttp.ClientSession() as session:
        # One request per city, all in flight at once;
        # return_exceptions=True keeps partial results on failure.
        outcomes = await asyncio.gather(
            *(fetch_weather_data(session, city) for city in cities),
            return_exceptions=True,
        )

    weather_data = {}
    for city, outcome in zip(cities, outcomes):
        if isinstance(outcome, Exception):
            weather_data[city] = {'error': str(outcome)}
        else:
            weather_data[city] = outcome
    return JsonResponse({'weather': weather_data})
async def fetch_weather_data(session, city):
    """Fetch current weather for *city*; always returns a dict."""
    url = f"https://api.weather.com/v1/current?city={city}"
    try:
        async with session.get(url) as response:
            if response.status != 200:
                return {'error': f'HTTP {response.status}'}
            return await response.json()
    except Exception as e:
        # Network/parse failures become error payloads, never raises.
        return {'error': str(e)}
# Advanced async view implementations
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
import json
class AsyncBaseView(View):
    """Base class for async class-based views.

    Routes each request to the handler named after the HTTP method,
    awaiting it directly when it is a coroutine function and running
    it via sync_to_async otherwise.
    """

    async def dispatch(self, request, *args, **kwargs):
        """Resolve and invoke the handler for request.method."""
        method_name = request.method.lower()
        handler = getattr(self, method_name, self.http_method_not_allowed)
        if not asyncio.iscoroutinefunction(handler):
            # Sync handlers run on a worker thread so they cannot
            # block the event loop.
            handler = sync_to_async(handler)
        return await handler(request, *args, **kwargs)
class AsyncAPIView(AsyncBaseView):
    """Generic async API view.

    Subclasses implement get_data/create_data/update_data/delete_data;
    the HTTP verb handlers take care of body parsing, status codes and
    a uniform JSON error envelope.
    """

    async def get(self, request, *args, **kwargs):
        """Handle GET requests asynchronously."""
        try:
            data = await self.get_data(request, *args, **kwargs)
            return JsonResponse(data)
        except Exception as e:
            # Broad catch keeps the JSON error contract even for
            # unexpected failures.
            return JsonResponse({'error': str(e)}, status=500)

    async def post(self, request, *args, **kwargs):
        """Handle POST requests asynchronously."""
        try:
            # Read request body asynchronously.
            # NOTE(review): assumes the request object exposes aread() —
            # confirm against the Django version in use.
            body = await request.aread()
            data = json.loads(body) if body else {}
            result = await self.create_data(request, data, *args, **kwargs)
            return JsonResponse(result, status=201)
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

    async def put(self, request, *args, **kwargs):
        """Handle PUT requests asynchronously."""
        try:
            body = await request.aread()
            data = json.loads(body) if body else {}
            result = await self.update_data(request, data, *args, **kwargs)
            return JsonResponse(result)
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

    async def delete(self, request, *args, **kwargs):
        """Handle DELETE requests asynchronously."""
        try:
            await self.delete_data(request, *args, **kwargs)
            # NOTE(review): a 204 response must not carry a body
            # (RFC 9110); consider HttpResponse(status=204) or a 200
            # with this JSON payload instead.
            return JsonResponse({'message': 'Deleted successfully'}, status=204)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

    # Override these methods in subclasses
    async def get_data(self, request, *args, **kwargs):
        """Override to implement GET logic."""
        raise NotImplementedError

    async def create_data(self, request, data, *args, **kwargs):
        """Override to implement POST logic."""
        raise NotImplementedError

    async def update_data(self, request, data, *args, **kwargs):
        """Override to implement PUT logic."""
        raise NotImplementedError

    async def delete_data(self, request, *args, **kwargs):
        """Override to implement DELETE logic."""
        raise NotImplementedError
@method_decorator(csrf_exempt, name='dispatch')
class PostAPIView(AsyncAPIView):
    """Async CRUD API for Post objects.

    CSRF is disabled for the example; real deployments should use a
    proper auth/CSRF strategy instead of csrf_exempt.
    """

    async def get_data(self, request, *args, **kwargs):
        """Return one post (when post_id is routed) or the latest ten."""
        post_id = kwargs.get('post_id')
        if post_id:
            # Get single post; 404s for missing or unpublished ids.
            post = await aget_object_or_404(Post, id=post_id, published=True)
            return {
                'id': post.id,
                'title': post.title,
                'content': post.content,
                'created_at': post.created_at.isoformat()
            }
        else:
            # Get list of posts via async queryset iteration.
            posts = []
            async for post in Post.objects.filter(published=True)[:10]:
                posts.append({
                    'id': post.id,
                    'title': post.title,
                    'created_at': post.created_at.isoformat()
                })
            return {'posts': posts}

    async def create_data(self, request, data, *args, **kwargs):
        """Create a post from the parsed JSON body.

        Missing required keys raise KeyError, surfaced as a 500 by the
        base class.
        """
        # Use sync_to_async for the blocking ORM write.
        @sync_to_async
        def create_post():
            return Post.objects.create(
                title=data['title'],
                content=data['content'],
                author_id=data['author_id'],
                published=data.get('published', False)
            )
        post = await create_post()
        return {
            'id': post.id,
            'title': post.title,
            'message': 'Post created successfully'
        }

    async def update_data(self, request, data, *args, **kwargs):
        """Partially update a post; absent fields keep current values."""
        post_id = kwargs.get('post_id')

        @sync_to_async
        def update_post():
            # NOTE(review): Post.DoesNotExist propagates as a 500 here,
            # unlike get_data which 404s — confirm that is intended.
            post = Post.objects.get(id=post_id)
            post.title = data.get('title', post.title)
            post.content = data.get('content', post.content)
            post.published = data.get('published', post.published)
            post.save()
            return post
        post = await update_post()
        return {
            'id': post.id,
            'title': post.title,
            'message': 'Post updated successfully'
        }

    async def delete_data(self, request, *args, **kwargs):
        """Delete the routed post."""
        post_id = kwargs.get('post_id')

        @sync_to_async
        def delete_post():
            post = Post.objects.get(id=post_id)
            post.delete()
        await delete_post()
# middleware/async_middleware.py
import asyncio
import time
from django.utils.deprecation import MiddlewareMixin
class AsyncTimingMiddleware:
    """Async middleware that stamps each response with its processing time.

    Adds an ``X-Processing-Time`` header measured around the rest of the
    middleware/view chain.
    """

    # Tell Django this middleware can run natively under ASGI instead of
    # being wrapped in sync_to_async.
    async_capable = True
    sync_capable = False

    def __init__(self, get_response):
        self.get_response = get_response

    async def __call__(self, request):
        """Time the downstream handling of *request*."""
        # perf_counter is monotonic, so the measurement cannot go
        # negative if the wall clock is adjusted mid-request.
        start_time = time.perf_counter()
        response = await self.get_response(request)
        processing_time = time.perf_counter() - start_time
        response['X-Processing-Time'] = f"{processing_time:.4f}s"
        return response
class AsyncLoggingMiddleware:
    """Async middleware that logs the start and end of every request."""

    # Mark as async-capable so Django does not adapt it for ASGI.
    async_capable = True
    sync_capable = False

    def __init__(self, get_response):
        self.get_response = get_response

    async def __call__(self, request):
        """Log around the downstream handling of *request*."""
        await self.log_request_start(request)
        response = await self.get_response(request)
        await self.log_request_end(request, response)
        return response

    async def log_request_start(self, request):
        """Log request start asynchronously."""
        # sync_to_async keeps blocking log sinks (DB, files) off the loop.
        @sync_to_async
        def log_to_db():
            # Placeholder: log to a database or external service.
            print(f"Request started: {request.method} {request.path}")
        await log_to_db()

    async def log_request_end(self, request, response):
        """Log request completion asynchronously."""
        @sync_to_async
        def log_to_db():
            print(f"Request completed: {request.method} {request.path} - {response.status_code}")
        await log_to_db()
class AsyncRateLimitMiddleware:
    """Async middleware enforcing a fixed-window rate limit per client IP.

    Allows up to MAX_REQUESTS per WINDOW_SECONDS. State is kept
    in-process; use Redis or similar in production so the limit is
    shared across workers.
    """

    # Runs natively under ASGI.
    async_capable = True
    sync_capable = False

    # Window length and allowance, exposed for tuning and tests.
    WINDOW_SECONDS = 60
    MAX_REQUESTS = 100

    def __init__(self, get_response):
        self.get_response = get_response
        self.rate_limits = {}  # ip -> list of request timestamps

    async def __call__(self, request):
        """Reject over-limit clients with 429, otherwise pass through."""
        client_ip = self.get_client_ip(request)
        if await self.is_rate_limited(client_ip):
            from django.http import JsonResponse
            return JsonResponse(
                {'error': 'Rate limit exceeded'},
                status=429
            )
        response = await self.get_response(request)
        await self.update_rate_limit(client_ip)
        return response

    def get_client_ip(self, request):
        """Return the originating client IP for *request*.

        Trusts the first hop of X-Forwarded-For when present; strips
        the whitespace proxies insert after the comma separator.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

    async def is_rate_limited(self, client_ip):
        """Return True when *client_ip* has exhausted its allowance."""
        await asyncio.sleep(0.001)  # placeholder for an async store lookup
        window_start = time.time() - self.WINDOW_SECONDS
        # Drop timestamps that fell out of the window.
        if client_ip in self.rate_limits:
            self.rate_limits[client_ip] = [
                ts for ts in self.rate_limits[client_ip]
                if ts > window_start
            ]
        return len(self.rate_limits.get(client_ip, [])) >= self.MAX_REQUESTS

    async def update_rate_limit(self, client_ip):
        """Record one request for *client_ip*."""
        await asyncio.sleep(0.001)  # placeholder for an async store write
        self.rate_limits.setdefault(client_ip, []).append(time.time())
Async safety refers to the ability of code to work correctly in asynchronous contexts without causing race conditions, deadlocks, or other concurrency issues.
# Thread-safe vs Async-safe patterns
import asyncio
import threading
from django.core.cache import cache
from asgiref.sync import sync_to_async
# NOT async-safe: Global state without protection
counter = 0

async def unsafe_increment():
    """NOT async-safe: read/modify/write straddles an await point."""
    global counter
    snapshot = counter
    # The await lets other coroutines read the same stale value,
    # so concurrent calls lose increments.
    await asyncio.sleep(0.001)
    counter = snapshot + 1
# Async-safe: Using asyncio.Lock
async_lock = asyncio.Lock()
safe_counter = 0

async def safe_increment():
    """Async-safe increment: the lock serializes read/modify/write."""
    global safe_counter
    async with async_lock:
        snapshot = safe_counter
        await asyncio.sleep(0.001)
        safe_counter = snapshot + 1
# Async-safe: Using local state
async def local_state_example():
    """Async-safe counting via state local to one coroutine call."""
    total = 0

    async def bump():
        nonlocal total
        total += 1

    # 100 concurrent bumps are safe: no await between read and write.
    await asyncio.gather(*(bump() for _ in range(100)))
    return total
# Database operations safety
class AsyncSafeUserService:
    """User service whose operations are safe under concurrent coroutines."""

    def __init__(self):
        # Serializes the exists-check + create pair below.
        self._lock = asyncio.Lock()

    async def create_user_safely(self, username, email):
        """Create a user, guarding the check-then-create race with a lock."""
        async with self._lock:
            username_taken = await sync_to_async(
                User.objects.filter(username=username).exists
            )()
            if username_taken:
                raise ValueError(f"User {username} already exists")
            return await sync_to_async(User.objects.create)(
                username=username,
                email=email
            )

    async def update_user_profile(self, user_id, **kwargs):
        """Update profile fields; returns None when the user is missing."""
        @sync_to_async
        def update_profile():
            try:
                # select_for_update: row lock guards concurrent writers.
                user = User.objects.select_for_update().get(id=user_id)
            except User.DoesNotExist:
                return None
            for field, value in kwargs.items():
                if hasattr(user, field):
                    setattr(user, field, value)
            user.save()
            return user
        return await update_profile()
# Async-safe singleton pattern
class AsyncSafeSingleton:
    """Async-safe singleton implementation.

    get_instance() uses double-checked locking so concurrent callers
    share one instance and initialize() runs exactly once.
    """

    # Shared instance; also set eagerly by __new__.
    _instance = None
    # NOTE(review): creating asyncio.Lock() at class-definition time is
    # only safe on Python 3.10+, where locks bind to a loop lazily —
    # confirm the supported Python versions.
    _lock = asyncio.Lock()

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    async def get_instance(cls):
        """Get singleton instance safely."""
        if cls._instance is None:
            async with cls._lock:
                # Re-check under the lock: another coroutine may have
                # created the instance while we awaited the lock.
                if cls._instance is None:
                    # NOTE(review): cls() publishes _instance via
                    # __new__ before initialize() finishes, so a caller
                    # passing the unlocked check above could observe a
                    # not-yet-initialized instance.
                    cls._instance = cls()
                    await cls._instance.initialize()
        return cls._instance

    async def initialize(self):
        """Initialize singleton asynchronously."""
        # Placeholder for real async setup work.
        await asyncio.sleep(0.1)
        self.initialized = True
# Async-safe connection pool
class AsyncConnectionPool:
    """Async-safe pool of reusable connections.

    Idle connections live in an asyncio.Queue; creation up to
    max_connections is guarded by a lock.
    """

    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        # Idle connections ready for reuse.
        self.connections = asyncio.Queue(maxsize=max_connections)
        # Connections created and still accounted for.
        self.created_connections = 0
        # Guards created_connections and the create-under-limit check.
        self._lock = asyncio.Lock()

    async def get_connection(self):
        """Get connection from pool.

        Prefers an idle connection; otherwise creates one while under
        the limit, else waits until a connection is returned.
        """
        try:
            # Fast path: reuse an idle connection without awaiting.
            connection = self.connections.get_nowait()
            return connection
        except asyncio.QueueEmpty:
            # Create new connection if under limit.
            async with self._lock:
                if self.created_connections < self.max_connections:
                    connection = await self.create_connection()
                    self.created_connections += 1
                    return connection
            # At the limit: block until another user returns one.
            return await self.connections.get()

    async def return_connection(self, connection):
        """Return connection to pool.

        Healthy connections are re-queued; unhealthy ones are dropped
        and their slot freed so a replacement can be created later.
        NOTE(review): a coroutine already blocked in connections.get()
        is not woken by the freed slot — it keeps waiting until some
        healthy connection is returned.
        """
        if connection.is_healthy():
            await self.connections.put(connection)
        else:
            # Replace unhealthy connection by freeing its slot.
            async with self._lock:
                self.created_connections -= 1

    async def create_connection(self):
        """Create new connection (simulated with a short delay)."""
        await asyncio.sleep(0.1)
        return MockConnection()
class MockConnection:
    """Stand-in connection object used by the pool example."""

    def __init__(self):
        # Flip to False to simulate a broken connection.
        self.healthy = True

    def is_healthy(self):
        """Report whether the connection may be reused."""
        return self.healthy
# Async-safe caching
class AsyncSafeCache:
    """Async-safe in-memory cache with per-key locks and optional TTL.

    Entries are stored as {'value': ..., 'expires': epoch-or-None}.
    Per-key locks make get_or_set compute a missing value exactly once
    even under concurrent callers.
    """

    def __init__(self):
        self._cache = {}
        self._locks = {}                  # key -> asyncio.Lock
        self._main_lock = asyncio.Lock()  # guards _locks itself

    async def _key_lock(self, key):
        """Get (or lazily create) the lock dedicated to *key*."""
        async with self._main_lock:
            if key not in self._locks:
                self._locks[key] = asyncio.Lock()
            return self._locks[key]

    def _store(self, key, value, ttl):
        """Write an entry without locking (callers hold the key lock)."""
        self._cache[key] = {
            'value': value,
            'expires': time.time() + ttl if ttl else None
        }

    async def get(self, key):
        """Return the raw cache entry for *key* (or None).

        Note: returns the internal {'value', 'expires'} record and does
        not filter expired entries; get_or_set applies the TTL check.
        """
        return self._cache.get(key)

    async def set(self, key, value, ttl=None):
        """Set value in cache with async safety."""
        key_lock = await self._key_lock(key)
        async with key_lock:
            self._store(key, value, ttl)

    async def get_or_set(self, key, factory_func, ttl=None):
        """Return the cached value or compute it via *factory_func*.

        BUG FIX: the original called self.set() while already holding
        the per-key lock; asyncio.Lock is not reentrant, so every cache
        miss deadlocked. The locked section now writes via _store().
        """
        cached = await self.get(key)
        if cached and (not cached['expires'] or cached['expires'] > time.time()):
            return cached['value']

        key_lock = await self._key_lock(key)
        # Double-check pattern: another coroutine may have filled the
        # key while we waited for the lock.
        async with key_lock:
            cached = await self.get(key)
            if cached and (not cached['expires'] or cached['expires'] > time.time()):
                return cached['value']
            # Generate new value (factory may be sync or async).
            if asyncio.iscoroutinefunction(factory_func):
                value = await factory_func()
            else:
                value = factory_func()
            self._store(key, value, ttl)
            return value
# Common async pitfalls and solutions
import asyncio
from django.db import transaction
from asgiref.sync import sync_to_async
# PITFALL 1: Blocking operations in async code
async def bad_blocking_example():
    """BAD: Blocking operation in async function."""
    import time
    # time.sleep() suspends the whole thread, so every coroutine on
    # this event loop stalls for the full second.
    time.sleep(1)  # This blocks the entire event loop!
    return "Done"
async def good_non_blocking_example():
    """GOOD: Non-blocking equivalent."""
    # asyncio.sleep yields control, letting other coroutines run
    # while this one waits.
    await asyncio.sleep(1)  # This doesn't block other coroutines
    return "Done"
# PITFALL 2: Not awaiting coroutines
async def bad_not_awaiting():
    """BAD: forgetting await hands back the coroutine object itself."""
    pending = asyncio.sleep(1)  # coroutine object — never executed
    return pending  # the caller receives a coroutine, not "Done"
async def good_awaiting():
    """GOOD: Properly awaiting coroutines."""
    # Awaiting actually runs the coroutine to completion.
    await asyncio.sleep(1)
    return "Done"
# PITFALL 3: Using sync database operations incorrectly
async def bad_sync_db_usage():
    """BAD: blocking ORM access from a coroutine."""
    # Evaluating the queryset performs sync DB I/O on the event-loop
    # thread and can cause database connection issues.
    return list(User.objects.all())
async def good_async_db_usage():
    """GOOD: Proper async database usage."""
    # Async iteration keeps the event loop responsive while rows stream in.
    return [user async for user in User.objects.all()]
async def good_sync_to_async_usage():
    """GOOD: wrap the blocking ORM call so it runs on a worker thread."""
    def load_all_users():
        return list(User.objects.all())

    fetch_users = sync_to_async(load_all_users)
    return await fetch_users()
# PITFALL 4: Incorrect transaction handling
async def bad_transaction_usage():
    """BAD: Incorrect async transaction usage."""
    # transaction.atomic() manages a thread-bound connection, while the
    # wrapped create runs on a different (worker) thread — so the
    # "transaction" does not actually cover the operation.
    with transaction.atomic():  # This won't work properly in async
        user = await sync_to_async(User.objects.create)(username='test')
        # More operations...
async def good_transaction_usage():
    """GOOD: run the whole transaction inside one sync context."""
    @sync_to_async
    def create_user_with_transaction():
        # The entire atomic block executes on one worker thread with a
        # thread-bound connection, as Django expects.
        with transaction.atomic():
            new_user = User.objects.create(username='test')
            # More operations...
            return new_user

    return await create_user_with_transaction()
# PITFALL 5: Shared mutable state without protection
class BadAsyncCounter:
    """BAD: Unprotected shared state."""

    def __init__(self):
        self.count = 0

    async def increment(self):
        """Lost-update prone: an await sits between read and write."""
        observed = self.count
        await asyncio.sleep(0.001)  # Simulate async work
        self.count = observed + 1  # Race condition!
class GoodAsyncCounter:
"""GOOD: Protected shared state."""
def __init__(self):
self.count = 0
self._lock = asyncio.Lock()
async def increment(self):
async with self._lock:
current = self.count
await asyncio.sleep(0.001)
self.count = current + 1
# PITFALL 6: Not handling exceptions properly
async def bad_exception_handling():
    """BAD: one failing task discards every other result."""
    pending = [risky_async_operation(i) for i in range(10)]
    # Without return_exceptions, the first error propagates and the
    # remaining results are lost.
    return await asyncio.gather(*pending)
async def good_exception_handling():
    """GOOD: collect successes and failures separately."""
    pending = [risky_async_operation(i) for i in range(10)]
    # return_exceptions=True delivers errors as values instead of raising.
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    successful_results = []
    errors = []
    for index, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            errors.append({'index': index, 'error': str(outcome)})
        else:
            successful_results.append(outcome)
    return {
        'successful': successful_results,
        'errors': errors
    }
async def risky_async_operation(value):
    """Double *value* after a short delay; fail on multiples of 3."""
    await asyncio.sleep(0.1)
    if value % 3 == 0:
        raise ValueError(f"Value {value} is divisible by 3")
    return value * 2
# Using Django's async adapters
from asgiref.sync import sync_to_async, async_to_sync
from django.contrib.auth.models import User
from django.db import transaction
import asyncio
# Basic sync_to_async usage
def _get_user_by_username_sync(username):
    """Blocking lookup; kept private, exposed via the async wrapper."""
    return User.objects.get(username=username)

# Convert sync function to async.
get_user_by_username = sync_to_async(_get_user_by_username_sync)
async def async_user_lookup():
    """Use sync function in async context."""
    return await get_user_by_username('admin')
# Method conversion
class UserService:
    """Service exposing the same lookup through sync and async paths."""

    def __init__(self):
        # Alternative: pre-wrapped async version built once per instance.
        self.get_user_async_alt = sync_to_async(self.get_user_sync)

    def get_user_sync(self, user_id):
        """Synchronous user retrieval."""
        return User.objects.get(id=user_id)

    async def get_user_async(self, user_id):
        """Asynchronous wrapper around get_user_sync."""
        return await sync_to_async(self.get_user_sync)(user_id)
# Complex database operations
class AsyncUserManager:
    """Async user manager with complex operations.

    Sync ORM work is wrapped with sync_to_async (as a method decorator)
    so each call runs on a worker thread and can be awaited.
    """

    @sync_to_async
    def create_user_with_profile(self, username, email, profile_data):
        """Create user with profile in a single transaction."""
        with transaction.atomic():
            user = User.objects.create(
                username=username,
                email=email
            )
            # Create related profile
            from .models import UserProfile
            UserProfile.objects.create(
                user=user,
                **profile_data
            )
            return user

    @sync_to_async
    def bulk_update_users(self, user_updates):
        """Bulk update users.

        Args:
            user_updates: mapping of user_id -> {field: value}.

        Returns:
            Number of users that were found and updated.
        """
        users_to_update = []
        for user_id, updates in user_updates.items():
            try:
                user = User.objects.get(id=user_id)
                for field, value in updates.items():
                    setattr(user, field, value)
                users_to_update.append(user)
            except User.DoesNotExist:
                # Unknown ids are silently skipped; the return count
                # reflects only the users actually updated.
                continue
        # NOTE(review): only these three columns are persisted — updates
        # to any other field are applied in memory but never written.
        User.objects.bulk_update(
            users_to_update,
            ['email', 'first_name', 'last_name']
        )
        return len(users_to_update)

    async def process_user_batch(self, user_data_list):
        """Create many users concurrently; report successes and failures."""
        tasks = []
        for user_data in user_data_list:
            task = self.create_user_with_profile(
                user_data['username'],
                user_data['email'],
                user_data['profile']
            )
            tasks.append(task)
        # return_exceptions=True keeps going past individual failures.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = []
        failed = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failed.append({
                    'data': user_data_list[i],
                    'error': str(result)
                })
            else:
                successful.append(result)
        return {
            'successful': successful,
            'failed': failed
        }
# Using async_to_sync (less common in views)
def sync_view_calling_async(request):
    """Sync view that needs to call async function."""
    async def fetch_external_data():
        """Fetch JSON from the external API."""
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com/data') as response:
                return await response.json()

    # async_to_sync drives the coroutine to completion for us.
    data = async_to_sync(fetch_external_data)()
    return JsonResponse(data)
# Custom async adapters and decorators
import functools
from typing import Callable, Any
from asgiref.sync import sync_to_async
def async_method(func: Callable) -> Callable:
    """Decorator to convert a sync method into an awaitable one."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Each call threads the sync function off the event loop.
        return await sync_to_async(func)(*args, **kwargs)
    return wrapper
def cached_async_method(cache_key_func: Callable = None, ttl: int = 300):
    """Decorator for caching async method results in Django's cache.

    Args:
        cache_key_func: optional callable building the cache key from
            the decorated function's arguments.
        ttl: cache lifetime in seconds.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key
            if cache_key_func:
                cache_key = cache_key_func(*args, **kwargs)
            else:
                # BUG FIX: hash() is randomized per process
                # (PYTHONHASHSEED), so keys built with it break in any
                # shared or persistent cache. Use a stable digest.
                import hashlib
                arg_digest = hashlib.sha256(
                    repr((args, sorted(kwargs.items()))).encode()
                ).hexdigest()
                cache_key = f"{func.__name__}:{arg_digest}"

            # Try to get from cache (cache.get is sync → thread it).
            from django.core.cache import cache
            cached_result = await sync_to_async(cache.get)(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function and cache the result.
            result = await func(*args, **kwargs)
            await sync_to_async(cache.set)(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
class AsyncAdapter:
    """Generic async adapter for sync classes.

    Used as an async context manager; attribute access on the adapter
    returns the wrapped instance's methods converted with sync_to_async.
    """

    def __init__(self, sync_class):
        # A class or zero-argument factory producing the sync instance.
        self.sync_class = sync_class
        self._instance = None

    async def __aenter__(self):
        """Construct the sync instance on a worker thread."""
        create_instance = sync_to_async(self.sync_class)
        self._instance = await create_instance()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Call the instance's close() (if any) on a worker thread."""
        if hasattr(self._instance, 'close'):
            close_method = sync_to_async(self._instance.close)
            await close_method()

    def __getattr__(self, name):
        """Convert sync methods to async on demand.

        Only consulted for names not found on the adapter itself, so
        sync_class/_instance resolve normally. A fresh sync_to_async
        wrapper is built on every access.
        """
        if self._instance is None:
            raise RuntimeError("Adapter not initialized. Use async with statement.")
        attr = getattr(self._instance, name)
        if callable(attr):
            return sync_to_async(attr)
        else:
            return attr
# Usage examples
class SyncFileProcessor:
    """Sync processor that uppercases each line of a text file.

    The file handle is opened eagerly; call close() (or use the
    AsyncAdapter example, which calls it on exit) to release it.
    """

    def __init__(self, file_path):
        self.file_path = file_path
        # Explicit encoding avoids locale-dependent decoding.
        self.file = open(file_path, 'r', encoding='utf-8')

    def process_line(self, line):
        """Return *line* stripped of surrounding whitespace, uppercased."""
        return line.strip().upper()

    def process_all(self):
        """Process every remaining line.

        Note: iterating consumes the handle, so a second call returns [].
        """
        return [self.process_line(line) for line in self.file]

    def close(self):
        """Close the underlying file handle."""
        self.file.close()
async def async_file_processing_example():
    """Example using async adapter."""
    factory = lambda: SyncFileProcessor('data.txt')
    async with AsyncAdapter(factory) as processor:
        # Every attribute access yields an async-wrapped method.
        return await processor.process_all()
# Batch async adapter
class BatchAsyncAdapter:
    """Adapter that applies a sync function over items in async batches."""

    def __init__(self, sync_func, batch_size=10):
        self.sync_func = sync_func
        self.batch_size = batch_size

    async def process_batch(self, items):
        """Process *items* batch by batch, yielding to the loop between."""
        results = []
        step = self.batch_size
        for start in range(0, len(items), step):
            chunk = items[start:start + step]
            # The sync work runs on a worker thread via sync_to_async.
            chunk_results = await sync_to_async(self._process_sync_batch)(chunk)
            results.extend(chunk_results)
            # Give other coroutines a turn between batches.
            await asyncio.sleep(0)
        return results

    def _process_sync_batch(self, batch):
        """Apply sync_func to every item of *batch* synchronously."""
        return [self.sync_func(item) for item in batch]
# Usage
def expensive_sync_operation(item):
    """Simulate expensive sync operation."""
    import time
    time.sleep(0.1)  # stand-in for real blocking work
    return item * 2
async def batch_processing_example():
    """Run 100 items through the batching adapter, 10 at a time."""
    adapter = BatchAsyncAdapter(expensive_sync_operation, batch_size=10)
    return await adapter.process_batch(list(range(100)))
# Retry async adapter
class RetryAsyncAdapter:
    """Adds retry-with-exponential-backoff to any async callable."""

    def __init__(self, max_retries=3, delay=1.0, backoff=2.0):
        self.max_retries = max_retries  # retries after the first attempt
        self.delay = delay              # initial sleep between attempts
        self.backoff = backoff          # delay multiplier per retry

    async def execute_with_retry(self, async_func, *args, **kwargs):
        """Call *async_func*, retrying on any Exception.

        Re-raises the last exception once all attempts are exhausted.
        """
        last_exception = None
        wait = self.delay
        attempts = self.max_retries + 1
        for attempt in range(attempts):
            try:
                return await async_func(*args, **kwargs)
            except Exception as exc:
                last_exception = exc
                if attempt == attempts - 1:
                    # Final attempt failed: no sleep, surface the error.
                    raise last_exception
                await asyncio.sleep(wait)
                wait *= self.backoff
# Usage
async def unreliable_async_operation():
    """Simulate unreliable async operation."""
    import random
    if random.random() < 0.7:  # 70% chance of failure
        raise Exception("Random failure")
    return "Success!"
async def retry_example():
    """Example using retry adapter."""
    retry_adapter = RetryAsyncAdapter(max_retries=5, delay=0.5)
    try:
        return await retry_adapter.execute_with_retry(unreliable_async_operation)
    except Exception as e:
        return f"Failed after retries: {e}"
# Performance-optimized async adapters
import asyncio
from concurrent.futures import ThreadPoolExecutor
from asgiref.sync import sync_to_async
class ThreadPoolAsyncAdapter:
    """Adapter using a thread pool for CPU-bound or blocking sync work."""

    def __init__(self, max_workers=None):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def run_in_thread(self, sync_func, *args, **kwargs):
        """Run *sync_func* in the pool and await its result.

        BUG FIX: uses get_running_loop(); asyncio.get_event_loop() is
        deprecated inside coroutines since Python 3.10.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: sync_func(*args, **kwargs)
        )

    async def close(self):
        """Shut the pool down, waiting for in-flight work.

        Note: shutdown(wait=True) blocks the event loop while it waits,
        so call this only at teardown.
        """
        self.executor.shutdown(wait=True)
# CPU-intensive operations
def cpu_intensive_task(n):
    """Sum of squares 0..n-1 — a stand-in for CPU-heavy work."""
    return sum(i ** 2 for i in range(n))
async def optimized_cpu_task_example():
    """Fan ten CPU-bound tasks across a four-thread pool."""
    adapter = ThreadPoolAsyncAdapter(max_workers=4)
    try:
        jobs = [
            adapter.run_in_thread(cpu_intensive_task, 100000)
            for _ in range(10)
        ]
        totals = await asyncio.gather(*jobs)
        return sum(totals)
    finally:
        # Always release the pool's threads.
        await adapter.close()
# Connection pooling adapter
class AsyncConnectionPoolAdapter:
    """Caps concurrent sync DB operations with a semaphore."""

    def __init__(self, pool_size=10):
        self.pool_size = pool_size
        # At most pool_size operations run at once.
        self.semaphore = asyncio.Semaphore(pool_size)

    async def execute_with_pool(self, db_operation, *args, **kwargs):
        """Run *db_operation* on a worker thread, respecting the cap."""
        async with self.semaphore:
            return await sync_to_async(db_operation)(*args, **kwargs)
# Usage
def complex_db_query():
    """Top-10 users by post count via raw SQL.

    Uses the default Django connection; the cursor context manager
    guarantees the cursor is closed.
    """
    from django.db import connection
    with connection.cursor() as cursor:
        cursor.execute("""
            SELECT u.username, COUNT(p.id) as post_count
            FROM auth_user u
            LEFT JOIN blog_post p ON u.id = p.author_id
            GROUP BY u.id, u.username
            ORDER BY post_count DESC
            LIMIT 10
        """)
        return cursor.fetchall()
async def pooled_db_example():
    """Example using connection pool adapter."""
    adapter = AsyncConnectionPoolAdapter(pool_size=5)
    # Twenty concurrent queries, at most five in flight at a time.
    queries = [adapter.execute_with_pool(complex_db_query) for _ in range(20)]
    results = await asyncio.gather(*queries)
    return results[0]  # Return first result
# Caching adapter with async support
class AsyncCachingAdapter:
    """Memoizes async operations; per-key locks prevent duplicate work."""

    def __init__(self):
        self.cache = {}
        self.locks = {}                  # cache_key -> asyncio.Lock
        self.main_lock = asyncio.Lock()  # guards the locks dict

    async def cached_execute(self, cache_key, async_func, *args, **kwargs):
        """Return the cached value for *cache_key*, computing it at most once."""
        # Fast path: already cached.
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Lazily create the lock dedicated to this key.
        async with self.main_lock:
            key_lock = self.locks.setdefault(cache_key, asyncio.Lock())

        async with key_lock:
            # Double-check: a concurrent caller may have finished first.
            if cache_key in self.cache:
                return self.cache[cache_key]
            result = await async_func(*args, **kwargs)
            self.cache[cache_key] = result
            return result

    def clear_cache(self, pattern=None):
        """Drop entries whose key contains *pattern* (or all entries)."""
        if not pattern:
            self.cache.clear()
            return
        stale = [key for key in self.cache.keys() if pattern in key]
        for key in stale:
            del self.cache[key]
async def cached_operation_example():
    """Example using caching adapter.

    Calls the same slow operation twice with one cache key; the second
    call returns the memoized value without re-running the sleep.
    """
    cache_adapter = AsyncCachingAdapter()

    async def expensive_operation(value):
        """Simulate expensive async operation."""
        await asyncio.sleep(1)
        return value * 2

    # First call - will execute function
    result1 = await cache_adapter.cached_execute(
        'expensive_op_5',
        expensive_operation,
        5
    )
    # Second call - will use cached result
    result2 = await cache_adapter.cached_execute(
        'expensive_op_5',
        expensive_operation,
        5
    )
    return result1, result2  # Both should be 10
Django's asynchronous support enables building high-performance applications that can handle thousands of concurrent requests efficiently. The key principles are understanding when to use async (I/O-bound operations), ensuring async safety through proper synchronization, and using adapter functions to bridge sync and async code effectively. Start with simple async views and gradually implement more sophisticated patterns as your application's concurrency requirements grow.
Background Tasks with Celery or RQ
Background task processing is essential for building responsive async Django applications. While async views handle concurrent requests efficiently, long-running operations like file processing, email sending, data analysis, or external API calls should be offloaded to background workers. This chapter covers integrating Celery and RQ with async Django applications, implementing task queues, scheduling, monitoring, and async task patterns.
Django's Tasks Framework
Django's Tasks framework (introduced in Django 6.0) provides a built-in way to define and enqueue background tasks without external dependencies like Celery or RQ, though a worker backend is still required to actually execute the queued work. This framework enables you to offload time-consuming operations from request-response cycles, improving application performance and user experience.