Asynchronous Django

WebSockets with Channels

Django Channels extends Django to handle WebSockets, HTTP/2, and other protocols beyond traditional HTTP. This enables building real-time applications like chat systems, live notifications, collaborative tools, and streaming dashboards. This chapter covers Channels architecture, WebSocket consumers, real-time communication patterns, and production deployment strategies.

WebSockets with Channels

Django Channels extends Django to handle WebSockets, HTTP/2, and other protocols beyond traditional HTTP. This enables building real-time applications like chat systems, live notifications, collaborative tools, and streaming dashboards. This chapter covers Channels architecture, WebSocket consumers, real-time communication patterns, and production deployment strategies.

Channels Architecture

Understanding Channels Components

# The channel layer is the message bus that lets consumers talk to each other.
# Redis (through channels_redis) is the backend most projects use.

# settings.py
INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "channels",  # Channels itself
    "chat",  # project app that holds the WebSocket consumers
]

# Dotted path to the ASGI application object (see asgi.py)
ASGI_APPLICATION = "myproject.asgi.application"

# Development channel layer: one Redis instance on localhost
CHANNEL_LAYERS = {
    "default": dict(
        BACKEND="channels_redis.core.RedisChannelLayer",
        CONFIG=dict(
            hosts=[("127.0.0.1", 6379)],
            capacity=1500,  # maximum messages to store
            expiry=60,  # message expiry, in seconds
        ),
    ),
}

# Production variant: locate the Redis master through Redis Sentinel
CHANNEL_LAYERS = {
    "default": dict(
        BACKEND="channels_redis.core.RedisChannelLayer",
        CONFIG=dict(
            hosts=[
                dict(
                    sentinels=[
                        (f"sentinel{index}.example.com", 26379)
                        for index in (1, 2, 3)
                    ],
                    master_name="mymaster",
                    db=0,
                ),
            ],
            capacity=5000,
            expiry=300,
            group_expiry=24 * 60 * 60,  # 24 hours, in seconds
        ),
    ),
}

ASGI Routing Configuration

# asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Initialize the Django ASGI application early so the app registry is
# populated BEFORE any module that imports ORM models is loaded.
django_asgi_app = get_asgi_application()

# chat.routing imports the consumers, which import models; it must therefore
# be imported only after get_asgi_application() has run, otherwise Django can
# raise AppRegistryNotReady when the server starts.
import chat.routing  # noqa: E402

# Top-level ASGI entry point: dispatches each connection by protocol type.
application = ProtocolTypeRouter({
    # HTTP requests handled by Django
    "http": django_asgi_app,

    # WebSocket requests handled by Channels. The origin validator wraps the
    # auth middleware, which in turn wraps the URL router.
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                chat.routing.websocket_urlpatterns
            )
        )
    ),
})

# chat/routing.py
from django.urls import re_path, path
from . import consumers

# WebSocket routes handed to URLRouter in asgi.py.
# Captured path parameters are read by consumers from
# scope['url_route']['kwargs'] (ChatConsumer.connect reads 'room_name').
# NOTE(review): LiveUpdateConsumer and UserConsumer are referenced here but are
# not defined in the consumer code shown alongside this module — confirm they
# exist in chat/consumers.py, otherwise importing this module fails.
websocket_urlpatterns = [
    # Room chat; <room_name> selects the chat room.
    path('ws/chat/<str:room_name>/', consumers.ChatConsumer.as_asgi()),
    # Notification stream for the connected user.
    path('ws/notifications/', consumers.NotificationConsumer.as_asgi()),
    path('ws/live-updates/', consumers.LiveUpdateConsumer.as_asgi()),
    # Regex route; the named group 'user_id' only matches digits.
    re_path(r'ws/user/(?P<user_id>\d+)/$', consumers.UserConsumer.as_asgi()),
]

WebSocket Consumers

Basic WebSocket Consumer

# chat/consumers.py
import json
import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth.models import User
from .models import ChatRoom, Message

class ChatConsumer(AsyncWebsocketConsumer):
    """Basic chat consumer for real-time messaging.

    One instance serves one WebSocket connection. Authenticated users are
    added to the channel-layer group ``chat_<room_name>``; chat messages,
    typing indicators and join/leave notices are fanned out through it.

    Client frames are JSON objects. The optional ``type`` key selects the
    action: ``message`` (default), ``typing`` or ``ping``.
    """

    # Number of stored messages replayed to a client right after it connects.
    RECENT_MESSAGE_LIMIT = 50

    async def connect(self):
        """Handle WebSocket connection.

        Rejects anonymous users; otherwise joins the room group, accepts the
        socket, announces the user to the room and replays recent messages.
        """
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        self.user = self.scope['user']
        # Set once the socket is in the room group. disconnect() relies on it
        # so that rejected connections do not announce a departure.
        self.joined = False

        # Reject anonymous users
        if not self.user.is_authenticated:
            await self.close()
            return

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        self.joined = True

        # Accept WebSocket connection
        await self.accept()

        # Notify room about user joining
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_joined',
                'user': self.user.username,
                'message': f'{self.user.username} joined the room'
            }
        )

        # Send recent messages to newly connected user
        await self.send_recent_messages()

    async def disconnect(self, close_code):
        """Handle WebSocket disconnection.

        Does nothing for connections that never joined the room (for example
        anonymous users rejected in ``connect``).
        """
        if not getattr(self, 'joined', False):
            return
        self.joined = False

        try:
            # Notify room about user leaving
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'user_left',
                    'user': self.user.username,
                    'message': f'{self.user.username} left the room'
                }
            )
        finally:
            # Always leave the group, even if the notification failed
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name
            )

    async def receive(self, text_data=None, bytes_data=None):
        """Handle messages from WebSocket.

        Args:
            text_data: Text frame payload (a JSON object), or ``None``.
            bytes_data: Binary frame payload. Binary frames are not part of
                the protocol and are answered with an error frame.
        """
        if text_data is None:
            await self.send_error('Binary frames are not supported')
            return

        try:
            payload = json.loads(text_data)
        except json.JSONDecodeError:
            await self.send_error('Invalid JSON')
            return

        # Valid JSON that is not an object (e.g. a list) has no 'type' key
        if not isinstance(payload, dict):
            await self.send_error('Invalid JSON')
            return

        message_type = payload.get('type', 'message')

        try:
            if message_type == 'message':
                await self.handle_chat_message(payload)
            elif message_type == 'typing':
                await self.handle_typing_indicator(payload)
            elif message_type == 'ping':
                await self.handle_ping()
            else:
                await self.send_error('Unknown message type')
        except Exception:
            # Boundary handler: keep the socket alive, record the traceback
            # server-side and avoid echoing internal details to the client.
            self._log_exception('Unhandled error while processing chat frame')
            await self.send_error('Error processing message')

    async def handle_chat_message(self, data):
        """Validate, persist and broadcast a chat message."""
        raw_content = data.get('message', '')
        message_content = raw_content.strip() if isinstance(raw_content, str) else ''

        if not message_content:
            await self.send_error('Empty message')
            return

        # Save message to database
        message = await self.save_message(message_content)

        if message:
            # Send message to room group
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'chat_message',
                    'message': message_content,
                    'user': self.user.username,
                    'user_id': self.user.id,
                    'timestamp': message.created_at.isoformat(),
                    'message_id': message.id
                }
            )

    async def handle_typing_indicator(self, data):
        """Broadcast the sender's typing state to the room."""
        is_typing = data.get('is_typing', False)

        # Sent to the whole group; typing_indicator() filters out the sender
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'typing_indicator',
                'user': self.user.username,
                'user_id': self.user.id,
                'is_typing': is_typing
            }
        )

    async def handle_ping(self):
        """Answer a keepalive ping with a pong carrying the event-loop clock."""
        await self.send(text_data=json.dumps({
            'type': 'pong',
            # Monotonic event-loop time, not wall-clock time
            'timestamp': asyncio.get_running_loop().time()
        }))

    # Group message handlers
    async def chat_message(self, event):
        """Send chat message to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'message',
            'message': event['message'],
            'user': event['user'],
            'user_id': event['user_id'],
            'timestamp': event['timestamp'],
            'message_id': event['message_id']
        }))

    async def user_joined(self, event):
        """Send user joined notification to WebSocket."""
        # Don't send to the user who joined
        if event['user'] != self.user.username:
            await self.send(text_data=json.dumps({
                'type': 'user_joined',
                'user': event['user'],
                'message': event['message']
            }))

    async def user_left(self, event):
        """Send user left notification to WebSocket."""
        # Don't send to the user who left (they're already disconnected)
        if event['user'] != self.user.username:
            await self.send(text_data=json.dumps({
                'type': 'user_left',
                'user': event['user'],
                'message': event['message']
            }))

    async def typing_indicator(self, event):
        """Send typing indicator to WebSocket."""
        # Don't send typing indicator back to the sender
        if event['user_id'] != self.user.id:
            await self.send(text_data=json.dumps({
                'type': 'typing',
                'user': event['user'],
                'is_typing': event['is_typing']
            }))

    # Helper methods
    @staticmethod
    def _log_exception(description):
        """Log the exception currently being handled, including its traceback."""
        import logging  # local import keeps this class self-contained

        logging.getLogger(__name__).exception(description)

    async def send_error(self, error_message):
        """Send error message to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'error',
            'message': error_message
        }))

    async def send_recent_messages(self):
        """Send recent messages to newly connected user, oldest first."""
        messages = await self.get_recent_messages()

        for message in messages:
            await self.send(text_data=json.dumps({
                'type': 'message',
                'message': message.content,
                # 'user' was loaded with select_related, so no lazy query here
                'user': message.user.username,
                'user_id': message.user.id,
                'timestamp': message.created_at.isoformat(),
                'message_id': message.id
            }))

    @database_sync_to_async
    def save_message(self, content):
        """Save message to database.

        Creates the room on first use.

        Returns:
            The created ``Message``, or ``None`` if saving failed.
        """
        try:
            room, _created = ChatRoom.objects.get_or_create(name=self.room_name)
            return Message.objects.create(
                room=room,
                user=self.user,
                content=content
            )
        except Exception:
            # Best effort: a failed save must not tear down the connection
            self._log_exception('Error saving message')
            return None

    @database_sync_to_async
    def get_recent_messages(self):
        """Get recent messages from database.

        Returns:
            Up to ``RECENT_MESSAGE_LIMIT`` of the room's newest messages in
            chronological order, or an empty list if the room does not exist.
        """
        try:
            room = ChatRoom.objects.get(name=self.room_name)
        except ChatRoom.DoesNotExist:
            return []

        newest_first = (
            room.messages.select_related('user')
            .order_by('-created_at')[:self.RECENT_MESSAGE_LIMIT]
        )
        # QuerySet.reverse() is not allowed after slicing, so evaluate the
        # query first and flip the resulting list in Python.
        return list(newest_first)[::-1]

Advanced WebSocket Consumer

# consumers.py
import json
import asyncio
from datetime import datetime, timedelta
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.core.cache import cache
from asgiref.sync import sync_to_async

class AdvancedChatConsumer(AsyncWebsocketConsumer):
    """Advanced chat consumer with additional features.

    Adds ban and rate-limit checks on connect, a heartbeat with idle timeout,
    reactions, message editing and soft deletion, typing indicators and user
    status broadcasts on top of basic room chat.

    Client frames are JSON objects with a required ``type`` key (``message``,
    ``typing``, ``reaction``, ``edit``, ``delete``, ``ping`` or ``status``)
    and an optional ``id`` that is echoed back as ``client_id``.
    """

    HEARTBEAT_INTERVAL = 30                 # seconds between heartbeats
    IDLE_TIMEOUT = timedelta(minutes=30)    # close after this much silence
    RATE_LIMIT_MAX_CONNECTIONS = 100        # connections per window
    RATE_LIMIT_WINDOW = 3600                # window length in seconds
    RECENT_MESSAGE_LIMIT = 50               # messages replayed on connect
    MAX_EMOJI_LENGTH = 10                   # MessageReaction.emoji max_length
    SELECTABLE_STATUSES = ('online', 'away', 'busy')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.heartbeat_task = None
        self.last_activity = None
        self.user_status = 'online'
        # True only while this socket is a member of the room group
        self.joined = False

    async def connect(self):
        """Enhanced connection handling.

        Rejects anonymous, banned and rate-limited users (closing before
        ``accept()`` rejects the handshake), then joins the room, starts the
        heartbeat and sends the initial state.
        """
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        self.user = self.scope['user']

        if not self.user.is_authenticated:
            await self.close(code=4001)  # Unauthorized
            return

        # Check if user is banned from room
        if await self.is_user_banned():
            await self.close(code=4003)  # Forbidden
            return

        # Rate limiting check
        if not await self.check_rate_limit():
            await self.close(code=4029)  # Too many requests
            return

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        self.joined = True

        await self.accept()

        # Start the idle clock now so that clients which never send a frame
        # are still subject to the idle timeout.
        self.last_activity = datetime.now()
        self.heartbeat_task = asyncio.create_task(self.heartbeat_loop())

        # Update user status
        await self.update_user_status('online')

        # Send user list and recent messages
        await self.send_user_list()
        await self.send_recent_messages()

        # Notify others
        await self.broadcast_user_status('joined')

    async def disconnect(self, close_code):
        """Enhanced disconnection handling.

        Status updates and broadcasts are skipped for connections that were
        rejected in ``connect`` and therefore never joined the room.
        """
        # Cancel heartbeat
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
            self.heartbeat_task = None

        if not self.joined:
            return
        self.joined = False

        try:
            await self.update_user_status('offline')
            await self.broadcast_user_status('left')
        finally:
            # Always leave the group, even if the steps above failed
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name
            )

    async def receive(self, text_data=None, bytes_data=None):
        """Enhanced message handling with validation.

        Args:
            text_data: Text frame payload (a JSON object), or ``None``.
            bytes_data: Binary frame payload; not supported by this protocol.
        """
        self.last_activity = datetime.now()

        if text_data is None:
            await self.send_error('Binary frames are not supported')
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self.send_error('Invalid JSON format')
            return

        try:
            # Validate message structure
            if not await self.validate_message(data):
                return

            # Route message based on type
            handlers = {
                'message': self.handle_chat_message,
                'typing': self.handle_typing,
                'reaction': self.handle_reaction,
                'edit': self.handle_edit_message,
                'delete': self.handle_delete_message,
                'ping': self.handle_ping,
                'status': self.handle_status_change,
            }

            message_type = data.get('type')
            # Non-string types (e.g. a list) are unhashable or meaningless
            handler = handlers.get(message_type) if isinstance(message_type, str) else None
            if handler:
                await handler(data)
            else:
                await self.send_error('Unknown message type', data.get('id'))
        except Exception:
            # Boundary handler: log server-side, never echo internals
            self._log_exception('Unhandled error while processing chat frame')
            await self.send_error('Error processing message')

    async def handle_chat_message(self, data):
        """Handle chat message with advanced features."""
        raw_content = data.get('message', '')
        content = raw_content.strip() if isinstance(raw_content, str) else ''
        reply_to = data.get('reply_to')

        if not content:
            await self.send_error('Empty message', data.get('id'))
            return

        # Content filtering
        if await self.is_content_blocked(content):
            await self.send_error('Message blocked by content filter', data.get('id'))
            return

        # Save message
        message = await self.save_message(content, reply_to)

        if not message:
            await self.send_error('Could not save message', data.get('id'))
            return

        # Broadcast message
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message_id': message.id,
                'content': content,
                'user': self.user.username,
                'user_id': self.user.id,
                'timestamp': message.created_at.isoformat(),
                # Use what was actually stored, not the raw client value
                'reply_to': message.reply_to_id,
                'client_id': data.get('id')  # For client-side correlation
            }
        )

    async def handle_typing(self, data):
        """Broadcast the sender's typing state to the room."""
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'typing_indicator',
                'user': self.user.username,
                'user_id': self.user.id,
                'is_typing': bool(data.get('is_typing', False))
            }
        )

    async def handle_reaction(self, data):
        """Handle message reactions."""
        message_id = data.get('message_id')
        emoji = data.get('emoji')
        action = data.get('action', 'add')  # add or remove

        reaction_is_valid = (
            message_id
            and isinstance(emoji, str)
            and 0 < len(emoji) <= self.MAX_EMOJI_LENGTH
            and action in ('add', 'remove')
        )
        if not reaction_is_valid:
            await self.send_error('Invalid reaction data', data.get('id'))
            return

        # Save reaction
        success = await self.save_reaction(message_id, emoji, action)

        if not success:
            await self.send_error('Could not save reaction', data.get('id'))
            return

        # Broadcast reaction
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'message_reaction',
                'message_id': message_id,
                'emoji': emoji,
                'action': action,
                'user': self.user.username,
                'user_id': self.user.id
            }
        )

    async def handle_edit_message(self, data):
        """Handle message editing."""
        message_id = data.get('message_id')
        raw_content = data.get('content', '')
        new_content = raw_content.strip() if isinstance(raw_content, str) else ''

        if not message_id or not new_content:
            await self.send_error('Invalid edit data', data.get('id'))
            return

        # Check if user can edit this message
        if not await self.can_edit_message(message_id):
            await self.send_error('Cannot edit this message', data.get('id'))
            return

        # Edits go through the same filter as new messages, otherwise the
        # filter could be bypassed by posting first and editing afterwards.
        if await self.is_content_blocked(new_content):
            await self.send_error('Message blocked by content filter', data.get('id'))
            return

        # Update message; returns the stored edit timestamp on success
        edited_at = await self.update_message(message_id, new_content)

        if edited_at:
            # Broadcast edit
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'message_edited',
                    'message_id': message_id,
                    'content': new_content,
                    'user': self.user.username,
                    'edited_at': edited_at
                }
            )

    async def handle_delete_message(self, data):
        """Soft-delete one of the sender's own messages and notify the room."""
        message_id = data.get('message_id')

        if not message_id:
            await self.send_error('Invalid delete data', data.get('id'))
            return

        if not await self._soft_delete_message(message_id):
            await self.send_error('Cannot delete this message', data.get('id'))
            return

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'message_deleted',
                'message_id': message_id,
                'user': self.user.username,
                'user_id': self.user.id
            }
        )

    async def handle_ping(self, data):
        """Answer a client ping with a pong frame."""
        await self.send(text_data=json.dumps({
            'type': 'pong',
            'timestamp': datetime.now().isoformat(),
            'client_id': data.get('id')
        }))

    async def handle_status_change(self, data):
        """Persist and broadcast a status chosen by the user."""
        status = data.get('status')

        if status not in self.SELECTABLE_STATUSES:
            await self.send_error('Invalid status', data.get('id'))
            return

        await self.update_user_status(status)
        await self.broadcast_user_status(status)

    async def heartbeat_loop(self):
        """Maintain connection with periodic heartbeat.

        Runs as a background task until cancelled by ``disconnect``; closes
        the socket with code 4000 after ``IDLE_TIMEOUT`` without a frame.
        """
        try:
            while True:
                await asyncio.sleep(self.HEARTBEAT_INTERVAL)

                # Check for inactive connections
                if self.last_activity:
                    inactive_time = datetime.now() - self.last_activity
                    if inactive_time > self.IDLE_TIMEOUT:
                        await self.close(code=4000)  # Timeout
                        break

                # Send heartbeat
                await self.send(text_data=json.dumps({
                    'type': 'heartbeat',
                    'timestamp': datetime.now().isoformat()
                }))

        except asyncio.CancelledError:
            # Normal shutdown path: disconnect() cancelled the task
            pass

    # Group message handlers
    async def chat_message(self, event):
        """Send chat message to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'message',
            'message_id': event['message_id'],
            'content': event['content'],
            'user': event['user'],
            'user_id': event['user_id'],
            'timestamp': event['timestamp'],
            'reply_to': event.get('reply_to'),
            'client_id': event.get('client_id')
        }))

    async def message_reaction(self, event):
        """Send message reaction to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'reaction',
            'message_id': event['message_id'],
            'emoji': event['emoji'],
            'action': event['action'],
            'user': event['user'],
            'user_id': event['user_id']
        }))

    async def message_edited(self, event):
        """Send message edit notification to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'message_edited',
            'message_id': event['message_id'],
            'content': event['content'],
            'user': event['user'],
            'edited_at': event['edited_at']
        }))

    async def message_deleted(self, event):
        """Send message deletion notification to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'message_deleted',
            'message_id': event['message_id'],
            'user': event['user']
        }))

    async def typing_indicator(self, event):
        """Send typing indicator to WebSocket."""
        if event['user_id'] != self.user.id:  # Don't send to self
            await self.send(text_data=json.dumps({
                'type': 'typing',
                'user': event['user'],
                'is_typing': event['is_typing']
            }))

    async def user_status_changed(self, event):
        """Send user status change to WebSocket."""
        if event['user_id'] != self.user.id:  # Don't send to self
            await self.send(text_data=json.dumps({
                'type': 'user_status',
                'user': event['user'],
                'status': event['status'],
                'timestamp': event['timestamp']
            }))

    # Helper methods
    @staticmethod
    def _log_exception(description):
        """Log the exception currently being handled, including its traceback."""
        import logging  # local import keeps this class self-contained

        logging.getLogger(__name__).exception(description)

    async def validate_message(self, data):
        """Validate incoming message structure.

        Returns:
            ``True`` if ``data`` is a JSON object containing every required
            field; otherwise an error frame is sent and ``False`` returned.
        """
        if not isinstance(data, dict):
            await self.send_error('Invalid JSON format')
            return False

        required_fields = ['type']

        for field in required_fields:
            if field not in data:
                await self.send_error(f'Missing required field: {field}', data.get('id'))
                return False

        return True

    async def update_user_status(self, status):
        """Remember ``status`` on the consumer and persist it to UserStatus."""
        self.user_status = status
        await self._store_user_status(status)

    @database_sync_to_async
    def _store_user_status(self, status):
        """Create or update this user's UserStatus row."""
        from django.utils import timezone
        from .models import UserStatus

        UserStatus.objects.update_or_create(
            user=self.user,
            defaults={'status': status, 'last_seen': timezone.now()},
        )

    async def broadcast_user_status(self, status):
        """Announce a status change of this user to the room group."""
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_status_changed',
                'user': self.user.username,
                'user_id': self.user.id,
                'status': status,
                'timestamp': datetime.now().isoformat()
            }
        )

    async def send_user_list(self):
        """Send the list of currently active users to this client."""
        users = await self._get_active_users()
        await self.send(text_data=json.dumps({
            'type': 'user_list',
            'users': users
        }))

    @database_sync_to_async
    def _get_active_users(self):
        """Return every user whose stored status is not 'offline'.

        The models carry no room-membership relation, so the list is not
        limited to the current room.
        """
        from .models import UserStatus

        statuses = UserStatus.objects.select_related('user').exclude(status='offline')
        return [
            {
                'user_id': entry.user_id,
                'user': entry.user.username,
                'status': entry.status
            }
            for entry in statuses
        ]

    async def send_recent_messages(self):
        """Replay the room's latest messages to this client, oldest first."""
        for payload in await self._get_recent_messages():
            await self.send(text_data=json.dumps(payload))

    @database_sync_to_async
    def _get_recent_messages(self):
        """Build 'message' frames for the newest non-deleted room messages."""
        from .models import Message

        newest_first = (
            Message.objects.filter(room__name=self.room_name, is_deleted=False)
            .select_related('user')
            .order_by('-created_at')[:self.RECENT_MESSAGE_LIMIT]
        )
        # A sliced queryset cannot be .reverse()d; flip the evaluated list.
        return [
            {
                'type': 'message',
                'message_id': message.id,
                'content': message.content,
                'user': message.user.username,
                'user_id': message.user_id,
                'timestamp': message.created_at.isoformat(),
                'reply_to': message.reply_to_id,
                'client_id': None
            }
            for message in list(newest_first)[::-1]
        ]

    @database_sync_to_async
    def is_user_banned(self):
        """Check if user is banned from room."""
        # Implement ban checking logic
        return False

    @sync_to_async
    def check_rate_limit(self):
        """Check rate limiting for user.

        Counts connection attempts per user and room in a fixed window.
        ``add`` starts the window only when no counter exists and ``incr``
        bumps it without a read-modify-write race.
        NOTE(review): backends that fall back to Django's generic ``incr``
        may re-apply the default cache timeout — verify for your backend.

        Returns:
            ``True`` if this attempt is within the limit.
        """
        cache_key = f'rate_limit_{self.user.id}_{self.room_name}'
        cache.add(cache_key, 0, self.RATE_LIMIT_WINDOW)

        try:
            attempts = cache.incr(cache_key)
        except ValueError:
            # The counter expired between add() and incr(); start a new window
            cache.set(cache_key, 1, self.RATE_LIMIT_WINDOW)
            attempts = 1

        return attempts <= self.RATE_LIMIT_MAX_CONNECTIONS

    @sync_to_async
    def is_content_blocked(self, content):
        """Check if content should be blocked."""
        # Implement content filtering logic
        blocked_words = ['spam', 'abuse']  # Example
        lowered = content.lower()
        return any(word in lowered for word in blocked_words)

    @database_sync_to_async
    def save_message(self, content, reply_to=None):
        """Save message with reply support.

        Args:
            content: Message text.
            reply_to: Optional id of the message being answered. Ignored when
                it is malformed or does not belong to this room.

        Returns:
            The created ``Message``, or ``None`` if saving failed.
        """
        from .models import ChatRoom, Message

        try:
            room, _created = ChatRoom.objects.get_or_create(name=self.room_name)

            reply_message = None
            if reply_to:
                try:
                    reply_message = Message.objects.get(id=reply_to, room=room)
                except (Message.DoesNotExist, TypeError, ValueError):
                    # Unknown or malformed parent id: store as a plain message
                    reply_message = None

            return Message.objects.create(
                room=room,
                user=self.user,
                content=content,
                reply_to=reply_message
            )
        except Exception:
            # Best effort: a failed save must not tear down the connection
            self._log_exception('Error saving message')
            return None

    @database_sync_to_async
    def can_edit_message(self, message_id):
        """Return True if the message is in this room and owned by the user."""
        from .models import Message

        try:
            return Message.objects.filter(
                id=message_id,
                room__name=self.room_name,
                user=self.user,
                is_deleted=False
            ).exists()
        except (TypeError, ValueError):
            # Malformed id supplied by the client
            return False

    @database_sync_to_async
    def update_message(self, message_id, new_content):
        """Store edited content for one of the user's own messages.

        Returns:
            The ISO-8601 ``updated_at`` timestamp on success, else ``None``.
        """
        from .models import Message

        try:
            message = Message.objects.get(
                id=message_id,
                room__name=self.room_name,
                user=self.user,
                is_deleted=False
            )
        except (Message.DoesNotExist, TypeError, ValueError):
            return None

        message.content = new_content
        message.is_edited = True
        # 'updated_at' must be listed for the auto_now value to be written
        message.save(update_fields=['content', 'is_edited', 'updated_at'])
        return message.updated_at.isoformat()

    @database_sync_to_async
    def _soft_delete_message(self, message_id):
        """Flag one of the user's own room messages as deleted."""
        from .models import Message

        try:
            updated = Message.objects.filter(
                id=message_id,
                room__name=self.room_name,
                user=self.user,
                is_deleted=False
            ).update(is_deleted=True)
        except (TypeError, ValueError):
            return False

        return updated > 0

    @database_sync_to_async
    def save_reaction(self, message_id, emoji, action):
        """Save message reaction.

        The message must belong to the current room, so a client cannot react
        to messages of rooms it is not connected to.

        Returns:
            ``True`` if the reaction was added or removed, else ``False``.
        """
        from .models import Message, MessageReaction

        try:
            message = Message.objects.get(
                id=message_id,
                room__name=self.room_name,
                is_deleted=False
            )
        except (Message.DoesNotExist, TypeError, ValueError):
            return False

        try:
            if action == 'add':
                MessageReaction.objects.get_or_create(
                    message=message,
                    user=self.user,
                    emoji=emoji
                )
                return True
            if action == 'remove':
                MessageReaction.objects.filter(
                    message=message,
                    user=self.user,
                    emoji=emoji
                ).delete()
                return True

            return False
        except Exception:
            self._log_exception('Error saving reaction')
            return False

    async def send_error(self, message, client_id=None):
        """Send error message to client.

        Args:
            message: Human-readable error text.
            client_id: Optional client-side correlation id to echo back.
        """
        error_data = {
            'type': 'error',
            'message': message
        }

        # 'is not None' so that falsy ids such as 0 are still echoed
        if client_id is not None:
            error_data['client_id'] = client_id

        await self.send(text_data=json.dumps(error_data))

Real-Time Notifications

Notification Consumer

# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth.models import User

class NotificationConsumer(AsyncWebsocketConsumer):
    """Consumer for real-time notifications.

    Each authenticated connection joins the per-user group ``user_<id>`` and
    receives notifications pushed to that group. Clients may send JSON
    objects with an ``action`` key: ``mark_read`` (with ``notification_id``),
    ``mark_all_read`` or ``get_notifications``.

    Database access lives in small ``@database_sync_to_async`` helpers that
    only return data. All sending happens in coroutines on the event loop,
    because worker threads have no running loop to schedule sends on.
    """

    # Number of notifications returned by 'get_notifications'
    NOTIFICATION_LIST_LIMIT = 20

    async def connect(self):
        """Handle notification connection."""
        self.user = self.scope['user']

        if not self.user.is_authenticated:
            await self.close()
            return

        # Create user-specific group
        self.user_group_name = f'user_{self.user.id}'

        # Join user group
        await self.channel_layer.group_add(
            self.user_group_name,
            self.channel_name
        )

        await self.accept()

        # Send unread notifications count
        await self.send_unread_count()

    async def disconnect(self, close_code):
        """Handle notification disconnection."""
        # The attribute only exists for connections that passed the auth check
        if hasattr(self, 'user_group_name'):
            await self.channel_layer.group_discard(
                self.user_group_name,
                self.channel_name
            )

    async def receive(self, text_data=None, bytes_data=None):
        """Handle notification actions.

        Args:
            text_data: Text frame payload (a JSON object), or ``None``.
            bytes_data: Binary frame payload; not supported by this protocol.
        """
        if text_data is None:
            await self._send_error('Binary frames are not supported')
            return

        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self._send_error('Invalid JSON')
            return

        if not isinstance(data, dict):
            await self._send_error('Invalid JSON')
            return

        action = data.get('action')

        if action == 'mark_read':
            await self.mark_notification_read(data.get('notification_id'))
        elif action == 'mark_all_read':
            await self.mark_all_notifications_read()
        elif action == 'get_notifications':
            await self.send_notifications()
        else:
            await self._send_error('Unknown action')

    # Group message handlers
    async def notification_message(self, event):
        """Send notification to WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'notification',
            'id': event['notification_id'],
            'title': event['title'],
            'message': event['message'],
            'category': event.get('category', 'general'),
            'timestamp': event['timestamp'],
            'read': False
        }))

    async def notification_count_update(self, event):
        """Send updated notification count."""
        await self.send(text_data=json.dumps({
            'type': 'count_update',
            'unread_count': event['unread_count']
        }))

    # Helper methods
    async def _send_error(self, message):
        """Send an error frame to the client."""
        await self.send(text_data=json.dumps({
            'type': 'error',
            'message': message
        }))

    async def send_unread_count(self):
        """Send current unread notifications count."""
        unread_count = await self._count_unread()

        await self.send(text_data=json.dumps({
            'type': 'count_update',
            'unread_count': unread_count
        }))

    async def mark_notification_read(self, notification_id):
        """Mark specific notification as read.

        The refreshed unread count is sent only if a notification of this
        user was found and updated.
        """
        if await self._mark_read(notification_id):
            # Send updated count
            await self.send_unread_count()

    async def mark_all_notifications_read(self):
        """Mark all notifications as read."""
        await self._mark_all_read()

        # Send updated count
        await self.send_unread_count()

    async def send_notifications(self):
        """Send recent notifications."""
        notifications_data = await self._load_recent_notifications()

        await self.send(text_data=json.dumps({
            'type': 'notifications_list',
            'notifications': notifications_data
        }))

    # Database helpers (run in a worker thread; they must not send)
    @database_sync_to_async
    def _count_unread(self):
        """Return the number of unread notifications of the user."""
        from .models import Notification

        return Notification.objects.filter(
            user=self.user,
            read=False
        ).count()

    @database_sync_to_async
    def _mark_read(self, notification_id):
        """Flag one notification of the user as read; return True on success."""
        from .models import Notification

        try:
            notification = Notification.objects.get(
                id=notification_id,
                user=self.user
            )
        except (Notification.DoesNotExist, TypeError, ValueError):
            # Unknown, foreign or malformed id supplied by the client
            return False

        notification.read = True
        notification.save(update_fields=['read'])
        return True

    @database_sync_to_async
    def _mark_all_read(self):
        """Flag every unread notification of the user as read."""
        from .models import Notification

        Notification.objects.filter(
            user=self.user,
            read=False
        ).update(read=True)

    @database_sync_to_async
    def _load_recent_notifications(self):
        """Return the user's newest notifications as JSON-ready dicts."""
        from .models import Notification

        newest = Notification.objects.filter(
            user=self.user
        ).order_by('-created_at')[:self.NOTIFICATION_LIST_LIMIT]

        return [
            {
                'id': notification.id,
                'title': notification.title,
                'message': notification.message,
                'category': notification.category,
                'timestamp': notification.created_at.isoformat(),
                'read': notification.read
            }
            for notification in newest
        ]

# Utility function to send notifications
async def send_notification_to_user(user_id, title, message, category='general'):
    """Send notification to specific user.

    Stores a Notification row, then pushes it to the ``user_<user_id>``
    group, where NotificationConsumer.notification_message forwards it to
    every open socket of that user.

    Args:
        user_id: Primary key of the recipient.
        title: Notification title.
        message: Notification body.
        category: Notification category; defaults to ``'general'``.

    Raises:
        User.DoesNotExist: If no user with ``user_id`` exists.
    """
    from channels.layers import get_channel_layer
    from .models import Notification

    # database_sync_to_async (rather than plain sync_to_async) also cleans up
    # database connections around the ORM call.
    @database_sync_to_async
    def save_notification():
        user = User.objects.get(id=user_id)
        return Notification.objects.create(
            user=user,
            title=title,
            message=message,
            category=category
        )

    notification = await save_notification()

    # Send to WebSocket
    channel_layer = get_channel_layer()
    await channel_layer.group_send(
        f'user_{user_id}',
        {
            'type': 'notification_message',
            'notification_id': notification.id,
            'title': title,
            'message': message,
            'category': category,
            'timestamp': notification.created_at.isoformat()
        }
    )

Models for Real-Time Features

Chat and Notification Models

# models.py
from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone

class ChatRoom(models.Model):
    """Chat room model.

    A room is identified by its unique ``name``; the chat consumers look
    rooms up (and create them on demand) by that name.
    """
    # Unique identifier of the room, used for lookups by the consumers
    name = models.CharField(max_length=100, unique=True)
    # Human-readable title; __str__ falls back to ``name`` when it is empty.
    # NOTE(review): rooms created via get_or_create(name=...) leave this
    # empty, yet the field does not set blank=True — confirm whether form
    # validation should accept an empty display name.
    display_name = models.CharField(max_length=200)
    description = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    is_private = models.BooleanField(default=False)
    # NOTE(review): nothing in the visible consumers enforces this limit
    max_users = models.IntegerField(default=100)
    
    class Meta:
        # Default queryset order: alphabetical by name
        ordering = ['name']
    
    def __str__(self):
        # Prefer the display name, fall back to the unique name
        return self.display_name or self.name

class Message(models.Model):
    """Chat message model.

    A message belongs to one room and one author and may reference another
    message it replies to. ``is_edited`` and ``is_deleted`` are flags for
    edited and soft-deleted messages.
    """
    # Room the message was posted in; reachable as ``room.messages``
    room = models.ForeignKey(ChatRoom, on_delete=models.CASCADE, related_name='messages')
    # Author of the message
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    content = models.TextField()
    # Optional parent message. SET_NULL keeps replies when the parent row is
    # deleted; CASCADE would remove the whole reply thread along with it.
    reply_to = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_edited = models.BooleanField(default=False)
    is_deleted = models.BooleanField(default=False)

    class Meta:
        # Default queryset order: oldest first
        ordering = ['created_at']

    def __str__(self):
        # Author plus the first 50 characters of the text
        return f'{self.user.username}: {self.content[:50]}'

class MessageReaction(models.Model):
    """Message reaction model.

    One row per (message, user, emoji) combination; the uniqueness
    constraint below prevents duplicate rows for the same combination.
    """
    # Message being reacted to; reachable as ``message.reactions``
    message = models.ForeignKey(Message, on_delete=models.CASCADE, related_name='reactions')
    # User who reacted
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Emoji text, limited to 10 characters
    emoji = models.CharField(max_length=10)
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        # A user can attach a given emoji to a given message only once
        unique_together = ['message', 'user', 'emoji']

class Notification(models.Model):
    """Notification model.

    A titled message addressed to one user, with a category and a read flag.
    """
    # (stored value, human-readable label) pairs accepted by ``category``
    CATEGORY_CHOICES = [
        ('general', 'General'),
        ('message', 'Message'),
        ('mention', 'Mention'),
        ('system', 'System'),
        ('warning', 'Warning'),
    ]
    
    # Recipient; reachable as ``user.notifications``
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='notifications')
    title = models.CharField(max_length=200)
    message = models.TextField()
    category = models.CharField(max_length=20, choices=CATEGORY_CHOICES, default='general')
    # False until the recipient marks the notification as read
    read = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        # Default queryset order: newest first
        ordering = ['-created_at']
    
    def __str__(self):
        return f'{self.user.username}: {self.title}'

class UserStatus(models.Model):
    """User online status model.

    Holds at most one status row per user (one-to-one with User).
    """
    # (stored value, human-readable label) pairs accepted by ``status``
    STATUS_CHOICES = [
        ('online', 'Online'),
        ('away', 'Away'),
        ('busy', 'Busy'),
        ('offline', 'Offline'),
    ]
    
    # Owner of this status row
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='offline')
    # Defaults to the creation time; it is not refreshed automatically on
    # save, so code that changes the status has to update it explicitly.
    last_seen = models.DateTimeField(default=timezone.now)
    # Optional free-text note, up to 100 characters
    custom_message = models.CharField(max_length=100, blank=True)
    
    def __str__(self):
        return f'{self.user.username}: {self.status}'

Django Channels transforms Django into a powerful real-time application framework. The key to successful WebSocket implementation is understanding the consumer lifecycle, proper group management, and efficient message broadcasting. Start with simple chat functionality and gradually add advanced features like typing indicators, message reactions, and user presence as your real-time requirements evolve.