Django Channels extends Django to handle WebSockets, HTTP/2, and other protocols beyond traditional HTTP. This enables building real-time applications like chat systems, live notifications, collaborative tools, and streaming dashboards. This chapter covers Channels architecture, WebSocket consumers, real-time communication patterns, and production deployment strategies.
# Channel layers provide the communication backbone
# Redis is the most common channel layer backend
# settings.py
# Applications enabled for this project: the stock Django contrib apps first,
# then Channels itself, then the local app that holds the WebSocket consumers.
INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "channels",  # Channels integration
    "chat",  # local app containing the WebSocket consumers
]

# Dotted path to the ASGI application object (the `application` name in
# myproject/asgi.py).
ASGI_APPLICATION = "myproject.asgi.application"
# Channel layer backed by a single Redis instance on localhost.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            # (host, port) pairs of the Redis servers to connect to.
            "hosts": [("127.0.0.1", 6379)],
            # Maximum messages to store.
            "capacity": 1500,
            # Message expiry in seconds.
            "expiry": 60,
        },
    },
}
# Production variant: reach Redis through Sentinel.
# NOTE: this rebinds CHANNEL_LAYERS, so when both definitions live in the same
# settings module this one replaces the single-host configuration above it.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [
                {
                    # (host, port) of every Sentinel node.
                    "sentinels": [
                        ("sentinel1.example.com", 26379),
                        ("sentinel2.example.com", 26379),
                        ("sentinel3.example.com", 26379),
                    ],
                    "master_name": "mymaster",
                    "db": 0,
                },
            ],
            "capacity": 5000,
            "expiry": 300,
            "group_expiry": 86400,  # 24 hours
        },
    },
}
# asgi.py
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Initialize the Django ASGI application early so the app registry is
# populated before any module that may import ORM models is loaded. The
# Channels auth middleware and the chat routing (which imports the consumers
# and, through them, the models) must therefore be imported only AFTER this
# call; importing them first fails with "Apps aren't loaded yet".
django_asgi_app = get_asgi_application()

from channels.auth import AuthMiddlewareStack  # noqa: E402
from channels.routing import ProtocolTypeRouter, URLRouter  # noqa: E402
from channels.security.websocket import AllowedHostsOriginValidator  # noqa: E402

import chat.routing  # noqa: E402

# Top-level protocol dispatch for the project.
application = ProtocolTypeRouter({
    # HTTP requests are handled by Django.
    "http": django_asgi_app,
    # WebSocket requests are origin-checked, given an authenticated
    # scope['user'], then routed to the consumers by URL.
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                chat.routing.websocket_urlpatterns
            )
        )
    ),
})
# chat/routing.py
from django.urls import re_path, path
from . import consumers
# WebSocket routes handed to URLRouter in asgi.py.
websocket_urlpatterns = [
    path('ws/chat/<str:room_name>/', consumers.ChatConsumer.as_asgi()),
    path('ws/notifications/', consumers.NotificationConsumer.as_asgi()),
    path('ws/live-updates/', consumers.LiveUpdateConsumer.as_asgi()),
    # Anchored with ^ so the pattern only matches at the start of the path;
    # an unanchored regex would also accept e.g. "anything/ws/user/1/".
    re_path(r'^ws/user/(?P<user_id>\d+)/$', consumers.UserConsumer.as_asgi()),
]
# chat/consumers.py
import asyncio
import json
import logging

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.auth.models import User

from .models import ChatRoom, Message
class ChatConsumer(AsyncWebsocketConsumer):
    """Basic chat consumer for real-time messaging in a single room.

    The room comes from the ``room_name`` URL kwarg and maps to the
    channel-layer group ``chat_<room_name>``. Anonymous users are rejected.
    Client frames are JSON objects; the optional ``type`` key (default
    ``'message'``) selects ``message``, ``typing`` or ``ping`` handling.
    Chat messages are saved to the database before being broadcast.
    """

    # Number of stored messages replayed to a client right after it connects.
    RECENT_MESSAGE_LIMIT = 50

    _log = logging.getLogger(__name__)

    async def connect(self):
        """Admit an authenticated user to the room and announce the join."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        self.user = self.scope['user']
        # Tracks whether this connection actually entered the group, so that
        # disconnect() does not announce a departure for a rejected client.
        self._joined = False

        # Reject anonymous users
        if not self.user.is_authenticated:
            await self.close()
            return

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name,
        )
        self._joined = True

        # Accept WebSocket connection
        await self.accept()

        # Notify room about user joining
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_joined',
                'user': self.user.username,
                'message': f'{self.user.username} joined the room',
            },
        )

        # Send recent messages to newly connected user
        await self.send_recent_messages()

    async def disconnect(self, close_code):
        """Announce the departure and leave the room group.

        Does nothing for connections that never joined the group (for
        example, ones rejected in ``connect``).
        """
        if not getattr(self, '_joined', False):
            return
        self._joined = False

        try:
            # Notify room about user leaving
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'user_left',
                    'user': self.user.username,
                    'message': f'{self.user.username} left the room',
                },
            )
        finally:
            # Leave room group even if the notification could not be sent.
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name,
            )

    async def receive(self, text_data=None, bytes_data=None):
        """Decode a client frame and dispatch it on its ``type`` key.

        Args:
            text_data: Text frame payload; expected to be a JSON object.
            bytes_data: Binary frame payload; not supported by this consumer.
        """
        if text_data is None:
            await self.send_error('Binary frames are not supported')
            return

        try:
            payload = json.loads(text_data)
        except json.JSONDecodeError:
            await self.send_error('Invalid JSON')
            return

        if not isinstance(payload, dict):
            await self.send_error('Message must be a JSON object')
            return

        message_type = payload.get('type', 'message')
        try:
            if message_type == 'message':
                await self.handle_chat_message(payload)
            elif message_type == 'typing':
                await self.handle_typing_indicator(payload)
            elif message_type == 'ping':
                await self.handle_ping()
            else:
                await self.send_error('Unknown message type')
        except Exception:
            # Keep the socket alive, but log the details server-side instead
            # of echoing internal exception text back to the client.
            self._log.exception('Error processing %r message', message_type)
            await self.send_error('Error processing message')

    async def handle_chat_message(self, data):
        """Persist a chat message and broadcast it to the room group."""
        raw_content = data.get('message', '')
        message_content = raw_content.strip() if isinstance(raw_content, str) else ''
        if not message_content:
            await self.send_error('Empty message')
            return

        # Save message to database
        message = await self.save_message(message_content)
        if message is None:
            # save_message() already logged the failure; tell the sender so
            # the message is not lost silently.
            await self.send_error('Message could not be saved')
            return

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message_content,
                'user': self.user.username,
                'user_id': self.user.id,
                'timestamp': message.created_at.isoformat(),
                'message_id': message.id,
            },
        )

    async def handle_typing_indicator(self, data):
        """Broadcast the sender's typing state to the room group."""
        is_typing = data.get('is_typing', False)
        # Sent to the whole group; typing_indicator() filters out the sender.
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'typing_indicator',
                'user': self.user.username,
                'user_id': self.user.id,
                'is_typing': is_typing,
            },
        )

    async def handle_ping(self):
        """Answer a keepalive ping with a pong carrying the event-loop time."""
        await self.send(text_data=json.dumps({
            'type': 'pong',
            # Monotonic event-loop clock, not wall-clock time.
            'timestamp': asyncio.get_running_loop().time(),
        }))

    # Group message handlers

    async def chat_message(self, event):
        """Send chat message to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'message',
            'message': event['message'],
            'user': event['user'],
            'user_id': event['user_id'],
            'timestamp': event['timestamp'],
            'message_id': event['message_id'],
        }))

    async def user_joined(self, event):
        """Send user joined notification to WebSocket."""
        # Don't send to the user who joined
        if event['user'] != self.user.username:
            await self.send(text_data=json.dumps({
                'type': 'user_joined',
                'user': event['user'],
                'message': event['message'],
            }))

    async def user_left(self, event):
        """Send user left notification to WebSocket."""
        # Don't send to the user who left
        if event['user'] != self.user.username:
            await self.send(text_data=json.dumps({
                'type': 'user_left',
                'user': event['user'],
                'message': event['message'],
            }))

    async def typing_indicator(self, event):
        """Send typing indicator to WebSocket."""
        # Don't send typing indicator back to the sender
        if event['user_id'] != self.user.id:
            await self.send(text_data=json.dumps({
                'type': 'typing',
                'user': event['user'],
                'is_typing': event['is_typing'],
            }))

    # Helper methods

    async def send_error(self, error_message):
        """Send error message to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'error',
            'message': error_message,
        }))

    async def send_recent_messages(self):
        """Replay the room's recent messages, oldest first, to this client."""
        for message in await self.get_recent_messages():
            await self.send(text_data=json.dumps({
                'type': 'message',
                'message': message.content,
                'user': message.user.username,
                'user_id': message.user.id,
                'timestamp': message.created_at.isoformat(),
                'message_id': message.id,
            }))

    @database_sync_to_async
    def save_message(self, content):
        """Store a message in this room, creating the room if needed.

        Returns:
            The saved ``Message``, or ``None`` when saving failed (the
            failure is logged).
        """
        try:
            room, _created = ChatRoom.objects.get_or_create(name=self.room_name)
            return Message.objects.create(
                room=room,
                user=self.user,
                content=content,
            )
        except Exception:
            self._log.exception('Error saving message in room %r', self.room_name)
            return None

    @database_sync_to_async
    def get_recent_messages(self):
        """Return up to ``RECENT_MESSAGE_LIMIT`` latest messages, oldest first.

        Returns an empty list when the room does not exist yet.
        """
        try:
            room = ChatRoom.objects.get(name=self.room_name)
        except ChatRoom.DoesNotExist:
            return []

        newest_first = (
            room.messages.select_related('user')
            .order_by('-created_at')[:self.RECENT_MESSAGE_LIMIT]
        )
        # QuerySet.reverse() cannot be applied after slicing, so evaluate
        # the slice first and flip the resulting list in Python.
        messages = list(newest_first)
        messages.reverse()
        return messages
# consumers.py
import json
import asyncio
from datetime import datetime, timedelta
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.core.cache import cache
from asgiref.sync import sync_to_async
class AdvancedChatConsumer(AsyncWebsocketConsumer):
    """Advanced chat consumer: heartbeat, rate limiting, reactions and edits.

    Client frames are JSON objects with a mandatory ``type`` key that selects
    a handler (see ``_MESSAGE_HANDLERS``). An optional ``id`` key is echoed
    back as ``client_id`` so the client can correlate responses.

    NOTE(review): this class calls several methods that are not defined in
    the class as shown here: ``update_user_status``, ``broadcast_user_status``,
    ``send_user_list``, ``send_recent_messages``, ``handle_typing``,
    ``handle_delete_message``, ``handle_ping``, ``handle_status_change``,
    ``can_edit_message`` and ``update_message``. They must be supplied
    (for example by a subclass or mixin) before it is usable; confirm
    where they live.
    """

    HEARTBEAT_INTERVAL = 30  # seconds between heartbeat frames
    IDLE_TIMEOUT = timedelta(minutes=30)  # close after this much inactivity
    RATE_LIMIT_MAX_CONNECTIONS = 100  # connections allowed per window
    RATE_LIMIT_WINDOW = 3600  # window length in seconds (1 hour)

    # Incoming ``type`` value -> name of the method that handles it. Names
    # are resolved lazily in receive(), so a handler that is not available
    # only affects its own message type instead of failing every frame.
    _MESSAGE_HANDLERS = {
        'message': 'handle_chat_message',
        'typing': 'handle_typing',
        'reaction': 'handle_reaction',
        'edit': 'handle_edit_message',
        'delete': 'handle_delete_message',
        'ping': 'handle_ping',
        'status': 'handle_status_change',
    }

    _log = logging.getLogger(__name__)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.heartbeat_task = None
        self.last_activity = None
        self.user_status = 'online'
        # True only while this connection is a member of the room group.
        self._joined = False

    async def connect(self):
        """Authenticate, apply ban and rate-limit checks, then join the room."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        self.user = self.scope['user']

        if not self.user.is_authenticated:
            await self.close(code=4001)  # Unauthorized
            return

        # Check if user is banned from room
        if await self.is_user_banned():
            await self.close(code=4003)  # Forbidden
            return

        # Rate limiting check
        if not await self.check_rate_limit():
            await self.close(code=4029)  # Too many requests
            return

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name,
        )
        self._joined = True
        await self.accept()

        # Start the idle clock now so a client that never sends anything is
        # still subject to the inactivity timeout in heartbeat_loop().
        self.last_activity = datetime.now()

        # Start heartbeat
        self.heartbeat_task = asyncio.create_task(self.heartbeat_loop())

        # Update user status
        await self.update_user_status('online')

        # Send user list and recent messages
        await self.send_user_list()
        await self.send_recent_messages()

        # Notify others
        await self.broadcast_user_status('joined')

    async def disconnect(self, close_code):
        """Stop the heartbeat and, if the room was joined, leave it cleanly."""
        # Cancel heartbeat
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
            self.heartbeat_task = None

        # Connections rejected in connect() never joined: there is no status
        # to reset, nobody to notify and no group membership to drop.
        if not self._joined:
            return
        self._joined = False

        try:
            # Update user status, then notify others
            await self.update_user_status('offline')
            await self.broadcast_user_status('left')
        finally:
            # Always leave the room group, even if the steps above failed.
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name,
            )

    async def receive(self, text_data=None, bytes_data=None):
        """Validate a client frame and route it to the matching handler.

        Args:
            text_data: Text frame payload; expected to be a JSON object.
            bytes_data: Binary frame payload; not supported by this consumer.
        """
        self.last_activity = datetime.now()

        if text_data is None:
            await self.send_error('Binary frames are not supported')
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self.send_error('Invalid JSON format')
            return

        if not isinstance(data, dict):
            await self.send_error('Message must be a JSON object')
            return

        try:
            # Validate message structure
            if not await self.validate_message(data):
                return

            # Route message based on type
            message_type = data.get('type')
            handler = None
            if isinstance(message_type, str):
                handler_name = self._MESSAGE_HANDLERS.get(message_type)
                if handler_name is not None:
                    handler = getattr(self, handler_name, None)

            if handler is None:
                await self.send_error('Unknown message type', data.get('id'))
                return

            await handler(data)
        except Exception:
            # Log details server-side; do not leak exception text to clients.
            self._log.exception('Error processing client message')
            await self.send_error('Error processing message', data.get('id'))

    async def handle_chat_message(self, data):
        """Filter, persist and broadcast a chat message (optionally a reply)."""
        raw_content = data.get('message', '')
        content = raw_content.strip() if isinstance(raw_content, str) else ''
        reply_to = data.get('reply_to')

        if not content:
            await self.send_error('Empty message', data.get('id'))
            return

        # Content filtering
        if await self.is_content_blocked(content):
            await self.send_error('Message blocked by content filter', data.get('id'))
            return

        # Save message
        message = await self.save_message(content, reply_to)
        if message is None:
            await self.send_error('Message could not be saved', data.get('id'))
            return

        # Broadcast message
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message_id': message.id,
                'content': content,
                'user': self.user.username,
                'user_id': self.user.id,
                'timestamp': message.created_at.isoformat(),
                # Use the reply target that was actually stored; it is None
                # when the requested message was not found in this room.
                'reply_to': message.reply_to_id,
                'client_id': data.get('id'),  # For client-side correlation
            },
        )

    async def handle_reaction(self, data):
        """Add or remove the sender's emoji reaction and broadcast the change."""
        message_id = data.get('message_id')
        emoji = data.get('emoji')
        action = data.get('action', 'add')  # add or remove

        if not message_id or not emoji or not isinstance(emoji, str):
            await self.send_error('Invalid reaction data', data.get('id'))
            return

        # Save reaction
        if not await self.save_reaction(message_id, emoji, action):
            return

        # Broadcast reaction
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'message_reaction',
                'message_id': message_id,
                'emoji': emoji,
                'action': action,
                'user': self.user.username,
                'user_id': self.user.id,
            },
        )

    async def handle_edit_message(self, data):
        """Apply an edit to a message the sender may edit, then broadcast it."""
        message_id = data.get('message_id')
        raw_content = data.get('content', '')
        new_content = raw_content.strip() if isinstance(raw_content, str) else ''

        if not message_id or not new_content:
            await self.send_error('Invalid edit data', data.get('id'))
            return

        # Check if user can edit this message
        if not await self.can_edit_message(message_id):
            await self.send_error('Cannot edit this message', data.get('id'))
            return

        # Update message
        if not await self.update_message(message_id, new_content):
            return

        # Broadcast edit
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'message_edited',
                'message_id': message_id,
                'content': new_content,
                'user': self.user.username,
                'edited_at': datetime.now().isoformat(),
            },
        )

    async def heartbeat_loop(self):
        """Send a heartbeat periodically and close connections that go idle."""
        try:
            while True:
                await asyncio.sleep(self.HEARTBEAT_INTERVAL)

                # Check for inactive connections
                if self.last_activity:
                    idle_for = datetime.now() - self.last_activity
                    if idle_for > self.IDLE_TIMEOUT:
                        await self.close(code=4000)  # Timeout
                        break

                # Send heartbeat
                await self.send(text_data=json.dumps({
                    'type': 'heartbeat',
                    'timestamp': datetime.now().isoformat(),
                }))
        except asyncio.CancelledError:
            # Raised when disconnect() cancels the task; nothing to clean up.
            pass

    # Group message handlers

    async def chat_message(self, event):
        """Send chat message to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'message',
            'message_id': event['message_id'],
            'content': event['content'],
            'user': event['user'],
            'user_id': event['user_id'],
            'timestamp': event['timestamp'],
            'reply_to': event.get('reply_to'),
            'client_id': event.get('client_id'),
        }))

    async def message_reaction(self, event):
        """Send message reaction to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'reaction',
            'message_id': event['message_id'],
            'emoji': event['emoji'],
            'action': event['action'],
            'user': event['user'],
            'user_id': event['user_id'],
        }))

    async def message_edited(self, event):
        """Send message edit notification to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'message_edited',
            'message_id': event['message_id'],
            'content': event['content'],
            'user': event['user'],
            'edited_at': event['edited_at'],
        }))

    async def user_status_changed(self, event):
        """Send user status change to WebSocket."""
        if event['user_id'] != self.user.id:  # Don't send to self
            await self.send(text_data=json.dumps({
                'type': 'user_status',
                'user': event['user'],
                'status': event['status'],
                'timestamp': event['timestamp'],
            }))

    # Helper methods

    async def validate_message(self, data):
        """Check required keys; report the first missing one to the client.

        Returns:
            True when every required field is present, otherwise False.
        """
        required_fields = ['type']
        for field in required_fields:
            if field not in data:
                await self.send_error(f'Missing required field: {field}', data.get('id'))
                return False
        return True

    @database_sync_to_async
    def is_user_banned(self):
        """Check if user is banned from room (placeholder: never banned)."""
        # Implement ban checking logic
        return False

    @sync_to_async
    def check_rate_limit(self):
        """Count this connection attempt against the per-user, per-room limit.

        Returns:
            True while the user has made at most
            ``RATE_LIMIT_MAX_CONNECTIONS`` attempts in the current window.
        """
        cache_key = f'rate_limit_{self.user.id}_{self.room_name}'

        # add() only stores the value when the key is absent, so the window's
        # expiry is fixed at its first attempt rather than being pushed back
        # on every connect; incr() avoids the read-modify-write race of a
        # separate get() followed by set().
        cache.add(cache_key, 0, self.RATE_LIMIT_WINDOW)
        try:
            attempts = cache.incr(cache_key)
        except ValueError:
            # The key expired between add() and incr(); start a new window.
            cache.set(cache_key, 1, self.RATE_LIMIT_WINDOW)
            attempts = 1

        return attempts <= self.RATE_LIMIT_MAX_CONNECTIONS

    @sync_to_async
    def is_content_blocked(self, content):
        """Return True when the content contains a blocked word."""
        # Implement content filtering logic
        blocked_words = ['spam', 'abuse']  # Example
        lowered = content.lower()
        return any(word in lowered for word in blocked_words)

    @database_sync_to_async
    def save_message(self, content, reply_to=None):
        """Store a message in this room, optionally linked to a parent message.

        Args:
            content: Message text.
            reply_to: Id of the message being replied to. It is ignored when
                it is malformed or does not identify a message in this room.

        Returns:
            The saved ``Message``, or ``None`` when saving failed (the
            failure is logged).
        """
        try:
            from .models import ChatRoom, Message

            room, _created = ChatRoom.objects.get_or_create(name=self.room_name)

            reply_message = None
            if reply_to:
                try:
                    reply_message = Message.objects.get(id=reply_to, room=room)
                except (Message.DoesNotExist, ValueError, TypeError):
                    # Unknown or malformed parent id: keep the message and
                    # just drop the reply link.
                    reply_message = None

            return Message.objects.create(
                room=room,
                user=self.user,
                content=content,
                reply_to=reply_message,
            )
        except Exception:
            self._log.exception('Error saving message in room %r', self.room_name)
            return None

    @database_sync_to_async
    def save_reaction(self, message_id, emoji, action):
        """Add or remove the current user's reaction on a message in this room.

        Returns:
            True when the reaction was stored or removed, False for an
            unknown action, a message outside this room, or a failure.
        """
        try:
            from .models import Message, MessageReaction

            # Restrict the lookup to the current room so a client cannot
            # react to messages posted in rooms it is not connected to.
            message = Message.objects.get(id=message_id, room__name=self.room_name)

            if action == 'add':
                MessageReaction.objects.get_or_create(
                    message=message,
                    user=self.user,
                    emoji=emoji,
                )
                return True

            if action == 'remove':
                MessageReaction.objects.filter(
                    message=message,
                    user=self.user,
                    emoji=emoji,
                ).delete()
                return True

            return False
        except Exception:
            self._log.exception('Error saving reaction on message %r', message_id)
            return False

    async def send_error(self, message, client_id=None):
        """Send an error frame, tagged with ``client_id`` when one is given."""
        error_data = {
            'type': 'error',
            'message': message,
        }
        # Compare against None so a falsy but valid id such as 0 is kept.
        if client_id is not None:
            error_data['client_id'] = client_id
        await self.send(text_data=json.dumps(error_data))
# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth.models import User
class NotificationConsumer(AsyncWebsocketConsumer):
    """Consumer for real-time, per-user notifications.

    Each authenticated connection joins the group ``user_<id>``.
    Notifications sent to that group reach the client through
    ``notification_message`` and ``notification_count_update``. The client
    may send JSON objects with an ``action`` of ``mark_read``,
    ``mark_all_read`` or ``get_notifications``.

    Database access runs in synchronous helpers wrapped with
    ``database_sync_to_async``, while every ``self.send`` is awaited from a
    coroutine. Sending must not be scheduled from inside those helpers: they
    execute in a worker thread, where there is no running event loop.
    """

    RECENT_NOTIFICATION_LIMIT = 20

    async def connect(self):
        """Join the user's private group and report the unread count."""
        self.user = self.scope['user']

        if not self.user.is_authenticated:
            await self.close()
            return

        # Create user-specific group
        self.user_group_name = f'user_{self.user.id}'

        # Join user group
        await self.channel_layer.group_add(
            self.user_group_name,
            self.channel_name,
        )
        await self.accept()

        # Send unread notifications count
        await self.send_unread_count()

    async def disconnect(self, close_code):
        """Leave the user's group if it was joined."""
        # user_group_name is only set for authenticated connections.
        if hasattr(self, 'user_group_name'):
            await self.channel_layer.group_discard(
                self.user_group_name,
                self.channel_name,
            )

    async def receive(self, text_data=None, bytes_data=None):
        """Handle a notification action sent by the client.

        Frames that are binary, are not valid JSON, are not JSON objects, or
        carry an unknown ``action`` are ignored.
        """
        if text_data is None:
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            # Best effort: this consumer has no error channel to the client.
            return

        if not isinstance(data, dict):
            return

        action = data.get('action')
        if action == 'mark_read':
            await self.mark_notification_read(data.get('notification_id'))
        elif action == 'mark_all_read':
            await self.mark_all_notifications_read()
        elif action == 'get_notifications':
            await self.send_notifications()

    # Group message handlers

    async def notification_message(self, event):
        """Send notification to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'notification',
            'id': event['notification_id'],
            'title': event['title'],
            'message': event['message'],
            'category': event.get('category', 'general'),
            'timestamp': event['timestamp'],
            'read': False,
        }))

    async def notification_count_update(self, event):
        """Send updated notification count."""
        await self.send(text_data=json.dumps({
            'type': 'count_update',
            'unread_count': event['unread_count'],
        }))

    # Client-facing operations (coroutines: query first, then send)

    async def send_unread_count(self):
        """Send current unread notifications count."""
        unread_count = await self._count_unread()
        await self.send(text_data=json.dumps({
            'type': 'count_update',
            'unread_count': unread_count,
        }))

    async def mark_notification_read(self, notification_id):
        """Mark one of the user's notifications as read.

        The updated unread count is sent only when the notification exists
        and belongs to the current user.
        """
        if await self._mark_read(notification_id):
            await self.send_unread_count()

    async def mark_all_notifications_read(self):
        """Mark all notifications as read and send the updated count."""
        await self._mark_all_read()
        await self.send_unread_count()

    async def send_notifications(self):
        """Send the user's most recent notifications, newest first."""
        notifications_data = await self._load_recent_notifications()
        await self.send(text_data=json.dumps({
            'type': 'notifications_list',
            'notifications': notifications_data,
        }))

    # Database helpers (synchronous; run in a worker thread)

    @database_sync_to_async
    def _count_unread(self):
        """Return the number of unread notifications for the current user."""
        from .models import Notification

        return Notification.objects.filter(user=self.user, read=False).count()

    @database_sync_to_async
    def _mark_read(self, notification_id):
        """Flag one notification as read; return whether it was found."""
        from .models import Notification

        try:
            notification = Notification.objects.get(
                id=notification_id,
                user=self.user,
            )
        except (Notification.DoesNotExist, ValueError, TypeError):
            # Missing, foreign or malformed id (e.g. a non-numeric string).
            return False

        notification.read = True
        notification.save(update_fields=['read'])
        return True

    @database_sync_to_async
    def _mark_all_read(self):
        """Flag every unread notification of the current user as read."""
        from .models import Notification

        Notification.objects.filter(user=self.user, read=False).update(read=True)

    @database_sync_to_async
    def _load_recent_notifications(self):
        """Return the latest notifications as JSON-serialisable dicts."""
        from .models import Notification

        recent = Notification.objects.filter(
            user=self.user,
        ).order_by('-created_at')[:self.RECENT_NOTIFICATION_LIMIT]

        return [
            {
                'id': notification.id,
                'title': notification.title,
                'message': notification.message,
                'category': notification.category,
                'timestamp': notification.created_at.isoformat(),
                'read': notification.read,
            }
            for notification in recent
        ]
# Utility function to send notifications
async def send_notification_to_user(user_id, title, message, category='general'):
    """Store a notification for a user and push it to their open sockets.

    Args:
        user_id: Primary key of the recipient ``User``.
        title: Notification title.
        message: Notification body.
        category: Notification category; defaults to ``'general'``.

    Raises:
        User.DoesNotExist: If no user with ``user_id`` exists. Nothing is
            stored or sent in that case.
    """
    from channels.layers import get_channel_layer
    from .models import Notification

    def _create_notification():
        # Look the user up first so an unknown id fails with DoesNotExist.
        recipient = User.objects.get(id=user_id)
        return Notification.objects.create(
            user=recipient,
            title=title,
            message=message,
            category=category,
        )

    # database_sync_to_async (rather than plain sync_to_async) also cleans up
    # stale database connections around the threaded ORM call.
    notification = await database_sync_to_async(_create_notification)()

    # Send to WebSocket: delivered to NotificationConsumer.notification_message
    # on every connection in the user's group.
    channel_layer = get_channel_layer()
    await channel_layer.group_send(
        f'user_{user_id}',
        {
            'type': 'notification_message',
            'notification_id': notification.id,
            'title': title,
            'message': message,
            'category': category,
            'timestamp': notification.created_at.isoformat(),
        },
    )
# models.py
from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone
class ChatRoom(models.Model):
    """A chat room, identified by a unique name."""

    # Unique room identifier.
    name = models.CharField(max_length=100, unique=True)
    # Human-readable label; __str__ falls back to `name` when it is empty.
    display_name = models.CharField(max_length=200)
    description = models.TextField(blank=True)
    # Set once, when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
    is_private = models.BooleanField(default=False)
    max_users = models.IntegerField(default=100)

    class Meta:
        ordering = ['name']

    def __str__(self):
        """Return the display name, or the unique name if it is empty."""
        if self.display_name:
            return self.display_name
        return self.name
class Message(models.Model):
    """A chat message posted by a user in a room."""

    # Reverse accessor on the room is `room.messages`.
    room = models.ForeignKey(
        ChatRoom,
        on_delete=models.CASCADE,
        related_name='messages',
    )
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    content = models.TextField()
    # Optional parent message this one replies to.
    # NOTE(review): CASCADE means deleting a message also deletes its
    # replies - confirm that is intended.
    reply_to = models.ForeignKey(
        'self',
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    # Refreshed on every save().
    updated_at = models.DateTimeField(auto_now=True)
    is_edited = models.BooleanField(default=False)
    is_deleted = models.BooleanField(default=False)

    class Meta:
        ordering = ['created_at']

    def __str__(self):
        """Return "<username>: <first 50 characters of the content>"."""
        preview = self.content[:50]
        return f'{self.user.username}: {preview}'
class MessageReaction(models.Model):
    """An emoji reaction left by a user on a message."""

    # Reverse accessor on the message is `message.reactions`.
    message = models.ForeignKey(
        Message,
        on_delete=models.CASCADE,
        related_name='reactions',
    )
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    emoji = models.CharField(max_length=10)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # A user can apply a given emoji to a given message only once.
        unique_together = ['message', 'user', 'emoji']
class Notification(models.Model):
    """A notification addressed to a single user."""

    # (stored value, human-readable label) pairs accepted by `category`.
    CATEGORY_CHOICES = [
        ('general', 'General'),
        ('message', 'Message'),
        ('mention', 'Mention'),
        ('system', 'System'),
        ('warning', 'Warning'),
    ]

    # Reverse accessor on the user is `user.notifications`.
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='notifications',
    )
    title = models.CharField(max_length=200)
    message = models.TextField()
    category = models.CharField(
        max_length=20,
        choices=CATEGORY_CHOICES,
        default='general',
    )
    # New notifications start out unread.
    read = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Newest first.
        ordering = ['-created_at']

    def __str__(self):
        """Return "<username>: <title>"."""
        owner = self.user.username
        return f'{owner}: {self.title}'
class UserStatus(models.Model):
    """Presence record for a user; at most one row per user."""

    # (stored value, human-readable label) pairs accepted by `status`.
    STATUS_CHOICES = [
        ('online', 'Online'),
        ('away', 'Away'),
        ('busy', 'Busy'),
        ('offline', 'Offline'),
    ]

    user = models.OneToOneField(User, on_delete=models.CASCADE)
    status = models.CharField(
        max_length=20,
        choices=STATUS_CHOICES,
        default='offline',
    )
    # Defaults to the time the row is created.
    last_seen = models.DateTimeField(default=timezone.now)
    custom_message = models.CharField(max_length=100, blank=True)

    def __str__(self):
        """Return "<username>: <status>"."""
        owner = self.user.username
        return f'{owner}: {self.status}'
Django Channels transforms Django into a powerful real-time application framework. The key to successful WebSocket implementation is understanding the consumer lifecycle, proper group management, and efficient message broadcasting. Start with simple chat functionality and gradually add advanced features like typing indicators, message reactions, and user presence as your real-time requirements evolve.
Async ORM Status
Django's async ORM support has evolved significantly since Django 3.1, but it's still a work in progress. Understanding the current capabilities, limitations, and best practices for async database operations is crucial for building efficient async Django applications. This chapter covers the current state of async ORM, workarounds for limitations, and strategies for optimal database performance in async contexts.
Background Tasks with Celery or RQ
Background task processing is essential for building responsive async Django applications. While async views handle concurrent requests efficiently, long-running operations like file processing, email sending, data analysis, or external API calls should be offloaded to background workers. This chapter covers integrating Celery and RQ with async Django applications, implementing task queues, scheduling, monitoring, and async task patterns.