Caching

Low Level Cache API

Django's low-level cache API provides fine-grained control over caching operations, enabling sophisticated caching strategies for specific data, computed results, and complex objects. This approach offers maximum flexibility for optimizing application performance through targeted caching of expensive operations, database queries, and external API calls.

Low Level Cache API

Unlike per-site or per-view caching, the low-level API lets you store arbitrary Python objects under keys you choose, with explicit control over timeouts and invalidation. The sections below walk through the basic operations, common caching patterns (memoization, cache-aside, write-through), and tools for monitoring cache effectiveness.

Basic Cache Operations

Core Cache Methods

# Basic cache operations
from django.core.cache import cache
from django.core.cache import caches
import json
import pickle

# Basic get/set operations
def basic_cache_operations():
    """Walk through the fundamental cache operations: set, get,
    membership, delete, and their bulk (``_many``) variants."""

    # Store a single value for five minutes.
    cache.set('my_key', 'my_value', timeout=300)

    # Read it back.
    print(f"Cached value: {cache.get('my_key')}")

    # A fallback is returned when the key is absent.
    print(f"Value with default: {cache.get('nonexistent_key', 'default_value')}")

    # Membership test via the `in` operator.
    if 'my_key' in cache:
        print("Key exists in cache")

    # Remove a single entry.
    cache.delete('my_key')

    # Bulk store: one round trip for several entries.
    pairs = {f'key{i}': f'value{i}' for i in range(1, 4)}
    cache.set_many(pairs, timeout=600)

    # Bulk read returns a dict containing only the keys that were found.
    print(f"Multiple values: {cache.get_many(['key1', 'key2', 'key3'])}")

    # Bulk delete.
    cache.delete_many(['key1', 'key2', 'key3'])

    # cache.clear() would wipe the entire backend (use with caution!)

# Working with different cache backends
def multi_cache_operations():
    """Show how to address the default cache alongside named backends
    configured under CACHES."""

    # The module-level `cache` object is the 'default' alias; named
    # aliases are looked up through the `caches` registry.
    backends = {
        'default': cache,
        'redis': caches['redis'],
        'memcached': caches['memcached'],
    }

    # Each backend keeps its own independent keyspace.
    backends['default'].set('user_data', {'id': 1, 'name': 'John'})
    backends['redis'].set('session_data', {'session_id': 'abc123'})
    backends['memcached'].set('temp_data', 'temporary_value')

Advanced Cache Operations

# Advanced cache operations
from django.core.cache import cache
import time
import threading

class AdvancedCacheOperations:
    """Advanced caching patterns: atomic counters, stampede-protected
    computation, and tag/dependency-based group invalidation.

    NOTE: the tag and dependency bookkeeping below uses read-modify-write
    on plain cache keys, which is not atomic across processes; under
    heavy write concurrency a backend-native structure (e.g. Redis sets)
    is the safer choice.
    """

    def atomic_increment(self, key, delta=1, default=0):
        """Atomically increment a counter, initialising it on first use."""
        try:
            return cache.incr(key, delta)
        except ValueError:
            # Key missing: create it atomically. If another process beat
            # us to the add() (the original used a racy set() here, which
            # could lose a concurrent increment), fall back to incr().
            if cache.add(key, default + delta, timeout=None):
                return default + delta
            return cache.incr(key, delta)

    def atomic_decrement(self, key, delta=1, default=0):
        """Atomically decrement a counter, clamping the initial value at 0."""
        try:
            return cache.decr(key, delta)
        except ValueError:
            initial_value = max(0, default - delta)
            if cache.add(key, initial_value, timeout=None):
                return initial_value
            return cache.decr(key, delta)

    def get_or_set_with_lock(self, key, callable_func, timeout=300, lock_timeout=10):
        """Get a cached value, computing it under a lock to avoid a
        cache stampede: only one process runs ``callable_func`` while the
        others poll until the value appears (or the lock expires).

        NOTE: a computed value of None is indistinguishable from a cache
        miss here, so None results are recomputed on every call.
        """
        while True:
            value = cache.get(key)
            if value is not None:
                return value

            lock_key = f"{key}:lock"

            # cache.add() is atomic: exactly one caller wins the lock.
            if cache.add(lock_key, "locked", timeout=lock_timeout):
                try:
                    # Double-check: another worker may have filled the
                    # cache between our get() and winning the lock.
                    value = cache.get(key)
                    if value is not None:
                        return value

                    value = callable_func()
                    cache.set(key, value, timeout)
                    return value
                finally:
                    # Always release the lock, even if callable_func raised.
                    cache.delete(lock_key)

            # Lock held elsewhere: back off briefly, then retry.
            # (Iterative retry -- the original recursed here, which could
            # exhaust the stack under sustained contention.)
            time.sleep(0.1)

    def cache_with_tags(self, key, value, tags, timeout=300):
        """Cache *value* under *key*, registering the key with each tag
        so the whole group can be invalidated at once."""
        cache.set(key, value, timeout)

        for tag in tags:
            tag_key = f"tag:{tag}"
            tagged_keys = cache.get(tag_key, set())
            tagged_keys.add(key)
            cache.set(tag_key, tagged_keys, timeout)

    def invalidate_by_tag(self, tag):
        """Delete every cached value registered under *tag*, then the
        tag's own bookkeeping entry."""
        tag_key = f"tag:{tag}"
        tagged_keys = cache.get(tag_key, set())

        if tagged_keys:
            cache.delete_many(list(tagged_keys))
            cache.delete(tag_key)

    def cache_with_dependency(self, key, value, dependencies, timeout=300):
        """Cache *value* and record its dependencies so it can be
        invalidated when any of them changes."""
        cache.set(key, value, timeout)

        # Forward mapping: key -> its dependencies.
        cache.set(f"{key}:deps", dependencies, timeout)

        # Reverse mapping: dependency -> keys that depend on it.
        for dep in dependencies:
            dep_list_key = f"dep:{dep}"
            dependent_keys = cache.get(dep_list_key, set())
            dependent_keys.add(key)
            cache.set(dep_list_key, dependent_keys, timeout)

    def invalidate_dependents(self, dependency):
        """Delete every cached value that registered *dependency*, along
        with each value's dependency record."""
        dep_list_key = f"dep:{dependency}"
        dependent_keys = cache.get(dep_list_key, set())

        if dependent_keys:
            keys_to_delete = list(dependent_keys)
            keys_to_delete.extend(f"{key}:deps" for key in dependent_keys)
            cache.delete_many(keys_to_delete)

            cache.delete(dep_list_key)

# Usage examples
# Shared helper instance used by the demos below.
advanced_cache = AdvancedCacheOperations()

# Atomic counter
# Uses cache.incr under the hood, so concurrent requests don't lose counts.
page_views = advanced_cache.atomic_increment('page_views')
print(f"Page views: {page_views}")

# Cache with lock (prevents cache stampede)
def expensive_computation():
    """Stand-in for slow work (heavy DB aggregation, remote call, ...)."""
    time.sleep(2)  # Simulate expensive operation
    return "computed_result"

# Only one process runs expensive_computation(); others wait for the result.
result = advanced_cache.get_or_set_with_lock(
    'expensive_data',
    expensive_computation,
    timeout=600
)

# Cache with tags
# Tagging the entry lets every profile-related key be dropped in one call.
advanced_cache.cache_with_tags(
    'user_profile_123',
    {'name': 'John', 'email': 'john@example.com'},
    tags=['user_123', 'profiles'],
    timeout=1800
)

# Invalidate by tag
# Removes 'user_profile_123' (and anything else tagged 'user_123').
advanced_cache.invalidate_by_tag('user_123')

Caching Patterns

Memoization Pattern

# Memoization with caching
from functools import wraps
from django.core.cache import cache
import hashlib
import pickle

def memoize(timeout=300, cache_key_func=None):
    """Decorator that caches a function's return value in the Django cache.

    Args:
        timeout: cache lifetime in seconds.
        cache_key_func: optional ``callable(*args, **kwargs) -> str`` used
            to build the cache key; by default the key is an md5 of the
            pickled call signature, so all arguments must be picklable.
    """
    # Marker stored in place of None so that functions returning None are
    # still cached (the original treated a cached None as a miss and
    # recomputed on every call).
    none_marker = '__memoize_none__'

    def decorator(func):
        def make_key(args, kwargs):
            """Build the cache key for one concrete call."""
            if cache_key_func:
                return cache_key_func(*args, **kwargs)
            key_data = pickle.dumps((func.__name__, args, sorted(kwargs.items())))
            # md5 is fine here: the key needs uniqueness, not security.
            return f"memoize:{hashlib.md5(key_data).hexdigest()}"

        @wraps(func)
        def wrapper(*args, **kwargs):
            cache_key = make_key(args, kwargs)

            # Try cache first.
            result = cache.get(cache_key)
            if result is not None:
                return None if result == none_marker else result

            # Compute and cache the result (None via the sentinel).
            result = func(*args, **kwargs)
            cache.set(cache_key, none_marker if result is None else result, timeout)

            return result

        # NOTE: keys are hashed, so per-function invalidation is not
        # possible; clearing flushes the entire cache backend.
        wrapper.cache_clear = lambda: cache.clear()
        # Fixed: the original lambda referenced `cache_key`, a local of
        # wrapper(), and raised NameError when called. Expose the key
        # builder instead so callers can compute the key for given args.
        wrapper.cache_info = lambda *a, **kw: {"cache_key": make_key(a, kw)}

        return wrapper
    return decorator

# Usage examples
@memoize(timeout=600)
def fibonacci(n):
    """Return the n-th Fibonacci number; each n is cached for 10 minutes,
    so the naive recursion only computes every value once."""
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)

@memoize(timeout=1800, cache_key_func=lambda user_id: f"user_stats_{user_id}")
def get_user_statistics(user_id):
    """Per-user posting statistics, cached for 30 minutes under the key
    ``user_stats_<id>``."""
    from django.db.models import Count, Avg
    from .models import Post, Comment

    authored = Post.objects.filter(author_id=user_id)

    # Three separate queries: two counts plus one aggregate.
    stats = {}
    stats['post_count'] = authored.count()
    stats['comment_count'] = Comment.objects.filter(author_id=user_id).count()
    stats['avg_post_length'] = authored.aggregate(
        avg_length=Avg('content_length')
    )['avg_length'] or 0

    return stats

# Usage
# The second call skips the three DB queries entirely.
user_stats = get_user_statistics(123)  # First call: computed and cached
user_stats = get_user_statistics(123)  # Second call: returned from cache

Cache-Aside Pattern

# Cache-aside pattern implementation
from django.core.cache import cache
from django.db import models
from .models import Post, User

class CacheAsideManager:
    """Cache-aside (lazy-loading) access for one model class.

    Reads try the cache first and fall back to the database; writes go to
    the database and invalidate (then optionally refresh) the cached copy.
    """

    # Sentinel stored for known-missing rows. The original cached a raw
    # None, but cache.get() returns None for both "missing key" and
    # "cached None", so negative caching silently never took effect and
    # the database was re-queried on every lookup of a missing pk.
    _MISSING = '__cache_aside_missing__'

    def __init__(self, model_class, cache_timeout=3600):
        self.model_class = model_class
        self.cache_timeout = cache_timeout

    def get_cache_key(self, pk):
        """Build the cache key for one instance, e.g. ``blog.post:42``."""
        return f"{self.model_class._meta.label_lower}:{pk}"

    def get_by_pk(self, pk):
        """Return the instance for *pk*, loading and caching it on a miss.

        Raises model_class.DoesNotExist for unknown pks; the negative
        result is cached for 5 minutes to absorb repeated lookups.
        """
        cache_key = self.get_cache_key(pk)

        cached = cache.get(cache_key)
        if cached == self._MISSING:
            # Cached negative result: skip the database entirely.
            raise self.model_class.DoesNotExist(
                f"{self.model_class._meta.label_lower} pk={pk} not found (cached)"
            )
        if cached is not None:
            return cached

        # Cache miss -- load from the database.
        try:
            instance = self.model_class.objects.get(pk=pk)
        except self.model_class.DoesNotExist:
            # Remember the miss briefly to prevent repeated DB queries.
            cache.set(cache_key, self._MISSING, 300)
            raise

        cache.set(cache_key, instance, self.cache_timeout)
        return instance

    def update_instance(self, pk, **update_fields):
        """Update the row, drop the stale cache entry, and return the
        refreshed (re-cached) instance; None when no row matched."""
        updated_count = self.model_class.objects.filter(pk=pk).update(**update_fields)

        if updated_count > 0:
            cache.delete(self.get_cache_key(pk))
            # Re-read immediately so the cache is warm again.
            return self.get_by_pk(pk)

        return None

    def delete_instance(self, pk):
        """Delete the row and its cache entry; True when a row was deleted."""
        deleted_count = self.model_class.objects.filter(pk=pk).delete()[0]

        if deleted_count > 0:
            cache.delete(self.get_cache_key(pk))

        return deleted_count > 0

# Usage
# One manager per model; timeouts tuned to how often each model changes.
post_cache = CacheAsideManager(Post, cache_timeout=1800)
user_cache = CacheAsideManager(User, cache_timeout=3600)

# Get cached post
# Loads from the DB on the first call, from the cache afterwards.
post = post_cache.get_by_pk(123)

# Update post (invalidates cache)
updated_post = post_cache.update_instance(123, title="New Title")

# Delete post (invalidates cache)
post_cache.delete_instance(123)

Write-Through Pattern

# Write-through caching pattern
class WriteThroughCache:
    """Write-through cache wrapper for one model class: every create and
    update writes the fresh instance to the cache at the same time as the
    database, so subsequent reads are almost always warm."""

    def __init__(self, model_class, cache_timeout=3600):
        self.model_class = model_class
        self.cache_timeout = cache_timeout

    def get_cache_key(self, pk):
        """Key namespaced with ':wt:' to stay distinct from other caches."""
        return f"{self.model_class._meta.label_lower}:wt:{pk}"

    def get(self, pk):
        """Return the cached instance, loading it from the database on a
        miss; None when no such row exists."""
        key = self.get_cache_key(pk)

        hit = cache.get(key)
        if hit is not None:
            return hit

        try:
            loaded = self.model_class.objects.get(pk=pk)
        except self.model_class.DoesNotExist:
            return None
        cache.set(key, loaded, self.cache_timeout)
        return loaded

    def create(self, **fields):
        """Insert a new row and cache it immediately."""
        created = self.model_class.objects.create(**fields)
        cache.set(self.get_cache_key(created.pk), created, self.cache_timeout)
        return created

    def update(self, pk, **fields):
        """Apply *fields* to the row; on success re-read the instance,
        refresh the cache, and return it. None when no row matched."""
        matched = self.model_class.objects.filter(pk=pk).update(**fields)
        if not matched:
            return None

        fresh = self.model_class.objects.get(pk=pk)
        cache.set(self.get_cache_key(pk), fresh, self.cache_timeout)
        return fresh

    def delete(self, pk):
        """Remove the row and its cached copy; True when a row was deleted."""
        removed = self.model_class.objects.filter(pk=pk).delete()[0]

        if removed > 0:
            cache.delete(self.get_cache_key(pk))

        return removed > 0

# Usage
# Write-through manager: every write below also refreshes the cache.
wt_cache = WriteThroughCache(Post)

# Create (writes to both DB and cache)
new_post = wt_cache.create(title="New Post", content="Content")

# Update (writes to both DB and cache)
updated_post = wt_cache.update(new_post.pk, title="Updated Title")

# Get (cache-first)
post = wt_cache.get(new_post.pk)

Complex Data Caching

Caching Query Results

# Advanced query result caching
from django.core.cache import cache
from django.db import models
from django.db.models import Q, Count, Avg
import hashlib
import json

class QueryCache:
    """Cache complex database query results (lists, aggregates, counts).

    Default cache keys are derived from md5 digests of the generated SQL.
    The original used the builtin ``hash()``, which is salted per process
    (PYTHONHASHSEED) for strings, so every worker generated different
    keys and a shared cache backend could never produce a hit.
    """

    def __init__(self, cache_timeout=600):
        self.cache_timeout = cache_timeout

    @staticmethod
    def _sql_digest(queryset):
        """Stable digest of the queryset's SQL, identical across processes."""
        return hashlib.md5(str(queryset.query).encode()).hexdigest()

    def generate_query_key(self, model_class, filters=None, ordering=None, annotations=None):
        """Build a deterministic cache key from query parameters."""
        key_parts = [
            model_class._meta.label_lower,
            'query'
        ]

        if filters:
            filter_str = json.dumps(filters, sort_keys=True)
            key_parts.append(hashlib.md5(filter_str.encode()).hexdigest())

        if ordering:
            order_str = ':'.join(ordering) if isinstance(ordering, (list, tuple)) else ordering
            key_parts.append(f"order_{hashlib.md5(order_str.encode()).hexdigest()}")

        if annotations:
            ann_str = json.dumps(annotations, sort_keys=True)
            key_parts.append(f"ann_{hashlib.md5(ann_str.encode()).hexdigest()}")

        return ':'.join(key_parts)

    def cached_query(self, queryset, cache_key=None, timeout=None):
        """Return ``list(queryset)``, served from cache when possible.

        Results here are always lists (an empty list is cached and served
        correctly because the miss check compares against None).
        """
        if cache_key is None:
            cache_key = f"queryset:{self._sql_digest(queryset)}"

        if timeout is None:
            timeout = self.cache_timeout

        cached_result = cache.get(cache_key)
        if cached_result is not None:
            return cached_result

        # Materialize the queryset and cache the concrete list.
        result = list(queryset)
        cache.set(cache_key, result, timeout)

        return result

    def cached_aggregate(self, queryset, aggregations, cache_key=None, timeout=None):
        """Return ``queryset.aggregate(**aggregations)``, cached.

        The default key serialises aggregate expressions via ``str()`` --
        plain json.dumps raises TypeError on Count/Avg/Sum objects.
        """
        if cache_key is None:
            agg_str = json.dumps(aggregations, sort_keys=True, default=str)
            cache_key = (
                f"aggregate:{self._sql_digest(queryset)}:"
                f"{hashlib.md5(agg_str.encode()).hexdigest()}"
            )

        if timeout is None:
            timeout = self.cache_timeout

        cached_result = cache.get(cache_key)
        if cached_result is not None:
            return cached_result

        result = queryset.aggregate(**aggregations)
        cache.set(cache_key, result, timeout)

        return result

    def cached_count(self, queryset, cache_key=None, timeout=None):
        """Return ``queryset.count()``, cached.

        A count of 0 is falsy but not None, so the ``is not None`` miss
        check caches zero counts correctly.
        """
        if cache_key is None:
            cache_key = f"count:{self._sql_digest(queryset)}"

        if timeout is None:
            timeout = self.cache_timeout

        cached_count = cache.get(cache_key)
        if cached_count is not None:
            return cached_count

        count = queryset.count()
        cache.set(cache_key, count, timeout)

        return count

# Usage examples
# Shared query-cache helper for the read-mostly queries below.
query_cache = QueryCache(cache_timeout=900)  # 15 minutes

# Cache complex query
def get_popular_posts():
    """Top-10 published posts with at least five comments, ordered by
    views then recency; results cached for 30 minutes."""
    popular = (
        Post.objects.filter(published=True)
        .annotate(comment_count=Count('comments'))
        .filter(comment_count__gte=5)
        .order_by('-views', '-created_at')[:10]
    )
    return query_cache.cached_query(popular, 'popular_posts_with_comments', timeout=1800)

# Cache aggregate results
def get_blog_statistics():
    """Site-wide aggregates over published posts, cached under a fixed key."""
    published = Post.objects.filter(published=True)

    return query_cache.cached_aggregate(
        published,
        {
            'total_posts': Count('id'),
            'avg_views': Avg('views'),
            'total_views': models.Sum('views'),
        },
        'blog_statistics',
    )

# Cache count queries
def get_published_post_count():
    """Number of published posts, cached for one hour."""
    return query_cache.cached_count(
        Post.objects.filter(published=True),
        'published_post_count',
        timeout=3600,
    )

Caching External API Calls

# Cache external API responses
import requests
from django.core.cache import cache
import json
import hashlib
from datetime import datetime, timedelta

class APICache:
    """Cache external HTTP API responses in the Django cache.

    Successful responses are stored as plain picklable dicts (status,
    headers, body, parsed JSON, timestamp) so they can be served without
    contacting the remote service again. Failed requests are never cached.
    """

    def __init__(self, default_timeout=1800):
        # Fallback TTL (seconds) when a call does not pass its own timeout.
        self.default_timeout = default_timeout

    def generate_api_key(self, url, params=None, headers=None):
        """Build a deterministic cache key from the URL, query params, and
        the subset of headers that can change the response."""
        key_parts = [url]

        if params:
            param_str = json.dumps(params, sort_keys=True)
            key_parts.append(hashlib.md5(param_str.encode()).hexdigest())

        if headers:
            # Only headers affecting auth / content negotiation matter.
            cache_headers = {k: v for k, v in headers.items()
                           if k.lower() in ['authorization', 'accept', 'content-type']}
            if cache_headers:
                header_str = json.dumps(cache_headers, sort_keys=True)
                key_parts.append(hashlib.md5(header_str.encode()).hexdigest())

        return f"api:{'_'.join(key_parts)}"

    @staticmethod
    def _build_cache_entry(response):
        """Package a requests.Response into a picklable dict.

        (Shared by cached_get/cached_post -- the original duplicated this
        block verbatim in both methods.)
        """
        is_json = response.headers.get('content-type', '').startswith('application/json')
        try:
            parsed = response.json() if is_json else None
        except ValueError:
            # Content-type claimed JSON but the body isn't parseable;
            # don't let a bad payload abort caching of the raw text.
            parsed = None
        return {
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'content': response.text,
            'json': parsed,
            'cached_at': datetime.now().isoformat(),
        }

    def cached_get(self, url, params=None, headers=None, timeout=None, **kwargs):
        """GET *url*, serving a cached copy when available.

        Returns the cache-entry dict (see ``_build_cache_entry``); raises
        requests.RequestException on network/HTTP errors (not cached).
        """
        cache_key = self.generate_api_key(url, params, headers)

        cached_response = cache.get(cache_key)
        if cached_response is not None:
            return cached_response

        try:
            response = requests.get(url, params=params, headers=headers, **kwargs)
            response.raise_for_status()
        except requests.RequestException as e:
            # Don't cache errors, but log them.
            import logging
            logging.getLogger(__name__).error(f"API request failed: {url} - {e}")
            raise

        cache_data = self._build_cache_entry(response)
        cache.set(cache_key, cache_data, timeout or self.default_timeout)
        return cache_data

    def cached_post(self, url, data=None, json_data=None, headers=None, timeout=None, **kwargs):
        """POST with caching -- use carefully, only for idempotent
        operations. The key hashes the full request body, so identical
        payloads share one cached response."""
        key_str = json.dumps(
            {'url': url, 'data': data, 'json': json_data, 'headers': headers},
            sort_keys=True,
        )
        cache_key = f"api_post:{hashlib.md5(key_str.encode()).hexdigest()}"

        cached_response = cache.get(cache_key)
        if cached_response is not None:
            return cached_response

        try:
            response = requests.post(
                url,
                data=data,
                json=json_data,
                headers=headers,
                **kwargs
            )
            response.raise_for_status()
        except requests.RequestException as e:
            import logging
            logging.getLogger(__name__).error(f"API POST request failed: {url} - {e}")
            raise

        cache_data = self._build_cache_entry(response)
        cache.set(cache_key, cache_data, timeout or self.default_timeout)
        return cache_data

    def invalidate_url_pattern(self, url_pattern):
        """Invalidate cached responses whose key matches *url_pattern*.

        Not implemented: generic Django cache backends cannot enumerate
        keys. With django-redis, iterate matching keys via SCAN
        (e.g. ``redis_conn.scan_iter(f"api:{url_pattern}*")``) and delete.
        """
        pass

# Usage examples
# Shared client; individual calls may override the 1-hour default TTL.
api_cache = APICache(default_timeout=3600)

def get_weather_data(city):
    """Current weather for *city* via the cached API client.

    On request failure, falls back to a previously stashed stale copy if
    one exists; returns None when nothing is available at all.
    """
    url = "https://api.weather.com/v1/current"
    params = {
        'city': city,
        'units': 'metric'
    }
    headers = {
        'Authorization': 'Bearer YOUR_API_KEY'
    }

    try:
        response = api_cache.cached_get(
            url,
            params=params,
            headers=headers,
            timeout=1800  # Cache for 30 minutes
        )
    except requests.RequestException:
        # Live call failed: serve a stale copy if one was stashed.
        stale_key = f"{api_cache.generate_api_key(url, params, headers)}:stale"
        return cache.get(stale_key) or None

    return response['json'] if response['json'] else json.loads(response['content'])

def get_exchange_rates():
    """USD exchange rates, cached for an hour, with hard-coded fallback
    values when the remote API is unavailable."""
    url = "https://api.exchangerate.com/v4/latest/USD"

    try:
        return api_cache.cached_get(url, timeout=3600)['json']
    except requests.RequestException:
        # API unreachable: fall back to static approximate rates.
        return {
            'base': 'USD',
            'rates': {
                'EUR': 0.85,
                'GBP': 0.73,
                'JPY': 110.0
            }
        }

Cache Monitoring and Debugging

Cache Performance Monitoring

# Cache performance monitoring
from django.core.cache import cache
import time
import logging
from functools import wraps

logger = logging.getLogger('cache_performance')

class CacheMonitor:
    """Accumulate hit/miss/latency statistics for cache operations."""

    # Template for a zeroed stats dict; copied per instance and on reset.
    _EMPTY = {
        'hits': 0,
        'misses': 0,
        'sets': 0,
        'deletes': 0,
        'total_time': 0,
        'operations': 0,
    }

    def __init__(self):
        self.stats = dict(self._EMPTY)

    def record_operation(self, operation, duration, hit=None):
        """Fold one operation into the running totals.

        For 'get' operations *hit* marks a cache hit; truthiness decides,
        so hit=None is tallied as a miss. 'set' and 'delete' bump their
        own counters; other operation names only affect the totals.
        """
        self.stats['operations'] += 1
        self.stats['total_time'] += duration

        if operation == 'get':
            bucket = 'hits' if hit else 'misses'
            self.stats[bucket] += 1
        elif operation in ('set', 'delete'):
            self.stats[operation + 's'] += 1

    @property
    def hit_rate(self):
        """Percentage of get operations that hit the cache (0 when none)."""
        lookups = self.stats['hits'] + self.stats['misses']
        return (self.stats['hits'] / lookups) * 100 if lookups else 0

    @property
    def average_time(self):
        """Mean seconds per recorded operation (0 when none recorded)."""
        ops = self.stats['operations']
        return self.stats['total_time'] / ops if ops else 0

    def get_stats(self):
        """Snapshot of raw counters plus derived hit_rate / average_time."""
        snapshot = dict(self.stats)
        snapshot['hit_rate'] = self.hit_rate
        snapshot['average_time'] = self.average_time
        return snapshot

    def reset_stats(self):
        """Zero all counters."""
        self.stats = dict(self._EMPTY)

# Global monitor instance
# Module-level singleton shared by the monitored_* wrappers below.
cache_monitor = CacheMonitor()

def monitored_cache_operation(operation):
    """Decorator factory: time a cache call and feed the global monitor.

    *operation* names the cache verb ('get', 'set', 'delete') so the
    monitor attributes the sample to the right counter.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            started = time.time()

            try:
                result = func(*args, **kwargs)
                elapsed = time.time() - started

                # Only 'get' calls have a hit/miss notion; a non-None
                # result counts as a hit.
                hit = None
                if operation == 'get' and args:
                    hit = result is not None

                cache_monitor.record_operation(operation, elapsed, hit)

                logger.debug(
                    f"Cache {operation}: {args[0] if args else 'N/A'} "
                    f"({elapsed:.4f}s) {'HIT' if hit else 'MISS' if hit is False else ''}"
                )

                return result

            except Exception as e:
                # Failed operations are recorded without a hit flag.
                cache_monitor.record_operation(operation, time.time() - started)
                logger.error(f"Cache {operation} error: {e}")
                raise

        return wrapper
    return decorator

# Monitored cache operations
# Thin wrappers that route every call through monitored_cache_operation,
# so hit rate and latency are recorded by the global cache_monitor.
@monitored_cache_operation('get')
def monitored_get(key, default=None):
    """cache.get with monitoring; returns *default* on a miss."""
    return cache.get(key, default)

@monitored_cache_operation('set')
def monitored_set(key, value, timeout=None):
    """cache.set with monitoring."""
    return cache.set(key, value, timeout)

@monitored_cache_operation('delete')
def monitored_delete(key):
    """cache.delete with monitoring."""
    return cache.delete(key)

# Cache debugging utilities
class CacheDebugger:
    """Read-only utilities for inspecting cache contents and memory.

    TTL, key listing, and memory stats require the django-redis backend;
    on any other backend those calls degrade gracefully.
    """

    @staticmethod
    def inspect_cache_key(key):
        """Describe *key*: existence, value type, rough size, and (with
        Redis) remaining TTL in seconds."""
        value = cache.get(key)

        info = {
            'key': key,
            'exists': value is not None,
            'value_type': type(value).__name__ if value is not None else None,
            # Rough size of the repr, not of the serialized payload.
            'value_size': len(str(value)) if value is not None else 0,
        }

        # TTL lookup is Redis-specific; other backends fall through.
        # (Narrowed from a bare `except:`, which would also have swallowed
        # KeyboardInterrupt and SystemExit.)
        try:
            from django_redis import get_redis_connection
            redis_conn = get_redis_connection("default")
            ttl = redis_conn.ttl(key)
            info['ttl'] = ttl if ttl > 0 else None
        except Exception:
            info['ttl'] = 'unknown'

        return info

    @staticmethod
    def list_cache_keys(pattern="*"):
        """List raw Redis keys matching *pattern*; [] on non-Redis backends.

        NOTE: KEYS blocks Redis while it scans -- prefer scan_iter() on
        large production datasets.
        """
        try:
            from django_redis import get_redis_connection
            redis_conn = get_redis_connection("default")
            keys = redis_conn.keys(pattern)
            return [key.decode() if isinstance(key, bytes) else key for key in keys]
        except Exception:
            return []

    @staticmethod
    def cache_memory_usage():
        """Redis memory statistics; {} when unavailable."""
        try:
            from django_redis import get_redis_connection
            redis_conn = get_redis_connection("default")
            info = redis_conn.info('memory')
            return {
                'used_memory': info.get('used_memory'),
                'used_memory_human': info.get('used_memory_human'),
                'used_memory_peak': info.get('used_memory_peak'),
                'used_memory_peak_human': info.get('used_memory_peak_human'),
            }
        except Exception:
            return {}

# Management command for cache monitoring
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    """Management command: report or reset cache-monitor statistics and
    inspect individual cache keys."""
    help = 'Monitor cache performance'

    def add_arguments(self, parser):
        """Register the --reset flag and --inspect <key> option."""
        parser.add_argument(
            '--reset',
            action='store_true',
            help='Reset cache statistics'
        )
        parser.add_argument(
            '--inspect',
            type=str,
            help='Inspect specific cache key'
        )

    def handle(self, *args, **options):
        """Dispatch: reset stats, inspect one key, or print the report."""
        if options['reset']:
            cache_monitor.reset_stats()
            self.stdout.write('Cache statistics reset')
            return

        if options['inspect']:
            self._inspect_key(options['inspect'])
            return

        self._print_report()

    def _inspect_key(self, key):
        """Print the CacheDebugger breakdown for a single cache key."""
        self.stdout.write(f"Cache key inspection: {key}")
        for k, v in CacheDebugger.inspect_cache_key(key).items():
            self.stdout.write(f"  {k}: {v}")

    def _print_report(self):
        """Print hit-rate / latency statistics, then Redis memory usage."""
        stats = cache_monitor.get_stats()
        self.stdout.write('Cache Performance Statistics:')
        self.stdout.write(f"  Hit rate: {stats['hit_rate']:.2f}%")
        self.stdout.write(f"  Total operations: {stats['operations']}")
        self.stdout.write(f"  Hits: {stats['hits']}")
        self.stdout.write(f"  Misses: {stats['misses']}")
        self.stdout.write(f"  Sets: {stats['sets']}")
        self.stdout.write(f"  Deletes: {stats['deletes']}")
        self.stdout.write(f"  Average time: {stats['average_time']:.4f}s")

        memory_info = CacheDebugger.cache_memory_usage()
        if memory_info:
            self.stdout.write('\nMemory Usage:')
            for k, v in memory_info.items():
                self.stdout.write(f"  {k}: {v}")

Django's low-level cache API provides powerful tools for implementing sophisticated caching strategies. The key is understanding when to use each pattern and implementing proper monitoring to ensure cache effectiveness. Start with simple get/set operations and gradually implement more advanced patterns like memoization, cache-aside, and write-through as your application's caching needs become more complex.