Django's async ORM support has evolved significantly since Django 3.1, but it's still a work in progress. Understanding the current capabilities, limitations, and best practices for async database operations is crucial for building efficient async Django applications. This chapter covers the current state of async ORM, workarounds for limitations, and strategies for optimal database performance in async contexts.
# models.py
from django.db import models
class Post(models.Model):
title = models.CharField(max_length=200)
content = models.TextField()
published = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
views = models.IntegerField(default=0)
class Meta:
ordering = ['-created_at']
class Comment(models.Model):
post = models.ForeignKey(Post, on_delete=models.CASCADE, related_name='comments')
content = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
# Async ORM operations that work
async def async_orm_examples():
"""Examples of supported async ORM operations."""
# Basic queries
posts = []
async for post in Post.objects.filter(published=True):
posts.append(post)
# Count operations
total_posts = await Post.objects.acount()
published_count = await Post.objects.filter(published=True).acount()
# Exists checks
has_posts = await Post.objects.aexists()
has_published = await Post.objects.filter(published=True).aexists()
# Get operations
from django.shortcuts import aget_object_or_404
try:
post = await Post.objects.aget(id=1)
# or using the shortcut
post = await aget_object_or_404(Post, id=1)
except Post.DoesNotExist:
post = None
# First/Last operations
latest_post = await Post.objects.filter(published=True).afirst()
oldest_post = await Post.objects.filter(published=True).alast()
# Bulk operations
await Post.objects.filter(published=False).aupdate(published=True)
await Post.objects.filter(views=0).adelete()
# Aggregation
from django.db.models import Count, Avg, Max, Min, Sum
stats = await Post.objects.aaggregate(
total_posts=Count('id'),
avg_views=Avg('views'),
max_views=Max('views'),
min_views=Min('views'),
total_views=Sum('views')
)
return {
'posts': posts,
'total_posts': total_posts,
'published_count': published_count,
'latest_post': latest_post,
'stats': stats
}
# views.py
from django.http import JsonResponse
from .models import Post, Comment
async def async_queryset_examples(request):
"""Examples of async QuerySet methods."""
# Filtering and iteration
published_posts = []
async for post in Post.objects.filter(published=True).order_by('-created_at'):
published_posts.append({
'id': post.id,
'title': post.title,
'views': post.views
})
# Slicing (limited support)
recent_posts = []
async for post in Post.objects.filter(published=True)[:10]:
recent_posts.append(post.title)
# Counting
total_published = await Post.objects.filter(published=True).acount()
# Existence checks
has_popular_posts = await Post.objects.filter(views__gte=1000).aexists()
# Getting single objects
try:
most_viewed = await Post.objects.filter(published=True).order_by('-views').afirst()
most_viewed_title = most_viewed.title if most_viewed else None
except Exception:
most_viewed_title = None
# Aggregation
view_stats = await Post.objects.filter(published=True).aaggregate(
total_views=models.Sum('views'),
avg_views=models.Avg('views'),
max_views=models.Max('views')
)
return JsonResponse({
'published_posts': published_posts[:5], # Limit for response size
'recent_posts': recent_posts,
'total_published': total_published,
'has_popular_posts': has_popular_posts,
'most_viewed_title': most_viewed_title,
'view_stats': view_stats
})
async def async_bulk_operations(request):
"""Examples of async bulk operations."""
import json
try:
body = await request.aread()
data = json.loads(body)
# Bulk update
updated_count = await Post.objects.filter(
id__in=data.get('post_ids', [])
).aupdate(published=True)
# Bulk delete (be careful!)
if data.get('delete_unpublished'):
deleted_count = await Post.objects.filter(
published=False,
views=0
).adelete()
else:
deleted_count = 0
return JsonResponse({
'updated_count': updated_count,
'deleted_count': deleted_count
})
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
# What doesn't work yet (as of Django 4.2)
async def async_orm_limitations():
"""Examples of async ORM limitations and workarounds."""
# ❌ These operations are NOT supported asynchronously:
# 1. Creating objects
# post = await Post.objects.acreate(title="Test") # Not available
# 2. Saving objects
# post = Post(title="Test")
# await post.asave() # Not available
# 3. Related field access
# post = await Post.objects.aget(id=1)
# comments = await post.comments.all() # Not available
# 4. Many-to-many operations
# await post.tags.aadd(tag) # Not available
# 5. Complex joins and select_related
# posts = await Post.objects.select_related('author').all() # Limited
# 6. Transactions
# async with transaction.atomic(): # Not available
# await Post.objects.acreate(...)
pass
# Workarounds using sync_to_async
from asgiref.sync import sync_to_async
from django.db import transaction
@sync_to_async
def create_post_sync(title, content, published=False):
"""Create post synchronously, wrapped for async use."""
return Post.objects.create(
title=title,
content=content,
published=published
)
@sync_to_async
def update_post_sync(post_id, **kwargs):
"""Update post synchronously, wrapped for async use."""
post = Post.objects.get(id=post_id)
for key, value in kwargs.items():
setattr(post, key, value)
post.save()
return post
@sync_to_async
def get_post_with_comments_sync(post_id):
"""Get post with comments synchronously, wrapped for async use."""
return Post.objects.select_related().prefetch_related('comments').get(id=post_id)
@sync_to_async
@transaction.atomic
def create_post_with_comments_sync(post_data, comments_data):
"""Create post with comments in transaction."""
post = Post.objects.create(**post_data)
for comment_data in comments_data:
Comment.objects.create(post=post, **comment_data)
return post
async def async_view_with_workarounds(request):
"""Async view using sync_to_async workarounds."""
import json
if request.method == 'POST':
try:
body = await request.aread()
data = json.loads(body)
# Create post using sync_to_async wrapper
post = await create_post_sync(
title=data['title'],
content=data['content'],
published=data.get('published', False)
)
return JsonResponse({
'id': post.id,
'title': post.title,
'published': post.published
})
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
elif request.method == 'GET':
# Use async ORM where possible
posts = []
async for post in Post.objects.filter(published=True)[:10]:
posts.append({
'id': post.id,
'title': post.title,
'created_at': post.created_at.isoformat()
})
return JsonResponse({'posts': posts})
# utils/async_db.py
from django.db import connections
from asgiref.sync import sync_to_async
import asyncio
class AsyncDatabaseManager:
"""Manage database connections in async context."""
@staticmethod
@sync_to_async
def close_old_connections():
"""Close old database connections."""
for conn in connections.all():
conn.close_if_unusable_or_obsolete()
@staticmethod
@sync_to_async
def ensure_connection():
"""Ensure database connection is available."""
from django.db import connection
connection.ensure_connection()
@staticmethod
async def with_connection_management(coro):
"""Execute coroutine with proper connection management."""
try:
await AsyncDatabaseManager.ensure_connection()
result = await coro
return result
finally:
await AsyncDatabaseManager.close_old_connections()
# Usage in views
async def managed_database_view(request):
"""View with proper database connection management."""
async def database_operations():
# Perform database operations
posts = []
async for post in Post.objects.filter(published=True)[:5]:
posts.append({
'id': post.id,
'title': post.title
})
return posts
# Execute with connection management
posts = await AsyncDatabaseManager.with_connection_management(
database_operations()
)
return JsonResponse({'posts': posts})
# views.py
from django.http import JsonResponse
from asgiref.sync import sync_to_async
import asyncio
async def efficient_async_queries(request):
"""Examples of efficient async query patterns."""
# ✅ Good: Use async iteration for large datasets
posts = []
async for post in Post.objects.filter(published=True).order_by('-created_at'):
posts.append({
'id': post.id,
'title': post.title,
'views': post.views
})
# Break early if needed
if len(posts) >= 100:
break
# ✅ Good: Use async aggregation
stats = await Post.objects.filter(published=True).aaggregate(
total_posts=models.Count('id'),
total_views=models.Sum('views'),
avg_views=models.Avg('views')
)
# ✅ Good: Use async count for existence checks
has_posts = await Post.objects.filter(published=True).aexists()
# ✅ Good: Combine sync_to_async for complex operations
@sync_to_async
def get_posts_with_relations():
return list(
Post.objects.select_related('author')
.prefetch_related('comments')
.filter(published=True)[:10]
)
posts_with_relations = await get_posts_with_relations()
return JsonResponse({
'posts_count': len(posts),
'stats': stats,
'has_posts': has_posts,
'posts_with_relations': [
{
'id': p.id,
'title': p.title,
'author': p.author.username if hasattr(p, 'author') else None,
'comments_count': p.comments.count()
}
for p in posts_with_relations
]
})
async def batch_async_operations(request):
"""Efficient batch operations with async ORM."""
# Get multiple objects concurrently
async def get_post_stats(post_id):
try:
post = await Post.objects.aget(id=post_id)
comment_count = await Comment.objects.filter(post=post).acount()
return {
'post_id': post_id,
'title': post.title,
'comment_count': comment_count,
'views': post.views
}
except Post.DoesNotExist:
return {'post_id': post_id, 'error': 'Not found'}
# Process multiple posts concurrently
post_ids = [1, 2, 3, 4, 5]
tasks = [get_post_stats(post_id) for post_id in post_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
successful_results = [
result for result in results
if isinstance(result, dict) and 'error' not in result
]
return JsonResponse({
'results': successful_results,
'processed': len(results),
'successful': len(successful_results)
})
# utils/async_transactions.py
from django.db import transaction
from asgiref.sync import sync_to_async
import asyncio
class AsyncTransactionManager:
"""Handle transactions in async context."""
@staticmethod
@sync_to_async
@transaction.atomic
def atomic_operation(operation_func, *args, **kwargs):
"""Execute operation in atomic transaction."""
return operation_func(*args, **kwargs)
@staticmethod
@sync_to_async
def bulk_create_posts(posts_data):
"""Bulk create posts in transaction."""
with transaction.atomic():
posts = []
for post_data in posts_data:
post = Post.objects.create(**post_data)
posts.append(post)
return posts
@staticmethod
@sync_to_async
def transfer_post_ownership(from_user_id, to_user_id, post_ids):
"""Transfer post ownership atomically."""
with transaction.atomic():
# Verify users exist
from django.contrib.auth.models import User
from_user = User.objects.get(id=from_user_id)
to_user = User.objects.get(id=to_user_id)
# Update posts
updated_count = Post.objects.filter(
id__in=post_ids,
author=from_user
).update(author=to_user)
return {
'from_user': from_user.username,
'to_user': to_user.username,
'updated_count': updated_count
}
# Usage in views
async def atomic_post_creation(request):
"""Create multiple posts atomically."""
import json
try:
body = await request.aread()
data = json.loads(body)
posts_data = data.get('posts', [])
if not posts_data:
return JsonResponse({'error': 'No posts data provided'}, status=400)
# Create posts atomically
created_posts = await AsyncTransactionManager.bulk_create_posts(posts_data)
return JsonResponse({
'created_posts': [
{
'id': post.id,
'title': post.title,
'published': post.published
}
for post in created_posts
],
'total_created': len(created_posts)
})
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
async def transfer_ownership(request):
"""Transfer post ownership atomically."""
import json
try:
body = await request.aread()
data = json.loads(body)
result = await AsyncTransactionManager.transfer_post_ownership(
from_user_id=data['from_user_id'],
to_user_id=data['to_user_id'],
post_ids=data['post_ids']
)
return JsonResponse(result)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'myproject',
'USER': 'myuser',
'PASSWORD': 'mypassword',
'HOST': 'localhost',
'PORT': '5432',
'OPTIONS': {
# Connection pooling options
'MAX_CONNS': 20,
'MIN_CONNS': 5,
},
'CONN_MAX_AGE': 600, # 10 minutes
'CONN_HEALTH_CHECKS': True,
}
}
# For production with connection pooling
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'myproject',
'USER': 'myuser',
'PASSWORD': 'mypassword',
'HOST': 'localhost',
'PORT': '5432',
'OPTIONS': {
'MAX_CONNS': 50,
'MIN_CONNS': 10,
'server_side_binding': True,
},
'CONN_MAX_AGE': 0, # Persistent connections
'CONN_HEALTH_CHECKS': True,
}
}
# utils/async_queries.py
from asgiref.sync import sync_to_async
from django.db.models import Prefetch
import asyncio
class AsyncQueryOptimizer:
"""Optimize async database queries."""
@staticmethod
@sync_to_async
def get_posts_optimized(limit=10):
"""Get posts with optimized queries."""
return list(
Post.objects
.select_related('author')
.prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.select_related('author')
)
)
.filter(published=True)
.order_by('-created_at')[:limit]
)
@staticmethod
async def get_posts_stats_concurrent(post_ids):
"""Get post statistics concurrently."""
async def get_single_post_stats(post_id):
try:
post = await Post.objects.aget(id=post_id)
comment_count = await Comment.objects.filter(post_id=post_id).acount()
return {
'post_id': post_id,
'title': post.title,
'views': post.views,
'comment_count': comment_count
}
except Post.DoesNotExist:
return None
# Execute queries concurrently
tasks = [get_single_post_stats(post_id) for post_id in post_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
return [r for r in results if r is not None and not isinstance(r, Exception)]
@staticmethod
@sync_to_async
def bulk_update_views(post_ids, increment=1):
"""Bulk update post views."""
from django.db.models import F
return Post.objects.filter(id__in=post_ids).update(
views=F('views') + increment
)
# Usage in views
async def optimized_post_list(request):
"""Optimized post list view."""
# Get posts with optimized query
posts = await AsyncQueryOptimizer.get_posts_optimized(limit=20)
# Get additional stats concurrently
post_ids = [post.id for post in posts]
stats = await AsyncQueryOptimizer.get_posts_stats_concurrent(post_ids[:5])
# Update view counts
await AsyncQueryOptimizer.bulk_update_views(post_ids)
return JsonResponse({
'posts': [
{
'id': post.id,
'title': post.title,
'author': post.author.username,
'comments_count': post.comments.count(),
'views': post.views
}
for post in posts
],
'stats': stats
})
# utils/async_db_monitor.py
import time
import logging
from django.db import connections
from asgiref.sync import sync_to_async
logger = logging.getLogger('async_db')
class AsyncDatabaseMonitor:
"""Monitor async database operations."""
@staticmethod
@sync_to_async
def get_connection_info():
"""Get database connection information."""
connection_info = {}
for alias, connection in connections.all():
connection_info[alias] = {
'vendor': connection.vendor,
'is_usable': connection.is_usable(),
'queries_count': len(connection.queries),
'total_time': sum(float(q['time']) for q in connection.queries)
}
return connection_info
@staticmethod
async def monitor_query_performance(query_func, *args, **kwargs):
"""Monitor query performance."""
start_time = time.time()
try:
result = await query_func(*args, **kwargs)
duration = time.time() - start_time
logger.info(f"Query completed in {duration:.3f}s: {query_func.__name__}")
return result
except Exception as e:
duration = time.time() - start_time
logger.error(f"Query failed after {duration:.3f}s: {query_func.__name__} - {e}")
raise
# Usage
async def monitored_database_view(request):
"""View with database monitoring."""
# Monitor query performance
async def get_posts():
posts = []
async for post in Post.objects.filter(published=True)[:10]:
posts.append(post)
return posts
posts = await AsyncDatabaseMonitor.monitor_query_performance(get_posts)
# Get connection info
connection_info = await AsyncDatabaseMonitor.get_connection_info()
return JsonResponse({
'posts_count': len(posts),
'connection_info': connection_info
})
Django's async ORM support continues to evolve, with each version adding new capabilities. While not all ORM operations are async-native yet, understanding the current limitations and using appropriate workarounds with sync_to_async enables building efficient async applications. The key is knowing when to use native async operations versus wrapped synchronous operations, and implementing proper connection management and query optimization strategies.
Async Views
Django's async views enable handling high-concurrency scenarios efficiently by allowing views to perform I/O operations without blocking the server thread. This chapter covers implementing async views, handling concurrent operations, integrating with external services, and optimizing performance for I/O-bound operations.
WebSockets with Channels
Django Channels extends Django to handle WebSockets, HTTP/2, and other protocols beyond traditional HTTP. This enables building real-time applications like chat systems, live notifications, collaborative tools, and streaming dashboards. This chapter covers Channels architecture, WebSocket consumers, real-time communication patterns, and production deployment strategies.