ASGI (Asynchronous Server Gateway Interface) is the spiritual successor to WSGI, designed to handle both synchronous and asynchronous Python web applications. Understanding ASGI is fundamental to building modern Django applications that support WebSockets, HTTP/2, and high-concurrency scenarios while maintaining compatibility with traditional synchronous code.
# Traditional WSGI Application
def wsgi_application(environ, start_response):
"""Synchronous WSGI application."""
status = '200 OK'
headers = [('Content-Type', 'text/plain')]
start_response(status, headers)
return [b'Hello World']
# Modern ASGI Application
async def asgi_application(scope, receive, send):
"""Asynchronous ASGI application."""
assert scope['type'] == 'http'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'Hello World',
})
ASGI supports multiple protocol types through different scopes:
# HTTP Scope
http_scope = {
'type': 'http',
'method': 'GET',
'path': '/api/users/',
'query_string': b'page=1&limit=10',
'headers': [[b'host', b'example.com']],
}
# WebSocket Scope
websocket_scope = {
'type': 'websocket',
'path': '/ws/chat/',
'query_string': b'room=general',
'headers': [[b'origin', b'https://example.com']],
}
# Lifespan Scope (for startup/shutdown events)
lifespan_scope = {
'type': 'lifespan',
}
# asgi.py
import os
from django.core.asgi import get_asgi_application
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Initialize Django ASGI application early
django_asgi_app = get_asgi_application()
# Simple ASGI application
application = django_asgi_app
# asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator
import chat.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Initialize Django ASGI application
django_asgi_app = get_asgi_application()
application = ProtocolTypeRouter({
# HTTP requests handled by Django
"http": django_asgi_app,
# WebSocket requests handled by Channels
"websocket": AllowedHostsOriginValidator(
AuthMiddlewareStack(
URLRouter(
chat.routing.websocket_urlpatterns
)
)
),
})
# middleware/asgi.py
import time
import logging
from django.utils.deprecation import MiddlewareMixin
logger = logging.getLogger(__name__)
class ASGILoggingMiddleware:
"""ASGI middleware for request logging."""
def __init__(self, inner):
self.inner = inner
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
start_time = time.time()
# Log request start
logger.info(f"Request started: {scope['method']} {scope['path']}")
# Wrap send to log response
async def send_wrapper(message):
if message['type'] == 'http.response.start':
duration = time.time() - start_time
logger.info(
f"Request completed: {scope['method']} {scope['path']} "
f"- {message['status']} ({duration:.3f}s)"
)
await send(message)
await self.inner(scope, receive, send_wrapper)
else:
await self.inner(scope, receive, send)
class ASGICORSMiddleware:
"""ASGI middleware for CORS handling."""
def __init__(self, inner):
self.inner = inner
self.allowed_origins = ['http://localhost:3000', 'https://myapp.com']
self.allowed_methods = ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
self.allowed_headers = ['Content-Type', 'Authorization']
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
# Handle CORS preflight requests
if scope['method'] == 'OPTIONS':
await self.handle_preflight(scope, receive, send)
return
# Add CORS headers to regular responses
async def send_wrapper(message):
if message['type'] == 'http.response.start':
headers = dict(message.get('headers', []))
# Add CORS headers
origin = self.get_origin(scope)
if origin in self.allowed_origins:
headers[b'access-control-allow-origin'] = origin.encode()
headers[b'access-control-allow-credentials'] = b'true'
message['headers'] = list(headers.items())
await send(message)
await self.inner(scope, receive, send_wrapper)
else:
await self.inner(scope, receive, send)
async def handle_preflight(self, scope, receive, send):
"""Handle CORS preflight requests."""
origin = self.get_origin(scope)
if origin in self.allowed_origins:
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'access-control-allow-origin', origin.encode()],
[b'access-control-allow-methods', ', '.join(self.allowed_methods).encode()],
[b'access-control-allow-headers', ', '.join(self.allowed_headers).encode()],
[b'access-control-max-age', b'86400'],
],
})
else:
await send({
'type': 'http.response.start',
'status': 403,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'',
})
def get_origin(self, scope):
"""Extract origin from request headers."""
headers = dict(scope.get('headers', []))
return headers.get(b'origin', b'').decode()
# Apply middleware in asgi.py
from .middleware.asgi import ASGILoggingMiddleware, ASGICORSMiddleware
application = ASGILoggingMiddleware(
ASGICORSMiddleware(
ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": websocket_application,
})
)
)
# uvicorn_config.py
import multiprocessing
# Basic configuration
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 30
keepalive = 5
# SSL configuration
keyfile = "/path/to/keyfile.key"
certfile = "/path/to/certfile.crt"
# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
# Advanced settings
preload_app = True
worker_tmp_dir = "/dev/shm" # Use memory for worker temp files
# Install uvicorn with performance extras
pip install uvicorn[standard]
# Run with multiple workers
uvicorn myproject.asgi:application \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--access-log \
--loop uvloop \
--http httptools
# Run with Gunicorn + Uvicorn workers
gunicorn myproject.asgi:application \
-k uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--workers 4 \
--worker-connections 1000 \
--max-requests 1000 \
--timeout 30 \
--access-logfile - \
--error-logfile -
# hypercorn_config.py
from hypercorn.config import Config
config = Config()
# Basic settings
config.bind = ["0.0.0.0:8000"]
config.workers = 4
config.worker_class = "asyncio"
# HTTP/2 support
config.h2 = True
# SSL settings
config.keyfile = "/path/to/keyfile.key"
config.certfile = "/path/to/certfile.crt"
# Performance settings
config.keep_alive_timeout = 5
config.max_requests = 1000
config.max_requests_jitter = 100
# Logging
config.accesslog = "-"
config.errorlog = "-"
config.loglevel = "INFO"
# WebSocket settings
config.websocket_ping_interval = 20
config.websocket_ping_timeout = 20
# custom_asgi.py
import json
import asyncio
from urllib.parse import parse_qs
class CustomASGIApp:
"""Custom ASGI application for specific use cases."""
def __init__(self):
self.connections = set()
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
await self.handle_http(scope, receive, send)
elif scope['type'] == 'websocket':
await self.handle_websocket(scope, receive, send)
elif scope['type'] == 'lifespan':
await self.handle_lifespan(scope, receive, send)
async def handle_http(self, scope, receive, send):
"""Handle HTTP requests."""
path = scope['path']
method = scope['method']
if path == '/health' and method == 'GET':
await self.health_check(scope, receive, send)
elif path == '/metrics' and method == 'GET':
await self.metrics(scope, receive, send)
else:
await self.not_found(scope, receive, send)
async def handle_websocket(self, scope, receive, send):
"""Handle WebSocket connections."""
await send({'type': 'websocket.accept'})
self.connections.add(send)
try:
while True:
message = await receive()
if message['type'] == 'websocket.disconnect':
break
elif message['type'] == 'websocket.receive':
# Echo message to all connections
data = message.get('text', message.get('bytes'))
await self.broadcast(data)
finally:
self.connections.discard(send)
async def handle_lifespan(self, scope, receive, send):
"""Handle application lifespan events."""
message = await receive()
if message['type'] == 'lifespan.startup':
# Perform startup tasks
await self.startup()
await send({'type': 'lifespan.startup.complete'})
elif message['type'] == 'lifespan.shutdown':
# Perform cleanup tasks
await self.shutdown()
await send({'type': 'lifespan.shutdown.complete'})
async def health_check(self, scope, receive, send):
"""Health check endpoint."""
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'application/json']],
})
health_data = {
'status': 'healthy',
'connections': len(self.connections),
'timestamp': asyncio.get_event_loop().time()
}
await send({
'type': 'http.response.body',
'body': json.dumps(health_data).encode(),
})
async def metrics(self, scope, receive, send):
"""Metrics endpoint."""
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
metrics = f"""
# HELP active_connections Number of active WebSocket connections
# TYPE active_connections gauge
active_connections {len(self.connections)}
""".strip()
await send({
'type': 'http.response.body',
'body': metrics.encode(),
})
async def not_found(self, scope, receive, send):
"""404 response."""
await send({
'type': 'http.response.start',
'status': 404,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'Not Found',
})
async def broadcast(self, data):
"""Broadcast data to all connected WebSocket clients."""
if self.connections:
await asyncio.gather(
*[self.send_to_connection(conn, data) for conn in self.connections],
return_exceptions=True
)
async def send_to_connection(self, send, data):
"""Send data to a specific connection."""
try:
await send({
'type': 'websocket.send',
'text': data if isinstance(data, str) else data.decode()
})
except Exception:
# Connection might be closed
self.connections.discard(send)
async def startup(self):
"""Application startup tasks."""
print("ASGI application starting up...")
# Initialize resources, connect to databases, etc.
async def shutdown(self):
"""Application shutdown tasks."""
print("ASGI application shutting down...")
# Close connections, cleanup resources, etc.
# Close all WebSocket connections
if self.connections:
await asyncio.gather(
*[send({'type': 'websocket.close', 'code': 1000})
for send in self.connections],
return_exceptions=True
)
# routing.py
from django.urls import path, re_path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from . import consumers
class ASGIRouter:
"""Custom ASGI router for complex routing needs."""
def __init__(self):
self.http_routes = {}
self.websocket_routes = {}
def add_http_route(self, path, handler):
"""Add HTTP route."""
self.http_routes[path] = handler
def add_websocket_route(self, path, consumer):
"""Add WebSocket route."""
self.websocket_routes[path] = consumer
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
await self.route_http(scope, receive, send)
elif scope['type'] == 'websocket':
await self.route_websocket(scope, receive, send)
async def route_http(self, scope, receive, send):
"""Route HTTP requests."""
path = scope['path']
handler = self.http_routes.get(path)
if handler:
await handler(scope, receive, send)
else:
# Default to Django
from django.core.asgi import get_asgi_application
django_app = get_asgi_application()
await django_app(scope, receive, send)
async def route_websocket(self, scope, receive, send):
"""Route WebSocket connections."""
path = scope['path']
consumer = self.websocket_routes.get(path)
if consumer:
await consumer(scope, receive, send)
else:
# Reject connection
await send({'type': 'websocket.close', 'code': 4004})
# Usage
router = ASGIRouter()
router.add_websocket_route('/ws/chat/', consumers.ChatConsumer.as_asgi())
router.add_websocket_route('/ws/notifications/', consumers.NotificationConsumer.as_asgi())
application = router
# tests/test_asgi.py
import pytest
from channels.testing import HttpCommunicator, WebsocketCommunicator
from myproject.asgi import application
@pytest.mark.asyncio
async def test_http_endpoint():
"""Test HTTP endpoint through ASGI."""
communicator = HttpCommunicator(application, "GET", "/health")
response = await communicator.get_response()
assert response['status'] == 200
assert response['headers'][b'content-type'] == b'application/json'
@pytest.mark.asyncio
async def test_websocket_connection():
"""Test WebSocket connection through ASGI."""
communicator = WebsocketCommunicator(application, "/ws/test/")
connected, subprotocol = await communicator.connect()
assert connected
# Send message
await communicator.send_to(text_data="Hello")
# Receive response
response = await communicator.receive_from()
assert response == "Hello"
# Disconnect
await communicator.disconnect()
@pytest.mark.asyncio
async def test_lifespan_events():
"""Test ASGI lifespan events."""
from channels.testing import ApplicationCommunicator
communicator = ApplicationCommunicator(application, {"type": "lifespan"})
# Test startup
await communicator.send_input({"type": "lifespan.startup"})
output = await communicator.receive_output()
assert output["type"] == "lifespan.startup.complete"
# Test shutdown
await communicator.send_input({"type": "lifespan.shutdown"})
output = await communicator.receive_output()
assert output["type"] == "lifespan.shutdown.complete"
class TestCustomASGIApp:
"""Test custom ASGI application."""
@pytest.mark.asyncio
async def test_health_endpoint(self):
"""Test health check endpoint."""
from .custom_asgi import CustomASGIApp
app = CustomASGIApp()
# Mock scope, receive, send
scope = {
'type': 'http',
'method': 'GET',
'path': '/health',
'headers': [],
}
received_messages = []
async def mock_receive():
return {'type': 'http.request', 'body': b''}
async def mock_send(message):
received_messages.append(message)
await app(scope, mock_receive, mock_send)
# Verify response
assert len(received_messages) == 2
assert received_messages[0]['type'] == 'http.response.start'
assert received_messages[0]['status'] == 200
assert received_messages[1]['type'] == 'http.response.body'
# performance/asgi_load_test.py
import asyncio
import aiohttp
import time
from concurrent.futures import ThreadPoolExecutor
async def make_request(session, url):
"""Make a single HTTP request."""
try:
async with session.get(url) as response:
return response.status, await response.text()
except Exception as e:
return None, str(e)
async def load_test_http(url, concurrent_requests=100, total_requests=1000):
"""Load test HTTP endpoints."""
connector = aiohttp.TCPConnector(limit=concurrent_requests)
async with aiohttp.ClientSession(connector=connector) as session:
start_time = time.time()
# Create semaphore to limit concurrent requests
semaphore = asyncio.Semaphore(concurrent_requests)
async def bounded_request():
async with semaphore:
return await make_request(session, url)
# Execute requests
tasks = [bounded_request() for _ in range(total_requests)]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
duration = end_time - start_time
# Analyze results
successful = sum(1 for status, _ in results if status == 200)
failed = total_requests - successful
rps = total_requests / duration
print(f"Load Test Results:")
print(f" Total requests: {total_requests}")
print(f" Concurrent requests: {concurrent_requests}")
print(f" Duration: {duration:.2f}s")
print(f" Successful: {successful}")
print(f" Failed: {failed}")
print(f" Requests per second: {rps:.2f}")
return {
'total_requests': total_requests,
'successful': successful,
'failed': failed,
'duration': duration,
'rps': rps
}
async def load_test_websocket(url, concurrent_connections=100, messages_per_connection=10):
"""Load test WebSocket endpoints."""
import websockets
async def websocket_client():
try:
async with websockets.connect(url) as websocket:
for i in range(messages_per_connection):
await websocket.send(f"Message {i}")
response = await websocket.recv()
return True
except Exception:
return False
start_time = time.time()
# Create concurrent connections
tasks = [websocket_client() for _ in range(concurrent_connections)]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
duration = end_time - start_time
successful = sum(1 for result in results if result is True)
failed = concurrent_connections - successful
print(f"WebSocket Load Test Results:")
print(f" Concurrent connections: {concurrent_connections}")
print(f" Messages per connection: {messages_per_connection}")
print(f" Duration: {duration:.2f}s")
print(f" Successful connections: {successful}")
print(f" Failed connections: {failed}")
return {
'concurrent_connections': concurrent_connections,
'successful': successful,
'failed': failed,
'duration': duration
}
# Run load tests
if __name__ == "__main__":
# HTTP load test
asyncio.run(load_test_http("http://localhost:8000/api/health/"))
# WebSocket load test
asyncio.run(load_test_websocket("ws://localhost:8000/ws/test/"))
ASGI represents a fundamental shift in how Python web applications handle concurrency and real-time features. Understanding ASGI's architecture, middleware patterns, and server configuration is essential for building modern Django applications that can handle thousands of concurrent connections while maintaining excellent performance and reliability.
Asynchronous Django
Modern web applications demand high concurrency, real-time features, and efficient resource utilization. Django's asynchronous capabilities enable building applications that handle thousands of concurrent connections, provide real-time updates, and process background tasks efficiently. This comprehensive guide covers Django's async ecosystem, from ASGI and async views to WebSockets and background task processing.
Async Views
Django's async views enable handling high-concurrency scenarios efficiently by allowing views to perform I/O operations without blocking the server thread. This chapter covers implementing async views, handling concurrent operations, integrating with external services, and optimizing performance for I/O-bound operations.