Spaces:
Running
Running
File size: 13,046 Bytes
d0dd276 |
|
import time
import xxhash
import asyncio
from typing import Dict, Any, Optional, Tuple
import logging
from collections import deque
from app.utils.logging import log
# Named module logger.
# NOTE(review): not referenced elsewhere in this file -- the cache code logs via
# app.utils.logging.log -- so this may be unused; confirm before removing.
logger = logging.getLogger(name="my_logger")
import heapq
# Shape of one cache entry as built by ResponseCacheManager.store:
# {'response': <cached value>, 'expiry_time': <epoch seconds>, 'created_at': <epoch seconds>}
CacheItem = Dict[str, Any]


class ResponseCacheManager:
    """Cache for API responses where one key can hold several entries (in a deque).

    Entries for a key are appended in insertion order. Every mutation happens
    while holding ``self.lock`` (an ``asyncio.Lock``), and ``cur_cache_num``
    tracks the total number of entries across all keys so the global
    ``max_entries`` limit can be enforced.
    """

    def __init__(self, expiry_time: int, max_entries: int,
                 cache_dict: Optional[Dict[str, deque[CacheItem]]] = None):
        """Initialise the cache manager.

        Args:
            expiry_time (int): Lifetime of a cache entry, in seconds.
            max_entries (int): Maximum total number of entries across all keys.
            cache_dict (Dict[str, deque[CacheItem]], optional): Initial cache
                contents; its entries count towards ``max_entries``. Defaults to None.
        """
        self.cache: Dict[str, deque[CacheItem]] = cache_dict if cache_dict is not None else {}
        self.expiry_time = expiry_time
        self.max_entries = max_entries  # limit on the total number of entries
        # Count pre-seeded entries: starting at 0 would let a seeded cache exceed
        # max_entries unnoticed and skew every later eviction decision.
        self.cur_cache_num = sum(len(entries) for entries in self.cache.values())
        self.lock = asyncio.Lock()

    async def get(self, cache_key: str) -> Tuple[Optional[Any], bool]:
        """Return the first non-expired entry for ``cache_key`` without removing it.

        Returns:
            ``(response, True)`` on a hit; ``(None, False)`` when the key is absent
            or all of its entries have expired. Expired entries are left in place.
        """
        async with self.lock:
            now = time.time()
            for item in self.cache.get(cache_key, ()):
                if now < item.get('expiry_time', 0):
                    return item.get('response', None), True
        return None, False

    async def get_and_remove(self, cache_key: str) -> Tuple[Optional[Any], bool]:
        """Pop the first non-expired entry for ``cache_key``.

        Expired entries for the key are purged as a side effect, and the key is
        dropped once no entries remain.

        Returns:
            ``(response, True)`` on a hit; ``(None, False)`` otherwise.
        """
        async with self.lock:
            now = time.time()
            cache_deque = self.cache.get(cache_key)
            if cache_deque is None:
                return None, False
            found = False
            response = None
            remaining: deque = deque()
            for item in cache_deque:
                if not now < item.get('expiry_time', 0):
                    continue  # expired: drop it
                if not found:
                    found = True  # this is the entry handed back to the caller
                    response = item.get('response', None)
                else:
                    remaining.append(item)  # later valid entries stay cached
            removed = len(cache_deque) - len(remaining)
            self.cur_cache_num = max(0, self.cur_cache_num - removed)
            if remaining:
                self.cache[cache_key] = remaining
            else:
                del self.cache[cache_key]
            return response, found

    async def store(self, cache_key: str, response: Any):
        """Append ``response`` as a new entry for ``cache_key``.

        If the total entry count then exceeds ``max_entries``, ``clean_if_needed``
        is awaited after the lock has been released.
        """
        now = time.time()
        new_item: CacheItem = {
            'response': response,
            'expiry_time': now + self.expiry_time,
            'created_at': now,
        }
        async with self.lock:
            self.cache.setdefault(cache_key, deque()).append(new_item)
            self.cur_cache_num += 1
            needs_cleaning = self.cur_cache_num > self.max_entries
        if needs_cleaning:
            # clean_if_needed acquires the lock itself; asyncio.Lock is not reentrant.
            await self.clean_if_needed()

    async def clean_expired(self):
        """Remove every expired entry under all keys, dropping keys left empty."""
        total_cleaned = 0
        async with self.lock:
            now = time.time()
            # Iterate over a snapshot so keys can be replaced/deleted inside the loop.
            for key, cache_deque in list(self.cache.items()):
                valid_items = deque(item for item in cache_deque if now < item.get('expiry_time', 0))
                cleaned_count = len(cache_deque) - len(valid_items)
                if cleaned_count > 0:
                    log('info', f"清理键 {key[:8]}... 的过期缓存项 {cleaned_count} 个。")
                    total_cleaned += cleaned_count
                if not valid_items:
                    del self.cache[key]
                    log('info', f"缓存键 {key[:8]}... 的所有项均已过期,移除该键。")
                elif cleaned_count > 0:
                    self.cache[key] = valid_items
            if total_cleaned > 0:
                self.cur_cache_num = max(0, self.cur_cache_num - total_cleaned)

    async def clean_if_needed(self):
        """Evict the globally oldest entries when the total exceeds ``max_entries``.

        Shrinks to ``max_entries - 10`` so that the next few stores do not each
        trigger another sweep. The target never drops below
        ``min(max_entries, 10)`` and never rises above ``max_entries``.
        """
        async with self.lock:
            if self.cur_cache_num <= self.max_entries:
                return
            # The former max(max_entries - 10, 10) exceeded max_entries whenever
            # max_entries < 10, so small limits were never enforced.
            target_size = max(0, self.max_entries - 10, min(self.max_entries, 10))
            if self.cur_cache_num <= target_size:
                return
            items_to_remove_count = self.cur_cache_num - target_size
            log('info', f"缓存总数 {self.cur_cache_num} 超过限制 {self.max_entries},需要清理 {items_to_remove_count} 个")

            # (created_at, key, item) for every cached entry.
            all_items_meta = [
                (item.get('created_at', 0), key, item)
                for key, cache_deque in self.cache.items()
                for item in cache_deque
            ]
            actual_remove_count = min(items_to_remove_count, len(all_items_meta))
            if actual_remove_count <= 0:
                return
            # nsmallest(..., key=...) is equivalent to sorted(...)[:n], so ties keep
            # collection order; the key avoids ever comparing the dicts themselves.
            oldest = heapq.nsmallest(actual_remove_count, all_items_meta, key=lambda meta: meta[0])

            # Group victims per key by object identity. This replaces deque.remove(),
            # which compares dict contents (including possibly large responses) and
            # rescans the deque once per victim.
            doomed_ids_by_key: Dict[str, set] = {}
            for created_at, key, item in oldest:
                doomed_ids_by_key.setdefault(key, set()).add(id(item))
                log('info', f"因容量限制,删除键 {key[:8]}... 的旧缓存项 (创建于 {created_at})。")

            items_actually_removed = 0
            for key, doomed_ids in doomed_ids_by_key.items():
                cache_deque = self.cache[key]
                kept = deque(item for item in cache_deque if id(item) not in doomed_ids)
                items_actually_removed += len(cache_deque) - len(kept)
                if kept:
                    self.cache[key] = kept
                else:
                    del self.cache[key]
                    log('info', f"因容量限制清理后,键 {key[:8]}... 的deque已空,移除该键。")

            if items_actually_removed > 0:
                self.cur_cache_num = max(0, self.cur_cache_num - items_actually_removed)
                log('info', f"因容量限制,共清理了 {items_actually_removed} 个旧缓存项。清理后缓存数: {self.cur_cache_num}")
def _hash_field(h, tag: bytes, value: Any) -> None:
    """Feed one tagged, length-prefixed text field into the hasher ``h``.

    ``None`` is hashed as the empty string and other non-str values via ``str()``.
    The 8-byte length prefix makes the byte stream unambiguous: without it, text
    that happens to contain a tag such as ``"text:"`` or ``"role:"`` could yield
    the same bytes as a differently structured request.
    """
    if value is None:
        value = ''
    elif not isinstance(value, str):
        value = str(value)
    # 'surrogatepass' keeps lone surrogates (which JSON input can carry) from raising.
    data = value.encode('utf-8', 'surrogatepass')
    h.update(tag)
    h.update(len(data).to_bytes(8, 'little'))
    h.update(data)


def _hash_gemini_contents(h, contents, limit: int) -> None:
    """Hash up to ``limit`` Gemini-format ``contents`` entries, newest first."""
    for index, content_item in enumerate(reversed(contents)):
        if index >= limit:
            break
        h.update(b'msg')  # message delimiter, so part lists cannot merge across messages
        role = content_item.get('role')
        if isinstance(role, str):
            _hash_field(h, b'role:', role)
        parts = content_item.get('parts', [])
        if not isinstance(parts, list):
            parts = []
        for part in parts:
            if not hasattr(part, 'get'):
                continue  # tolerate malformed parts instead of raising AttributeError
            text_content = part.get('text')
            if isinstance(text_content, str):
                _hash_field(h, b'text:', text_content)
            inline_data_obj = part.get('inline_data')
            if isinstance(inline_data_obj, dict):
                data_payload = inline_data_obj.get('data', '')
                # Hash the whole payload. A fixed-length prefix only covers the file
                # header, which distinct images of the same format/size share.
                _hash_field(h, b'inline_data:', data_payload if isinstance(data_payload, str) else '')
            file_data_obj = part.get('file_data')
            if isinstance(file_data_obj, dict):
                file_uri = file_data_obj.get('file_uri', '')
                _hash_field(h, b'file_uri:', file_uri if isinstance(file_uri, str) else '')


def _hash_openai_messages(h, messages, limit: int) -> None:
    """Hash up to ``limit`` OpenAI-format ``messages``, newest first."""
    for index, msg in enumerate(reversed(messages)):
        if index >= limit:
            break
        h.update(b'msg')  # message delimiter
        _hash_field(h, b'role:', msg.get('role', ''))
        content = msg.get('content')
        if isinstance(content, str):
            _hash_field(h, b'text:', content)
        elif isinstance(content, list):
            # Mixed text/image content parts.
            for item in content:
                if not hasattr(item, 'get'):
                    continue
                item_type = item.get('type')
                if item_type == 'text':
                    _hash_field(h, b'text:', item.get('text', ''))
                elif item_type == 'image_url':
                    image_url = item.get('image_url', {})
                    url = image_url.get('url', '') if hasattr(image_url, 'get') else ''
                    # Hash the full URL / data URI. Its first 32 chars are just
                    # "data:image/<fmt>;base64," plus the format's magic bytes, which
                    # are identical for every image of that format.
                    _hash_field(h, b'image_url:', url)


def generate_cache_key(chat_request, last_n_messages: int = 65536, is_gemini=False) -> str:
    """Build a cache key from the model name and the last N messages.

    Args:
        chat_request: Request object. It must expose ``model`` and, depending on
            ``is_gemini``, either ``messages`` (OpenAI format, dict-like entries)
            or ``payload.contents`` (Gemini format, dict-like entries).
        last_n_messages: Number of most recent messages to include. ``<= 0``
            hashes the model name only.
        is_gemini: Read Gemini-style ``payload.contents`` instead of ``messages``.

    Returns:
        The xxhash64 hex digest of the tagged, length-prefixed request fields.
        NOTE: keys differ from the previous prefix-based scheme, so any keys
        computed by older code will not match.
    """
    h = xxhash.xxh64()
    _hash_field(h, b'model:', chat_request.model)
    if last_n_messages <= 0:
        return h.hexdigest()
    if is_gemini:
        _hash_gemini_contents(h, chat_request.payload.contents, last_n_messages)
    else:
        _hash_openai_messages(h, chat_request.messages, last_n_messages)
    return h.hexdigest()
|