from datetime import datetime, timezone from typing import Any, Dict, Optional from app.config.config import settings from app.log.logger import get_model_logger from app.service.client.api_client import GeminiApiClient logger = get_model_logger() class ModelService: async def get_gemini_models(self, api_key: str) -> Optional[Dict[str, Any]]: api_client = GeminiApiClient(base_url=settings.BASE_URL) gemini_models = await api_client.get_models(api_key) if gemini_models is None: logger.error("从 API 客户端获取模型列表失败。") return None try: filtered_models_list = [] for model in gemini_models.get("models", []): model_id = model["name"].split("/")[-1] if model_id not in settings.FILTERED_MODELS: filtered_models_list.append(model) else: logger.debug(f"Filtered out model: {model_id}") gemini_models["models"] = filtered_models_list return gemini_models except Exception as e: logger.error(f"处理模型列表时出错: {e}") return None async def get_gemini_openai_models(self, api_key: str) -> Optional[Dict[str, Any]]: """获取 Gemini 模型并转换为 OpenAI 格式""" gemini_models = await self.get_gemini_models(api_key) if gemini_models is None: return None return await self.convert_to_openai_models_format(gemini_models) async def convert_to_openai_models_format( self, gemini_models: Dict[str, Any] ) -> Dict[str, Any]: openai_format = {"object": "list", "data": [], "success": True} for model in gemini_models.get("models", []): model_id = model["name"].split("/")[-1] openai_model = { "id": model_id, "object": "model", "created": int(datetime.now(timezone.utc).timestamp()), "owned_by": "google", "permission": [], "root": model["name"], "parent": None, } openai_format["data"].append(openai_model) if model_id in settings.SEARCH_MODELS: search_model = openai_model.copy() search_model["id"] = f"{model_id}-search" openai_format["data"].append(search_model) if model_id in settings.IMAGE_MODELS: image_model = openai_model.copy() image_model["id"] = f"{model_id}-image" openai_format["data"].append(image_model) if model_id in settings.THINKING_MODELS: non_thinking_model = openai_model.copy() non_thinking_model["id"] = f"{model_id}-non-thinking" openai_format["data"].append(non_thinking_model) if settings.CREATE_IMAGE_MODEL: image_model = openai_model.copy() image_model["id"] = f"{settings.CREATE_IMAGE_MODEL}-chat" openai_format["data"].append(image_model) return openai_format async def check_model_support(self, model: str) -> bool: if not model or not isinstance(model, str): return False model = model.strip() if model.endswith("-search"): model = model[:-7] return model in settings.SEARCH_MODELS if model.endswith("-image"): model = model[:-6] return model in settings.IMAGE_MODELS return model not in settings.FILTERED_MODELS