import time import xxhash import asyncio from typing import Dict, Any, Optional, Tuple import logging from collections import deque from app.utils.logging import log logger = logging.getLogger("my_logger") import heapq # 定义缓存项的结构 CacheItem = Dict[str, Any] class ResponseCacheManager: """管理API响应缓存的类,一个键可以对应多个缓存项(使用deque)""" def __init__(self, expiry_time: int, max_entries: int, cache_dict: Dict[str, deque[CacheItem]] = None): """ 初始化缓存管理器。 Args: expiry_time (int): 缓存项的过期时间(秒)。 max_entries (int): 缓存中允许的最大总条目数。 cache_dict (Dict[str, deque[CacheItem]], optional): 初始缓存字典。默认为 None。 """ self.cache: Dict[str, deque[CacheItem]] = cache_dict if cache_dict is not None else {} self.expiry_time = expiry_time self.max_entries = max_entries # 总条目数限制 self.cur_cache_num = 0 # 当前条目数 self.lock = asyncio.Lock() # Added lock async def get(self, cache_key: str) -> Tuple[Optional[Any], bool]: # Made async """获取指定键的第一个有效缓存项(不删除)""" now = time.time() async with self.lock: if cache_key in self.cache: cache_deque = self.cache[cache_key] # 查找第一个未过期的项,且不删除 for item in cache_deque: if now < item.get('expiry_time', 0): response = item.get('response',None) return response, True return None, False async def get_and_remove(self, cache_key: str) -> Tuple[Optional[Any], bool]: """获取并删除指定键的第一个有效缓存项。""" now = time.time() async with self.lock: if cache_key in self.cache: cache_deque = self.cache[cache_key] # 查找第一个有效项并收集过期项 valid_item_to_remove = None response_to_return = None new_deque = deque() items_removed_count = 0 for item in cache_deque: if now < item.get('expiry_time', 0): if valid_item_to_remove is None: # 找到第一个有效项 valid_item_to_remove = item response_to_return = item.get('response', None) items_removed_count += 1 # 计数此项为移除 else: new_deque.append(item) # 保留后续有效项 else: items_removed_count += 1 # 计数过期项为移除 # 更新缓存状态 if items_removed_count > 0: self.cur_cache_num = max(0, self.cur_cache_num - items_removed_count) if not new_deque: # 如果所有项都被移除(过期或我们取的那个) del self.cache[cache_key] else: self.cache[cache_key] = new_deque if valid_item_to_remove: return response_to_return, True # 返回找到的有效项 # 如果键不存在或未找到有效项 return None, False async def store(self, cache_key: str, response: Any): """存储响应到缓存(追加到键对应的deque)""" now = time.time() new_item: CacheItem = { 'response': response, 'expiry_time': now + self.expiry_time, 'created_at': now, } needs_cleaning = False async with self.lock: if cache_key not in self.cache: self.cache[cache_key] = deque() self.cache[cache_key].append(new_item) # 追加到deque末尾 self.cur_cache_num += 1 needs_cleaning = self.cur_cache_num > self.max_entries if needs_cleaning: # 在锁外调用清理,避免长时间持有锁 await self.clean_if_needed() async def clean_expired(self): """清理所有缓存项中已过期的项。""" now = time.time() keys_to_remove = [] total_cleaned = 0 async with self.lock: # 迭代 cache 的副本以允许在循环中安全地修改 cache for key, cache_deque in list(self.cache.items()): original_len = len(cache_deque) # 创建一个新的 deque,只包含未过期的项 valid_items = deque(item for item in cache_deque if now < item.get('expiry_time', 0)) cleaned_count = original_len - len(valid_items) if cleaned_count > 0: log('info', f"清理键 {key[:8]}... 的过期缓存项 {cleaned_count} 个。") total_cleaned += cleaned_count if not valid_items: keys_to_remove.append(key) # 标记此键以便稍后删除 # 在持有锁时直接删除键 if key in self.cache: del self.cache[key] log('info', f"缓存键 {key[:8]}... 的所有项均已过期,移除该键。") elif cleaned_count > 0: # 替换为只包含有效项的 deque self.cache[key] = valid_items # 统一更新缓存计数 if total_cleaned > 0: self.cur_cache_num = max(0, self.cur_cache_num - total_cleaned) async def clean_if_needed(self): """如果缓存总条目数超过限制,清理全局最旧的项目。""" async with self.lock: if self.cur_cache_num <= self.max_entries: return # 计算目标大小和需要移除的数量 target_size = max(self.max_entries - 10, 10) if self.cur_cache_num <= target_size: return items_to_remove_count = self.cur_cache_num - target_size log('info', f"缓存总数 {self.cur_cache_num} 超过限制 {self.max_entries},需要清理 {items_to_remove_count} 个") # 收集所有缓存项及其元数据 all_items_meta = [] for key, cache_deque in self.cache.items(): for item in cache_deque: all_items_meta.append({'key': key, 'created_at': item.get('created_at', 0), 'item': item}) # 找出最旧的 N 项 actual_remove_count = min(items_to_remove_count, len(all_items_meta)) if actual_remove_count <= 0: return # 没有项目可移除或无需移除 items_to_remove = heapq.nsmallest(actual_remove_count, all_items_meta, key=lambda x: x['created_at']) # 执行移除 items_actually_removed = 0 keys_potentially_empty = set() for item_meta in items_to_remove: key_to_clean = item_meta['key'] item_to_clean = item_meta['item'] if key_to_clean in self.cache: try: # 直接从 deque 中移除指定的 item 对象 self.cache[key_to_clean].remove(item_to_clean) items_actually_removed += 1 # 计数器在最后统一更新 log('info', f"因容量限制,删除键 {key_to_clean[:8]}... 的旧缓存项 (创建于 {item_meta['created_at']})。") keys_potentially_empty.add(key_to_clean) except (KeyError, ValueError): log('warning', f"尝试因容量限制删除缓存项时未找到 (可能已被提前移除): {key_to_clean[:8]}...") pass # 检查是否有 deque 因本次清理变空 for key in keys_potentially_empty: if key in self.cache and not self.cache[key]: del self.cache[key] log('info', f"因容量限制清理后,键 {key[:8]}... 的deque已空,移除该键。") # 统一更新缓存计数 if items_actually_removed > 0: self.cur_cache_num = max(0, self.cur_cache_num - items_actually_removed) log('info', f"因容量限制,共清理了 {items_actually_removed} 个旧缓存项。清理后缓存数: {self.cur_cache_num}") def generate_cache_key(chat_request, last_n_messages: int = 65536, is_gemini=False) -> str: """ 根据模型名称和最后 N 条消息生成请求的唯一缓存键。 Args: chat_request: 包含模型和消息列表的请求对象 (符合OpenAI格式)。 last_n_messages: 需要包含在缓存键计算中的最后消息的数量。 Returns: 一个代表该请求的唯一缓存键字符串 (xxhash64哈希值)。 """ h = xxhash.xxh64() # 1. 哈希模型名称 h.update(chat_request.model.encode('utf-8')) if last_n_messages <= 0: # 如果不考虑消息,直接返回基于模型的哈希 return h.hexdigest() messages_processed = 0 # 2. 增量哈希最后 N 条消息 (从后往前) if is_gemini: # log('INFO', f"开启增量哈希gemini格式内容") for content_item in reversed(chat_request.payload.contents): if messages_processed >= last_n_messages: break role = content_item.get('role') if role is not None and isinstance(role, str): h.update(b'role:') h.update(role.encode('utf-8')) # log('INFO', f"哈希gemini格式角色{role}") parts = content_item.get('parts', []) if not isinstance(parts, list): parts = [] for part in parts: text_content = part.get('text') if text_content is not None and isinstance(text_content, str): h.update(b'text:') h.update(text_content.encode('utf-8')) # log('INFO', f"哈希gemini格式文本内容{text_content}") inline_data_obj = part.get('inline_data') if inline_data_obj is not None and isinstance(inline_data_obj, dict): h.update(b'inline_data:') data_payload = inline_data_obj.get('data', '') # log('INFO', f"哈希gemini格式非文本内容{data_payload[:32]}") if isinstance(data_payload, str): h.update(b'data_prefix:') h.update(data_payload[:32].encode('utf-8')) file_data_obj = part.get('file_data') if file_data_obj is not None and isinstance(file_data_obj, dict): h.update(b'file_data:') file_uri = file_data_obj.get('file_uri', '') if isinstance(file_uri, str): h.update(b'file_uri:') h.update(file_uri.encode('utf-8')) messages_processed += 1 else : for msg in reversed(chat_request.messages): if messages_processed >= last_n_messages: break # 哈希角色 h.update(b'role:') h.update(msg.get('role', '').encode('utf-8')) # 哈希内容 content = msg.get('content') if isinstance(content, str): h.update(b'text:') h.update(content.encode('utf-8')) elif isinstance(content, list): # 处理图文混合内容 for item in content: item_type = item.get('type') if hasattr(item, 'get') else None if item_type == 'text': text = item.get('text', '') if hasattr(item, 'get') else '' h.update(b'text:') h.update(text.encode('utf-8')) elif item_type == 'image_url': image_url = item.get('image_url', {}) if hasattr(item, 'get') else {} image_data = image_url.get('url', '') if hasattr(image_url, 'get') else '' h.update(b'image_url:') # 加入类型标识符 if image_data.startswith('data:image/'): # 对于base64图像,使用前32字符作为标识符 h.update(image_data[:32].encode('utf-8')) else: h.update(image_data.encode('utf-8')) messages_processed += 1 return h.hexdigest()