Spaces:
Running
Running
File size: 3,377 Bytes
d0dd276 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
import random
import re
import os
import logging
import asyncio
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from app.utils.logging import format_log_message
import app.config.settings as settings
logger = logging.getLogger("my_logger")
class APIKeyManager:
def __init__(self):
self.api_keys = re.findall(
r"AIzaSy[a-zA-Z0-9_-]{33}", settings.GEMINI_API_KEYS)
# 加载更多 GEMINI_API_KEYS
for i in range(1, 99):
if keys := os.environ.get(f"GEMINI_API_KEYS_{i}", ""):
self.api_keys += re.findall(r"AIzaSy[a-zA-Z0-9_-]{33}", keys)
else:
break
self.key_stack = [] # 初始化密钥栈
self._reset_key_stack() # 初始化时创建随机密钥栈
self.scheduler = BackgroundScheduler()
self.scheduler.start()
self.lock = asyncio.Lock() # Added lock
def _reset_key_stack(self):
"""创建并随机化密钥栈"""
shuffled_keys = self.api_keys[:] # 创建 api_keys 的副本以避免直接修改原列表
random.shuffle(shuffled_keys)
self.key_stack = shuffled_keys
async def get_available_key(self):
"""从栈顶获取密钥,若栈空则重新生成
实现负载均衡:
1. 维护一个随机排序的栈存储apikey
2. 每次调用从栈顶取出一个key返回
3. 栈空时重新随机生成栈
4. 确保异步和并发安全
"""
async with self.lock:
# 如果栈为空,重新生成
if not self.key_stack:
self._reset_key_stack()
# 从栈顶取出key
if self.key_stack:
return self.key_stack.pop()
# 如果没有可用的API密钥,记录错误
if not self.api_keys:
log_msg = format_log_message('ERROR', "没有配置任何 API 密钥!")
logger.error(log_msg)
log_msg = format_log_message('ERROR', "没有可用的API密钥!")
logger.error(log_msg)
return None
def show_all_keys(self):
log_msg = format_log_message('INFO', f"当前可用API key个数: {len(self.api_keys)} ")
logger.info(log_msg)
for i, api_key in enumerate(self.api_keys):
log_msg = format_log_message('INFO', f"API Key{i}: {api_key[:8]}...{api_key[-3:]}")
logger.info(log_msg)
# def blacklist_key(self, key):
# log_msg = format_log_message('WARNING', f"{key[:8]} → 暂时禁用 {self.api_key_blacklist_duration} 秒")
# logger.warning(log_msg)
# self.api_key_blacklist.add(key)
# self.scheduler.add_job(lambda: self.api_key_blacklist.discard(key), 'date',
# run_date=datetime.now() + timedelta(seconds=self.api_key_blacklist_duration))
async def test_api_key(api_key: str) -> bool:
"""
测试 API 密钥是否有效。
"""
try:
import httpx
url = "https://generativelanguage.googleapis.com/v1beta/models?key={}".format(api_key)
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return True
except Exception:
return False
|