Django's async views enable handling high-concurrency scenarios efficiently by allowing views to perform I/O operations without blocking the server thread. This chapter covers implementing async views, handling concurrent operations, integrating with external services, and optimizing performance for I/O-bound operations.
# views.py
import asyncio
import aiohttp
from django.http import JsonResponse
from django.shortcuts import aget_object_or_404
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from asgiref.sync import sync_to_async
from .models import Post, User
async def async_hello(request):
    """Return a fixed JSON greeting after a brief non-blocking pause."""
    # Stand-in for real awaited I/O; control returns to the event loop meanwhile.
    await asyncio.sleep(0.1)
    greeting = {'message': 'Hello from async view!'}
    return JsonResponse(greeting)
async def async_post_list(request):
    """Return id, title and creation time of every published post as JSON."""
    published = Post.objects.filter(published=True)
    # An async comprehension drives the queryset's async iterator,
    # so rows are fetched without blocking the event loop.
    serialized = [
        {
            'id': entry.id,
            'title': entry.title,
            'created_at': entry.created_at.isoformat()
        }
        async for entry in published
    ]
    return JsonResponse({'posts': serialized})
async def async_post_detail(request, post_id):
    """Return one published post, its author and its comment count as JSON.

    Responds with a JSON 404 payload when no published post has ``post_id``.
    """
    # Local import: Http404 is not among this module's top-level imports.
    from django.http import Http404

    try:
        # aget_object_or_404 raises Http404 (never Post.DoesNotExist), so that
        # is the exception to translate. select_related loads the author in
        # the same query, avoiding a second blocking lookup.
        post = await aget_object_or_404(
            Post.objects.select_related('author'),
            id=post_id,
            published=True,
        )
    except Http404:
        return JsonResponse({'error': 'Post not found'}, status=404)

    # Count in the database instead of materialising every comment row.
    comments_count = await post.comments.acount()

    return JsonResponse({
        'id': post.id,
        'title': post.title,
        'content': post.content,
        'author': post.author.username,
        'comments_count': comments_count
    })
@require_http_methods(["GET", "POST"])
async def async_api_endpoint(request):
    """Proxy an external API on GET; process a JSON object on POST.

    GET  -> relays the upstream JSON, or 502 when the upstream call fails.
    POST -> 400 for a malformed or non-object body, otherwise the result of
            ``process_data_async``.
    """
    import json

    if request.method == 'GET':
        try:
            # Bound the upstream call so a stalled service cannot pin the request.
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get('https://api.example.com/data') as response:
                    if response.status != 200:
                        return JsonResponse({'error': 'External API error'}, status=502)
                    data = await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Connection failures and timeouts are upstream errors, not ours.
            return JsonResponse({'error': 'External API error'}, status=502)
        return JsonResponse(data)

    # POST (the decorator rejects every other method).
    try:
        # The request body is exposed through ``request.body``;
        # there is nothing to await here.
        data = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    if not isinstance(data, dict):
        # process_data_async calls data.get(), so only JSON objects are valid.
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    result = await process_data_async(data)
    return JsonResponse({'result': result, 'status': 'success'})
async def process_data_async(data):
    """Return a "Processed: ..." string built from ``data['message']``.

    Falls back to the text 'No message' when the key is absent.
    """
    # Placeholder for real asynchronous work.
    await asyncio.sleep(0.5)
    message = data.get('message', 'No message')
    return f"Processed: {message}"
# views.py
from django.views import View
from django.http import JsonResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
import asyncio
import json
class AsyncView(View):
    """Base class whose ``dispatch`` is a coroutine and accepts sync or async handlers."""

    async def dispatch(self, request, *args, **kwargs):
        """Route the request to the handler named after its HTTP method.

        Methods missing from ``http_method_names`` or not implemented on the
        class fall through to ``http_method_not_allowed``.
        """
        import inspect

        method = request.method.lower()
        if method in self.http_method_names:
            handler = getattr(self, method, self.http_method_not_allowed)
        else:
            handler = self.http_method_not_allowed

        outcome = handler(request, *args, **kwargs)
        # A plain function can still hand back an awaitable (for example a
        # wrapped handler or the framework's not-allowed response on async
        # views), so inspect the result rather than the function.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome
class AsyncPostListView(AsyncView):
    """Paginated JSON listing of published posts."""

    # Page size used when the client sends no ``limit``.
    default_limit = 10
    # Upper bound on ``limit`` so one request cannot pull the whole table.
    max_limit = 100

    async def get(self, request):
        """Return one page of published posts plus pagination metadata.

        Query parameters: ``page`` (1-based, default 1) and ``limit``
        (default ``default_limit``, clamped to 1..``max_limit``).
        Non-integer values produce a 400 response.
        """
        try:
            page = int(request.GET.get('page', 1))
            limit = int(request.GET.get('limit', self.default_limit))
        except (TypeError, ValueError):
            return JsonResponse(
                {'error': 'page and limit must be integers'}, status=400
            )

        # Clamp so the slice bounds are never negative (querysets reject that).
        page = max(page, 1)
        limit = min(max(limit, 1), self.max_limit)
        offset = (page - 1) * limit

        # NOTE(review): page stability relies on Post having a default
        # ordering — confirm Post.Meta.ordering or add an explicit order_by.
        published = Post.objects.filter(published=True)
        posts = [
            {
                'id': post.id,
                'title': post.title,
                'slug': post.slug,
                'created_at': post.created_at.isoformat()
            }
            async for post in published[offset:offset + limit]
        ]
        total_count = await published.acount()

        return JsonResponse({
            'posts': posts,
            'pagination': {
                'page': page,
                'limit': limit,
                'total': total_count,
                'has_next': offset + limit < total_count
            }
        })
@method_decorator(csrf_exempt, name='dispatch')
class AsyncAPIView(AsyncView):
    """Generic async JSON API view.

    HTTP verbs map onto ``list`` / ``create`` / ``update`` / ``destroy``;
    subclasses supply persistence through the ``perform_*`` hooks.
    """

    async def get(self, request, *args, **kwargs):
        """Handle GET requests."""
        return await self.list(request, *args, **kwargs)

    async def post(self, request, *args, **kwargs):
        """Handle POST requests."""
        return await self.create(request, *args, **kwargs)

    async def put(self, request, *args, **kwargs):
        """Handle PUT requests."""
        return await self.update(request, *args, **kwargs)

    async def delete(self, request, *args, **kwargs):
        """Handle DELETE requests."""
        return await self.destroy(request, *args, **kwargs)

    @staticmethod
    def _load_json(request):
        """Decode the request body as JSON; return ``(data, error_response)``.

        Exactly one element of the pair is ``None``.
        """
        try:
            # The body is available synchronously via ``request.body``.
            return json.loads(request.body), None
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None, JsonResponse({'error': 'Invalid JSON'}, status=400)

    async def list(self, request, *args, **kwargs):
        """List resources."""
        return JsonResponse({'message': 'List method not implemented'})

    async def create(self, request, *args, **kwargs):
        """Create a resource from the JSON body; 201 on success."""
        data, error = self._load_json(request)
        if error is not None:
            return error
        try:
            result = await self.perform_create(data)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
        return JsonResponse(result, status=201)

    async def update(self, request, *args, **kwargs):
        """Update a resource from the JSON body; 200 on success."""
        data, error = self._load_json(request)
        if error is not None:
            return error
        try:
            result = await self.perform_update(data, *args, **kwargs)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
        return JsonResponse(result)

    async def destroy(self, request, *args, **kwargs):
        """Delete a resource; empty 204 on success."""
        # Local import: HttpResponse is not among this module's top-level imports.
        from django.http import HttpResponse

        try:
            await self.perform_destroy(*args, **kwargs)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
        # A 204 response must not carry a body, so no JSON payload here.
        return HttpResponse(status=204)

    async def perform_create(self, data):
        """Override in subclasses."""
        raise NotImplementedError

    async def perform_update(self, data, *args, **kwargs):
        """Override in subclasses."""
        raise NotImplementedError

    async def perform_destroy(self, *args, **kwargs):
        """Override in subclasses."""
        raise NotImplementedError
class PostAPIView(AsyncAPIView):
    """Async API view for posts."""

    @staticmethod
    def _summary(post):
        """Return the public JSON representation of ``post``."""
        return {
            'id': post.id,
            'title': post.title,
            'slug': post.slug
        }

    async def list(self, request, *args, **kwargs):
        """List published posts.

        Accepts ``*args``/``**kwargs`` because the parent's ``get`` forwards
        URL arguments here; they are ignored.
        """
        posts = [
            self._summary(post)
            async for post in Post.objects.filter(published=True)
        ]
        return JsonResponse({'posts': posts})

    async def perform_create(self, data):
        """Create a post from ``data``; raises ``KeyError`` if title/content is missing."""
        post = await Post.objects.acreate(
            title=data['title'],
            content=data['content'],
            published=data.get('published', False)
        )
        return self._summary(post)

    async def perform_update(self, data, post_id):
        """Apply the supplied fields to post ``post_id``.

        Raises ``Post.DoesNotExist`` when the post is absent.
        """
        post = await Post.objects.aget(id=post_id)
        # Only overwrite fields present in the payload.
        post.title = data.get('title', post.title)
        post.content = data.get('content', post.content)
        post.published = data.get('published', post.published)
        await post.asave()
        return self._summary(post)

    async def perform_destroy(self, post_id):
        """Delete post ``post_id``; raises ``Post.DoesNotExist`` when absent."""
        post = await Post.objects.aget(id=post_id)
        await post.adelete()
# views.py
import aiohttp
import asyncio
from django.http import JsonResponse
async def fetch_user_data(session, user_id):
"""Fetch user data from external API."""
url = f"https://api.example.com/users/{user_id}"
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
return None
except Exception as e:
print(f"Error fetching user {user_id}: {e}")
return None
async def fetch_user_posts(session, user_id):
"""Fetch user posts from external API."""
url = f"https://api.example.com/users/{user_id}/posts"
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
return []
except Exception as e:
print(f"Error fetching posts for user {user_id}: {e}")
return []
async def fetch_user_followers(session, user_id):
"""Fetch user followers from external API."""
url = f"https://api.example.com/users/{user_id}/followers"
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
return []
except Exception as e:
print(f"Error fetching followers for user {user_id}: {e}")
return []
async def user_dashboard(request, user_id):
    """Build a user's dashboard from three external lookups run concurrently.

    Any lookup that comes back as an exception is replaced by its empty
    fallback (``None`` for the user, ``[]`` for posts and followers).
    """
    async with aiohttp.ClientSession() as http:
        # One shared session; gather schedules the three requests together.
        profile, posts, followers = await asyncio.gather(
            fetch_user_data(http, user_id),
            fetch_user_posts(http, user_id),
            fetch_user_followers(http, user_id),
            return_exceptions=True
        )

        # return_exceptions=True delivers failures as values; swap them out.
        profile = None if isinstance(profile, Exception) else profile
        posts = [] if isinstance(posts, Exception) else posts
        followers = [] if isinstance(followers, Exception) else followers

        stats = {
            'posts_count': len(posts or ()),
            'followers_count': len(followers or ())
        }
        return JsonResponse({
            'user': profile,
            'posts': posts,
            'followers': followers,
            'stats': stats
        })
async def batch_process_users(request):
    """Fetch external data for every id in the ``user_ids`` query parameter.

    ``user_ids`` is a comma-separated list; blank entries are ignored.
    Returns 400 when no usable id remains, otherwise one entry per id holding
    either ``data`` or ``error``.
    """
    raw_ids = request.GET.get('user_ids', '').split(',')
    # Clean the list once so results can be paired with the ids actually
    # requested (blank entries would otherwise shift the pairing).
    user_ids = [candidate.strip() for candidate in raw_ids if candidate.strip()]
    if not user_ids:
        return JsonResponse({'error': 'No user IDs provided'}, status=400)

    async with aiohttp.ClientSession() as session:
        # NOTE(review): fan-out is unbounded; consider capping the id count.
        results = await asyncio.gather(
            *(fetch_user_data(session, user_id) for user_id in user_ids),
            return_exceptions=True
        )

    processed_users = []
    for user_id, result in zip(user_ids, results):
        if isinstance(result, Exception):
            processed_users.append({'user_id': user_id, 'error': str(result)})
        else:
            processed_users.append({'user_id': user_id, 'data': result})

    return JsonResponse({
        'processed_users': processed_users,
        'total_processed': len(processed_users)
    })
# views.py
import asyncio
from django.http import JsonResponse
from asgiref.sync import sync_to_async
from django.db import transaction
from .models import Post, Comment, User
async def concurrent_database_operations(request):
    """Return four top-10 listings (posts, comments, users) in one JSON payload.

    Each listing is a blocking ORM query wrapped with ``sync_to_async`` and
    awaited together through ``asyncio.gather``.
    """
    # NOTE(review): sync_to_async is used with its defaults here; whether the
    # four queries truly overlap depends on its thread-sensitivity — confirm.

    def _recent_posts():
        return list(Post.objects.filter(published=True).order_by('-created_at')[:10])

    def _popular_posts():
        return list(Post.objects.filter(published=True).order_by('-views')[:10])

    def _recent_comments():
        # select_related avoids a lazy post lookup when titles are read below.
        return list(Comment.objects.select_related('post').order_by('-created_at')[:10])

    def _active_users():
        return list(User.objects.filter(is_active=True).order_by('-last_login')[:10])

    loaders = (_recent_posts, _popular_posts, _recent_comments, _active_users)
    newest, most_viewed, comments, users = await asyncio.gather(
        *(sync_to_async(loader)() for loader in loaders)
    )

    body = {
        'recent_posts': [{'id': p.id, 'title': p.title} for p in newest],
        'popular_posts': [{'id': p.id, 'title': p.title, 'views': p.views} for p in most_viewed],
        'recent_comments': [{'id': c.id, 'post_title': c.post.title} for c in comments],
        'active_users': [{'id': u.id, 'username': u.username} for u in users]
    }
    return JsonResponse(body)
async def bulk_create_posts(request):
    """Create every post listed under ``posts`` in the JSON body.

    Responds 400 for malformed JSON, a non-object body, or a missing/empty or
    non-list ``posts`` value. Otherwise reports which posts were created and
    which failed (with the error text and the offending input).
    """
    import json

    try:
        # The body is available synchronously via ``request.body``.
        data = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    posts_data = data.get('posts', []) if isinstance(data, dict) else None
    if not posts_data or not isinstance(posts_data, list):
        return JsonResponse({'error': 'No posts data provided'}, status=400)

    @sync_to_async
    def create_post(post_data):
        # Missing keys raise inside the task, so gather reports them per post.
        return Post.objects.create(
            title=post_data['title'],
            content=post_data['content'],
            published=post_data.get('published', False)
        )

    try:
        created_posts = await asyncio.gather(
            *(create_post(post_data) for post_data in posts_data),
            return_exceptions=True
        )

        successful_posts = []
        failed_posts = []
        for index, (post_data, result) in enumerate(zip(posts_data, created_posts)):
            if isinstance(result, Exception):
                failed_posts.append({
                    'index': index,
                    'error': str(result),
                    'data': post_data
                })
            else:
                successful_posts.append({
                    'id': result.id,
                    'title': result.title,
                    'slug': result.slug
                })

        return JsonResponse({
            'successful_posts': successful_posts,
            'failed_posts': failed_posts,
            'total_created': len(successful_posts)
        })
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
# services/external_api.py
import aiohttp
import asyncio
from typing import Optional, Dict, Any
from django.conf import settings
class AsyncAPIClient:
"""Async client for external API integration."""
def __init__(self, base_url: str, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.session = None
async def __aenter__(self):
"""Async context manager entry."""
self.session = aiohttp.ClientSession(
timeout=self.timeout,
headers={'User-Agent': 'Django-AsyncClient/1.0'}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self.session:
await self.session.close()
async def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
"""Make async GET request."""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
async with self.session.get(url, params=params) as response:
if response.status == 200:
return await response.json()
else:
print(f"GET {url} returned {response.status}")
return None
except asyncio.TimeoutError:
print(f"Timeout for GET {url}")
return None
except Exception as e:
print(f"Error in GET {url}: {e}")
return None
async def post(self, endpoint: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Make async POST request."""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
async with self.session.post(url, json=data) as response:
if response.status in [200, 201]:
return await response.json()
else:
print(f"POST {url} returned {response.status}")
return None
except asyncio.TimeoutError:
print(f"Timeout for POST {url}")
return None
except Exception as e:
print(f"Error in POST {url}: {e}")
return None
async def put(self, endpoint: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Make async PUT request."""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
async with self.session.put(url, json=data) as response:
if response.status == 200:
return await response.json()
else:
print(f"PUT {url} returned {response.status}")
return None
except asyncio.TimeoutError:
print(f"Timeout for PUT {url}")
return None
except Exception as e:
print(f"Error in PUT {url}: {e}")
return None
async def delete(self, endpoint: str) -> bool:
"""Make async DELETE request."""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
async with self.session.delete(url) as response:
return response.status in [200, 204]
except asyncio.TimeoutError:
print(f"Timeout for DELETE {url}")
return False
except Exception as e:
print(f"Error in DELETE {url}: {e}")
return False
# Usage in views
async def sync_user_data(request, user_id):
    """Copy email and name fields from the external service onto the local user.

    Responds 404 when the external service has no such user or when no local
    user with ``user_id`` exists; otherwise returns the updated fields.
    """
    from asgiref.sync import sync_to_async

    async with AsyncAPIClient('https://api.external-service.com') as client:
        external_user = await client.get(f'/users/{user_id}')
        if not external_user:
            return JsonResponse({'error': 'User not found in external service'}, status=404)

        def _apply_remote_fields():
            # Blocking ORM work; runs via sync_to_async below.
            try:
                local = User.objects.get(id=user_id)
                # Keep the current value wherever the remote record omits a field.
                for field in ('email', 'first_name', 'last_name'):
                    setattr(local, field, external_user.get(field, getattr(local, field)))
                local.save()
                return local
            except User.DoesNotExist:
                return None

        updated_user = await sync_to_async(_apply_remote_fields)()
        if not updated_user:
            return JsonResponse({'error': 'Local user not found'}, status=404)

        user_payload = {
            'id': updated_user.id,
            'username': updated_user.username,
            'email': updated_user.email,
            'first_name': updated_user.first_name,
            'last_name': updated_user.last_name
        }
        return JsonResponse({
            'message': 'User data synchronized',
            'user': user_payload
        })
# views/webhooks.py
import json
import hmac
import hashlib
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.conf import settings
import asyncio
@csrf_exempt
@require_POST
async def github_webhook(request):
    """Verify and process a GitHub webhook delivery.

    Responses: 400 for a missing signature or malformed JSON, 403 for a
    signature mismatch, 500 when processing fails, 200 otherwise.
    """
    import logging

    signature = request.META.get('HTTP_X_HUB_SIGNATURE_256')
    if not signature:
        return JsonResponse({'error': 'Missing signature'}, status=400)

    # The raw body is available synchronously; the HMAC must be computed over
    # these exact bytes.
    body = request.body

    expected_signature = 'sha256=' + hmac.new(
        settings.GITHUB_WEBHOOK_SECRET.encode(),
        body,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest raises TypeError for non-ASCII str,
    # which an attacker-controlled header could otherwise trigger.
    if not hmac.compare_digest(signature.encode(), expected_signature.encode()):
        return JsonResponse({'error': 'Invalid signature'}, status=403)

    try:
        payload = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    event_type = request.META.get('HTTP_X_GITHUB_EVENT')
    try:
        await process_github_webhook(event_type, payload)
    except Exception:
        # Log the detail server-side; do not echo internals to the caller.
        logging.getLogger(__name__).exception(
            "GitHub webhook processing failed for event %s", event_type
        )
        return JsonResponse({'error': 'Webhook processing failed'}, status=500)
    return JsonResponse({'status': 'processed'})
async def process_github_webhook(event_type, payload):
    """Route a webhook payload to the handler for its event type.

    Handles 'push', 'pull_request' and 'issues'; any other type is only
    reported on stdout.
    """
    if event_type == 'push':
        await handle_push_event(payload)
        return
    if event_type == 'pull_request':
        await handle_pull_request_event(payload)
        return
    if event_type == 'issues':
        await handle_issues_event(payload)
        return
    print(f"Unhandled event type: {event_type}")
async def handle_push_event(payload):
    """Report a push and process each of its commits concurrently.

    Failures in individual commits are collected by ``gather`` and discarded.
    """
    repo = payload.get('repository', {})
    pushed = payload.get('commits', [])
    print(f"Push to {repo.get('full_name')}: {len(pushed)} commits")

    pending = []
    for entry in pushed:
        pending.append(process_commit(entry))
    await asyncio.gather(*pending, return_exceptions=True)
async def process_commit(commit):
    """Process one commit dict from a push payload.

    Reads the ``id`` and ``message`` keys; a missing ``id`` is reported as
    'unknown' instead of failing.
    """
    from asgiref.sync import sync_to_async

    commit_id = commit.get('id')
    message = commit.get('message')
    # Slicing None would raise TypeError, and the caller's gather would then
    # swallow it silently — guard explicitly.
    short_id = str(commit_id)[:8] if commit_id else 'unknown'
    print(f"Processing commit {short_id}: {message}")

    # Simulate async processing
    await asyncio.sleep(0.1)

    @sync_to_async
    def store_commit():
        # Placeholder: persist commit information in the database.
        pass

    await store_commit()
async def handle_pull_request_event(payload):
    """Report a pull-request event and hand 'opened'/'closed' actions on.

    Other actions are reported on stdout only.
    """
    pr = payload.get('pull_request', {})
    action = payload.get('action')
    print(f"Pull request {action}: #{pr.get('number')}")

    if action == 'opened':
        await process_new_pull_request(pr)
        return
    if action == 'closed':
        await process_closed_pull_request(pr)
async def process_new_pull_request(pull_request):
    """Handle a newly opened pull request (simulated work, then a report)."""
    # Placeholder for CI triggers, notifications and similar follow-up.
    await asyncio.sleep(0.1)
    title = pull_request.get('title')
    print(f"New PR processed: {title}")
async def process_closed_pull_request(pull_request):
    """Handle a closed pull request, distinguishing merged from abandoned."""
    title = pull_request.get('title')
    if not pull_request.get('merged'):
        # Closed without merging: report only.
        print(f"PR closed without merge: {title}")
        return
    # Merged: simulated follow-up work before reporting.
    await asyncio.sleep(0.1)
    print(f"PR merged: {title}")
async def handle_issues_event(payload):
    """Report an issues event, then perform simulated processing."""
    ticket = payload.get('issue', {})
    verb = payload.get('action')
    print(f"Issue {verb}: #{ticket.get('number')}")
    # Placeholder for real asynchronous handling.
    await asyncio.sleep(0.1)
# utils/async_db.py
import asyncio
import asyncpg
from django.conf import settings
class AsyncDatabasePool:
"""Async database connection pool."""
def __init__(self):
self.pool = None
async def initialize(self):
"""Initialize connection pool."""
if not self.pool:
database_url = settings.DATABASES['default']['NAME']
self.pool = await asyncpg.create_pool(
database_url,
min_size=5,
max_size=20,
command_timeout=60
)
async def close(self):
"""Close connection pool."""
if self.pool:
await self.pool.close()
async def fetch_one(self, query, *args):
"""Fetch single row."""
async with self.pool.acquire() as connection:
return await connection.fetchrow(query, *args)
async def fetch_all(self, query, *args):
"""Fetch all rows."""
async with self.pool.acquire() as connection:
return await connection.fetch(query, *args)
async def execute(self, query, *args):
"""Execute query."""
async with self.pool.acquire() as connection:
return await connection.execute(query, *args)
# Global pool instance
# Constructing it only sets ``pool`` to None; connections are opened when
# ``initialize()`` is awaited.
db_pool = AsyncDatabasePool()
# Usage in views
async def optimized_database_view(request):
    """Return the ten newest published posts, read through the shared pool."""
    # Ensures the pool exists before it is used.
    await db_pool.initialize()

    sql = "SELECT id, title, created_at FROM blog_post WHERE published = $1 ORDER BY created_at DESC LIMIT $2"
    rows = await db_pool.fetch_all(sql, True, 10)

    serialized = []
    for row in rows:
        serialized.append({
            'id': row['id'],
            'title': row['title'],
            'created_at': row['created_at'].isoformat()
        })
    return JsonResponse({'posts': serialized})
# views/cached.py
import asyncio
import json
from django.core.cache import cache
from django.http import JsonResponse
from asgiref.sync import sync_to_async
# Async cache operations
# Awaitable wrappers around the cache's blocking get/set/delete calls,
# built once at import time with sync_to_async.
# NOTE(review): recent Django versions appear to provide native async cache
# methods (aget/aset/adelete) — confirm before replacing these wrappers.
async_cache_get = sync_to_async(cache.get)
async_cache_set = sync_to_async(cache.set)
async_cache_delete = sync_to_async(cache.delete)
async def cached_async_view(request):
    """Serve per-category data from the cache, generating it on a miss.

    The cache key uses the ``category`` query parameter ('all' when absent);
    freshly generated data is stored for 300 seconds.
    """
    category = request.GET.get('category')
    key_suffix = 'all' if category is None else category
    cache_key = f"async_view_data_{key_suffix}"

    hit = await async_cache_get(cache_key)
    if not hit:
        # Miss (or a falsy cached value): rebuild and store for 5 minutes.
        fresh = await generate_expensive_data(category)
        await async_cache_set(cache_key, fresh, 300)
        return JsonResponse(fresh)
    return JsonResponse(hit)
async def generate_expensive_data(category=None, delay=1.0):
    """Produce the sample payload for ``category`` after a simulated delay.

    Args:
        category: Label stored in the result; 'all' when falsy.
        delay: Seconds to sleep, standing in for the expensive work.

    Returns:
        A dict with ``category``, 100 ``item_<i>`` strings under ``data`` and
        ``generated_at`` as a Unix timestamp (float seconds).
    """
    import time

    await asyncio.sleep(delay)
    return {
        'category': category or 'all',
        'data': [f'item_{i}' for i in range(100)],
        # Wall-clock time: the event loop clock is monotonic with an arbitrary
        # origin, so it is meaningless once the value leaves this process.
        'generated_at': time.time()
    }
async def cache_warming_view(request):
    """Pre-populate the cache for a fixed set of categories, concurrently.

    Reports how many warm-ups succeeded and how many raised.
    """
    categories = ['tech', 'science', 'business', 'sports']

    warmups = [
        warm_cache_for_category(f"async_view_data_{name}", name)
        for name in categories
    ]
    # return_exceptions=True keeps one failure from cancelling the rest.
    outcomes = await asyncio.gather(*warmups, return_exceptions=True)

    failed = sum(1 for outcome in outcomes if isinstance(outcome, Exception))
    successful = len(outcomes) - failed

    return JsonResponse({
        'message': 'Cache warming completed',
        'successful': successful,
        'failed': failed
    })
async def warm_cache_for_category(cache_key, category):
    """Generate data for ``category`` and store it under ``cache_key`` for 300 seconds.

    Returns a short confirmation string.
    """
    payload = await generate_expensive_data(category)
    await async_cache_set(cache_key, payload, 300)
    confirmation = f"Warmed cache for {category}"
    return confirmation
Async views in Django enable building high-performance applications that can handle thousands of concurrent requests efficiently. The key is understanding when to use async views (I/O-bound operations), properly handling database operations with sync_to_async, and implementing effective error handling and resource management patterns. Start with simple async views and gradually implement more sophisticated patterns as your application's concurrency requirements grow.
Introduction to ASGI
ASGI (Asynchronous Server Gateway Interface) is the spiritual successor to WSGI, designed to handle both synchronous and asynchronous Python web applications. Understanding ASGI is fundamental to building modern Django applications that support WebSockets, HTTP/2, and high-concurrency scenarios while maintaining compatibility with traditional synchronous code.
Async ORM Status
Django's async support has evolved significantly since async views arrived in Django 3.1: asynchronous queryset methods were added in 4.1, and async model and related-manager methods in 4.2. The async ORM is nevertheless still a work in progress. Understanding the current capabilities, limitations, and best practices for async database operations is crucial for building efficient async Django applications. This chapter covers the current state of async ORM, workarounds for limitations, and strategies for optimal database performance in async contexts.