Background task processing is essential for building responsive async Django applications. While async views handle concurrent requests efficiently, long-running operations like file processing, email sending, data analysis, or external API calls should be offloaded to background workers. This chapter covers integrating Celery and RQ with async Django applications, implementing task queues, scheduling, monitoring, and async task patterns.
Background tasks decouple time-consuming operations from HTTP request-response cycles, enabling:
- Improved user experience: immediate responses while processing continues in the background.
- System reliability: failed operations can be retried and errors handled gracefully.
- Resource management: workload is distributed across multiple worker processes.
- Scalability: task processing scales independently from the web servers.
- Async integration: background tasks integrate seamlessly with async Django views and WebSocket consumers.
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Async View │───▶│ Task Queue │───▶│ Worker Process │
│ │ │ (Redis/RMQ) │ │ (Celery/RQ) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Task Status │ │ Task Results │ │ Task Logs │
│ Tracking │ │ Storage │ │ Monitoring │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Celery is the most popular distributed task queue for Python applications, offering robust features for async Django integration.
# celery_app.py
#
# Celery application entry point for the project.  Workers and beat are
# started against this module (e.g. `celery -A myproject worker`).
import os
from celery import Celery
from django.conf import settings  # NOTE(review): appears unused in this module -- confirm before removing

# Set Django settings module before Celery reads any Django configuration.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Create Celery app
app = Celery('myproject')

# Configure Celery using Django settings; with this namespace only settings
# prefixed CELERY_ (e.g. CELERY_BROKER_URL) are consumed.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover tasks.py modules from all registered Django apps
app.autodiscover_tasks()

# Async-compatible configuration, applied on top of the Django settings.
app.conf.update(
    task_serializer='json',           # JSON keeps payloads portable between web and workers
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,          # expose STARTED state for progress/status views
    task_time_limit=30 * 60,          # hard kill after 30 minutes
    task_soft_time_limit=25 * 60,     # SoftTimeLimitExceeded raised at 25 minutes
    worker_prefetch_multiplier=1,     # fair dispatch for long-running tasks
    worker_max_tasks_per_child=1000,  # recycle workers to bound memory growth
)
@app.task(bind=True)
def debug_task(self):
    """Print the bound task's request context; useful for smoke-testing a worker."""
    request_repr = repr(self.request)
    print('Request: ' + request_repr)
# settings.py
import os

# Fix: crontab is used in CELERY_BEAT_SCHEDULE below but was never imported,
# so loading settings raised NameError.
from celery.schedules import crontab

# Celery Configuration
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

# Async-compatible settings
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True

# Task routing for different queues
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'email'},
    'myapp.tasks.process_image': {'queue': 'media'},
    'myapp.tasks.generate_report': {'queue': 'reports'},
    'myapp.tasks.sync_data': {'queue': 'sync'},
}

# Worker configuration
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_TASK_TIME_LIMIT = 30 * 60       # hard limit: 30 minutes
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60  # soft limit: 25 minutes

# Beat scheduler for periodic tasks
CELERY_BEAT_SCHEDULE = {
    'cleanup-expired-sessions': {
        'task': 'myapp.tasks.cleanup_expired_sessions',
        'schedule': 3600.0,  # every hour
    },
    'generate-daily-reports': {
        'task': 'myapp.tasks.generate_daily_reports',
        'schedule': crontab(hour=2, minute=0),  # daily at 2 AM (UTC per CELERY_TIMEZONE)
    },
}
# tasks.py
import asyncio
import aiohttp
import aiofiles
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth.models import User
from asgiref.sync import sync_to_async
from .models import Document, ProcessingJob
@shared_task(bind=True, max_retries=3)
def send_email_task(self, subject, message, recipient_list):
    """Deliver an email to *recipient_list*, retrying on failure.

    Retries use exponential backoff (60s, 120s, 240s) for up to 3 attempts.
    """
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email='noreply@example.com',
            recipient_list=recipient_list,
            fail_silently=False,
        )
        return f"Email sent to {len(recipient_list)} recipients"
    except Exception as exc:
        # Double the delay on every retry attempt.
        delay_seconds = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=delay_seconds)
@shared_task(bind=True)
def process_document_async(self, document_id):
    """Celery entry point that runs the async document pipeline in one event loop."""
    try:
        outcome = asyncio.run(process_document_content(document_id))
    except Exception as exc:
        # Report the failure in-band rather than failing the task.
        outcome = {'error': str(exc), 'document_id': document_id}
    return outcome
async def process_document_content(document_id):
    """Async document processing logic.

    Loads the Document row, reads its backing file, runs the external
    analysis, re-saves the model, and returns a summary dict.

    NOTE(review): `analysis_results` is returned but never assigned to any
    field on `document` before save(), so the save() looks like a no-op
    update -- confirm whether a field assignment is missing here.
    """
    # Get document using async ORM wrapper (Document.objects.get is sync).
    document = await sync_to_async(Document.objects.get)(id=document_id)
    # Process file asynchronously; assumes the file is text -- TODO confirm encoding.
    async with aiofiles.open(document.file.path, 'r') as f:
        content = await f.read()
    # Perform async analysis via the external API helper.
    analysis_results = await analyze_content_async(content)
    # Update document with results (save() wrapped because the ORM is sync).
    await sync_to_async(document.save)()
    return {
        'document_id': document_id,
        'status': 'completed',
        'analysis': analysis_results
    }
async def analyze_content_async(content):
    """Async content analysis using an external API.

    Fix: the bearer token was hard-coded in source; credentials in code leak
    via version control.  The key is now read from the TEXT_ANALYSIS_API_KEY
    environment variable, keeping the old placeholder as the default so
    behaviour is unchanged when the variable is unset.
    """
    import os  # local import: this module does not import os at top level

    api_key = os.environ.get('TEXT_ANALYSIS_API_KEY', 'YOUR_API_KEY')
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'https://api.textanalysis.com/analyze',
            json={'text': content},
            headers={'Authorization': f'Bearer {api_key}'}
        ) as response:
            return await response.json()
@shared_task(bind=True)
def batch_process_users(self, user_ids, operation):
    """Process multiple users in batch.

    Fix: the original called asyncio.run() once per user, creating and
    tearing down a fresh event loop for every item.  The whole batch now
    runs inside a single loop.  Per-user failures are still captured as
    individual result entries, preserving the original error isolation.

    Returns a summary dict with per-user results and success/failure counts.
    """
    async def _run_batch():
        # Sequential processing, same ordering as the original loop.
        outcomes = []
        for user_id in user_ids:
            try:
                outcomes.append(await process_single_user(user_id, operation))
            except Exception as exc:
                outcomes.append({'user_id': user_id, 'error': str(exc)})
        return outcomes

    results = asyncio.run(_run_batch())
    return {
        'total_processed': len(results),
        'successful': len([r for r in results if 'error' not in r]),
        'failed': len([r for r in results if 'error' in r]),
        'results': results
    }
async def process_single_user(user_id, operation):
    """Run one named operation for a single user and report completion."""
    target_user = await sync_to_async(User.objects.get)(id=user_id)
    if operation == 'update_profile':
        # Async profile update logic
        await update_user_profile_async(target_user)
    elif operation == 'send_notification':
        # Async notification sending
        await send_user_notification_async(target_user)
    # Unknown operations fall through and are still reported as completed,
    # matching the original behaviour.
    return {'user_id': user_id, 'status': 'completed', 'operation': operation}
# views.py
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
from asgiref.sync import sync_to_async
from .tasks import send_email_task, process_document_async, batch_process_users
class AsyncTaskView(View):
    """Base class for async views that trigger tasks"""

    async def dispatch(self, request, *args, **kwargs):
        """Await the base dispatch so async handler coroutines are resolved.

        NOTE(review): Django >= 4.1 class-based views dispatch async handlers
        natively -- confirm this override is still required for the targeted
        Django version.
        """
        return await super().dispatch(request, *args, **kwargs)
@method_decorator(csrf_exempt, name='dispatch')
class DocumentProcessingView(AsyncTaskView):
    """Async view for document processing"""

    async def post(self, request):
        """Validate the payload, start the background task, and return its id."""
        try:
            payload = json.loads(request.body)
            document_id = payload.get('document_id')
            if not document_id:
                return JsonResponse({'error': 'document_id required'}, status=400)
            # .delay() talks to the broker, so keep it off the event loop.
            task = await sync_to_async(process_document_async.delay)(document_id)
            response_body = {
                'task_id': task.id,
                'status': 'processing',
                'message': 'Document processing started'
            }
            return JsonResponse(response_body)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
class BatchUserProcessingView(AsyncTaskView):
    """Async view for batch user processing"""

    async def post(self, request):
        """Validate the payload and enqueue a batch-processing task."""
        try:
            payload = json.loads(request.body)
            user_ids = payload.get('user_ids', [])
            operation = payload.get('operation')
            if not (user_ids and operation):
                return JsonResponse({
                    'error': 'user_ids and operation required'
                }, status=400)
            # .delay() does broker I/O, so it runs off the event loop.
            task = await sync_to_async(batch_process_users.delay)(user_ids, operation)
            response_body = {
                'task_id': task.id,
                'status': 'processing',
                'user_count': len(user_ids),
                'operation': operation
            }
            return JsonResponse(response_body)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
class TaskStatusView(AsyncTaskView):
    """Check task status asynchronously"""

    async def get(self, request, task_id):
        """Return status/result for a Celery task id.

        Fix: AsyncResult.status/.ready()/.result all hit the result backend
        with blocking I/O; the original called them directly inside an async
        view, stalling the event loop.  They are now gathered in a single
        synchronous snapshot wrapped with sync_to_async (imported at this
        module's top).
        """
        try:
            from celery.result import AsyncResult

            result = AsyncResult(task_id)

            def _snapshot():
                # One blocking round-trip collecting everything we need.
                return result.status, result.ready(), result.result

            status, ready, value = await sync_to_async(_snapshot)()

            response_data = {
                'task_id': task_id,
                'status': status,
                'ready': ready
            }
            if ready:
                # result.successful() is equivalent to state == 'SUCCESS'.
                if status == 'SUCCESS':
                    response_data['result'] = value
                else:
                    response_data['error'] = str(value)
            else:
                # For PROGRESS-style states, meta may carry a 'current' counter.
                response_data['progress'] = getattr(value, 'current', 0) if value else 0
            return JsonResponse(response_data)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from celery.result import AsyncResult
from .tasks import process_document_async
class TaskProgressConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for real-time task progress"""

    async def connect(self):
        """Join the per-task group, accept the socket, and start polling."""
        self.task_id = self.scope['url_route']['kwargs']['task_id']
        self.task_group_name = f'task_{self.task_id}'
        # Join task group so workers can push updates via the channel layer.
        await self.channel_layer.group_add(
            self.task_group_name,
            self.channel_name
        )
        await self.accept()
        # Start monitoring task
        await self.monitor_task()

    async def disconnect(self, close_code):
        # Leave task group
        await self.channel_layer.group_discard(
            self.task_group_name,
            self.channel_name
        )

    async def monitor_task(self):
        """Monitor task progress and send updates.

        Fix: this module never imported asyncio, so the original
        `await asyncio.sleep(1)` raised NameError at runtime; the import is
        now done locally here.

        NOTE(review): AsyncResult.ready()/.status perform blocking backend
        I/O from async code -- consider wrapping them (e.g. with
        database_sync_to_async) if this polls under load.
        """
        import asyncio  # fix: asyncio was missing from this module's imports

        result = AsyncResult(self.task_id)
        while not result.ready():
            # Send progress update
            await self.send(text_data=json.dumps({
                'type': 'task_progress',
                'task_id': self.task_id,
                'status': result.status,
                'progress': getattr(result.result, 'current', 0) if result.result else 0
            }))
            # Wait before next check
            await asyncio.sleep(1)
        # Send final result
        if result.successful():
            await self.send(text_data=json.dumps({
                'type': 'task_complete',
                'task_id': self.task_id,
                'status': 'SUCCESS',
                'result': result.result
            }))
        else:
            await self.send(text_data=json.dumps({
                'type': 'task_error',
                'task_id': self.task_id,
                'status': 'FAILURE',
                'error': str(result.result)
            }))

    async def task_update(self, event):
        """Forward a task update pushed to this consumer's group."""
        await self.send(text_data=json.dumps(event['message']))
# Enhanced task with progress reporting
@shared_task(bind=True)
def process_large_dataset(self, dataset_id):
    """Process large dataset with progress updates.

    Iterates the dataset's items, pushing a progress message to the task's
    channel-layer group after every item and mirroring the counter into the
    Celery task state (state='PROGRESS') so HTTP pollers see it too.

    NOTE(review): `Dataset` and `process_dataset_item` are not imported or
    defined in the code shown here -- confirm they exist in this module's
    scope.  A group_send per item may also be chatty for very large
    datasets; consider throttling.
    """
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync
    channel_layer = get_channel_layer()
    # Group name matches what TaskProgressConsumer subscribes to.
    task_group = f'task_{self.request.id}'
    try:
        # Get dataset
        dataset = Dataset.objects.get(id=dataset_id)
        total_items = dataset.items.count()
        processed = 0
        for item in dataset.items.all():
            # Process individual item
            process_dataset_item(item)
            processed += 1
            # Send progress update to any WebSocket consumers in the group.
            progress = int((processed / total_items) * 100)
            async_to_sync(channel_layer.group_send)(task_group, {
                'type': 'task_update',
                'message': {
                    'type': 'progress',
                    'task_id': self.request.id,
                    'progress': progress,
                    'processed': processed,
                    'total': total_items
                }
            })
            # Update task state so AsyncResult.result exposes current/total.
            self.update_state(
                state='PROGRESS',
                meta={'current': processed, 'total': total_items}
            )
        return {
            'status': 'completed',
            'processed': processed,
            'total': total_items
        }
    except Exception as exc:
        # Send error update before re-raising so Celery records the failure.
        async_to_sync(channel_layer.group_send)(task_group, {
            'type': 'task_update',
            'message': {
                'type': 'error',
                'task_id': self.request.id,
                'error': str(exc)
            }
        })
        raise
RQ provides a simpler alternative to Celery for background task processing, with excellent async Django integration.
# rq_config.py
#
# Shared RQ configuration: one Redis connection plus named queues used by
# the views and workers.
import os
import redis
from rq import Queue, Worker  # NOTE(review): Worker appears unused here -- confirm before removing
from django.conf import settings  # NOTE(review): settings appears unused here -- confirm

# Redis connection; parameters come from the environment with local
# defaults for development.
redis_conn = redis.Redis(
    host=os.environ.get('REDIS_HOST', 'localhost'),
    port=int(os.environ.get('REDIS_PORT', 6379)),
    db=int(os.environ.get('REDIS_DB', 0)),
    decode_responses=True  # NOTE(review): RQ stores binary job payloads; decode_responses=True can break deserialization -- verify against the RQ version in use
)

# Define queues, one per workload class.
default_queue = Queue('default', connection=redis_conn)
email_queue = Queue('email', connection=redis_conn)
media_queue = Queue('media', connection=redis_conn)
reports_queue = Queue('reports', connection=redis_conn)

# Queue mapping used by get_queue() below.
QUEUE_MAP = {
    'default': default_queue,
    'email': email_queue,
    'media': media_queue,
    'reports': reports_queue,
}
def get_queue(name='default'):
    """Resolve a queue by name, falling back to the default queue."""
    try:
        return QUEUE_MAP[name]
    except KeyError:
        return default_queue
# rq_tasks.py
import asyncio
import aiohttp
from rq import get_current_job
from django.core.mail import send_mail
from asgiref.sync import sync_to_async
from .models import Document, User
def send_email_rq(subject, message, recipient_list):
    """RQ task for sending emails; reports success or failure in the payload.

    Note: failures are returned rather than re-raised, so the RQ job itself
    finishes "successfully" with a failure record in its result.
    """
    job = get_current_job()
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email='noreply@example.com',
            recipient_list=recipient_list,
            fail_silently=False,
        )
        outcome = {
            'job_id': job.id,
            'status': 'completed',
            'recipients': len(recipient_list),
        }
    except Exception as e:
        outcome = {
            'job_id': job.id,
            'status': 'failed',
            'error': str(e),
        }
    return outcome
def process_document_rq(document_id):
    """RQ task for document processing; drives the async pipeline in one loop."""
    job = get_current_job()
    try:
        processed = asyncio.run(process_document_async_rq(document_id))
        outcome = {
            'job_id': job.id,
            'document_id': document_id,
            'status': 'completed',
            'result': processed,
        }
    except Exception as e:
        outcome = {
            'job_id': job.id,
            'document_id': document_id,
            'status': 'failed',
            'error': str(e),
        }
    return outcome
async def process_document_async_rq(document_id):
    """Async document processing for RQ: call the external processor and persist the result."""
    # Fetch the row through the sync ORM wrapped for async use.
    document = await sync_to_async(Document.objects.get)(id=document_id)
    # Hand the content to the external processing API.
    async with aiohttp.ClientSession() as http:
        async with http.post(
            'https://api.processor.com/process',
            json={'document_id': document_id, 'content': document.content}
        ) as response:
            result = await response.json()
            # Persist the processor's output back onto the document.
            document.processed_data = result
            await sync_to_async(document.save)()
            return result
def batch_user_update_rq(user_ids, update_data):
    """RQ task that applies *update_data* to each user and summarizes outcomes."""
    job = get_current_job()
    try:
        results = asyncio.run(process_users_batch(user_ids, update_data))
    except Exception as e:
        return {
            'job_id': job.id,
            'status': 'failed',
            'error': str(e)
        }
    succeeded = [r for r in results if r.get('status') == 'success']
    return {
        'job_id': job.id,
        'processed': len(results),
        'successful': len(succeeded),
        'results': results
    }
async def process_users_batch(user_ids, update_data):
    """Apply *update_data* field assignments to each user, isolating failures per user."""
    outcomes = []
    for uid in user_ids:
        try:
            account = await sync_to_async(User.objects.get)(id=uid)
            # Copy every requested field onto the model before saving.
            for field_name, field_value in update_data.items():
                setattr(account, field_name, field_value)
            await sync_to_async(account.save)()
        except Exception as e:
            outcomes.append({
                'user_id': uid,
                'status': 'error',
                'error': str(e)
            })
        else:
            outcomes.append({
                'user_id': uid,
                'status': 'success'
            })
    return outcomes
# rq_views.py
import json
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from .rq_config import get_queue
from .rq_tasks import send_email_rq, process_document_rq, batch_user_update_rq
@method_decorator(csrf_exempt, name='dispatch')
class RQTaskView(View):
    """Base view for RQ task management"""

    async def dispatch(self, request, *args, **kwargs):
        """Await the base dispatch so async handler coroutines are resolved.

        NOTE(review): Django >= 4.1 dispatches async handlers from class-based
        views natively -- confirm this override is needed for the targeted
        Django version.
        """
        return await super().dispatch(request, *args, **kwargs)
class EnqueueEmailView(RQTaskView):
    """Enqueue email sending task"""

    async def post(self, request):
        """Validate the payload and enqueue the email job on the 'email' queue.

        Fixes:
        - queue.enqueue() performs blocking Redis I/O; it is wrapped with
          sync_to_async so it does not stall this async view's event loop.
        - RQ >= 1.0 renamed the enqueue timeout keyword to `job_timeout`;
          passing `timeout=` is forwarded to the task function on modern RQ
          and raises TypeError.
        """
        from asgiref.sync import sync_to_async  # not imported at this module's top

        try:
            data = json.loads(request.body)
            subject = data.get('subject')
            message = data.get('message')
            recipients = data.get('recipients', [])
            if not all([subject, message, recipients]):
                return JsonResponse({
                    'error': 'subject, message, and recipients required'
                }, status=400)
            # Enqueue email task off the event loop.
            queue = get_queue('email')
            job = await sync_to_async(queue.enqueue)(
                send_email_rq,
                subject,
                message,
                recipients,
                job_timeout='5m'
            )
            return JsonResponse({
                'job_id': job.id,
                'status': 'enqueued',
                'queue': 'email',
                'recipients': len(recipients)
            })
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
class EnqueueDocumentProcessingView(RQTaskView):
    """Enqueue document processing task"""

    async def post(self, request):
        """Validate the payload and enqueue document processing on the 'media' queue.

        Fixes:
        - queue.enqueue() performs blocking Redis I/O; it is wrapped with
          sync_to_async so it does not stall this async view's event loop.
        - RQ >= 1.0 renamed the enqueue timeout keyword to `job_timeout`;
          passing `timeout=` is forwarded to the task function on modern RQ
          and raises TypeError.
        """
        from asgiref.sync import sync_to_async  # not imported at this module's top

        try:
            data = json.loads(request.body)
            document_id = data.get('document_id')
            if not document_id:
                return JsonResponse({'error': 'document_id required'}, status=400)
            # Enqueue processing task off the event loop.
            queue = get_queue('media')
            job = await sync_to_async(queue.enqueue)(
                process_document_rq,
                document_id,
                job_timeout='30m'
            )
            return JsonResponse({
                'job_id': job.id,
                'status': 'enqueued',
                'queue': 'media',
                'document_id': document_id
            })
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
class RQJobStatusView(RQTaskView):
    """Check RQ job status"""

    async def get(self, request, job_id):
        """Return status, timestamps, and result/error for an RQ job id.

        Fix: Job.fetch and the status accessors perform blocking Redis I/O;
        the whole lookup now runs through sync_to_async instead of directly
        on the event loop.
        """
        from asgiref.sync import sync_to_async  # not imported at this module's top

        try:
            from rq import Job
            from .rq_config import redis_conn

            def _lookup():
                # Single blocking round-trip gathering everything we report.
                job = Job.fetch(job_id, connection=redis_conn)
                data = {
                    'job_id': job_id,
                    'status': job.get_status(),
                    'created_at': job.created_at.isoformat() if job.created_at else None,
                    'started_at': job.started_at.isoformat() if job.started_at else None,
                    'ended_at': job.ended_at.isoformat() if job.ended_at else None,
                }
                if job.is_finished:
                    data['result'] = job.result
                elif job.is_failed:
                    data['error'] = str(job.exc_info)
                return data

            response_data = await sync_to_async(_lookup)()
            return JsonResponse(response_data)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
WebSockets with Channels
Django Channels extends Django to handle WebSockets, HTTP/2, and other protocols beyond traditional HTTP. This enables building real-time applications like chat systems, live notifications, collaborative tools, and streaming dashboards. This chapter covers Channels architecture, WebSocket consumers, real-time communication patterns, and production deployment strategies.
Asynchronous Support
Django's asynchronous support enables building high-performance applications that can handle concurrent operations efficiently. This chapter covers async views, async safety considerations, and adapter functions that bridge synchronous and asynchronous code.