File size: 16,937 Bytes
d0dd276
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
import asyncio
import json
from fastapi.responses import StreamingResponse
from app.models.schemas import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import handle_gemini_error, update_api_call_stats,log,openAI_from_text
from app.utils.response import openAI_from_Gemini,gemini_from_text
from app.utils.stats import get_api_key_usage
import app.config.settings as settings

async def stream_response_generator(
    chat_request,
    key_manager,
    response_cache_manager,
    safety_settings,
    safety_settings_g2,
    cache_key: str
):
    format_type = getattr(chat_request, 'format_type', None)
    if format_type and (format_type == "gemini"):
        is_gemini = True
        contents, system_instruction = None,None
    else:
        is_gemini = False
        # 转换消息格式
        contents, system_instruction = GeminiClient.convert_messages(GeminiClient, chat_request.messages,model=chat_request.model)
    # 设置初始并发数
    current_concurrent = settings.CONCURRENT_REQUESTS
    max_retry_num = settings.MAX_RETRY_NUM
    
    # 当前请求次数
    current_try_num = 0
    
    # 空响应计数
    empty_response_count = 0
    
    # (假流式) 尝试使用不同API密钥,直到达到最大重试次数或空响应限制
    while (settings.FAKE_STREAMING and (current_try_num < max_retry_num) and (empty_response_count < settings.MAX_EMPTY_RESPONSES)):
        # 获取当前批次的密钥数量
        batch_num = min(max_retry_num - current_try_num, current_concurrent)
        
        # 获取当前批次的密钥
        valid_keys = []
        checked_keys = set()  # 用于记录已检查过的密钥
        all_keys_checked = False  # 标记是否已检查所有密钥
        
        # 尝试获取足够数量的有效密钥
        while len(valid_keys) < batch_num:
            api_key = await key_manager.get_available_key()
            if not api_key:
                break
                
            # 如果这个密钥已经检查过,说明已经检查了所有密钥
            if api_key in checked_keys:
                all_keys_checked = True
                break
            
            checked_keys.add(api_key)
            # 获取API密钥的调用次数
            usage = await get_api_key_usage(settings.api_call_stats, api_key)
            # 如果调用次数小于限制,则添加到有效密钥列表
            if usage < settings.API_KEY_DAILY_LIMIT:
                valid_keys.append(api_key)
            else:
                log('warning', f"API密钥 {api_key[:8]}... 已达到每日调用限制 ({usage}/{settings.API_KEY_DAILY_LIMIT})",
                    extra={'key': api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
        
        # 如果已经检查了所有密钥且没有找到有效密钥,则重置密钥栈
        if all_keys_checked and not valid_keys:
            log('warning', "所有API密钥已达到每日调用限制,重置密钥栈",
                extra={'request_type': 'stream', 'model': chat_request.model})
            key_manager._reset_key_stack()
            # 重置后重新获取一个密钥
            api_key = await key_manager.get_available_key()
            if api_key:
                valid_keys = [api_key]
        
        # 如果没有获取到任何有效密钥,跳出循环
        if not valid_keys:
            break
            
        # 更新当前尝试次数
        current_try_num += len(valid_keys)
        
        # 创建并发任务
        tasks = []
        tasks_map = {}
        for api_key in valid_keys:
            # 假流式模式的处理逻辑
            log('info', f"假流式请求开始,使用密钥: {api_key[:8]}...",
                extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
            
            task = asyncio.create_task(
                handle_fake_streaming(
                    api_key, 
                    chat_request, 
                    contents, 
                    response_cache_manager,
                    system_instruction, 
                    safety_settings, 
                    safety_settings_g2,
                    cache_key
                )
            )
            
            tasks.append((api_key, task))
            tasks_map[task] = api_key
        
        # 等待所有任务完成或找到成功响应
        success = False
        while tasks and not success:
            # 等待任务完成
            done, pending = await asyncio.wait(
                [task for _, task in tasks],
                timeout=settings.FAKE_STREAMING_INTERVAL,
                return_when=asyncio.FIRST_COMPLETED
            )
            
            # 如果没有任务完成,发送保活消息
            if not done :
                if is_gemini:
                    yield gemini_from_text(content='',stream=True)
                else:
                    yield openAI_from_text(model=chat_request.model,content='',stream=True)
                continue
            
            # 检查已完成的任务是否成功
            for task in done:
                api_key = tasks_map[task]
                if not task.cancelled():
                    try:
                        status = task.result()
                        # 如果有成功响应内容
                        if status == "success" :  
                            success = True
                            log('info', f"假流式请求成功", 
                                extra={'key': api_key[:8],'request_type': "fake-stream", 'model': chat_request.model})
                            cached_response, cache_hit = await response_cache_manager.get_and_remove(cache_key)
                            if cache_hit and cached_response: 
                                if is_gemini :
                                    json_payload = json.dumps(cached_response.data, ensure_ascii=False)
                                    data_to_yield = f"data: {json_payload}\n\n"
                                    yield data_to_yield
                                else:
                                    yield openAI_from_Gemini(cached_response,stream=True)
                            else:
                                success = False
                            break
                        elif status == "empty":
                            # 增加空响应计数
                            empty_response_count += 1
                            log('warning', f"空响应计数: {empty_response_count}/{settings.MAX_EMPTY_RESPONSES}",
                                extra={'key': api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
                        
                    except Exception as e:
                        error_detail = handle_gemini_error(e, api_key)
                        log('error', f"请求失败: {error_detail}",
                            extra={'key': api_key[:8], 'request_type': 'stream', 'model': chat_request.model})

            # 如果找到成功的响应,跳出循环
            if success:
                return
            
            # 如果空响应次数达到限制,跳出循环
            if empty_response_count >= settings.MAX_EMPTY_RESPONSES:
                log('warning', f"空响应次数达到限制 ({empty_response_count}/{settings.MAX_EMPTY_RESPONSES}),停止轮询",
                    extra={'request_type': 'fake-stream', 'model': chat_request.model})
                if is_gemini :
                    yield gemini_from_text(content="空响应次数达到上限\n请修改输入提示词",finish_reason="STOP",stream=True)
                else:
                    yield openAI_from_text(model=chat_request.model,content="空响应次数达到上限\n请修改输入提示词",finish_reason="stop",stream=True)
                
                return
            
            # 更新任务列表,移除已完成的任务
            tasks = [(k, t) for k, t in tasks if not t.done()]
        
        # 如果所有请求都失败,增加并发数并继续尝试
        if not success and valid_keys:
            # 增加并发数,但不超过最大并发数
            current_concurrent = min(current_concurrent + settings.INCREASE_CONCURRENT_ON_FAILURE, settings.MAX_CONCURRENT_REQUESTS)
            log('info', f"所有假流式请求失败,增加并发数至: {current_concurrent}", 
                extra={'request_type': 'stream', 'model': chat_request.model})

    # (真流式) 尝试使用不同API密钥,直到达到最大重试次数或空响应限制
    while (not settings.FAKE_STREAMING and (current_try_num < max_retry_num) and (empty_response_count < settings.MAX_EMPTY_RESPONSES)):
        # 获取当前批次的密钥
        valid_keys = []
        checked_keys = set()  # 用于记录已检查过的密钥
        all_keys_checked = False  # 标记是否已检查所有密钥
        
        # 尝试获取一个有效密钥
        while len(valid_keys) < 1:
            api_key = await key_manager.get_available_key()
            if not api_key:
                break
                
            # 如果这个密钥已经检查过,说明已经检查了所有密钥
            if api_key in checked_keys:
                all_keys_checked = True
                break
            
            checked_keys.add(api_key)
            # 获取API密钥的调用次数
            usage = await get_api_key_usage(settings.api_call_stats, api_key)
            # 如果调用次数小于限制,则添加到有效密钥列表
            if usage < settings.API_KEY_DAILY_LIMIT:
                valid_keys.append(api_key)
            else:
                log('warning', f"API密钥 {api_key[:8]}... 已达到每日调用限制 ({usage}/{settings.API_KEY_DAILY_LIMIT})",
                    extra={'key': api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
        
        # 如果已经检查了所有密钥且没有找到有效密钥,则重置密钥栈
        if all_keys_checked and not valid_keys:
            log('warning', "所有API密钥已达到每日调用限制,重置密钥栈",
                extra={'request_type': 'stream', 'model': chat_request.model})
            key_manager._reset_key_stack()
            # 重置后重新获取一个密钥
            api_key = await key_manager.get_available_key()
            if api_key:
                valid_keys = [api_key]
        
        # 如果没有获取到任何有效密钥,跳出循环
        if not valid_keys:
            break
            
        # 更新当前尝试次数
        current_try_num += 1
        
        # 获取密钥
        api_key = valid_keys[0]
        
        success = False
        try:            
            client = GeminiClient(api_key)
            
            # 获取流式响应
            stream_generator = client.stream_chat(
                chat_request,
                contents,
                safety_settings_g2 if 'gemini-2.5' in chat_request.model else safety_settings,
                system_instruction
            )
            token=0
            # 处理流式响应
            async for chunk in stream_generator:
                if chunk :
                    
                    if chunk.total_token_count:
                        token = int(chunk.total_token_count)
                    success = True
                    
                    if is_gemini:
                        json_payload = json.dumps(chunk.data, ensure_ascii=False)
                        data = f"data: {json_payload}\n\n"
                    else:
                        data = openAI_from_Gemini(chunk,stream=True)
                    
                    # log('info', f"流式响应发送数据: {data}")
                    yield data
                    
                else:
                    log('warning', f"流式请求返回空响应,空响应计数: {empty_response_count}/{settings.MAX_EMPTY_RESPONSES}",
                        extra={'key': api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
                    # 增加空响应计数
                    empty_response_count += 1
                    await update_api_call_stats(
                        settings.api_call_stats, 
                        endpoint=api_key, 
                        model=chat_request.model,
                        token=token
                    )
                    break
        
        except Exception as e:
            error_detail = handle_gemini_error(e, api_key)
            log('error', f"流式响应: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
                extra={'key': api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
        finally: 
            # 如果成功获取相应,更新API调用统计
            if success:
                await update_api_call_stats(
                    settings.api_call_stats, 
                    endpoint=api_key, 
                    model=chat_request.model,
                    token=token
                )
                return
            
            # 如果空响应次数达到限制,跳出循环
            if empty_response_count >= settings.MAX_EMPTY_RESPONSES:
                
                log('warning', f"空响应次数达到限制 ({empty_response_count}/{settings.MAX_EMPTY_RESPONSES}),停止轮询",
                    extra={'request_type': 'stream', 'model': chat_request.model})
                
                if is_gemini:
                    yield gemini_from_text(content="空响应次数达到上限\n请修改输入提示词",finish_reason="STOP",stream=True)
                else:
                    yield openAI_from_text(model=chat_request.model,content="空响应次数达到上限\n请修改输入提示词",finish_reason="stop",stream=True)
                
                return
    
    # 所有API密钥都尝试失败的处理
    log('error', "所有 API 密钥均请求失败,请稍后重试",
        extra={'key': 'ALL', 'request_type': 'stream', 'model': chat_request.model})
    
    if is_gemini:
        yield gemini_from_text(content="所有API密钥均请求失败\n具体错误请查看轮询日志",finish_reason="STOP",stream=True)
    else:
        yield openAI_from_text(model=chat_request.model,content="所有API密钥均请求失败\n具体错误请查看轮询日志",finish_reason="stop")

# 处理假流式模式
async def handle_fake_streaming(api_key,chat_request, contents, response_cache_manager,system_instruction, safety_settings, safety_settings_g2, cache_key):
    
    # 使用非流式请求内容
    gemini_client = GeminiClient(api_key)
    
    gemini_task = asyncio.create_task(
        gemini_client.complete_chat( 
            chat_request,
            contents,
            safety_settings_g2 if 'gemini-2.5' in chat_request.model else safety_settings,
            system_instruction
        )
    )
    gemini_task = asyncio.shield(gemini_task)
    
    try:
        # 获取响应内容
        response_content = await gemini_task
        response_content.set_model(chat_request.model)
        log('info', f"假流式成功获取响应,进行缓存",
            extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

        # 更新API调用统计
        await update_api_call_stats(settings.api_call_stats, endpoint=api_key, model=chat_request.model,token=response_content.total_token_count)
        
        # 检查响应内容是否为空
        if not response_content or (not response_content.text and not response_content.function_call):
            log('warning', f"请求返回空响应",
                extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})        
            return "empty"

        # 缓存
        await response_cache_manager.store(cache_key, response_content)
        return "success"
    
    except Exception as e:
        handle_gemini_error(e, api_key)
        # log('error', f"假流式模式: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
        #     extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
        return "error"
        


# 流式请求处理函数
async def process_stream_request(
    chat_request: ChatCompletionRequest,
    key_manager,
    response_cache_manager,
    safety_settings,
    safety_settings_g2,
    cache_key: str
) -> StreamingResponse:
    """处理流式API请求"""
    
    return StreamingResponse(stream_response_generator(
                chat_request,
                key_manager,
                response_cache_manager,
                safety_settings,
                safety_settings_g2,
                cache_key
            ), media_type="text/event-stream")