import gc
import heapq
import inspect
import logging
import os
import threading
import time
import uuid
from datetime import datetime, timedelta
from functools import wraps
from typing import Callable, Any, Dict, Optional, List, Tuple, Set

import pytz

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Asia/Ho_Chi_Minh timezone
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh')


def generate_uuid():
    """Return a random (version 4) UUID rendered as a string."""
    return str(uuid.uuid4())


def get_current_time():
    """Return the current time as an ISO 8601 string.

    The value comes from ``datetime.now()`` without a timezone argument, so
    the string carries no UTC offset.
    """
    return datetime.now().isoformat()


def get_local_time():
    """Return the current Asia/Ho_Chi_Minh time as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(asia_tz).strftime("%Y-%m-%d %H:%M:%S")


def get_local_datetime():
    """Return the current timezone-aware datetime in Asia/Ho_Chi_Minh."""
    return datetime.now(asia_tz)


# For backward compatibility
get_vietnam_time = get_local_time
get_vietnam_datetime = get_local_datetime


def timer_decorator(func: Callable) -> Callable:
    """Decorate ``func`` so that every call logs how long it took.

    Works for both coroutine functions and plain callables:

    * a coroutine function is wrapped in a coroutine function, so it is
      still awaited by the caller exactly as before;
    * a plain callable is wrapped in a plain function.  If that callable
      happens to return an awaitable, the awaitable is timed until it
      completes, so ``await decorated(...)`` keeps working.

    Success is logged at INFO level and failure at ERROR level; the original
    exception is always re-raised.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature as ``func``.
    """
    # Callables such as functools.partial objects have no __name__.
    name = getattr(func, "__name__", repr(func))

    def log_success(started: float) -> None:
        elapsed_time = time.perf_counter() - started
        logger.info(f"Function {name} executed in {elapsed_time:.4f} seconds")

    def log_failure(started: float, exc: Exception) -> None:
        elapsed_time = time.perf_counter() - started
        logger.error(f"Function {name} failed after {elapsed_time:.4f} seconds: {exc}")

    async def finish(awaitable, started: float):
        """Await ``awaitable`` and log the time elapsed since ``started``."""
        try:
            result = await awaitable
        except Exception as e:
            log_failure(started, e)
            raise
        log_success(started)
        return result

    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # perf_counter is monotonic, unlike the wall clock time.time().
            started = time.perf_counter()
            return await finish(func(*args, **kwargs), started)

        return wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        started = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            log_failure(started, e)
            raise
        if inspect.isawaitable(result):
            # Plain callable that produced an awaitable: time the awaited
            # work, not just the creation of the awaitable.
            return finish(result, started)
        log_success(started)
        return result

    return sync_wrapper


def sanitize_input(text):
    """Return ``text`` with surrounding whitespace removed.

    Falsy input (``None``, ``""``) yields an empty string.  Only stripping is
    performed; no characters inside the text are removed or escaped.
    """
    if not text:
        return ""
    return text.strip()


def truncate_text(text, max_length=100):
    """Cut ``text`` to ``max_length`` characters and append ``"..."``.

    Falsy input and text that already fits are returned unchanged.  The
    ellipsis is added on top of ``max_length``, so a truncated result is
    ``max_length + 3`` characters long.
    """
    if not text or len(text) <= max_length:
        return text
    return text[:max_length] + "..."
class CacheStrategy:
    """Names of the cache loading strategies.

    These are plain string constants.  ``EnhancedCache`` stores the chosen
    strategy but does not change its behavior based on it.
    """
    LAZY = "lazy"    # Only load items into cache when requested
    EAGER = "eager"  # Preload items into cache at initialization
    MIXED = "mixed"  # Preload high-priority items, lazy load others


class CacheItem:
    """A cached value together with the metadata used for expiry and eviction."""

    def __init__(self, key: str, value: Any, ttl: int = 300, priority: int = 1,
                 namespace: Optional[str] = None):
        """Create a cache entry.

        Args:
            key: Full cache key (already namespaced, if applicable).
            value: The cached object.
            ttl: Lifetime in seconds, counted from creation.
            priority: Eviction priority; higher numbers are evicted later.
            namespace: Namespace the entry was stored under, if any.  It is
                recorded so removal never has to guess it from the key.
        """
        created = datetime.now()
        self.key = key
        self.value = value
        self.expiry = created + timedelta(seconds=ttl)
        self.priority = priority  # Higher number = higher priority
        self.access_count = 0  # Track number of accesses
        self.last_accessed = created
        self.namespace = namespace
        # Size recorded once at insertion so that cache size accounting adds
        # and subtracts the same number even if the value mutates later.
        self.size_bytes = self.get_size()

    def is_expired(self) -> bool:
        """Return True once the current time is past the expiry time."""
        return datetime.now() > self.expiry

    def touch(self):
        """Record an access: refresh the access time and bump the counter."""
        self.last_accessed = datetime.now()
        self.access_count += 1

    def __lt__(self, other):
        """Order items so that the best eviction candidates sort first.

        Compared by priority, then access count, then last access time
        (lowest / least used / oldest first).
        """
        if self.priority != other.priority:
            return self.priority < other.priority
        if self.access_count != other.access_count:
            return self.access_count < other.access_count
        return self.last_accessed < other.last_accessed

    def get_size(self) -> int:
        """Return an approximate size of the item in bytes.

        Uses ``sys.getsizeof`` on the key and the value (a shallow
        measurement) plus 64 bytes of overhead.  Falls back to 1024 when the
        size cannot be determined.
        """
        import sys
        try:
            return sys.getsizeof(self.value) + sys.getsizeof(self.key) + 64  # Additional overhead
        except Exception:
            # Default estimate if we can't get the size
            return 1024


# Enhanced in-memory cache implementation
class EnhancedCache:
    """In-memory cache with TTL expiry, namespaces, priority-based eviction
    and optional statistics.  Mutating operations take an internal
    re-entrant lock."""

    def __init__(self, strategy: str = "lazy", max_items: int = 10000,
                 max_size_mb: int = 100, cleanup_interval: int = 60,
                 stats_enabled: bool = True):
        """
        Initialize enhanced cache with configurable strategy.

        Args:
            strategy: Cache loading strategy (lazy, eager, mixed)
            max_items: Maximum number of items to store in cache
            max_size_mb: Maximum size of cache in MB
            cleanup_interval: Interval in seconds to run cleanup; a value
                of 0 or less disables the background cleanup thread
            stats_enabled: Whether to collect cache statistics
        """
        self._cache: Dict[str, CacheItem] = {}
        self._namespace_cache: Dict[str, Set[str]] = {}  # Tracking keys by namespace
        self._strategy = strategy
        self._max_items = max_items
        self._max_size_bytes = max_size_mb * 1024 * 1024
        self._current_size_bytes = 0
        self._stats_enabled = stats_enabled

        # Statistics
        self._hits = 0
        self._misses = 0
        self._sets = 0
        self._evictions = 0
        self._total_get_time = 0
        self._total_set_time = 0

        # Setup cleanup thread
        self._last_cleanup = datetime.now()
        self._cleanup_interval = cleanup_interval
        self._lock = threading.RLock()

        if cleanup_interval > 0:
            self._start_cleanup_thread(cleanup_interval)

        logger.info(f"Enhanced cache initialized with strategy={strategy}, max_items={max_items}, max_size={max_size_mb}MB")

    def _start_cleanup_thread(self, interval: int):
        """Start a daemon thread that calls ``cleanup`` every ``interval`` seconds."""
        def cleanup_worker():
            while True:
                time.sleep(interval)
                try:
                    self.cleanup()
                except Exception as e:
                    # Keep the worker alive; a failed pass is only logged.
                    logger.error(f"Error in cache cleanup: {e}")

        thread = threading.Thread(target=cleanup_worker, daemon=True)
        thread.start()
        logger.info(f"Cache cleanup thread started with interval {interval}s")

    def get(self, key: str, namespace: Optional[str] = None) -> Optional[Any]:
        """Return the cached value, or None when missing or expired.

        An expired entry is removed as a side effect.  A hit updates the
        entry's access metadata.
        """
        started = time.perf_counter() if self._stats_enabled else None

        # Use namespaced key if namespace is provided
        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            cache_item = self._cache.get(cache_key)

            if cache_item is not None and cache_item.is_expired():
                # Clean up expired key and treat the lookup as a miss
                self._remove_item(cache_key, namespace)
                cache_item = None

            if cache_item is None:
                value = None
                if self._stats_enabled:
                    self._misses += 1
            else:
                cache_item.touch()
                value = cache_item.value
                if self._stats_enabled:
                    self._hits += 1

            if self._stats_enabled:
                self._total_get_time += time.perf_counter() - started

        return value

    def set(self, key: str, value: Any, ttl: int = 300, priority: int = 1,
            namespace: Optional[str] = None) -> None:
        """Store ``value`` under ``key`` with a TTL in seconds.

        An existing entry with the same key is replaced.  If the cache would
        exceed its item or size limit, lower-ranked entries are evicted first.
        """
        started = time.perf_counter() if self._stats_enabled else None

        # Use namespaced key if namespace is provided
        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            cache_item = CacheItem(cache_key, value, ttl, priority, namespace or None)
            item_size = cache_item.size_bytes

            # Drop the entry being replaced *before* the capacity check, so
            # overwriting a key in a full cache does not evict other items.
            if cache_key in self._cache:
                self._remove_item(cache_key, namespace)

            # Check if we need to make room
            if (len(self._cache) >= self._max_items or
                    self._current_size_bytes + item_size > self._max_size_bytes):
                self._evict_items(item_size)

            # Store the item and update size tracking
            self._cache[cache_key] = cache_item
            self._current_size_bytes += item_size

            # Update namespace tracking
            if namespace:
                self._namespace_cache.setdefault(namespace, set()).add(cache_key)

            if self._stats_enabled:
                self._sets += 1
                self._total_set_time += time.perf_counter() - started

    def delete(self, key: str, namespace: Optional[str] = None) -> None:
        """Remove ``key`` from the cache; a missing key is ignored."""
        # Use namespaced key if namespace is provided
        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            self._remove_item(cache_key, namespace)

    def _remove_item(self, key: str, namespace: Optional[str] = None):
        """Remove ``key`` and update size and namespace tracking.

        Callers are expected to hold ``self._lock``.  The namespace recorded
        on the stored item takes precedence over the ``namespace`` argument,
        which is only used when the item carries none or is already gone.
        """
        item = self._cache.pop(key, None)
        if item is not None:
            self._current_size_bytes -= item.size_bytes
            if item.namespace is not None:
                namespace = item.namespace

        if not namespace:
            return

        tracked_keys = self._namespace_cache.get(namespace)
        if tracked_keys is None:
            return

        tracked_keys.discard(key)
        # Cleanup empty sets
        if not tracked_keys:
            del self._namespace_cache[namespace]

    def _evict_items(self, needed_space: int = 0) -> None:
        """Evict entries until the cache is back under its low-water marks.

        Entries are visited in eviction order (lowest priority, least
        accessed, least recently accessed first).  Eviction stops once the
        item count is at or below 90% of ``max_items`` and either
        ``needed_space`` bytes were freed or the size is at or below 90% of
        the size limit.

        Args:
            needed_space: Number of bytes the caller wants to free.
        """
        with self._lock:
            if not self._cache:
                return

            # Sorted using CacheItem.__lt__
            candidates = sorted(self._cache.values())
            candidate_total = len(candidates)
            item_target = self._max_items * 0.9
            size_target = self._max_size_bytes * 0.9

            space_freed = 0
            evicted_count = 0

            for item in candidates:
                # _remove_item already shrinks the dict and the size counter,
                # so the live values are compared directly to the targets.
                if (len(self._cache) <= item_target and
                        (space_freed >= needed_space or
                         self._current_size_bytes <= size_target)):
                    break

                # Skip high priority items unless absolutely necessary
                if item.priority > 9 and evicted_count < candidate_total // 2:
                    continue

                self._remove_item(item.key)
                space_freed += item.size_bytes
                evicted_count += 1
                if self._stats_enabled:
                    self._evictions += 1

            logger.info(f"Cache eviction: removed {evicted_count} items, freed {space_freed / 1024:.2f}KB")

    def clear(self, namespace: Optional[str] = None) -> None:
        """
        Clear the cache or a specific namespace
        """
        with self._lock:
            if namespace:
                # Clear only keys in the specified namespace; iterate over a
                # copy because _remove_item mutates the tracked set.
                for key in list(self._namespace_cache.get(namespace, ())):
                    self._remove_item(key, namespace)
            else:
                # Clear the entire cache
                self._cache.clear()
                self._namespace_cache.clear()
                self._current_size_bytes = 0

            logger.info(f"Cache cleared{' for namespace ' + namespace if namespace else ''}")

    def cleanup(self) -> None:
        """Remove expired items and run garbage collection if needed.

        Does nothing when less than ``cleanup_interval`` seconds have passed
        since the previous cleanup.
        """
        with self._lock:
            now = datetime.now()
            # Only run if it's been at least cleanup_interval since last cleanup
            if (now - self._last_cleanup).total_seconds() < self._cleanup_interval:
                return

            expired_keys = [key for key, item in self._cache.items() if item.is_expired()]
            for key in expired_keys:
                self._remove_item(key)

            self._last_cleanup = now

        # Run garbage collection if we removed several items
        if len(expired_keys) > 100:
            gc.collect()

        logger.info(f"Cache cleanup: removed {len(expired_keys)} expired items")

    def get_stats(self) -> Dict:
        """Return a snapshot of cache statistics.

        Returns ``{"stats_enabled": False}`` when statistics are disabled.
        Average times are in milliseconds; the set average is taken over the
        number of ``set`` calls.
        """
        with self._lock:
            if not self._stats_enabled:
                return {"stats_enabled": False}

            total_requests = self._hits + self._misses
            hit_rate = (self._hits / total_requests) * 100 if total_requests > 0 else 0

            avg_get_time = (self._total_get_time / total_requests) * 1000 if total_requests > 0 else 0
            avg_set_time = (self._total_set_time / self._sets) * 1000 if self._sets > 0 else 0

            return {
                "stats_enabled": True,
                "item_count": len(self._cache),
                "max_items": self._max_items,
                "size_bytes": self._current_size_bytes,
                "max_size_bytes": self._max_size_bytes,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate_percent": round(hit_rate, 2),
                "sets": self._sets,
                "evictions": self._evictions,
                "avg_get_time_ms": round(avg_get_time, 3),
                "avg_set_time_ms": round(avg_set_time, 3),
                "namespace_count": len(self._namespace_cache),
                "namespaces": list(self._namespace_cache.keys())
            }

    def preload(self, items: List[Tuple[str, Any, int, int]],
                namespace: Optional[str] = None) -> None:
        """
        Preload a list of items into the cache

        Args:
            items: List of (key, value, ttl, priority) tuples
            namespace: Optional namespace for all items
        """
        for key, value, ttl, priority in items:
            self.set(key, value, ttl, priority, namespace)

        logger.info(f"Preloaded {len(items)} items into cache{' namespace ' + namespace if namespace else ''}")

    def get_or_load(self, key: str, loader_func: Callable[[], Any], ttl: int = 300,
                    priority: int = 1, namespace: Optional[str] = None) -> Any:
        """
        Get from cache or load using the provided function

        A cached ``None`` cannot be told apart from a miss, so ``None``
        results are never cached and the loader runs again next time.

        Args:
            key: Cache key
            loader_func: Function to call if cache miss occurs
            ttl: TTL in seconds
            priority: Item priority
            namespace: Optional namespace

        Returns:
            Cached or freshly loaded value
        """
        value = self.get(key, namespace)

        if value is None:
            value = loader_func()
            # Only cache if we got a valid value
            if value is not None:
                self.set(key, value, ttl, priority, namespace)

        return value


# Load cache configuration from environment variables
CACHE_STRATEGY = os.getenv("CACHE_STRATEGY", "mixed")
CACHE_MAX_ITEMS = int(os.getenv("CACHE_MAX_ITEMS", "10000"))
CACHE_MAX_SIZE_MB = int(os.getenv("CACHE_MAX_SIZE_MB", "100"))
CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60"))
CACHE_STATS_ENABLED = os.getenv("CACHE_STATS_ENABLED", "true").lower() in ("true", "1", "yes")

# Initialize the enhanced cache
cache = EnhancedCache(
    strategy=CACHE_STRATEGY,
    max_items=CACHE_MAX_ITEMS,
    max_size_mb=CACHE_MAX_SIZE_MB,
    cleanup_interval=CACHE_CLEANUP_INTERVAL,
    stats_enabled=CACHE_STATS_ENABLED
)


# Backward compatibility for SimpleCache - for a transition period
class SimpleCache:
    """Deprecated facade that forwards every call to the module-level ``cache``."""

    def __init__(self):
        """Legacy SimpleCache implementation that uses EnhancedCache underneath"""
        logger.warning("SimpleCache is deprecated, please use EnhancedCache directly")

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache if it exists and hasn't expired"""
        return cache.get(key)

    def set(self, key: str, value: Any, ttl: int = 300) -> None:
        """Set a value in the cache with TTL in seconds"""
        cache.set(key, value, ttl)

    def delete(self, key: str) -> None:
        """Delete a key from the cache"""
        cache.delete(key)

    def clear(self) -> None:
        """Clear the entire cache"""
        cache.clear()


def get_host_url(request) -> str:
    """
    Get the host URL from a request object.

    Reads the ``host`` header (default ``localhost``) and the
    ``x-forwarded-proto`` header (default ``http``) from ``request.headers``
    and joins them as ``scheme://host``.
    """
    host = request.headers.get("host", "localhost")
    scheme = request.headers.get("x-forwarded-proto", "http")
    return f"{scheme}://{host}"


def format_time(timestamp):
    """
    Format a timestamp into a human-readable string.

    ``timestamp`` must provide ``strftime``; the result has the form
    ``YYYY-MM-DD HH:MM:SS``.
    """
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")