Testing

Advanced Testing Topics

Advanced testing techniques help you handle complex scenarios, improve test reliability, and ensure comprehensive coverage of your Django application. This chapter covers sophisticated testing patterns, mocking strategies, async testing, and integration with external services.

Advanced Testing Topics

Advanced testing techniques help you handle complex scenarios, improve test reliability, and ensure comprehensive coverage of your Django application. This chapter covers sophisticated testing patterns, mocking strategies, async testing, and integration with external services.

Mocking and Patching

Advanced Mocking Strategies

from unittest.mock import Mock, patch, MagicMock, PropertyMock, call
from django.test import TestCase
from django.core.cache import cache
import requests
import time

class AdvancedMockingTests(TestCase):
    """Advanced mocking techniques"""
    
    def setUp(self):
        self.user = User.objects.create_user('testuser', 'test@example.com', 'pass')
    
    def test_mock_with_side_effects(self):
        """Test mock with side effects for complex behavior"""
        
        # Mock that simulates network retry logic
        mock_api = Mock()
        mock_api.get_data.side_effect = [
            requests.ConnectionError("Network error"),  # First call fails
            requests.Timeout("Request timeout"),        # Second call times out
            {"status": "success", "data": "result"}     # Third call succeeds
        ]
        
        # Service with retry logic
        class APIService:
            def __init__(self, api_client):
                self.api = api_client
            
            def fetch_data_with_retry(self, max_retries=3):
                for attempt in range(max_retries):
                    try:
                        return self.api.get_data()
                    except (requests.ConnectionError, requests.Timeout):
                        if attempt == max_retries - 1:
                            raise
                        time.sleep(0.1)  # Brief delay between retries
        
        service = APIService(mock_api)
        
        # Should succeed after retries
        result = service.fetch_data_with_retry()
        
        self.assertEqual(result["status"], "success")
        self.assertEqual(mock_api.get_data.call_count, 3)
    
    def test_mock_context_manager(self):
        """Test mocking context managers"""
        
        # Mock file operations
        mock_file = MagicMock()
        mock_file.__enter__.return_value = mock_file
        mock_file.read.return_value = "file content"
        
        with patch('builtins.open', return_value=mock_file):
            # Code that uses file operations
            with open('test.txt', 'r') as f:
                content = f.read()
            
            self.assertEqual(content, "file content")
            mock_file.__enter__.assert_called_once()
            mock_file.__exit__.assert_called_once()
    
    def test_mock_property(self):
        """Test mocking properties"""
        
        class UserService:
            @property
            def current_time(self):
                return time.time()
            
            def get_user_activity(self, user):
                timestamp = self.current_time
                return f"User {user.username} active at {timestamp}"
        
        service = UserService()
        
        # Mock the property
        with patch.object(
            UserService, 
            'current_time', 
            new_callable=PropertyMock
        ) as mock_time:
            mock_time.return_value = 1234567890.0
            
            result = service.get_user_activity(self.user)
            
            self.assertIn("1234567890.0", result)
            mock_time.assert_called_once()
    
    def test_mock_chained_calls(self):
        """Test mocking chained method calls"""
        
        # Mock complex API with chained calls
        mock_api = Mock()
        mock_api.users.get.return_value.profile.update.return_value = {
            "success": True
        }
        
        # Service that uses chained API calls
        class UserProfileService:
            def __init__(self, api):
                self.api = api
            
            def update_user_profile(self, user_id, data):
                return self.api.users.get(user_id).profile.update(data)
        
        service = UserProfileService(mock_api)
        result = service.update_user_profile(123, {"name": "John"})
        
        self.assertEqual(result["success"], True)
        mock_api.users.get.assert_called_once_with(123)
        mock_api.users.get.return_value.profile.update.assert_called_once_with(
            {"name": "John"}
        )
    
    def test_mock_async_functions(self):
        """Test mocking async functions"""
        
        import asyncio
        from unittest.mock import AsyncMock
        
        # Async service
        class AsyncEmailService:
            async def send_email(self, to, subject, body):
                # Simulate async email sending
                await asyncio.sleep(0.1)
                return {"message_id": "12345", "status": "sent"}
        
        async def test_async_email():
            service = AsyncEmailService()
            
            # Mock the async method
            service.send_email = AsyncMock(return_value={
                "message_id": "mock_id", 
                "status": "sent"
            })
            
            result = await service.send_email(
                "test@example.com", 
                "Test", 
                "Body"
            )
            
            assert result["message_id"] == "mock_id"
            service.send_email.assert_called_once_with(
                "test@example.com", 
                "Test", 
                "Body"
            )
        
        # Run async test
        asyncio.run(test_async_email())
    
    def test_mock_with_spec(self):
        """Test mock with spec for better error detection"""
        
        class RealService:
            def process_data(self, data):
                return {"processed": data}
            
            def validate_input(self, input_data):
                return len(input_data) > 0
        
        # Mock with spec prevents calling non-existent methods
        mock_service = Mock(spec=RealService)
        mock_service.process_data.return_value = {"processed": "test"}
        
        # This works - method exists in spec
        result = mock_service.process_data("test")
        self.assertEqual(result["processed"], "test")
        
        # This would raise AttributeError - method doesn't exist in spec
        with self.assertRaises(AttributeError):
            mock_service.nonexistent_method()

Mocking External Services

class ExternalServiceMockingTests(TestCase):
    """Test mocking external services and APIs"""
    
    def setUp(self):
        self.user = User.objects.create_user('testuser', 'test@example.com', 'pass')
    
    @patch('requests.post')
    def test_payment_service_integration(self, mock_post):
        """Test payment service integration with mocking"""
        
        # Mock successful payment response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "transaction_id": "txn_12345",
            "status": "completed",
            "amount": 99.99
        }
        mock_post.return_value = mock_response
        
        # Payment service
        class PaymentService:
            def process_payment(self, amount, card_token):
                response = requests.post(
                    'https://api.payment.com/charge',
                    json={
                        'amount': amount,
                        'card_token': card_token
                    },
                    headers={'Authorization': 'Bearer secret_key'}
                )
                
                if response.status_code == 200:
                    return response.json()
                else:
                    raise Exception("Payment failed")
        
        service = PaymentService()
        result = service.process_payment(99.99, "card_token_123")
        
        # Verify payment was processed
        self.assertEqual(result["transaction_id"], "txn_12345")
        self.assertEqual(result["status"], "completed")
        
        # Verify API was called correctly
        mock_post.assert_called_once_with(
            'https://api.payment.com/charge',
            json={'amount': 99.99, 'card_token': 'card_token_123'},
            headers={'Authorization': 'Bearer secret_key'}
        )
    
    @patch('boto3.client')
    def test_aws_s3_integration(self, mock_boto_client):
        """Test AWS S3 integration with mocking"""
        
        # Mock S3 client
        mock_s3 = Mock()
        mock_s3.upload_file.return_value = None
        mock_s3.generate_presigned_url.return_value = "https://s3.amazonaws.com/bucket/file.jpg"
        mock_boto_client.return_value = mock_s3
        
        # S3 service
        class S3Service:
            def __init__(self):
                import boto3
                self.s3_client = boto3.client('s3')
            
            def upload_file(self, file_path, bucket, key):
                self.s3_client.upload_file(file_path, bucket, key)
                return self.s3_client.generate_presigned_url(
                    'get_object',
                    Params={'Bucket': bucket, 'Key': key},
                    ExpiresIn=3600
                )
        
        service = S3Service()
        url = service.upload_file('/tmp/test.jpg', 'my-bucket', 'uploads/test.jpg')
        
        # Verify S3 operations
        mock_s3.upload_file.assert_called_once_with(
            '/tmp/test.jpg', 
            'my-bucket', 
            'uploads/test.jpg'
        )
        
        self.assertEqual(url, "https://s3.amazonaws.com/bucket/file.jpg")
    
    @patch('smtplib.SMTP')
    def test_email_service_integration(self, mock_smtp):
        """Test email service integration with mocking"""
        
        # Mock SMTP server
        mock_server = Mock()
        mock_smtp.return_value.__enter__.return_value = mock_server
        
        # Email service
        class EmailService:
            def send_email(self, to, subject, body):
                import smtplib
                from email.mime.text import MIMEText
                
                msg = MIMEText(body)
                msg['Subject'] = subject
                msg['To'] = to
                msg['From'] = 'noreply@example.com'
                
                with smtplib.SMTP('smtp.gmail.com', 587) as server:
                    server.starttls()
                    server.login('user@example.com', 'password')
                    server.send_message(msg)
                
                return True
        
        service = EmailService()
        result = service.send_email(
            'test@example.com',
            'Test Subject',
            'Test Body'
        )
        
        # Verify email operations
        self.assertTrue(result)
        mock_server.starttls.assert_called_once()
        mock_server.login.assert_called_once_with('user@example.com', 'password')
        mock_server.send_message.assert_called_once()
    
    def test_cache_service_mocking(self):
        """Test caching service with mocking"""
        
        # Service that uses caching
        class CacheService:
            def get_user_data(self, user_id):
                cache_key = f"user_data_{user_id}"
                
                # Try to get from cache first
                cached_data = cache.get(cache_key)
                if cached_data:
                    return cached_data
                
                # Simulate expensive database operation
                data = {"user_id": user_id, "name": "John Doe"}
                
                # Cache for 1 hour
                cache.set(cache_key, data, 3600)
                return data
        
        service = CacheService()
        
        # Mock cache operations
        with patch.object(cache, 'get') as mock_get, \
             patch.object(cache, 'set') as mock_set:
            
            # First call - cache miss
            mock_get.return_value = None
            
            result = service.get_user_data(123)
            
            self.assertEqual(result["user_id"], 123)
            mock_get.assert_called_once_with("user_data_123")
            mock_set.assert_called_once_with(
                "user_data_123", 
                {"user_id": 123, "name": "John Doe"}, 
                3600
            )
            
            # Second call - cache hit
            mock_get.reset_mock()
            mock_set.reset_mock()
            mock_get.return_value = {"user_id": 123, "name": "John Doe"}
            
            result = service.get_user_data(123)
            
            mock_get.assert_called_once_with("user_data_123")
            mock_set.assert_not_called()  # Should not set cache on hit

Testing Async Code

Testing Async Views and Functions

import asyncio
from django.test import TestCase
from asgiref.sync import sync_to_async
from channels.testing import HttpCommunicator
from channels.db import database_sync_to_async

class AsyncTestCase(TestCase):
    """Base class for async testing"""
    
    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
    
    def tearDown(self):
        self.loop.close()
    
    def async_test(self, coro):
        """Helper to run async test functions"""
        return self.loop.run_until_complete(coro)

class AsyncViewTests(AsyncTestCase):
    """Test async views and functions"""
    
    def setUp(self):
        super().setUp()
        self.user = User.objects.create_user('testuser', 'test@example.com', 'pass')
    
    def test_async_view_function(self):
        """Test async view function"""
        
        from django.http import JsonResponse
        import asyncio
        
        async def async_api_view(request):
            """Async view that fetches data from multiple sources"""
            
            # Simulate async operations
            async def fetch_user_data():
                await asyncio.sleep(0.1)
                return {"user": "data"}
            
            async def fetch_posts_data():
                await asyncio.sleep(0.1)
                return {"posts": ["post1", "post2"]}
            
            # Run operations concurrently
            user_data, posts_data = await asyncio.gather(
                fetch_user_data(),
                fetch_posts_data()
            )
            
            return JsonResponse({
                "user": user_data,
                "posts": posts_data
            })
        
        # Test the async view
        async def run_test():
            from django.test import RequestFactory
            
            factory = RequestFactory()
            request = factory.get('/api/data/')
            
            response = await async_api_view(request)
            
            self.assertEqual(response.status_code, 200)
            
            import json
            data = json.loads(response.content)
            self.assertIn("user", data)
            self.assertIn("posts", data)
        
        self.async_test(run_test())
    
    def test_async_database_operations(self):
        """Test async database operations"""
        
        async def async_database_test():
            # Convert sync model operations to async
            user_count = await database_sync_to_async(User.objects.count)()
            self.assertEqual(user_count, 1)
            
            # Create user asynchronously
            new_user = await database_sync_to_async(User.objects.create_user)(
                username='asyncuser',
                email='async@example.com',
                password='pass123'
            )
            
            self.assertEqual(new_user.username, 'asyncuser')
            
            # Query users asynchronously
            users = await database_sync_to_async(list)(
                User.objects.all()
            )
            
            self.assertEqual(len(users), 2)
        
        self.async_test(async_database_test())
    
    def test_async_service_integration(self):
        """Test async service integration"""
        
        class AsyncNotificationService:
            async def send_notification(self, user_id, message):
                # Simulate async notification sending
                await asyncio.sleep(0.1)
                return {
                    "notification_id": f"notif_{user_id}",
                    "status": "sent",
                    "message": message
                }
            
            async def send_bulk_notifications(self, notifications):
                # Send multiple notifications concurrently
                tasks = [
                    self.send_notification(notif["user_id"], notif["message"])
                    for notif in notifications
                ]
                
                results = await asyncio.gather(*tasks)
                return results
        
        async def test_notifications():
            service = AsyncNotificationService()
            
            # Test single notification
            result = await service.send_notification(123, "Hello!")
            self.assertEqual(result["notification_id"], "notif_123")
            self.assertEqual(result["status"], "sent")
            
            # Test bulk notifications
            notifications = [
                {"user_id": 1, "message": "Message 1"},
                {"user_id": 2, "message": "Message 2"},
                {"user_id": 3, "message": "Message 3"}
            ]
            
            results = await service.send_bulk_notifications(notifications)
            
            self.assertEqual(len(results), 3)
            for i, result in enumerate(results, 1):
                self.assertEqual(result["notification_id"], f"notif_{i}")
                self.assertEqual(result["status"], "sent")
        
        self.async_test(test_notifications())

Testing WebSocket Connections

from channels.testing import WebsocketCommunicator
from channels.routing import URLRouter
from django.urls import path

class WebSocketTests(TestCase):
    """Test WebSocket functionality"""
    
    def setUp(self):
        self.user = User.objects.create_user('testuser', 'test@example.com', 'pass')
    
    async def test_websocket_connection(self):
        """Test WebSocket connection and messaging"""
        
        # Mock WebSocket consumer
        from channels.generic.websocket import AsyncWebsocketConsumer
        import json
        
        class ChatConsumer(AsyncWebsocketConsumer):
            async def connect(self):
                self.room_name = self.scope['url_route']['kwargs']['room_name']
                self.room_group_name = f'chat_{self.room_name}'
                
                # Join room group
                await self.channel_layer.group_add(
                    self.room_group_name,
                    self.channel_name
                )
                
                await self.accept()
            
            async def disconnect(self, close_code):
                # Leave room group
                await self.channel_layer.group_discard(
                    self.room_group_name,
                    self.channel_name
                )
            
            async def receive(self, text_data):
                data = json.loads(text_data)
                message = data['message']
                
                # Send message to room group
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        'type': 'chat_message',
                        'message': message
                    }
                )
            
            async def chat_message(self, event):
                message = event['message']
                
                # Send message to WebSocket
                await self.send(text_data=json.dumps({
                    'message': message
                }))
        
        # Create WebSocket communicator
        application = URLRouter([
            path('ws/chat/<str:room_name>/', ChatConsumer.as_asgi()),
        ])
        
        communicator = WebsocketCommunicator(
            application, 
            '/ws/chat/testroom/'
        )
        
        # Test connection
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)
        
        # Test sending message
        await communicator.send_json_to({
            'message': 'Hello WebSocket!'
        })
        
        # Test receiving message
        response = await communicator.receive_json_from()
        self.assertEqual(response['message'], 'Hello WebSocket!')
        
        # Disconnect
        await communicator.disconnect()
    
    async def test_websocket_authentication(self):
        """Test WebSocket authentication"""
        
        from channels.auth import AuthMiddlewareStack
        from channels.generic.websocket import AsyncWebsocketConsumer
        
        class AuthenticatedConsumer(AsyncWebsocketConsumer):
            async def connect(self):
                # Check if user is authenticated
                if self.scope['user'].is_anonymous:
                    await self.close()
                else:
                    await self.accept()
            
            async def receive(self, text_data):
                # Echo message with user info
                await self.send(text_data=json.dumps({
                    'message': text_data,
                    'user': self.scope['user'].username
                }))
        
        # Test with authenticated user
        application = AuthMiddlewareStack(
            URLRouter([
                path('ws/auth/', AuthenticatedConsumer.as_asgi()),
            ])
        )
        
        # This would require proper authentication setup
        # communicator = WebsocketCommunicator(application, '/ws/auth/')
        # communicator.scope['user'] = self.user
        
        # connected, _ = await communicator.connect()
        # self.assertTrue(connected)

Performance Testing

Load Testing and Benchmarking

import time
import threading
from concurrent.futures import ThreadPoolExecutor
from django.test import TestCase, Client
from django.test.utils import override_settings

class PerformanceTests(TestCase):
    """Test application performance"""
    
    def setUp(self):
        self.user = User.objects.create_user('testuser', 'test@example.com', 'pass')
        
        # Create test data
        from blog.models import BlogPost, Category
        self.category = Category.objects.create(name='Tech', slug='tech')
        
        # Create multiple posts for performance testing
        for i in range(100):
            BlogPost.objects.create(
                title=f'Post {i}',
                content=f'Content for post {i}' * 100,  # Longer content
                author=self.user,
                category=self.category,
                status='published'
            )
    
    def test_view_response_time(self):
        """Test view response time under normal load"""
        
        url = '/blog/'
        
        # Measure response time
        start_time = time.time()
        response = self.client.get(url)
        end_time = time.time()
        
        response_time = end_time - start_time
        
        # Assert reasonable response time (adjust threshold as needed)
        self.assertEqual(response.status_code, 200)
        self.assertLess(response_time, 1.0, f"Response time too slow: {response_time}s")
    
    def test_database_query_performance(self):
        """Test database query performance"""
        
        from django.test.utils import override_settings
        from django.db import connection
        
        with override_settings(DEBUG=True):
            # Clear query log
            connection.queries_log.clear()
            
            # Perform operation that should be optimized
            from blog.models import BlogPost
            
            # Test N+1 query problem
            posts = BlogPost.objects.all()[:10]
            
            # This should use select_related to avoid N+1 queries
            for post in posts:
                author_name = post.author.username
                category_name = post.category.name
            
            query_count = len(connection.queries)
            
            # Should be minimal queries (ideally 1 with proper optimization)
            self.assertLessEqual(
                query_count, 
                3, 
                f"Too many queries: {query_count}. Check for N+1 problem."
            )
    
    def test_concurrent_requests(self):
        """Test handling concurrent requests"""
        
        def make_request():
            """Make a single request"""
            client = Client()
            start_time = time.time()
            response = client.get('/blog/')
            end_time = time.time()
            
            return {
                'status_code': response.status_code,
                'response_time': end_time - start_time
            }
        
        # Simulate concurrent requests
        num_threads = 10
        results = []
        
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [executor.submit(make_request) for _ in range(num_threads)]
            
            for future in futures:
                results.append(future.result())
        
        # Analyze results
        successful_requests = [r for r in results if r['status_code'] == 200]
        response_times = [r['response_time'] for r in successful_requests]
        
        # All requests should succeed
        self.assertEqual(len(successful_requests), num_threads)
        
        # Average response time should be reasonable
        avg_response_time = sum(response_times) / len(response_times)
        self.assertLess(avg_response_time, 2.0, f"Average response time too slow: {avg_response_time}s")
        
        # No request should be extremely slow
        max_response_time = max(response_times)
        self.assertLess(max_response_time, 5.0, f"Slowest request too slow: {max_response_time}s")
    
    def test_memory_usage(self):
        """Test memory usage during operations"""
        
        import psutil
        import os
        
        # Get current process
        process = psutil.Process(os.getpid())
        
        # Measure initial memory
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        
        # Perform memory-intensive operation
        from blog.models import BlogPost
        
        # Load all posts (should be optimized in real code)
        posts = list(BlogPost.objects.all())
        
        # Process posts
        processed_data = []
        for post in posts:
            processed_data.append({
                'title': post.title,
                'content_length': len(post.content),
                'author': post.author.username
            })
        
        # Measure final memory
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = final_memory - initial_memory
        
        # Memory increase should be reasonable
        self.assertLess(
            memory_increase, 
            100,  # 100MB threshold
            f"Memory usage increased by {memory_increase}MB"
        )
    
    @override_settings(DEBUG=False)  # Disable debug for realistic performance
    def test_production_performance(self):
        """Test performance with production-like settings"""
        
        # Test with caching enabled
        from django.core.cache import cache
        
        # Clear cache
        cache.clear()
        
        # First request (cache miss)
        start_time = time.time()
        response1 = self.client.get('/blog/')
        first_request_time = time.time() - start_time
        
        # Second request (cache hit, if caching is implemented)
        start_time = time.time()
        response2 = self.client.get('/blog/')
        second_request_time = time.time() - start_time
        
        # Both should succeed
        self.assertEqual(response1.status_code, 200)
        self.assertEqual(response2.status_code, 200)
        
        # If caching is implemented, second request should be faster
        # (This test assumes you have caching implemented)
        # self.assertLess(second_request_time, first_request_time)

Profiling and Optimization Testing

import cProfile
import pstats
from io import StringIO

class ProfilingTests(TestCase):
    """Test code profiling and optimization"""
    
    def setUp(self):
        self.user = User.objects.create_user('testuser', 'test@example.com', 'pass')
    
    def test_function_profiling(self):
        """Test function performance profiling"""
        
        def expensive_function():
            """Function to profile"""
            total = 0
            for i in range(10000):
                total += i * i
            return total
        
        # Profile the function
        profiler = cProfile.Profile()
        profiler.enable()
        
        result = expensive_function()
        
        profiler.disable()
        
        # Analyze profiling results
        stats_stream = StringIO()
        stats = pstats.Stats(profiler, stream=stats_stream)
        stats.sort_stats('cumulative')
        stats.print_stats(10)  # Top 10 functions
        
        profiling_output = stats_stream.getvalue()
        
        # Verify function was called and completed
        self.assertEqual(result, sum(i * i for i in range(10000)))
        self.assertIn('expensive_function', profiling_output)
    
    def test_view_profiling(self):
        """Test view performance profiling"""
        
        from django.test.utils import override_settings
        
        with override_settings(DEBUG=True):
            # Profile view request
            profiler = cProfile.Profile()
            profiler.enable()
            
            response = self.client.get('/blog/')
            
            profiler.disable()
            
            # Analyze results
            stats = pstats.Stats(profiler)
            stats.sort_stats('cumulative')
            
            # Get top functions by cumulative time
            top_functions = []
            for func_info in stats.get_stats_profile().func_profiles:
                if stats.get_stats_profile().func_profiles[func_info].cumulative > 0.01:  # > 10ms
                    top_functions.append(func_info)
            
            # Response should be successful
            self.assertEqual(response.status_code, 200)
            
            # Should have some measurable function calls
            self.assertGreater(len(top_functions), 0)
    
    def test_query_optimization(self):
        """Test database query optimization"""
        
        from django.test.utils import override_settings
        from django.db import connection
        
        # Create test data
        from blog.models import BlogPost, Category
        category = Category.objects.create(name='Tech', slug='tech')
        
        posts = []
        for i in range(50):
            post = BlogPost.objects.create(
                title=f'Post {i}',
                content=f'Content {i}',
                author=self.user,
                category=category
            )
            posts.append(post)
        
        with override_settings(DEBUG=True):
            # Test unoptimized query (N+1 problem)
            connection.queries_log.clear()
            
            unoptimized_posts = BlogPost.objects.all()[:10]
            for post in unoptimized_posts:
                # This creates N+1 queries
                author_name = post.author.username
                category_name = post.category.name
            
            unoptimized_query_count = len(connection.queries)
            
            # Test optimized query
            connection.queries_log.clear()
            
            optimized_posts = BlogPost.objects.select_related(
                'author', 'category'
            ).all()[:10]
            
            for post in optimized_posts:
                # This should use the prefetched data
                author_name = post.author.username
                category_name = post.category.name
            
            optimized_query_count = len(connection.queries)
            
            # Optimized version should use fewer queries
            self.assertLess(
                optimized_query_count, 
                unoptimized_query_count,
                f"Optimized: {optimized_query_count}, Unoptimized: {unoptimized_query_count}"
            )
            
            # Optimized should ideally be just 1 query
            self.assertLessEqual(optimized_query_count, 2)

Integration Testing

Testing Third-Party Integrations

class IntegrationTests(TestCase):
    """Test integration with external services"""
    
    def setUp(self):
        self.user = User.objects.create_user('testuser', 'test@example.com', 'pass')
    
    @patch('stripe.Charge.create')
    def test_stripe_payment_integration(self, mock_charge_create):
        """Test Stripe payment integration"""
        
        # Mock successful Stripe charge
        mock_charge_create.return_value = {
            'id': 'ch_test_charge',
            'amount': 2000,  # $20.00 in cents
            'currency': 'usd',
            'status': 'succeeded',
            'paid': True
        }
        
        # Payment service
        class StripePaymentService:
            def process_payment(self, amount, token):
                import stripe
                
                charge = stripe.Charge.create(
                    amount=int(amount * 100),  # Convert to cents
                    currency='usd',
                    source=token,
                    description='Test payment'
                )
                
                return {
                    'success': charge['paid'],
                    'charge_id': charge['id'],
                    'amount': charge['amount'] / 100
                }
        
        service = StripePaymentService()
        result = service.process_payment(20.00, 'tok_test_token')
        
        # Verify payment processing
        self.assertTrue(result['success'])
        self.assertEqual(result['charge_id'], 'ch_test_charge')
        self.assertEqual(result['amount'], 20.00)
        
        # Verify Stripe API was called correctly
        mock_charge_create.assert_called_once_with(
            amount=2000,
            currency='usd',
            source='tok_test_token',
            description='Test payment'
        )
    
    @patch('sendgrid.SendGridAPIClient.send')
    def test_sendgrid_email_integration(self, mock_send):
        """Test SendGrid email integration"""
        
        # Mock successful email send
        mock_response = Mock()
        mock_response.status_code = 202
        mock_send.return_value = mock_response
        
        # Email service
        class SendGridEmailService:
            def send_email(self, to_email, subject, content):
                import sendgrid
                from sendgrid.helpers.mail import Mail
                
                sg = sendgrid.SendGridAPIClient(api_key='test_key')
                
                mail = Mail(
                    from_email='noreply@example.com',
                    to_emails=to_email,
                    subject=subject,
                    html_content=content
                )
                
                response = sg.send(mail)
                return response.status_code == 202
        
        service = SendGridEmailService()
        success = service.send_email(
            'test@example.com',
            'Test Subject',
            '<p>Test content</p>'
        )
        
        # Verify email was sent
        self.assertTrue(success)
        mock_send.assert_called_once()
    
    def test_database_integration(self):
        """Test database integration and transactions"""
        
        from django.db import transaction
        from blog.models import BlogPost, Category
        
        category = Category.objects.create(name='Tech', slug='tech')
        
        # Test successful transaction
        with transaction.atomic():
            post = BlogPost.objects.create(
                title='Test Post',
                content='Test content',
                author=self.user,
                category=category
            )
            
            # Verify post was created
            self.assertTrue(BlogPost.objects.filter(id=post.id).exists())
        
        # Test transaction rollback
        with self.assertRaises(Exception):
            with transaction.atomic():
                BlogPost.objects.create(
                    title='Another Post',
                    content='Content',
                    author=self.user,
                    category=category
                )
                
                # Force an error to trigger rollback
                raise Exception("Forced error")
        
        # Second post should not exist due to rollback
        self.assertFalse(
            BlogPost.objects.filter(title='Another Post').exists()
        )
    
    def test_cache_integration(self):
        """Test cache integration"""
        
        from django.core.cache import cache
        
        # Test cache operations
        cache.set('test_key', 'test_value', 300)
        
        # Verify cache retrieval
        cached_value = cache.get('test_key')
        self.assertEqual(cached_value, 'test_value')
        
        # Test cache deletion
        cache.delete('test_key')
        cached_value = cache.get('test_key')
        self.assertIsNone(cached_value)
        
        # Test cache with complex data
        complex_data = {
            'user_id': self.user.id,
            'preferences': ['pref1', 'pref2'],
            'metadata': {'key': 'value'}
        }
        
        cache.set('complex_key', complex_data, 300)
        retrieved_data = cache.get('complex_key')
        
        self.assertEqual(retrieved_data, complex_data)

Next Steps

With advanced testing techniques mastered, you're ready to explore performance testing in detail. The next chapter will focus specifically on performance testing, including load testing, stress testing, and optimization strategies for Django applications.

Key advanced testing concepts covered:

  • Advanced mocking and patching strategies
  • Testing async code and WebSocket connections
  • Performance testing and profiling
  • Integration testing with external services
  • Concurrent request testing
  • Memory usage and optimization testing

These advanced techniques help ensure your Django application performs well under various conditions and integrates reliably with external services.