Asynchronous Django

Introduction to ASGI

ASGI (Asynchronous Server Gateway Interface) is the spiritual successor to WSGI, designed to handle both synchronous and asynchronous Python web applications. Understanding ASGI is fundamental to building modern Django applications that support WebSockets, HTTP/2, and high-concurrency scenarios while maintaining compatibility with traditional synchronous code.

Introduction to ASGI

ASGI (Asynchronous Server Gateway Interface) is the spiritual successor to WSGI, designed to handle both synchronous and asynchronous Python web applications. Understanding ASGI is fundamental to building modern Django applications that support WebSockets, HTTP/2, and high-concurrency scenarios while maintaining compatibility with traditional synchronous code.

Understanding ASGI

WSGI vs ASGI Comparison

# Traditional WSGI Application
def wsgi_application(environ, start_response):
    """Synchronous WSGI application."""
    status = '200 OK'
    headers = [('Content-Type', 'text/plain')]
    start_response(status, headers)
    return [b'Hello World']

# Modern ASGI Application
async def asgi_application(scope, receive, send):
    """Asynchronous ASGI application."""
    assert scope['type'] == 'http'
    
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    
    await send({
        'type': 'http.response.body',
        'body': b'Hello World',
    })

ASGI Scope Types

ASGI supports multiple protocol types through different scopes:

# HTTP Scope
http_scope = {
    'type': 'http',
    'method': 'GET',
    'path': '/api/users/',
    'query_string': b'page=1&limit=10',
    'headers': [[b'host', b'example.com']],
}

# WebSocket Scope
websocket_scope = {
    'type': 'websocket',
    'path': '/ws/chat/',
    'query_string': b'room=general',
    'headers': [[b'origin', b'https://example.com']],
}

# Lifespan Scope (for startup/shutdown events)
lifespan_scope = {
    'type': 'lifespan',
}

Django ASGI Configuration

Basic ASGI Setup

# asgi.py
import os
from django.core.asgi import get_asgi_application
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Initialize Django ASGI application early
django_asgi_app = get_asgi_application()

# Simple ASGI application
application = django_asgi_app

Advanced ASGI Configuration

# asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator
import chat.routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Initialize Django ASGI application
django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter({
    # HTTP requests handled by Django
    "http": django_asgi_app,
    
    # WebSocket requests handled by Channels
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                chat.routing.websocket_urlpatterns
            )
        )
    ),
})

Custom ASGI Middleware

# middleware/asgi.py
import time
import logging
from django.utils.deprecation import MiddlewareMixin

logger = logging.getLogger(__name__)

class ASGILoggingMiddleware:
    """ASGI middleware for request logging."""
    
    def __init__(self, inner):
        self.inner = inner
    
    async def __call__(self, scope, receive, send):
        if scope['type'] == 'http':
            start_time = time.time()
            
            # Log request start
            logger.info(f"Request started: {scope['method']} {scope['path']}")
            
            # Wrap send to log response
            async def send_wrapper(message):
                if message['type'] == 'http.response.start':
                    duration = time.time() - start_time
                    logger.info(
                        f"Request completed: {scope['method']} {scope['path']} "
                        f"- {message['status']} ({duration:.3f}s)"
                    )
                await send(message)
            
            await self.inner(scope, receive, send_wrapper)
        else:
            await self.inner(scope, receive, send)

class ASGICORSMiddleware:
    """ASGI middleware for CORS handling."""
    
    def __init__(self, inner):
        self.inner = inner
        self.allowed_origins = ['http://localhost:3000', 'https://myapp.com']
        self.allowed_methods = ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
        self.allowed_headers = ['Content-Type', 'Authorization']
    
    async def __call__(self, scope, receive, send):
        if scope['type'] == 'http':
            # Handle CORS preflight requests
            if scope['method'] == 'OPTIONS':
                await self.handle_preflight(scope, receive, send)
                return
            
            # Add CORS headers to regular responses
            async def send_wrapper(message):
                if message['type'] == 'http.response.start':
                    headers = dict(message.get('headers', []))
                    
                    # Add CORS headers
                    origin = self.get_origin(scope)
                    if origin in self.allowed_origins:
                        headers[b'access-control-allow-origin'] = origin.encode()
                        headers[b'access-control-allow-credentials'] = b'true'
                    
                    message['headers'] = list(headers.items())
                
                await send(message)
            
            await self.inner(scope, receive, send_wrapper)
        else:
            await self.inner(scope, receive, send)
    
    async def handle_preflight(self, scope, receive, send):
        """Handle CORS preflight requests."""
        origin = self.get_origin(scope)
        
        if origin in self.allowed_origins:
            await send({
                'type': 'http.response.start',
                'status': 200,
                'headers': [
                    [b'access-control-allow-origin', origin.encode()],
                    [b'access-control-allow-methods', ', '.join(self.allowed_methods).encode()],
                    [b'access-control-allow-headers', ', '.join(self.allowed_headers).encode()],
                    [b'access-control-max-age', b'86400'],
                ],
            })
        else:
            await send({
                'type': 'http.response.start',
                'status': 403,
                'headers': [[b'content-type', b'text/plain']],
            })
        
        await send({
            'type': 'http.response.body',
            'body': b'',
        })
    
    def get_origin(self, scope):
        """Extract origin from request headers."""
        headers = dict(scope.get('headers', []))
        return headers.get(b'origin', b'').decode()

# Apply middleware in asgi.py
from .middleware.asgi import ASGILoggingMiddleware, ASGICORSMiddleware

application = ASGILoggingMiddleware(
    ASGICORSMiddleware(
        ProtocolTypeRouter({
            "http": django_asgi_app,
            "websocket": websocket_application,
        })
    )
)

ASGI Server Configuration

Uvicorn Configuration

# uvicorn_config.py
import multiprocessing

# Basic configuration
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 30
keepalive = 5

# SSL configuration
keyfile = "/path/to/keyfile.key"
certfile = "/path/to/certfile.crt"

# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Advanced settings
preload_app = True
worker_tmp_dir = "/dev/shm"  # Use memory for worker temp files

Production Uvicorn Setup

# Install uvicorn with performance extras
pip install uvicorn[standard]

# Run with multiple workers
uvicorn myproject.asgi:application \
    --host 0.0.0.0 \
    --port 8000 \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --access-log \
    --loop uvloop \
    --http httptools

# Run with Gunicorn + Uvicorn workers
gunicorn myproject.asgi:application \
    -k uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --workers 4 \
    --worker-connections 1000 \
    --max-requests 1000 \
    --timeout 30 \
    --access-logfile - \
    --error-logfile -

Hypercorn Configuration

# hypercorn_config.py
from hypercorn.config import Config

config = Config()

# Basic settings
config.bind = ["0.0.0.0:8000"]
config.workers = 4
config.worker_class = "asyncio"

# HTTP/2 support
config.h2 = True

# SSL settings
config.keyfile = "/path/to/keyfile.key"
config.certfile = "/path/to/certfile.crt"

# Performance settings
config.keep_alive_timeout = 5
config.max_requests = 1000
config.max_requests_jitter = 100

# Logging
config.accesslog = "-"
config.errorlog = "-"
config.loglevel = "INFO"

# WebSocket settings
config.websocket_ping_interval = 20
config.websocket_ping_timeout = 20

ASGI Application Patterns

Custom ASGI Application

# custom_asgi.py
import json
import asyncio
from urllib.parse import parse_qs

class CustomASGIApp:
    """Custom ASGI application for specific use cases."""
    
    def __init__(self):
        self.connections = set()
    
    async def __call__(self, scope, receive, send):
        if scope['type'] == 'http':
            await self.handle_http(scope, receive, send)
        elif scope['type'] == 'websocket':
            await self.handle_websocket(scope, receive, send)
        elif scope['type'] == 'lifespan':
            await self.handle_lifespan(scope, receive, send)
    
    async def handle_http(self, scope, receive, send):
        """Handle HTTP requests."""
        path = scope['path']
        method = scope['method']
        
        if path == '/health' and method == 'GET':
            await self.health_check(scope, receive, send)
        elif path == '/metrics' and method == 'GET':
            await self.metrics(scope, receive, send)
        else:
            await self.not_found(scope, receive, send)
    
    async def handle_websocket(self, scope, receive, send):
        """Handle WebSocket connections."""
        await send({'type': 'websocket.accept'})
        self.connections.add(send)
        
        try:
            while True:
                message = await receive()
                
                if message['type'] == 'websocket.disconnect':
                    break
                elif message['type'] == 'websocket.receive':
                    # Echo message to all connections
                    data = message.get('text', message.get('bytes'))
                    await self.broadcast(data)
        
        finally:
            self.connections.discard(send)
    
    async def handle_lifespan(self, scope, receive, send):
        """Handle application lifespan events."""
        message = await receive()
        
        if message['type'] == 'lifespan.startup':
            # Perform startup tasks
            await self.startup()
            await send({'type': 'lifespan.startup.complete'})
        elif message['type'] == 'lifespan.shutdown':
            # Perform cleanup tasks
            await self.shutdown()
            await send({'type': 'lifespan.shutdown.complete'})
    
    async def health_check(self, scope, receive, send):
        """Health check endpoint."""
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [[b'content-type', b'application/json']],
        })
        
        health_data = {
            'status': 'healthy',
            'connections': len(self.connections),
            'timestamp': asyncio.get_event_loop().time()
        }
        
        await send({
            'type': 'http.response.body',
            'body': json.dumps(health_data).encode(),
        })
    
    async def metrics(self, scope, receive, send):
        """Metrics endpoint."""
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [[b'content-type', b'text/plain']],
        })
        
        metrics = f"""
# HELP active_connections Number of active WebSocket connections
# TYPE active_connections gauge
active_connections {len(self.connections)}
        """.strip()
        
        await send({
            'type': 'http.response.body',
            'body': metrics.encode(),
        })
    
    async def not_found(self, scope, receive, send):
        """404 response."""
        await send({
            'type': 'http.response.start',
            'status': 404,
            'headers': [[b'content-type', b'text/plain']],
        })
        
        await send({
            'type': 'http.response.body',
            'body': b'Not Found',
        })
    
    async def broadcast(self, data):
        """Broadcast data to all connected WebSocket clients."""
        if self.connections:
            await asyncio.gather(
                *[self.send_to_connection(conn, data) for conn in self.connections],
                return_exceptions=True
            )
    
    async def send_to_connection(self, send, data):
        """Send data to a specific connection."""
        try:
            await send({
                'type': 'websocket.send',
                'text': data if isinstance(data, str) else data.decode()
            })
        except Exception:
            # Connection might be closed
            self.connections.discard(send)
    
    async def startup(self):
        """Application startup tasks."""
        print("ASGI application starting up...")
        # Initialize resources, connect to databases, etc.
    
    async def shutdown(self):
        """Application shutdown tasks."""
        print("ASGI application shutting down...")
        # Close connections, cleanup resources, etc.
        
        # Close all WebSocket connections
        if self.connections:
            await asyncio.gather(
                *[send({'type': 'websocket.close', 'code': 1000}) 
                  for send in self.connections],
                return_exceptions=True
            )

ASGI Routing

# routing.py
from django.urls import path, re_path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from . import consumers

class ASGIRouter:
    """Custom ASGI router for complex routing needs."""
    
    def __init__(self):
        self.http_routes = {}
        self.websocket_routes = {}
    
    def add_http_route(self, path, handler):
        """Add HTTP route."""
        self.http_routes[path] = handler
    
    def add_websocket_route(self, path, consumer):
        """Add WebSocket route."""
        self.websocket_routes[path] = consumer
    
    async def __call__(self, scope, receive, send):
        if scope['type'] == 'http':
            await self.route_http(scope, receive, send)
        elif scope['type'] == 'websocket':
            await self.route_websocket(scope, receive, send)
    
    async def route_http(self, scope, receive, send):
        """Route HTTP requests."""
        path = scope['path']
        handler = self.http_routes.get(path)
        
        if handler:
            await handler(scope, receive, send)
        else:
            # Default to Django
            from django.core.asgi import get_asgi_application
            django_app = get_asgi_application()
            await django_app(scope, receive, send)
    
    async def route_websocket(self, scope, receive, send):
        """Route WebSocket connections."""
        path = scope['path']
        consumer = self.websocket_routes.get(path)
        
        if consumer:
            await consumer(scope, receive, send)
        else:
            # Reject connection
            await send({'type': 'websocket.close', 'code': 4004})

# Usage
router = ASGIRouter()
router.add_websocket_route('/ws/chat/', consumers.ChatConsumer.as_asgi())
router.add_websocket_route('/ws/notifications/', consumers.NotificationConsumer.as_asgi())

application = router

ASGI Testing

Testing ASGI Applications

# tests/test_asgi.py
import pytest
from channels.testing import HttpCommunicator, WebsocketCommunicator
from myproject.asgi import application

@pytest.mark.asyncio
async def test_http_endpoint():
    """Test HTTP endpoint through ASGI."""
    communicator = HttpCommunicator(application, "GET", "/health")
    response = await communicator.get_response()
    
    assert response['status'] == 200
    assert response['headers'][b'content-type'] == b'application/json'

@pytest.mark.asyncio
async def test_websocket_connection():
    """Test WebSocket connection through ASGI."""
    communicator = WebsocketCommunicator(application, "/ws/test/")
    
    connected, subprotocol = await communicator.connect()
    assert connected
    
    # Send message
    await communicator.send_to(text_data="Hello")
    
    # Receive response
    response = await communicator.receive_from()
    assert response == "Hello"
    
    # Disconnect
    await communicator.disconnect()

@pytest.mark.asyncio
async def test_lifespan_events():
    """Test ASGI lifespan events."""
    from channels.testing import ApplicationCommunicator
    
    communicator = ApplicationCommunicator(application, {"type": "lifespan"})
    
    # Test startup
    await communicator.send_input({"type": "lifespan.startup"})
    output = await communicator.receive_output()
    assert output["type"] == "lifespan.startup.complete"
    
    # Test shutdown
    await communicator.send_input({"type": "lifespan.shutdown"})
    output = await communicator.receive_output()
    assert output["type"] == "lifespan.shutdown.complete"

class TestCustomASGIApp:
    """Test custom ASGI application."""
    
    @pytest.mark.asyncio
    async def test_health_endpoint(self):
        """Test health check endpoint."""
        from .custom_asgi import CustomASGIApp
        
        app = CustomASGIApp()
        
        # Mock scope, receive, send
        scope = {
            'type': 'http',
            'method': 'GET',
            'path': '/health',
            'headers': [],
        }
        
        received_messages = []
        
        async def mock_receive():
            return {'type': 'http.request', 'body': b''}
        
        async def mock_send(message):
            received_messages.append(message)
        
        await app(scope, mock_receive, mock_send)
        
        # Verify response
        assert len(received_messages) == 2
        assert received_messages[0]['type'] == 'http.response.start'
        assert received_messages[0]['status'] == 200
        assert received_messages[1]['type'] == 'http.response.body'

ASGI Performance Testing

# performance/asgi_load_test.py
import asyncio
import aiohttp
import time
from concurrent.futures import ThreadPoolExecutor

async def make_request(session, url):
    """Make a single HTTP request."""
    try:
        async with session.get(url) as response:
            return response.status, await response.text()
    except Exception as e:
        return None, str(e)

async def load_test_http(url, concurrent_requests=100, total_requests=1000):
    """Load test HTTP endpoints."""
    connector = aiohttp.TCPConnector(limit=concurrent_requests)
    
    async with aiohttp.ClientSession(connector=connector) as session:
        start_time = time.time()
        
        # Create semaphore to limit concurrent requests
        semaphore = asyncio.Semaphore(concurrent_requests)
        
        async def bounded_request():
            async with semaphore:
                return await make_request(session, url)
        
        # Execute requests
        tasks = [bounded_request() for _ in range(total_requests)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        end_time = time.time()
        duration = end_time - start_time
        
        # Analyze results
        successful = sum(1 for status, _ in results if status == 200)
        failed = total_requests - successful
        rps = total_requests / duration
        
        print(f"Load Test Results:")
        print(f"  Total requests: {total_requests}")
        print(f"  Concurrent requests: {concurrent_requests}")
        print(f"  Duration: {duration:.2f}s")
        print(f"  Successful: {successful}")
        print(f"  Failed: {failed}")
        print(f"  Requests per second: {rps:.2f}")
        
        return {
            'total_requests': total_requests,
            'successful': successful,
            'failed': failed,
            'duration': duration,
            'rps': rps
        }

async def load_test_websocket(url, concurrent_connections=100, messages_per_connection=10):
    """Load test WebSocket endpoints."""
    import websockets
    
    async def websocket_client():
        try:
            async with websockets.connect(url) as websocket:
                for i in range(messages_per_connection):
                    await websocket.send(f"Message {i}")
                    response = await websocket.recv()
                return True
        except Exception:
            return False
    
    start_time = time.time()
    
    # Create concurrent connections
    tasks = [websocket_client() for _ in range(concurrent_connections)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    end_time = time.time()
    duration = end_time - start_time
    
    successful = sum(1 for result in results if result is True)
    failed = concurrent_connections - successful
    
    print(f"WebSocket Load Test Results:")
    print(f"  Concurrent connections: {concurrent_connections}")
    print(f"  Messages per connection: {messages_per_connection}")
    print(f"  Duration: {duration:.2f}s")
    print(f"  Successful connections: {successful}")
    print(f"  Failed connections: {failed}")
    
    return {
        'concurrent_connections': concurrent_connections,
        'successful': successful,
        'failed': failed,
        'duration': duration
    }

# Run load tests
if __name__ == "__main__":
    # HTTP load test
    asyncio.run(load_test_http("http://localhost:8000/api/health/"))
    
    # WebSocket load test
    asyncio.run(load_test_websocket("ws://localhost:8000/ws/test/"))

ASGI represents a fundamental shift in how Python web applications handle concurrency and real-time features. Understanding ASGI's architecture, middleware patterns, and server configuration is essential for building modern Django applications that can handle thousands of concurrent connections while maintaining excellent performance and reliability.