Asynchronous Django

Background Tasks with Celery or RQ

Background task processing is essential for building responsive async Django applications. While async views handle concurrent requests efficiently, long-running operations like file processing, email sending, data analysis, or external API calls should be offloaded to background workers. This chapter covers integrating Celery and RQ with async Django applications, implementing task queues, scheduling, monitoring, and async task patterns.


Understanding Background Tasks

Background tasks decouple time-consuming operations from HTTP request-response cycles, enabling:

- Improved User Experience: Immediate responses while processing continues in the background
- System Reliability: Retry failed operations and handle errors gracefully
- Resource Management: Distribute workload across multiple worker processes
- Scalability: Scale task processing independently from web servers
- Async Integration: Seamlessly integrate with async Django views and WebSocket consumers

Task Processing Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Async View    │───▶│   Task Queue    │───▶│  Worker Process │
│                 │    │   (Redis/RMQ)   │    │   (Celery/RQ)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Task Status    │    │   Task Results  │    │   Task Logs     │
│   Tracking      │    │     Storage     │    │   Monitoring    │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Celery Integration

Celery is the most popular distributed task queue for Python applications, offering robust features for async Django integration.

Celery Setup and Configuration

# celery_app.py
# Celery application bootstrap for the Django project.
import os
from celery import Celery
# NOTE(review): `settings` is imported but not referenced in this module --
# keep only if later code in the real file uses it.
from django.conf import settings

# Set Django settings module
# (must happen before the Celery app is created so task code can use the ORM)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Create Celery app
app = Celery('myproject')

# Configure Celery using Django settings
# (picks up every settings.py name prefixed with CELERY_)
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover tasks from all registered Django apps
app.autodiscover_tasks()

# Async-compatible configuration
# NOTE(review): these keys duplicate the CELERY_* values in settings.py and,
# because update() runs after config_from_object(), they take precedence --
# confirm the duplication is intentional.
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,  # report STARTED, not just PENDING/SUCCESS
    task_time_limit=30 * 60,  # 30 minutes
    task_soft_time_limit=25 * 60,  # 25 minutes
    worker_prefetch_multiplier=1,  # fair dispatch: one task reserved per worker
    worker_max_tasks_per_child=1000,  # recycle workers to bound memory growth
)

@app.task(bind=True)
def debug_task(self):
    """Smoke-test task: print the task's own request context."""
    print(f'Request: {self.request!r}')

Django Settings for Celery

# settings.py
# Celery-related Django settings (read by app.config_from_object with the
# CELERY_ namespace).
import os

from celery.schedules import crontab  # FIX: crontab is used in CELERY_BEAT_SCHEDULE below; without this import the settings module raises NameError

# Celery Configuration
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

# Async-compatible settings
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True

# Task routing for different queues
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'email'},
    'myapp.tasks.process_image': {'queue': 'media'},
    'myapp.tasks.generate_report': {'queue': 'reports'},
    'myapp.tasks.sync_data': {'queue': 'sync'},
}

# Worker configuration
CELERY_WORKER_PREFETCH_MULTIPLIER = 1      # fair dispatch for long-running tasks
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000   # recycle workers to bound memory growth
CELERY_TASK_TIME_LIMIT = 30 * 60           # hard kill after 30 minutes
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60      # SoftTimeLimitExceeded raised at 25 minutes

# Beat scheduler for periodic tasks
CELERY_BEAT_SCHEDULE = {
    'cleanup-expired-sessions': {
        'task': 'myapp.tasks.cleanup_expired_sessions',
        'schedule': 3600.0,  # Every hour
    },
    'generate-daily-reports': {
        'task': 'myapp.tasks.generate_daily_reports',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
}

Async Task Definitions

# tasks.py
import asyncio
import aiohttp
import aiofiles
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth.models import User
from asgiref.sync import sync_to_async
from .models import Document, ProcessingJob

@shared_task(bind=True, max_retries=3)
def send_email_task(self, subject, message, recipient_list):
    """Deliver an email to *recipient_list*, retrying on failure.

    Retries use exponential backoff: 60s, 120s, 240s for attempts 1-3.
    Returns a short human-readable status string on success.
    """
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email='noreply@example.com',
            recipient_list=recipient_list,
            fail_silently=False
        )
    except Exception as exc:
        # Retry with exponential backoff based on the attempt counter.
        backoff = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=backoff)
    return f"Email sent to {len(recipient_list)} recipients"

@shared_task(bind=True)
def process_document_async(self, document_id):
    """Run the async document-processing pipeline for one document.

    Any failure is reported as an error dict rather than raised, so the
    Celery job itself always finishes.
    """
    try:
        outcome = asyncio.run(process_document_content(document_id))
    except Exception as exc:
        return {'error': str(exc), 'document_id': document_id}
    return outcome

async def process_document_content(document_id):
    """Async document processing logic

    Loads the document, reads its backing file from disk, sends the text to
    the external analysis API, then re-saves the document and returns a
    summary dict.
    """
    # Get document using async ORM
    document = await sync_to_async(Document.objects.get)(id=document_id)
    
    # Process file asynchronously
    async with aiofiles.open(document.file.path, 'r') as f:
        content = await f.read()
    
    # Perform async analysis
    analysis_results = await analyze_content_async(content)
    
    # Update document with results
    # NOTE(review): analysis_results is never assigned to any field on
    # `document`, so this save() persists nothing new -- confirm which model
    # field should receive the analysis output before relying on this.
    await sync_to_async(document.save)()
    
    return {
        'document_id': document_id,
        'status': 'completed',
        'analysis': analysis_results
    }

async def analyze_content_async(content):
    """POST *content* to the external text-analysis API and return its JSON reply."""
    payload = {'text': content}
    auth_headers = {'Authorization': 'Bearer YOUR_API_KEY'}
    async with aiohttp.ClientSession() as session:
        response = await session.post(
            'https://api.textanalysis.com/analyze',
            json=payload,
            headers=auth_headers,
        )
        async with response:
            return await response.json()

@shared_task(bind=True)
def batch_process_users(self, user_ids, operation):
    """Apply *operation* to every user in *user_ids* and summarize results.

    FIX: the original called asyncio.run() once per user, creating and
    tearing down a fresh event loop for every iteration. All users are now
    processed sequentially inside a single event loop; per-user error
    capture and ordering are unchanged.

    Returns a dict with total/successful/failed counts and the per-user
    result dicts (failures carry an 'error' key).
    """
    async def _run_batch():
        outcomes = []
        for user_id in user_ids:
            try:
                outcomes.append(await process_single_user(user_id, operation))
            except Exception as exc:
                outcomes.append({'user_id': user_id, 'error': str(exc)})
        return outcomes

    results = asyncio.run(_run_batch())
    successful = [r for r in results if 'error' not in r]
    return {
        'total_processed': len(results),
        'successful': len(successful),
        'failed': len(results) - len(successful),
        'results': results
    }

async def process_single_user(user_id, operation):
    """Run one named operation against a single user, asynchronously.

    Unknown operations are a no-op; the status dict is returned either way.
    """
    user = await sync_to_async(User.objects.get)(id=user_id)

    # Dispatch table instead of an if/elif chain; unknown ops fall through.
    handlers = {
        'update_profile': update_user_profile_async,
        'send_notification': send_user_notification_async,
    }
    handler = handlers.get(operation)
    if handler is not None:
        await handler(user)

    return {'user_id': user_id, 'status': 'completed', 'operation': operation}

Integrating Tasks with Async Views

# views.py
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
from asgiref.sync import sync_to_async
from .tasks import send_email_task, process_document_async, batch_process_users

class AsyncTaskView(View):
    """Base class for async views that trigger tasks"""
    
    async def dispatch(self, request, *args, **kwargs):
        """Async dispatch method

        NOTE(review): awaiting super().dispatch() only works when the matched
        handler is itself async (Django 4.1+ sync dispatch then returns the
        handler's coroutine) -- confirm all subclass handlers are async def.
        """
        return await super().dispatch(request, *args, **kwargs)

@method_decorator(csrf_exempt, name='dispatch')
class DocumentProcessingView(AsyncTaskView):
    """Async endpoint that kicks off background processing for one document.

    POST body: {"document_id": ...}. Responds immediately with the Celery
    task id while processing continues in a worker.
    """

    async def post(self, request):
        try:
            payload = json.loads(request.body)
            doc_id = payload.get('document_id')
            if not doc_id:
                return JsonResponse({'error': 'document_id required'}, status=400)

            # .delay() talks to the broker synchronously, so run it off-loop.
            task = await sync_to_async(process_document_async.delay)(doc_id)
            return JsonResponse({
                'task_id': task.id,
                'status': 'processing',
                'message': 'Document processing started'
            })
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

class BatchUserProcessingView(AsyncTaskView):
    """Async endpoint that enqueues a batch operation over many users.

    POST body: {"user_ids": [...], "operation": "..."}. Returns the Celery
    task id and a summary of what was enqueued.
    """

    async def post(self, request):
        try:
            payload = json.loads(request.body)
            ids = payload.get('user_ids', [])
            op = payload.get('operation')
            if not ids or not op:
                return JsonResponse({
                    'error': 'user_ids and operation required'
                }, status=400)

            # Broker publish is synchronous; keep it off the event loop.
            task = await sync_to_async(batch_process_users.delay)(ids, op)
            return JsonResponse({
                'task_id': task.id,
                'status': 'processing',
                'user_count': len(ids),
                'operation': op
            })
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

class TaskStatusView(AsyncTaskView):
    """Async endpoint reporting the state of a previously-enqueued Celery task."""

    async def get(self, request, task_id):
        try:
            from celery.result import AsyncResult

            result = AsyncResult(task_id)
            is_done = result.ready()
            body = {
                'task_id': task_id,
                'status': result.status,
                'ready': is_done,
            }

            if is_done:
                if result.successful():
                    body['result'] = result.result
                else:
                    body['error'] = str(result.result)
            else:
                # While running, expose the 'current' counter from task meta if present.
                meta = result.result
                body['progress'] = getattr(meta, 'current', 0) if meta else 0

            return JsonResponse(body)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

WebSocket Integration with Tasks

# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from celery.result import AsyncResult
from .tasks import process_document_async

class TaskProgressConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that streams real-time progress for a Celery task.

    On connect, the client joins the channel-layer group `task_<task_id>`
    (so workers can broadcast via `task_update`) and this consumer also polls
    the Celery result backend once per second until the task finishes.

    NOTE(review): connect() awaits monitor_task(), so the connect handler does
    not return until the task completes -- confirm this is acceptable for the
    channel layer in use.
    """

    async def connect(self):
        """Join the task's group, accept the socket, and start polling."""
        self.task_id = self.scope['url_route']['kwargs']['task_id']
        self.task_group_name = f'task_{self.task_id}'

        # Join task group
        await self.channel_layer.group_add(
            self.task_group_name,
            self.channel_name
        )

        await self.accept()

        # Start monitoring task
        await self.monitor_task()

    async def disconnect(self, close_code):
        """Leave the task group when the socket closes."""
        await self.channel_layer.group_discard(
            self.task_group_name,
            self.channel_name
        )

    async def monitor_task(self):
        """Poll the result backend every second and push progress frames.

        Sends `task_progress` frames while running, then a single final
        `task_complete` or `task_error` frame.
        """
        # BUG FIX: asyncio.sleep() was used below but asyncio was never
        # imported anywhere in this module, raising NameError on the first
        # poll iteration. Imported locally so this block is self-contained.
        import asyncio

        result = AsyncResult(self.task_id)

        # NOTE(review): result.ready()/.status are synchronous backend calls
        # executed on the event loop; wrap with database_sync_to_async if the
        # result backend is slow -- confirm.
        while not result.ready():
            # Send progress update
            await self.send(text_data=json.dumps({
                'type': 'task_progress',
                'task_id': self.task_id,
                'status': result.status,
                'progress': getattr(result.result, 'current', 0) if result.result else 0
            }))

            # Wait before next check
            await asyncio.sleep(1)

        # Send final result
        if result.successful():
            await self.send(text_data=json.dumps({
                'type': 'task_complete',
                'task_id': self.task_id,
                'status': 'SUCCESS',
                'result': result.result
            }))
        else:
            await self.send(text_data=json.dumps({
                'type': 'task_error',
                'task_id': self.task_id,
                'status': 'FAILURE',
                'error': str(result.result)
            }))

    async def task_update(self, event):
        """Forward a group-broadcast task update to this client verbatim."""
        await self.send(text_data=json.dumps(event['message']))

# Enhanced task with progress reporting
@shared_task(bind=True)
def process_large_dataset(self, dataset_id):
    """Process every item in a dataset, broadcasting progress over Channels.

    After each item, a progress message is sent to the `task_<id>` group
    (consumed by TaskProgressConsumer.task_update) and the Celery task state
    is advanced to PROGRESS with current/total meta. On failure, an error
    message is broadcast and the exception is re-raised so Celery records it.
    """
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync

    # Wrap the async group_send once; reuse the sync callable in the loop.
    broadcast = async_to_sync(get_channel_layer().group_send)
    task_group = f'task_{self.request.id}'

    try:
        dataset = Dataset.objects.get(id=dataset_id)
        total_items = dataset.items.count()

        processed = 0
        for item in dataset.items.all():
            process_dataset_item(item)
            processed += 1

            percent = int((processed / total_items) * 100)
            broadcast(task_group, {
                'type': 'task_update',
                'message': {
                    'type': 'progress',
                    'task_id': self.request.id,
                    'progress': percent,
                    'processed': processed,
                    'total': total_items
                }
            })

            self.update_state(
                state='PROGRESS',
                meta={'current': processed, 'total': total_items}
            )

        return {
            'status': 'completed',
            'processed': processed,
            'total': total_items
        }

    except Exception as exc:
        # Let listeners know the task died, then re-raise for Celery.
        broadcast(task_group, {
            'type': 'task_update',
            'message': {
                'type': 'error',
                'task_id': self.request.id,
                'error': str(exc)
            }
        })
        raise

RQ (Redis Queue) Integration

RQ provides a simpler alternative to Celery for background task processing, with excellent async Django integration.

RQ Setup and Configuration

# rq_config.py
# Shared Redis connection and named RQ queues for the project.
import os
import redis
from rq import Queue, Worker
from django.conf import settings

# Redis connection shared by all queues.
# FIX: removed decode_responses=True -- RQ stores binary (pickled) job
# payloads, and a connection that decodes every response to str corrupts
# them; RQ requires a raw-bytes connection.
redis_conn = redis.Redis(
    host=os.environ.get('REDIS_HOST', 'localhost'),
    port=int(os.environ.get('REDIS_PORT', 6379)),
    db=int(os.environ.get('REDIS_DB', 0)),
)

# Define queues, one per workload class
default_queue = Queue('default', connection=redis_conn)
email_queue = Queue('email', connection=redis_conn)
media_queue = Queue('media', connection=redis_conn)
reports_queue = Queue('reports', connection=redis_conn)

# Queue mapping
QUEUE_MAP = {
    'default': default_queue,
    'email': email_queue,
    'media': media_queue,
    'reports': reports_queue,
}

def get_queue(name='default'):
    """Return the queue registered under *name*, falling back to 'default'."""
    return QUEUE_MAP.get(name, default_queue)

RQ Task Definitions

# rq_tasks.py
import asyncio
import aiohttp
from rq import get_current_job
from django.core.mail import send_mail
from asgiref.sync import sync_to_async
from .models import Document, User

def send_email_rq(subject, message, recipient_list):
    """RQ job: deliver one email and report the outcome as a dict.

    Failures are captured and returned (status 'failed') rather than raised,
    so the RQ job itself always finishes.
    """
    job = get_current_job()

    try:
        send_mail(
            subject=subject,
            message=message,
            from_email='noreply@example.com',
            recipient_list=recipient_list,
            fail_silently=False
        )
    except Exception as e:
        return {
            'job_id': job.id,
            'status': 'failed',
            'error': str(e)
        }

    return {
        'job_id': job.id,
        'status': 'completed',
        'recipients': len(recipient_list)
    }

def process_document_rq(document_id):
    """RQ job: run the async document pipeline and wrap the outcome in a status dict."""
    job = get_current_job()

    try:
        outcome = asyncio.run(process_document_async_rq(document_id))
    except Exception as e:
        return {
            'job_id': job.id,
            'document_id': document_id,
            'status': 'failed',
            'error': str(e)
        }

    return {
        'job_id': job.id,
        'document_id': document_id,
        'status': 'completed',
        'result': outcome
    }

async def process_document_async_rq(document_id):
    """Fetch a document, run it through the external processor API, persist and return the result."""
    document = await sync_to_async(Document.objects.get)(id=document_id)

    # Send content to the external processor and parse its JSON response.
    payload = {'document_id': document_id, 'content': document.content}
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'https://api.processor.com/process',
            json=payload
        ) as response:
            result = await response.json()

    # Store the processed payload back on the document.
    document.processed_data = result
    await sync_to_async(document.save)()

    return result

def batch_user_update_rq(user_ids, update_data):
    """RQ job: apply *update_data* to each user and summarize the outcomes."""
    job = get_current_job()

    try:
        outcomes = asyncio.run(process_users_batch(user_ids, update_data))
    except Exception as e:
        return {
            'job_id': job.id,
            'status': 'failed',
            'error': str(e)
        }

    succeeded = [r for r in outcomes if r.get('status') == 'success']
    return {
        'job_id': job.id,
        'processed': len(outcomes),
        'successful': len(succeeded),
        'results': outcomes
    }

async def process_users_batch(user_ids, update_data):
    """Apply *update_data* field-by-field to each user, collecting per-user outcomes.

    Each outcome dict carries 'status' ('success' or 'error'); errors also
    include the exception text. Failures never abort the rest of the batch.
    """
    outcomes = []

    for uid in user_ids:
        try:
            user = await sync_to_async(User.objects.get)(id=uid)

            for field, value in update_data.items():
                setattr(user, field, value)

            await sync_to_async(user.save)()
        except Exception as e:
            outcomes.append({
                'user_id': uid,
                'status': 'error',
                'error': str(e)
            })
        else:
            outcomes.append({
                'user_id': uid,
                'status': 'success'
            })

    return outcomes

RQ Integration with Async Views

# rq_views.py
import json
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from .rq_config import get_queue
from .rq_tasks import send_email_rq, process_document_rq, batch_user_update_rq

@method_decorator(csrf_exempt, name='dispatch')
class RQTaskView(View):
    """Base view for RQ task management

    CSRF checks are disabled for every subclass via the decorator above.
    """
    
    async def dispatch(self, request, *args, **kwargs):
        # NOTE(review): awaiting super().dispatch() requires the matched
        # handler to be async (Django 4.1+) -- confirm subclasses comply.
        return await super().dispatch(request, *args, **kwargs)

class EnqueueEmailView(RQTaskView):
    """Async endpoint that enqueues an email-sending RQ job.

    POST body: {"subject": ..., "message": ..., "recipients": [...]}.
    Responds with the RQ job id once the job is enqueued.
    """

    async def post(self, request):
        # FIX: queue.enqueue() performs synchronous Redis I/O; the original
        # called it directly from an async view, blocking the event loop.
        # Wrapping it in sync_to_async matches the Celery views above, which
        # wrap .delay() the same way.
        from asgiref.sync import sync_to_async

        try:
            data = json.loads(request.body)

            subject = data.get('subject')
            message = data.get('message')
            recipients = data.get('recipients', [])

            if not all([subject, message, recipients]):
                return JsonResponse({
                    'error': 'subject, message, and recipients required'
                }, status=400)

            # Enqueue email task off the event loop.
            queue = get_queue('email')
            job = await sync_to_async(queue.enqueue)(
                send_email_rq,
                subject,
                message,
                recipients,
                timeout='5m'
            )

            return JsonResponse({
                'job_id': job.id,
                'status': 'enqueued',
                'queue': 'email',
                'recipients': len(recipients)
            })

        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

class EnqueueDocumentProcessingView(RQTaskView):
    """Async endpoint that enqueues a document-processing RQ job.

    POST body: {"document_id": ...}. Responds with the RQ job id.
    """

    async def post(self, request):
        # FIX: queue.enqueue() performs synchronous Redis I/O; run it in a
        # worker thread so the async event loop is not blocked (consistent
        # with the Celery views, which wrap .delay() in sync_to_async).
        from asgiref.sync import sync_to_async

        try:
            data = json.loads(request.body)
            document_id = data.get('document_id')

            if not document_id:
                return JsonResponse({'error': 'document_id required'}, status=400)

            # Enqueue processing task off the event loop.
            queue = get_queue('media')
            job = await sync_to_async(queue.enqueue)(
                process_document_rq,
                document_id,
                timeout='30m'
            )

            return JsonResponse({
                'job_id': job.id,
                'status': 'enqueued',
                'queue': 'media',
                'document_id': document_id
            })

        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)

class RQJobStatusView(RQTaskView):
    """Async endpoint reporting the status and result of an RQ job."""

    async def get(self, request, job_id):
        try:
            from rq import Job
            from asgiref.sync import sync_to_async
            from .rq_config import redis_conn

            # FIX: Job.fetch() performs blocking Redis I/O; run it in a
            # worker thread so the event loop stays responsive.
            job = await sync_to_async(Job.fetch)(job_id, connection=redis_conn)

            response_data = {
                'job_id': job_id,
                'status': job.get_status(),
                'created_at': job.created_at.isoformat() if job.created_at else None,
                'started_at': job.started_at.isoformat() if job.started_at else None,
                'ended_at': job.ended_at.isoformat() if job.ended_at else None,
            }

            # NOTE(review): get_status()/result may also round-trip to Redis;
            # acceptable here, but wrap similarly if latency matters -- confirm.
            if job.is_finished:
                response_data['result'] = job.result
            elif job.is_failed:
                response_data['error'] = str(job.exc_info)

            return JsonResponse(response_data)

        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)