File size: 7,706 Bytes
e83f5e9 c8b8c9b e83f5e9 c8b8c9b e83f5e9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
import os
import time
import threading
import logging
from typing import Dict, Any, Optional, Tuple, List, Callable, Generic, TypeVar, Union
from datetime import datetime
from dotenv import load_dotenv
import json
# Thiết lập logging
logger = logging.getLogger(__name__)
# Load biến môi trường
load_dotenv()
# Cấu hình cache từ biến môi trường
DEFAULT_CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", "300")) # Mặc định 5 phút
DEFAULT_CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60")) # Mặc định 1 phút
DEFAULT_CACHE_MAX_SIZE = int(os.getenv("CACHE_MAX_SIZE", "1000")) # Mặc định 1000 phần tử
# Generic type để có thể sử dụng cho nhiều loại giá trị khác nhau
T = TypeVar('T')
# Cấu trúc cho một phần tử trong cache
class CacheItem(Generic[T]):
def __init__(self, value: T, ttl: int = DEFAULT_CACHE_TTL):
self.value = value
self.expire_at = time.time() + ttl
self.last_accessed = time.time()
def is_expired(self) -> bool:
"""Kiểm tra xem item có hết hạn chưa"""
return time.time() > self.expire_at
def touch(self) -> None:
"""Cập nhật thời gian truy cập lần cuối"""
self.last_accessed = time.time()
def extend(self, ttl: int = DEFAULT_CACHE_TTL) -> None:
"""Gia hạn thời gian sống của item"""
self.expire_at = time.time() + ttl
# Lớp cache chính
class InMemoryCache:
def __init__(
self,
ttl: int = DEFAULT_CACHE_TTL,
cleanup_interval: int = DEFAULT_CACHE_CLEANUP_INTERVAL,
max_size: int = DEFAULT_CACHE_MAX_SIZE
):
self.cache: Dict[str, CacheItem] = {}
self.ttl = ttl
self.cleanup_interval = cleanup_interval
self.max_size = max_size
self.lock = threading.RLock() # Sử dụng RLock để tránh deadlock
# Khởi động thread dọn dẹp cache định kỳ (active expiration)
self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True)
self.cleanup_thread.start()
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
"""Lưu một giá trị vào cache"""
with self.lock:
ttl_value = ttl if ttl is not None else self.ttl
# Nếu cache đã đầy, xóa bớt các item ít được truy cập nhất
if len(self.cache) >= self.max_size and key not in self.cache:
self._evict_lru_items()
self.cache[key] = CacheItem(value, ttl_value)
logger.debug(f"Cache set: {key} (expires in {ttl_value}s)")
def get(self, key: str, default: Any = None) -> Any:
"""
Lấy giá trị từ cache. Nếu key không tồn tại hoặc đã hết hạn, trả về giá trị mặc định.
Áp dụng lazy expiration: kiểm tra và xóa các item hết hạn khi truy cập.
"""
with self.lock:
item = self.cache.get(key)
# Nếu không tìm thấy key hoặc item đã hết hạn
if item is None or item.is_expired():
# Nếu item tồn tại nhưng đã hết hạn, xóa nó (lazy expiration)
if item is not None:
logger.debug(f"Cache miss (expired): {key}")
del self.cache[key]
else:
logger.debug(f"Cache miss (not found): {key}")
return default
# Cập nhật thời gian truy cập
item.touch()
logger.debug(f"Cache hit: {key}")
return item.value
def delete(self, key: str) -> bool:
"""Xóa một key khỏi cache"""
with self.lock:
if key in self.cache:
del self.cache[key]
logger.debug(f"Cache delete: {key}")
return True
return False
def clear(self) -> None:
"""Xóa tất cả dữ liệu trong cache"""
with self.lock:
self.cache.clear()
logger.debug("Cache cleared")
def get_or_set(self, key: str, callback: Callable[[], T], ttl: Optional[int] = None) -> T:
"""
Lấy giá trị từ cache nếu tồn tại, nếu không thì gọi callback để lấy giá trị
và lưu vào cache trước khi trả về.
"""
with self.lock:
value = self.get(key)
if value is None:
value = callback()
self.set(key, value, ttl)
return value
def _cleanup_task(self) -> None:
"""Thread để dọn dẹp các item đã hết hạn (active expiration)"""
while True:
time.sleep(self.cleanup_interval)
try:
self._remove_expired_items()
except Exception as e:
logger.error(f"Error in cache cleanup task: {e}")
def _remove_expired_items(self) -> None:
"""Xóa tất cả các item đã hết hạn trong cache"""
with self.lock:
now = time.time()
expired_keys = [k for k, v in self.cache.items() if v.is_expired()]
for key in expired_keys:
del self.cache[key]
if expired_keys:
logger.debug(f"Cleaned up {len(expired_keys)} expired cache items")
def _evict_lru_items(self, count: int = 1) -> None:
"""Xóa bỏ các item ít được truy cập nhất khi cache đầy"""
items = sorted(self.cache.items(), key=lambda x: x[1].last_accessed)
for i in range(min(count, len(items))):
del self.cache[items[i][0]]
logger.debug(f"Evicted {min(count, len(items))} least recently used items from cache")
def stats(self) -> Dict[str, Any]:
"""Trả về thống kê về cache"""
with self.lock:
now = time.time()
total_items = len(self.cache)
expired_items = sum(1 for item in self.cache.values() if item.is_expired())
memory_usage = self._estimate_memory_usage()
return {
"total_items": total_items,
"expired_items": expired_items,
"active_items": total_items - expired_items,
"memory_usage_bytes": memory_usage,
"memory_usage_mb": memory_usage / (1024 * 1024),
"max_size": self.max_size
}
def _estimate_memory_usage(self) -> int:
"""Ước tính dung lượng bộ nhớ của cache (gần đúng)"""
# Ước tính dựa trên kích thước của các key và giá trị
cache_size = sum(len(k) for k in self.cache.keys())
for item in self.cache.values():
try:
# Ước tính kích thước của value (gần đúng)
if isinstance(item.value, (str, bytes)):
cache_size += len(item.value)
elif isinstance(item.value, (dict, list)):
cache_size += len(json.dumps(item.value))
else:
# Giá trị mặc định cho các loại dữ liệu khác
cache_size += 100
except:
cache_size += 100
return cache_size
# Singleton instance
_cache_instance = None
def get_cache() -> InMemoryCache:
"""Trả về instance singleton của InMemoryCache"""
global _cache_instance
if _cache_instance is None:
_cache_instance = InMemoryCache()
return _cache_instance |