Spaces:
Running
Running
File size: 6,243 Bytes
76b9762 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from copy import deepcopy
from app.config.config import settings
from app.log.logger import get_vertex_express_logger
from app.core.security import SecurityService
from app.domain.gemini_models import GeminiRequest
from app.service.chat.vertex_express_chat_service import GeminiChatService
from app.service.key.key_manager import KeyManager, get_key_manager_instance
from app.service.model.model_service import ModelService
from app.handler.retry_handler import RetryHandler
from app.handler.error_handler import handle_route_errors
from app.core.constants import API_VERSION
router = APIRouter(prefix=f"/vertex-express/{API_VERSION}")
logger = get_vertex_express_logger()
security_service = SecurityService()
model_service = ModelService()
async def get_key_manager():
"""获取密钥管理器实例"""
return await get_key_manager_instance()
async def get_next_working_key(key_manager: KeyManager = Depends(get_key_manager)):
"""获取下一个可用的API密钥"""
return await key_manager.get_next_working_vertex_key()
async def get_chat_service(key_manager: KeyManager = Depends(get_key_manager)):
"""获取Gemini聊天服务实例"""
return GeminiChatService(settings.VERTEX_EXPRESS_BASE_URL, key_manager)
@router.get("/models")
async def list_models(
_=Depends(security_service.verify_key_or_goog_api_key),
key_manager: KeyManager = Depends(get_key_manager)
):
"""获取可用的 Gemini 模型列表,并根据配置添加衍生模型(搜索、图像、非思考)。"""
operation_name = "list_gemini_models"
logger.info("-" * 50 + operation_name + "-" * 50)
logger.info("Handling Gemini models list request")
try:
api_key = await key_manager.get_first_valid_key()
if not api_key:
raise HTTPException(status_code=503, detail="No valid API keys available to fetch models.")
logger.info(f"Using API key: {api_key}")
models_data = await model_service.get_gemini_models(api_key)
if not models_data or "models" not in models_data:
raise HTTPException(status_code=500, detail="Failed to fetch base models list.")
models_json = deepcopy(models_data)
model_mapping = {x.get("name", "").split("/", maxsplit=1)[-1]: x for x in models_json.get("models", [])}
def add_derived_model(base_name, suffix, display_suffix):
model = model_mapping.get(base_name)
if not model:
logger.warning(f"Base model '{base_name}' not found for derived model '{suffix}'.")
return
item = deepcopy(model)
item["name"] = f"models/{base_name}{suffix}"
display_name = f'{item.get("displayName", base_name)}{display_suffix}'
item["displayName"] = display_name
item["description"] = display_name
models_json["models"].append(item)
if settings.SEARCH_MODELS:
for name in settings.SEARCH_MODELS:
add_derived_model(name, "-search", " For Search")
if settings.IMAGE_MODELS:
for name in settings.IMAGE_MODELS:
add_derived_model(name, "-image", " For Image")
if settings.THINKING_MODELS:
for name in settings.THINKING_MODELS:
add_derived_model(name, "-non-thinking", " Non Thinking")
logger.info("Gemini models list request successful")
return models_json
except HTTPException as http_exc:
raise http_exc
except Exception as e:
logger.error(f"Error getting Gemini models list: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal server error while fetching Gemini models list"
) from e
@router.post("/models/{model_name}:generateContent")
@RetryHandler(key_arg="api_key")
async def generate_content(
model_name: str,
request: GeminiRequest,
_=Depends(security_service.verify_key_or_goog_api_key),
api_key: str = Depends(get_next_working_key),
key_manager: KeyManager = Depends(get_key_manager),
chat_service: GeminiChatService = Depends(get_chat_service)
):
"""处理 Gemini 非流式内容生成请求。"""
operation_name = "gemini_generate_content"
async with handle_route_errors(logger, operation_name, failure_message="Content generation failed"):
logger.info(f"Handling Gemini content generation request for model: {model_name}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
if not await model_service.check_model_support(model_name):
raise HTTPException(status_code=400, detail=f"Model {model_name} is not supported")
response = await chat_service.generate_content(
model=model_name,
request=request,
api_key=api_key
)
return response
@router.post("/models/{model_name}:streamGenerateContent")
@RetryHandler(key_arg="api_key")
async def stream_generate_content(
model_name: str,
request: GeminiRequest,
_=Depends(security_service.verify_key_or_goog_api_key),
api_key: str = Depends(get_next_working_key),
key_manager: KeyManager = Depends(get_key_manager),
chat_service: GeminiChatService = Depends(get_chat_service)
):
"""处理 Gemini 流式内容生成请求。"""
operation_name = "gemini_stream_generate_content"
async with handle_route_errors(logger, operation_name, failure_message="Streaming request initiation failed"):
logger.info(f"Handling Gemini streaming content generation for model: {model_name}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
if not await model_service.check_model_support(model_name):
raise HTTPException(status_code=400, detail=f"Model {model_name} is not supported")
response_stream = chat_service.stream_generate_content(
model=model_name,
request=request,
api_key=api_key
)
return StreamingResponse(response_stream, media_type="text/event-stream") |