Spaces:
Running
Running
import httpx | |
import asyncio | |
import json | |
from typing import List, Dict, Optional, Any | |
from app.utils.logging import vertex_log | |
# 导入settings和app_config | |
from app.config import settings | |
import app.vertex.config as app_config | |
_model_cache: Optional[Dict[str, List[str]]] = None | |
_cache_lock = asyncio.Lock() | |
async def fetch_and_parse_models_config() -> Optional[Dict[str, List[str]]]: | |
""" | |
Fetches the model configuration JSON from the URL specified in app_config. | |
Parses it and returns a dictionary with 'vertex_models' and 'vertex_express_models'. | |
Returns None if fetching or parsing fails. | |
""" | |
# 优先从settings中获取MODELS_CONFIG_URL | |
models_config_url = None | |
if hasattr(settings, 'MODELS_CONFIG_URL') and settings.MODELS_CONFIG_URL: | |
models_config_url = settings.MODELS_CONFIG_URL | |
vertex_log('info', "使用settings中的MODELS_CONFIG_URL") | |
else: | |
models_config_url = app_config.MODELS_CONFIG_URL | |
vertex_log('info', "使用app_config中的MODELS_CONFIG_URL") | |
if not models_config_url: | |
vertex_log('error', "MODELS_CONFIG_URL is not set in the environment/config.") | |
vertex_log('info', "Using default model configuration with empty lists.") | |
return { | |
"vertex_models": [], | |
"vertex_express_models": [] | |
} | |
vertex_log('info', f"Fetching model configuration from: {models_config_url}") | |
# 添加重试机制 | |
max_retries = 3 | |
retry_delay = 1 # 初始延迟1秒 | |
for retry in range(max_retries): | |
try: | |
async with httpx.AsyncClient(timeout=30.0) as client: # 增加超时时间 | |
vertex_log('info', f"尝试获取模型配置,第{retry+1}次尝试") | |
response = await client.get(models_config_url) | |
response.raise_for_status() # Raise an exception for HTTP errors | |
# 记录原始响应内容,便于调试 | |
response_text = response.text | |
vertex_log('debug', f"接收到原始响应: {response_text[:200]}...") # 只记录前200个字符 | |
data = response.json() | |
# 更详细的验证和日志 | |
if not isinstance(data, dict): | |
vertex_log('error', f"模型配置不是有效的JSON对象: {type(data)}") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 # 指数退避 | |
continue | |
if "vertex_models" not in data: | |
vertex_log('error', f"模型配置缺少'vertex_models'字段") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 | |
continue | |
if "vertex_express_models" not in data: | |
vertex_log('error', f"模型配置缺少'vertex_express_models'字段") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 | |
continue | |
if not isinstance(data["vertex_models"], list): | |
vertex_log('error', f"'vertex_models'不是列表: {type(data['vertex_models'])}") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 | |
continue | |
if not isinstance(data["vertex_express_models"], list): | |
vertex_log('error', f"'vertex_express_models'不是列表: {type(data['vertex_express_models'])}") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 | |
continue | |
vertex_log('info', f"成功获取和解析模型配置。找到 {len(data['vertex_models'])} 个标准模型和 {len(data['vertex_express_models'])} 个Express模型。") | |
# Add [EXPRESS] prefix to express models | |
prefixed_express_models = [f"[EXPRESS] {model_name}" for model_name in data["vertex_express_models"]] | |
return { | |
"vertex_models": data["vertex_models"], | |
"vertex_express_models": prefixed_express_models | |
} | |
except httpx.RequestError as e: | |
vertex_log('error', f"HTTP请求失败({retry+1}/{max_retries}): {e}") | |
if retry < max_retries - 1: | |
vertex_log('info', f"将在{retry_delay}秒后重试...") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 # 指数退避 | |
else: | |
vertex_log('error', f"HTTP请求在{max_retries}次尝试后仍然失败,放弃尝试") | |
return None | |
except json.JSONDecodeError as e: | |
vertex_log('error', f"JSON解析失败({retry+1}/{max_retries}): {e}") | |
if retry < max_retries - 1: | |
vertex_log('info', f"将在{retry_delay}秒后重试...") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 | |
else: | |
vertex_log('error', f"JSON解析在{max_retries}次尝试后仍然失败,放弃尝试") | |
return None | |
except Exception as e: | |
vertex_log('error', f"获取/解析模型配置时发生意外错误({retry+1}/{max_retries}): {str(e)}") | |
if retry < max_retries - 1: | |
vertex_log('info', f"将在{retry_delay}秒后重试...") | |
await asyncio.sleep(retry_delay) | |
retry_delay *= 2 | |
else: | |
vertex_log('error', f"获取/解析在{max_retries}次尝试后仍然失败,放弃尝试") | |
return None | |
# 如果所有重试都失败,返回默认空配置 | |
vertex_log('warning', "获取模型配置失败,使用默认空配置") | |
return { | |
"vertex_models": [], | |
"vertex_express_models": [] | |
} | |
async def get_models_config() -> Dict[str, List[str]]: | |
""" | |
Returns the cached model configuration. | |
If not cached, fetches and caches it. | |
Returns a default empty structure if fetching fails. | |
""" | |
global _model_cache | |
async with _cache_lock: | |
if _model_cache is None: | |
vertex_log('info', "Model cache is empty. Fetching configuration...") | |
_model_cache = await fetch_and_parse_models_config() | |
if _model_cache is None: # If fetching failed, use a default empty structure | |
vertex_log('warning', "Using default empty model configuration due to fetch/parse failure.") | |
_model_cache = {"vertex_models": [], "vertex_express_models": []} | |
return _model_cache | |
async def get_vertex_models() -> List[str]: | |
config = await get_models_config() | |
return config.get("vertex_models", []) | |
async def get_vertex_express_models() -> List[str]: | |
config = await get_models_config() | |
return config.get("vertex_express_models", []) | |
async def refresh_models_config_cache() -> bool: | |
""" | |
Forces a refresh of the model configuration cache. | |
Returns True if successful, False otherwise. | |
""" | |
global _model_cache | |
vertex_log('info', "Attempting to refresh model configuration cache...") | |
async with _cache_lock: | |
new_config = await fetch_and_parse_models_config() | |
if new_config is not None: | |
_model_cache = new_config | |
vertex_log('info', "Model configuration cache refreshed successfully.") | |
return True | |
else: | |
vertex_log('error', "Failed to refresh model configuration cache.") | |
# Optionally, decide if we want to clear the old cache or keep it | |
# _model_cache = {"vertex_models": [], "vertex_express_models": []} # To clear | |
return False |