import asyncio
import gc
import heapq
import logging
import os
import sys
import threading
import time
import uuid
from datetime import datetime, timedelta
from functools import wraps
from typing import Callable, Any, Dict, Optional, List, Tuple, Set

import pytz

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Timezone used by the local-time helpers below.
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh')

def generate_uuid() -> str:
    """Generate a unique identifier."""
    return str(uuid.uuid4())


def get_current_time() -> str:
    """Get the current time in ISO format."""
    return datetime.now().isoformat()


def get_local_time() -> str:
    """Get the current time in the Asia/Ho_Chi_Minh timezone as a formatted string."""
    return datetime.now(asia_tz).strftime("%Y-%m-%d %H:%M:%S")


def get_local_datetime() -> datetime:
    """Get the current datetime object in the Asia/Ho_Chi_Minh timezone."""
    return datetime.now(asia_tz)


# Backwards-compatible aliases for the older Vietnam-specific helper names.
get_vietnam_time = get_local_time
get_vietnam_datetime = get_local_datetime

def timer_decorator(func: Callable) -> Callable:
    """
    Decorator to time function execution and log the result.

    Coroutine functions get an async wrapper and regular callables get a
    synchronous one, so either kind can be decorated.
    """
    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            elapsed_time = time.time() - start_time
            logger.info(f"Function {func.__name__} executed in {elapsed_time:.4f} seconds")
            return result
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"Function {func.__name__} failed after {elapsed_time:.4f} seconds: {e}")
            raise

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            elapsed_time = time.time() - start_time
            logger.info(f"Function {func.__name__} executed in {elapsed_time:.4f} seconds")
            return result
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"Function {func.__name__} failed after {elapsed_time:.4f} seconds: {e}")
            raise

    return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

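# Usage sketch (illustrative only; fetch_report and build_index are hypothetical names):
#
#   @timer_decorator
#   async def fetch_report(report_id):
#       ...
#
#   @timer_decorator
#   def build_index(documents):
#       ...
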
def sanitize_input(text: Optional[str]) -> str:
    """Sanitize input text by stripping surrounding whitespace."""
    if not text:
        return ""
    return text.strip()


def truncate_text(text: Optional[str], max_length: int = 100) -> Optional[str]:
    """
    Truncate text to the given max length and add an ellipsis.
    """
    if not text or len(text) <= max_length:
        return text
    return text[:max_length] + "..."

class CacheStrategy:
    """Cache loading strategy constants."""
    LAZY = "lazy"
    EAGER = "eager"
    MIXED = "mixed"


class CacheItem:
    """Represents an item in the cache with its metadata."""

    def __init__(self, key: str, value: Any, ttl: int = 300, priority: int = 1):
        self.key = key
        self.value = value
        self.expiry = datetime.now() + timedelta(seconds=ttl)
        self.priority = priority
        self.access_count = 0
        self.last_accessed = datetime.now()

    def is_expired(self) -> bool:
        """Check if the item is expired."""
        return datetime.now() > self.expiry

    def touch(self) -> None:
        """Update the last-accessed time and access count."""
        self.last_accessed = datetime.now()
        self.access_count += 1

    def __lt__(self, other: "CacheItem") -> bool:
        """Ordering for eviction: lower-priority, less-used, older items sort (and are evicted) first."""
        if self.priority != other.priority:
            return self.priority < other.priority
        if self.access_count != other.access_count:
            return self.access_count < other.access_count
        return self.last_accessed < other.last_accessed

    def get_size(self) -> int:
        """Approximate memory size of the cache item in bytes."""
        try:
            # Rough estimate: value plus key plus a fixed overhead for the metadata fields.
            return sys.getsizeof(self.value) + sys.getsizeof(self.key) + 64
        except Exception:
            # Fall back to a conservative default if the size cannot be measured.
            return 1024

class EnhancedCache:
    """Thread-safe in-memory cache with TTLs, priorities, namespaces and size-based eviction."""

    def __init__(self,
                 strategy: str = CacheStrategy.LAZY,
                 max_items: int = 10000,
                 max_size_mb: int = 100,
                 cleanup_interval: int = 60,
                 stats_enabled: bool = True):
        """
        Initialize the enhanced cache with a configurable strategy.

        Args:
            strategy: Cache loading strategy (lazy, eager, mixed)
            max_items: Maximum number of items to store in the cache
            max_size_mb: Maximum size of the cache in MB
            cleanup_interval: Interval in seconds between cleanup runs (0 disables the cleanup thread)
            stats_enabled: Whether to collect cache statistics
        """
        self._cache: Dict[str, CacheItem] = {}
        self._namespace_cache: Dict[str, Set[str]] = {}
        self._strategy = strategy
        self._max_items = max_items
        self._max_size_bytes = max_size_mb * 1024 * 1024
        self._current_size_bytes = 0
        self._stats_enabled = stats_enabled

        # Statistics counters.
        self._hits = 0
        self._misses = 0
        self._evictions = 0
        self._sets = 0
        self._total_get_time = 0.0
        self._total_set_time = 0.0

        # Cleanup bookkeeping and concurrency control.
        self._last_cleanup = datetime.now()
        self._cleanup_interval = cleanup_interval
        self._lock = threading.RLock()

        if cleanup_interval > 0:
            self._start_cleanup_thread(cleanup_interval)

        logger.info(f"Enhanced cache initialized with strategy={strategy}, max_items={max_items}, max_size={max_size_mb}MB")

    def _start_cleanup_thread(self, interval: int) -> None:
        """Start background thread for periodic cleanup."""
        def cleanup_worker():
            while True:
                time.sleep(interval)
                try:
                    self.cleanup()
                except Exception as e:
                    logger.error(f"Error in cache cleanup: {e}")

        thread = threading.Thread(target=cleanup_worker, daemon=True)
        thread.start()
        logger.info(f"Cache cleanup thread started with interval {interval}s")

    def get(self, key: str, namespace: Optional[str] = None) -> Optional[Any]:
        """Get a value from the cache if it exists and hasn't expired."""
        if self._stats_enabled:
            start_time = time.time()

        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            cache_item = self._cache.get(cache_key)

            if cache_item:
                if cache_item.is_expired():
                    # Expired: drop the item and count the lookup as a miss.
                    self._remove_item(cache_key, namespace)
                    if self._stats_enabled:
                        self._misses += 1
                    value = None
                else:
                    # Hit: refresh the access metadata before returning the value.
                    cache_item.touch()
                    if self._stats_enabled:
                        self._hits += 1
                    value = cache_item.value
            else:
                if self._stats_enabled:
                    self._misses += 1
                value = None

        if self._stats_enabled:
            self._total_get_time += time.time() - start_time

        return value

    def set(self, key: str, value: Any, ttl: int = 300, priority: int = 1, namespace: Optional[str] = None) -> None:
        """Set a value in the cache with a TTL in seconds."""
        if self._stats_enabled:
            start_time = time.time()

        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            cache_item = CacheItem(cache_key, value, ttl, priority)
            item_size = cache_item.get_size()

            # Evict items first if adding this one would exceed the count or size limits.
            if (len(self._cache) >= self._max_items or
                    self._current_size_bytes + item_size > self._max_size_bytes):
                self._evict_items(item_size)

            # When replacing an existing key, subtract its old size before adding the new one.
            if cache_key in self._cache:
                self._current_size_bytes -= self._cache[cache_key].get_size()
            self._current_size_bytes += item_size

            self._cache[cache_key] = cache_item

            # Index the key under its namespace so namespace-scoped clears can find it.
            if namespace:
                if namespace not in self._namespace_cache:
                    self._namespace_cache[namespace] = set()
                self._namespace_cache[namespace].add(cache_key)

        if self._stats_enabled:
            self._sets += 1
            self._total_set_time += time.time() - start_time

    def delete(self, key: str, namespace: Optional[str] = None) -> None:
        """Delete a key from the cache."""
        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            self._remove_item(cache_key, namespace)

    def _remove_item(self, key: str, namespace: Optional[str] = None) -> None:
        """Internal method to remove an item and update size and namespace tracking."""
        if key in self._cache:
            self._current_size_bytes -= self._cache[key].get_size()
            del self._cache[key]

        # Remove the key from its namespace index and drop the namespace once it is empty.
        if namespace and namespace in self._namespace_cache:
            self._namespace_cache[namespace].discard(key)
            if not self._namespace_cache[namespace]:
                del self._namespace_cache[namespace]

    def _evict_items(self, needed_space: int = 0) -> None:
        """Evict items to make room in the cache."""
        if not self._cache:
            return

        with self._lock:
            # Sort candidates so that low-priority, rarely used, old items come first.
            items = list(self._cache.values())
            items.sort()

            space_freed = 0
            evicted_count = 0

            for item in items:
                # Stop once the cache is back under ~90% of its limits and enough
                # space has been freed. _remove_item keeps len(self._cache) and
                # self._current_size_bytes up to date, so they are checked directly.
                if (len(self._cache) <= self._max_items * 0.9 and
                        (space_freed >= needed_space or
                         self._current_size_bytes <= self._max_size_bytes * 0.9)):
                    break

                # Spare high-priority items unless half of the candidates have already been evicted.
                if item.priority > 9 and evicted_count < len(items) // 2:
                    continue

                item_size = item.get_size()
                namespace = item.key.split(':', 1)[0] if ':' in item.key else None
                self._remove_item(item.key, namespace)

                space_freed += item_size
                evicted_count += 1
                if self._stats_enabled:
                    self._evictions += 1

            logger.info(f"Cache eviction: removed {evicted_count} items, freed {space_freed / 1024:.2f}KB")

    def clear(self, namespace: Optional[str] = None) -> None:
        """
        Clear the entire cache or a specific namespace.
        """
        with self._lock:
            if namespace:
                if namespace in self._namespace_cache:
                    keys_to_remove = list(self._namespace_cache[namespace])
                    for key in keys_to_remove:
                        self._remove_item(key, namespace)
            else:
                self._cache.clear()
                self._namespace_cache.clear()
                self._current_size_bytes = 0

        logger.info(f"Cache cleared{' for namespace ' + namespace if namespace else ''}")

    def cleanup(self) -> None:
        """Remove expired items and run garbage collection if needed."""
        with self._lock:
            now = datetime.now()

            # Throttle so cleanup never runs more often than the configured interval.
            if (now - self._last_cleanup).total_seconds() < self._cleanup_interval:
                return

            # Collect expired keys together with their namespace (if any) before removing them.
            expired_keys = []
            for key, item in self._cache.items():
                if item.is_expired():
                    expired_keys.append((key, key.split(':', 1)[0] if ':' in key else None))

            for key, namespace in expired_keys:
                self._remove_item(key, namespace)

            self._last_cleanup = now

            # After a large purge, nudge the garbage collector to reclaim memory promptly.
            if len(expired_keys) > 100:
                gc.collect()

            logger.info(f"Cache cleanup: removed {len(expired_keys)} expired items")

    def get_stats(self) -> Dict:
        """Get cache statistics."""
        with self._lock:
            if not self._stats_enabled:
                return {"stats_enabled": False}

            total_requests = self._hits + self._misses
            hit_rate = (self._hits / total_requests) * 100 if total_requests > 0 else 0

            # Average operation latencies in milliseconds.
            avg_get_time = (self._total_get_time / total_requests) * 1000 if total_requests > 0 else 0
            avg_set_time = (self._total_set_time / self._sets) * 1000 if self._sets > 0 else 0

            return {
                "stats_enabled": True,
                "item_count": len(self._cache),
                "max_items": self._max_items,
                "size_bytes": self._current_size_bytes,
                "max_size_bytes": self._max_size_bytes,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate_percent": round(hit_rate, 2),
                "sets": self._sets,
                "evictions": self._evictions,
                "avg_get_time_ms": round(avg_get_time, 3),
                "avg_set_time_ms": round(avg_set_time, 3),
                "namespace_count": len(self._namespace_cache),
                "namespaces": list(self._namespace_cache.keys())
            }

    def preload(self, items: List[Tuple[str, Any, int, int]], namespace: Optional[str] = None) -> None:
        """
        Preload a list of items into the cache.

        Args:
            items: List of (key, value, ttl, priority) tuples
            namespace: Optional namespace for all items
        """
        for key, value, ttl, priority in items:
            self.set(key, value, ttl, priority, namespace)

        logger.info(f"Preloaded {len(items)} items into cache{' namespace ' + namespace if namespace else ''}")

    def get_or_load(self, key: str, loader_func: Callable[[], Any],
                    ttl: int = 300, priority: int = 1, namespace: Optional[str] = None) -> Any:
        """
        Get a value from the cache, or load it with the provided function on a miss.

        Args:
            key: Cache key
            loader_func: Function to call if a cache miss occurs
            ttl: TTL in seconds
            priority: Item priority
            namespace: Optional namespace

        Returns:
            Cached or freshly loaded value
        """
        value = self.get(key, namespace)

        # On a miss, call the loader and cache the result; None results are not cached.
        if value is None:
            value = loader_func()
            if value is not None:
                self.set(key, value, ttl, priority, namespace)

        return value


# Cache configuration, overridable through environment variables.
CACHE_STRATEGY = os.getenv("CACHE_STRATEGY", "mixed")
CACHE_MAX_ITEMS = int(os.getenv("CACHE_MAX_ITEMS", "10000"))
CACHE_MAX_SIZE_MB = int(os.getenv("CACHE_MAX_SIZE_MB", "100"))
CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60"))
CACHE_STATS_ENABLED = os.getenv("CACHE_STATS_ENABLED", "true").lower() in ("true", "1", "yes")

# Shared module-level cache instance.
cache = EnhancedCache(
    strategy=CACHE_STRATEGY,
    max_items=CACHE_MAX_ITEMS,
    max_size_mb=CACHE_MAX_SIZE_MB,
    cleanup_interval=CACHE_CLEANUP_INTERVAL,
    stats_enabled=CACHE_STATS_ENABLED
)

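# Typical usage sketch (illustrative only; "products" and load_product are hypothetical names):
#
#   product = cache.get_or_load(
#       key=str(product_id),
#       loader_func=lambda: load_product(product_id),
#       ttl=600,
#       priority=5,
#       namespace="products",
#   )
#   cache.clear(namespace="products")  # invalidate everything cached for that namespace
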
class SimpleCache:
    """Legacy SimpleCache interface backed by the module-level EnhancedCache instance."""

    def __init__(self):
        logger.warning("SimpleCache is deprecated, please use EnhancedCache directly")

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache if it exists and hasn't expired."""
        return cache.get(key)

    def set(self, key: str, value: Any, ttl: int = 300) -> None:
        """Set a value in the cache with TTL in seconds."""
        cache.set(key, value, ttl)

    def delete(self, key: str) -> None:
        """Delete a key from the cache."""
        cache.delete(key)

    def clear(self) -> None:
        """Clear the entire cache."""
        cache.clear()

def get_host_url(request) -> str:
    """
    Get the host URL from a request object, honoring any reverse-proxy scheme header.
    """
    host = request.headers.get("host", "localhost")
    scheme = request.headers.get("x-forwarded-proto", "http")
    return f"{scheme}://{host}"


def format_time(timestamp: datetime) -> str:
    """
    Format a datetime into a human-readable string.
    """
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")
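

# Minimal smoke test, runnable directly as a script. The loader below is a
# hypothetical stand-in for whatever expensive lookup (database query, API
# call, ...) the cache would normally sit in front of; it is illustrative only.
if __name__ == "__main__":
    def _expensive_lookup() -> dict:
        time.sleep(0.1)  # simulate a slow operation
        return {"id": generate_uuid(), "loaded_at": get_local_time()}

    # First call misses and invokes the loader; the second call is served from the cache.
    first = cache.get_or_load("user-profile", _expensive_lookup, ttl=30, namespace="demo")
    second = cache.get_or_load("user-profile", _expensive_lookup, ttl=30, namespace="demo")
    assert first is second

    cache.set("greeting", "hello world", ttl=10, priority=5)
    print(truncate_text(str(cache.get("greeting")), max_length=20))
    print(cache.get_stats())

    # Leave the shared cache empty after the demo.
    cache.clear(namespace="demo")
    cache.clear()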