Asynchronous Django

Django's Tasks Framework

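Django's Tasks framework (the `django.tasks` module) provides a built-in API for defining and enqueueing background tasks, so application code does not have to be written directly against an external library such as Celery or RQ. It lets you move time-consuming operations out of the request-response cycle, which improves application performance and user experience. Note that the framework standardizes how tasks are defined and queued; a configured task backend and a running worker are still required for the queued tasks to actually execute.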

Django's Tasks Framework

Django's Tasks framework provides a built-in solution for handling background tasks without requiring external dependencies like Celery or RQ. This framework enables you to offload time-consuming operations from request-response cycles, improving application performance and user experience.

Background Task Fundamentals

Understanding Background Tasks

Background tasks are operations that run outside the normal request-response cycle. They're essential for:

  • Long-running computations
  • Email sending
  • File processing
  • API calls to external services
  • Data synchronization
  • Periodic maintenance tasks

Django Tasks vs External Solutions

# Traditional approach with external task queue (Celery)
from celery import shared_task

@shared_task
def send_email_celery(user_id, subject, message):
    """Mail ``subject``/``message`` to the user with primary key ``user_id`` (Celery variant)."""
    recipient = User.objects.get(id=user_id)
    # Single-recipient list; the sender address is fixed for this example.
    send_mail(subject, message, 'from@example.com', [recipient.email])

# Django Tasks approach (built-in)
from django.tasks import task

@task
def send_email_django_task(user_id, subject, message):
    """Mail ``subject``/``message`` to the user with primary key ``user_id`` (Django Tasks variant)."""
    recipient = User.objects.get(id=user_id)
    # Single-recipient list; the sender address is fixed for this example.
    send_mail(subject, message, 'from@example.com', [recipient.email])

Task Lifecycle

# Task lifecycle demonstration
from django.tasks import task, get_task_backend
from django.http import JsonResponse
import time

@task
def long_running_task(duration, task_name):
    """Simulate slow work by sleeping one second per step.

    Prints a start line, one progress line per elapsed second and a final
    summary, then returns that summary string.
    """
    print(f"Task {task_name} started")

    # Count from 1 so the step number can be printed directly.
    for step in range(1, duration + 1):
        time.sleep(1)
        print(f"Task {task_name} progress: {step}/{duration}")

    summary = f"Task {task_name} completed after {duration} seconds"
    print(summary)
    return summary

def start_task_view(request):
    """Queue ``long_running_task`` and return the queued task's id as JSON."""
    queued = long_running_task.delay(5, "example-task")

    payload = {
        'task_id': queued.id,
        'status': 'started',
        'message': 'Task has been queued',
    }
    return JsonResponse(payload)

def check_task_status_view(request, task_id):
    """Return the status, result and (if exposed) progress of a task as JSON."""
    outcome = get_task_backend().get_result(task_id)

    payload = {'task_id': task_id, 'status': outcome.status}

    # The result is only reported once the task says it is ready.
    if outcome.ready():
        payload['result'] = outcome.result
    else:
        payload['result'] = None

    # Not every result object carries a ``progress`` attribute.
    if hasattr(outcome, 'progress'):
        payload['progress'] = outcome.progress
    else:
        payload['progress'] = None

    return JsonResponse(payload)

Configuring a Task Backend

Backend Configuration

# settings.py - Task backend configuration

# Basic configuration
# The three TASKS assignments in this snippet are alternatives. If they are
# kept in one settings module, each assignment rebinds the name and only the
# last one takes effect - keep exactly one.
# NOTE(review): the OPTIONS keys are backend-specific; confirm each against
# the documentation of the backend actually installed.
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.database.DatabaseBackend',
        'OPTIONS': {
            'database_alias': 'default',
            'table_name': 'django_tasks',
        }
    }
}

# Redis backend configuration (alternative to the database backend above)
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisBackend',
        'OPTIONS': {
            'connection': {
                'host': 'localhost',
                'port': 6379,
                'db': 0,
            },
            'key_prefix': 'django_tasks:',
        }
    }
}

# Memory backend (development only)
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.memory.MemoryBackend',
        'OPTIONS': {
            'max_tasks': 1000,
        }
    }
}

# Multiple backends
# Three named backends side by side: 'default' and 'priority' both use the
# database backend ('priority' with its own table name), 'redis' uses Redis.
# This assignment also rebinds TASKS, replacing any earlier definition in the
# same module.
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.database.DatabaseBackend',
    },
    'redis': {
        'BACKEND': 'django.tasks.backends.redis.RedisBackend',
        'OPTIONS': {
            'connection': {
                'host': 'redis.example.com',
                'port': 6379,
                'db': 1,
            }
        }
    },
    'priority': {
        'BACKEND': 'django.tasks.backends.database.DatabaseBackend',
        'OPTIONS': {
            'table_name': 'priority_tasks',
        }
    }
}

# Task routing: dotted task path -> name of a backend alias defined in TASKS.
TASK_ROUTES = {
    'myapp.tasks.send_email': {'backend': 'default'},
    'myapp.tasks.process_image': {'backend': 'redis'},
    'myapp.tasks.urgent_notification': {'backend': 'priority'},
}

# Worker configuration, keyed by backend alias.
# NOTE(review): 'timeout' is presumably in seconds (see inline comments) - confirm.
TASK_WORKERS = {
    'default': {
        'concurrency': 4,
        'max_tasks_per_worker': 100,
        'timeout': 300,  # 5 minutes
    },
    'redis': {
        'concurrency': 8,
        'max_tasks_per_worker': 50,
        'timeout': 600,  # 10 minutes
    }
}

# Task serialization
# NOTE(review): if 'pickle' is ever selected, make sure queued payloads can
# only come from trusted producers - unpickling untrusted data is unsafe.
TASK_SERIALIZER = 'json'  # json, pickle, msgpack

# Task result expiration
TASK_RESULT_EXPIRES = 3600  # 1 hour

# Task retry configuration
TASK_DEFAULT_RETRY_DELAY = 60  # 1 minute
TASK_MAX_RETRIES = 3

# Monitoring and logging
TASK_SEND_EVENTS = True
TASK_TRACK_STARTED = True

# Standard logging dictConfig: records from the 'django.tasks' logger at INFO
# and above are appended to 'tasks.log'. The filename is relative, so it is
# resolved against the process's working directory. 'propagate': True means
# the same records are also passed on to ancestor loggers' handlers.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'task_file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'tasks.log',
        },
    },
    'loggers': {
        'django.tasks': {
            'handlers': ['task_file'],
            'level': 'INFO',
            'propagate': True,
        },
    },
}

Database Backend Setup

# models.py - Custom task model for database backend
from django.db import models
from django.tasks.backends.database.models import TaskResult

from django.conf import settings


class CustomTaskResult(TaskResult):
    """Task result row extended with priority, category and creator metadata.

    Adds two composite indexes for the lookups the extra columns are meant to
    serve: (priority, status) and (category, created_at).
    NOTE(review): ``status`` and ``created_at`` are assumed to be fields
    inherited from ``TaskResult`` - confirm against the base model.
    """

    # Default 0; how the value is ordered/interpreted is up to the backend.
    priority = models.IntegerField(default=0)
    # Free-form label; optional.
    category = models.CharField(max_length=50, blank=True)
    # Reference the configured user model rather than a hard-coded
    # 'auth.User', so projects with a swapped user model keep working.
    # SET_NULL keeps the task row when the user is deleted.
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.SET_NULL,
        null=True,
        blank=True
    )

    class Meta:
        db_table = 'custom_task_results'
        indexes = [
            models.Index(fields=['priority', 'status']),
            models.Index(fields=['category', 'created_at']),
        ]

# Custom backend using extended model
# The base class was used without being imported (NameError at import time).
# The module path matches the 'BACKEND' dotted path used in the settings
# examples. NOTE(review): confirm it against the installed backend package.
from django.tasks.backends.database import DatabaseBackend


class CustomDatabaseBackend(DatabaseBackend):
    """Database backend that persists results through ``CustomTaskResult``."""

    def __init__(self, **options):
        """Initialise the stock backend, then swap in the extended model."""
        super().__init__(**options)
        self.model = CustomTaskResult

    def store_result(self, task_id, result, status, **kwargs):
        """Store a task result together with the extra metadata columns.

        Recognised keyword arguments: ``priority`` (default 0), ``category``
        (default ``''``) and ``user_id`` (default ``None``), which is stored
        as ``created_by_id``. Any other keyword argument is dropped and not
        forwarded to the parent implementation.
        """
        return super().store_result(
            task_id,
            result,
            status,
            priority=kwargs.get('priority', 0),
            category=kwargs.get('category', ''),
            created_by_id=kwargs.get('user_id')
        )

# settings.py - Use custom backend
# Point the 'default' alias at the CustomDatabaseBackend subclass.
# NOTE(review): the dotted path implies the class lives in myapp/backends.py -
# confirm it matches where the class is actually defined.
TASKS = {
    'default': {
        'BACKEND': 'myapp.backends.CustomDatabaseBackend',
        'OPTIONS': {
            'database_alias': 'default',
        }
    }
}

Redis Backend Configuration

# Redis backend with advanced configuration
import redis
from django.conf import settings

# Redis connection pool
# Shared connection pool built from project settings (REDIS_HOST, REDIS_PORT
# and REDIS_DB must therefore already be defined when this module is
# imported). The pool is created at import time.
REDIS_POOL = redis.ConnectionPool(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB,
    max_connections=20,
    retry_on_timeout=True,
    socket_keepalive=True,
    socket_keepalive_options={},
)

# Variant 1: single Redis server, reusing the pool above.
# Each TASKS assignment in this snippet rebinds the name; keep only one.
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisBackend',
        'OPTIONS': {
            'connection_pool': REDIS_POOL,
            'key_prefix': 'tasks:',
            'result_expires': 3600,
            'compression': 'gzip',  # None, gzip, lz4
        }
    }
}

# Redis Sentinel configuration for high availability
# (host, port) pairs of the sentinel processes.
REDIS_SENTINELS = [
    ('sentinel1.example.com', 26379),
    ('sentinel2.example.com', 26379),
    ('sentinel3.example.com', 26379),
]

# Variant 2: Sentinel-managed Redis.
# NOTE(review): 'service_name' presumably is the monitored master name - confirm.
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisSentinelBackend',
        'OPTIONS': {
            'sentinels': REDIS_SENTINELS,
            'service_name': 'mymaster',
            'key_prefix': 'tasks:',
        }
    }
}

# Redis Cluster configuration
REDIS_CLUSTER_NODES = [
    {'host': 'redis-node1.example.com', 'port': 7000},
    {'host': 'redis-node2.example.com', 'port': 7000},
    {'host': 'redis-node3.example.com', 'port': 7000},
]

# Variant 3: Redis Cluster, bootstrapped from the node list above.
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisClusterBackend',
        'OPTIONS': {
            'startup_nodes': REDIS_CLUSTER_NODES,
            'key_prefix': 'tasks:',
        }
    }
}

Defining Tasks

Basic Task Definition

# tasks.py - Basic task definitions
from django.tasks import task
from django.core.mail import send_mail
from django.contrib.auth.models import User
from .models import Post, UserProfile
import requests
import time

@task
def simple_task(message):
    """Print ``message``, pause for two seconds and return a completion string."""
    print(f"Processing: {message}")
    time.sleep(2)
    outcome = f"Completed: {message}"
    return outcome

@task
def send_welcome_email(user_id):
    """Send the welcome email to the user with primary key ``user_id``.

    Returns:
        A status string: confirmation with the recipient address, or a
        "not found" message when no such user exists (a missing user is
        reported, not raised).

    Raises:
        Exception: when sending fails for any other reason. The original
            error is attached as ``__cause__`` so the real failure (SMTP
            error, connection refused, ...) stays visible in tracebacks.
    """
    try:
        user = User.objects.get(id=user_id)

        subject = "Welcome to our platform!"
        # Fall back to the username when no first name is stored.
        message = f"Hello {user.first_name or user.username}, welcome to our platform!"

        send_mail(
            subject=subject,
            message=message,
            from_email='noreply@example.com',
            recipient_list=[user.email],
            fail_silently=False
        )

        return f"Welcome email sent to {user.email}"

    except User.DoesNotExist:
        return f"User with ID {user_id} not found"
    except Exception as e:
        raise Exception(f"Failed to send email: {str(e)}") from e

# Account fields that must never be writable through a generic update payload:
# the payload may originate from a request body, and writing these would allow
# privilege escalation or account takeover.
_PROTECTED_USER_FIELDS = frozenset({
    'id', 'pk', 'password', 'is_superuser', 'is_staff', 'is_active',
    'groups', 'user_permissions', 'last_login', 'date_joined',
})


def _apply_field_updates(instance, updates, skip=frozenset()):
    """Set the attributes named in ``updates`` on ``instance``; return the names set."""
    applied = []
    for field, value in updates.items():
        if field in skip or field.startswith('_') or not hasattr(instance, field):
            continue
        # Never shadow methods such as save()/delete() with payload data.
        if callable(getattr(instance, field)):
            continue
        setattr(instance, field, value)
        applied.append(field)
    return applied


@task
def process_user_data(user_id, data_updates):
    """Apply ``data_updates`` to a user and to the user's profile.

    Top-level keys are applied to the ``User`` instance; the optional
    ``'profile'`` key holds a nested mapping applied to the ``UserProfile``
    (created if missing). Unknown, private, callable and protected fields are
    ignored, and the ``'profile'`` key itself is never written onto the user.

    Returns:
        dict with ``user_updated`` (always True on success),
        ``profile_created`` and ``updated_fields`` (the payload's top-level
        keys, as before).

    Raises:
        Exception: on any failure, with the original error chained as
            ``__cause__``.
    """
    try:
        user = User.objects.get(id=user_id)

        # 'profile' is a nested payload, not a user attribute.
        _apply_field_updates(
            user, data_updates, skip=_PROTECTED_USER_FIELDS | {'profile'}
        )
        user.save()

        # Update or create profile
        profile, created = UserProfile.objects.get_or_create(user=user)
        profile_updates = data_updates.get('profile') or {}

        # The profile's owner and key must not be re-pointed by the payload.
        _apply_field_updates(
            profile, profile_updates, skip={'id', 'pk', 'user', 'user_id'}
        )
        profile.save()

        return {
            'user_updated': True,
            'profile_created': created,
            'updated_fields': list(data_updates.keys())
        }

    except Exception as e:
        raise Exception(f"Failed to process user data: {str(e)}") from e

@task
def fetch_external_data(api_url, params=None):
    """Fetch JSON from ``api_url`` and count the entries under its ``'items'`` key.

    Args:
        api_url: URL to GET (30 second timeout).
        params: optional query-string parameters passed to ``requests.get``.

    Returns:
        dict with ``success``, ``items_processed`` and ``total_items``.

    Raises:
        Exception: "API request failed" for ``requests`` errors, "Data
            processing failed" for anything else; the original error is
            chained as ``__cause__`` in both cases.
    """
    try:
        response = requests.get(api_url, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        # Look the list up once; it is used for both the loop and the total.
        items = data.get('items', [])

        # Process and store data
        processed_count = 0
        for item in items:
            # Process each item (placeholder: only counts for now)
            processed_count += 1

        return {
            'success': True,
            'items_processed': processed_count,
            'total_items': len(items)
        }

    except requests.RequestException as e:
        raise Exception(f"API request failed: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Data processing failed: {str(e)}") from e

Advanced Task Patterns

# Advanced task definitions with options
from django.tasks import task
from django.tasks.exceptions import Retry, Ignore
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

@task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    rate_limit='10/m',  # 10 tasks per minute
    time_limit=300,     # 5 minutes
    soft_time_limit=240 # 4 minutes
)
def robust_email_task(self, user_id, template_name, context=None):
    """Render ``emails/<template_name>.html`` and send it to a user, with retries.

    Args:
        user_id: primary key of the recipient.
        template_name: template base name under ``emails/``.
        context: optional template context; its ``'subject'`` entry, when
            present, becomes the mail subject (default ``'Notification'``).

    Returns:
        Confirmation string containing the recipient address.

    Raises:
        Ignore: when the user does not exist (no retry).
        Whatever ``self.retry`` raises for any other failure; the retry delay
        doubles with each attempt already made (60s, 120s, 240s, ...).
    """
    # Normalise once. Previously the subject lookup called ``context.get`` on
    # the default ``None``, raising AttributeError inside the try block and
    # burning every retry on an error that could never succeed.
    context = context or {}

    try:
        user = User.objects.get(id=user_id)

        # Render email template
        from django.template.loader import render_to_string

        html_content = render_to_string(
            f'emails/{template_name}.html',
            context
        )

        # Send email
        from django.core.mail import EmailMessage

        email = EmailMessage(
            subject=context.get('subject', 'Notification'),
            body=html_content,
            from_email='noreply@example.com',
            to=[user.email]
        )
        email.content_subtype = 'html'
        email.send()

        logger.info(f"Email sent successfully to {user.email}")
        return f"Email sent to {user.email}"

    except User.DoesNotExist:
        logger.error(f"User {user_id} not found")
        raise Ignore(f"User {user_id} not found")

    except Exception as exc:
        logger.error(f"Email sending failed: {exc}")

        # Retry with exponential backoff
        retry_delay = 60 * (2 ** self.request.retries)

        raise self.retry(
            exc=exc,
            countdown=retry_delay,
            max_retries=3
        )

@task(bind=True, queue='high_priority')
def urgent_notification_task(self, user_ids, message, notification_type='urgent'):
    """Notify each user in ``user_ids`` over every available channel.

    A failure for one user is logged and counted; it never aborts the loop.
    Returns a dict with ``successful_sends``, ``failed_sends`` and
    ``total_users``.
    """
    ok_count = 0
    error_count = 0

    for user_id in user_ids:
        try:
            recipient = User.objects.get(id=user_id)

            # Names of the channels used for this recipient, in send order.
            delivered_via = []

            # Email channel - only when an address is on file.
            if recipient.email:
                send_mail(
                    subject=f'Urgent: {notification_type}',
                    message=message,
                    from_email='urgent@example.com',
                    recipient_list=[recipient.email]
                )
                delivered_via.append('email')

            # SMS channel - requires a profile with a phone number.
            # (Actual delivery is still a placeholder: send_sms is not wired in.)
            has_phone = hasattr(recipient, 'profile') and recipient.profile.phone_number
            if has_phone:
                delivered_via.append('sms')

            # Push channel - recorded unconditionally.
            # (Actual delivery is still a placeholder: send_push_notification
            # is not wired in.)
            delivered_via.append('push')

            ok_count += 1
            logger.info(f"Urgent notification sent to user {user_id} via {delivered_via}")

        except User.DoesNotExist:
            error_count += 1
            logger.warning(f"User {user_id} not found for urgent notification")

        except Exception as e:
            error_count += 1
            logger.error(f"Failed to send urgent notification to user {user_id}: {e}")

    summary = {
        'successful_sends': ok_count,
        'failed_sends': error_count,
        'total_users': len(user_ids)
    }
    return summary

@task(bind=True)
def batch_process_posts(self, post_ids, operation, **kwargs):
    """Apply one ``operation`` to every post in ``post_ids``.

    Args:
        post_ids: primary keys of the posts to process.
        operation: one of ``'publish'``, ``'unpublish'``, ``'feature'`` or
            ``'update_category'``.
        **kwargs: ``category_id`` is read for ``'update_category'``.

    Returns:
        dict with ``successful`` (list of ids), ``failed`` (list of
        ``{'post_id', 'error'}`` dicts) and ``total``.

    Raises:
        ValueError: for an unknown ``operation``. Previously such a call
            re-saved every post unchanged and reported it as successful.
    """
    # Local import keeps the block self-contained. timezone.now() is aware
    # when USE_TZ is on and naive otherwise, so it is safe in both setups
    # (a naive datetime.now() triggers warnings/wrong offsets with USE_TZ).
    from django.utils import timezone

    known_operations = ('publish', 'unpublish', 'feature', 'update_category')
    if operation not in known_operations:
        raise ValueError(f"Unknown operation: {operation}")

    total = len(post_ids)
    results = {
        'successful': [],
        'failed': [],
        'total': total
    }

    for i, post_id in enumerate(post_ids):
        try:
            # Update task progress
            self.update_state(
                state='PROGRESS',
                meta={
                    'current': i + 1,
                    'total': total,
                    'status': f'Processing post {post_id}'
                }
            )

            post = Post.objects.get(id=post_id)

            if operation == 'publish':
                post.published = True
                post.published_at = timezone.now()
            elif operation == 'unpublish':
                post.published = False
            elif operation == 'feature':
                post.featured = True
            elif operation == 'update_category':
                category_id = kwargs.get('category_id')
                # Without a category id the post is saved unchanged.
                if category_id:
                    post.category_id = category_id

            post.save()
            results['successful'].append(post_id)

        except Post.DoesNotExist:
            results['failed'].append({
                'post_id': post_id,
                'error': 'Post not found'
            })

        except Exception as e:
            # Record the failure and continue with the remaining posts.
            results['failed'].append({
                'post_id': post_id,
                'error': str(e)
            })

    return results

@task(bind=True, time_limit=3600)  # 1 hour limit
def generate_report(self, report_type, date_range, user_id):
    """Build a report section by section and email the requester when done.

    Progress is published through ``self.update_state`` before each section.
    Returns a dict describing the finished report; any failure is logged and
    re-raised unchanged.
    """
    def publish_progress(**meta):
        # Every progress update uses the same PROGRESS state.
        self.update_state(state='PROGRESS', meta=meta)

    try:
        requester = User.objects.get(id=user_id)

        publish_progress(status='Initializing report generation')

        from .reports import ReportGenerator
        builder = ReportGenerator(report_type, date_range)

        # Sections are generated in this fixed order.
        section_names = ['summary', 'details', 'charts', 'export']
        section_total = len(section_names)

        for position, section_name in enumerate(section_names, start=1):
            publish_progress(
                current=position,
                total=section_total,
                status=f'Generating {section_name} section'
            )
            builder.generate_section(section_name)

        finished_path = builder.finalize()

        # Tell the requester the report is available.
        send_mail(
            subject=f'Report Ready: {report_type}',
            message=f'Your {report_type} report is ready for download.',
            from_email='reports@example.com',
            recipient_list=[requester.email]
        )

        summary = {
            'report_path': finished_path,
            'report_type': report_type,
            'generated_for': requester.email,
            'sections_generated': section_total
        }
        return summary

    except Exception as e:
        logger.error(f"Report generation failed: {e}")
        raise

Task Classes

# Class-based tasks for complex operations
from django.tasks import Task
from django.db import transaction

class DataMigrationTask(Task):
    """Class-based task that runs a named data migration in batches."""

    name = 'data_migration'
    max_retries = 1
    time_limit = 7200  # 2 hours

    def run(self, migration_type, batch_size=1000, **kwargs):
        """Dispatch to the migration named by ``migration_type``.

        Args:
            migration_type: ``'user_profiles'`` or ``'post_categories'``.
            batch_size: rows handled per transaction; must be >= 1.

        Raises:
            ValueError: for an unknown migration type or a non-positive
                ``batch_size`` (a step of 0 would otherwise crash ``range``
                and a negative step would silently migrate nothing).
        """
        # Recorded first so on_failure can name the migration even when the
        # validation below is what fails.
        self.migration_type = migration_type

        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
        self.batch_size = batch_size

        if migration_type == 'user_profiles':
            return self.migrate_user_profiles()
        elif migration_type == 'post_categories':
            return self.migrate_post_categories()
        else:
            raise ValueError(f"Unknown migration type: {migration_type}")

    def migrate_user_profiles(self):
        """Create an empty ``UserProfile`` for every user that lacks one.

        Returns a dict with ``migration_type``, ``total_migrated`` and
        ``batch_size``.
        """
        # Snapshot the ids up front. Slicing the live "users without a
        # profile" queryset by offset is wrong: every batch that gets a
        # profile drops out of the filter, so the next offset skips that many
        # users and roughly half would never be migrated.
        pending_ids = list(
            User.objects.filter(userprofile__isnull=True)
            .order_by('pk')
            .values_list('pk', flat=True)
        )
        total_users = len(pending_ids)

        migrated_count = 0

        for start in range(0, total_users, self.batch_size):
            chunk_ids = pending_ids[start:start + self.batch_size]

            # One transaction per batch: a failure rolls back only that batch.
            with transaction.atomic():
                profiles_to_create = [
                    UserProfile(
                        user=user,
                        bio='',
                        created_at=user.date_joined
                    )
                    for user in User.objects.filter(pk__in=chunk_ids)
                ]

                UserProfile.objects.bulk_create(profiles_to_create)
                migrated_count += len(profiles_to_create)

            # Update progress
            self.update_state(
                state='PROGRESS',
                meta={
                    'current': migrated_count,
                    'total': total_users,
                    'status': f'Migrated {migrated_count}/{total_users} profiles'
                }
            )

        return {
            'migration_type': 'user_profiles',
            'total_migrated': migrated_count,
            'batch_size': self.batch_size
        }

    def migrate_post_categories(self):
        """Migrate post categories (placeholder: not implemented, returns None)."""
        # Implementation for post category migration
        return None

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failure and email the admin."""
        logger.error(f"Data migration failed: {exc}")

        # run() may not have been entered (e.g. bad call signature), in which
        # case the attribute does not exist yet.
        migration_type = getattr(self, 'migration_type', 'unknown')

        # Send failure notification
        send_mail(
            subject='Data Migration Failed',
            message=f'Migration {migration_type} failed: {exc}',
            from_email='admin@example.com',
            recipient_list=['admin@example.com']
        )

    def on_success(self, retval, task_id, args, kwargs):
        """Log the result and email the admin."""
        logger.info(f"Data migration completed: {retval}")

        # Send success notification
        send_mail(
            subject='Data Migration Completed',
            message=f'Migration completed successfully: {retval}',
            from_email='admin@example.com',
            recipient_list=['admin@example.com']
        )

# Register class-based task
data_migration_task = DataMigrationTask()

class FileProcessingTask(Task):
    """Class-based task for image file processing (resize, optimize, thumbnails).

    ``os`` and ``PIL.Image`` are imported inside each method that uses them.
    Previously they were imported only inside ``run()``, so the helper methods
    raised NameError as soon as they touched ``Image`` or ``os``.
    """

    name = 'file_processing'
    bind = True
    max_retries = 2

    @staticmethod
    def _derived_path(file_path, suffix):
        """Return ``file_path`` with ``suffix`` inserted before the extension.

        Only the final extension is considered. The previous
        ``str.replace('.', ...)`` rewrote every dot in the path (directory
        names, dotted file names) and, for a path without any dot, returned
        the input unchanged so the source file was overwritten.
        """
        import os

        root, ext = os.path.splitext(file_path)
        return f"{root}{suffix}{ext}"

    def run(self, file_path, processing_type, **options):
        """Dispatch ``file_path`` to the handler for ``processing_type``.

        Raises:
            FileNotFoundError: when ``file_path`` does not exist.
            ValueError: for an unknown ``processing_type``.
        """
        import os

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        if processing_type == 'image_resize':
            return self.resize_image(file_path, **options)
        elif processing_type == 'image_optimize':
            return self.optimize_image(file_path, **options)
        elif processing_type == 'generate_thumbnails':
            return self.generate_thumbnails(file_path, **options)
        else:
            raise ValueError(f"Unknown processing type: {processing_type}")

    def resize_image(self, file_path, width, height, **kwargs):
        """Resize the image to exactly ``width`` x ``height`` and save a copy.

        Returns a dict with the original/resized paths and sizes.
        Raises ``Exception`` (original error chained) on any failure.
        """
        from PIL import Image

        try:
            with Image.open(file_path) as img:
                # Resize image
                resized_img = img.resize((width, height), Image.Resampling.LANCZOS)

                # Save resized image
                output_path = self._derived_path(file_path, f'_resized_{width}x{height}')
                resized_img.save(output_path, optimize=True, quality=85)

                return {
                    'original_path': file_path,
                    'resized_path': output_path,
                    'original_size': img.size,
                    'new_size': (width, height)
                }

        except Exception as e:
            raise Exception(f"Image resize failed: {str(e)}") from e

    def optimize_image(self, file_path, quality=85, **kwargs):
        """Re-encode the image at ``quality`` and report the size saving.

        Returns a dict with both paths, both byte sizes and
        ``compression_ratio`` (fraction of bytes saved).
        Raises ``Exception`` (original error chained) on any failure.
        """
        import os

        from PIL import Image

        try:
            with Image.open(file_path) as img:
                # Convert to RGB if necessary
                if img.mode in ('RGBA', 'P'):
                    img = img.convert('RGB')

                # Save optimized image
                output_path = self._derived_path(file_path, '_optimized')
                img.save(output_path, optimize=True, quality=quality)

                # Get file sizes
                original_size = os.path.getsize(file_path)
                optimized_size = os.path.getsize(output_path)

                return {
                    'original_path': file_path,
                    'optimized_path': output_path,
                    'original_size': original_size,
                    'optimized_size': optimized_size,
                    'compression_ratio': (original_size - optimized_size) / original_size
                }

        except Exception as e:
            raise Exception(f"Image optimization failed: {str(e)}") from e

    def generate_thumbnails(self, file_path, sizes=None, **kwargs):
        """Write one thumbnail per entry in ``sizes`` (default 150/300/600 squares).

        Progress is published before each thumbnail. Returns a dict with the
        original path, the list of thumbnails (size, path, file size) and
        their count. Raises ``Exception`` (original error chained) on failure.
        """
        import os

        from PIL import Image

        if sizes is None:
            sizes = [(150, 150), (300, 300), (600, 600)]

        thumbnails = []

        try:
            with Image.open(file_path) as img:
                for i, (width, height) in enumerate(sizes):
                    # Update progress
                    self.update_state(
                        state='PROGRESS',
                        meta={
                            'current': i + 1,
                            'total': len(sizes),
                            'status': f'Generating {width}x{height} thumbnail'
                        }
                    )

                    # thumbnail() resizes in place, so work on a copy.
                    thumbnail = img.copy()
                    thumbnail.thumbnail((width, height), Image.Resampling.LANCZOS)

                    # Save thumbnail
                    output_path = self._derived_path(file_path, f'_thumb_{width}x{height}')
                    thumbnail.save(output_path, optimize=True, quality=85)

                    thumbnails.append({
                        'size': (width, height),
                        'path': output_path,
                        'file_size': os.path.getsize(output_path)
                    })

                return {
                    'original_path': file_path,
                    'thumbnails': thumbnails,
                    'total_thumbnails': len(thumbnails)
                }

        except Exception as e:
            raise Exception(f"Thumbnail generation failed: {str(e)}") from e

# Register class-based task
file_processing_task = FileProcessingTask()

Enqueueing Tasks

Basic Task Enqueueing

# views.py - Enqueueing tasks from views
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_POST
from .tasks import (
    send_welcome_email, 
    process_user_data, 
    batch_process_posts,
    data_migration_task,
    file_processing_task
)
import json

@require_POST
@login_required
def send_welcome_email_view(request):
    """Queue the welcome-email task for the ``user_id`` given in the POST body."""
    target_user_id = request.POST.get('user_id')

    if target_user_id:
        queued = send_welcome_email.delay(target_user_id)
        payload = {
            'task_id': queued.id,
            'status': 'queued',
            'message': 'Welcome email task queued successfully'
        }
        return JsonResponse(payload)

    # Missing or empty user_id.
    return JsonResponse({'error': 'User ID required'}, status=400)

@require_POST
@login_required
def update_user_profile_view(request):
    """Queue a profile-update task from a JSON body ``{"user_id", "updates"}``.

    Responses:
        200 with ``task_id`` when the task was queued.
        400 when the body is not a JSON object, ``user_id`` is missing or
            ``updates`` is not an object.
        403 when a non-staff user tries to update someone else's account.
    """
    try:
        data = json.loads(request.body)
    except ValueError:
        # Covers JSONDecodeError and undecodable bytes (both ValueError).
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    # Valid JSON that is not an object (list, string, number) has no .get().
    if not isinstance(data, dict):
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    user_id = data.get('user_id')
    updates = data.get('updates', {})

    if not user_id:
        return JsonResponse({'error': 'User ID required'}, status=400)

    if not isinstance(updates, dict):
        return JsonResponse({'error': 'Updates must be an object'}, status=400)

    # The target id comes from the request body, so without this check any
    # logged-in user could rewrite any other account.
    is_own_account = str(user_id) == str(request.user.pk)
    if not is_own_account and not request.user.is_staff:
        return JsonResponse({'error': 'Permission denied'}, status=403)

    # Enqueue task with priority
    task_result = process_user_data.apply_async(
        args=[user_id, updates],
        priority=5,  # Higher priority
        countdown=10  # Delay execution by 10 seconds
    )

    return JsonResponse({
        'task_id': task_result.id,
        'status': 'queued',
        'message': 'User profile update queued'
    })

@require_POST
@login_required
def batch_publish_posts_view(request):
    """Queue a batch-publish task from a JSON body ``{"post_ids": [...]}``.

    Responses:
        200 with ``task_id`` and ``total_posts`` when the task was queued.
        400 when the body is not a JSON object or ``post_ids`` is missing,
            empty or not a list.
    """
    try:
        data = json.loads(request.body)
    except ValueError:
        # Covers JSONDecodeError and undecodable bytes (both ValueError).
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    # Valid JSON that is not an object (list, string, number) has no .get().
    if not isinstance(data, dict):
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    post_ids = data.get('post_ids', [])

    # A bare string would otherwise be iterated character by character.
    if not post_ids or not isinstance(post_ids, list):
        return JsonResponse({'error': 'Post IDs required'}, status=400)

    # Enqueue batch task
    task_result = batch_process_posts.delay(
        post_ids,
        'publish'
    )

    return JsonResponse({
        'task_id': task_result.id,
        'status': 'queued',
        'total_posts': len(post_ids),
        'message': 'Batch publish task queued'
    })

def start_data_migration_view(request):
    """Queue a data migration from ``?type=...&batch_size=...`` query parameters.

    Defaults: ``type=user_profiles``, ``batch_size=1000``.
    Returns 400 when ``batch_size`` is not a positive integer; otherwise the
    queued task's id and the effective parameters.

    NOTE(review): this view starts a long-running, state-changing job from an
    unauthenticated GET request - consider restricting it to staff and POST.
    """
    migration_type = request.GET.get('type', 'user_profiles')

    # Reject bad input here instead of failing with a 500 (non-numeric) or
    # queueing a task that cannot run (zero/negative batch size).
    try:
        batch_size = int(request.GET.get('batch_size', 1000))
    except (TypeError, ValueError):
        return JsonResponse({'error': 'batch_size must be an integer'}, status=400)

    if batch_size < 1:
        return JsonResponse({'error': 'batch_size must be positive'}, status=400)

    # Enqueue migration task
    task_result = data_migration_task.delay(
        migration_type=migration_type,
        batch_size=batch_size
    )

    return JsonResponse({
        'task_id': task_result.id,
        'migration_type': migration_type,
        'batch_size': batch_size,
        'status': 'started'
    })

@require_POST
def process_uploaded_file_view(request):
    """Store an uploaded file and queue a processing task for it.

    POST fields: ``file`` (required), ``processing_type`` (default
    ``image_resize``) plus ``width``/``height`` or ``quality`` depending on
    the type.

    Responses:
        200 with ``task_id`` when the task was queued.
        400 when no file was sent or a numeric option is not an integer.
    """
    import os
    import uuid
    from django.conf import settings

    uploaded_file = request.FILES.get('file')
    processing_type = request.POST.get('processing_type', 'image_resize')

    if not uploaded_file:
        return JsonResponse({'error': 'No file uploaded'}, status=400)

    # Parse the options before touching the disk, so a bad value is answered
    # with a 400 instead of a 500 that leaves an orphaned upload behind.
    options = {}
    try:
        if processing_type == 'image_resize':
            options = {
                'width': int(request.POST.get('width', 800)),
                'height': int(request.POST.get('height', 600))
            }
        elif processing_type == 'image_optimize':
            options = {
                'quality': int(request.POST.get('quality', 85))
            }
    except (TypeError, ValueError):
        return JsonResponse({'error': 'Invalid processing options'}, status=400)

    # Save uploaded file
    upload_dir = os.path.join(settings.MEDIA_ROOT, 'uploads')
    os.makedirs(upload_dir, exist_ok=True)

    # basename() guards against path components in the client-supplied name;
    # the random prefix stops two uploads with the same name from overwriting
    # each other while the first is still being processed.
    safe_name = os.path.basename(uploaded_file.name)
    file_path = os.path.join(upload_dir, f'{uuid.uuid4().hex}_{safe_name}')

    with open(file_path, 'wb') as destination:
        for chunk in uploaded_file.chunks():
            destination.write(chunk)

    # Enqueue processing task
    task_result = file_processing_task.delay(
        file_path=file_path,
        processing_type=processing_type,
        **options
    )

    return JsonResponse({
        'task_id': task_result.id,
        'file_name': uploaded_file.name,
        'processing_type': processing_type,
        'status': 'queued'
    })

Advanced Enqueueing Patterns

# Advanced task enqueueing patterns
from datetime import datetime, timedelta

from django.tasks import group, chain, chord

from .tasks import (
    generate_report,
    process_user_data,
    send_email_django_task,
    send_welcome_email,
)

def enqueue_with_scheduling():
    """Show the scheduling options: immediate, countdown, fixed ETA, periodic.

    Each call rebinds ``task_result``; the function is a demonstration and
    returns nothing.
    """
    # timezone.now() is timezone-aware when USE_TZ is enabled (and naive
    # otherwise), so the ETA is unambiguous for a worker running in another
    # zone. A naive datetime.now() is not.
    from django.utils import timezone

    # Immediate execution
    task_result = send_welcome_email.delay(user_id=1)

    # Delayed execution (countdown in seconds)
    task_result = send_welcome_email.apply_async(
        args=[1],
        countdown=300  # Execute in 5 minutes
    )

    # Scheduled execution (specific datetime)
    eta = timezone.now() + timedelta(hours=1)
    task_result = send_welcome_email.apply_async(
        args=[1],
        eta=eta
    )

    # Periodic execution (requires scheduler)
    from django.tasks.schedules import crontab

    # This would typically be in settings or task configuration; it is only
    # built here for illustration and is not registered anywhere.
    periodic_task = {
        'task': 'myapp.tasks.send_welcome_email',
        'schedule': crontab(hour=9, minute=0),  # Daily at 9 AM
        'args': [1]
    }

def enqueue_with_routing():
    """Queue the welcome email three times, once per routing option."""
    # One apply_async call per entry, issued in this order:
    routing_variants = (
        {'backend': 'redis'},          # route to a specific backend
        {'queue': 'high_priority'},    # route to a specific queue
        {'priority': 9},               # higher number = higher priority
    )

    for routing in routing_variants:
        task_result = send_welcome_email.apply_async(args=[1], **routing)

def enqueue_task_groups():
    """Enqueue one welcome-email task per user as a single group.

    Returns:
        dict: ``group_id`` (the ``id`` of the group's result), ``task_count``
        (how many users were included) and ``status`` (always ``'queued'``).
    """
    recipients = [1, 2, 3, 4, 5]

    # Group - execute tasks in parallel. The signatures are produced lazily
    # and handed to group() as a generator.
    signatures = (send_welcome_email.s(uid) for uid in recipients)
    group_result = group(signatures).apply_async()

    summary = {'group_id': group_result.id}
    summary['task_count'] = len(recipients)
    summary['status'] = 'queued'
    return summary

def enqueue_task_chain():
    """Enqueue three tasks as a chain (sequential execution).

    Returns:
        dict: ``chain_id`` (the ``id`` of the chain's result) and ``status``
        (always ``'started'``).
    """
    # Steps in the order they are chained.
    steps = (
        process_user_data.s(1, {'first_name': 'John'}),
        # Declared without arguments: will receive result from previous task.
        send_welcome_email.s(),
        # NOTE(review): presumably this step also receives the previous
        # step's result ahead of its own two arguments -- confirm that
        # generate_report's signature accepts that.
        generate_report.s('user_activity', '2024-01-01:2024-12-31'),
    )

    pipeline = chain(*steps)
    chain_result = pipeline.apply_async()

    return dict(chain_id=chain_result.id, status='started')

def enqueue_task_chord():
    """Enqueue a chord: a group of tasks followed by one callback task.

    Returns:
        dict: ``chord_id`` (the ``id`` of the object returned by invoking the
        chord), ``header_tasks`` (number of tasks in the group) and
        ``status`` (always ``'started'``).
    """
    batch = [1, 2, 3, 4, 5]

    # Header: one process_user_data signature per user, run as a group.
    parallel_part = group(
        process_user_data.s(uid, {'status': 'processed'}) for uid in batch
    )

    # Body: the callback task that receives all results.
    summary_step = generate_report.s('batch_processing_summary')

    # Calling chord(header) with the body creates the chord.
    chord_result = chord(parallel_part)(summary_step)

    return dict(
        chord_id=chord_result.id,
        header_tasks=len(batch),
        status='started',
    )

def enqueue_with_error_handling():
    """Enqueue two tasks that carry error-handling options.

    The first call enqueues ``send_welcome_email`` with ``retry=True`` and an
    explicit retry policy; its result is not kept. The second enqueues
    ``process_user_data`` with ``handle_user_update_error`` as its
    ``link_error`` callback.

    Returns:
        The result object of the second (``process_user_data``) call.
    """
    # Retry configuration, passed through unchanged as ``retry_policy``.
    retry_settings = {
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.2,
    }
    send_welcome_email.apply_async(
        args=[1],
        retry=True,
        retry_policy=retry_settings,
    )

    # Custom error handling: signature handed over as the error callback.
    on_failure = handle_user_update_error.s()
    return process_user_data.apply_async(
        args=[1, {'email': 'new@example.com'}],
        link_error=on_failure,
    )

@task
def handle_user_update_error(task_id, error, traceback):
    """Log a failed user-update task and e-mail a notification to the admin.

    Args:
        task_id: Identifier of the failed task; interpolated into both messages.
        error: The failure; interpolated into both messages.
        traceback: Accepted but not used by this handler.
    """
    # Record the failure in the log first.
    logger.error(f"User update task {task_id} failed: {error}")

    # Then notify the admin address by mail.
    notification = {
        'subject': 'Task Failure Notification',
        'message': f'Task {task_id} failed with error: {error}',
        'from_email': 'admin@example.com',
        'recipient_list': ['admin@example.com'],
    }
    send_mail(**notification)

def enqueue_conditional_tasks():
    """Enqueue tasks for user 1 depending on the user's attributes.

    Returns:
        The id of the welcome-email task when the user is active and has an
        e-mail address; otherwise ``None`` and nothing is enqueued.
    """
    user_id = 1
    user = User.objects.get(id=user_id)

    # Guard clause: inactive users or users without an address get nothing.
    if not (user.is_active and user.email):
        return None

    # Send welcome email
    welcome_task = send_welcome_email.delay(user_id)

    # Staff users get an additional setup task.
    if user.is_staff:
        # NOTE(review): ``link`` is given the id of the welcome task that was
        # already enqueued above -- confirm the backend accepts a task id
        # here rather than a task signature.
        setup_task = process_user_data.apply_async(
            args=[user_id, {'role': 'staff'}],
            link=welcome_task.id,
        )

    return welcome_task.id

def bulk_enqueue_tasks(user_ids=None, batch_size=100):
    """Enqueue a welcome email for many users, in groups of ``batch_size``.

    Each user is enqueued exactly once. The simplest alternative is one
    ``send_welcome_email.delay(user_id)`` call per user; running that loop in
    addition to the batches below would enqueue every email twice.

    Args:
        user_ids: Iterable of user ids. Defaults to users 1 through 1000.
        batch_size: Maximum number of tasks per group. Defaults to 100.

    Returns:
        dict: ``total_tasks`` (number of users), ``batch_count`` (number of
        groups enqueued) and ``batch_ids`` (the ``id`` of each group result,
        in enqueue order).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer')

    # Materialise once so the ids can be sliced and counted.
    if user_ids is None:
        user_ids = range(1, 1001)  # 1000 users
    user_ids = list(user_ids)

    batch_ids = []
    for start in range(0, len(user_ids), batch_size):
        signatures = [
            send_welcome_email.s(user_id)
            for user_id in user_ids[start:start + batch_size]
        ]
        batch_result = group(signatures).apply_async()
        batch_ids.append(batch_result.id)

    return {
        'total_tasks': len(user_ids),
        'batch_count': len(batch_ids),
        'batch_ids': batch_ids
    }

Task Results

Retrieving Task Results

# views.py - Task result handling
from django.tasks import get_task_backend
from django.http import JsonResponse
from django.shortcuts import get_object_or_404

def get_task_status_view(request, task_id):
    """Return the status (and result, when finished) of a task as JSON.

    Args:
        request: The incoming HTTP request (not inspected).
        task_id: Identifier used to look the task up on the backend.

    Returns:
        JsonResponse with ``task_id``, ``status``, ``ready``, ``successful``
        and ``failed``; plus ``result`` on success, ``error`` and
        ``traceback`` on failure, and ``progress`` when the result object
        exposes a truthy ``info``. Any exception while reading the task
        yields a 500 response carrying an ``error`` message.
    """
    backend = get_task_backend()

    try:
        task_result = backend.get_result(task_id)

        # Take one snapshot of the completion state. Re-querying ready() and
        # successful() for every field could produce a self-contradictory
        # response if the task finishes between two calls.
        is_ready = task_result.ready()
        succeeded = task_result.successful() if is_ready else None
        has_failed = task_result.failed() if is_ready else None

        response_data = {
            'task_id': task_id,
            'status': task_result.status,
            'ready': is_ready,
            'successful': succeeded,
            'failed': has_failed,
        }

        # Add result if task is complete
        if is_ready:
            if succeeded:
                response_data['result'] = task_result.result
            else:
                response_data['error'] = str(task_result.result)
                response_data['traceback'] = task_result.traceback

        # Add progress information if available
        progress = getattr(task_result, 'info', None)
        if progress:
            response_data['progress'] = progress

        return JsonResponse(response_data)

    except Exception as e:
        return JsonResponse({
            'task_id': task_id,
            'error': f'Failed to retrieve task status: {str(e)}'
        }, status=500)

def wait_for_task_result_view(request, task_id):
    """Block until a task finishes (or a timeout elapses) and return its result.

    Args:
        request: The incoming HTTP request. The optional ``timeout`` query
            parameter gives the wait limit in seconds (default 30).
        task_id: Identifier used to look the task up on the backend.

    Returns:
        JsonResponse: 200 with ``result`` when the task completes, 400 when
        ``timeout`` is not a non-negative integer, 408 when ``TimeoutError``
        is raised while waiting, and 500 for any other exception.
    """
    # ``timeout`` comes straight from the query string, so validate it here
    # instead of letting int() raise an unhandled ValueError.
    try:
        timeout = int(request.GET.get('timeout', 30))
    except (TypeError, ValueError):
        timeout = None

    if timeout is None or timeout < 0:
        return JsonResponse({
            'task_id': task_id,
            'status': 'error',
            'error': 'timeout must be a non-negative integer number of seconds'
        }, status=400)

    backend = get_task_backend()

    try:
        task_result = backend.get_result(task_id)

        # Wait for result with timeout
        result = task_result.get(timeout=timeout)

        return JsonResponse({
            'task_id': task_id,
            'status': 'completed',
            'result': result
        })

    except TimeoutError:
        return JsonResponse({
            'task_id': task_id,
            'status': 'timeout',
            'message': f'Task did not complete within {timeout} seconds'
        }, status=408)

    except Exception as e:
        return JsonResponse({
            'task_id': task_id,
            'status': 'error',
            'error': str(e)
        }, status=500)

def revoke_task_view(request, task_id):
    """Ask the task backend to revoke a task and report the outcome as JSON.

    Calls ``backend.revoke(task_id, terminate=True)``. A 200 response with
    status ``'revoked'`` is returned when that call does not raise; any
    exception becomes a 500 response with an ``error`` message.
    """
    # NOTE(review): the request method is not checked here -- confirm that
    # the URL configuration or a decorator restricts this to POST.
    backend = get_task_backend()

    try:
        backend.revoke(task_id, terminate=True)
    except Exception as e:
        failure = {
            'task_id': task_id,
            'error': f'Failed to revoke task: {str(e)}',
        }
        return JsonResponse(failure, status=500)

    success = {
        'task_id': task_id,
        'status': 'revoked',
        'message': 'Task has been cancelled',
    }
    return JsonResponse(success)

def list_active_tasks_view(request):
    """Return the backend's active tasks as a JSON list.

    Each entry from ``backend.get_active_tasks()`` is reduced to ``task_id``,
    ``name``, ``status``, ``started_at`` and ``worker`` (the last two are
    optional and default to ``None``). Any exception results in a 500
    response with an ``error`` message.
    """
    backend = get_task_backend()

    try:
        # What get_active_tasks() returns depends on the backend; each entry
        # is read like a mapping below.
        tasks_data = [
            {
                'task_id': entry['id'],
                'name': entry['name'],
                'status': entry['status'],
                'started_at': entry.get('started_at'),
                'worker': entry.get('worker'),
            }
            for entry in backend.get_active_tasks()
        ]

        payload = {'active_tasks': tasks_data, 'count': len(tasks_data)}
        return JsonResponse(payload)

    except Exception as e:
        message = f'Failed to retrieve active tasks: {str(e)}'
        return JsonResponse({'error': message}, status=500)

Task Result Management

# Task result utilities and management
from django.tasks import get_task_backend
from django.core.management.base import BaseCommand
from datetime import datetime, timedelta
import json

class TaskResultManager:
    """Utility class for managing task results.

    Wraps the backend returned by ``get_task_backend(backend_name)`` and
    offers helpers to inspect, wait for, and clean up task results.
    """

    def __init__(self, backend_name='default'):
        """Bind the manager to the task backend named ``backend_name``."""
        self.backend = get_task_backend(backend_name)

    def get_task_info(self, task_id):
        """Get comprehensive task information.

        Args:
            task_id: Identifier used to look the task up on the backend.

        Returns:
            dict: ``task_id``, ``status``, ``ready``, ``successful``,
            ``failed``, ``date_created`` and ``date_done`` (the last two are
            ``None`` when the result object lacks them); plus ``result`` on
            success or ``error``/``traceback`` on failure. If anything raises
            while reading the task, a dict with only ``task_id`` and
            ``error`` is returned instead.
        """
        try:
            task_result = self.backend.get_result(task_id)

            # Take one snapshot of the completion state so every field below
            # describes the same moment, even if the task finishes meanwhile.
            is_ready = task_result.ready()
            succeeded = task_result.successful() if is_ready else None

            info = {
                'task_id': task_id,
                'status': task_result.status,
                'ready': is_ready,
                'successful': succeeded,
                'failed': task_result.failed() if is_ready else None,
                'date_created': getattr(task_result, 'date_created', None),
                'date_done': getattr(task_result, 'date_done', None),
            }

            if is_ready:
                if succeeded:
                    info['result'] = task_result.result
                else:
                    info['error'] = str(task_result.result)
                    info['traceback'] = getattr(task_result, 'traceback', None)

            return info

        except Exception as e:
            return {
                'task_id': task_id,
                'error': f'Failed to get task info: {str(e)}'
            }

    def wait_for_multiple_tasks(self, task_ids, timeout=60):
        """Wait for multiple tasks to complete, one after another.

        Args:
            task_ids: Iterable of task identifiers.
            timeout: Seconds passed to each task's ``get()``. The tasks are
                waited on sequentially, so the limit applies per task and the
                whole call can block for up to ``len(task_ids) * timeout``.

        Returns:
            dict: Maps each task id to ``{'status': 'completed', 'result': ...}``,
            ``{'status': 'timeout', 'error': ...}`` or
            ``{'status': 'error', 'error': ...}``.
        """
        results = {}

        for task_id in task_ids:
            try:
                task_result = self.backend.get_result(task_id)
                result = task_result.get(timeout=timeout)
                results[task_id] = {
                    'status': 'completed',
                    'result': result
                }
            except TimeoutError:
                results[task_id] = {
                    'status': 'timeout',
                    'error': f'Task did not complete within {timeout} seconds'
                }
            except Exception as e:
                results[task_id] = {
                    'status': 'error',
                    'error': str(e)
                }

        return results

    def cleanup_old_results(self, days_old=7):
        """Clean up task results created more than ``days_old`` days ago.

        Args:
            days_old: Age threshold in days (default 7).

        Returns:
            Whatever the backend's ``cleanup_results`` returns when the
            backend provides one; otherwise the number of ``TaskResult`` rows
            matched for deletion.
        """
        from django.utils import timezone

        # timezone.now() matches the project's USE_TZ setting, so the cutoff
        # can be compared against DateTimeField values without the
        # naive-datetime warning that datetime.now() triggers.
        cutoff_date = timezone.now() - timedelta(days=days_old)

        # Implementation depends on backend
        if hasattr(self.backend, 'cleanup_results'):
            return self.backend.cleanup_results(cutoff_date)
        else:
            # Manual cleanup for database backend
            from django.tasks.models import TaskResult

            old_results = TaskResult.objects.filter(
                date_created__lt=cutoff_date
            )

            count = old_results.count()
            old_results.delete()

            return count

    def get_task_statistics(self):
        """Get task execution statistics.

        Returns:
            dict: A placeholder with every counter set to 0; real numbers
            have to be supplied per backend.
        """
        # Implementation depends on backend capabilities
        stats = {
            'total_tasks': 0,
            'completed_tasks': 0,
            'failed_tasks': 0,
            'pending_tasks': 0,
            'average_execution_time': 0
        }

        # This would need to be implemented based on your backend
        return stats

# Management command for task result cleanup
class Command(BaseCommand):
    """Management command to clean up old task results."""

    help = 'Clean up old task results'

    def add_arguments(self, parser):
        """Register the ``--days`` and ``--backend`` options."""
        parser.add_argument(
            '--days',
            type=int,
            default=7,
            help='Number of days to keep results (default: 7)'
        )

        parser.add_argument(
            '--backend',
            type=str,
            default='default',
            help='Task backend to clean up (default: default)'
        )

    def handle(self, *args, **options):
        """Delete results older than ``--days`` on the ``--backend`` backend.

        Raises:
            CommandError: If the cleanup raises. This makes the failure
                visible as an error and a non-zero exit status instead of a
                message on stdout followed by a successful exit.
        """
        from django.core.management.base import CommandError

        days = options['days']
        backend_name = options['backend']

        manager = TaskResultManager(backend_name)

        try:
            cleaned_count = manager.cleanup_old_results(days)
        except Exception as e:
            raise CommandError(f'Failed to clean up task results: {e}') from e

        self.stdout.write(
            self.style.SUCCESS(
                f'Successfully cleaned up {cleaned_count} old task results'
            )
        )

# Task monitoring utilities
class TaskMonitor:
    """Monitor task execution and performance."""

    def __init__(self):
        """Bind the monitor to the backend returned by ``get_task_backend()``."""
        self.backend = get_task_backend()

    def monitor_task_progress(self, task_id, callback=None,
                              poll_interval=1, timeout=None):
        """Poll a task until it is ready, reporting each status to ``callback``.

        Args:
            task_id: Identifier used to look the task up on the backend.
            callback: Optional callable invoked with a status dict on every
                poll and once more with the final status.
            poll_interval: Seconds to sleep between polls (default 1).
            timeout: Optional overall limit in seconds. ``None`` (the
                default) polls until the task is ready.

        Returns:
            dict: The final status with ``task_id``, ``status``,
            ``completed`` (always ``True``), ``successful``, ``timestamp``
            and either ``result`` or ``error``.

        Raises:
            TimeoutError: If ``timeout`` is given and the task is still not
                ready when it elapses.
        """
        # Imported here because this snippet's import block does not provide
        # ``time``.
        import time

        task_result = self.backend.get_result(task_id)
        deadline = None if timeout is None else time.monotonic() + timeout

        while not task_result.ready():
            # Get current status
            status_info = {
                'task_id': task_id,
                'status': task_result.status,
                'timestamp': datetime.now().isoformat()
            }

            # Add progress info if available
            progress = getattr(task_result, 'info', None)
            if progress:
                status_info['progress'] = progress

            # Call callback if provided
            if callback:
                callback(status_info)

            # Wait before checking again, never sleeping past the deadline.
            if deadline is None:
                time.sleep(poll_interval)
                continue

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f'Task {task_id} did not complete within {timeout} seconds'
                )
            time.sleep(min(poll_interval, remaining))

        # Task completed
        succeeded = task_result.successful()
        final_status = {
            'task_id': task_id,
            'status': task_result.status,
            'completed': True,
            'successful': succeeded,
            'timestamp': datetime.now().isoformat()
        }

        if succeeded:
            final_status['result'] = task_result.result
        else:
            final_status['error'] = str(task_result.result)

        if callback:
            callback(final_status)

        return final_status

    def get_worker_statistics(self):
        """Get worker performance statistics.

        Returns:
            dict: A placeholder with zeroed counters and an empty
            ``worker_load`` mapping; real numbers depend on the backend.
        """
        # This would depend on your backend implementation
        return {
            'active_workers': 0,
            'total_tasks_processed': 0,
            'average_task_duration': 0,
            'worker_load': {}
        }

# WebSocket integration for real-time task updates
from channels.generic.websocket import AsyncWebsocketConsumer
import json

class TaskProgressConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for real-time task progress updates.

    Each connection is bound to one task id taken from the URL route and is
    added to the channel-layer group ``task_<task_id>``.
    """

    async def connect(self):
        """Join the task's group, accept the socket, and send the current status."""
        self.task_id = self.scope['url_route']['kwargs']['task_id']
        self.group_name = f'task_{self.task_id}'

        # Join task group
        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )

        await self.accept()

        # Send initial task status
        await self.send_task_status()

    async def disconnect(self, close_code):
        """Leave the task's group when the socket closes."""
        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )

    async def send_task_status(self):
        """Send the task's current status as a ``task_status`` message."""
        from channels.db import database_sync_to_async

        def load_task_info():
            # TaskResultManager is synchronous and, depending on the backend,
            # may query the database, so it must not run on the event loop.
            return TaskResultManager().get_task_info(self.task_id)

        task_info = await database_sync_to_async(load_task_info)()

        # NOTE(review): task_info can carry ``date_created``/``date_done``
        # values -- confirm they are JSON-serializable for the backend in use.
        await self.send(text_data=json.dumps({
            'type': 'task_status',
            'data': task_info
        }))

    async def task_progress_update(self, event):
        """Forward a group event's ``data`` to the client as ``progress_update``."""
        await self.send(text_data=json.dumps({
            'type': 'progress_update',
            'data': event['data']
        }))

Django's Tasks framework provides a powerful, built-in solution for handling background tasks without external dependencies. It offers flexible backend options, comprehensive task management capabilities, and seamless integration with Django's ecosystem. The framework is particularly well-suited for applications that need reliable task processing without the complexity of setting up external message brokers or task queues.