Spaces:
Running
Running
File size: 20,546 Bytes
76b9762 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 |
import asyncio
from itertools import cycle
from typing import Dict, Union
from app.config.config import settings
from app.log.logger import get_key_manager_logger
logger = get_key_manager_logger()
class KeyManager:
def __init__(self, api_keys: list, vertex_api_keys: list):
self.api_keys = api_keys
self.vertex_api_keys = vertex_api_keys
self.key_cycle = cycle(api_keys)
self.vertex_key_cycle = cycle(vertex_api_keys)
self.key_cycle_lock = asyncio.Lock()
self.vertex_key_cycle_lock = asyncio.Lock()
self.failure_count_lock = asyncio.Lock()
self.vertex_failure_count_lock = asyncio.Lock()
self.key_failure_counts: Dict[str, int] = {key: 0 for key in api_keys}
self.vertex_key_failure_counts: Dict[str, int] = {
key: 0 for key in vertex_api_keys
}
self.MAX_FAILURES = settings.MAX_FAILURES
self.paid_key = settings.PAID_KEY
async def get_paid_key(self) -> str:
return self.paid_key
async def get_next_key(self) -> str:
"""获取下一个API key"""
async with self.key_cycle_lock:
return next(self.key_cycle)
async def get_next_vertex_key(self) -> str:
"""获取下一个 Vertex API key"""
async with self.vertex_key_cycle_lock:
return next(self.vertex_key_cycle)
async def is_key_valid(self, key: str) -> bool:
"""检查key是否有效"""
async with self.failure_count_lock:
return self.key_failure_counts[key] < self.MAX_FAILURES
async def is_vertex_key_valid(self, key: str) -> bool:
"""检查 Vertex key 是否有效"""
async with self.vertex_failure_count_lock:
return self.vertex_key_failure_counts[key] < self.MAX_FAILURES
async def reset_failure_counts(self):
"""重置所有key的失败计数"""
async with self.failure_count_lock:
for key in self.key_failure_counts:
self.key_failure_counts[key] = 0
async def reset_vertex_failure_counts(self):
"""重置所有 Vertex key 的失败计数"""
async with self.vertex_failure_count_lock:
for key in self.vertex_key_failure_counts:
self.vertex_key_failure_counts[key] = 0
async def reset_key_failure_count(self, key: str) -> bool:
"""重置指定key的失败计数"""
async with self.failure_count_lock:
if key in self.key_failure_counts:
self.key_failure_counts[key] = 0
logger.info(f"Reset failure count for key: {key}")
return True
logger.warning(
f"Attempt to reset failure count for non-existent key: {key}"
)
return False
async def reset_vertex_key_failure_count(self, key: str) -> bool:
"""重置指定 Vertex key 的失败计数"""
async with self.vertex_failure_count_lock:
if key in self.vertex_key_failure_counts:
self.vertex_key_failure_counts[key] = 0
logger.info(f"Reset failure count for Vertex key: {key}")
return True
logger.warning(
f"Attempt to reset failure count for non-existent Vertex key: {key}"
)
return False
async def get_next_working_key(self) -> str:
"""获取下一可用的API key"""
initial_key = await self.get_next_key()
current_key = initial_key
while True:
if await self.is_key_valid(current_key):
return current_key
current_key = await self.get_next_key()
if current_key == initial_key:
return current_key
async def get_next_working_vertex_key(self) -> str:
"""获取下一可用的 Vertex API key"""
initial_key = await self.get_next_vertex_key()
current_key = initial_key
while True:
if await self.is_vertex_key_valid(current_key):
return current_key
current_key = await self.get_next_vertex_key()
if current_key == initial_key:
return current_key
async def handle_api_failure(self, api_key: str, retries: int) -> str:
"""处理API调用失败"""
async with self.failure_count_lock:
self.key_failure_counts[api_key] += 1
if self.key_failure_counts[api_key] >= self.MAX_FAILURES:
logger.warning(
f"API key {api_key} has failed {self.MAX_FAILURES} times"
)
if retries < settings.MAX_RETRIES:
return await self.get_next_working_key()
else:
return ""
async def handle_vertex_api_failure(self, api_key: str, retries: int) -> str:
"""处理 Vertex API 调用失败"""
async with self.vertex_failure_count_lock:
self.vertex_key_failure_counts[api_key] += 1
if self.vertex_key_failure_counts[api_key] >= self.MAX_FAILURES:
logger.warning(
f"Vertex API key {api_key} has failed {self.MAX_FAILURES} times"
)
def get_fail_count(self, key: str) -> int:
"""获取指定密钥的失败次数"""
return self.key_failure_counts.get(key, 0)
def get_vertex_fail_count(self, key: str) -> int:
"""获取指定 Vertex 密钥的失败次数"""
return self.vertex_key_failure_counts.get(key, 0)
async def get_keys_by_status(self) -> dict:
"""获取分类后的API key列表,包括失败次数"""
valid_keys = {}
invalid_keys = {}
async with self.failure_count_lock:
for key in self.api_keys:
fail_count = self.key_failure_counts[key]
if fail_count < self.MAX_FAILURES:
valid_keys[key] = fail_count
else:
invalid_keys[key] = fail_count
return {"valid_keys": valid_keys, "invalid_keys": invalid_keys}
async def get_vertex_keys_by_status(self) -> dict:
"""获取分类后的 Vertex API key 列表,包括失败次数"""
valid_keys = {}
invalid_keys = {}
async with self.vertex_failure_count_lock:
for key in self.vertex_api_keys:
fail_count = self.vertex_key_failure_counts[key]
if fail_count < self.MAX_FAILURES:
valid_keys[key] = fail_count
else:
invalid_keys[key] = fail_count
return {"valid_keys": valid_keys, "invalid_keys": invalid_keys}
async def get_first_valid_key(self) -> str:
"""获取第一个有效的API key"""
async with self.failure_count_lock:
for key in self.key_failure_counts:
if self.key_failure_counts[key] < self.MAX_FAILURES:
return key
if self.api_keys:
return self.api_keys[0]
if not self.api_keys:
logger.warning(
"API key list is empty, cannot get first valid key.")
return ""
return self.api_keys[0]
_singleton_instance = None
_singleton_lock = asyncio.Lock()
_preserved_failure_counts: Union[Dict[str, int], None] = None
_preserved_vertex_failure_counts: Union[Dict[str, int], None] = None
_preserved_old_api_keys_for_reset: Union[list, None] = None
_preserved_vertex_old_api_keys_for_reset: Union[list, None] = None
_preserved_next_key_in_cycle: Union[str, None] = None
_preserved_vertex_next_key_in_cycle: Union[str, None] = None
async def get_key_manager_instance(
api_keys: list = None, vertex_api_keys: list = None
) -> KeyManager:
"""
获取 KeyManager 单例实例。
如果尚未创建实例,将使用提供的 api_keys,vertex_api_keys 初始化 KeyManager。
如果已创建实例,则忽略 api_keys 参数,返回现有单例。
如果在重置后调用,会尝试恢复之前的状态(失败计数、循环位置)。
"""
global _singleton_instance, _preserved_failure_counts, _preserved_vertex_failure_counts, _preserved_old_api_keys_for_reset, _preserved_vertex_old_api_keys_for_reset, _preserved_next_key_in_cycle, _preserved_vertex_next_key_in_cycle
async with _singleton_lock:
if _singleton_instance is None:
if api_keys is None:
raise ValueError(
"API keys are required to initialize or re-initialize the KeyManager instance."
)
if vertex_api_keys is None:
raise ValueError(
"Vertex API keys are required to initialize or re-initialize the KeyManager instance."
)
if not api_keys:
logger.warning(
"Initializing KeyManager with an empty list of API keys."
)
if not vertex_api_keys:
logger.warning(
"Initializing KeyManager with an empty list of Vertex API keys."
)
_singleton_instance = KeyManager(api_keys, vertex_api_keys)
logger.info(
f"KeyManager instance created/re-created with {len(api_keys)} API keys and {len(vertex_api_keys)} Vertex API keys."
)
# 1. 恢复失败计数
if _preserved_failure_counts:
current_failure_counts = {
key: 0 for key in _singleton_instance.api_keys
}
for key, count in _preserved_failure_counts.items():
if key in current_failure_counts:
current_failure_counts[key] = count
_singleton_instance.key_failure_counts = current_failure_counts
logger.info("Inherited failure counts for applicable keys.")
_preserved_failure_counts = None
if _preserved_vertex_failure_counts:
current_vertex_failure_counts = {
key: 0 for key in _singleton_instance.vertex_api_keys
}
for key, count in _preserved_vertex_failure_counts.items():
if key in current_vertex_failure_counts:
current_vertex_failure_counts[key] = count
_singleton_instance.vertex_key_failure_counts = (
current_vertex_failure_counts
)
logger.info(
"Inherited failure counts for applicable Vertex keys.")
_preserved_vertex_failure_counts = None
# 2. 调整 key_cycle 的起始点
start_key_for_new_cycle = None
if (
_preserved_old_api_keys_for_reset
and _preserved_next_key_in_cycle
and _singleton_instance.api_keys
):
try:
start_idx_in_old = _preserved_old_api_keys_for_reset.index(
_preserved_next_key_in_cycle
)
for i in range(len(_preserved_old_api_keys_for_reset)):
current_old_key_idx = (start_idx_in_old + i) % len(
_preserved_old_api_keys_for_reset
)
key_candidate = _preserved_old_api_keys_for_reset[
current_old_key_idx
]
if key_candidate in _singleton_instance.api_keys:
start_key_for_new_cycle = key_candidate
break
except ValueError:
logger.warning(
f"Preserved next key '{_preserved_next_key_in_cycle}' not found in preserved old API keys. "
"New cycle will start from the beginning of the new list."
)
except Exception as e:
logger.error(
f"Error determining start key for new cycle from preserved state: {e}. "
"New cycle will start from the beginning."
)
if start_key_for_new_cycle and _singleton_instance.api_keys:
try:
target_idx = _singleton_instance.api_keys.index(
start_key_for_new_cycle
)
for _ in range(target_idx):
next(_singleton_instance.key_cycle)
logger.info(
f"Key cycle in new instance advanced. Next call to get_next_key() will yield: {start_key_for_new_cycle}"
)
except ValueError:
logger.warning(
f"Determined start key '{start_key_for_new_cycle}' not found in new API keys during cycle advancement. "
"New cycle will start from the beginning."
)
except StopIteration:
logger.error(
"StopIteration while advancing key cycle, implies empty new API key list previously missed."
)
except Exception as e:
logger.error(
f"Error advancing new key cycle: {e}. Cycle will start from beginning."
)
else:
if _singleton_instance.api_keys:
logger.info(
"New key cycle will start from the beginning of the new API key list (no specific start key determined or needed)."
)
else:
logger.info(
"New key cycle not applicable as the new API key list is empty."
)
# 清理所有保存的状态
_preserved_old_api_keys_for_reset = None
_preserved_next_key_in_cycle = None
# 3. 调整 vertex_key_cycle 的起始点
start_key_for_new_vertex_cycle = None
if (
_preserved_vertex_old_api_keys_for_reset
and _preserved_vertex_next_key_in_cycle
and _singleton_instance.vertex_api_keys
):
try:
start_idx_in_old = _preserved_vertex_old_api_keys_for_reset.index(
_preserved_vertex_next_key_in_cycle
)
for i in range(len(_preserved_vertex_old_api_keys_for_reset)):
current_old_key_idx = (start_idx_in_old + i) % len(
_preserved_vertex_old_api_keys_for_reset
)
key_candidate = _preserved_vertex_old_api_keys_for_reset[
current_old_key_idx
]
if key_candidate in _singleton_instance.vertex_api_keys:
start_key_for_new_vertex_cycle = key_candidate
break
except ValueError:
logger.warning(
f"Preserved next key '{_preserved_vertex_next_key_in_cycle}' not found in preserved old Vertex API keys. "
"New cycle will start from the beginning of the new list."
)
except Exception as e:
logger.error(
f"Error determining start key for new Vertex key cycle from preserved state: {e}. "
"New cycle will start from the beginning."
)
if start_key_for_new_vertex_cycle and _singleton_instance.vertex_api_keys:
try:
target_idx = _singleton_instance.vertex_api_keys.index(
start_key_for_new_vertex_cycle
)
for _ in range(target_idx):
next(_singleton_instance.vertex_key_cycle)
logger.info(
f"Vertex key cycle in new instance advanced. Next call to get_next_vertex_key() will yield: {start_key_for_new_vertex_cycle}"
)
except ValueError:
logger.warning(
f"Determined start key '{start_key_for_new_vertex_cycle}' not found in new Vertex API keys during cycle advancement. "
"New cycle will start from the beginning."
)
except StopIteration:
logger.error(
"StopIteration while advancing Vertex key cycle, implies empty new Vertex API key list previously missed."
)
except Exception as e:
logger.error(
f"Error advancing new Vertex key cycle: {e}. Cycle will start from beginning."
)
else:
if _singleton_instance.vertex_api_keys:
logger.info(
"New Vertex key cycle will start from the beginning of the new Vertex API key list (no specific start key determined or needed)."
)
else:
logger.info(
"New Vertex key cycle not applicable as the new Vertex API key list is empty."
)
# 清理所有保存的状态
_preserved_vertex_old_api_keys_for_reset = None
_preserved_vertex_next_key_in_cycle = None
return _singleton_instance
async def reset_key_manager_instance():
"""
重置 KeyManager 单例实例。
将保存当前实例的状态(失败计数、旧 API keys、下一个 key 提示)
以供下一次 get_key_manager_instance 调用时恢复。
"""
global _singleton_instance, _preserved_failure_counts, _preserved_vertex_failure_counts, _preserved_old_api_keys_for_reset, _preserved_vertex_old_api_keys_for_reset, _preserved_next_key_in_cycle, _preserved_vertex_next_key_in_cycle
async with _singleton_lock:
if _singleton_instance:
# 1. 保存失败计数
_preserved_failure_counts = _singleton_instance.key_failure_counts.copy()
_preserved_vertex_failure_counts = _singleton_instance.vertex_key_failure_counts.copy()
# 2. 保存旧的 API keys 列表
_preserved_old_api_keys_for_reset = _singleton_instance.api_keys.copy()
_preserved_vertex_old_api_keys_for_reset = _singleton_instance.vertex_api_keys.copy()
# 3. 保存 key_cycle 的下一个 key 提示
try:
if _singleton_instance.api_keys:
_preserved_next_key_in_cycle = (
await _singleton_instance.get_next_key()
)
else:
_preserved_next_key_in_cycle = None
except StopIteration:
logger.warning(
"Could not preserve next key hint: key cycle was empty or exhausted in old instance."
)
_preserved_next_key_in_cycle = None
except Exception as e:
logger.error(
f"Error preserving next key hint during reset: {e}")
_preserved_next_key_in_cycle = None
# 4. 保存 vertex_key_cycle 的下一个 key 提示
try:
if _singleton_instance.vertex_api_keys:
_preserved_vertex_next_key_in_cycle = (
await _singleton_instance.get_next_vertex_key()
)
else:
_preserved_vertex_next_key_in_cycle = None
except StopIteration:
logger.warning(
"Could not preserve next key hint: Vertex key cycle was empty or exhausted in old instance."
)
_preserved_vertex_next_key_in_cycle = None
except Exception as e:
logger.error(
f"Error preserving next key hint during reset: {e}")
_preserved_vertex_next_key_in_cycle = None
_singleton_instance = None
logger.info(
"KeyManager instance has been reset. State (failure counts, old keys, next key hint) preserved for next instantiation."
)
else:
logger.info(
"KeyManager instance was not set (or already reset), no reset action performed."
)
|