Database transactions ensure data consistency and integrity by grouping multiple database operations into atomic units. Django provides comprehensive transaction management tools that help you maintain data consistency even in complex scenarios.
# Transaction basics in Django
import functools
import json
import time
from datetime import timedelta
from itertools import islice

from django.db import transaction
from django.db.models import F
from django.utils import timezone

from myapp.models import Account, Transaction as TransactionModel, Product, Order, OrderItem
# Atomic operations - all succeed or all fail
@transaction.atomic
def transfer_money(from_account_id, to_account_id, amount):
    """Transfer ``amount`` from one account to another atomically.

    Both account rows are locked with ``select_for_update()`` before the
    balance check, the two balances are adjusted with ``F()`` expressions,
    and one ledger record is written for each side of the transfer. Any
    exception rolls the whole transfer back.

    Args:
        from_account_id: Primary key of the account to debit.
        to_account_id: Primary key of the account to credit.
        amount: Amount to move; must be greater than zero.

    Returns:
        True when the transfer has been applied.

    Raises:
        ValueError: If ``amount`` is not positive, both ids refer to the
            same account, or the source balance is lower than ``amount``.
        Account.DoesNotExist: If either account cannot be found.
    """
    if amount <= 0:
        # A negative amount would pass the funds check below and move money
        # in the opposite direction without any balance validation.
        raise ValueError("Transfer amount must be positive")
    if from_account_id == to_account_id:
        raise ValueError("Cannot transfer to the same account")

    # Take the row locks in one global order regardless of transfer
    # direction. Two concurrent transfers A->B and B->A would otherwise each
    # hold one lock and wait for the other (deadlock). Any consistent total
    # order works; keying on str tolerates mixed int/str ids.
    locked = {
        account_id: Account.objects.select_for_update().get(id=account_id)
        for account_id in sorted((from_account_id, to_account_id), key=str)
    }
    from_account = locked[from_account_id]
    to_account = locked[to_account_id]

    # Validate sufficient funds (safe: the row is locked)
    if from_account.balance < amount:
        raise ValueError("Insufficient funds")

    # Perform the transfer in SQL so the arithmetic happens in the database
    from_account.balance = F('balance') - amount
    to_account.balance = F('balance') + amount
    from_account.save(update_fields=['balance'])
    to_account.save(update_fields=['balance'])

    # Create transaction records, one per side of the transfer
    TransactionModel.objects.create(
        account=from_account,
        amount=-amount,
        transaction_type='transfer_out',
        description=f'Transfer to account {to_account_id}',
    )
    TransactionModel.objects.create(
        account=to_account,
        amount=amount,
        transaction_type='transfer_in',
        description=f'Transfer from account {from_account_id}',
    )
    return True
# Context manager approach
def create_order_with_items(customer, items_data):
    """Create a pending order and its line items inside one transaction.

    For every entry in ``items_data`` the product row is locked, stock is
    checked, an ``OrderItem`` is created at the product's current price and
    the stock is decremented. The order total is written last.

    Args:
        customer: Value stored on ``Order.customer``.
        items_data: Iterable of mappings with ``'product_id'`` and
            ``'quantity'`` keys.

    Returns:
        The created ``Order``.

    Raises:
        ValueError: If a product has less stock than the requested
            quantity; the transaction is rolled back.
    """
    with transaction.atomic():
        order = Order.objects.create(
            customer=customer,
            status='pending',
            total_amount=0,
        )

        running_total = 0
        for entry in items_data:
            # Row lock keeps the stock check and the decrement consistent
            product = Product.objects.select_for_update().get(
                id=entry['product_id']
            )
            requested_qty = entry['quantity']

            if product.stock_quantity < requested_qty:
                raise ValueError(f"Insufficient stock for {product.name}")

            line = OrderItem.objects.create(
                order=order,
                product=product,
                quantity=requested_qty,
                unit_price=product.price,
            )

            # Decrement in SQL and write only the stock column
            product.stock_quantity = F('stock_quantity') - requested_qty
            product.save(update_fields=['stock_quantity'])

            running_total += line.quantity * line.unit_price

        order.total_amount = running_total
        order.save(update_fields=['total_amount'])
    return order
from django.db import transaction
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
@require_http_methods(["POST"])
@transaction.atomic
def process_payment(request):
    """Process a payment request inside a single database transaction.

    The request body is parsed as JSON and must provide ``'amount'`` and
    ``'method'``. A ``Payment`` row is created, the external gateway is
    called, and on success the payment is completed and the user's account
    balance is credited. A gateway-reported failure is recorded on the
    payment and committed. Any exception rolls back everything written so
    far.

    ``require_http_methods`` is the outer decorator so that rejected
    non-POST requests never open a transaction.

    Args:
        request: The incoming ``HttpRequest``.

    Returns:
        ``JsonResponse`` with ``success`` plus ``payment_id`` or ``error``.
    """
    payment_data = json.loads(request.body)
    try:
        # Create payment record
        payment = Payment.objects.create(
            user=request.user,
            amount=payment_data['amount'],
            payment_method=payment_data['method'],
            status='processing',
        )

        # NOTE(review): the gateway call runs while the database transaction
        # is open, so the transaction lasts as long as the gateway takes.
        gateway_response = process_with_gateway(payment_data)

        if gateway_response['success']:
            payment.status = 'completed'
            payment.gateway_transaction_id = gateway_response['transaction_id']
            payment.save(update_fields=['status', 'gateway_transaction_id'])

            # Credit the user's account under a row lock
            user_account = Account.objects.select_for_update().get(user=request.user)
            user_account.balance = F('balance') + payment_data['amount']
            user_account.save(update_fields=['balance'])

            return JsonResponse({'success': True, 'payment_id': payment.id})

        # Gateway declined: keep a committed record of the failed attempt
        payment.status = 'failed'
        payment.error_message = gateway_response['error']
        payment.save(update_fields=['status', 'error_message'])
        return JsonResponse({'success': False, 'error': gateway_response['error']})
    except Exception as e:
        # The exception is swallowed here, so atomic() would otherwise see a
        # normal return and COMMIT the partial work. Mark the transaction
        # for rollback explicitly before returning the error response.
        transaction.set_rollback(True)
        return JsonResponse({'success': False, 'error': str(e)})
# Class-based view with atomic decorator
from django.views.generic import CreateView
class AtomicOrderCreateView(CreateView):
    """Create an order from the current user's cart in one transaction."""

    model = Order

    @transaction.atomic
    def form_valid(self, form):
        """Save the order, turn cart items into order items, clear the cart.

        Each product is re-read under a row lock before its stock is
        checked, so two concurrent checkouts cannot both pass the check for
        the same remaining stock.

        Args:
            form: The validated order form.

        Returns:
            The response produced by ``CreateView.form_valid``.

        Raises:
            ValueError: If a product has less stock than the cart quantity.
                The exception propagates out of the view and the
                transaction is rolled back.
        """
        # Create the order
        self.object = form.save(commit=False)
        self.object.customer = self.request.user
        self.object.save()

        # Process cart items
        cart_items = CartItem.objects.filter(user=self.request.user)
        for cart_item in cart_items:
            # Lock the product row; checking the unlocked, possibly stale
            # cart_item.product would allow overselling under concurrency.
            product = Product.objects.select_for_update().get(pk=cart_item.product_id)

            if product.stock_quantity < cart_item.quantity:
                raise ValueError(f"Insufficient stock for {product.name}")

            OrderItem.objects.create(
                order=self.object,
                product=product,
                quantity=cart_item.quantity,
                unit_price=product.price,
            )

            # Decrement in SQL and write only the stock column so other
            # product fields are not overwritten with in-memory values.
            product.stock_quantity = F('stock_quantity') - cart_item.quantity
            product.save(update_fields=['stock_quantity'])

        # Clear cart
        cart_items.delete()
        return super().form_valid(form)
from django.db import transaction
def complex_business_operation(user_id, operation_data):
    """Run a multi-step operation with an audit trail that survives failure.

    The audit log row is created before the transaction starts and its
    failure status is written after the transaction has rolled back. Writing
    the "failed" status inside the transaction and then re-raising would
    roll that write back together with everything else.

    NOTE(review): if the caller already runs inside an ``atomic`` block
    (for example ``ATOMIC_REQUESTS``), the audit rows are still part of the
    caller's transaction.

    Args:
        user_id: Primary key of the user the operation applies to.
        operation_data: Mapping with ``'credit_adjustment'`` and
            ``'sub_operations'``; it is also stored on the audit log.

    Returns:
        ``{'success': True, 'audit_id': <id>}`` on success.

    Raises:
        User.DoesNotExist: If the user does not exist.
        Exception: Any error from a step is re-raised after the audit log
            has been marked as failed.
    """
    user = User.objects.get(id=user_id)
    audit_log = AuditLog.objects.create(
        user=user,
        operation='complex_business_operation',
        status='started',
        data=operation_data,
    )

    try:
        with transaction.atomic():
            # Re-read the user under a row lock for the duration of the work
            user = User.objects.select_for_update().get(id=user_id)

            # Step 1: Update user profile
            profile = user.profile
            profile.credits = F('credits') + operation_data['credit_adjustment']
            profile.save(update_fields=['credits'])
            # An assigned F() stays on the instance and would be applied
            # again by any later profile.save(); replace it with the value.
            profile.refresh_from_db(fields=['credits'])

            # Step 2: Process each sub-operation
            for sub_op in operation_data['sub_operations']:
                # Nested atomic() creates a savepoint per sub-operation
                with transaction.atomic():
                    process_sub_operation(user, sub_op)

            # Step 3: Send notifications
            send_operation_notifications(user, operation_data)

            audit_log.status = 'completed'
            audit_log.save(update_fields=['status'])
    except Exception as e:
        # The transaction above is already rolled back at this point, so
        # this write is not undone by the failure it records.
        audit_log.status = 'failed'
        audit_log.error_message = str(e)
        audit_log.save(update_fields=['status', 'error_message'])
        raise

    return {'success': True, 'audit_id': audit_log.id}
def process_sub_operation(user, sub_op_data):
    """Apply one sub-operation (``'purchase'`` or ``'reward'``) for ``user``.

    The function does not open a transaction itself; row locking via
    ``select_for_update()`` requires the caller to run it inside one.
    Sub-operations with any other ``'type'`` are ignored.

    Args:
        user: The user the sub-operation applies to.
        sub_op_data: Mapping with a ``'type'`` key. Purchases also need
            ``'product_id'`` and ``'quantity'``; rewards need ``'points'``
            and ``'reason'``.

    Raises:
        ValueError: If a purchase asks for more than the available stock.
    """
    op_type = sub_op_data['type']

    if op_type == 'purchase':
        product = Product.objects.select_for_update().get(id=sub_op_data['product_id'])

        if product.stock_quantity < sub_op_data['quantity']:
            raise ValueError(f"Insufficient stock for {product.name}")

        # Create purchase record
        Purchase.objects.create(
            user=user,
            product=product,
            quantity=sub_op_data['quantity'],
            unit_price=product.price,
        )

        # Update stock, writing only the stock column
        product.stock_quantity = F('stock_quantity') - sub_op_data['quantity']
        product.save(update_fields=['stock_quantity'])

    elif op_type == 'reward':
        profile = user.profile
        profile.reward_points = F('reward_points') + sub_op_data['points']
        # Restrict the write to reward_points: a full save() would also
        # re-apply any F() expression still assigned to other profile
        # fields by earlier code working on the same instance.
        profile.save(update_fields=['reward_points'])
        # Swap the F() expression for the stored value so later saves of
        # this instance do not add the points a second time.
        profile.refresh_from_db(fields=['reward_points'])

        RewardTransaction.objects.create(
            user=user,
            points=sub_op_data['points'],
            reason=sub_op_data['reason'],
        )
from django.db import transaction
def batch_import_with_error_handling(data_records):
    """Import records one by one, isolating each in its own savepoint.

    All records share one outer transaction. A record that raises is rolled
    back to its savepoint and reported; the remaining records are still
    processed.

    Args:
        data_records: Iterable of records passed to
            ``import_single_record``.

    Returns:
        Dict with ``'successful'`` and ``'failed'`` counts and an
        ``'errors'`` list holding the index, message and data of every
        failed record.
    """
    successful = 0
    failed = 0
    errors = []

    with transaction.atomic():
        for index, record in enumerate(data_records):
            try:
                # Nested atomic() acts as a per-record savepoint
                with transaction.atomic():
                    import_single_record(record)
            except Exception as exc:
                # The savepoint has been rolled back; note the failure and
                # move on to the next record.
                failed += 1
                errors.append({
                    'record_index': index,
                    'error': str(exc),
                    'record_data': record
                })
            else:
                successful += 1

    return {
        'successful': successful,
        'failed': failed,
        'errors': errors
    }
def import_single_record(record_data):
    """Validate one record and create its user and profile.

    Args:
        record_data: Mapping with ``'email'`` and ``'username'``; the
            optional keys ``'first_name'``, ``'last_name'``, ``'phone'``
            and ``'address'`` default to empty strings.

    Returns:
        The newly created ``User``.

    Raises:
        ValueError: If the email is missing or empty, or a user with that
            email already exists.
    """
    email = record_data.get('email')
    if not email:
        raise ValueError("Email is required")

    already_taken = User.objects.filter(email=email).exists()
    if already_taken:
        raise ValueError(f"User with email {email} already exists")

    def optional(key):
        # Optional text fields fall back to an empty string
        return record_data.get(key, '')

    new_user = User.objects.create(
        username=record_data['username'],
        email=email,
        first_name=optional('first_name'),
        last_name=optional('last_name'),
    )
    UserProfile.objects.create(
        user=new_user,
        phone=optional('phone'),
        address=optional('address'),
    )
    return new_user
# Manual savepoint management
def advanced_savepoint_usage():
    """Demonstrate manual savepoints inside an ``atomic`` block.

    A main record is created, then a risky step runs behind a savepoint.
    If the step raises, only the work since the savepoint is undone and the
    error is logged; the main record and the follow-up operations are kept.
    """
    with transaction.atomic():
        parent = MainModel.objects.create(name='Main Record')

        # Mark the point to return to if the risky step fails
        sid = transaction.savepoint()
        try:
            risky_operation(parent)
            # Success: release the savepoint
            transaction.savepoint_commit(sid)
        except Exception as exc:
            # Undo only the risky step, then record what went wrong
            transaction.savepoint_rollback(sid)
            ErrorLog.objects.create(
                related_object=parent,
                error_message=str(exc)
            )

        # Runs whether or not the risky step succeeded
        additional_operations(parent)
from django.db import transaction
from django.db.models import F
def concurrent_safe_operations():
    """Handle concurrent access safely.

    Container for two example functions that are defined locally; calling
    this function only defines them and returns None.
    """

    @transaction.atomic
    def reserve_inventory(product_id, quantity):
        """Reserve inventory with row-level locking.

        Args:
            product_id: Primary key of the product to reserve from.
            quantity: Number of units to move from available to reserved.

        Returns:
            The created ``InventoryReservation``, expiring in 15 minutes.

        Raises:
            ValueError: If fewer than ``quantity`` units are available.
        """
        # Lock the product row to prevent concurrent modifications
        product = Product.objects.select_for_update().get(id=product_id)

        if product.available_quantity < quantity:
            raise ValueError("Insufficient inventory")

        # Adjust both counters in SQL and write only those two columns
        product.available_quantity = F('available_quantity') - quantity
        product.reserved_quantity = F('reserved_quantity') + quantity
        product.save(update_fields=['available_quantity', 'reserved_quantity'])

        # Create reservation record
        reservation = InventoryReservation.objects.create(
            product=product,
            quantity=quantity,
            expires_at=timezone.now() + timedelta(minutes=15),
        )
        return reservation

    @transaction.atomic
    def process_concurrent_orders(order_data_list):
        """Process a batch of orders, locking each product once.

        Args:
            order_data_list: Iterable of mappings with ``'product_id'`` and
                ``'quantity'`` keys.

        Raises:
            ValueError: If the combined quantity for a product exceeds its
                stock; the whole batch is rolled back.
        """
        # Group orders by product so each product row is locked only once
        orders_by_product = {}
        for order_data in order_data_list:
            orders_by_product.setdefault(order_data['product_id'], []).append(order_data)

        # Lock products in one global order. Locking in input order lets two
        # batches that mention the same products in different orders each
        # hold one lock and wait on the other (deadlock). Any consistent
        # total order works; keying on str tolerates mixed int/str ids.
        for product_id in sorted(orders_by_product, key=str):
            orders = orders_by_product[product_id]
            product = Product.objects.select_for_update().get(id=product_id)

            total_quantity = sum(order['quantity'] for order in orders)
            if product.stock_quantity < total_quantity:
                raise ValueError(f"Insufficient stock for product {product.name}")

            # Process all orders for this product
            for order_data in orders:
                create_order_item(product, order_data)

            # Update stock once, writing only the stock column
            product.stock_quantity = F('stock_quantity') - total_quantity
            product.save(update_fields=['stock_quantity'])
# Different locking strategies
def locking_strategies_examples():
    """Examples of different ``select_for_update()`` locking strategies.

    The example functions are defined locally; calling this function only
    defines them and returns None.
    """

    # Standard select_for_update (exclusive row lock)
    def exclusive_lock_example():
        """Lock one account row; competing lockers wait until commit."""
        with transaction.atomic():
            account = Account.objects.select_for_update().get(id=1)
            # Other transactions trying to lock this row will wait
            account.balance = F('balance') + 100
            account.save(update_fields=['balance'])

    # Select for update with NOWAIT (fail fast)
    def nowait_lock_example():
        """Fail immediately instead of waiting when the row is locked.

        Returns:
            None on success, or a message string if the lock could not be
            acquired.
        """
        try:
            with transaction.atomic():
                account = Account.objects.select_for_update(nowait=True).get(id=1)
                account.balance = F('balance') + 100
                account.save(update_fields=['balance'])
        except DatabaseError:
            # Raised when another transaction already holds the lock
            return "Resource is busy, try again later"

    # Select for update with SKIP LOCKED
    def skip_locked_example():
        """Process available records, skip locked ones"""
        with transaction.atomic():
            # Get up to 10 pending orders not locked by another transaction
            pending_orders = Order.objects.select_for_update(
                skip_locked=True
            ).filter(status='pending')[:10]

            for order in pending_orders:
                process_order(order)

    # Restricting which table is locked with ``of``
    def shared_lock_example():
        """Lock only the queried model's own rows via ``of=('self',)``.

        Despite the function name this is NOT a shared lock:
        ``select_for_update()`` still issues ``SELECT ... FOR UPDATE``, an
        exclusive row lock. ``of`` only restricts which tables' rows are
        locked, which matters when the query joins other tables. Plain
        reads by other transactions are not blocked by row locks; other
        lockers and writers still wait. On PostgreSQL a weaker lock is
        available through ``no_key=True`` (``FOR NO KEY UPDATE``).

        Returns:
            List of ``{'name': ..., 'price': ...}`` dicts for category 1.
        """
        with transaction.atomic():
            products = Product.objects.select_for_update(
                of=('self',)  # lock only Product rows, not joined tables
            ).filter(category_id=1)

            return list(products.values('name', 'price'))
from django.db import transaction
class OptimizedTransactionOperations:
    """Optimized transaction patterns for performance"""

    @staticmethod
    @transaction.atomic
    def bulk_create_with_relations(data_list):
        """Bulk-create main objects and their related objects atomically.

        Args:
            data_list: Re-iterable sequence of mappings with ``'name'``,
                ``'description'`` and ``'related_items'`` (each related
                item has a ``'value'``).

        Returns:
            The list returned by ``MainModel.objects.bulk_create``.
        """
        main_objects = [
            MainModel(name=data['name'], description=data['description'])
            for data in data_list
        ]
        # NOTE(review): the related rows need the parents' primary keys,
        # which bulk_create only populates on database backends that can
        # return them - confirm for the backend in use.
        created_objects = MainModel.objects.bulk_create(main_objects)

        related_objects = [
            RelatedModel(main_object=parent, value=related_data['value'])
            for parent, data in zip(created_objects, data_list)
            for related_data in data['related_items']
        ]
        RelatedModel.objects.bulk_create(related_objects)

        return created_objects

    @staticmethod
    @transaction.atomic
    def batch_update_with_validation(updates):
        """Validate and apply a batch of updates, all or nothing.

        Args:
            updates: Iterable of mappings with ``'id'`` and ``'changes'``
                (field name -> new value).

        Returns:
            Number of objects updated (0 for an empty batch).

        Raises:
            ValueError: If ``validate_update`` rejects any update; nothing
                is written in that case.
        """
        objects_to_update = []
        # Ordered union of every field touched by any update. Using only the
        # last update's keys would silently drop changes to other fields.
        changed_fields = {}

        for update_data in updates:
            obj = MyModel.objects.select_for_update().get(id=update_data['id'])

            if not validate_update(obj, update_data):
                raise ValueError(f"Invalid update for object {obj.id}")

            for field, value in update_data['changes'].items():
                setattr(obj, field, value)
                changed_fields[field] = None

            objects_to_update.append(obj)

        # bulk_update needs at least one field name; skip it when there is
        # nothing to write (empty batch or updates without changes).
        if objects_to_update and changed_fields:
            MyModel.objects.bulk_update(objects_to_update, list(changed_fields))

        return len(objects_to_update)

    @staticmethod
    def chunked_transaction_processing(queryset, chunk_size=1000):
        """Process a large queryset in transactions of ``chunk_size`` rows.

        ``QuerySet.iterator()`` yields individual objects, so they are
        batched here with ``islice``; each batch is processed in its own
        transaction to keep transactions short.

        Args:
            queryset: The queryset to process.
            chunk_size: Objects per transaction; must be at least 1.

        Returns:
            Total number of objects processed.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")

        total_processed = 0
        rows = queryset.iterator(chunk_size=chunk_size)

        # Pull chunk_size objects at a time until the iterator is exhausted
        for chunk_list in iter(lambda: list(islice(rows, chunk_size)), []):
            with transaction.atomic():
                for obj in chunk_list:
                    process_single_object(obj)

            total_processed += len(chunk_list)

            # Progress logging every ten full chunks
            if total_processed % (chunk_size * 10) == 0:
                logger.info("Processed %s objects", total_processed)

        return total_processed
# Selecting database connections and manual transaction control
def connection_management_examples():
    """Show per-database transactions and manual commit/rollback.

    The example functions are defined locally; calling this function only
    defines them and returns None.
    """

    def multi_database_transaction():
        """Write to two databases, each inside its own transaction.

        Django has no distributed transactions: the 'primary' and
        'analytics' transactions commit or roll back independently.
        """
        with transaction.atomic(using='primary'):
            PrimaryModel.objects.create(name='Primary Record')

            # Separate transaction on the second connection
            with transaction.atomic(using='analytics'):
                AnalyticsModel.objects.create(event='Record Created')

    def manual_transaction_control():
        """Commit or roll back by hand with autocommit switched off."""
        # Turning autocommit off starts manual transaction handling
        transaction.set_autocommit(False)
        try:
            parent = Model1.objects.create(name='Object 1')
            Model2.objects.create(related=parent)
            transaction.commit()
        except Exception:
            # Undo everything since autocommit was disabled, then propagate
            transaction.rollback()
            raise
        finally:
            # Always restore the default autocommit behaviour
            transaction.set_autocommit(True)
# Transaction middleware for request-level transactions
class TransactionMiddleware:
    """Wrap state-changing requests in a database transaction.

    POST, PUT, PATCH and DELETE requests run inside ``transaction.atomic()``
    and are rolled back when the response status is 400 or higher. Every
    other method is passed straight through without a transaction.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        is_write = request.method in ('POST', 'PUT', 'PATCH', 'DELETE')
        if not is_write:
            # Read-only requests don't need transactions
            return self.get_response(request)

        with transaction.atomic():
            response = self.get_response(request)
            failed = response.status_code >= 400
            if failed:
                # Discard the request's writes when it ends in an error
                transaction.set_rollback(True)
        return response
from django.db import transaction, IntegrityError, DatabaseError
import logging
logger = logging.getLogger(__name__)
class TransactionErrorHandling:
    """Patterns for handling transaction errors"""

    # Defined as a plain function first so it can be applied as a decorator
    # further down in this class body: a ``staticmethod`` object is not
    # callable before Python 3.10. It is exposed publicly as the static
    # method ``retry_on_deadlock`` right below.
    def _retry_on_deadlock(max_retries=3):
        """Decorator factory: run ``func`` in a transaction, retry deadlocks.

        Args:
            max_retries: Maximum number of attempts.

        Returns:
            A decorator. The wrapped function runs inside
            ``transaction.atomic()``; a ``DatabaseError`` whose message
            contains "deadlock" is retried with exponential backoff, any
            other error (or the last failed attempt) is re-raised.
        """
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_retries):
                    try:
                        with transaction.atomic():
                            return func(*args, **kwargs)
                    except DatabaseError as e:
                        if 'deadlock' in str(e).lower() and attempt < max_retries - 1:
                            logger.warning(
                                "Deadlock detected, retrying (%d/%d)",
                                attempt + 1, max_retries,
                            )
                            time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                            continue
                        raise
                # Only reached when max_retries allows no attempt at all
                raise Exception(f"Operation failed after {max_retries} attempts")
            return wrapper
        return decorator

    retry_on_deadlock = staticmethod(_retry_on_deadlock)

    @staticmethod
    @_retry_on_deadlock(max_retries=3)
    def concurrent_inventory_update(product_id, quantity_change):
        """Update inventory with deadlock retry.

        Args:
            product_id: Primary key of the product.
            quantity_change: Signed change applied to ``stock_quantity``.

        Returns:
            The updated ``Product``.

        Raises:
            ValueError: If the change would make the stock negative.
        """
        product = Product.objects.select_for_update().get(id=product_id)

        new_quantity = product.stock_quantity + quantity_change
        if new_quantity < 0:
            raise ValueError("Insufficient inventory")

        product.stock_quantity = new_quantity
        product.save(update_fields=['stock_quantity'])

        return product

    @staticmethod
    def graceful_constraint_handling():
        """Create a user and profile; fall back to the existing user.

        Returns:
            The new ``User``, or the existing one when creation failed on a
            unique constraint.

        Raises:
            IntegrityError: For any other integrity violation.
        """
        try:
            with transaction.atomic():
                user = User.objects.create(
                    username='newuser',
                    email='user@example.com',
                )
                UserProfile.objects.create(
                    user=user,
                    phone='123-456-7890',
                )
                return user
        except IntegrityError as e:
            # NOTE(review): matching on the message text depends on how the
            # database backend words the error - confirm for the backend
            # in use.
            if 'unique constraint' in str(e).lower():
                logger.info("User already exists, returning existing user")
                return User.objects.get(email='user@example.com')
            # Re-raise other integrity errors
            raise

    @staticmethod
    def partial_failure_recovery(operations_data):
        """Run each operation in its own transaction and report the outcome.

        Args:
            operations_data: Sized collection of inputs for
                ``perform_operation``.

        Returns:
            Dict with the ``'successful'`` and ``'failed'`` entries and the
            ``'success_rate'`` (0.0 when there were no operations).
        """
        successful_operations = []
        failed_operations = []

        for operation_data in operations_data:
            try:
                with transaction.atomic():
                    result = perform_operation(operation_data)
                    successful_operations.append({
                        'operation': operation_data,
                        'result': result,
                    })
            except Exception as e:
                failed_operations.append({
                    'operation': operation_data,
                    'error': str(e),
                })
                # Log error but continue with other operations
                logger.error(f"Operation failed: {e}", extra={
                    'operation_data': operation_data
                })

        total = len(operations_data)
        return {
            'successful': successful_operations,
            'failed': failed_operations,
            # Guard the division: an empty batch has no meaningful rate
            'success_rate': len(successful_operations) / total if total else 0.0,
        }

    # The undecorated helper is only needed while the class body executes
    del _retry_on_deadlock
# Custom transaction context manager
class CustomTransactionManager:
    """Context manager around ``transaction.atomic`` with timing and a log.

    Entering the manager opens an ``atomic`` block and returns the manager
    itself, so ``with CustomTransactionManager() as tx:`` gives access to
    ``tx.log_operation``. Leaving it logs the duration and delegates commit
    or rollback to the ``atomic`` block.
    """

    def __init__(self, using=None, savepoint=True, rollback_on_error=True):
        """Store the options; the transaction starts on ``__enter__``.

        Args:
            using: Database alias passed to ``transaction.atomic``.
            savepoint: Passed to ``transaction.atomic``.
            rollback_on_error: When true, the transaction is explicitly
                marked for rollback if the block raises.
        """
        self.using = using
        self.savepoint = savepoint
        self.rollback_on_error = rollback_on_error
        self.start_time = None
        self.operation_log = []

    def __enter__(self):
        self.start_time = time.time()
        self.atomic = transaction.atomic(using=self.using, savepoint=self.savepoint)
        self.atomic.__enter__()
        # Return the manager itself: Atomic.__enter__() returns None, which
        # would leave the ``as`` target without log_operation().
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        duration = time.time() - self.start_time

        if exc_type is not None:
            logger.error(f"Transaction failed after {duration:.2f}s: {exc_value}")
            if self.rollback_on_error:
                # Force rollback
                transaction.set_rollback(True, using=self.using)
        else:
            logger.info(f"Transaction completed successfully in {duration:.2f}s")

        # Let the atomic block commit or roll back and decide propagation
        return self.atomic.__exit__(exc_type, exc_value, traceback)

    def log_operation(self, operation_name, **kwargs):
        """Append an entry to ``operation_log``.

        Args:
            operation_name: Label stored under ``'operation'``.
            **kwargs: Stored unchanged under ``'details'``.

        The ``'timestamp'`` is the number of seconds since the manager was
        entered.
        """
        self.operation_log.append({
            'operation': operation_name,
            'timestamp': time.time() - self.start_time,
            'details': kwargs
        })
# Usage
def complex_operation_with_logging():
    """Example using custom transaction manager.

    Loads user 123, updates ``last_login`` and writes an audit record inside
    one ``CustomTransactionManager`` block, logging each step on the
    manager.
    """
    # Bind the manager before entering it so log_operation() is reachable
    # whatever value the manager's __enter__ hands to an ``as`` target.
    tx_manager = CustomTransactionManager()
    with tx_manager:
        tx_manager.log_operation('start', user_id=123)

        # Perform operations
        user = User.objects.get(id=123)
        tx_manager.log_operation('user_loaded', username=user.username)

        # Update user
        user.last_login = timezone.now()
        user.save()
        tx_manager.log_operation('user_updated')

        # Create audit record
        AuditLog.objects.create(
            user=user,
            action='login',
            timestamp=timezone.now(),
        )
        tx_manager.log_operation('audit_created')
Understanding Django's transaction system enables you to build robust applications that maintain data consistency even under concurrent access and error conditions. Proper transaction management is crucial for financial applications, inventory systems, and any scenario where data integrity is paramount.
Raw SQL Queries
While Django's ORM handles most database operations elegantly, there are times when you need the power and flexibility of raw SQL. Understanding how to safely execute raw SQL queries enables you to optimize performance, use database-specific features, and handle complex operations that are difficult to express with the ORM.
Multiple Databases
Django supports using multiple databases in a single application, enabling you to separate different types of data, implement read/write splitting, or integrate with legacy systems. Understanding how to configure and use multiple databases effectively is crucial for scalable applications.