import random import re import os import logging import asyncio from datetime import datetime, timedelta from apscheduler.schedulers.background import BackgroundScheduler from app.utils.logging import format_log_message import app.config.settings as settings logger = logging.getLogger("my_logger") class APIKeyManager: def __init__(self): self.api_keys = re.findall( r"AIzaSy[a-zA-Z0-9_-]{33}", settings.GEMINI_API_KEYS) # 加载更多 GEMINI_API_KEYS for i in range(1, 99): if keys := os.environ.get(f"GEMINI_API_KEYS_{i}", ""): self.api_keys += re.findall(r"AIzaSy[a-zA-Z0-9_-]{33}", keys) else: break self.key_stack = [] # 初始化密钥栈 self._reset_key_stack() # 初始化时创建随机密钥栈 self.scheduler = BackgroundScheduler() self.scheduler.start() self.lock = asyncio.Lock() # Added lock def _reset_key_stack(self): """创建并随机化密钥栈""" shuffled_keys = self.api_keys[:] # 创建 api_keys 的副本以避免直接修改原列表 random.shuffle(shuffled_keys) self.key_stack = shuffled_keys async def get_available_key(self): """从栈顶获取密钥,若栈空则重新生成 实现负载均衡: 1. 维护一个随机排序的栈存储apikey 2. 每次调用从栈顶取出一个key返回 3. 栈空时重新随机生成栈 4. 确保异步和并发安全 """ async with self.lock: # 如果栈为空,重新生成 if not self.key_stack: self._reset_key_stack() # 从栈顶取出key if self.key_stack: return self.key_stack.pop() # 如果没有可用的API密钥,记录错误 if not self.api_keys: log_msg = format_log_message('ERROR', "没有配置任何 API 密钥!") logger.error(log_msg) log_msg = format_log_message('ERROR', "没有可用的API密钥!") logger.error(log_msg) return None def show_all_keys(self): log_msg = format_log_message('INFO', f"当前可用API key个数: {len(self.api_keys)} ") logger.info(log_msg) for i, api_key in enumerate(self.api_keys): log_msg = format_log_message('INFO', f"API Key{i}: {api_key[:8]}...{api_key[-3:]}") logger.info(log_msg) # def blacklist_key(self, key): # log_msg = format_log_message('WARNING', f"{key[:8]} → 暂时禁用 {self.api_key_blacklist_duration} 秒") # logger.warning(log_msg) # self.api_key_blacklist.add(key) # self.scheduler.add_job(lambda: self.api_key_blacklist.discard(key), 'date', # run_date=datetime.now() + timedelta(seconds=self.api_key_blacklist_duration)) async def test_api_key(api_key: str) -> bool: """ 测试 API 密钥是否有效。 """ try: import httpx url = "https://generativelanguage.googleapis.com/v1beta/models?key={}".format(api_key) async with httpx.AsyncClient() as client: response = await client.get(url) response.raise_for_status() return True except Exception: return False