SQL injection is one of the most dangerous web application vulnerabilities, allowing attackers to manipulate database queries and potentially access, modify, or delete sensitive data. Django's ORM provides robust protection against SQL injection attacks through parameterized queries and safe query construction.
# VULNERABLE CODE EXAMPLE (DON'T DO THIS!)
def vulnerable_user_search(request):
"""DANGEROUS: Direct string formatting in SQL"""
username = request.GET.get('username', '')
# This is vulnerable to SQL injection!
query = f"SELECT * FROM auth_user WHERE username = '{username}'"
from django.db import connection
cursor = connection.cursor()
cursor.execute(query) # DANGEROUS!
results = cursor.fetchall()
return render(request, 'search_results.html', {'results': results})
# Attack examples:
# ?username=admin' OR '1'='1' --
# Result: SELECT * FROM auth_user WHERE username = 'admin' OR '1'='1' --'
# This returns ALL users!
# ?username=admin'; DROP TABLE auth_user; --
# Result: Attempts to delete the entire user table!
# ?username=admin' UNION SELECT password FROM auth_user WHERE username='admin' --
# Result: Attempts to extract password hashes
-- 1. Authentication Bypass
-- Input: admin' OR '1'='1' --
SELECT * FROM users WHERE username = 'admin' OR '1'='1' --' AND password = 'password'
-- 2. Data Extraction (UNION attacks)
-- Input: ' UNION SELECT username, password FROM auth_user --
SELECT name FROM products WHERE id = '' UNION SELECT username, password FROM auth_user --'
-- 3. Blind SQL Injection
-- Input: ' AND (SELECT COUNT(*) FROM auth_user) > 0 --
SELECT * FROM articles WHERE title = '' AND (SELECT COUNT(*) FROM auth_user) > 0 --'
-- 4. Time-based Blind Injection
-- Input: '; WAITFOR DELAY '00:00:05' --
SELECT * FROM products WHERE id = ''; WAITFOR DELAY '00:00:05' --'
-- 5. Boolean-based Blind Injection
-- Input: ' AND 1=1 -- (returns results)
-- Input: ' AND 1=2 -- (returns no results)
Django's ORM automatically uses parameterized queries:
# SAFE: Django ORM automatically parameterizes queries
from django.contrib.auth.models import User
def safe_user_search(request):
"""SAFE: Using Django ORM"""
username = request.GET.get('username', '')
# This is automatically protected against SQL injection
users = User.objects.filter(username=username)
# Django generates: SELECT * FROM auth_user WHERE username = %s
# Parameters: [username]
return render(request, 'search_results.html', {'users': users})
def safe_complex_search(request):
"""SAFE: Complex queries with ORM"""
username = request.GET.get('username', '')
email = request.GET.get('email', '')
is_active = request.GET.get('is_active') == 'true'
# All of these are safe
users = User.objects.filter(
username__icontains=username,
email__icontains=email,
is_active=is_active
).select_related('profile')
# Django handles parameterization automatically
return render(request, 'search_results.html', {'users': users})
def safe_dynamic_filtering(request):
"""SAFE: Dynamic filtering with Q objects"""
from django.db.models import Q
search_term = request.GET.get('search', '')
if search_term:
# Complex query with Q objects - still safe
users = User.objects.filter(
Q(username__icontains=search_term) |
Q(first_name__icontains=search_term) |
Q(last_name__icontains=search_term) |
Q(email__icontains=search_term)
)
else:
users = User.objects.none()
return render(request, 'search_results.html', {'users': users})
When you must use raw SQL, Django provides safe methods:
# SAFE: Using raw() with parameters
def safe_raw_query(request):
"""SAFE: Raw SQL with proper parameterization"""
user_id = request.GET.get('user_id')
# Method 1: Using raw() with parameters
users = User.objects.raw(
"SELECT * FROM auth_user WHERE id = %s AND is_active = %s",
[user_id, True]
)
return render(request, 'users.html', {'users': users})
def safe_cursor_usage(request):
"""SAFE: Direct cursor usage with parameters"""
from django.db import connection
username = request.GET.get('username')
with connection.cursor() as cursor:
# SAFE: Using parameter substitution
cursor.execute(
"SELECT u.*, p.bio FROM auth_user u "
"LEFT JOIN profiles_userprofile p ON u.id = p.user_id "
"WHERE u.username = %s",
[username]
)
results = cursor.fetchall()
return render(request, 'user_details.html', {'results': results})
def safe_named_parameters(request):
"""SAFE: Using named parameters (PostgreSQL)"""
from django.db import connection
min_age = request.GET.get('min_age', 18)
max_age = request.GET.get('max_age', 65)
with connection.cursor() as cursor:
# SAFE: Named parameters (PostgreSQL syntax)
cursor.execute(
"SELECT * FROM users WHERE age BETWEEN %(min_age)s AND %(max_age)s",
{'min_age': min_age, 'max_age': max_age}
)
results = cursor.fetchall()
return render(request, 'age_filtered_users.html', {'results': results})
# DANGEROUS PATTERNS - NEVER DO THESE!
def dangerous_string_formatting(request):
"""DANGEROUS: String formatting in SQL"""
username = request.GET.get('username')
# DON'T DO THIS - Vulnerable to SQL injection!
query = f"SELECT * FROM auth_user WHERE username = '{username}'"
# DON'T DO THIS EITHER!
query = "SELECT * FROM auth_user WHERE username = '%s'" % username
# OR THIS!
query = "SELECT * FROM auth_user WHERE username = '{}'".format(username)
def dangerous_concatenation(request):
"""DANGEROUS: String concatenation in SQL"""
order_by = request.GET.get('order_by', 'username')
# DON'T DO THIS - ORDER BY cannot be parameterized safely this way
query = "SELECT * FROM auth_user ORDER BY " + order_by
# Attacker could inject: username; DROP TABLE auth_user; --
def dangerous_dynamic_queries(request):
"""DANGEROUS: Building dynamic queries unsafely"""
filters = request.GET.get('filters', '')
# DON'T DO THIS!
query = "SELECT * FROM auth_user WHERE " + filters
# Attacker could inject: 1=1 OR (SELECT COUNT(*) FROM sensitive_table) > 0
# Safe dynamic query building
def secure_dynamic_search(request):
"""SECURE: Dynamic search with validation"""
# Define allowed search fields
ALLOWED_FIELDS = {
'username': 'username__icontains',
'email': 'email__icontains',
'first_name': 'first_name__icontains',
'last_name': 'last_name__icontains',
'is_active': 'is_active',
'date_joined': 'date_joined__gte'
}
# Build filter dictionary safely
filters = {}
for field, value in request.GET.items():
if field in ALLOWED_FIELDS and value:
django_field = ALLOWED_FIELDS[field]
# Additional validation based on field type
if field == 'is_active':
value = value.lower() in ('true', '1', 'yes')
elif field == 'date_joined':
try:
from datetime import datetime
value = datetime.strptime(value, '%Y-%m-%d')
except ValueError:
continue # Skip invalid dates
filters[django_field] = value
# Apply filters safely
users = User.objects.filter(**filters)
return render(request, 'search_results.html', {'users': users})
def secure_ordering(request):
"""SECURE: Dynamic ordering with validation"""
# Define allowed ordering fields
ALLOWED_ORDER_FIELDS = [
'username', 'email', 'first_name', 'last_name',
'date_joined', 'last_login'
]
order_by = request.GET.get('order_by', 'username')
direction = request.GET.get('direction', 'asc')
# Validate ordering field
if order_by not in ALLOWED_ORDER_FIELDS:
order_by = 'username' # Default safe value
# Validate direction
if direction not in ['asc', 'desc']:
direction = 'asc'
# Build ordering string safely
if direction == 'desc':
order_by = f'-{order_by}'
users = User.objects.all().order_by(order_by)
return render(request, 'users.html', {'users': users})
def secure_advanced_search(request):
"""SECURE: Advanced search with multiple criteria"""
from django.db.models import Q
from datetime import datetime, timedelta
# Get search parameters
search_term = request.GET.get('search', '').strip()
user_type = request.GET.get('user_type', '')
date_range = request.GET.get('date_range', '')
# Start with base queryset
queryset = User.objects.all()
# Add search term filter
if search_term:
# Limit search term length to prevent DoS
if len(search_term) > 100:
search_term = search_term[:100]
queryset = queryset.filter(
Q(username__icontains=search_term) |
Q(first_name__icontains=search_term) |
Q(last_name__icontains=search_term) |
Q(email__icontains=search_term)
)
# Add user type filter
if user_type in ['staff', 'superuser', 'regular']:
if user_type == 'staff':
queryset = queryset.filter(is_staff=True)
elif user_type == 'superuser':
queryset = queryset.filter(is_superuser=True)
elif user_type == 'regular':
queryset = queryset.filter(is_staff=False, is_superuser=False)
# Add date range filter
if date_range in ['today', 'week', 'month', 'year']:
now = datetime.now()
if date_range == 'today':
start_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
elif date_range == 'week':
start_date = now - timedelta(days=7)
elif date_range == 'month':
start_date = now - timedelta(days=30)
elif date_range == 'year':
start_date = now - timedelta(days=365)
queryset = queryset.filter(date_joined__gte=start_date)
# Optimize query
queryset = queryset.select_related('profile').prefetch_related('groups')
# Limit results to prevent DoS
queryset = queryset[:1000]
return render(request, 'advanced_search.html', {'users': queryset})
# forms.py - Secure form validation
from django import forms
from django.core.exceptions import ValidationError
import re
class UserSearchForm(forms.Form):
"""Secure user search form"""
username = forms.CharField(
max_length=150,
required=False,
widget=forms.TextInput(attrs={'placeholder': 'Username'})
)
email = forms.EmailField(
required=False,
widget=forms.EmailInput(attrs={'placeholder': 'Email'})
)
order_by = forms.ChoiceField(
choices=[
('username', 'Username'),
('email', 'Email'),
('date_joined', 'Date Joined'),
('last_login', 'Last Login'),
],
required=False,
initial='username'
)
direction = forms.ChoiceField(
choices=[('asc', 'Ascending'), ('desc', 'Descending')],
required=False,
initial='asc'
)
def clean_username(self):
"""Validate username field"""
username = self.cleaned_data['username']
if username:
# Remove potentially dangerous characters
username = re.sub(r'[^\w\-\.]', '', username)
# Limit length
if len(username) > 150:
raise ValidationError("Username too long")
return username
def clean(self):
"""Cross-field validation"""
cleaned_data = super().clean()
username = cleaned_data.get('username')
email = cleaned_data.get('email')
# Require at least one search criterion
if not username and not email:
raise ValidationError("Please provide at least one search criterion")
return cleaned_data
# Using the form in views
def secure_search_view(request):
"""Secure search using validated form data"""
form = UserSearchForm(request.GET)
users = []
if form.is_valid():
# Use validated data for queries
username = form.cleaned_data.get('username')
email = form.cleaned_data.get('email')
order_by = form.cleaned_data.get('order_by', 'username')
direction = form.cleaned_data.get('direction', 'asc')
# Build query safely
queryset = User.objects.all()
if username:
queryset = queryset.filter(username__icontains=username)
if email:
queryset = queryset.filter(email__icontains=email)
# Apply ordering
if direction == 'desc':
order_by = f'-{order_by}'
users = queryset.order_by(order_by)[:100] # Limit results
return render(request, 'search.html', {
'form': form,
'users': users
})
# models.py - Model validation for SQL safety
from django.db import models
from django.core.exceptions import ValidationError
import re
def validate_sql_safe_string(value):
"""Validator to ensure string is safe for SQL operations"""
# Check for SQL injection patterns
dangerous_patterns = [
r"'.*'", # Single quotes
r'".*"', # Double quotes
r'--', # SQL comments
r'/\*.*\*/', # Multi-line comments
r'\bunion\b', # UNION keyword
r'\bselect\b', # SELECT keyword
r'\binsert\b', # INSERT keyword
r'\bupdate\b', # UPDATE keyword
r'\bdelete\b', # DELETE keyword
r'\bdrop\b', # DROP keyword
]
value_lower = value.lower()
for pattern in dangerous_patterns:
if re.search(pattern, value_lower):
raise ValidationError(f"Value contains potentially dangerous SQL pattern: {pattern}")
class SecureUserProfile(models.Model):
"""User profile with SQL injection protection"""
user = models.OneToOneField(User, on_delete=models.CASCADE)
bio = models.TextField(
max_length=1000,
validators=[validate_sql_safe_string],
help_text="Bio cannot contain SQL keywords or special characters"
)
website = models.URLField(
blank=True,
validators=[validate_sql_safe_string]
)
def clean(self):
"""Additional model validation"""
super().clean()
# Additional bio validation
if self.bio:
# Remove potentially dangerous characters
cleaned_bio = re.sub(r'[<>"\']', '', self.bio)
if cleaned_bio != self.bio:
raise ValidationError("Bio contains invalid characters")
# PostgreSQL-specific security settings
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'myapp_db',
'USER': 'myapp_user', # Limited privilege user
'PASSWORD': os.environ.get('DB_PASSWORD'),
'HOST': 'localhost',
'PORT': '5432',
'OPTIONS': {
# Enable SSL
'sslmode': 'require',
# Connection security
'connect_timeout': 10,
# Additional security options
'options': '-c default_transaction_isolation=serializable'
},
}
}
# PostgreSQL row-level security example
def setup_row_level_security():
"""Setup row-level security policies"""
from django.db import connection
with connection.cursor() as cursor:
# Enable RLS on sensitive table
cursor.execute("ALTER TABLE sensitive_data ENABLE ROW LEVEL SECURITY")
# Create policy for user access
cursor.execute("""
CREATE POLICY user_data_policy ON sensitive_data
FOR ALL TO app_user
USING (user_id = current_setting('app.current_user_id')::integer)
""")
# MySQL-specific security settings
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'myapp_db',
'USER': 'myapp_user',
'PASSWORD': os.environ.get('DB_PASSWORD'),
'HOST': 'localhost',
'PORT': '3306',
'OPTIONS': {
# Enable SSL
'ssl': {
'ssl_ca': '/path/to/ca-cert.pem',
'ssl_cert': '/path/to/client-cert.pem',
'ssl_key': '/path/to/client-key.pem',
},
# SQL mode for strict validation
'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
# Character set
'charset': 'utf8mb4',
},
}
}
# middleware.py - SQL injection detection middleware
import re
import logging
logger = logging.getLogger('security')
class SQLInjectionDetectionMiddleware:
"""Detect potential SQL injection attempts"""
def __init__(self, get_response):
self.get_response = get_response
# SQL injection patterns
self.sql_patterns = [
r"'.*'",
r'".*"',
r'--',
r'/\*.*\*/',
r'\bunion\s+select\b',
r'\bor\s+1\s*=\s*1\b',
r'\band\s+1\s*=\s*1\b',
r'\bdrop\s+table\b',
r'\binsert\s+into\b',
r'\bupdate\s+.*\s+set\b',
r'\bdelete\s+from\b',
]
self.compiled_patterns = [
re.compile(pattern, re.IGNORECASE)
for pattern in self.sql_patterns
]
def __call__(self, request):
# Check for SQL injection patterns
self.check_request_for_sql_injection(request)
response = self.get_response(request)
return response
def check_request_for_sql_injection(self, request):
"""Check request parameters for SQL injection patterns"""
# Check GET parameters
for key, value in request.GET.items():
if self.contains_sql_injection(value):
self.log_sql_injection_attempt(request, 'GET', key, value)
# Check POST parameters
for key, value in request.POST.items():
if self.contains_sql_injection(str(value)):
self.log_sql_injection_attempt(request, 'POST', key, value)
def contains_sql_injection(self, value):
"""Check if value contains SQL injection patterns"""
for pattern in self.compiled_patterns:
if pattern.search(value):
return True
return False
def log_sql_injection_attempt(self, request, method, parameter, value):
"""Log SQL injection attempt"""
logger.warning(
f"Potential SQL injection attempt detected",
extra={
'ip_address': self.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
'path': request.path,
'method': method,
'parameter': parameter,
'value': value[:100], # Limit logged value length
'user': getattr(request, 'user', None),
}
)
def get_client_ip(self, request):
"""Get client IP address"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
# tests.py - SQL injection protection tests
from django.test import TestCase, Client
from django.contrib.auth.models import User
from django.urls import reverse
class SQLInjectionProtectionTests(TestCase):
"""Test SQL injection protection"""
def setUp(self):
self.client = Client()
self.user = User.objects.create_user(
username='testuser',
password='testpass123'
)
def test_orm_protection_against_injection(self):
"""Test that ORM protects against SQL injection"""
# Attempt SQL injection through ORM
malicious_username = "admin' OR '1'='1' --"
# This should not return all users
users = User.objects.filter(username=malicious_username)
# Should return no results (not all users)
self.assertEqual(users.count(), 0)
def test_search_form_injection_protection(self):
"""Test search form protection against injection"""
# Attempt injection through search form
response = self.client.get(reverse('user_search'), {
'username': "admin' OR '1'='1' --",
'email': "test@example.com"
})
# Should not cause error or return unexpected results
self.assertEqual(response.status_code, 200)
# Check that no users are returned (injection failed)
self.assertNotContains(response, 'testuser')
def test_raw_sql_parameterization(self):
"""Test that raw SQL uses proper parameterization"""
from django.db import connection
# Test parameterized query
with connection.cursor() as cursor:
cursor.execute(
"SELECT * FROM auth_user WHERE username = %s",
["admin' OR '1'='1' --"]
)
results = cursor.fetchall()
# Should return no results (injection prevented)
self.assertEqual(len(results), 0)
def test_injection_detection_middleware(self):
"""Test SQL injection detection middleware"""
# Make request with injection attempt
response = self.client.get('/search/', {
'q': "'; DROP TABLE auth_user; --"
})
# Should not cause server error
self.assertNotEqual(response.status_code, 500)
# Check that attempt was logged (would need to check logs in real test)
Now that you understand SQL injection protection, let's explore clickjacking protection and how to prevent UI redress attacks in Django applications.
Cross Site Scripting
Cross-Site Scripting (XSS) is a vulnerability that allows attackers to inject malicious scripts into web pages viewed by other users. Django provides robust protection against XSS attacks through automatic template escaping and security best practices.
Clickjacking Protection
Clickjacking is a malicious technique where attackers trick users into clicking on something different from what they perceive, potentially leading to unauthorized actions. Django provides built-in protection against clickjacking attacks through frame options and Content Security Policy headers.