Pix-Agent / app /utils /cache.py
Cuong2004's picture
version 1.1
e83f5e9
raw
history blame
11.6 kB
import os
import time
import threading
import logging
from typing import Dict, Any, Optional, Tuple, List, Callable, Generic, TypeVar, Union
from datetime import datetime
from dotenv import load_dotenv
import json
# Thiết lập logging
logger = logging.getLogger(__name__)
# Load biến môi trường
load_dotenv()
# Cấu hình cache từ biến môi trường
DEFAULT_CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", "300")) # Mặc định 5 phút
DEFAULT_CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60")) # Mặc định 1 phút
DEFAULT_CACHE_MAX_SIZE = int(os.getenv("CACHE_MAX_SIZE", "1000")) # Mặc định 1000 phần tử
DEFAULT_HISTORY_QUEUE_SIZE = int(os.getenv("HISTORY_QUEUE_SIZE", "10")) # Mặc định queue size là 10
DEFAULT_HISTORY_CACHE_TTL = int(os.getenv("HISTORY_CACHE_TTL", "3600")) # Mặc định 1 giờ
# Generic type để có thể sử dụng cho nhiều loại giá trị khác nhau
T = TypeVar('T')
# Cấu trúc cho một phần tử trong cache
class CacheItem(Generic[T]):
def __init__(self, value: T, ttl: int = DEFAULT_CACHE_TTL):
self.value = value
self.expire_at = time.time() + ttl
self.last_accessed = time.time()
def is_expired(self) -> bool:
"""Kiểm tra xem item có hết hạn chưa"""
return time.time() > self.expire_at
def touch(self) -> None:
"""Cập nhật thời gian truy cập lần cuối"""
self.last_accessed = time.time()
def extend(self, ttl: int = DEFAULT_CACHE_TTL) -> None:
"""Gia hạn thời gian sống của item"""
self.expire_at = time.time() + ttl
# Lớp HistoryQueue để lưu trữ lịch sử người dùng
class HistoryQueue:
def __init__(self, max_size: int = DEFAULT_HISTORY_QUEUE_SIZE, ttl: int = DEFAULT_HISTORY_CACHE_TTL):
self.items: List[Dict[str, Any]] = []
self.max_size = max_size
self.ttl = ttl
self.expire_at = time.time() + ttl
def add(self, item: Dict[str, Any]) -> None:
"""Thêm một item vào queue, nếu đã đầy thì loại bỏ item cũ nhất"""
if len(self.items) >= self.max_size:
self.items.pop(0)
self.items.append(item)
# Mỗi khi thêm item mới, cập nhật thời gian hết hạn
self.refresh_expiry()
def get_all(self) -> List[Dict[str, Any]]:
"""Lấy tất cả items trong queue"""
return self.items
def is_expired(self) -> bool:
"""Kiểm tra xem queue có hết hạn chưa"""
return time.time() > self.expire_at
def refresh_expiry(self) -> None:
"""Làm mới thời gian hết hạn"""
self.expire_at = time.time() + self.ttl
# Lớp cache chính
class InMemoryCache:
def __init__(
self,
ttl: int = DEFAULT_CACHE_TTL,
cleanup_interval: int = DEFAULT_CACHE_CLEANUP_INTERVAL,
max_size: int = DEFAULT_CACHE_MAX_SIZE
):
self.cache: Dict[str, CacheItem] = {}
self.ttl = ttl
self.cleanup_interval = cleanup_interval
self.max_size = max_size
self.user_history_queues: Dict[str, HistoryQueue] = {}
self.lock = threading.RLock() # Sử dụng RLock để tránh deadlock
# Khởi động thread dọn dẹp cache định kỳ (active expiration)
self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True)
self.cleanup_thread.start()
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
"""Lưu một giá trị vào cache"""
with self.lock:
ttl_value = ttl if ttl is not None else self.ttl
# Nếu cache đã đầy, xóa bớt các item ít được truy cập nhất
if len(self.cache) >= self.max_size and key not in self.cache:
self._evict_lru_items()
self.cache[key] = CacheItem(value, ttl_value)
logger.debug(f"Cache set: {key} (expires in {ttl_value}s)")
def get(self, key: str, default: Any = None) -> Any:
"""
Lấy giá trị từ cache. Nếu key không tồn tại hoặc đã hết hạn, trả về giá trị mặc định.
Áp dụng lazy expiration: kiểm tra và xóa các item hết hạn khi truy cập.
"""
with self.lock:
item = self.cache.get(key)
# Nếu không tìm thấy key hoặc item đã hết hạn
if item is None or item.is_expired():
# Nếu item tồn tại nhưng đã hết hạn, xóa nó (lazy expiration)
if item is not None:
logger.debug(f"Cache miss (expired): {key}")
del self.cache[key]
else:
logger.debug(f"Cache miss (not found): {key}")
return default
# Cập nhật thời gian truy cập
item.touch()
logger.debug(f"Cache hit: {key}")
return item.value
def delete(self, key: str) -> bool:
"""Xóa một key khỏi cache"""
with self.lock:
if key in self.cache:
del self.cache[key]
logger.debug(f"Cache delete: {key}")
return True
return False
def clear(self) -> None:
"""Xóa tất cả dữ liệu trong cache"""
with self.lock:
self.cache.clear()
logger.debug("Cache cleared")
def get_or_set(self, key: str, callback: Callable[[], T], ttl: Optional[int] = None) -> T:
"""
Lấy giá trị từ cache nếu tồn tại, nếu không thì gọi callback để lấy giá trị
và lưu vào cache trước khi trả về.
"""
with self.lock:
value = self.get(key)
if value is None:
value = callback()
self.set(key, value, ttl)
return value
def _cleanup_task(self) -> None:
"""Thread để dọn dẹp các item đã hết hạn (active expiration)"""
while True:
time.sleep(self.cleanup_interval)
try:
self._remove_expired_items()
except Exception as e:
logger.error(f"Error in cache cleanup task: {e}")
def _remove_expired_items(self) -> None:
"""Xóa tất cả các item đã hết hạn trong cache"""
with self.lock:
now = time.time()
expired_keys = [k for k, v in self.cache.items() if v.is_expired()]
for key in expired_keys:
del self.cache[key]
# Xóa các user history queue đã hết hạn
expired_user_ids = [uid for uid, queue in self.user_history_queues.items() if queue.is_expired()]
for user_id in expired_user_ids:
del self.user_history_queues[user_id]
if expired_keys or expired_user_ids:
logger.debug(f"Cleaned up {len(expired_keys)} expired cache items and {len(expired_user_ids)} expired history queues")
def _evict_lru_items(self, count: int = 1) -> None:
"""Xóa bỏ các item ít được truy cập nhất khi cache đầy"""
items = sorted(self.cache.items(), key=lambda x: x[1].last_accessed)
for i in range(min(count, len(items))):
del self.cache[items[i][0]]
logger.debug(f"Evicted {min(count, len(items))} least recently used items from cache")
def stats(self) -> Dict[str, Any]:
"""Trả về thống kê về cache"""
with self.lock:
now = time.time()
total_items = len(self.cache)
expired_items = sum(1 for item in self.cache.values() if item.is_expired())
memory_usage = self._estimate_memory_usage()
return {
"total_items": total_items,
"expired_items": expired_items,
"active_items": total_items - expired_items,
"memory_usage_bytes": memory_usage,
"memory_usage_mb": memory_usage / (1024 * 1024),
"max_size": self.max_size,
"history_queues": len(self.user_history_queues)
}
def _estimate_memory_usage(self) -> int:
"""Ước tính dung lượng bộ nhớ của cache (gần đúng)"""
# Ước tính dựa trên kích thước của các key và giá trị
cache_size = sum(len(k) for k in self.cache.keys())
for item in self.cache.values():
try:
# Ước tính kích thước của value (gần đúng)
if isinstance(item.value, (str, bytes)):
cache_size += len(item.value)
elif isinstance(item.value, (dict, list)):
cache_size += len(json.dumps(item.value))
else:
# Giá trị mặc định cho các loại dữ liệu khác
cache_size += 100
except:
cache_size += 100
# Ước tính kích thước của user history queues
for queue in self.user_history_queues.values():
try:
cache_size += len(json.dumps(queue.items)) + 100 # 100 bytes cho metadata
except:
cache_size += 100
return cache_size
# Các phương thức chuyên biệt cho việc quản lý lịch sử người dùng
def add_user_history(self, user_id: str, item: Dict[str, Any], queue_size: Optional[int] = None, ttl: Optional[int] = None) -> None:
"""Thêm một item vào history queue của người dùng"""
with self.lock:
# Tạo queue nếu chưa tồn tại
if user_id not in self.user_history_queues:
queue_size_value = queue_size if queue_size is not None else DEFAULT_HISTORY_QUEUE_SIZE
ttl_value = ttl if ttl is not None else DEFAULT_HISTORY_CACHE_TTL
self.user_history_queues[user_id] = HistoryQueue(max_size=queue_size_value, ttl=ttl_value)
# Thêm item vào queue
self.user_history_queues[user_id].add(item)
logger.debug(f"Added history item for user {user_id}")
def get_user_history(self, user_id: str, default: Any = None) -> List[Dict[str, Any]]:
"""Lấy lịch sử của người dùng từ cache"""
with self.lock:
queue = self.user_history_queues.get(user_id)
# Nếu không tìm thấy queue hoặc queue đã hết hạn
if queue is None or queue.is_expired():
if queue is not None and queue.is_expired():
del self.user_history_queues[user_id]
logger.debug(f"User history queue expired: {user_id}")
return default if default is not None else []
# Làm mới thời gian hết hạn
queue.refresh_expiry()
logger.debug(f"Retrieved history for user {user_id}: {len(queue.items)} items")
return queue.get_all()
# Singleton instance
_cache_instance = None
def get_cache() -> InMemoryCache:
"""Trả về instance singleton của InMemoryCache"""
global _cache_instance
if _cache_instance is None:
_cache_instance = InMemoryCache()
return _cache_instance