File size: 7,706 Bytes
e83f5e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8b8c9b
 
e83f5e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8b8c9b
e83f5e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import os
import time
import threading
import logging
from typing import Dict, Any, Optional, Tuple, List, Callable, Generic, TypeVar, Union
from datetime import datetime
from dotenv import load_dotenv
import json

# Thiết lập logging
logger = logging.getLogger(__name__)

# Load biến môi trường
load_dotenv()

# Cấu hình cache từ biến môi trường
DEFAULT_CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", "300"))  # Mặc định 5 phút
DEFAULT_CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60"))  # Mặc định 1 phút
DEFAULT_CACHE_MAX_SIZE = int(os.getenv("CACHE_MAX_SIZE", "1000"))  # Mặc định 1000 phần tử

# Generic type để có thể sử dụng cho nhiều loại giá trị khác nhau
T = TypeVar('T')

# Cấu trúc cho một phần tử trong cache
class CacheItem(Generic[T]):
    def __init__(self, value: T, ttl: int = DEFAULT_CACHE_TTL):
        self.value = value
        self.expire_at = time.time() + ttl
        self.last_accessed = time.time()
    
    def is_expired(self) -> bool:
        """Kiểm tra xem item có hết hạn chưa"""
        return time.time() > self.expire_at
    
    def touch(self) -> None:
        """Cập nhật thời gian truy cập lần cuối"""
        self.last_accessed = time.time()
    
    def extend(self, ttl: int = DEFAULT_CACHE_TTL) -> None:
        """Gia hạn thời gian sống của item"""
        self.expire_at = time.time() + ttl

# Lớp cache chính
class InMemoryCache:
    def __init__(
        self, 
        ttl: int = DEFAULT_CACHE_TTL,
        cleanup_interval: int = DEFAULT_CACHE_CLEANUP_INTERVAL,
        max_size: int = DEFAULT_CACHE_MAX_SIZE
    ):
        self.cache: Dict[str, CacheItem] = {}
        self.ttl = ttl
        self.cleanup_interval = cleanup_interval
        self.max_size = max_size
        self.lock = threading.RLock()  # Sử dụng RLock để tránh deadlock
        
        # Khởi động thread dọn dẹp cache định kỳ (active expiration)
        self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True)
        self.cleanup_thread.start()
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Lưu một giá trị vào cache"""
        with self.lock:
            ttl_value = ttl if ttl is not None else self.ttl
            
            # Nếu cache đã đầy, xóa bớt các item ít được truy cập nhất
            if len(self.cache) >= self.max_size and key not in self.cache:
                self._evict_lru_items()
                
            self.cache[key] = CacheItem(value, ttl_value)
            logger.debug(f"Cache set: {key} (expires in {ttl_value}s)")
    
    def get(self, key: str, default: Any = None) -> Any:
        """
        Lấy giá trị từ cache. Nếu key không tồn tại hoặc đã hết hạn, trả về giá trị mặc định.
        Áp dụng lazy expiration: kiểm tra và xóa các item hết hạn khi truy cập.
        """
        with self.lock:
            item = self.cache.get(key)
            
            # Nếu không tìm thấy key hoặc item đã hết hạn
            if item is None or item.is_expired():
                # Nếu item tồn tại nhưng đã hết hạn, xóa nó (lazy expiration)
                if item is not None:
                    logger.debug(f"Cache miss (expired): {key}")
                    del self.cache[key]
                else:
                    logger.debug(f"Cache miss (not found): {key}")
                return default
            
            # Cập nhật thời gian truy cập
            item.touch()
            logger.debug(f"Cache hit: {key}")
            return item.value
    
    def delete(self, key: str) -> bool:
        """Xóa một key khỏi cache"""
        with self.lock:
            if key in self.cache:
                del self.cache[key]
                logger.debug(f"Cache delete: {key}")
                return True
            return False
    
    def clear(self) -> None:
        """Xóa tất cả dữ liệu trong cache"""
        with self.lock:
            self.cache.clear()
            logger.debug("Cache cleared")
    
    def get_or_set(self, key: str, callback: Callable[[], T], ttl: Optional[int] = None) -> T:
        """
        Lấy giá trị từ cache nếu tồn tại, nếu không thì gọi callback để lấy giá trị
        và lưu vào cache trước khi trả về.
        """
        with self.lock:
            value = self.get(key)
            if value is None:
                value = callback()
                self.set(key, value, ttl)
            return value
    
    def _cleanup_task(self) -> None:
        """Thread để dọn dẹp các item đã hết hạn (active expiration)"""
        while True:
            time.sleep(self.cleanup_interval)
            try:
                self._remove_expired_items()
            except Exception as e:
                logger.error(f"Error in cache cleanup task: {e}")
    
    def _remove_expired_items(self) -> None:
        """Xóa tất cả các item đã hết hạn trong cache"""
        with self.lock:
            now = time.time()
            expired_keys = [k for k, v in self.cache.items() if v.is_expired()]
            for key in expired_keys:
                del self.cache[key]
            
            if expired_keys:
                logger.debug(f"Cleaned up {len(expired_keys)} expired cache items")
    
    def _evict_lru_items(self, count: int = 1) -> None:
        """Xóa bỏ các item ít được truy cập nhất khi cache đầy"""
        items = sorted(self.cache.items(), key=lambda x: x[1].last_accessed)
        for i in range(min(count, len(items))):
            del self.cache[items[i][0]]
        logger.debug(f"Evicted {min(count, len(items))} least recently used items from cache")
    
    def stats(self) -> Dict[str, Any]:
        """Trả về thống kê về cache"""
        with self.lock:
            now = time.time()
            total_items = len(self.cache)
            expired_items = sum(1 for item in self.cache.values() if item.is_expired())
            memory_usage = self._estimate_memory_usage()
            return {
                "total_items": total_items,
                "expired_items": expired_items,
                "active_items": total_items - expired_items,
                "memory_usage_bytes": memory_usage,
                "memory_usage_mb": memory_usage / (1024 * 1024),
                "max_size": self.max_size
            }
    
    def _estimate_memory_usage(self) -> int:
        """Ước tính dung lượng bộ nhớ của cache (gần đúng)"""
        # Ước tính dựa trên kích thước của các key và giá trị
        cache_size = sum(len(k) for k in self.cache.keys())
        for item in self.cache.values():
            try:
                # Ước tính kích thước của value (gần đúng)
                if isinstance(item.value, (str, bytes)):
                    cache_size += len(item.value)
                elif isinstance(item.value, (dict, list)):
                    cache_size += len(json.dumps(item.value))
                else:
                    # Giá trị mặc định cho các loại dữ liệu khác
                    cache_size += 100
            except:
                cache_size += 100
        
        return cache_size

# Singleton instance
_cache_instance = None

def get_cache() -> InMemoryCache:
    """Trả về instance singleton của InMemoryCache"""
    global _cache_instance
    if _cache_instance is None:
        _cache_instance = InMemoryCache()
    return _cache_instance