Asynchronous Django

Async Views

Django's async views enable handling high-concurrency scenarios efficiently by allowing views to perform I/O operations without blocking the server thread. This chapter covers implementing async views, handling concurrent operations, integrating with external services, and optimizing performance for I/O-bound operations.

Async Views

Django's async views enable handling high-concurrency scenarios efficiently by allowing views to perform I/O operations without blocking the server thread. This chapter covers implementing async views, handling concurrent operations, integrating with external services, and optimizing performance for I/O-bound operations.

Basic Async Views

Function-Based Async Views

# views.py
import asyncio
import aiohttp
from django.http import JsonResponse
from django.shortcuts import aget_object_or_404
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from asgiref.sync import sync_to_async
from .models import Post, User

async def async_hello(request):
    """Basic async view."""
    await asyncio.sleep(0.1)  # Simulate async operation
    return JsonResponse({'message': 'Hello from async view!'})

async def async_post_list(request):
    """Async view with database operations."""
    # Using async database operations (Django 4.1+)
    posts = []
    async for post in Post.objects.filter(published=True):
        posts.append({
            'id': post.id,
            'title': post.title,
            'created_at': post.created_at.isoformat()
        })
    
    return JsonResponse({'posts': posts})

async def async_post_detail(request, post_id):
    """Async view with get_object_or_404 equivalent."""
    try:
        # Using aget_object_or_404 for async get operations
        post = await aget_object_or_404(Post, id=post_id, published=True)
        
        # Fetch related data concurrently
        author_task = sync_to_async(lambda: post.author)()
        comments_task = sync_to_async(lambda: list(post.comments.all()))()
        
        author, comments = await asyncio.gather(author_task, comments_task)
        
        return JsonResponse({
            'id': post.id,
            'title': post.title,
            'content': post.content,
            'author': author.username,
            'comments_count': len(comments)
        })
    
    except Post.DoesNotExist:
        return JsonResponse({'error': 'Post not found'}, status=404)

@require_http_methods(["GET", "POST"])
async def async_api_endpoint(request):
    """Async view handling multiple HTTP methods."""
    if request.method == 'GET':
        # Simulate fetching data from external API
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com/data') as response:
                if response.status == 200:
                    data = await response.json()
                    return JsonResponse(data)
                else:
                    return JsonResponse({'error': 'External API error'}, status=502)
    
    elif request.method == 'POST':
        # Handle POST request asynchronously
        import json
        
        try:
            body = await request.aread()
            data = json.loads(body)
            
            # Process data asynchronously
            result = await process_data_async(data)
            
            return JsonResponse({'result': result, 'status': 'success'})
        
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)

async def process_data_async(data):
    """Async data processing function."""
    # Simulate async processing
    await asyncio.sleep(0.5)
    return f"Processed: {data.get('message', 'No message')}"

Class-Based Async Views

# views.py
from django.views import View
from django.http import JsonResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
import asyncio
import json

class AsyncView(View):
    """Base class for async views."""
    
    async def dispatch(self, request, *args, **kwargs):
        """Async dispatch method."""
        handler = getattr(self, request.method.lower(), self.http_method_not_allowed)
        
        if asyncio.iscoroutinefunction(handler):
            return await handler(request, *args, **kwargs)
        else:
            # Handle sync methods
            return handler(request, *args, **kwargs)

class AsyncPostListView(AsyncView):
    """Async class-based view for post listing."""
    
    async def get(self, request):
        """Handle GET requests asynchronously."""
        page = int(request.GET.get('page', 1))
        limit = int(request.GET.get('limit', 10))
        
        # Calculate offset
        offset = (page - 1) * limit
        
        # Fetch posts asynchronously
        posts = []
        queryset = Post.objects.filter(published=True)[offset:offset + limit]
        
        async for post in queryset:
            posts.append({
                'id': post.id,
                'title': post.title,
                'slug': post.slug,
                'created_at': post.created_at.isoformat()
            })
        
        # Get total count asynchronously
        total_count = await Post.objects.filter(published=True).acount()
        
        return JsonResponse({
            'posts': posts,
            'pagination': {
                'page': page,
                'limit': limit,
                'total': total_count,
                'has_next': offset + limit < total_count
            }
        })

@method_decorator(csrf_exempt, name='dispatch')
class AsyncAPIView(AsyncView):
    """Generic async API view."""
    
    async def get(self, request, *args, **kwargs):
        """Handle GET requests."""
        return await self.list(request, *args, **kwargs)
    
    async def post(self, request, *args, **kwargs):
        """Handle POST requests."""
        return await self.create(request, *args, **kwargs)
    
    async def put(self, request, *args, **kwargs):
        """Handle PUT requests."""
        return await self.update(request, *args, **kwargs)
    
    async def delete(self, request, *args, **kwargs):
        """Handle DELETE requests."""
        return await self.destroy(request, *args, **kwargs)
    
    async def list(self, request, *args, **kwargs):
        """List resources."""
        return JsonResponse({'message': 'List method not implemented'})
    
    async def create(self, request, *args, **kwargs):
        """Create resource."""
        try:
            body = await request.aread()
            data = json.loads(body)
            
            # Validate and create resource
            result = await self.perform_create(data)
            
            return JsonResponse(result, status=201)
        
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
    
    async def update(self, request, *args, **kwargs):
        """Update resource."""
        try:
            body = await request.aread()
            data = json.loads(body)
            
            result = await self.perform_update(data, *args, **kwargs)
            
            return JsonResponse(result)
        
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON'}, status=400)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
    
    async def destroy(self, request, *args, **kwargs):
        """Delete resource."""
        try:
            await self.perform_destroy(*args, **kwargs)
            return JsonResponse({'message': 'Resource deleted'}, status=204)
        
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
    
    async def perform_create(self, data):
        """Override in subclasses."""
        raise NotImplementedError
    
    async def perform_update(self, data, *args, **kwargs):
        """Override in subclasses."""
        raise NotImplementedError
    
    async def perform_destroy(self, *args, **kwargs):
        """Override in subclasses."""
        raise NotImplementedError

class PostAPIView(AsyncAPIView):
    """Async API view for posts."""
    
    async def list(self, request):
        """List posts asynchronously."""
        posts = []
        async for post in Post.objects.filter(published=True):
            posts.append({
                'id': post.id,
                'title': post.title,
                'slug': post.slug
            })
        
        return JsonResponse({'posts': posts})
    
    async def perform_create(self, data):
        """Create post asynchronously."""
        # Note: Django's async ORM support is limited
        # Use sync_to_async for complex operations
        from asgiref.sync import sync_to_async
        
        @sync_to_async
        def create_post():
            return Post.objects.create(
                title=data['title'],
                content=data['content'],
                published=data.get('published', False)
            )
        
        post = await create_post()
        
        return {
            'id': post.id,
            'title': post.title,
            'slug': post.slug
        }
    
    async def perform_update(self, data, post_id):
        """Update post asynchronously."""
        from asgiref.sync import sync_to_async
        
        @sync_to_async
        def update_post():
            post = Post.objects.get(id=post_id)
            post.title = data.get('title', post.title)
            post.content = data.get('content', post.content)
            post.published = data.get('published', post.published)
            post.save()
            return post
        
        post = await update_post()
        
        return {
            'id': post.id,
            'title': post.title,
            'slug': post.slug
        }
    
    async def perform_destroy(self, post_id):
        """Delete post asynchronously."""
        from asgiref.sync import sync_to_async
        
        @sync_to_async
        def delete_post():
            post = Post.objects.get(id=post_id)
            post.delete()
        
        await delete_post()

Concurrent Operations

Parallel API Calls

# views.py
import aiohttp
import asyncio
from django.http import JsonResponse

async def fetch_user_data(session, user_id):
    """Fetch user data from external API."""
    url = f"https://api.example.com/users/{user_id}"
    
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.json()
            else:
                return None
    except Exception as e:
        print(f"Error fetching user {user_id}: {e}")
        return None

async def fetch_user_posts(session, user_id):
    """Fetch user posts from external API."""
    url = f"https://api.example.com/users/{user_id}/posts"
    
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.json()
            else:
                return []
    except Exception as e:
        print(f"Error fetching posts for user {user_id}: {e}")
        return []

async def fetch_user_followers(session, user_id):
    """Fetch user followers from external API."""
    url = f"https://api.example.com/users/{user_id}/followers"
    
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.json()
            else:
                return []
    except Exception as e:
        print(f"Error fetching followers for user {user_id}: {e}")
        return []

async def user_dashboard(request, user_id):
    """Async view that fetches data from multiple sources concurrently."""
    async with aiohttp.ClientSession() as session:
        # Fetch all data concurrently
        user_data_task = fetch_user_data(session, user_id)
        user_posts_task = fetch_user_posts(session, user_id)
        user_followers_task = fetch_user_followers(session, user_id)
        
        # Wait for all tasks to complete
        user_data, user_posts, user_followers = await asyncio.gather(
            user_data_task,
            user_posts_task,
            user_followers_task,
            return_exceptions=True
        )
        
        # Handle exceptions
        if isinstance(user_data, Exception):
            user_data = None
        if isinstance(user_posts, Exception):
            user_posts = []
        if isinstance(user_followers, Exception):
            user_followers = []
        
        # Combine results
        dashboard_data = {
            'user': user_data,
            'posts': user_posts,
            'followers': user_followers,
            'stats': {
                'posts_count': len(user_posts) if user_posts else 0,
                'followers_count': len(user_followers) if user_followers else 0
            }
        }
        
        return JsonResponse(dashboard_data)

async def batch_process_users(request):
    """Process multiple users concurrently."""
    user_ids = request.GET.get('user_ids', '').split(',')
    
    if not user_ids or user_ids == ['']:
        return JsonResponse({'error': 'No user IDs provided'}, status=400)
    
    async with aiohttp.ClientSession() as session:
        # Create tasks for all users
        tasks = []
        for user_id in user_ids:
            if user_id.strip():
                task = fetch_user_data(session, user_id.strip())
                tasks.append(task)
        
        # Execute all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        processed_users = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_users.append({
                    'user_id': user_ids[i],
                    'error': str(result)
                })
            else:
                processed_users.append({
                    'user_id': user_ids[i],
                    'data': result
                })
        
        return JsonResponse({
            'processed_users': processed_users,
            'total_processed': len(processed_users)
        })

Database Operations with Concurrency

# views.py
import asyncio
from django.http import JsonResponse
from asgiref.sync import sync_to_async
from django.db import transaction
from .models import Post, Comment, User

async def concurrent_database_operations(request):
    """Perform multiple database operations concurrently."""
    
    # Define async database operations
    @sync_to_async
    def get_recent_posts():
        return list(Post.objects.filter(published=True).order_by('-created_at')[:10])
    
    @sync_to_async
    def get_popular_posts():
        return list(Post.objects.filter(published=True).order_by('-views')[:10])
    
    @sync_to_async
    def get_recent_comments():
        return list(Comment.objects.select_related('post').order_by('-created_at')[:10])
    
    @sync_to_async
    def get_active_users():
        return list(User.objects.filter(is_active=True).order_by('-last_login')[:10])
    
    # Execute all queries concurrently
    recent_posts, popular_posts, recent_comments, active_users = await asyncio.gather(
        get_recent_posts(),
        get_popular_posts(),
        get_recent_comments(),
        get_active_users()
    )
    
    # Format response
    return JsonResponse({
        'recent_posts': [{'id': p.id, 'title': p.title} for p in recent_posts],
        'popular_posts': [{'id': p.id, 'title': p.title, 'views': p.views} for p in popular_posts],
        'recent_comments': [{'id': c.id, 'post_title': c.post.title} for c in recent_comments],
        'active_users': [{'id': u.id, 'username': u.username} for u in active_users]
    })

async def bulk_create_posts(request):
    """Create multiple posts concurrently."""
    import json
    
    try:
        body = await request.aread()
        data = json.loads(body)
        posts_data = data.get('posts', [])
        
        if not posts_data:
            return JsonResponse({'error': 'No posts data provided'}, status=400)
        
        # Define async post creation function
        @sync_to_async
        def create_post(post_data):
            return Post.objects.create(
                title=post_data['title'],
                content=post_data['content'],
                published=post_data.get('published', False)
            )
        
        # Create all posts concurrently
        tasks = [create_post(post_data) for post_data in posts_data]
        created_posts = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        successful_posts = []
        failed_posts = []
        
        for i, result in enumerate(created_posts):
            if isinstance(result, Exception):
                failed_posts.append({
                    'index': i,
                    'error': str(result),
                    'data': posts_data[i]
                })
            else:
                successful_posts.append({
                    'id': result.id,
                    'title': result.title,
                    'slug': result.slug
                })
        
        return JsonResponse({
            'successful_posts': successful_posts,
            'failed_posts': failed_posts,
            'total_created': len(successful_posts)
        })
    
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

External Service Integration

HTTP Client Integration

# services/external_api.py
import aiohttp
import asyncio
from typing import Optional, Dict, Any
from django.conf import settings

class AsyncAPIClient:
    """Async client for external API integration."""
    
    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None
    
    async def __aenter__(self):
        """Async context manager entry."""
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers={'User-Agent': 'Django-AsyncClient/1.0'}
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()
    
    async def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """Make async GET request."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        try:
            async with self.session.get(url, params=params) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    print(f"GET {url} returned {response.status}")
                    return None
        except asyncio.TimeoutError:
            print(f"Timeout for GET {url}")
            return None
        except Exception as e:
            print(f"Error in GET {url}: {e}")
            return None
    
    async def post(self, endpoint: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Make async POST request."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        try:
            async with self.session.post(url, json=data) as response:
                if response.status in [200, 201]:
                    return await response.json()
                else:
                    print(f"POST {url} returned {response.status}")
                    return None
        except asyncio.TimeoutError:
            print(f"Timeout for POST {url}")
            return None
        except Exception as e:
            print(f"Error in POST {url}: {e}")
            return None
    
    async def put(self, endpoint: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Make async PUT request."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        try:
            async with self.session.put(url, json=data) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    print(f"PUT {url} returned {response.status}")
                    return None
        except asyncio.TimeoutError:
            print(f"Timeout for PUT {url}")
            return None
        except Exception as e:
            print(f"Error in PUT {url}: {e}")
            return None
    
    async def delete(self, endpoint: str) -> bool:
        """Make async DELETE request."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        try:
            async with self.session.delete(url) as response:
                return response.status in [200, 204]
        except asyncio.TimeoutError:
            print(f"Timeout for DELETE {url}")
            return False
        except Exception as e:
            print(f"Error in DELETE {url}: {e}")
            return False

# Usage in views
async def sync_user_data(request, user_id):
    """Sync user data with external service."""
    async with AsyncAPIClient('https://api.external-service.com') as client:
        # Fetch user data from external service
        external_user = await client.get(f'/users/{user_id}')
        
        if not external_user:
            return JsonResponse({'error': 'User not found in external service'}, status=404)
        
        # Update local user data
        from asgiref.sync import sync_to_async
        
        @sync_to_async
        def update_local_user():
            try:
                user = User.objects.get(id=user_id)
                user.email = external_user.get('email', user.email)
                user.first_name = external_user.get('first_name', user.first_name)
                user.last_name = external_user.get('last_name', user.last_name)
                user.save()
                return user
            except User.DoesNotExist:
                return None
        
        updated_user = await update_local_user()
        
        if not updated_user:
            return JsonResponse({'error': 'Local user not found'}, status=404)
        
        return JsonResponse({
            'message': 'User data synchronized',
            'user': {
                'id': updated_user.id,
                'username': updated_user.username,
                'email': updated_user.email,
                'first_name': updated_user.first_name,
                'last_name': updated_user.last_name
            }
        })

Webhook Processing

# views/webhooks.py
import json
import hmac
import hashlib
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.conf import settings
import asyncio

@csrf_exempt
@require_POST
async def github_webhook(request):
    """Handle GitHub webhook asynchronously."""
    # Verify webhook signature
    signature = request.META.get('HTTP_X_HUB_SIGNATURE_256')
    if not signature:
        return JsonResponse({'error': 'Missing signature'}, status=400)
    
    body = await request.aread()
    
    # Verify signature
    expected_signature = 'sha256=' + hmac.new(
        settings.GITHUB_WEBHOOK_SECRET.encode(),
        body,
        hashlib.sha256
    ).hexdigest()
    
    if not hmac.compare_digest(signature, expected_signature):
        return JsonResponse({'error': 'Invalid signature'}, status=403)
    
    try:
        payload = json.loads(body)
        event_type = request.META.get('HTTP_X_GITHUB_EVENT')
        
        # Process webhook asynchronously
        await process_github_webhook(event_type, payload)
        
        return JsonResponse({'status': 'processed'})
    
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

async def process_github_webhook(event_type, payload):
    """Process GitHub webhook payload asynchronously."""
    if event_type == 'push':
        await handle_push_event(payload)
    elif event_type == 'pull_request':
        await handle_pull_request_event(payload)
    elif event_type == 'issues':
        await handle_issues_event(payload)
    else:
        print(f"Unhandled event type: {event_type}")

async def handle_push_event(payload):
    """Handle push event asynchronously."""
    repository = payload.get('repository', {})
    commits = payload.get('commits', [])
    
    print(f"Push to {repository.get('full_name')}: {len(commits)} commits")
    
    # Process commits concurrently
    tasks = [process_commit(commit) for commit in commits]
    await asyncio.gather(*tasks, return_exceptions=True)

async def process_commit(commit):
    """Process individual commit asynchronously."""
    commit_id = commit.get('id')
    message = commit.get('message')
    author = commit.get('author', {})
    
    print(f"Processing commit {commit_id[:8]}: {message}")
    
    # Simulate async processing
    await asyncio.sleep(0.1)
    
    # Store commit data or trigger other actions
    from asgiref.sync import sync_to_async
    
    @sync_to_async
    def store_commit():
        # Store commit information in database
        pass
    
    await store_commit()

async def handle_pull_request_event(payload):
    """Handle pull request event asynchronously."""
    action = payload.get('action')
    pull_request = payload.get('pull_request', {})
    
    print(f"Pull request {action}: #{pull_request.get('number')}")
    
    if action == 'opened':
        await process_new_pull_request(pull_request)
    elif action == 'closed':
        await process_closed_pull_request(pull_request)

async def process_new_pull_request(pull_request):
    """Process new pull request asynchronously."""
    # Trigger CI/CD pipeline, send notifications, etc.
    await asyncio.sleep(0.1)  # Simulate processing
    print(f"New PR processed: {pull_request.get('title')}")

async def process_closed_pull_request(pull_request):
    """Process closed pull request asynchronously."""
    if pull_request.get('merged'):
        # Handle merged PR
        await asyncio.sleep(0.1)
        print(f"PR merged: {pull_request.get('title')}")
    else:
        # Handle closed but not merged PR
        print(f"PR closed without merge: {pull_request.get('title')}")

async def handle_issues_event(payload):
    """Handle issues event asynchronously."""
    action = payload.get('action')
    issue = payload.get('issue', {})
    
    print(f"Issue {action}: #{issue.get('number')}")
    
    # Process issue asynchronously
    await asyncio.sleep(0.1)

Performance Optimization

Connection Pooling

# utils/async_db.py
import asyncio
import asyncpg
from django.conf import settings

class AsyncDatabasePool:
    """Async database connection pool."""
    
    def __init__(self):
        self.pool = None
    
    async def initialize(self):
        """Initialize connection pool."""
        if not self.pool:
            database_url = settings.DATABASES['default']['NAME']
            self.pool = await asyncpg.create_pool(
                database_url,
                min_size=5,
                max_size=20,
                command_timeout=60
            )
    
    async def close(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()
    
    async def fetch_one(self, query, *args):
        """Fetch single row."""
        async with self.pool.acquire() as connection:
            return await connection.fetchrow(query, *args)
    
    async def fetch_all(self, query, *args):
        """Fetch all rows."""
        async with self.pool.acquire() as connection:
            return await connection.fetch(query, *args)
    
    async def execute(self, query, *args):
        """Execute query."""
        async with self.pool.acquire() as connection:
            return await connection.execute(query, *args)

# Global pool instance
db_pool = AsyncDatabasePool()

# Usage in views
async def optimized_database_view(request):
    """View using optimized database connections."""
    await db_pool.initialize()
    
    # Fetch data using connection pool
    posts = await db_pool.fetch_all(
        "SELECT id, title, created_at FROM blog_post WHERE published = $1 ORDER BY created_at DESC LIMIT $2",
        True, 10
    )
    
    return JsonResponse({
        'posts': [
            {
                'id': post['id'],
                'title': post['title'],
                'created_at': post['created_at'].isoformat()
            }
            for post in posts
        ]
    })

Caching with Async Views

# views/cached.py
import asyncio
import json
from django.core.cache import cache
from django.http import JsonResponse
from asgiref.sync import sync_to_async

# Async cache operations
async_cache_get = sync_to_async(cache.get)
async_cache_set = sync_to_async(cache.set)
async_cache_delete = sync_to_async(cache.delete)

async def cached_async_view(request):
    """Async view with caching."""
    cache_key = f"async_view_data_{request.GET.get('category', 'all')}"
    
    # Try to get from cache
    cached_data = await async_cache_get(cache_key)
    if cached_data:
        return JsonResponse(cached_data)
    
    # Generate data asynchronously
    data = await generate_expensive_data(request.GET.get('category'))
    
    # Cache the result
    await async_cache_set(cache_key, data, 300)  # 5 minutes
    
    return JsonResponse(data)

async def generate_expensive_data(category=None):
    """Generate expensive data asynchronously."""
    # Simulate expensive operations
    await asyncio.sleep(1)
    
    return {
        'category': category or 'all',
        'data': [f'item_{i}' for i in range(100)],
        'generated_at': asyncio.get_event_loop().time()
    }

async def cache_warming_view(request):
    """Warm cache asynchronously."""
    categories = ['tech', 'science', 'business', 'sports']
    
    # Warm cache for all categories concurrently
    tasks = []
    for category in categories:
        cache_key = f"async_view_data_{category}"
        task = warm_cache_for_category(cache_key, category)
        tasks.append(task)
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    successful = sum(1 for r in results if not isinstance(r, Exception))
    failed = len(results) - successful
    
    return JsonResponse({
        'message': 'Cache warming completed',
        'successful': successful,
        'failed': failed
    })

async def warm_cache_for_category(cache_key, category):
    """Warm cache for specific category."""
    data = await generate_expensive_data(category)
    await async_cache_set(cache_key, data, 300)
    return f"Warmed cache for {category}"

Async views in Django enable building high-performance applications that can handle thousands of concurrent requests efficiently. The key is understanding when to use async views (I/O-bound operations), properly handling database operations with sync_to_async, and implementing effective error handling and resource management patterns. Start with simple async views and gradually implement more sophisticated patterns as your application's concurrency requirements grow.