Spaces:
Running
Running
import random | |
import re | |
import os | |
import logging | |
import asyncio | |
from datetime import datetime, timedelta | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from app.utils.logging import format_log_message | |
import app.config.settings as settings | |
logger = logging.getLogger("my_logger") | |
class APIKeyManager: | |
def __init__(self): | |
self.api_keys = re.findall( | |
r"AIzaSy[a-zA-Z0-9_-]{33}", settings.GEMINI_API_KEYS) | |
# 加载更多 GEMINI_API_KEYS | |
for i in range(1, 99): | |
if keys := os.environ.get(f"GEMINI_API_KEYS_{i}", ""): | |
self.api_keys += re.findall(r"AIzaSy[a-zA-Z0-9_-]{33}", keys) | |
else: | |
break | |
self.key_stack = [] # 初始化密钥栈 | |
self._reset_key_stack() # 初始化时创建随机密钥栈 | |
self.scheduler = BackgroundScheduler() | |
self.scheduler.start() | |
self.lock = asyncio.Lock() # Added lock | |
def _reset_key_stack(self): | |
"""创建并随机化密钥栈""" | |
shuffled_keys = self.api_keys[:] # 创建 api_keys 的副本以避免直接修改原列表 | |
random.shuffle(shuffled_keys) | |
self.key_stack = shuffled_keys | |
async def get_available_key(self): | |
"""从栈顶获取密钥,若栈空则重新生成 | |
实现负载均衡: | |
1. 维护一个随机排序的栈存储apikey | |
2. 每次调用从栈顶取出一个key返回 | |
3. 栈空时重新随机生成栈 | |
4. 确保异步和并发安全 | |
""" | |
async with self.lock: | |
# 如果栈为空,重新生成 | |
if not self.key_stack: | |
self._reset_key_stack() | |
# 从栈顶取出key | |
if self.key_stack: | |
return self.key_stack.pop() | |
# 如果没有可用的API密钥,记录错误 | |
if not self.api_keys: | |
log_msg = format_log_message('ERROR', "没有配置任何 API 密钥!") | |
logger.error(log_msg) | |
log_msg = format_log_message('ERROR', "没有可用的API密钥!") | |
logger.error(log_msg) | |
return None | |
def show_all_keys(self): | |
log_msg = format_log_message('INFO', f"当前可用API key个数: {len(self.api_keys)} ") | |
logger.info(log_msg) | |
for i, api_key in enumerate(self.api_keys): | |
log_msg = format_log_message('INFO', f"API Key{i}: {api_key[:8]}...{api_key[-3:]}") | |
logger.info(log_msg) | |
# def blacklist_key(self, key): | |
# log_msg = format_log_message('WARNING', f"{key[:8]} → 暂时禁用 {self.api_key_blacklist_duration} 秒") | |
# logger.warning(log_msg) | |
# self.api_key_blacklist.add(key) | |
# self.scheduler.add_job(lambda: self.api_key_blacklist.discard(key), 'date', | |
# run_date=datetime.now() + timedelta(seconds=self.api_key_blacklist_duration)) | |
async def test_api_key(api_key: str) -> bool: | |
""" | |
测试 API 密钥是否有效。 | |
""" | |
try: | |
import httpx | |
url = "https://generativelanguage.googleapis.com/v1beta/models?key={}".format(api_key) | |
async with httpx.AsyncClient() as client: | |
response = await client.get(url) | |
response.raise_for_status() | |
return True | |
except Exception: | |
return False | |