fufeigemini / app /api /stream_handlers.py
Leeflour's picture
Upload 197 files
d0dd276 verified
import asyncio
import json
from fastapi.responses import StreamingResponse
from app.models.schemas import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import handle_gemini_error, update_api_call_stats,log,openAI_from_text
from app.utils.response import openAI_from_Gemini,gemini_from_text
from app.utils.stats import get_api_key_usage
import app.config.settings as settings
async def _select_valid_keys(key_manager, model, wanted):
    """Collect up to ``wanted`` API keys whose usage is below the daily limit.

    Keys are pulled from ``key_manager.get_available_key()`` until enough are
    found, the manager returns a falsy key, or a key repeats (taken to mean
    every key has been inspected once). If every inspected key was over the
    limit, the key stack is reset and a single key is taken regardless of its
    usage.

    Returns:
        A (possibly empty) list of API keys.
    """
    valid_keys = []
    checked_keys = set()      # keys already inspected during this call
    all_keys_checked = False  # set when the manager hands back a repeated key

    while len(valid_keys) < wanted:
        api_key = await key_manager.get_available_key()
        if not api_key:
            break
        # A repeated key means the manager has cycled through all of its keys.
        if api_key in checked_keys:
            all_keys_checked = True
            break
        checked_keys.add(api_key)

        usage = await get_api_key_usage(settings.api_call_stats, api_key)
        if usage < settings.API_KEY_DAILY_LIMIT:
            valid_keys.append(api_key)
        else:
            log('warning', f"API密钥 {api_key[:8]}... 已达到每日调用限制 ({usage}/{settings.API_KEY_DAILY_LIMIT})",
                extra={'key': api_key[:8], 'request_type': 'stream', 'model': model})

    # Every key is over its limit: reset the stack and fall back to one key.
    if all_keys_checked and not valid_keys:
        log('warning', "所有API密钥已达到每日调用限制,重置密钥栈",
            extra={'request_type': 'stream', 'model': model})
        key_manager._reset_key_stack()
        api_key = await key_manager.get_available_key()
        if api_key:
            valid_keys = [api_key]

    return valid_keys


def _text_chunk(is_gemini, model, content, finished=False):
    """Format a plain-text streaming chunk in the caller's wire format.

    With ``finished`` set, the chunk carries the format's stop reason
    ("STOP" for Gemini, "stop" for OpenAI). ``stream=True`` is always passed
    so every chunk emitted by the generator uses the streaming shape.
    """
    if is_gemini:
        if finished:
            return gemini_from_text(content=content, finish_reason="STOP", stream=True)
        return gemini_from_text(content=content, stream=True)
    if finished:
        return openAI_from_text(model=model, content=content, finish_reason="stop", stream=True)
    return openAI_from_text(model=model, content=content, stream=True)


def _response_chunk(is_gemini, response):
    """Format a Gemini response object as one streaming chunk."""
    if is_gemini:
        json_payload = json.dumps(response.data, ensure_ascii=False)
        return f"data: {json_payload}\n\n"
    return openAI_from_Gemini(response, stream=True)


async def stream_response_generator(
    chat_request,
    key_manager,
    response_cache_manager,
    safety_settings,
    safety_settings_g2,
    cache_key: str
):
    """Async generator producing the streamed chunks for one chat request.

    Depending on ``settings.FAKE_STREAMING`` it either

    * ("fake" streaming) launches batches of concurrent non-streaming requests
      via ``handle_fake_streaming``, yielding empty keep-alive chunks every
      ``settings.FAKE_STREAMING_INTERVAL`` seconds until one succeeds, or
    * ("real" streaming) relays chunks from ``GeminiClient.stream_chat`` one
      key at a time.

    Both modes stop retrying after ``settings.MAX_RETRY_NUM`` key attempts or
    ``settings.MAX_EMPTY_RESPONSES`` empty responses. If no attempt succeeds,
    a final text chunk describing the failure is yielded.

    Args:
        chat_request: Incoming request; ``model`` and ``messages`` are read,
            plus an optional ``format_type`` ("gemini" selects Gemini output).
        key_manager: Provides ``get_available_key()`` and ``_reset_key_stack()``.
        response_cache_manager: Cache used by the fake-streaming path
            (``get_and_remove`` here, ``store`` in ``handle_fake_streaming``).
        safety_settings: Safety settings for models without 'gemini-2.5' in
            their name.
        safety_settings_g2: Safety settings for 'gemini-2.5' models.
        cache_key: Cache key under which fake-streaming results are stored.

    Yields:
        Chunks formatted for the requested wire format (Gemini or OpenAI).
    """
    model = chat_request.model
    is_gemini = getattr(chat_request, 'format_type', None) == "gemini"
    if is_gemini:
        contents, system_instruction = None, None
    else:
        # Convert OpenAI-style messages to the Gemini request format.
        contents, system_instruction = GeminiClient.convert_messages(
            GeminiClient, chat_request.messages, model=model)

    current_concurrent = settings.CONCURRENT_REQUESTS  # initial batch size
    max_retry_num = settings.MAX_RETRY_NUM
    current_try_num = 0        # number of keys attempted so far
    empty_response_count = 0   # number of empty responses seen so far

    # ---- Fake streaming: batches of concurrent non-streaming requests ----
    while (settings.FAKE_STREAMING
           and current_try_num < max_retry_num
           and empty_response_count < settings.MAX_EMPTY_RESPONSES):
        batch_num = min(max_retry_num - current_try_num, current_concurrent)
        valid_keys = await _select_valid_keys(key_manager, model, batch_num)
        if not valid_keys:
            break
        current_try_num += len(valid_keys)

        # One background task per key; tasks_map remembers which key each uses.
        tasks_map = {}
        for api_key in valid_keys:
            log('info', f"假流式请求开始,使用密钥: {api_key[:8]}...",
                extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': model})
            task = asyncio.create_task(
                handle_fake_streaming(
                    api_key,
                    chat_request,
                    contents,
                    response_cache_manager,
                    system_instruction,
                    safety_settings,
                    safety_settings_g2,
                    cache_key
                )
            )
            tasks_map[task] = api_key
        tasks = list(tasks_map)

        success = False
        while tasks and not success:
            done, _pending = await asyncio.wait(
                tasks,
                timeout=settings.FAKE_STREAMING_INTERVAL,
                return_when=asyncio.FIRST_COMPLETED
            )
            # Nothing finished within the interval: emit a keep-alive chunk.
            if not done:
                yield _text_chunk(is_gemini, model, '')
                continue

            for task in done:
                api_key = tasks_map[task]
                if task.cancelled():
                    continue
                try:
                    status = task.result()
                    if status == "success":
                        success = True
                        log('info', "假流式请求成功",
                            extra={'key': api_key[:8], 'request_type': "fake-stream", 'model': model})
                        cached_response, cache_hit = await response_cache_manager.get_and_remove(cache_key)
                        if cache_hit and cached_response:
                            yield _response_chunk(is_gemini, cached_response)
                        else:
                            # Task reported success but nothing was retrievable
                            # from the cache: treat the attempt as failed.
                            success = False
                        break
                    elif status == "empty":
                        empty_response_count += 1
                        log('warning', f"空响应计数: {empty_response_count}/{settings.MAX_EMPTY_RESPONSES}",
                            extra={'key': api_key[:8], 'request_type': 'stream', 'model': model})
                except Exception as e:
                    error_detail = handle_gemini_error(e, api_key)
                    log('error', f"请求失败: {error_detail}",
                        extra={'key': api_key[:8], 'request_type': 'stream', 'model': model})

            # A response was delivered. Tasks still pending are deliberately
            # left running (the original code does not cancel them either).
            if success:
                return

            if empty_response_count >= settings.MAX_EMPTY_RESPONSES:
                log('warning', f"空响应次数达到限制 ({empty_response_count}/{settings.MAX_EMPTY_RESPONSES}),停止轮询",
                    extra={'request_type': 'fake-stream', 'model': model})
                yield _text_chunk(is_gemini, model, "空响应次数达到上限\n请修改输入提示词", finished=True)
                return

            # Keep waiting only on the tasks that have not finished yet.
            tasks = [t for t in tasks if not t.done()]

        # Whole batch failed: widen the next batch, capped at the configured max.
        if not success and valid_keys:
            current_concurrent = min(
                current_concurrent + settings.INCREASE_CONCURRENT_ON_FAILURE,
                settings.MAX_CONCURRENT_REQUESTS)
            log('info', f"所有假流式请求失败,增加并发数至: {current_concurrent}",
                extra={'request_type': 'stream', 'model': model})

    # ---- Real streaming: relay upstream chunks, one key per attempt ----
    while (not settings.FAKE_STREAMING
           and current_try_num < max_retry_num
           and empty_response_count < settings.MAX_EMPTY_RESPONSES):
        valid_keys = await _select_valid_keys(key_manager, model, 1)
        if not valid_keys:
            break
        current_try_num += 1
        api_key = valid_keys[0]

        success = False
        token = 0
        try:
            client = GeminiClient(api_key)
            stream_generator = client.stream_chat(
                chat_request,
                contents,
                safety_settings_g2 if 'gemini-2.5' in model else safety_settings,
                system_instruction
            )
            async for chunk in stream_generator:
                if chunk:
                    if chunk.total_token_count:
                        token = int(chunk.total_token_count)
                    # Set before formatting so a formatting error after data
                    # arrived still counts the call in the stats below.
                    success = True
                    yield _response_chunk(is_gemini, chunk)
                else:
                    # Increment first so the logged count includes this one.
                    empty_response_count += 1
                    log('warning', f"流式请求返回空响应,空响应计数: {empty_response_count}/{settings.MAX_EMPTY_RESPONSES}",
                        extra={'key': api_key[:8], 'request_type': 'stream', 'model': model})
                    # When data was already streamed, the `finally` clause
                    # records the call; updating here too would count it twice.
                    if not success:
                        await update_api_call_stats(
                            settings.api_call_stats,
                            endpoint=api_key,
                            model=model,
                            token=token
                        )
                    break
        except Exception as e:
            error_detail = handle_gemini_error(e, api_key)
            log('error', f"流式响应: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
                extra={'key': api_key[:8], 'request_type': 'stream', 'model': model})
        finally:
            # Record usage whenever data was delivered, including when the
            # consumer closes the generator mid-stream. No `return` here:
            # returning from `finally` would swallow in-flight exceptions
            # such as task cancellation.
            if success:
                await update_api_call_stats(
                    settings.api_call_stats,
                    endpoint=api_key,
                    model=model,
                    token=token
                )

        if success:
            return

        if empty_response_count >= settings.MAX_EMPTY_RESPONSES:
            log('warning', f"空响应次数达到限制 ({empty_response_count}/{settings.MAX_EMPTY_RESPONSES}),停止轮询",
                extra={'request_type': 'stream', 'model': model})
            yield _text_chunk(is_gemini, model, "空响应次数达到上限\n请修改输入提示词", finished=True)
            return

    # Every attempt failed (or no usable key was available).
    log('error', "所有 API 密钥均请求失败,请稍后重试",
        extra={'key': 'ALL', 'request_type': 'stream', 'model': model})
    yield _text_chunk(is_gemini, model, "所有API密钥均请求失败\n具体错误请查看轮询日志", finished=True)
# 处理假流式模式
async def handle_fake_streaming(api_key,chat_request, contents, response_cache_manager,system_instruction, safety_settings, safety_settings_g2, cache_key):
    """Run one non-streaming Gemini request and cache its response.

    Used by the fake-streaming path: the complete response is fetched with
    ``GeminiClient.complete_chat`` and stored under ``cache_key`` so the
    caller can retrieve it from ``response_cache_manager``.

    Returns:
        "success" if a non-empty response was stored in the cache,
        "empty"   if the response was missing or had neither text nor a
                  function call,
        "error"   if the request raised an exception.
    """
    gemini_client = GeminiClient(api_key)
    request_task = asyncio.create_task(
        gemini_client.complete_chat(
            chat_request,
            contents,
            safety_settings_g2 if 'gemini-2.5' in chat_request.model else safety_settings,
            system_instruction
        )
    )
    # Shield the upstream request so cancelling this coroutine does not
    # cancel the request task itself.
    shielded_request = asyncio.shield(request_task)

    try:
        response_content = await shielded_request

        # Check for a missing response before touching its attributes;
        # otherwise the AttributeError would be reported as "error".
        if response_content is None:
            log('warning', f"请求返回空响应",
                extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
            return "empty"

        response_content.set_model(chat_request.model)

        # The API call happened, so record it even if the content is empty.
        await update_api_call_stats(settings.api_call_stats, endpoint=api_key, model=chat_request.model,token=response_content.total_token_count)

        if not response_content or (not response_content.text and not response_content.function_call):
            log('warning', f"请求返回空响应",
                extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
            return "empty"

        log('info', f"假流式成功获取响应,进行缓存",
            extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
        await response_cache_manager.store(cache_key, response_content)
        return "success"

    except Exception as e:
        # The helper's return value is not used here; no extra log is emitted.
        handle_gemini_error(e, api_key)
        return "error"
# 流式请求处理函数
async def process_stream_request(
    chat_request: ChatCompletionRequest,
    key_manager,
    response_cache_manager,
    safety_settings,
    safety_settings_g2,
    cache_key: str
) -> StreamingResponse:
    """Handle a streaming API request.

    Builds the chunk generator for the request and wraps it in a
    ``StreamingResponse`` served as ``text/event-stream``.
    """
    event_stream = stream_response_generator(
        chat_request=chat_request,
        key_manager=key_manager,
        response_cache_manager=response_cache_manager,
        safety_settings=safety_settings,
        safety_settings_g2=safety_settings_g2,
        cache_key=cache_key,
    )
    return StreamingResponse(event_stream, media_type="text/event-stream")