Models and Databases

Transactions

Database transactions ensure data consistency and integrity by grouping multiple database operations into atomic units. Django provides comprehensive transaction management tools that help you maintain data consistency even in complex scenarios.

Transactions

Database transactions ensure data consistency and integrity by grouping multiple database operations into atomic units. Django provides comprehensive transaction management tools that help you maintain data consistency even in complex scenarios.

Understanding Transactions

ACID Properties and Django

# Transaction basics in Django
from django.db import transaction
from django.db.models import F
from myapp.models import Account, Transaction as TransactionModel, Product, Order, OrderItem

# Atomic operations - all succeed or all fail
@transaction.atomic
def transfer_money(from_account_id, to_account_id, amount):
    """Transfer money between accounts atomically"""
    
    # Get accounts with select_for_update to prevent race conditions
    from_account = Account.objects.select_for_update().get(id=from_account_id)
    to_account = Account.objects.select_for_update().get(id=to_account_id)
    
    # Validate sufficient funds
    if from_account.balance < amount:
        raise ValueError("Insufficient funds")
    
    # Perform the transfer
    from_account.balance = F('balance') - amount
    to_account.balance = F('balance') + amount
    
    from_account.save(update_fields=['balance'])
    to_account.save(update_fields=['balance'])
    
    # Create transaction records
    TransactionModel.objects.create(
        account=from_account,
        amount=-amount,
        transaction_type='transfer_out',
        description=f'Transfer to account {to_account_id}'
    )
    
    TransactionModel.objects.create(
        account=to_account,
        amount=amount,
        transaction_type='transfer_in',
        description=f'Transfer from account {from_account_id}'
    )
    
    return True

# Context manager approach
def create_order_with_items(customer, items_data):
    """Create order with items in a transaction"""
    
    with transaction.atomic():
        # Create the order
        order = Order.objects.create(
            customer=customer,
            status='pending',
            total_amount=0
        )
        
        total_amount = 0
        
        for item_data in items_data:
            product = Product.objects.select_for_update().get(
                id=item_data['product_id']
            )
            
            # Check stock availability
            if product.stock_quantity < item_data['quantity']:
                raise ValueError(f"Insufficient stock for {product.name}")
            
            # Create order item
            order_item = OrderItem.objects.create(
                order=order,
                product=product,
                quantity=item_data['quantity'],
                unit_price=product.price
            )
            
            # Update stock
            product.stock_quantity = F('stock_quantity') - item_data['quantity']
            product.save(update_fields=['stock_quantity'])
            
            total_amount += order_item.quantity * order_item.unit_price
        
        # Update order total
        order.total_amount = total_amount
        order.save(update_fields=['total_amount'])
        
        return order

Transaction Decorators and Context Managers

Atomic Decorator

from django.db import transaction
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods

@transaction.atomic
@require_http_methods(["POST"])
def process_payment(request):
    """Process payment with automatic transaction management"""
    
    payment_data = json.loads(request.body)
    
    try:
        # Create payment record
        payment = Payment.objects.create(
            user=request.user,
            amount=payment_data['amount'],
            payment_method=payment_data['method'],
            status='processing'
        )
        
        # Process with external payment gateway
        gateway_response = process_with_gateway(payment_data)
        
        if gateway_response['success']:
            payment.status = 'completed'
            payment.gateway_transaction_id = gateway_response['transaction_id']
            payment.save()
            
            # Update user account
            user_account = Account.objects.select_for_update().get(user=request.user)
            user_account.balance = F('balance') + payment_data['amount']
            user_account.save()
            
            return JsonResponse({'success': True, 'payment_id': payment.id})
        else:
            payment.status = 'failed'
            payment.error_message = gateway_response['error']
            payment.save()
            
            return JsonResponse({'success': False, 'error': gateway_response['error']})
            
    except Exception as e:
        # Transaction will be rolled back automatically
        return JsonResponse({'success': False, 'error': str(e)})

# Class-based view with atomic decorator
from django.views.generic import CreateView

class AtomicOrderCreateView(CreateView):
    model = Order
    
    @transaction.atomic
    def form_valid(self, form):
        """Override form_valid to add transaction management"""
        
        # Create the order
        self.object = form.save(commit=False)
        self.object.customer = self.request.user
        self.object.save()
        
        # Process cart items
        cart_items = CartItem.objects.filter(user=self.request.user)
        
        for cart_item in cart_items:
            # Check stock
            if cart_item.product.stock_quantity < cart_item.quantity:
                raise ValueError(f"Insufficient stock for {cart_item.product.name}")
            
            # Create order item
            OrderItem.objects.create(
                order=self.object,
                product=cart_item.product,
                quantity=cart_item.quantity,
                unit_price=cart_item.product.price
            )
            
            # Update stock
            cart_item.product.stock_quantity = F('stock_quantity') - cart_item.quantity
            cart_item.product.save()
        
        # Clear cart
        cart_items.delete()
        
        return super().form_valid(form)

Context Manager Approach

from django.db import transaction

def complex_business_operation(user_id, operation_data):
    """Complex operation with nested transactions and savepoints"""
    
    with transaction.atomic():
        user = User.objects.select_for_update().get(id=user_id)
        
        # Create audit log
        audit_log = AuditLog.objects.create(
            user=user,
            operation='complex_business_operation',
            status='started',
            data=operation_data
        )
        
        try:
            # Step 1: Update user profile
            user.profile.credits = F('credits') + operation_data['credit_adjustment']
            user.profile.save()
            
            # Step 2: Process each sub-operation
            for sub_op in operation_data['sub_operations']:
                # Create savepoint for each sub-operation
                with transaction.atomic():
                    process_sub_operation(user, sub_op)
            
            # Step 3: Send notifications
            send_operation_notifications(user, operation_data)
            
            # Update audit log
            audit_log.status = 'completed'
            audit_log.save()
            
            return {'success': True, 'audit_id': audit_log.id}
            
        except Exception as e:
            # Update audit log with error
            audit_log.status = 'failed'
            audit_log.error_message = str(e)
            audit_log.save()
            
            # Re-raise to trigger rollback
            raise

def process_sub_operation(user, sub_op_data):
    """Process individual sub-operation with its own transaction boundary"""
    
    if sub_op_data['type'] == 'purchase':
        product = Product.objects.select_for_update().get(id=sub_op_data['product_id'])
        
        if product.stock_quantity < sub_op_data['quantity']:
            raise ValueError(f"Insufficient stock for {product.name}")
        
        # Create purchase record
        Purchase.objects.create(
            user=user,
            product=product,
            quantity=sub_op_data['quantity'],
            unit_price=product.price
        )
        
        # Update stock
        product.stock_quantity = F('stock_quantity') - sub_op_data['quantity']
        product.save()
        
    elif sub_op_data['type'] == 'reward':
        # Award points or credits
        user.profile.reward_points = F('reward_points') + sub_op_data['points']
        user.profile.save()
        
        RewardTransaction.objects.create(
            user=user,
            points=sub_op_data['points'],
            reason=sub_op_data['reason']
        )

Savepoints and Nested Transactions

Using Savepoints

from django.db import transaction

def batch_import_with_error_handling(data_records):
    """Import data with savepoints for error recovery"""
    
    results = {
        'successful': 0,
        'failed': 0,
        'errors': []
    }
    
    with transaction.atomic():
        for i, record in enumerate(data_records):
            try:
                # Create savepoint for each record
                with transaction.atomic():
                    import_single_record(record)
                    results['successful'] += 1
                    
            except Exception as e:
                # Savepoint is automatically rolled back
                results['failed'] += 1
                results['errors'].append({
                    'record_index': i,
                    'error': str(e),
                    'record_data': record
                })
                
                # Continue with next record
                continue
    
    return results

def import_single_record(record_data):
    """Import a single record with validation"""
    
    # Validate required fields
    if not record_data.get('email'):
        raise ValueError("Email is required")
    
    # Check for duplicates
    if User.objects.filter(email=record_data['email']).exists():
        raise ValueError(f"User with email {record_data['email']} already exists")
    
    # Create user
    user = User.objects.create(
        username=record_data['username'],
        email=record_data['email'],
        first_name=record_data.get('first_name', ''),
        last_name=record_data.get('last_name', '')
    )
    
    # Create profile
    UserProfile.objects.create(
        user=user,
        phone=record_data.get('phone', ''),
        address=record_data.get('address', '')
    )
    
    return user

# Manual savepoint management
def advanced_savepoint_usage():
    """Advanced savepoint management"""
    
    with transaction.atomic():
        # Main transaction work
        main_record = MainModel.objects.create(name='Main Record')
        
        # Create manual savepoint
        savepoint_id = transaction.savepoint()
        
        try:
            # Risky operation
            risky_operation(main_record)
            
            # If successful, release savepoint
            transaction.savepoint_commit(savepoint_id)
            
        except Exception as e:
            # Rollback to savepoint
            transaction.savepoint_rollback(savepoint_id)
            
            # Log error but continue
            ErrorLog.objects.create(
                related_object=main_record,
                error_message=str(e)
            )
        
        # Continue with other operations
        additional_operations(main_record)

Transaction Isolation and Locking

Select for Update

from django.db import transaction
from django.db.models import F

def concurrent_safe_operations():
    """Handle concurrent access safely"""
    
    @transaction.atomic
    def reserve_inventory(product_id, quantity):
        """Reserve inventory with row-level locking"""
        
        # Lock the product row to prevent concurrent modifications
        product = Product.objects.select_for_update().get(id=product_id)
        
        if product.available_quantity < quantity:
            raise ValueError("Insufficient inventory")
        
        # Update inventory atomically
        product.available_quantity = F('available_quantity') - quantity
        product.reserved_quantity = F('reserved_quantity') + quantity
        product.save()
        
        # Create reservation record
        reservation = InventoryReservation.objects.create(
            product=product,
            quantity=quantity,
            expires_at=timezone.now() + timedelta(minutes=15)
        )
        
        return reservation
    
    @transaction.atomic
    def process_concurrent_orders(order_data_list):
        """Process multiple orders concurrently with proper locking"""
        
        # Group orders by product to minimize lock contention
        orders_by_product = {}
        for order_data in order_data_list:
            product_id = order_data['product_id']
            if product_id not in orders_by_product:
                orders_by_product[product_id] = []
            orders_by_product[product_id].append(order_data)
        
        # Process orders for each product
        for product_id, orders in orders_by_product.items():
            # Lock product once for all orders
            product = Product.objects.select_for_update().get(id=product_id)
            
            total_quantity = sum(order['quantity'] for order in orders)
            
            if product.stock_quantity < total_quantity:
                raise ValueError(f"Insufficient stock for product {product.name}")
            
            # Process all orders for this product
            for order_data in orders:
                create_order_item(product, order_data)
            
            # Update stock once
            product.stock_quantity = F('stock_quantity') - total_quantity
            product.save()

# Different locking strategies
def locking_strategies_examples():
    """Examples of different locking strategies"""
    
    # Standard select_for_update (exclusive lock)
    def exclusive_lock_example():
        with transaction.atomic():
            account = Account.objects.select_for_update().get(id=1)
            # Other transactions will wait
            account.balance = F('balance') + 100
            account.save()
    
    # Select for update with NOWAIT (fail fast)
    def nowait_lock_example():
        try:
            with transaction.atomic():
                account = Account.objects.select_for_update(nowait=True).get(id=1)
                account.balance = F('balance') + 100
                account.save()
        except DatabaseError:
            # Handle lock acquisition failure
            return "Resource is busy, try again later"
    
    # Select for update with SKIP LOCKED
    def skip_locked_example():
        """Process available records, skip locked ones"""
        with transaction.atomic():
            # Get unlocked pending orders
            pending_orders = Order.objects.select_for_update(
                skip_locked=True
            ).filter(status='pending')[:10]
            
            for order in pending_orders:
                process_order(order)
    
    # Shared lock (PostgreSQL)
    def shared_lock_example():
        with transaction.atomic():
            # Multiple readers can acquire shared locks
            products = Product.objects.select_for_update(
                of=('self',)  # PostgreSQL specific
            ).filter(category_id=1)
            
            # Read operations are allowed by other transactions
            return list(products.values('name', 'price'))

Transaction Performance and Optimization

Bulk Operations in Transactions

from django.db import transaction

class OptimizedTransactionOperations:
    """Optimized transaction patterns for performance"""
    
    @staticmethod
    @transaction.atomic
    def bulk_create_with_relations(data_list):
        """Efficiently create objects with relationships"""
        
        # Bulk create main objects
        main_objects = []
        for data in data_list:
            main_objects.append(MainModel(
                name=data['name'],
                description=data['description']
            ))
        
        created_objects = MainModel.objects.bulk_create(main_objects)
        
        # Bulk create related objects
        related_objects = []
        for i, data in enumerate(data_list):
            for related_data in data['related_items']:
                related_objects.append(RelatedModel(
                    main_object=created_objects[i],
                    value=related_data['value']
                ))
        
        RelatedModel.objects.bulk_create(related_objects)
        
        return created_objects
    
    @staticmethod
    @transaction.atomic
    def batch_update_with_validation(updates):
        """Batch update with validation and rollback on any failure"""
        
        objects_to_update = []
        
        # Validate all updates first
        for update_data in updates:
            obj = MyModel.objects.select_for_update().get(id=update_data['id'])
            
            # Validate update
            if not validate_update(obj, update_data):
                raise ValueError(f"Invalid update for object {obj.id}")
            
            # Apply changes
            for field, value in update_data['changes'].items():
                setattr(obj, field, value)
            
            objects_to_update.append(obj)
        
        # Bulk update if all validations pass
        MyModel.objects.bulk_update(
            objects_to_update,
            [field for field in update_data['changes'].keys()]
        )
        
        return len(objects_to_update)
    
    @staticmethod
    def chunked_transaction_processing(queryset, chunk_size=1000):
        """Process large datasets in chunked transactions"""
        
        total_processed = 0
        
        # Process in chunks to avoid long-running transactions
        for chunk in queryset.iterator(chunk_size=chunk_size):
            chunk_list = list(chunk)
            
            with transaction.atomic():
                for obj in chunk_list:
                    process_single_object(obj)
                
                total_processed += len(chunk_list)
                
                # Optional: Add progress logging
                if total_processed % (chunk_size * 10) == 0:
                    logger.info(f"Processed {total_processed} objects")
        
        return total_processed

# Connection and transaction pooling
def connection_management_examples():
    """Examples of connection and transaction management"""
    
    # Using specific database connections
    def multi_database_transaction():
        """Transaction across multiple databases"""
        
        # Note: Django doesn't support distributed transactions
        # Each database transaction is separate
        
        with transaction.atomic(using='primary'):
            # Operations on primary database
            PrimaryModel.objects.create(name='Primary Record')
            
        with transaction.atomic(using='analytics'):
            # Operations on analytics database
            AnalyticsModel.objects.create(event='Record Created')
    
    # Manual transaction management
    def manual_transaction_control():
        """Manual transaction control for fine-grained management"""
        
        # Start transaction manually
        transaction.set_autocommit(False)
        
        try:
            # Perform operations
            obj1 = Model1.objects.create(name='Object 1')
            obj2 = Model2.objects.create(related=obj1)
            
            # Commit manually
            transaction.commit()
            
        except Exception as e:
            # Rollback on error
            transaction.rollback()
            raise
        finally:
            # Restore autocommit
            transaction.set_autocommit(True)

# Transaction middleware for request-level transactions
class TransactionMiddleware:
    """Middleware to wrap each request in a transaction"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        if request.method in ['POST', 'PUT', 'PATCH', 'DELETE']:
            with transaction.atomic():
                response = self.get_response(request)
                
                # Check for error responses and rollback if needed
                if response.status_code >= 400:
                    transaction.set_rollback(True)
                
                return response
        else:
            # Read-only requests don't need transactions
            return self.get_response(request)

Error Handling and Recovery

Transaction Error Patterns

from django.db import transaction, IntegrityError, DatabaseError
import logging

logger = logging.getLogger(__name__)

class TransactionErrorHandling:
    """Patterns for handling transaction errors"""
    
    @staticmethod
    def retry_on_deadlock(max_retries=3):
        """Decorator to retry operations on deadlock"""
        
        def decorator(func):
            def wrapper(*args, **kwargs):
                for attempt in range(max_retries):
                    try:
                        with transaction.atomic():
                            return func(*args, **kwargs)
                    
                    except DatabaseError as e:
                        if 'deadlock' in str(e).lower() and attempt < max_retries - 1:
                            logger.warning(f"Deadlock detected, retrying ({attempt + 1}/{max_retries})")
                            time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                            continue
                        raise
                
                raise Exception(f"Operation failed after {max_retries} attempts")
            
            return wrapper
        return decorator
    
    @staticmethod
    @retry_on_deadlock(max_retries=3)
    def concurrent_inventory_update(product_id, quantity_change):
        """Update inventory with deadlock retry"""
        
        product = Product.objects.select_for_update().get(id=product_id)
        
        new_quantity = product.stock_quantity + quantity_change
        if new_quantity < 0:
            raise ValueError("Insufficient inventory")
        
        product.stock_quantity = new_quantity
        product.save()
        
        return product
    
    @staticmethod
    def graceful_constraint_handling():
        """Handle constraint violations gracefully"""
        
        try:
            with transaction.atomic():
                # Attempt to create user
                user = User.objects.create(
                    username='newuser',
                    email='user@example.com'
                )
                
                # Create profile
                UserProfile.objects.create(
                    user=user,
                    phone='123-456-7890'
                )
                
                return user
                
        except IntegrityError as e:
            if 'unique constraint' in str(e).lower():
                # Handle duplicate user
                logger.info("User already exists, returning existing user")
                return User.objects.get(email='user@example.com')
            else:
                # Re-raise other integrity errors
                raise
    
    @staticmethod
    def partial_failure_recovery(operations_data):
        """Handle partial failures in batch operations"""
        
        successful_operations = []
        failed_operations = []
        
        for operation_data in operations_data:
            try:
                with transaction.atomic():
                    result = perform_operation(operation_data)
                    successful_operations.append({
                        'operation': operation_data,
                        'result': result
                    })
                    
            except Exception as e:
                failed_operations.append({
                    'operation': operation_data,
                    'error': str(e)
                })
                
                # Log error but continue with other operations
                logger.error(f"Operation failed: {e}", extra={
                    'operation_data': operation_data
                })
        
        return {
            'successful': successful_operations,
            'failed': failed_operations,
            'success_rate': len(successful_operations) / len(operations_data)
        }

# Custom transaction context manager
class CustomTransactionManager:
    """Custom transaction manager with additional features"""
    
    def __init__(self, using=None, savepoint=True, rollback_on_error=True):
        self.using = using
        self.savepoint = savepoint
        self.rollback_on_error = rollback_on_error
        self.start_time = None
        self.operation_log = []
    
    def __enter__(self):
        self.start_time = time.time()
        self.atomic = transaction.atomic(using=self.using, savepoint=self.savepoint)
        return self.atomic.__enter__()
    
    def __exit__(self, exc_type, exc_value, traceback):
        duration = time.time() - self.start_time
        
        if exc_type is not None:
            logger.error(f"Transaction failed after {duration:.2f}s: {exc_value}")
            
            if self.rollback_on_error:
                # Force rollback
                transaction.set_rollback(True, using=self.using)
        else:
            logger.info(f"Transaction completed successfully in {duration:.2f}s")
        
        return self.atomic.__exit__(exc_type, exc_value, traceback)
    
    def log_operation(self, operation_name, **kwargs):
        """Log operation within transaction"""
        self.operation_log.append({
            'operation': operation_name,
            'timestamp': time.time() - self.start_time,
            'details': kwargs
        })

# Usage
def complex_operation_with_logging():
    """Example using custom transaction manager"""
    
    with CustomTransactionManager() as tx_manager:
        tx_manager.log_operation('start', user_id=123)
        
        # Perform operations
        user = User.objects.get(id=123)
        tx_manager.log_operation('user_loaded', username=user.username)
        
        # Update user
        user.last_login = timezone.now()
        user.save()
        tx_manager.log_operation('user_updated')
        
        # Create audit record
        AuditLog.objects.create(
            user=user,
            action='login',
            timestamp=timezone.now()
        )
        tx_manager.log_operation('audit_created')

Understanding Django's transaction system enables you to build robust applications that maintain data consistency even under concurrent access and error conditions. Proper transaction management is crucial for financial applications, inventory systems, and any scenario where data integrity is paramount.