Spaces:
Running
Running
File size: 3,977 Bytes
7cc3183 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
import httpx
import asyncio
import json
from typing import List, Dict, Optional, Any
# Assuming config.py is in the same directory level for Docker execution
import config as app_config
_model_cache: Optional[Dict[str, List[str]]] = None
_cache_lock = asyncio.Lock()
async def fetch_and_parse_models_config() -> Optional[Dict[str, List[str]]]:
"""
Fetches the model configuration JSON from the URL specified in app_config.
Parses it and returns a dictionary with 'vertex_models' and 'vertex_express_models'.
Returns None if fetching or parsing fails.
"""
if not app_config.MODELS_CONFIG_URL:
print("ERROR: MODELS_CONFIG_URL is not set in the environment/config.")
return None
print(f"Fetching model configuration from: {app_config.MODELS_CONFIG_URL}")
try:
async with httpx.AsyncClient() as client:
response = await client.get(app_config.MODELS_CONFIG_URL)
response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)
data = response.json()
# Basic validation of the fetched data structure
if isinstance(data, dict) and \
"vertex_models" in data and isinstance(data["vertex_models"], list) and \
"vertex_express_models" in data and isinstance(data["vertex_express_models"], list):
print("Successfully fetched and parsed model configuration.")
return {
"vertex_models": data["vertex_models"],
"vertex_express_models": data["vertex_express_models"]
}
else:
print(f"ERROR: Fetched model configuration has an invalid structure: {data}")
return None
except httpx.RequestError as e:
print(f"ERROR: HTTP request failed while fetching model configuration: {e}")
return None
except json.JSONDecodeError as e:
print(f"ERROR: Failed to decode JSON from model configuration: {e}")
return None
except Exception as e:
print(f"ERROR: An unexpected error occurred while fetching/parsing model configuration: {e}")
return None
async def get_models_config() -> Dict[str, List[str]]:
"""
Returns the cached model configuration.
If not cached, fetches and caches it.
Returns a default empty structure if fetching fails.
"""
global _model_cache
async with _cache_lock:
if _model_cache is None:
print("Model cache is empty. Fetching configuration...")
_model_cache = await fetch_and_parse_models_config()
if _model_cache is None: # If fetching failed, use a default empty structure
print("WARNING: Using default empty model configuration due to fetch/parse failure.")
_model_cache = {"vertex_models": [], "vertex_express_models": []}
return _model_cache
async def get_vertex_models() -> List[str]:
config = await get_models_config()
return config.get("vertex_models", [])
async def get_vertex_express_models() -> List[str]:
config = await get_models_config()
return config.get("vertex_express_models", [])
async def refresh_models_config_cache() -> bool:
"""
Forces a refresh of the model configuration cache.
Returns True if successful, False otherwise.
"""
global _model_cache
print("Attempting to refresh model configuration cache...")
async with _cache_lock:
new_config = await fetch_and_parse_models_config()
if new_config is not None:
_model_cache = new_config
print("Model configuration cache refreshed successfully.")
return True
else:
print("ERROR: Failed to refresh model configuration cache.")
# Optionally, decide if we want to clear the old cache or keep it
# _model_cache = {"vertex_models": [], "vertex_express_models": []} # To clear
return False |