Class Based Views

Subclassing Generic Views

Subclassing Django's generic views allows you to customize behavior while leveraging built-in functionality. This chapter covers advanced customization techniques, method overrides, and creating reusable view hierarchies.

Subclassing Generic Views

Subclassing Django's generic views allows you to customize behavior while leveraging built-in functionality. This chapter covers advanced customization techniques, method overrides, and creating reusable view hierarchies.

Understanding Generic View Inheritance

Generic View Hierarchy

# Understanding the inheritance chain
from django.views.generic import ListView, DetailView, CreateView

# ListView inheritance chain:
# ListView -> MultipleObjectTemplateResponseMixin -> TemplateResponseMixin
#          -> BaseListView -> MultipleObjectMixin -> ContextMixin -> View

# DetailView inheritance chain:
# DetailView -> SingleObjectTemplateResponseMixin -> TemplateResponseMixin
#            -> BaseDetailView -> SingleObjectMixin -> ContextMixin -> View

# CreateView inheritance chain:
# CreateView -> SingleObjectTemplateResponseMixin -> TemplateResponseMixin
#            -> BaseCreateView -> ModelFormMixin -> FormMixin -> SingleObjectMixin
#            -> ProcessFormView -> View

class CustomListView(ListView):
    """Trace the order in which ``ListView`` invokes its hooks for a GET.

    Every override prints a numbered marker and then delegates to the parent
    implementation unchanged.  The numbers follow the actual call order (this
    is about call order, not Python's method resolution order):
    ``render_to_response()`` is entered *before* ``get_template_names()``,
    because the template-response mixin resolves the template names while it
    is building the response object.
    """

    def dispatch(self, request, *args, **kwargs):
        """Entry point: route the request to the handler for its HTTP method."""
        print("1. dispatch() called")
        return super().dispatch(request, *args, **kwargs)

    def get_queryset(self):
        """Return the base queryset for the list."""
        print("2. get_queryset() called")
        return super().get_queryset()

    def get_context_data(self, **kwargs):
        """Build the template context."""
        print("3. get_context_data() called")
        return super().get_context_data(**kwargs)

    def render_to_response(self, context, **response_kwargs):
        """Create the response; the parent calls get_template_names() inside."""
        print("4. render_to_response() called")
        return super().render_to_response(context, **response_kwargs)

    def get_template_names(self):
        """Return the candidate template names, tried in order."""
        print("5. get_template_names() called")
        return super().get_template_names()

Method Override Patterns

class AdvancedPostListView(ListView):
    """List published posts with search, category filtering and sorting.

    Query parameters (captured in ``setup()``):

    * ``q`` - free-text search over title, content and tag names.
    * ``category`` - category slug the list is restricted to.
    * ``sort`` - ordering; only values in ``allowed_sort_orders`` are applied.

    Requests sent with ``X-Requested-With: XMLHttpRequest`` receive a JSON
    payload instead of rendered HTML.
    """
    model = Post
    template_name = 'blog/post_list.html'
    context_object_name = 'posts'
    paginate_by = 15

    # Whitelist for the ``sort`` parameter so raw user input never reaches
    # order_by(); any other value leaves the default ordering untouched.
    allowed_sort_orders = (
        '-created_at', 'created_at', 'title', '-title', 'views', '-views',
    )

    def setup(self, request, *args, **kwargs):
        """Store the search, filter and sort parameters on the view instance."""
        super().setup(request, *args, **kwargs)

        self.search_query = request.GET.get('q', '')
        self.category_filter = request.GET.get('category')
        self.sort_order = request.GET.get('sort', '-created_at')

    def dispatch(self, request, *args, **kwargs):
        """Log the access and short-circuit with a 503 page during maintenance.

        Staff users bypass maintenance mode.
        """
        logger.info(
            "Post list accessed by %s from %s",
            request.user,
            request.META.get('REMOTE_ADDR'),
        )

        if getattr(settings, 'MAINTENANCE_MODE', False) and not request.user.is_staff:
            return render(request, 'maintenance.html', status=503)

        return super().dispatch(request, *args, **kwargs)

    def _is_ajax(self):
        """Return True when the request carries the XMLHttpRequest marker header."""
        return self.request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    def get_queryset(self):
        """Return published posts narrowed by search, category and sort order."""
        queryset = super().get_queryset()

        queryset = queryset.filter(status='published').select_related(
            'author', 'category'
        ).prefetch_related('tags')

        if self.search_query:
            # distinct() is required: the join on tags can yield the same
            # post once per matching tag.
            queryset = queryset.filter(
                Q(title__icontains=self.search_query) |
                Q(content__icontains=self.search_query) |
                Q(tags__name__icontains=self.search_query)
            ).distinct()

        if self.category_filter:
            queryset = queryset.filter(category__slug=self.category_filter)

        if self.sort_order in self.allowed_sort_orders:
            queryset = queryset.order_by(self.sort_order)

        return queryset

    def get_context_data(self, **kwargs):
        """Add the active filters, sidebar data and site statistics to the context."""
        context = super().get_context_data(**kwargs)

        # Echo the active filters back so the template can pre-fill its controls.
        context.update({
            'search_query': self.search_query,
            'category_filter': self.category_filter,
            'sort_order': self.sort_order,
        })

        published_posts = Q(posts__status='published')
        context.update({
            'categories': Category.objects.annotate(
                post_count=Count('posts', filter=published_posts)
            ).filter(post_count__gt=0),
            'popular_tags': Tag.objects.annotate(
                post_count=Count('posts', filter=published_posts)
            ).order_by('-post_count')[:20],
            'recent_posts': Post.objects.filter(
                status='published'
            ).order_by('-created_at')[:5],
        })

        now = timezone.now()
        published = Post.objects.filter(status='published')
        context['stats'] = {
            'total_posts': published.count(),
            'total_authors': User.objects.filter(
                posts__status='published'
            ).distinct().count(),
            # Filter on year as well as month; matching the month alone would
            # also count posts from the same month of previous years.
            'posts_this_month': published.filter(
                created_at__year=now.year,
                created_at__month=now.month,
            ).count(),
        }

        return context

    def get_template_names(self):
        """Return candidate templates, most specific first.

        Order: AJAX fragment, mobile variant, category-specific template,
        then the default names from the parent class.
        """
        templates = []

        if self._is_ajax():
            templates.append('blog/post_list_ajax.html')

        user_agent = self.request.META.get('HTTP_USER_AGENT', '').lower()
        if any(device in user_agent for device in ('mobile', 'android', 'iphone')):
            templates.append('blog/post_list_mobile.html')

        if self.category_filter:
            templates.append(f'blog/category_{self.category_filter}_list.html')

        templates.extend(super().get_template_names())

        return templates

    def render_to_response(self, context, **response_kwargs):
        """Return JSON for AJAX requests, otherwise HTML with an ``X-Total-Posts`` header."""
        if self._is_ajax():
            # Compare against None rather than relying on truthiness: a page
            # object with no items evaluates as false.
            page = context.get('page_obj')
            posts_data = [
                {
                    'id': post.id,
                    'title': post.title,
                    'excerpt': post.excerpt,
                    'url': post.get_absolute_url(),
                    'author': post.author.username,
                    'created_at': post.created_at.isoformat(),
                }
                for post in context['posts']
            ]

            return JsonResponse({
                'posts': posts_data,
                'has_next': page.has_next() if page is not None else False,
                'current_page': page.number if page is not None else 1,
            })

        response = super().render_to_response(context, **response_kwargs)
        paginator = context.get('paginator')
        response['X-Total-Posts'] = str(paginator.count if paginator is not None else 0)

        return response

Advanced DetailView Customization

Comprehensive DetailView Subclass

class AdvancedPostDetailView(DetailView):
    """Post detail page with access control, view tracking and rich context.

    * Unpublished posts (draft / scheduled) are visible only to their author
      and to staff; everyone else gets a 404.
    * Views are counted once per session, skipping bots and the author.
    * The context carries comments, related/adjacent posts, reading
      metadata, share data and per-user flags.
    """
    model = Post
    template_name = 'blog/post_detail.html'
    context_object_name = 'post'

    # Statuses that only the author and staff may open.
    restricted_statuses = ('draft', 'scheduled')

    def setup(self, request, *args, **kwargs):
        """Record the referrer and whether the user agent looks like a bot."""
        super().setup(request, *args, **kwargs)

        self.referrer = request.META.get('HTTP_REFERER', '')

        user_agent = request.META.get('HTTP_USER_AGENT', '').lower()
        self.is_bot = any(bot in user_agent for bot in ('bot', 'crawler', 'spider'))

    def get_object(self, queryset=None):
        """Return the post, enforcing access rules and recording the view.

        Raises:
            Http404: if the current user may not see the post.  A 404 (rather
                than a 403) avoids revealing that the post exists.
        """
        obj = super().get_object(queryset)

        if not self.can_access_post(obj):
            raise Http404("Post not found")

        # Bots and the author's own visits do not count as views.
        if not self.is_bot and obj.author != self.request.user:
            self.track_view(obj)

        return obj

    def can_access_post(self, post):
        """Return True if the current user may view ``post``.

        Published posts are public; draft and scheduled posts are limited to
        the author and staff; any other status is inaccessible.
        """
        if post.status == 'published':
            return True

        if post.status in self.restricted_statuses:
            return (
                post.author == self.request.user or
                self.request.user.is_staff
            )

        return False

    def track_view(self, post):
        """Count one view per session and store a detailed ``PostView`` record."""
        session_key = f'viewed_post_{post.pk}'

        if self.request.session.get(session_key):
            return

        # F() makes the increment happen in the database, so concurrent
        # requests cannot overwrite each other's counts.
        Post.objects.filter(pk=post.pk).update(views=F('views') + 1)

        self.request.session[session_key] = True

        PostView.objects.create(
            post=post,
            user=self.request.user if self.request.user.is_authenticated else None,
            ip_address=self.get_client_ip(),
            user_agent=self.request.META.get('HTTP_USER_AGENT', ''),
            referrer=self.referrer,
            timestamp=timezone.now()
        )

    def get_context_data(self, **kwargs):
        """Assemble comments, related content, metadata and per-user flags."""
        context = super().get_context_data(**kwargs)
        post = self.object
        user = self.request.user

        context['comments'] = self.get_comments()

        # Only authenticated users may comment.
        if user.is_authenticated:
            context['comment_form'] = CommentForm()

        context.update({
            'related_posts': self.get_related_posts(post),
            'previous_post': self.get_previous_post(post),
            'next_post': self.get_next_post(post),
        })

        context.update({
            'reading_time': self.calculate_reading_time(post.content),
            'word_count': len(post.content.split()),
            'share_data': self.get_share_data(post),
        })

        if user.is_authenticated:
            context.update({
                'user_has_liked': post.likes.filter(user=user).exists(),
                'user_has_bookmarked': post.bookmarks.filter(user=user).exists(),
            })

        context['can_edit'] = self.can_edit_post(post)

        return context

    def get_comments(self):
        """Return the post's comments, oldest first.

        Staff see every comment; other users see only approved ones.
        """
        comments = self.object.comments.select_related('author')

        if not self.request.user.is_staff:
            comments = comments.filter(approved=True)

        return comments.order_by('created_at')

    def get_related_posts(self, post):
        """Return up to six related published posts without duplicates.

        Candidates are gathered in priority order: same category (3), most
        shared tags (3), same author (2).
        """
        related_posts = Post.objects.filter(
            status='published'
        ).exclude(pk=post.pk)

        category_posts = related_posts.filter(category=post.category)[:3]

        if post.tags.exists():
            # After the tags__in filter, Count('tags') is the number of tags
            # each candidate shares with this post.
            tag_posts = related_posts.filter(
                tags__in=post.tags.all()
            ).annotate(
                tag_count=Count('tags')
            ).order_by('-tag_count')[:3]
        else:
            tag_posts = []

        author_posts = related_posts.filter(author=post.author)[:2]

        related_ids = set()
        final_related = []

        for post_list in (category_posts, tag_posts, author_posts):
            for related_post in post_list:
                if related_post.id not in related_ids and len(final_related) < 6:
                    related_ids.add(related_post.id)
                    final_related.append(related_post)

        return final_related

    def get_previous_post(self, post):
        """Return the newest published post created before ``post``, or None."""
        return Post.objects.filter(
            created_at__lt=post.created_at,
            status='published'
        ).order_by('-created_at').first()

    def get_next_post(self, post):
        """Return the oldest published post created after ``post``, or None."""
        return Post.objects.filter(
            created_at__gt=post.created_at,
            status='published'
        ).order_by('created_at').first()

    def calculate_reading_time(self, content):
        """Return the estimated reading time in whole minutes (at least 1).

        Assumes a reading speed of 200 words per minute.
        """
        words_per_minute = 200
        word_count = len(content.split())
        return max(1, round(word_count / words_per_minute))

    def get_share_data(self, post):
        """Return title, description, absolute URL and image for share widgets."""
        return {
            'title': post.title,
            # Fall back to the first 160 characters when there is no excerpt.
            'description': post.excerpt or post.content[:160],
            'url': self.request.build_absolute_uri(post.get_absolute_url()),
            'image': post.featured_image.url if post.featured_image else None,
        }

    def can_edit_post(self, post):
        """Return True if the current user is the post's author or staff."""
        return (
            post.author == self.request.user or
            self.request.user.is_staff
        )

    def get_client_ip(self):
        """Return the client IP, preferring the first ``X-Forwarded-For`` entry.

        The header is a comma-separated list whose entries are often followed
        by a space, so the first entry is stripped; an empty entry falls back
        to ``REMOTE_ADDR``.  The header is client-controlled unless a trusted
        proxy sets it, so the value is suitable for analytics only.
        """
        forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            client_ip = forwarded_for.split(',')[0].strip()
            if client_ip:
                return client_ip
        return self.request.META.get('REMOTE_ADDR')

Custom Form View Subclasses

Advanced CreateView Customization

class AdvancedPostCreateView(CreateView):
    """Create a post as a draft, a scheduled post or a published post.

    Access is checked in ``setup()``: the user must be authenticated and
    either be staff or hold ``blog.add_post`` and stay below
    ``daily_post_limit`` posts in the last 24 hours.  Uploaded files in the
    ``attachments`` field are validated and attached after the post is saved.
    """
    model = Post
    form_class = PostForm
    template_name = 'blog/post_create.html'

    # Maximum number of posts a non-staff user may create per 24 hours.
    daily_post_limit = 5
    # Upload limits applied by validate_attachment().
    max_attachment_size = 10 * 1024 * 1024  # 10MB
    allowed_attachment_types = ('image/jpeg', 'image/png', 'application/pdf')

    def setup(self, request, *args, **kwargs):
        """Initialise the view and reject users who may not create posts.

        Raises:
            PermissionDenied: if ``can_create_posts()`` returns False.
        """
        super().setup(request, *args, **kwargs)

        if not self.can_create_posts():
            raise PermissionDenied("You don't have permission to create posts")

    def can_create_posts(self):
        """Return True if the current user is allowed to create a post now."""
        user = self.request.user

        if not user.is_authenticated:
            return False

        # Staff bypass both the permission check and the rate limit.
        if user.is_staff:
            return True

        if not user.has_perm('blog.add_post'):
            return False

        recent_posts = Post.objects.filter(
            author=user,
            created_at__gte=timezone.now() - timedelta(hours=24)
        ).count()

        return recent_posts < self.daily_post_limit

    def get_form_kwargs(self):
        """Pass the current user and request to the form constructor."""
        kwargs = super().get_form_kwargs()
        kwargs['user'] = self.request.user
        kwargs['request'] = self.request
        return kwargs

    def get_context_data(self, **kwargs):
        """Add categories, popular tags, the user's draft count and writing tips."""
        context = super().get_context_data(**kwargs)

        context.update({
            'categories': Category.objects.filter(active=True),
            'popular_tags': Tag.objects.annotate(
                usage_count=Count('posts')
            ).order_by('-usage_count')[:20],
            'user_drafts': Post.objects.filter(
                author=self.request.user,
                status='draft'
            ).count(),
            'writing_tips': self.get_writing_tips(),
        })

        return context

    def get_writing_tips(self):
        """Return the static list of writing tips shown next to the form."""
        return [
            "Use clear, descriptive titles",
            "Break up long paragraphs for readability",
            "Include relevant tags to help readers find your content",
            "Add a compelling excerpt to summarize your post",
        ]

    def form_valid(self, form):
        """Save the post with the status chosen by the submit button.

        ``save_draft`` stores a draft, ``schedule`` stores a scheduled post
        with its ``scheduled_date``; any other submission publishes at once.
        """
        form.instance.author = self.request.user

        if 'save_draft' in self.request.POST:
            form.instance.status = 'draft'
            success_message = "Post saved as draft"
        elif 'schedule' in self.request.POST:
            form.instance.status = 'scheduled'
            form.instance.scheduled_date = form.cleaned_data.get('scheduled_date')
            success_message = "Post scheduled for publication"
        else:
            form.instance.status = 'published'
            form.instance.published_at = timezone.now()
            success_message = "Post published successfully!"

        # Save first: attachments reference the post through a foreign key,
        # which cannot point at an instance that has no primary key yet.
        response = super().form_valid(form)

        self.process_attachments(form)

        if form.instance.status == 'published':
            self.send_publication_notifications()

        messages.success(self.request, success_message)

        return response

    def process_attachments(self, form):
        """Create a ``PostAttachment`` for every valid uploaded file.

        Must run after the post has been saved; invalid files are skipped
        and reported through the messages framework.
        """
        for attachment in self.request.FILES.getlist('attachments'):
            if self.validate_attachment(attachment):
                PostAttachment.objects.create(
                    post=form.instance,
                    file=attachment,
                    uploaded_by=self.request.user
                )

    def validate_attachment(self, file):
        """Return True if ``file`` passes the size and content-type limits.

        A rejected file adds an error message for the user.  The content
        type is the one reported by the client, not a verified file type.
        """
        if file.size > self.max_attachment_size:
            messages.error(self.request, f'File {file.name} is too large')
            return False

        if file.content_type not in self.allowed_attachment_types:
            messages.error(self.request, f'File type {file.content_type} not allowed')
            return False

        return True

    def send_publication_notifications(self):
        """Queue one notification task per follower of the author."""
        author = self.request.user
        for follower in author.followers.all():
            send_notification.delay(
                user_id=follower.id,
                message=f'{author.username} published a new post: {self.object.title}',
                post_id=self.object.id
            )

    def get_success_url(self):
        """Send drafts back to the edit page and everything else to the post."""
        if self.object.status == 'draft':
            return reverse('blog:post_edit', kwargs={'pk': self.object.pk})
        return self.object.get_absolute_url()

Creating Reusable View Base Classes

Abstract Base Views

class BaseContentView(View):
    """Abstract base view for content management.

    Meant to be listed before a generic view in a subclass's bases (for
    example ``class PostList(BaseContentView, ListView)``) so that the
    ``super()`` calls here reach the generic view's implementation.
    Subclasses must set ``model``; form views also set ``form_class``.
    """

    # Subclasses must define these
    model = None
    form_class = None

    def setup(self, request, *args, **kwargs):
        """Run the security checks and cache the user's model permissions.

        Raises:
            PermissionDenied: if a security check fails.
        """
        super().setup(request, *args, **kwargs)

        self.perform_security_checks()

        self.user_permissions = self.get_user_permissions()

    def perform_security_checks(self):
        """Reject rate-limited or blocked clients.

        Raises:
            PermissionDenied: if ``is_rate_limited()`` or ``is_ip_blocked()``
                returns True.
        """
        if self.is_rate_limited():
            raise PermissionDenied("Rate limit exceeded")

        if self.is_ip_blocked():
            raise PermissionDenied("Access denied")

    def is_rate_limited(self):
        """Hook: return True to reject the request as rate limited.

        The default performs no rate limiting; override to add it.
        """
        return False

    def is_ip_blocked(self):
        """Hook: return True to reject the request's client address.

        The default blocks nothing; override to add a block list.
        """
        return False

    def get_user_permissions(self):
        """Return a dict mapping permission codenames to booleans.

        Covers add/change/delete/view for ``self.model``.  Anonymous users
        get an empty dict, so the return type is the same in both cases.
        """
        user = self.request.user
        if not user.is_authenticated:
            return {}

        model_name = self.model._meta.model_name
        app_label = self.model._meta.app_label

        codenames = (
            f'{app_label}.{action}_{model_name}'
            for action in ('add', 'change', 'delete', 'view')
        )
        return {codename: user.has_perm(codename) for codename in codenames}

    def get_context_data(self, **kwargs):
        """Add permissions, content statistics and breadcrumbs to the context."""
        context = super().get_context_data(**kwargs)

        context.update({
            'user_permissions': self.user_permissions,
            'content_stats': self.get_content_stats(),
            'breadcrumbs': self.get_breadcrumbs(),
        })

        return context

    def get_content_stats(self):
        """Return content counts for authenticated users, else an empty dict.

        ``user_content_count`` is included only when the model has an
        ``author`` attribute to filter on.
        """
        if not self.request.user.is_authenticated:
            return {}

        stats = {}
        if hasattr(self.model, 'author'):
            stats['user_content_count'] = self.model.objects.filter(
                author=self.request.user
            ).count()
        stats['total_content_count'] = self.model.objects.count()
        return stats

    def get_breadcrumbs(self):
        """Return breadcrumbs: Home, then the model's list page.

        The list URL is reversed as ``<app_label>:<model_name>_list``.
        """
        meta = self.model._meta
        return [
            {'name': 'Home', 'url': reverse('home')},
            {
                'name': meta.verbose_name_plural.title(),
                'url': reverse(f'{meta.app_label}:{meta.model_name}_list'),
            },
        ]

class BaseListView(BaseContentView, ListView):
    """Shared list behaviour: pagination, join optimisation and ``q`` search."""
    paginate_by = 20

    def get_queryset(self):
        """Return the parent queryset with related rows joined and search applied.

        ``author`` and ``category`` are joined when the model has them.  When
        the request has a non-empty ``q`` parameter and the view declares
        ``search_fields``, rows must match ``q`` (case-insensitive substring)
        in at least one of those fields.
        """
        results = super().get_queryset()

        for relation in ('author', 'category'):
            if hasattr(self.model, relation):
                results = results.select_related(relation)

        term = self.request.GET.get('q')
        if not (term and hasattr(self, 'search_fields')):
            return results

        # OR together one icontains lookup per configured field.
        condition = Q()
        for field_name in self.search_fields:
            condition |= Q(**{f'{field_name}__icontains': term})

        return results.filter(condition)

class BaseDetailView(BaseContentView, DetailView):
    """Shared detail behaviour: object lookup guarded by a visibility check."""

    def get_object(self, queryset=None):
        """Return the object, or raise ``Http404`` if the user may not view it."""
        instance = super().get_object(queryset)

        if self.can_view_object(instance):
            return instance

        raise Http404("Object not found")

    def can_view_object(self, obj):
        """Return True if the current user may view ``obj``.

        Objects are viewable when ``is_public`` is truthy or absent.
        Non-public objects with an ``author`` are limited to that author and
        staff; non-public objects without an ``author`` are viewable.
        """
        if getattr(obj, 'is_public', True):
            return True

        if not hasattr(obj, 'author'):
            return True

        viewer = self.request.user
        return obj.author == viewer or viewer.is_staff

class BaseCreateView(BaseContentView, CreateView):
    """Shared create behaviour: stamp ownership and creation time on save."""

    # Ownership attributes set to the current user when the model has them.
    owner_fields = ('author', 'created_by')

    def form_valid(self, form):
        """Set the owner fields and ``created_at`` before saving the form.

        The presence of each owner field is checked on the model *class*.
        ``hasattr()`` on a new instance is unreliable for a required foreign
        key: reading the unset relation raises an exception that subclasses
        ``AttributeError``, so ``hasattr()`` reports False and the owner
        would never be assigned.
        """
        instance = form.instance
        model_class = type(instance)

        for field_name in self.owner_fields:
            if hasattr(model_class, field_name):
                setattr(instance, field_name, self.request.user)

        instance.created_at = timezone.now()

        return super().form_valid(form)

# Usage of base classes
class PostListView(BaseListView):
    """Post list built on BaseListView; ``q`` searches title and content."""
    model = Post
    # Fields BaseListView.get_queryset() matches against the ``q`` parameter.
    search_fields = ['title', 'content']

class PostDetailView(BaseDetailView):
    """Post detail page built on BaseDetailView with no extra customisation."""
    model = Post

class PostCreateView(BaseCreateView):
    """Post creation page built on BaseCreateView, rendered with PostForm."""
    model = Post
    form_class = PostForm

Performance Optimization in Subclasses

Optimized Generic Views

class OptimizedListView(ListView):
    """List view that caches its optimised queryset for five minutes."""

    def get_queryset(self):
        """Return the cached queryset, building and caching it on a miss."""
        key = self.get_cache_key()
        cached = cache.get(key)
        if cached is not None:
            return cached

        fresh = self.optimize_queryset(super().get_queryset())
        cache.set(key, fresh, 300)  # 5 minutes
        return fresh

    def optimize_queryset(self, queryset):
        """Join and prefetch common relations and trim the selected columns.

        ``author`` and ``category`` are joined and ``tags`` prefetched when
        the model has them; ``list_fields``, if the view defines it, limits
        the loaded columns.
        """
        for relation in ('author', 'category'):
            if hasattr(self.model, relation):
                queryset = queryset.select_related(relation)

        if hasattr(self.model, 'tags'):
            queryset = queryset.prefetch_related('tags')

        if hasattr(self, 'list_fields'):
            queryset = queryset.only(*self.list_fields)

        return queryset

    def get_cache_key(self):
        """Return a key built from the view class, path, query string and user id.

        The user id is appended only for authenticated users.
        """
        request = self.request
        segments = [type(self).__name__, request.path, request.GET.urlencode()]

        if request.user.is_authenticated:
            segments.append(str(request.user.id))

        return ':'.join(segments)

class OptimizedDetailView(DetailView):
    """Detail view that caches the object and its expensive context data.

    Subclasses are expected to provide ``get_related_objects()`` and
    ``calculate_statistics()``, which ``get_expensive_context_data()`` calls.
    """

    def get_object(self, queryset=None):
        """Return the object, served from the cache for ten minutes.

        The cache is bypassed when no pk or slug is present in the URL
        kwargs (so unrelated objects can never share a key) and when an
        explicit ``queryset`` is passed (so its restrictions are honoured).
        """
        cache_key = self._object_cache_key() if queryset is None else None
        if cache_key is None:
            return super().get_object(queryset)

        obj = cache.get(cache_key)
        if obj is None:
            obj = super().get_object(queryset)
            cache.set(cache_key, obj, 600)  # 10 minutes

        return obj

    def _object_cache_key(self):
        """Return the object's cache key, or None if the URL identifies nothing.

        pk-routed URLs keep the ``<label>:<pk>`` format; slug-routed URLs use
        ``<label>:slug:<slug>`` instead of all collapsing onto ``<label>:None``.
        """
        label = self.model._meta.label_lower

        pk = self.kwargs.get(self.pk_url_kwarg)
        if pk is not None:
            return f'{label}:{pk}'

        slug = self.kwargs.get(self.slug_url_kwarg)
        if slug is not None:
            return f'{label}:slug:{slug}'

        return None

    def get_context_data(self, **kwargs):
        """Merge the cached expensive context (five minutes) into the context."""
        context = super().get_context_data(**kwargs)

        cache_key = f'{self.model._meta.label_lower}:{self.object.pk}:context'
        cached_context = cache.get(cache_key)

        if cached_context is None:
            cached_context = self.get_expensive_context_data()
            cache.set(cache_key, cached_context, 300)  # 5 minutes

        context.update(cached_context)
        return context

    def get_expensive_context_data(self):
        """Compute the context entries that are worth caching."""
        return {
            'related_objects': self.get_related_objects(),
            'statistics': self.calculate_statistics(),
        }

Subclassing generic views provides powerful customization capabilities while maintaining the benefits of Django's built-in functionality. Understanding method override patterns, inheritance hierarchies, and performance considerations enables you to create sophisticated, maintainable view hierarchies that handle complex application requirements efficiently.

Asynchronous View Subclassing

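Django's async support lets class-based views define `async def` handlers, which improves throughput for I/O-bound work such as calls to external APIs. The built-in generic views themselves remain synchronous, so when you subclass them, any inherited ORM-backed step must be run through `sync_to_async` or replaced with the asynchronous ORM API.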

Basic Async View Patterns

# Basic async view structure
class AsyncView(View):
    """Minimal view whose GET handler is a coroutine."""

    async def get(self, request):
        """Wait one second without blocking, then return a JSON message."""
        # Async operations here
        await asyncio.sleep(1)  # Simulated async operation
        return JsonResponse({'message': 'Async response'})

# Converting sync to async
class AsyncAPIView(View):
    """Proxy view that relays JSON fetched from an upstream HTTP API."""

    async def get(self, request):
        """Fetch the upstream document and return it as a JSON response."""
        async with aiohttp.ClientSession() as http, \
                http.get('https://api.example.com/data') as upstream:
            payload = await upstream.json()
            return JsonResponse(payload)

Async Generic View Subclasses

class AsyncArticleListView(ListView):
    """List view that awaits external data before rendering the article list."""
    model = Article
    template_name = 'articles/list.html'

    async def get(self, request, *args, **kwargs):
        """Fetch external data, then run the regular list handling.

        ``ListView`` has no ``aget()``; its ``get()`` is synchronous and
        queries the ORM.  It is therefore run through ``sync_to_async`` so it
        executes in a worker thread instead of on the event loop.
        """
        # Imported locally so the block works regardless of the module's
        # import section; asgiref is installed as a dependency of Django.
        from asgiref.sync import sync_to_async

        await self.fetch_external_data()
        return await sync_to_async(super().get)(request, *args, **kwargs)

    async def fetch_external_data(self):
        """Placeholder for an external API call (simulated with a short sleep)."""
        await asyncio.sleep(0.5)

class AsyncProductDetailView(DetailView):
    """Async detail view with concurrent data fetching.

    NOTE(review): ``get_object`` is a coroutine here and depends on
    ``super().aget_object`` and ``self.fetch_related_products``, neither of
    which is defined in the visible source - confirm they are provided
    elsewhere and that the handler calling ``get_object`` awaits it.
    """
    model = Product
    template_name = 'products/detail.html'
    
    async def get_object(self, queryset=None):
        """Load the product, then attach its reviews and related products."""
        # NOTE(review): aget_object is not defined in this file - TODO confirm
        obj = await super().aget_object(queryset)
        
        # Fetch additional data concurrently
        tasks = [
            self.fetch_reviews(obj.id),
            # NOTE(review): fetch_related_products is not defined in this class
            self.fetch_related_products(obj.category)
        ]
        # gather() returns results in the order the tasks were listed
        obj.reviews, obj.related_products = await asyncio.gather(*tasks)
        
        return obj
    
    async def fetch_reviews(self, product_id):
        """Return the decoded JSON reviews for a product, or [] on a non-200 reply."""
        async with aiohttp.ClientSession() as session:
            url = f'https://api.reviews.com/products/{product_id}/reviews'
            async with session.get(url) as response:
                return await response.json() if response.status == 200 else []

Async Form Handling

class AsyncOrderCreateView(CreateView):
    """Async form processing with concurrent operations.

    NOTE(review): ``form_valid`` is a coroutine here - confirm the handler
    that calls it awaits the result.  ``process_payment``,
    ``update_inventory`` and ``send_confirmation_email`` are not defined in
    the visible source.
    """
    model = Order
    form_class = OrderForm
    
    async def form_valid(self, form):
        """Save the order, run its follow-up tasks and return a JSON summary."""
        # Save form asynchronously
        self.object = await self.asave_form(form)
        
        # Process order with concurrent tasks
        await self.process_order_async(self.object)
        
        # Responds with JSON rather than a redirect; the success URL is
        # passed along in the payload instead.
        return JsonResponse({
            'success': True,
            'order_id': self.object.id,
            'redirect_url': str(self.get_success_url())
        })
    
    async def asave_form(self, form):
        """Build the instance from the form, mark it 'processing' and save it."""
        # commit=False: the form only builds the instance; asave() persists it
        instance = form.save(commit=False)
        instance.status = 'processing'
        await instance.asave()
        return instance
    
    async def process_order_async(self, order):
        """Run payment, inventory and e-mail handling concurrently."""
        # Concurrent processing
        # NOTE(review): the three helpers below are not defined in this class
        tasks = [
            self.process_payment(order),
            self.update_inventory(order),
            self.send_confirmation_email(order)
        ]
        await asyncio.gather(*tasks)

Async Mixins for Reusability

class AsyncCacheMixin:
    """Async caching functionality.

    Caches whole responses for ``GET`` requests that finish with status
    200.  The host view's HTTP handlers must be ``async def`` because
    ``dispatch`` awaits ``super().dispatch(...)``.
    """
    cache_timeout = 300

    async def get_cache_key(self):
        """Build a key from the class name, request path and query string.

        The query string is digested with SHA-256 rather than ``hash()``:
        ``hash()`` of a string is salted per process, so keys would not
        match between workers or across restarts.
        """
        import hashlib

        query = self.request.GET.urlencode()
        digest = hashlib.sha256(query.encode('utf-8')).hexdigest()
        return f"{self.__class__.__name__}:{self.request.path}:{digest}"

    async def dispatch(self, request, *args, **kwargs):
        """Serve a cached response when available; otherwise run the view."""
        # Only idempotent reads are cacheable; everything else passes through.
        if request.method != 'GET':
            return await super().dispatch(request, *args, **kwargs)

        cache_key = await self.get_cache_key()
        cached_response = await cache.aget(cache_key)

        if cached_response is not None:
            return cached_response

        response = await super().dispatch(request, *args, **kwargs)

        # Do not cache redirects, errors, etc.
        if response.status_code != 200:
            return response

        if hasattr(response, 'render') and callable(response.render):
            # Template responses cannot be pickled until rendered, so store
            # them from a post-render callback instead.
            response.add_post_render_callback(
                lambda rendered: cache.set(cache_key, rendered, self.cache_timeout)
            )
        else:
            await cache.aset(cache_key, response, self.cache_timeout)
        return response

class AsyncRateLimitMixin:
    """Async rate limiting.

    Keeps a list of recent request timestamps per client in the cache and
    rejects requests once ``rate_limit`` of them fall inside the window.
    The host view's HTTP handlers must be ``async def`` because
    ``dispatch`` awaits ``super().dispatch(...)``.
    """
    rate_limit = 60  # requests allowed per window
    rate_window = 60  # window length in seconds

    async def check_rate_limit(self):
        """Record this request and return ``True`` if it is within the limit.

        Returns ``False`` (without recording) when the client already has
        ``rate_limit`` requests inside the last ``rate_window`` seconds.
        """
        # NOTE(review): get_client_ip() is not defined on this mixin; the
        # host class has to provide it.
        client_ip = self.get_client_ip()
        cache_key = f"rate_limit:{client_ip}"

        timestamps = await cache.aget(cache_key, [])
        now = time.time()

        # Keep only the requests that are still inside the window.
        recent = [stamp for stamp in timestamps if now - stamp < self.rate_window]

        if len(recent) >= self.rate_limit:
            return False

        recent.append(now)
        # NOTE(review): this read-modify-write is not atomic, so concurrent
        # requests from one client can slightly exceed the limit.
        await cache.aset(cache_key, recent, self.rate_window)
        return True

    async def dispatch(self, request, *args, **kwargs):
        """Reject over-limit requests with HTTP 429; otherwise run the view."""
        if not await self.check_rate_limit():
            # Django has no dedicated 429 response class.
            from django.http import HttpResponse

            response = HttpResponse("Rate limit exceeded", status=429)
            response['Retry-After'] = str(self.rate_window)
            return response
        return await super().dispatch(request, *args, **kwargs)

Async Database Operations

class AsyncAuthorListView(ListView):
    """Async database operations.

    ``ListView``'s inherited ``get()`` calls ``get_queryset()`` and
    ``get_context_data()`` synchronously, so this class provides its own
    ``async def get`` that awaits the coroutine versions below.
    """
    model = Author
    # get_queryset() returns a plain list, which has no ``.model`` for the
    # default "<model>_list" name to be derived from; set it explicitly.
    context_object_name = 'author_list'

    async def get(self, request, *args, **kwargs):
        """Handle GET: load the authors, build the context, render."""
        self.object_list = await self.get_queryset()
        context = await self.get_context_data()
        return self.render_to_response(context)

    async def get_queryset(self):
        """Return all authors as a list, with publisher and books preloaded."""
        queryset = Author.objects.select_related('publisher').prefetch_related('books')
        return [author async for author in queryset]

    async def get_context_data(self, **kwargs):
        """Return the standard list context plus ``total_books``."""
        # The parent implementation is synchronous; there is no
        # ``aget_context_data`` to await.
        context = super().get_context_data(**kwargs)

        # Add async computed data
        context['total_books'] = await Book.objects.filter(published=True).acount()

        return context

Error Handling and Performance

class AsyncTimeoutView(View):
    """Async view with timeout handling"""
    # Maximum number of seconds to wait for fetch_data().
    timeout = 30

    async def get(self, request, *args, **kwargs):
        """Return ``fetch_data()`` as JSON, bounded by ``self.timeout``.

        Responds with 408 and a JSON error body on timeout, and with a
        generic 500 on any other failure.  The failure is logged on the
        server; its text is not sent to the client.
        """
        import logging

        try:
            # NOTE(review): fetch_data() is not defined in this class.
            result = await asyncio.wait_for(
                self.fetch_data(),
                timeout=self.timeout
            )
            return JsonResponse(result)

        except asyncio.TimeoutError:
            # NOTE(review): 408 means the *client* was too slow and some
            # clients retry it automatically; 504 may fit an upstream
            # timeout better.  Left unchanged for compatibility.
            return JsonResponse({'error': 'Request timeout'}, status=408)
        except Exception:
            logging.getLogger(__name__).exception(
                'Unhandled error in %s', type(self).__name__
            )
            return HttpResponseServerError('Server error')

class ConcurrentProcessingView(View):
    """Concurrent data processing"""
    async def get(self, request, *args, **kwargs):
        """Fetch the user's data sections concurrently and return them as JSON.

        A section whose fetch fails is logged and omitted from the
        response; the remaining sections are still returned.
        """
        import logging

        # NOTE(review): ``request.user`` is evaluated lazily; in an async
        # view this access may need an async-safe alternative -- confirm
        # for the Django version in use.
        # NOTE(review): the fetch_* methods are not defined in this class.
        sections = ['user', 'notifications', 'activity']
        tasks = [
            self.fetch_user_data(request.user.id),
            self.fetch_notifications(request.user.id),
            self.fetch_recent_activity(request.user.id)
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        response_data = {}
        for key, result in zip(sections, results):
            # BaseException, not Exception: a cancelled task is returned as
            # CancelledError, which is not an Exception subclass and would
            # otherwise be handed to JsonResponse.
            if isinstance(result, BaseException):
                logging.getLogger(__name__).warning(
                    'Fetching %r failed', key, exc_info=result
                )
                continue
            response_data[key] = result

        return JsonResponse(response_data)

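Asynchronous view subclassing enables high-performance Django applications by leveraging async/await patterns for I/O-bound operations, concurrent processing, and improved scalability while maintaining the structure and benefits of Django's generic views. Note that Django's built-in generic views call their hooks (such as `get_object`, `get_queryset`, `get_context_data`, and `form_valid`) synchronously, so a subclass that turns these hooks into coroutines must also define its own `async` HTTP handlers (`get`, `post`, ...) that await them.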