Security

SQL Injection Protection

SQL injection is one of the most dangerous web application vulnerabilities, allowing attackers to manipulate database queries and potentially access, modify, or delete sensitive data. Django's ORM provides robust protection against SQL injection attacks through parameterized queries and safe query construction.

SQL Injection Protection

SQL injection is one of the most dangerous web application vulnerabilities, allowing attackers to manipulate database queries and potentially access, modify, or delete sensitive data. Django's ORM provides robust protection against SQL injection attacks through parameterized queries and safe query construction.

Understanding SQL Injection

How SQL Injection Works

# VULNERABLE CODE EXAMPLE (DON'T DO THIS!)
def vulnerable_user_search(request):
    """DANGEROUS: Direct string formatting in SQL"""
    
    username = request.GET.get('username', '')
    
    # This is vulnerable to SQL injection!
    query = f"SELECT * FROM auth_user WHERE username = '{username}'"
    
    from django.db import connection
    cursor = connection.cursor()
    cursor.execute(query)  # DANGEROUS!
    
    results = cursor.fetchall()
    return render(request, 'search_results.html', {'results': results})

# Attack examples:
# ?username=admin' OR '1'='1' --
# Result: SELECT * FROM auth_user WHERE username = 'admin' OR '1'='1' --'
# This returns ALL users!

# ?username=admin'; DROP TABLE auth_user; --
# Result: Attempts to delete the entire user table!

# ?username=admin' UNION SELECT password FROM auth_user WHERE username='admin' --
# Result: Attempts to extract password hashes

SQL Injection Attack Types

-- 1. Authentication Bypass
-- Input: admin' OR '1'='1' --
SELECT * FROM users WHERE username = 'admin' OR '1'='1' --' AND password = 'password'

-- 2. Data Extraction (UNION attacks)
-- Input: ' UNION SELECT username, password FROM auth_user --
SELECT name FROM products WHERE id = '' UNION SELECT username, password FROM auth_user --'

-- 3. Blind SQL Injection
-- Input: ' AND (SELECT COUNT(*) FROM auth_user) > 0 --
SELECT * FROM articles WHERE title = '' AND (SELECT COUNT(*) FROM auth_user) > 0 --'

-- 4. Time-based Blind Injection
-- Input: '; WAITFOR DELAY '00:00:05' --
SELECT * FROM products WHERE id = ''; WAITFOR DELAY '00:00:05' --'

-- 5. Boolean-based Blind Injection
-- Input: ' AND 1=1 --  (returns results)
-- Input: ' AND 1=2 --  (returns no results)

Django ORM Protection

Safe Query Construction

Django's ORM automatically uses parameterized queries:

# SAFE: Django ORM automatically parameterizes queries
from django.contrib.auth.models import User

def safe_user_search(request):
    """SAFE: Using Django ORM"""
    
    username = request.GET.get('username', '')
    
    # This is automatically protected against SQL injection
    users = User.objects.filter(username=username)
    
    # Django generates: SELECT * FROM auth_user WHERE username = %s
    # Parameters: [username]
    
    return render(request, 'search_results.html', {'users': users})

def safe_complex_search(request):
    """SAFE: Complex queries with ORM"""
    
    username = request.GET.get('username', '')
    email = request.GET.get('email', '')
    is_active = request.GET.get('is_active') == 'true'
    
    # All of these are safe
    users = User.objects.filter(
        username__icontains=username,
        email__icontains=email,
        is_active=is_active
    ).select_related('profile')
    
    # Django handles parameterization automatically
    return render(request, 'search_results.html', {'users': users})

def safe_dynamic_filtering(request):
    """SAFE: Dynamic filtering with Q objects"""
    
    from django.db.models import Q
    
    search_term = request.GET.get('search', '')
    
    if search_term:
        # Complex query with Q objects - still safe
        users = User.objects.filter(
            Q(username__icontains=search_term) |
            Q(first_name__icontains=search_term) |
            Q(last_name__icontains=search_term) |
            Q(email__icontains=search_term)
        )
    else:
        users = User.objects.none()
    
    return render(request, 'search_results.html', {'users': users})

Safe Raw SQL Usage

When you must use raw SQL, Django provides safe methods:

# SAFE: Using raw() with parameters
def safe_raw_query(request):
    """SAFE: Raw SQL with proper parameterization"""
    
    user_id = request.GET.get('user_id')
    
    # Method 1: Using raw() with parameters
    users = User.objects.raw(
        "SELECT * FROM auth_user WHERE id = %s AND is_active = %s",
        [user_id, True]
    )
    
    return render(request, 'users.html', {'users': users})

def safe_cursor_usage(request):
    """SAFE: Direct cursor usage with parameters"""
    
    from django.db import connection
    
    username = request.GET.get('username')
    
    with connection.cursor() as cursor:
        # SAFE: Using parameter substitution
        cursor.execute(
            "SELECT u.*, p.bio FROM auth_user u "
            "LEFT JOIN profiles_userprofile p ON u.id = p.user_id "
            "WHERE u.username = %s",
            [username]
        )
        
        results = cursor.fetchall()
    
    return render(request, 'user_details.html', {'results': results})

def safe_named_parameters(request):
    """SAFE: Using named parameters (PostgreSQL)"""
    
    from django.db import connection
    
    min_age = request.GET.get('min_age', 18)
    max_age = request.GET.get('max_age', 65)
    
    with connection.cursor() as cursor:
        # SAFE: Named parameters (PostgreSQL syntax)
        cursor.execute(
            "SELECT * FROM users WHERE age BETWEEN %(min_age)s AND %(max_age)s",
            {'min_age': min_age, 'max_age': max_age}
        )
        
        results = cursor.fetchall()
    
    return render(request, 'age_filtered_users.html', {'results': results})

Dangerous Raw SQL Patterns to Avoid

# DANGEROUS PATTERNS - NEVER DO THESE!

def dangerous_string_formatting(request):
    """DANGEROUS: String formatting in SQL"""
    
    username = request.GET.get('username')
    
    # DON'T DO THIS - Vulnerable to SQL injection!
    query = f"SELECT * FROM auth_user WHERE username = '{username}'"
    
    # DON'T DO THIS EITHER!
    query = "SELECT * FROM auth_user WHERE username = '%s'" % username
    
    # OR THIS!
    query = "SELECT * FROM auth_user WHERE username = '{}'".format(username)

def dangerous_concatenation(request):
    """DANGEROUS: String concatenation in SQL"""
    
    order_by = request.GET.get('order_by', 'username')
    
    # DON'T DO THIS - ORDER BY cannot be parameterized safely this way
    query = "SELECT * FROM auth_user ORDER BY " + order_by
    
    # Attacker could inject: username; DROP TABLE auth_user; --

def dangerous_dynamic_queries(request):
    """DANGEROUS: Building dynamic queries unsafely"""
    
    filters = request.GET.get('filters', '')
    
    # DON'T DO THIS!
    query = "SELECT * FROM auth_user WHERE " + filters
    
    # Attacker could inject: 1=1 OR (SELECT COUNT(*) FROM sensitive_table) > 0

Safe Dynamic Query Building

Secure Dynamic Filtering

# Safe dynamic query building
def secure_dynamic_search(request):
    """SECURE: Dynamic search with validation"""
    
    # Define allowed search fields
    ALLOWED_FIELDS = {
        'username': 'username__icontains',
        'email': 'email__icontains',
        'first_name': 'first_name__icontains',
        'last_name': 'last_name__icontains',
        'is_active': 'is_active',
        'date_joined': 'date_joined__gte'
    }
    
    # Build filter dictionary safely
    filters = {}
    for field, value in request.GET.items():
        if field in ALLOWED_FIELDS and value:
            django_field = ALLOWED_FIELDS[field]
            
            # Additional validation based on field type
            if field == 'is_active':
                value = value.lower() in ('true', '1', 'yes')
            elif field == 'date_joined':
                try:
                    from datetime import datetime
                    value = datetime.strptime(value, '%Y-%m-%d')
                except ValueError:
                    continue  # Skip invalid dates
            
            filters[django_field] = value
    
    # Apply filters safely
    users = User.objects.filter(**filters)
    
    return render(request, 'search_results.html', {'users': users})

def secure_ordering(request):
    """SECURE: Dynamic ordering with validation"""
    
    # Define allowed ordering fields
    ALLOWED_ORDER_FIELDS = [
        'username', 'email', 'first_name', 'last_name', 
        'date_joined', 'last_login'
    ]
    
    order_by = request.GET.get('order_by', 'username')
    direction = request.GET.get('direction', 'asc')
    
    # Validate ordering field
    if order_by not in ALLOWED_ORDER_FIELDS:
        order_by = 'username'  # Default safe value
    
    # Validate direction
    if direction not in ['asc', 'desc']:
        direction = 'asc'
    
    # Build ordering string safely
    if direction == 'desc':
        order_by = f'-{order_by}'
    
    users = User.objects.all().order_by(order_by)
    
    return render(request, 'users.html', {'users': users})

Safe Complex Query Building

def secure_advanced_search(request):
    """SECURE: Advanced search with multiple criteria"""
    
    from django.db.models import Q
    from datetime import datetime, timedelta
    
    # Get search parameters
    search_term = request.GET.get('search', '').strip()
    user_type = request.GET.get('user_type', '')
    date_range = request.GET.get('date_range', '')
    
    # Start with base queryset
    queryset = User.objects.all()
    
    # Add search term filter
    if search_term:
        # Limit search term length to prevent DoS
        if len(search_term) > 100:
            search_term = search_term[:100]
        
        queryset = queryset.filter(
            Q(username__icontains=search_term) |
            Q(first_name__icontains=search_term) |
            Q(last_name__icontains=search_term) |
            Q(email__icontains=search_term)
        )
    
    # Add user type filter
    if user_type in ['staff', 'superuser', 'regular']:
        if user_type == 'staff':
            queryset = queryset.filter(is_staff=True)
        elif user_type == 'superuser':
            queryset = queryset.filter(is_superuser=True)
        elif user_type == 'regular':
            queryset = queryset.filter(is_staff=False, is_superuser=False)
    
    # Add date range filter
    if date_range in ['today', 'week', 'month', 'year']:
        now = datetime.now()
        if date_range == 'today':
            start_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
        elif date_range == 'week':
            start_date = now - timedelta(days=7)
        elif date_range == 'month':
            start_date = now - timedelta(days=30)
        elif date_range == 'year':
            start_date = now - timedelta(days=365)
        
        queryset = queryset.filter(date_joined__gte=start_date)
    
    # Optimize query
    queryset = queryset.select_related('profile').prefetch_related('groups')
    
    # Limit results to prevent DoS
    queryset = queryset[:1000]
    
    return render(request, 'advanced_search.html', {'users': queryset})

Input Validation for SQL Safety

Form-Based Validation

# forms.py - Secure form validation
from django import forms
from django.core.exceptions import ValidationError
import re

class UserSearchForm(forms.Form):
    """Secure user search form"""
    
    username = forms.CharField(
        max_length=150,
        required=False,
        widget=forms.TextInput(attrs={'placeholder': 'Username'})
    )
    
    email = forms.EmailField(
        required=False,
        widget=forms.EmailInput(attrs={'placeholder': 'Email'})
    )
    
    order_by = forms.ChoiceField(
        choices=[
            ('username', 'Username'),
            ('email', 'Email'),
            ('date_joined', 'Date Joined'),
            ('last_login', 'Last Login'),
        ],
        required=False,
        initial='username'
    )
    
    direction = forms.ChoiceField(
        choices=[('asc', 'Ascending'), ('desc', 'Descending')],
        required=False,
        initial='asc'
    )
    
    def clean_username(self):
        """Validate username field"""
        username = self.cleaned_data['username']
        
        if username:
            # Remove potentially dangerous characters
            username = re.sub(r'[^\w\-\.]', '', username)
            
            # Limit length
            if len(username) > 150:
                raise ValidationError("Username too long")
        
        return username
    
    def clean(self):
        """Cross-field validation"""
        cleaned_data = super().clean()
        
        username = cleaned_data.get('username')
        email = cleaned_data.get('email')
        
        # Require at least one search criterion
        if not username and not email:
            raise ValidationError("Please provide at least one search criterion")
        
        return cleaned_data

# Using the form in views
def secure_search_view(request):
    """Secure search using validated form data"""
    
    form = UserSearchForm(request.GET)
    users = []
    
    if form.is_valid():
        # Use validated data for queries
        username = form.cleaned_data.get('username')
        email = form.cleaned_data.get('email')
        order_by = form.cleaned_data.get('order_by', 'username')
        direction = form.cleaned_data.get('direction', 'asc')
        
        # Build query safely
        queryset = User.objects.all()
        
        if username:
            queryset = queryset.filter(username__icontains=username)
        
        if email:
            queryset = queryset.filter(email__icontains=email)
        
        # Apply ordering
        if direction == 'desc':
            order_by = f'-{order_by}'
        
        users = queryset.order_by(order_by)[:100]  # Limit results
    
    return render(request, 'search.html', {
        'form': form,
        'users': users
    })

Model-Level Validation

# models.py - Model validation for SQL safety
from django.db import models
from django.core.exceptions import ValidationError
import re

def validate_sql_safe_string(value):
    """Validator to ensure string is safe for SQL operations"""
    
    # Check for SQL injection patterns
    dangerous_patterns = [
        r"'.*'",  # Single quotes
        r'".*"',  # Double quotes
        r'--',    # SQL comments
        r'/\*.*\*/',  # Multi-line comments
        r'\bunion\b',  # UNION keyword
        r'\bselect\b', # SELECT keyword
        r'\binsert\b', # INSERT keyword
        r'\bupdate\b', # UPDATE keyword
        r'\bdelete\b', # DELETE keyword
        r'\bdrop\b',   # DROP keyword
    ]
    
    value_lower = value.lower()
    for pattern in dangerous_patterns:
        if re.search(pattern, value_lower):
            raise ValidationError(f"Value contains potentially dangerous SQL pattern: {pattern}")

class SecureUserProfile(models.Model):
    """User profile with SQL injection protection"""
    
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    
    bio = models.TextField(
        max_length=1000,
        validators=[validate_sql_safe_string],
        help_text="Bio cannot contain SQL keywords or special characters"
    )
    
    website = models.URLField(
        blank=True,
        validators=[validate_sql_safe_string]
    )
    
    def clean(self):
        """Additional model validation"""
        super().clean()
        
        # Additional bio validation
        if self.bio:
            # Remove potentially dangerous characters
            cleaned_bio = re.sub(r'[<>"\']', '', self.bio)
            if cleaned_bio != self.bio:
                raise ValidationError("Bio contains invalid characters")

Database-Specific Protections

PostgreSQL Security Features

# PostgreSQL-specific security settings
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'myapp_db',
        'USER': 'myapp_user',  # Limited privilege user
        'PASSWORD': os.environ.get('DB_PASSWORD'),
        'HOST': 'localhost',
        'PORT': '5432',
        'OPTIONS': {
            # Enable SSL
            'sslmode': 'require',
            
            # Connection security
            'connect_timeout': 10,
            
            # Additional security options
            'options': '-c default_transaction_isolation=serializable'
        },
    }
}

# PostgreSQL row-level security example
def setup_row_level_security():
    """Setup row-level security policies"""
    
    from django.db import connection
    
    with connection.cursor() as cursor:
        # Enable RLS on sensitive table
        cursor.execute("ALTER TABLE sensitive_data ENABLE ROW LEVEL SECURITY")
        
        # Create policy for user access
        cursor.execute("""
            CREATE POLICY user_data_policy ON sensitive_data
            FOR ALL TO app_user
            USING (user_id = current_setting('app.current_user_id')::integer)
        """)

MySQL Security Configuration

# MySQL-specific security settings
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'myapp_db',
        'USER': 'myapp_user',
        'PASSWORD': os.environ.get('DB_PASSWORD'),
        'HOST': 'localhost',
        'PORT': '3306',
        'OPTIONS': {
            # Enable SSL
            'ssl': {
                'ssl_ca': '/path/to/ca-cert.pem',
                'ssl_cert': '/path/to/client-cert.pem',
                'ssl_key': '/path/to/client-key.pem',
            },
            
            # SQL mode for strict validation
            'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
            
            # Character set
            'charset': 'utf8mb4',
        },
    }
}

Monitoring and Detection

SQL Injection Detection

# middleware.py - SQL injection detection middleware
import re
import logging

logger = logging.getLogger('security')

class SQLInjectionDetectionMiddleware:
    """Detect potential SQL injection attempts"""
    
    def __init__(self, get_response):
        self.get_response = get_response
        
        # SQL injection patterns
        self.sql_patterns = [
            r"'.*'",
            r'".*"',
            r'--',
            r'/\*.*\*/',
            r'\bunion\s+select\b',
            r'\bor\s+1\s*=\s*1\b',
            r'\band\s+1\s*=\s*1\b',
            r'\bdrop\s+table\b',
            r'\binsert\s+into\b',
            r'\bupdate\s+.*\s+set\b',
            r'\bdelete\s+from\b',
        ]
        
        self.compiled_patterns = [
            re.compile(pattern, re.IGNORECASE) 
            for pattern in self.sql_patterns
        ]
    
    def __call__(self, request):
        # Check for SQL injection patterns
        self.check_request_for_sql_injection(request)
        
        response = self.get_response(request)
        
        return response
    
    def check_request_for_sql_injection(self, request):
        """Check request parameters for SQL injection patterns"""
        
        # Check GET parameters
        for key, value in request.GET.items():
            if self.contains_sql_injection(value):
                self.log_sql_injection_attempt(request, 'GET', key, value)
        
        # Check POST parameters
        for key, value in request.POST.items():
            if self.contains_sql_injection(str(value)):
                self.log_sql_injection_attempt(request, 'POST', key, value)
    
    def contains_sql_injection(self, value):
        """Check if value contains SQL injection patterns"""
        
        for pattern in self.compiled_patterns:
            if pattern.search(value):
                return True
        
        return False
    
    def log_sql_injection_attempt(self, request, method, parameter, value):
        """Log SQL injection attempt"""
        
        logger.warning(
            f"Potential SQL injection attempt detected",
            extra={
                'ip_address': self.get_client_ip(request),
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                'path': request.path,
                'method': method,
                'parameter': parameter,
                'value': value[:100],  # Limit logged value length
                'user': getattr(request, 'user', None),
            }
        )
    
    def get_client_ip(self, request):
        """Get client IP address"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip

Testing SQL Injection Protection

Security Tests

# tests.py - SQL injection protection tests
from django.test import TestCase, Client
from django.contrib.auth.models import User
from django.urls import reverse

class SQLInjectionProtectionTests(TestCase):
    """Test SQL injection protection"""
    
    def setUp(self):
        self.client = Client()
        self.user = User.objects.create_user(
            username='testuser',
            password='testpass123'
        )
    
    def test_orm_protection_against_injection(self):
        """Test that ORM protects against SQL injection"""
        
        # Attempt SQL injection through ORM
        malicious_username = "admin' OR '1'='1' --"
        
        # This should not return all users
        users = User.objects.filter(username=malicious_username)
        
        # Should return no results (not all users)
        self.assertEqual(users.count(), 0)
    
    def test_search_form_injection_protection(self):
        """Test search form protection against injection"""
        
        # Attempt injection through search form
        response = self.client.get(reverse('user_search'), {
            'username': "admin' OR '1'='1' --",
            'email': "test@example.com"
        })
        
        # Should not cause error or return unexpected results
        self.assertEqual(response.status_code, 200)
        
        # Check that no users are returned (injection failed)
        self.assertNotContains(response, 'testuser')
    
    def test_raw_sql_parameterization(self):
        """Test that raw SQL uses proper parameterization"""
        
        from django.db import connection
        
        # Test parameterized query
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM auth_user WHERE username = %s",
                ["admin' OR '1'='1' --"]
            )
            results = cursor.fetchall()
        
        # Should return no results (injection prevented)
        self.assertEqual(len(results), 0)
    
    def test_injection_detection_middleware(self):
        """Test SQL injection detection middleware"""
        
        # Make request with injection attempt
        response = self.client.get('/search/', {
            'q': "'; DROP TABLE auth_user; --"
        })
        
        # Should not cause server error
        self.assertNotEqual(response.status_code, 500)
        
        # Check that attempt was logged (would need to check logs in real test)

Best Practices Summary

ORM Usage

  • Always use Django ORM for database operations when possible
  • Use parameterized queries for raw SQL
  • Validate and sanitize all user inputs
  • Use whitelisting for dynamic query parameters

Input Validation

  • Implement comprehensive form validation
  • Use model validators for additional protection
  • Sanitize user input at multiple levels
  • Limit input length and complexity

Query Construction

  • Never use string formatting or concatenation in SQL
  • Use Django's query methods and Q objects
  • Validate dynamic ordering and filtering parameters
  • Implement proper error handling

Monitoring and Detection

  • Log potential injection attempts
  • Monitor database query patterns
  • Implement rate limiting for database operations
  • Regular security audits and penetration testing

Next Steps

Now that you understand SQL injection protection, let's explore clickjacking protection and how to prevent UI redress attacks in Django applications.