File size: 3,602 Bytes
76b9762
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from app.config.config import settings
from app.log.logger import get_model_logger
from app.service.client.api_client import GeminiApiClient

logger = get_model_logger()


class ModelService:
    async def get_gemini_models(self, api_key: str) -> Optional[Dict[str, Any]]:
        api_client = GeminiApiClient(base_url=settings.BASE_URL)
        gemini_models = await api_client.get_models(api_key)

        if gemini_models is None:
            logger.error("从 API 客户端获取模型列表失败。")
            return None

        try:
            filtered_models_list = []
            for model in gemini_models.get("models", []):
                model_id = model["name"].split("/")[-1]
                if model_id not in settings.FILTERED_MODELS:
                    filtered_models_list.append(model)
                else:
                    logger.debug(f"Filtered out model: {model_id}")

            gemini_models["models"] = filtered_models_list
            return gemini_models
        except Exception as e:
            logger.error(f"处理模型列表时出错: {e}")
            return None

    async def get_gemini_openai_models(self, api_key: str) -> Optional[Dict[str, Any]]:
        """获取 Gemini 模型并转换为 OpenAI 格式"""
        gemini_models = await self.get_gemini_models(api_key)
        if gemini_models is None:
            return None
        
        return await self.convert_to_openai_models_format(gemini_models)

    async def convert_to_openai_models_format(
        self, gemini_models: Dict[str, Any]
    ) -> Dict[str, Any]:
        openai_format = {"object": "list", "data": [], "success": True}

        for model in gemini_models.get("models", []):
            model_id = model["name"].split("/")[-1]
            openai_model = {
                "id": model_id,
                "object": "model",
                "created": int(datetime.now(timezone.utc).timestamp()),
                "owned_by": "google",
                "permission": [],
                "root": model["name"],
                "parent": None,
            }
            openai_format["data"].append(openai_model)

            if model_id in settings.SEARCH_MODELS:
                search_model = openai_model.copy()
                search_model["id"] = f"{model_id}-search"
                openai_format["data"].append(search_model)
            if model_id in settings.IMAGE_MODELS:
                image_model = openai_model.copy()
                image_model["id"] = f"{model_id}-image"
                openai_format["data"].append(image_model)
            if model_id in settings.THINKING_MODELS:
                non_thinking_model = openai_model.copy()
                non_thinking_model["id"] = f"{model_id}-non-thinking"
                openai_format["data"].append(non_thinking_model)

        if settings.CREATE_IMAGE_MODEL:
            image_model = openai_model.copy()
            image_model["id"] = f"{settings.CREATE_IMAGE_MODEL}-chat"
            openai_format["data"].append(image_model)
        return openai_format

    async def check_model_support(self, model: str) -> bool:
        if not model or not isinstance(model, str):
            return False

        model = model.strip()
        if model.endswith("-search"):
            model = model[:-7]
            return model in settings.SEARCH_MODELS
        if model.endswith("-image"):
            model = model[:-6]
            return model in settings.IMAGE_MODELS

        return model not in settings.FILTERED_MODELS