Spaces:
Sleeping
Sleeping
import logging | |
import time | |
import uuid | |
import threading | |
import os | |
from functools import wraps | |
from datetime import datetime, timedelta | |
import pytz | |
from typing import Callable, Any, Dict, Optional, List, Tuple, Set | |
import gc | |
import heapq | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
) | |
logger = logging.getLogger(__name__) | |
# Asia/Ho_Chi_Minh timezone | |
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh') | |
def generate_uuid(): | |
"""Generate a unique identifier""" | |
return str(uuid.uuid4()) | |
def get_current_time(): | |
"""Get current time in ISO format""" | |
return datetime.now().isoformat() | |
def get_local_time(): | |
"""Get current time in Asia/Ho_Chi_Minh timezone""" | |
return datetime.now(asia_tz).strftime("%Y-%m-%d %H:%M:%S") | |
def get_local_datetime(): | |
"""Get current datetime object in Asia/Ho_Chi_Minh timezone""" | |
return datetime.now(asia_tz) | |
# For backward compatibility | |
get_vietnam_time = get_local_time | |
get_vietnam_datetime = get_local_datetime | |
def timer_decorator(func: Callable) -> Callable: | |
""" | |
Decorator to time function execution and log results. | |
""" | |
async def wrapper(*args, **kwargs): | |
start_time = time.time() | |
try: | |
result = await func(*args, **kwargs) | |
elapsed_time = time.time() - start_time | |
logger.info(f"Function {func.__name__} executed in {elapsed_time:.4f} seconds") | |
return result | |
except Exception as e: | |
elapsed_time = time.time() - start_time | |
logger.error(f"Function {func.__name__} failed after {elapsed_time:.4f} seconds: {e}") | |
raise | |
return wrapper | |
def sanitize_input(text): | |
"""Sanitize input text""" | |
if not text: | |
return "" | |
# Remove potential dangerous characters or patterns | |
return text.strip() | |
def truncate_text(text, max_length=100): | |
""" | |
Truncate text to given max length and add ellipsis. | |
""" | |
if not text or len(text) <= max_length: | |
return text | |
return text[:max_length] + "..." | |
class CacheStrategy: | |
"""Cache loading strategy enumeration""" | |
LAZY = "lazy" # Only load items into cache when requested | |
EAGER = "eager" # Preload items into cache at initialization | |
MIXED = "mixed" # Preload high-priority items, lazy load others | |
class CacheItem: | |
"""Represents an item in the cache with metadata""" | |
def __init__(self, key: str, value: Any, ttl: int = 300, priority: int = 1): | |
self.key = key | |
self.value = value | |
self.expiry = datetime.now() + timedelta(seconds=ttl) | |
self.priority = priority # Higher number = higher priority | |
self.access_count = 0 # Track number of accesses | |
self.last_accessed = datetime.now() | |
def is_expired(self) -> bool: | |
"""Check if the item is expired""" | |
return datetime.now() > self.expiry | |
def touch(self): | |
"""Update last accessed time and access count""" | |
self.last_accessed = datetime.now() | |
self.access_count += 1 | |
def __lt__(self, other): | |
"""For heap comparisons - lower priority items are evicted first""" | |
# First compare priority | |
if self.priority != other.priority: | |
return self.priority < other.priority | |
# Then compare access frequency (less frequently accessed items are evicted first) | |
if self.access_count != other.access_count: | |
return self.access_count < other.access_count | |
# Finally compare last access time (oldest accessed first) | |
return self.last_accessed < other.last_accessed | |
def get_size(self) -> int: | |
"""Approximate memory size of the cache item in bytes""" | |
try: | |
import sys | |
return sys.getsizeof(self.value) + sys.getsizeof(self.key) + 64 # Additional overhead | |
except: | |
# Default estimate if we can't get the size | |
return 1024 | |
# Enhanced in-memory cache implementation | |
class EnhancedCache: | |
def __init__(self, | |
strategy: str = "lazy", | |
max_items: int = 10000, | |
max_size_mb: int = 100, | |
cleanup_interval: int = 60, | |
stats_enabled: bool = True): | |
""" | |
Initialize enhanced cache with configurable strategy. | |
Args: | |
strategy: Cache loading strategy (lazy, eager, mixed) | |
max_items: Maximum number of items to store in cache | |
max_size_mb: Maximum size of cache in MB | |
cleanup_interval: Interval in seconds to run cleanup | |
stats_enabled: Whether to collect cache statistics | |
""" | |
self._cache: Dict[str, CacheItem] = {} | |
self._namespace_cache: Dict[str, Set[str]] = {} # Tracking keys by namespace | |
self._strategy = strategy | |
self._max_items = max_items | |
self._max_size_bytes = max_size_mb * 1024 * 1024 | |
self._current_size_bytes = 0 | |
self._stats_enabled = stats_enabled | |
# Statistics | |
self._hits = 0 | |
self._misses = 0 | |
self._evictions = 0 | |
self._total_get_time = 0 | |
self._total_set_time = 0 | |
# Setup cleanup thread | |
self._last_cleanup = datetime.now() | |
self._cleanup_interval = cleanup_interval | |
self._lock = threading.RLock() | |
if cleanup_interval > 0: | |
self._start_cleanup_thread(cleanup_interval) | |
logger.info(f"Enhanced cache initialized with strategy={strategy}, max_items={max_items}, max_size={max_size_mb}MB") | |
def _start_cleanup_thread(self, interval: int): | |
"""Start background thread for periodic cleanup""" | |
def cleanup_worker(): | |
while True: | |
time.sleep(interval) | |
try: | |
self.cleanup() | |
except Exception as e: | |
logger.error(f"Error in cache cleanup: {e}") | |
thread = threading.Thread(target=cleanup_worker, daemon=True) | |
thread.start() | |
logger.info(f"Cache cleanup thread started with interval {interval}s") | |
def get(self, key: str, namespace: str = None) -> Optional[Any]: | |
"""Get value from cache if it exists and hasn't expired""" | |
if self._stats_enabled: | |
start_time = time.time() | |
# Use namespaced key if namespace is provided | |
cache_key = f"{namespace}:{key}" if namespace else key | |
with self._lock: | |
cache_item = self._cache.get(cache_key) | |
if cache_item: | |
if cache_item.is_expired(): | |
# Clean up expired key | |
self._remove_item(cache_key, namespace) | |
if self._stats_enabled: | |
self._misses += 1 | |
value = None | |
else: | |
# Update access metadata | |
cache_item.touch() | |
if self._stats_enabled: | |
self._hits += 1 | |
value = cache_item.value | |
else: | |
if self._stats_enabled: | |
self._misses += 1 | |
value = None | |
if self._stats_enabled: | |
self._total_get_time += time.time() - start_time | |
return value | |
def set(self, key: str, value: Any, ttl: int = 300, priority: int = 1, namespace: str = None) -> None: | |
"""Set a value in the cache with TTL in seconds""" | |
if self._stats_enabled: | |
start_time = time.time() | |
# Use namespaced key if namespace is provided | |
cache_key = f"{namespace}:{key}" if namespace else key | |
with self._lock: | |
# Create cache item | |
cache_item = CacheItem(cache_key, value, ttl, priority) | |
item_size = cache_item.get_size() | |
# Check if we need to make room | |
if (len(self._cache) >= self._max_items or | |
self._current_size_bytes + item_size > self._max_size_bytes): | |
self._evict_items(item_size) | |
# Update size tracking | |
if cache_key in self._cache: | |
# If replacing, subtract old size first | |
self._current_size_bytes -= self._cache[cache_key].get_size() | |
self._current_size_bytes += item_size | |
# Store the item | |
self._cache[cache_key] = cache_item | |
# Update namespace tracking | |
if namespace: | |
if namespace not in self._namespace_cache: | |
self._namespace_cache[namespace] = set() | |
self._namespace_cache[namespace].add(cache_key) | |
if self._stats_enabled: | |
self._total_set_time += time.time() - start_time | |
def delete(self, key: str, namespace: str = None) -> None: | |
"""Delete a key from the cache""" | |
# Use namespaced key if namespace is provided | |
cache_key = f"{namespace}:{key}" if namespace else key | |
with self._lock: | |
self._remove_item(cache_key, namespace) | |
def _remove_item(self, key: str, namespace: str = None): | |
"""Internal method to remove an item and update tracking""" | |
if key in self._cache: | |
# Update size tracking | |
self._current_size_bytes -= self._cache[key].get_size() | |
# Remove from cache | |
del self._cache[key] | |
# Update namespace tracking | |
if namespace and namespace in self._namespace_cache: | |
if key in self._namespace_cache[namespace]: | |
self._namespace_cache[namespace].remove(key) | |
# Cleanup empty sets | |
if not self._namespace_cache[namespace]: | |
del self._namespace_cache[namespace] | |
def _evict_items(self, needed_space: int = 0) -> None: | |
"""Evict items to make room in the cache""" | |
if not self._cache: | |
return | |
with self._lock: | |
# Convert cache items to a list for sorting | |
items = list(self._cache.values()) | |
# Sort by priority, access count, and last accessed time | |
items.sort() # Uses the __lt__ method of CacheItem | |
# Evict items until we have enough space | |
space_freed = 0 | |
evicted_count = 0 | |
for item in items: | |
# Stop if we've made enough room | |
if (len(self._cache) - evicted_count <= self._max_items * 0.9 and | |
(space_freed >= needed_space or | |
self._current_size_bytes - space_freed <= self._max_size_bytes * 0.9)): | |
break | |
# Skip high priority items unless absolutely necessary | |
if item.priority > 9 and evicted_count < len(items) // 2: | |
continue | |
# Evict this item | |
item_size = item.get_size() | |
namespace = item.key.split(':', 1)[0] if ':' in item.key else None | |
self._remove_item(item.key, namespace) | |
space_freed += item_size | |
evicted_count += 1 | |
if self._stats_enabled: | |
self._evictions += 1 | |
logger.info(f"Cache eviction: removed {evicted_count} items, freed {space_freed / 1024:.2f}KB") | |
def clear(self, namespace: str = None) -> None: | |
""" | |
Clear the cache or a specific namespace | |
""" | |
with self._lock: | |
if namespace: | |
# Clear only keys in the specified namespace | |
if namespace in self._namespace_cache: | |
keys_to_remove = list(self._namespace_cache[namespace]) | |
for key in keys_to_remove: | |
self._remove_item(key, namespace) | |
# The namespace should be auto-cleaned in _remove_item | |
else: | |
# Clear the entire cache | |
self._cache.clear() | |
self._namespace_cache.clear() | |
self._current_size_bytes = 0 | |
logger.info(f"Cache cleared{' for namespace ' + namespace if namespace else ''}") | |
def cleanup(self) -> None: | |
"""Remove expired items and run garbage collection if needed""" | |
with self._lock: | |
now = datetime.now() | |
# Only run if it's been at least cleanup_interval since last cleanup | |
if (now - self._last_cleanup).total_seconds() < self._cleanup_interval: | |
return | |
# Find expired items | |
expired_keys = [] | |
for key, item in self._cache.items(): | |
if item.is_expired(): | |
expired_keys.append((key, key.split(':', 1)[0] if ':' in key else None)) | |
# Remove expired items | |
for key, namespace in expired_keys: | |
self._remove_item(key, namespace) | |
# Update last cleanup time | |
self._last_cleanup = now | |
# Run garbage collection if we removed several items | |
if len(expired_keys) > 100: | |
gc.collect() | |
logger.info(f"Cache cleanup: removed {len(expired_keys)} expired items") | |
def get_stats(self) -> Dict: | |
"""Get cache statistics""" | |
with self._lock: | |
if not self._stats_enabled: | |
return {"stats_enabled": False} | |
# Calculate hit rate | |
total_requests = self._hits + self._misses | |
hit_rate = (self._hits / total_requests) * 100 if total_requests > 0 else 0 | |
# Calculate average times | |
avg_get_time = (self._total_get_time / total_requests) * 1000 if total_requests > 0 else 0 | |
avg_set_time = (self._total_set_time / self._evictions) * 1000 if self._evictions > 0 else 0 | |
return { | |
"stats_enabled": True, | |
"item_count": len(self._cache), | |
"max_items": self._max_items, | |
"size_bytes": self._current_size_bytes, | |
"max_size_bytes": self._max_size_bytes, | |
"hits": self._hits, | |
"misses": self._misses, | |
"hit_rate_percent": round(hit_rate, 2), | |
"evictions": self._evictions, | |
"avg_get_time_ms": round(avg_get_time, 3), | |
"avg_set_time_ms": round(avg_set_time, 3), | |
"namespace_count": len(self._namespace_cache), | |
"namespaces": list(self._namespace_cache.keys()) | |
} | |
def preload(self, items: List[Tuple[str, Any, int, int]], namespace: str = None) -> None: | |
""" | |
Preload a list of items into the cache | |
Args: | |
items: List of (key, value, ttl, priority) tuples | |
namespace: Optional namespace for all items | |
""" | |
for key, value, ttl, priority in items: | |
self.set(key, value, ttl, priority, namespace) | |
logger.info(f"Preloaded {len(items)} items into cache{' namespace ' + namespace if namespace else ''}") | |
def get_or_load(self, key: str, loader_func: Callable[[], Any], | |
ttl: int = 300, priority: int = 1, namespace: str = None) -> Any: | |
""" | |
Get from cache or load using the provided function | |
Args: | |
key: Cache key | |
loader_func: Function to call if cache miss occurs | |
ttl: TTL in seconds | |
priority: Item priority | |
namespace: Optional namespace | |
Returns: | |
Cached or freshly loaded value | |
""" | |
# Try to get from cache first | |
value = self.get(key, namespace) | |
# If not in cache, load it | |
if value is None: | |
value = loader_func() | |
# Only cache if we got a valid value | |
if value is not None: | |
self.set(key, value, ttl, priority, namespace) | |
return value | |
# Load cache configuration from environment variables | |
CACHE_STRATEGY = os.getenv("CACHE_STRATEGY", "mixed") | |
CACHE_MAX_ITEMS = int(os.getenv("CACHE_MAX_ITEMS", "10000")) | |
CACHE_MAX_SIZE_MB = int(os.getenv("CACHE_MAX_SIZE_MB", "100")) | |
CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60")) | |
CACHE_STATS_ENABLED = os.getenv("CACHE_STATS_ENABLED", "true").lower() in ("true", "1", "yes") | |
# Initialize the enhanced cache | |
cache = EnhancedCache( | |
strategy=CACHE_STRATEGY, | |
max_items=CACHE_MAX_ITEMS, | |
max_size_mb=CACHE_MAX_SIZE_MB, | |
cleanup_interval=CACHE_CLEANUP_INTERVAL, | |
stats_enabled=CACHE_STATS_ENABLED | |
) | |
# Backward compatibility for SimpleCache - for a transition period | |
class SimpleCache: | |
def __init__(self): | |
"""Legacy SimpleCache implementation that uses EnhancedCache underneath""" | |
logger.warning("SimpleCache is deprecated, please use EnhancedCache directly") | |
def get(self, key: str) -> Optional[Any]: | |
"""Get value from cache if it exists and hasn't expired""" | |
return cache.get(key) | |
def set(self, key: str, value: Any, ttl: int = 300) -> None: | |
"""Set a value in the cache with TTL in seconds""" | |
cache.set(key, value, ttl) | |
def delete(self, key: str) -> None: | |
"""Delete a key from the cache""" | |
cache.delete(key) | |
def clear(self) -> None: | |
"""Clear the entire cache""" | |
cache.clear() | |
def get_host_url(request) -> str: | |
""" | |
Get the host URL from a request object. | |
""" | |
host = request.headers.get("host", "localhost") | |
scheme = request.headers.get("x-forwarded-proto", "http") | |
return f"{scheme}://{host}" | |
def format_time(timestamp): | |
""" | |
Format a timestamp into a human-readable string. | |
""" | |
return timestamp.strftime("%Y-%m-%d %H:%M:%S") |