Django's Tasks framework provides a built-in solution for handling background tasks without requiring external dependencies like Celery or RQ. This framework enables you to offload time-consuming operations from request-response cycles, improving application performance and user experience.
Background tasks are operations that run outside the normal request-response cycle. They're essential for offloading slow or unreliable work — sending emails, processing images and files, generating reports, and calling external APIs — so that web requests can return to the user quickly.
# Traditional approach with external task queue (Celery)
from celery import shared_task


@shared_task
def send_email_celery(user_id, subject, message):
    """Send email using Celery."""
    recipient = User.objects.get(id=user_id)
    send_mail(subject, message, 'from@example.com', [recipient.email])
# Django Tasks approach (built-in)
from django.tasks import task


@task
def send_email_django_task(user_id, subject, message):
    """Send email using Django Tasks."""
    recipient = User.objects.get(id=user_id)
    send_mail(subject, message, 'from@example.com', [recipient.email])
# Task lifecycle demonstration
from django.tasks import task, get_task_backend
from django.http import JsonResponse
import time


@task
def long_running_task(duration, task_name):
    """Demonstrate the task lifecycle by sleeping one second per step."""
    print(f"Task {task_name} started")
    # One progress line per elapsed second.
    for elapsed in range(duration):
        time.sleep(1)
        print(f"Task {task_name} progress: {elapsed+1}/{duration}")
    summary = f"Task {task_name} completed after {duration} seconds"
    print(summary)
    return summary
def start_task_view(request):
    """Queue a demo background task and report its id to the caller."""
    # .delay() returns immediately with a handle to the queued task.
    queued = long_running_task.delay(5, "example-task")
    return JsonResponse({
        'task_id': queued.id,
        'status': 'started',
        'message': 'Task has been queued'
    })
def check_task_status_view(request, task_id):
    """Report the current status (and result, once ready) of a task."""
    backend = get_task_backend()
    handle = backend.get_result(task_id)
    is_done = handle.ready()
    return JsonResponse({
        'task_id': task_id,
        'status': handle.status,
        'result': handle.result if is_done else None,
        # Not every backend exposes progress; fall back to None.
        'progress': handle.progress if hasattr(handle, 'progress') else None
    })
# settings.py - Task backend configuration
# Basic configuration: queue tasks in the default database.
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.database.DatabaseBackend',
        'OPTIONS': {
            'database_alias': 'default',   # which DB connection to use
            'table_name': 'django_tasks',  # table that stores queued tasks
        },
    },
}
# Redis backend configuration
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisBackend',
        'OPTIONS': {
            # Connection parameters for the Redis server.
            'connection': {
                'host': 'localhost',
                'port': 6379,
                'db': 0,
            },
            # Namespace for all task keys in Redis.
            'key_prefix': 'django_tasks:',
        },
    },
}
# Memory backend (development only — tasks are lost on process restart)
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.memory.MemoryBackend',
        'OPTIONS': {
            'max_tasks': 1000,  # cap on in-memory queue size
        },
    },
}
# Multiple backends: default DB queue, a Redis queue, and a separate
# DB-backed queue for priority work.
TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.database.DatabaseBackend',
    },
    'redis': {
        'BACKEND': 'django.tasks.backends.redis.RedisBackend',
        'OPTIONS': {
            'connection': {
                'host': 'redis.example.com',
                'port': 6379,
                'db': 1,
            },
        },
    },
    'priority': {
        'BACKEND': 'django.tasks.backends.database.DatabaseBackend',
        'OPTIONS': {
            'table_name': 'priority_tasks',  # isolated from the default table
        },
    },
}
# Task routing: map each task's dotted path to the backend that runs it.
TASK_ROUTES = {
    'myapp.tasks.send_email': {'backend': 'default'},
    'myapp.tasks.process_image': {'backend': 'redis'},
    'myapp.tasks.urgent_notification': {'backend': 'priority'},
}
# Worker configuration: per-backend concurrency, recycling and timeouts.
TASK_WORKERS = {
    'default': {
        'concurrency': 4,             # simultaneous tasks per worker
        'max_tasks_per_worker': 100,  # recycle worker after this many tasks
        'timeout': 300,  # 5 minutes
    },
    'redis': {
        'concurrency': 8,
        'max_tasks_per_worker': 50,
        'timeout': 600,  # 10 minutes
    },
}
# Task serialization format used for arguments/results.
TASK_SERIALIZER = 'json'  # json, pickle, msgpack

# How long finished results are kept before expiring.
TASK_RESULT_EXPIRES = 3600  # 1 hour

# Default retry behaviour.
TASK_DEFAULT_RETRY_DELAY = 60  # 1 minute between retries
TASK_MAX_RETRIES = 3

# Monitoring and logging toggles.
TASK_SEND_EVENTS = True     # emit task lifecycle events
TASK_TRACK_STARTED = True   # record when a task starts, not just finishes
# Route django.tasks log records to a dedicated file.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'task_file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'tasks.log',
        },
    },
    'loggers': {
        'django.tasks': {
            'handlers': ['task_file'],
            'level': 'INFO',
            'propagate': True,
        },
    },
}
# models.py - Custom task model for database backend
from django.db import models
from django.tasks.backends.database.models import TaskResult


class CustomTaskResult(TaskResult):
    """Task result row extended with priority, category and author."""

    # Queue-ordering weight; indexed together with status below.
    priority = models.IntegerField(default=0)
    # Free-form grouping label for reporting.
    category = models.CharField(max_length=50, blank=True)
    created_by = models.ForeignKey(
        'auth.User',
        on_delete=models.SET_NULL,  # keep the result row if the user is deleted
        null=True,
        blank=True,
    )

    class Meta:
        db_table = 'custom_task_results'
        indexes = [
            models.Index(fields=['priority', 'status']),
            models.Index(fields=['category', 'created_at']),
        ]
# Custom backend using extended model
# NOTE(review): the original referenced DatabaseBackend without importing
# it (NameError at class-definition time). The path below matches the
# BACKEND string used in settings ('django.tasks.backends.database
# .DatabaseBackend') — confirm against the actual module layout.
from django.tasks.backends.database import DatabaseBackend


class CustomDatabaseBackend(DatabaseBackend):
    """Database backend that persists results via CustomTaskResult."""

    def __init__(self, **options):
        super().__init__(**options)
        # Swap in the extended model so extra metadata columns are used.
        self.model = CustomTaskResult

    def store_result(self, task_id, result, status, **kwargs):
        """Store task result with additional metadata.

        Recognised kwargs: priority (int), category (str), user_id
        (FK id recorded as created_by).
        """
        return super().store_result(
            task_id,
            result,
            status,
            priority=kwargs.get('priority', 0),
            category=kwargs.get('category', ''),
            created_by_id=kwargs.get('user_id'),
        )
# settings.py - Use custom backend
TASKS = {
    'default': {
        'BACKEND': 'myapp.backends.CustomDatabaseBackend',
        'OPTIONS': {
            'database_alias': 'default',
        },
    },
}
# Redis backend with advanced configuration
import redis
from django.conf import settings

# Shared connection pool so workers reuse sockets instead of reconnecting.
REDIS_POOL = redis.ConnectionPool(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB,
    max_connections=20,
    retry_on_timeout=True,
    socket_keepalive=True,
    socket_keepalive_options={},
)

TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisBackend',
        'OPTIONS': {
            'connection_pool': REDIS_POOL,
            'key_prefix': 'tasks:',
            'result_expires': 3600,
            'compression': 'gzip',  # None, gzip, lz4
        },
    },
}
# Redis Sentinel configuration for high availability
REDIS_SENTINELS = [
    ('sentinel1.example.com', 26379),
    ('sentinel2.example.com', 26379),
    ('sentinel3.example.com', 26379),
]

TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisSentinelBackend',
        'OPTIONS': {
            'sentinels': REDIS_SENTINELS,
            'service_name': 'mymaster',  # master group monitored by Sentinel
            'key_prefix': 'tasks:',
        },
    },
}
# Redis Cluster configuration
REDIS_CLUSTER_NODES = [
    {'host': 'redis-node1.example.com', 'port': 7000},
    {'host': 'redis-node2.example.com', 'port': 7000},
    {'host': 'redis-node3.example.com', 'port': 7000},
]

TASKS = {
    'default': {
        'BACKEND': 'django.tasks.backends.redis.RedisClusterBackend',
        'OPTIONS': {
            # Seed nodes used to discover the rest of the cluster.
            'startup_nodes': REDIS_CLUSTER_NODES,
            'key_prefix': 'tasks:',
        },
    },
}
# tasks.py - Basic task definitions
from django.tasks import task
from django.core.mail import send_mail
from django.contrib.auth.models import User
from .models import Post, UserProfile
import requests
import time
@task
def simple_task(message):
    """Minimal demonstration task: log, pause, return a summary string."""
    print(f"Processing: {message}")
    time.sleep(2)  # simulate a couple of seconds of work
    return f"Completed: {message}"
@task
def send_welcome_email(user_id):
    """Send a welcome email to the given user.

    Returns a human-readable status string. A missing user is reported
    (not raised); delivery failures are re-raised with the original
    exception chained so the root cause survives in tracebacks.
    """
    try:
        user = User.objects.get(id=user_id)
        subject = "Welcome to our platform!"
        message = f"Hello {user.first_name or user.username}, welcome to our platform!"
        send_mail(
            subject=subject,
            message=message,
            from_email='noreply@example.com',
            recipient_list=[user.email],
            fail_silently=False
        )
        return f"Welcome email sent to {user.email}"
    except User.DoesNotExist:
        return f"User with ID {user_id} not found"
    except Exception as e:
        # Chain the cause instead of discarding it.
        raise Exception(f"Failed to send email: {str(e)}") from e
@task
def process_user_data(user_id, data_updates):
    """Apply field updates to a user and their profile.

    ``data_updates`` maps user field names to values and may include a
    nested 'profile' dict of profile field updates.
    """
    try:
        user = User.objects.get(id=user_id)
        # Update user fields. The nested 'profile' dict is handled below —
        # the original also setattr'd it onto the user, which is skipped now.
        for field, value in data_updates.items():
            if field != 'profile' and hasattr(user, field):
                setattr(user, field, value)
        user.save()
        # Update or create the related profile.
        profile, created = UserProfile.objects.get_or_create(user=user)
        profile_updates = data_updates.get('profile', {})
        for field, value in profile_updates.items():
            if hasattr(profile, field):
                setattr(profile, field, value)
        profile.save()
        return {
            'user_updated': True,
            'profile_created': created,
            'updated_fields': list(data_updates.keys())
        }
    except Exception as e:
        # Chain the cause instead of discarding it.
        raise Exception(f"Failed to process user data: {str(e)}") from e
@task
def fetch_external_data(api_url, params=None):
    """Fetch JSON from an external API and process its 'items' list.

    Returns a summary dict. Request and processing failures are re-raised
    with the original exception chained for easier debugging.
    """
    try:
        response = requests.get(api_url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        items = data.get('items', [])
        processed_count = 0
        for item in items:
            # Process each item (placeholder — real per-item work goes here).
            processed_count += 1
        return {
            'success': True,
            'items_processed': processed_count,
            'total_items': len(items)
        }
    except requests.RequestException as e:
        raise Exception(f"API request failed: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Data processing failed: {str(e)}") from e
# Advanced task definitions with options
from django.tasks import task
from django.tasks.exceptions import Retry, Ignore
from datetime import datetime, timedelta
import logging
# Module-level logger named after this module for per-app log routing.
logger = logging.getLogger(__name__)
@task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    rate_limit='10/m',  # 10 tasks per minute
    time_limit=300,  # hard limit: 5 minutes
    soft_time_limit=240  # soft limit: 4 minutes
)
def robust_email_task(self, user_id, template_name, context=None):
    """Robust email-sending task with retry logic.

    Renders ``emails/<template_name>.html`` with ``context`` and emails it
    to the user. A missing user is ignored (no retry); any other failure
    is retried with exponential backoff.
    """
    # Normalize once. The original called context.get() on the raw
    # argument, which crashed with AttributeError when the default
    # context=None was used.
    context = context or {}
    try:
        user = User.objects.get(id=user_id)
        # Render email template
        from django.template.loader import render_to_string
        html_content = render_to_string(
            f'emails/{template_name}.html',
            context
        )
        # Send email
        from django.core.mail import EmailMessage
        email = EmailMessage(
            subject=context.get('subject', 'Notification'),
            body=html_content,
            from_email='noreply@example.com',
            to=[user.email]
        )
        email.content_subtype = 'html'
        email.send()
        logger.info(f"Email sent successfully to {user.email}")
        return f"Email sent to {user.email}"
    except User.DoesNotExist:
        logger.error(f"User {user_id} not found")
        raise Ignore(f"User {user_id} not found")
    except Exception as exc:
        logger.error(f"Email sending failed: {exc}")
        # Exponential backoff: 60s, 120s, 240s, ...
        retry_delay = 60 * (2 ** self.request.retries)
        raise self.retry(
            exc=exc,
            countdown=retry_delay,
            max_retries=3
        )
@task(bind=True, queue='high_priority')
def urgent_notification_task(self, user_ids, message, notification_type='urgent'):
    """Fan an urgent message out to several users over multiple channels."""
    sent_ok = 0
    sent_failed = 0
    for uid in user_ids:
        try:
            recipient = User.objects.get(id=uid)
            channels = []
            # Email channel.
            if recipient.email:
                send_mail(
                    subject=f'Urgent: {notification_type}',
                    message=message,
                    from_email='urgent@example.com',
                    recipient_list=[recipient.email]
                )
                channels.append('email')
            # SMS channel (only when a phone number is on file).
            if hasattr(recipient, 'profile') and recipient.profile.phone_number:
                # send_sms(recipient.profile.phone_number, message)
                channels.append('sms')
            # Push channel.
            # send_push_notification(recipient, message)
            channels.append('push')
            sent_ok += 1
            logger.info(f"Urgent notification sent to user {uid} via {channels}")
        except User.DoesNotExist:
            sent_failed += 1
            logger.warning(f"User {uid} not found for urgent notification")
        except Exception as e:
            sent_failed += 1
            logger.error(f"Failed to send urgent notification to user {uid}: {e}")
    return {
        'successful_sends': sent_ok,
        'failed_sends': sent_failed,
        'total_users': len(user_ids)
    }
@task(bind=True)
def batch_process_posts(self, post_ids, operation, **kwargs):
    """Apply one operation ('publish', 'unpublish', 'feature',
    'update_category') to each post, reporting per-post progress."""
    outcome = {
        'successful': [],
        'failed': [],
        'total': len(post_ids)
    }
    total = len(post_ids)
    for index, post_id in enumerate(post_ids, start=1):
        try:
            # Surface progress so pollers can render a meter.
            self.update_state(
                state='PROGRESS',
                meta={
                    'current': index,
                    'total': total,
                    'status': f'Processing post {post_id}'
                }
            )
            post = Post.objects.get(id=post_id)
            if operation == 'publish':
                post.published = True
                post.published_at = datetime.now()
            elif operation == 'unpublish':
                post.published = False
            elif operation == 'feature':
                post.featured = True
            elif operation == 'update_category':
                category_id = kwargs.get('category_id')
                if category_id:
                    post.category_id = category_id
            post.save()
            outcome['successful'].append(post_id)
        except Post.DoesNotExist:
            outcome['failed'].append({
                'post_id': post_id,
                'error': 'Post not found'
            })
        except Exception as e:
            outcome['failed'].append({
                'post_id': post_id,
                'error': str(e)
            })
    return outcome
@task(bind=True, time_limit=3600)  # hard cap: 1 hour
def generate_report(self, report_type, date_range, user_id):
    """Build a report section by section, then email the requester."""
    try:
        requester = User.objects.get(id=user_id)
        self.update_state(
            state='PROGRESS',
            meta={'status': 'Initializing report generation'}
        )
        from .reports import ReportGenerator
        generator = ReportGenerator(report_type, date_range)
        # Generate each section in order, publishing progress as we go.
        sections = ['summary', 'details', 'charts', 'export']
        for idx, section in enumerate(sections, start=1):
            self.update_state(
                state='PROGRESS',
                meta={
                    'current': idx,
                    'total': len(sections),
                    'status': f'Generating {section} section'
                }
            )
            generator.generate_section(section)
        report_path = generator.finalize()
        # Let the requester know the report is ready for download.
        send_mail(
            subject=f'Report Ready: {report_type}',
            message=f'Your {report_type} report is ready for download.',
            from_email='reports@example.com',
            recipient_list=[requester.email]
        )
        return {
            'report_path': report_path,
            'report_type': report_type,
            'generated_for': requester.email,
            'sections_generated': len(sections)
        }
    except Exception as e:
        logger.error(f"Report generation failed: {e}")
        raise
# Class-based tasks for complex operations
from django.tasks import Task
from django.db import transaction


class DataMigrationTask(Task):
    """Class-based task for batched data migrations."""

    name = 'data_migration'
    max_retries = 1
    time_limit = 7200  # 2 hours

    def run(self, migration_type, batch_size=1000, **kwargs):
        """Dispatch to the migration routine named by ``migration_type``."""
        self.migration_type = migration_type
        self.batch_size = batch_size
        if migration_type == 'user_profiles':
            return self.migrate_user_profiles()
        if migration_type == 'post_categories':
            return self.migrate_post_categories()
        raise ValueError(f"Unknown migration type: {migration_type}")

    def migrate_user_profiles(self):
        """Create a blank profile for every user that lacks one, in batches."""
        pending = User.objects.filter(userprofile__isnull=True)
        total = pending.count()
        migrated = 0
        for offset in range(0, total, self.batch_size):
            batch = pending[offset:offset + self.batch_size]
            # Each batch commits atomically.
            with transaction.atomic():
                new_profiles = [
                    UserProfile(
                        user=member,
                        bio='',
                        created_at=member.date_joined
                    )
                    for member in batch
                ]
                UserProfile.objects.bulk_create(new_profiles)
                migrated += len(new_profiles)
            self.update_state(
                state='PROGRESS',
                meta={
                    'current': migrated,
                    'total': total,
                    'status': f'Migrated {migrated}/{total} profiles'
                }
            )
        return {
            'migration_type': 'user_profiles',
            'total_migrated': migrated,
            'batch_size': self.batch_size
        }

    def migrate_post_categories(self):
        """Migrate post categories (not yet implemented)."""
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Notify admins when a migration fails."""
        logger.error(f"Data migration failed: {exc}")
        send_mail(
            subject='Data Migration Failed',
            message=f'Migration {self.migration_type} failed: {exc}',
            from_email='admin@example.com',
            recipient_list=['admin@example.com']
        )

    def on_success(self, retval, task_id, args, kwargs):
        """Notify admins when a migration finishes cleanly."""
        logger.info(f"Data migration completed: {retval}")
        send_mail(
            subject='Data Migration Completed',
            message=f'Migration completed successfully: {retval}',
            from_email='admin@example.com',
            recipient_list=['admin@example.com']
        )


# Register class-based task
data_migration_task = DataMigrationTask()
class FileProcessingTask(Task):
    """Class-based task for image-file processing.

    Fix over the original: ``os`` and ``PIL.Image`` were imported only
    inside run(), so the helper methods raised NameError when they used
    them. Each method now imports what it needs in its own scope.

    NOTE(review): the output-path scheme uses str.replace('.', ...),
    which rewrites every dot in the path, not just the extension —
    directories containing dots will be mangled. Kept for compatibility;
    consider pathlib's with_suffix/with_stem instead.
    """

    name = 'file_processing'
    bind = True
    max_retries = 2

    def run(self, file_path, processing_type, **options):
        """Validate the file exists, then dispatch to the processor."""
        import os
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        if processing_type == 'image_resize':
            return self.resize_image(file_path, **options)
        elif processing_type == 'image_optimize':
            return self.optimize_image(file_path, **options)
        elif processing_type == 'generate_thumbnails':
            return self.generate_thumbnails(file_path, **options)
        else:
            raise ValueError(f"Unknown processing type: {processing_type}")

    def resize_image(self, file_path, width, height, **kwargs):
        """Resize image to the exact (width, height) dimensions."""
        from PIL import Image
        try:
            with Image.open(file_path) as img:
                resized_img = img.resize((width, height), Image.Resampling.LANCZOS)
                output_path = file_path.replace('.', f'_resized_{width}x{height}.')
                resized_img.save(output_path, optimize=True, quality=85)
                return {
                    'original_path': file_path,
                    'resized_path': output_path,
                    'original_size': img.size,
                    'new_size': (width, height)
                }
        except Exception as e:
            raise Exception(f"Image resize failed: {str(e)}") from e

    def optimize_image(self, file_path, quality=85, **kwargs):
        """Re-save the image with web-friendly compression."""
        import os
        from PIL import Image
        try:
            with Image.open(file_path) as img:
                # JPEG cannot store alpha/palette modes; convert first.
                if img.mode in ('RGBA', 'P'):
                    img = img.convert('RGB')
                output_path = file_path.replace('.', '_optimized.')
                img.save(output_path, optimize=True, quality=quality)
            original_size = os.path.getsize(file_path)
            optimized_size = os.path.getsize(output_path)
            return {
                'original_path': file_path,
                'optimized_path': output_path,
                'original_size': original_size,
                'optimized_size': optimized_size,
                'compression_ratio': (original_size - optimized_size) / original_size
            }
        except Exception as e:
            raise Exception(f"Image optimization failed: {str(e)}") from e

    def generate_thumbnails(self, file_path, sizes=None, **kwargs):
        """Generate aspect-preserving thumbnails, reporting progress."""
        import os
        from PIL import Image
        if sizes is None:
            sizes = [(150, 150), (300, 300), (600, 600)]
        thumbnails = []
        try:
            with Image.open(file_path) as img:
                for i, (width, height) in enumerate(sizes):
                    self.update_state(
                        state='PROGRESS',
                        meta={
                            'current': i + 1,
                            'total': len(sizes),
                            'status': f'Generating {width}x{height} thumbnail'
                        }
                    )
                    # thumbnail() preserves aspect ratio within the box.
                    thumbnail = img.copy()
                    thumbnail.thumbnail((width, height), Image.Resampling.LANCZOS)
                    output_path = file_path.replace('.', f'_thumb_{width}x{height}.')
                    thumbnail.save(output_path, optimize=True, quality=85)
                    thumbnails.append({
                        'size': (width, height),
                        'path': output_path,
                        'file_size': os.path.getsize(output_path)
                    })
            return {
                'original_path': file_path,
                'thumbnails': thumbnails,
                'total_thumbnails': len(thumbnails)
            }
        except Exception as e:
            raise Exception(f"Thumbnail generation failed: {str(e)}") from e


# Register class-based task
file_processing_task = FileProcessingTask()
# views.py - Enqueueing tasks from views
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_POST
from .tasks import (
send_welcome_email,
process_user_data,
batch_process_posts,
data_migration_task,
file_processing_task
)
import json
@require_POST
@login_required
def send_welcome_email_view(request):
    """Validate the user id and queue the welcome-email task."""
    user_id = request.POST.get('user_id')
    if not user_id:
        return JsonResponse({'error': 'User ID required'}, status=400)
    queued = send_welcome_email.delay(user_id)
    return JsonResponse({
        'task_id': queued.id,
        'status': 'queued',
        'message': 'Welcome email task queued successfully'
    })
@require_POST
@login_required
def update_user_profile_view(request):
    """Queue a profile-update task from a JSON request body."""
    try:
        payload = json.loads(request.body)
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    user_id = payload.get('user_id')
    if not user_id:
        return JsonResponse({'error': 'User ID required'}, status=400)
    updates = payload.get('updates', {})
    # Raised priority, and a ten-second delay before execution.
    queued = process_user_data.apply_async(
        args=[user_id, updates],
        priority=5,
        countdown=10
    )
    return JsonResponse({
        'task_id': queued.id,
        'status': 'queued',
        'message': 'User profile update queued'
    })
@require_POST
@login_required
def batch_publish_posts_view(request):
    """Queue a batch 'publish' operation for the posted ids."""
    try:
        payload = json.loads(request.body)
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    post_ids = payload.get('post_ids', [])
    if not post_ids:
        return JsonResponse({'error': 'Post IDs required'}, status=400)
    queued = batch_process_posts.delay(post_ids, 'publish')
    return JsonResponse({
        'task_id': queued.id,
        'status': 'queued',
        'total_posts': len(post_ids),
        'message': 'Batch publish task queued'
    })
def start_data_migration_view(request):
    """Kick off a data-migration task from query parameters.

    NOTE(review): this view mutates data on GET and has no auth or
    permission guard — confirm whether it should be POST-only and
    staff-restricted.
    """
    migration_type = request.GET.get('type', 'user_profiles')
    # Reject a non-numeric batch_size with a 400 instead of letting the
    # bare int() raise and produce an unhandled 500.
    try:
        batch_size = int(request.GET.get('batch_size', 1000))
    except ValueError:
        return JsonResponse({'error': 'batch_size must be an integer'}, status=400)
    task_result = data_migration_task.delay(
        migration_type=migration_type,
        batch_size=batch_size
    )
    return JsonResponse({
        'task_id': task_result.id,
        'migration_type': migration_type,
        'batch_size': batch_size,
        'status': 'started'
    })
@require_POST
def process_uploaded_file_view(request):
    """Save an uploaded file and queue background processing for it."""
    uploaded_file = request.FILES.get('file')
    processing_type = request.POST.get('processing_type', 'image_resize')
    if not uploaded_file:
        return JsonResponse({'error': 'No file uploaded'}, status=400)
    # Save uploaded file
    import os
    from django.conf import settings
    upload_dir = os.path.join(settings.MEDIA_ROOT, 'uploads')
    os.makedirs(upload_dir, exist_ok=True)
    # Security: strip directory components from the client-supplied name
    # so a crafted filename (e.g. '../../x') cannot escape upload_dir.
    safe_name = os.path.basename(uploaded_file.name)
    file_path = os.path.join(upload_dir, safe_name)
    with open(file_path, 'wb+') as destination:
        for chunk in uploaded_file.chunks():
            destination.write(chunk)
    # Build processor-specific options; reject non-numeric values with a
    # 400 instead of crashing with an unhandled ValueError (500).
    try:
        if processing_type == 'image_resize':
            options = {
                'width': int(request.POST.get('width', 800)),
                'height': int(request.POST.get('height', 600))
            }
        elif processing_type == 'image_optimize':
            options = {
                'quality': int(request.POST.get('quality', 85))
            }
        else:
            options = {}
    except ValueError:
        return JsonResponse({'error': 'Numeric option expected'}, status=400)
    task_result = file_processing_task.delay(
        file_path=file_path,
        processing_type=processing_type,
        **options
    )
    return JsonResponse({
        'task_id': task_result.id,
        'file_name': uploaded_file.name,
        'processing_type': processing_type,
        'status': 'queued'
    })
# Advanced task enqueueing patterns
from django.tasks import group, chain, chord
from datetime import datetime, timedelta
from .tasks import send_email_django_task, process_user_data, generate_report
def enqueue_with_scheduling():
    """Show the scheduling options available when enqueueing a task."""
    # Run as soon as a worker is free.
    task_result = send_welcome_email.delay(user_id=1)
    # Run after a fixed delay (seconds).
    task_result = send_welcome_email.apply_async(
        args=[1],
        countdown=300  # Execute in 5 minutes
    )
    # Run at a specific wall-clock time.
    eta = datetime.now() + timedelta(hours=1)
    task_result = send_welcome_email.apply_async(
        args=[1],
        eta=eta
    )
    # Run periodically (requires a scheduler process).
    from django.tasks.schedules import crontab
    # This would typically be in settings or task configuration
    periodic_task = {
        'task': 'myapp.tasks.send_welcome_email',
        'schedule': crontab(hour=9, minute=0),  # Daily at 9 AM
        'args': [1]
    }
def enqueue_with_routing():
    """Show how to pick a backend, queue, or priority per enqueue."""
    # Send to a specific backend.
    task_result = send_welcome_email.apply_async(
        args=[1],
        backend='redis'
    )
    # Send to a specific named queue.
    task_result = send_welcome_email.apply_async(
        args=[1],
        queue='high_priority'
    )
    # Bump the task's priority.
    task_result = send_welcome_email.apply_async(
        args=[1],
        priority=9  # Higher number = higher priority
    )
def enqueue_task_groups():
    """Run one welcome-email task per user in parallel via a group."""
    user_ids = [1, 2, 3, 4, 5]
    job = group(send_welcome_email.s(uid) for uid in user_ids)
    result = job.apply_async()
    return {
        'group_id': result.id,
        'task_count': len(user_ids),
        'status': 'queued'
    }
def enqueue_task_chain():
    """Run tasks sequentially, piping each result into the next."""
    workflow = chain(
        process_user_data.s(1, {'first_name': 'John'}),
        send_welcome_email.s(),  # Will receive result from previous task
        generate_report.s('user_activity', '2024-01-01:2024-12-31')
    )
    result = workflow.apply_async()
    return {
        'chain_id': result.id,
        'status': 'started'
    }
def enqueue_task_chord():
    """Run a parallel group, then a callback that sees all its results."""
    user_ids = [1, 2, 3, 4, 5]
    # Header: the parallel part of the chord.
    header = group(
        process_user_data.s(uid, {'status': 'processed'})
        for uid in user_ids
    )
    # Body: invoked once with the list of all header results.
    callback = generate_report.s('batch_processing_summary')
    chord_task = chord(header)(callback)
    return {
        'chord_id': chord_task.id,
        'header_tasks': len(user_ids),
        'status': 'started'
    }
def enqueue_with_error_handling():
    """Show retry policies and error-callback links on enqueue."""
    # Built-in retry policy applied by the broker/worker.
    task_result = send_welcome_email.apply_async(
        args=[1],
        retry=True,
        retry_policy={
            'max_retries': 3,
            'interval_start': 0,
            'interval_step': 0.2,
            'interval_max': 0.2,
        }
    )
    # Custom error callback, invoked only if the task fails.
    task_result = process_user_data.apply_async(
        args=[1, {'email': 'new@example.com'}],
        link_error=handle_user_update_error.s()
    )
    return task_result
@task
def handle_user_update_error(task_id, error, traceback):
    """Error callback: log the failure and alert the admins by email."""
    logger.error(f"User update task {task_id} failed: {error}")
    send_mail(
        subject='Task Failure Notification',
        message=f'Task {task_id} failed with error: {error}',
        from_email='admin@example.com',
        recipient_list=['admin@example.com']
    )
def enqueue_conditional_tasks():
    """Queue tasks only for active users, with staff-specific extras."""
    user_id = 1
    user = User.objects.get(id=user_id)
    # Guard clause: nothing to queue for inactive or email-less users.
    if not (user.is_active and user.email):
        return None
    welcome_task = send_welcome_email.delay(user_id)
    if user.is_staff:
        # Staff users get additional setup
        setup_task = process_user_data.apply_async(
            args=[user_id, {'role': 'staff'}],
            link=welcome_task.id
        )
    return welcome_task.id
def bulk_enqueue_tasks():
    """Compare one-at-a-time and batched enqueueing for 1000 tasks.

    NOTE(review): both methods run here, so every user is enqueued twice;
    in real code pick one strategy.
    """
    user_ids = range(1, 1001)  # 1000 users
    # Method 1: Individual task enqueueing
    task_ids = [send_welcome_email.delay(uid).id for uid in user_ids]
    # Method 2: Batch enqueueing (more efficient)
    signatures = [send_welcome_email.s(uid) for uid in user_ids]
    batch_size = 100
    batch_results = []
    for start in range(0, len(signatures), batch_size):
        chunk = group(signatures[start:start + batch_size])
        batch_results.append(chunk.apply_async().id)
    return {
        'total_tasks': len(user_ids),
        'batch_count': len(batch_results),
        'batch_ids': batch_results
    }
# views.py - Task result handling
from django.tasks import get_task_backend
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
def get_task_status_view(request, task_id):
    """Return status, result/error, and progress for a task as JSON."""
    backend = get_task_backend()
    try:
        handle = backend.get_result(task_id)
        is_done = handle.ready()
        payload = {
            'task_id': task_id,
            'status': handle.status,
            'ready': is_done,
            'successful': handle.successful() if is_done else None,
            'failed': handle.failed() if is_done else None,
        }
        # Attach the outcome once the task has finished.
        if is_done:
            if handle.successful():
                payload['result'] = handle.result
            else:
                payload['error'] = str(handle.result)
                payload['traceback'] = handle.traceback
        # Attach progress metadata when the backend provides it.
        if hasattr(handle, 'info') and handle.info:
            payload['progress'] = handle.info
        return JsonResponse(payload)
    except Exception as e:
        return JsonResponse({
            'task_id': task_id,
            'error': f'Failed to retrieve task status: {str(e)}'
        }, status=500)
def wait_for_task_result_view(request, task_id):
    """Block until the task finishes, up to ?timeout= seconds."""
    timeout = int(request.GET.get('timeout', 30))
    backend = get_task_backend()
    try:
        handle = backend.get_result(task_id)
        value = handle.get(timeout=timeout)
    except TimeoutError:
        return JsonResponse({
            'task_id': task_id,
            'status': 'timeout',
            'message': f'Task did not complete within {timeout} seconds'
        }, status=408)
    except Exception as e:
        return JsonResponse({
            'task_id': task_id,
            'status': 'error',
            'error': str(e)
        }, status=500)
    return JsonResponse({
        'task_id': task_id,
        'status': 'completed',
        'result': value
    })
def revoke_task_view(request, task_id):
    """Cancel a queued or running task."""
    backend = get_task_backend()
    try:
        # terminate=True also stops a task that is already executing.
        backend.revoke(task_id, terminate=True)
    except Exception as e:
        return JsonResponse({
            'task_id': task_id,
            'error': f'Failed to revoke task: {str(e)}'
        }, status=500)
    return JsonResponse({
        'task_id': task_id,
        'status': 'revoked',
        'message': 'Task has been cancelled'
    })
def list_active_tasks_view(request):
    """List all tasks the backend currently reports as active."""
    backend = get_task_backend()
    try:
        # Availability of get_active_tasks depends on the backend.
        active = backend.get_active_tasks()
        tasks_data = [
            {
                'task_id': entry['id'],
                'name': entry['name'],
                'status': entry['status'],
                'started_at': entry.get('started_at'),
                'worker': entry.get('worker')
            }
            for entry in active
        ]
        return JsonResponse({
            'active_tasks': tasks_data,
            'count': len(tasks_data)
        })
    except Exception as e:
        return JsonResponse({
            'error': f'Failed to retrieve active tasks: {str(e)}'
        }, status=500)
# Task result utilities and management
from django.tasks import get_task_backend
from django.core.management.base import BaseCommand
from datetime import datetime, timedelta
import json
class TaskResultManager:
    """Utility helpers for inspecting and maintaining task results."""

    def __init__(self, backend_name='default'):
        self.backend = get_task_backend(backend_name)

    def get_task_info(self, task_id):
        """Return a status/result summary dict for one task."""
        try:
            handle = self.backend.get_result(task_id)
            done = handle.ready()
            info = {
                'task_id': task_id,
                'status': handle.status,
                'ready': done,
                'successful': handle.successful() if done else None,
                'failed': handle.failed() if done else None,
                'date_created': getattr(handle, 'date_created', None),
                'date_done': getattr(handle, 'date_done', None),
            }
            if done:
                if handle.successful():
                    info['result'] = handle.result
                else:
                    info['error'] = str(handle.result)
                    info['traceback'] = getattr(handle, 'traceback', None)
            return info
        except Exception as e:
            return {
                'task_id': task_id,
                'error': f'Failed to get task info: {str(e)}'
            }

    def wait_for_multiple_tasks(self, task_ids, timeout=60):
        """Block on each task in turn; return a per-task outcome map."""
        results = {}
        for tid in task_ids:
            try:
                value = self.backend.get_result(tid).get(timeout=timeout)
            except TimeoutError:
                results[tid] = {
                    'status': 'timeout',
                    'error': f'Task did not complete within {timeout} seconds'
                }
            except Exception as e:
                results[tid] = {
                    'status': 'error',
                    'error': str(e)
                }
            else:
                results[tid] = {
                    'status': 'completed',
                    'result': value
                }
        return results

    def cleanup_old_results(self, days_old=7):
        """Delete results older than ``days_old`` days; return the count."""
        cutoff_date = datetime.now() - timedelta(days=days_old)
        # Prefer the backend's native cleanup when available.
        if hasattr(self.backend, 'cleanup_results'):
            return self.backend.cleanup_results(cutoff_date)
        # Manual fallback for the database backend.
        from django.tasks.models import TaskResult
        stale = TaskResult.objects.filter(date_created__lt=cutoff_date)
        count = stale.count()
        stale.delete()
        return count

    def get_task_statistics(self):
        """Return a placeholder statistics summary.

        A real implementation is backend-specific; this keeps the shape
        callers expect.
        """
        return {
            'total_tasks': 0,
            'completed_tasks': 0,
            'failed_tasks': 0,
            'pending_tasks': 0,
            'average_execution_time': 0
        }
# Management command for task result cleanup
class Command(BaseCommand):
    """Management command that deletes old task results.

    Usage: ``manage.py <command> [--days N] [--backend NAME]``.
    """

    help = 'Clean up old task results'

    def add_arguments(self, parser):
        """Register the --days and --backend command-line options."""
        parser.add_argument(
            '--days',
            type=int,
            default=7,
            help='Number of days to keep results (default: 7)'
        )
        parser.add_argument(
            '--backend',
            type=str,
            default='default',
            help='Task backend to clean up (default: default)'
        )

    def handle(self, *args, **options):
        """Run the cleanup against the selected backend.

        Writes a success summary to stdout; failures go to stderr.
        (Fix: errors previously went to stdout, which hid them from
        shell redirection and log separation — Django convention is
        ``self.stderr`` for error output.)
        """
        days = options['days']
        backend_name = options['backend']
        manager = TaskResultManager(backend_name)
        try:
            # Keep the try body minimal: only the call that can fail.
            cleaned_count = manager.cleanup_old_results(days)
        except Exception as e:
            self.stderr.write(
                self.style.ERROR(f'Failed to clean up task results: {e}')
            )
        else:
            self.stdout.write(
                self.style.SUCCESS(
                    f'Successfully cleaned up {cleaned_count} old task results'
                )
            )
# Task monitoring utilities
class TaskMonitor:
    """Monitor task execution and performance."""

    def __init__(self):
        # Resolve the default task backend once at construction time.
        self.backend = get_task_backend()

    def monitor_task_progress(self, task_id, callback=None):
        """Poll a task until it finishes, reporting status along the way.

        Args:
            task_id: Identifier of the task to watch.
            callback: Optional callable invoked with each interim status
                dict and once more with the final status dict.

        Returns:
            The final status dict, containing ``result`` on success or
            ``error`` on failure.
        """
        tracked = self.backend.get_result(task_id)
        # NOTE(review): the result handle is fetched once and polled in
        # place — this assumes ready() re-queries the backend; confirm
        # for backends whose result objects are point-in-time snapshots.
        while not tracked.ready():
            snapshot = {
                'task_id': task_id,
                'status': tracked.status,
                'timestamp': datetime.now().isoformat(),
            }
            # Attach progress details when the backend exposes them.
            if hasattr(tracked, 'info') and tracked.info:
                snapshot['progress'] = tracked.info
            if callback:
                callback(snapshot)
            time.sleep(1)  # poll interval between status checks
        summary = {
            'task_id': task_id,
            'status': tracked.status,
            'completed': True,
            'successful': tracked.successful(),
            'timestamp': datetime.now().isoformat(),
        }
        if tracked.successful():
            summary['result'] = tracked.result
        else:
            # On failure the backend stores the exception as the result.
            summary['error'] = str(tracked.result)
        if callback:
            callback(summary)
        return summary

    def get_worker_statistics(self):
        """Return worker performance counters (placeholder zeros)."""
        # Backend-specific implementations should replace these stubs.
        return {
            'active_workers': 0,
            'total_tasks_processed': 0,
            'average_task_duration': 0,
            'worker_load': {},
        }
# WebSocket integration for real-time task updates
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class TaskProgressConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for real-time task progress updates."""

    async def connect(self):
        """Accept the socket and subscribe to this task's update group."""
        self.task_id = self.scope['url_route']['kwargs']['task_id']
        self.group_name = f'task_{self.task_id}'
        # Subscribe before accepting so no broadcast is missed.
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()
        # Push the current state immediately so the client does not have
        # to wait for the next broadcast.
        await self.send_task_status()

    async def disconnect(self, close_code):
        """Unsubscribe from the task group when the socket closes."""
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def send_task_status(self):
        """Send a one-off snapshot of the task's current status."""
        # NOTE(review): get_task_info is synchronous; if it touches the
        # database this should be wrapped (e.g. database_sync_to_async)
        # to avoid blocking the event loop — confirm.
        info = TaskResultManager().get_task_info(self.task_id)
        payload = {'type': 'task_status', 'data': info}
        await self.send(text_data=json.dumps(payload))

    async def task_progress_update(self, event):
        """Relay a group broadcast to the connected client."""
        payload = {'type': 'progress_update', 'data': event['data']}
        await self.send(text_data=json.dumps(payload))
Django's Tasks framework provides a powerful, built-in solution for handling background tasks without external dependencies. It offers flexible backend options, comprehensive task management capabilities, and seamless integration with Django's ecosystem. The framework is particularly well-suited for applications that need reliable task processing without the complexity of setting up external message brokers or task queues.
Asynchronous Support
Django's asynchronous support enables building high-performance applications that can handle concurrent operations efficiently. This chapter covers async views, async safety considerations, and adapter functions that bridge synchronous and asynchronous code.
Deployment
Deploying Django applications to production requires careful planning, proper configuration, and robust infrastructure. This comprehensive guide covers everything from preparing your application for production to implementing scalable deployment architectures, monitoring systems, and backup strategies. Whether you're deploying a simple web application or a complex microservices architecture, this section provides production-ready patterns and best practices.