File size: 28,367 Bytes
ed8e0ee
 
 
713f2f6
 
 
 
2cae477
713f2f6
ed8e0ee
713f2f6
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
 
 
2cae477
 
 
 
 
713f2f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
 
 
 
 
 
 
713f2f6
2cae477
 
713f2f6
2cae477
 
 
 
713f2f6
2cae477
 
 
 
 
713f2f6
2cae477
 
 
 
 
713f2f6
2cae477
 
 
 
 
713f2f6
ed8e0ee
 
 
 
713f2f6
ed8e0ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
713f2f6
ed8e0ee
 
 
 
 
 
 
713f2f6
ed8e0ee
 
 
 
713f2f6
ed8e0ee
 
 
 
 
 
 
 
 
713f2f6
ed8e0ee
713f2f6
ed8e0ee
 
713f2f6
ed8e0ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
713f2f6
ed8e0ee
 
 
 
 
 
 
 
 
713f2f6
ed8e0ee
 
 
 
 
713f2f6
ed8e0ee
 
 
 
 
 
 
 
 
 
713f2f6
ed8e0ee
 
 
 
 
 
 
 
 
 
713f2f6
ed8e0ee
 
 
 
713f2f6
ed8e0ee
 
 
 
 
 
 
 
 
713f2f6
ed8e0ee
 
713f2f6
ed8e0ee
 
713f2f6
ed8e0ee
 
 
713f2f6
 
ed8e0ee
 
713f2f6
 
ed8e0ee
 
 
 
 
 
713f2f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
 
 
713f2f6
 
ed8e0ee
 
713f2f6
ed8e0ee
 
713f2f6
ed8e0ee
 
 
 
713f2f6
ed8e0ee
 
 
 
 
713f2f6
ed8e0ee
713f2f6
 
 
 
ed8e0ee
 
 
 
713f2f6
ed8e0ee
 
 
713f2f6
ed8e0ee
 
713f2f6
 
 
 
ed8e0ee
 
 
 
 
713f2f6
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
 
713f2f6
ed8e0ee
 
 
 
713f2f6
 
 
ed8e0ee
 
 
713f2f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
713f2f6
 
 
 
 
 
 
 
 
 
ed8e0ee
 
713f2f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
713f2f6
ed8e0ee
713f2f6
ed8e0ee
713f2f6
ed8e0ee
713f2f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
 
 
 
713f2f6
 
ed8e0ee
 
713f2f6
 
 
 
 
ed8e0ee
 
 
 
713f2f6
2cae477
713f2f6
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8e0ee
 
713f2f6
ed8e0ee
713f2f6
2cae477
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
import express from 'express';
import { fal } from '@fal-ai/client';

// --- Key Management Setup ---
// Read comma-separated keys from the SINGLE environment variable FAL_KEY
const FAL_KEY_STRING = process.env.FAL_KEY;
const API_KEY = process.env.API_KEY; // Custom API Key for proxy auth

if (!FAL_KEY_STRING) {
    console.error("Error: FAL_KEY environment variable is not set.");
    console.error("Ensure FAL_KEY contains a comma-separated list of your Fal AI keys.");
    process.exit(1);
}

// Parse the comma-separated keys from FAL_KEY_STRING
const falKeys = FAL_KEY_STRING.split(',')
    .map(key => key.trim()) // Remove leading/trailing whitespace
    .filter(key => key.length > 0); // Remove any empty strings resulting from extra commas

if (falKeys.length === 0) {
    console.error("Error: No valid FAL keys found in the FAL_KEY environment variable after parsing.");
    console.error("Ensure FAL_KEY is a comma-separated list, e.g., 'key1,key2,key3'.");
    process.exit(1);
}

if (!API_KEY) {
    console.error("Error: API_KEY environment variable is not set.");
    process.exit(1);
}

let currentKeyIndex = 0;
const invalidKeys = new Set(); // Keep track of keys that failed

console.log(`Loaded ${falKeys.length} Fal AI Key(s) from the FAL_KEY environment variable.`);

// Function to get the next valid key in a round-robin fashion
function getNextValidKey() {
    if (invalidKeys.size >= falKeys.length) {
        console.error("All Fal AI keys are marked as invalid.");
        return null; // No valid keys left
    }

    const initialIndex = currentKeyIndex;
    let attempts = 0;
    while (attempts < falKeys.length) {
        const keyIndex = currentKeyIndex % falKeys.length;
        const key = falKeys[keyIndex];

        // Move to the next index for the *next* call
        currentKeyIndex = (keyIndex + 1) % falKeys.length;

        if (!invalidKeys.has(key)) {
            // Found a valid key
            console.log(`Using Fal Key index: ${keyIndex} (from FAL_KEY list)`);
            return { key, index: keyIndex };
        }

        attempts++;
        // Continue loop to check the next key
    }

    // Should not be reached if invalidKeys.size check is correct, but as a safeguard
    console.error("Could not find a valid Fal AI key after checking all.");
    return null;
}

// Function to check if an error is likely related to a bad key
// NOTE: This is a heuristic. You might need to adjust based on actual errors from Fal AI.
function isKeyRelatedError(error) {
    const message = error?.message?.toLowerCase() || '';
    const status = error?.status; // Check if the error object has a status code

    // Check for specific HTTP status codes indicative of auth/permission issues
    if (status === 401 || status === 403) {
        console.warn(`Detected potential key-related error (HTTP Status: ${status}).`);
        return true;
    }
    // Check for common error message patterns
    if (message.includes('invalid api key') ||
        message.includes('authentication failed') ||
        message.includes('permission denied') ||
        message.includes('quota exceeded') || // Include quota errors as key-related for rotation
        message.includes('forbidden') ||
        message.includes('unauthorized')) { // Add 'unauthorized'
        console.warn(`Detected potential key-related error (message: ${message})`);
        return true;
    }
    // Add more specific checks based on observed Fal AI errors if needed
    return false;
}
// --- End Key Management Setup ---

const app = express();
app.use(express.json({ limit: '50mb' }));
app.use(express.urlencoded({ extended: true, limit: '50mb' }));

const PORT = process.env.PORT || 3000;

// API Key 鉴权中间件 (unchanged)
const apiKeyAuth = (req, res, next) => {
    const authHeader = req.headers['authorization'];

    if (!authHeader) {
        console.warn('Unauthorized: No Authorization header provided');
        return res.status(401).json({ error: 'Unauthorized: No API Key provided' });
    }

    const authParts = authHeader.split(' ');
    if (authParts.length !== 2 || authParts[0].toLowerCase() !== 'bearer') {
        console.warn('Unauthorized: Invalid Authorization header format');
        return res.status(401).json({ error: 'Unauthorized: Invalid Authorization header format' });
    }

    const providedKey = authParts[1];
    if (providedKey !== API_KEY) {
        console.warn('Unauthorized: Invalid API Key');
        return res.status(401).json({ error: 'Unauthorized: Invalid API Key' });
    }

    next();
};

app.use(['/v1/models', '/v1/chat/completions'], apiKeyAuth);

// === 全局定义限制 === (unchanged)
const PROMPT_LIMIT = 4800;
const SYSTEM_PROMPT_LIMIT = 4800;
// === 限制定义结束 ===

// 定义 fal-ai/any-llm 支持的模型列表 (unchanged)
const FAL_SUPPORTED_MODELS = [
    "anthropic/claude-3.7-sonnet",
    "anthropic/claude-3.5-sonnet",
    "anthropic/claude-3-5-haiku",
    "anthropic/claude-3-haiku",
    "google/gemini-pro-1.5",
    "google/gemini-flash-1.5",
    "google/gemini-flash-1.5-8b",
    "google/gemini-2.0-flash-001",
    "meta-llama/llama-3.2-1b-instruct",
    "meta-llama/llama-3.2-3b-instruct",
    "meta-llama/llama-3.1-8b-instruct",
    "meta-llama/llama-3.1-70b-instruct",
    "openai/gpt-4o-mini",
    "openai/gpt-4o",
    "deepseek/deepseek-r1",
    "meta-llama/llama-4-maverick",
    "meta-llama/llama-4-scout"
];

// Helper function getOwner (unchanged)
const getOwner = (modelId) => {
    if (modelId && modelId.includes('/')) {
        return modelId.split('/')[0];
    }
    return 'fal-ai';
}

// GET /v1/models endpoint (unchanged)
app.get('/v1/models', (req, res) => {
    console.log("Received request for GET /v1/models");
    try {
        const modelsData = FAL_SUPPORTED_MODELS.map(modelId => ({
            id: modelId, object: "model", created: Math.floor(Date.now() / 1000), owned_by: getOwner(modelId) // Use current time for created
        }));
        res.json({ object: "list", data: modelsData });
        console.log("Successfully returned model list.");
    } catch (error) {
        console.error("Error processing GET /v1/models:", error);
        res.status(500).json({ error: "Failed to retrieve model list." });
    }
});

// convertMessagesToFalPrompt 函数 (unchanged)
function convertMessagesToFalPrompt(messages) {
    // ... (keep existing conversion logic)
    let fixed_system_prompt_content = "";
    const conversation_message_blocks = [];
    // console.log(`Original messages count: ${messages.length}`); // Less verbose logging

    // 1. 分离 System 消息,格式化 User/Assistant 消息
    for (const message of messages) {
        let content = (message.content === null || message.content === undefined) ? "" : String(message.content);
        switch (message.role) {
            case 'system':
                fixed_system_prompt_content += `System: ${content}\n\n`;
                break;
            case 'user':
                conversation_message_blocks.push(`Human: ${content}\n\n`);
                break;
            case 'assistant':
                conversation_message_blocks.push(`Assistant: ${content}\n\n`);
                break;
            default:
                console.warn(`Unsupported role: ${message.role}`);
                continue;
        }
    }

    // 2. 截断合并后的 system 消息(如果超长)
    if (fixed_system_prompt_content.length > SYSTEM_PROMPT_LIMIT) {
        const originalLength = fixed_system_prompt_content.length;
        fixed_system_prompt_content = fixed_system_prompt_content.substring(0, SYSTEM_PROMPT_LIMIT);
        console.warn(`Combined system messages truncated from ${originalLength} to ${SYSTEM_PROMPT_LIMIT}`);
    }
    fixed_system_prompt_content = fixed_system_prompt_content.trim();

    // 3. 计算 system_prompt 中留给对话历史的剩余空间
    let space_occupied_by_fixed_system = 0;
    if (fixed_system_prompt_content.length > 0) {
         space_occupied_by_fixed_system = fixed_system_prompt_content.length + 4; // 预留 \n\n...\n\n 的长度
    }
     const remaining_system_limit = Math.max(0, SYSTEM_PROMPT_LIMIT - space_occupied_by_fixed_system);
    // console.log(`Trimmed fixed system prompt length: ${fixed_system_prompt_content.length}. Approx remaining system history limit: ${remaining_system_limit}`);

    // 4. 反向填充 User/Assistant 对话历史
    const prompt_history_blocks = [];
    const system_prompt_history_blocks = [];
    let current_prompt_length = 0;
    let current_system_history_length = 0;
    let promptFull = false;
    let systemHistoryFull = (remaining_system_limit <= 0);

    // console.log(`Processing ${conversation_message_blocks.length} user/assistant messages for recency filling.`);
    for (let i = conversation_message_blocks.length - 1; i >= 0; i--) {
        const message_block = conversation_message_blocks[i];
        const block_length = message_block.length;

        if (promptFull && systemHistoryFull) {
            // console.log(`Both prompt and system history slots full. Omitting older messages from index ${i}.`);
            break;
        }

        if (!promptFull) {
            if (current_prompt_length + block_length <= PROMPT_LIMIT) {
                prompt_history_blocks.unshift(message_block);
                current_prompt_length += block_length;
                continue;
            } else {
                promptFull = true;
                // console.log(`Prompt limit (${PROMPT_LIMIT}) reached. Trying system history slot.`);
            }
        }

        if (!systemHistoryFull) {
            if (current_system_history_length + block_length <= remaining_system_limit) {
                 system_prompt_history_blocks.unshift(message_block);
                 current_system_history_length += block_length;
                 continue;
            } else {
                 systemHistoryFull = true;
                 // console.log(`System history limit (${remaining_system_limit}) reached.`);
            }
        }
    }

    // 5. 组合最终的 prompt 和 system_prompt
    const system_prompt_history_content = system_prompt_history_blocks.join('').trim();
    const final_prompt = prompt_history_blocks.join('').trim();
    const SEPARATOR = "\n\n-------下面是比较早之前的对话内容-----\n\n";
    let final_system_prompt = "";
    const hasFixedSystem = fixed_system_prompt_content.length > 0;
    const hasSystemHistory = system_prompt_history_content.length > 0;

    if (hasFixedSystem && hasSystemHistory) {
        final_system_prompt = fixed_system_prompt_content + SEPARATOR + system_prompt_history_content;
        // console.log("Combining fixed system prompt and history with separator.");
    } else if (hasFixedSystem) {
        final_system_prompt = fixed_system_prompt_content;
        // console.log("Using only fixed system prompt.");
    } else if (hasSystemHistory) {
        final_system_prompt = system_prompt_history_content;
        // console.log("Using only history in system prompt slot.");
    }

    const result = {
        system_prompt: final_system_prompt,
        prompt: final_prompt
    };

    // console.log(`Final system_prompt length (Sys+Separator+Hist): ${result.system_prompt.length}`);
    // console.log(`Final prompt length (Hist): ${result.prompt.length}`);

    return result;
}
// === convertMessagesToFalPrompt 函数结束 ===


// --- Helper function to make Fal AI request with retries ---
async function makeFalRequestWithRetry(falInput, stream = false) {
    let attempts = 0;
    const maxAttempts = falKeys.length; // Try each key at most once per request
    const attemptedKeysInThisRequest = new Set(); // Track keys tried for *this* specific request

    while (attempts < maxAttempts) {
        const keyInfo = getNextValidKey();

        if (!keyInfo) {
            // This happens if all keys are currently in the invalidKeys set
            throw new Error("No valid Fal AI keys available (all marked as invalid).");
        }

        // Avoid retrying the *exact same key* within the *same request attempt cycle*
        // This guards against potential infinite loops if getNextValidKey had issues
        if (attemptedKeysInThisRequest.has(keyInfo.key)) {
             console.warn(`Key at index ${keyInfo.index} already attempted for this request cycle. Skipping.`);
             // Don't increment attempts here, as we didn't actually *use* the key.
             // Let the loop continue to find the next *different* valid key.
             // If all keys are invalid, the check at the start of the loop handles it.
             continue;
        }
        attemptedKeysInThisRequest.add(keyInfo.key);
        attempts++; // Count this as a distinct attempt with a key

        try {
            console.log(`Attempt ${attempts}/${maxAttempts}: Trying Fal Key index ${keyInfo.index}...`);

            // *** CRITICAL: Reconfigure fal client with the selected key ***
            console.warn("Concurrency Warning: Reconfiguring global fal client. Ensure sufficient instance isolation if under high load.");
            fal.config({ credentials: keyInfo.key });

            if (stream) {
                // Return the stream directly for the caller to handle
                const falStream = await fal.stream("fal-ai/any-llm", { input: falInput });
                console.log(`Successfully initiated stream with key index ${keyInfo.index}.`);
                return falStream; // Success, let the caller handle iteration
            } else {
                // For non-stream, wait for the result here
                console.log(`Executing non-stream request with key index ${keyInfo.index}...`);
                const result = await fal.subscribe("fal-ai/any-llm", { input: falInput, logs: true });
                console.log(`Successfully received non-stream result with key index ${keyInfo.index}.`);

                // Check for errors *within* the successful response structure
                 if (result && result.error) {
                     console.error(`Fal-ai returned an error in non-stream result (Key Index ${keyInfo.index}):`, result.error);
                     // Treat this like a general Fal error, not necessarily a key error unless message indicates it
                     // Convert it to a standard Error object to be caught below
                     throw new Error(`Fal-ai error in result: ${JSON.stringify(result.error)}`);
                 }
                return result; // Success
            }
        } catch (error) {
            console.error(`Error using Fal Key index ${keyInfo.index}:`, error.message || error);

            if (isKeyRelatedError(error)) {
                console.warn(`Marking Fal Key index ${keyInfo.index} as invalid due to error.`);
                invalidKeys.add(keyInfo.key);
                // Continue to the next iteration to try another key
            } else {
                // Not identified as a key-related error (e.g., network issue, bad input, internal Fal error)
                // Fail the request immediately, don't retry with other keys for this type of error.
                console.error("Error does not appear to be key-related. Failing request without further retries.");
                throw error; // Re-throw the original error to be caught by the main handler
            }
        }
    }

    // If the loop finishes, it means all keys were tried and marked invalid *within this request cycle*
    throw new Error(`Request failed after trying ${attempts} unique Fal key(s). All failed with key-related errors or were already marked invalid.`);
}


// POST /v1/chat/completions endpoint (Modified to use retry logic)
app.post('/v1/chat/completions', async (req, res) => {
    const { model, messages, stream = false, reasoning = false, ...restOpenAIParams } = req.body;

    // Basic logging for request entry
    console.log(`--> POST /v1/chat/completions | Model: ${model} | Stream: ${stream}`);

    if (!FAL_SUPPORTED_MODELS.includes(model)) {
         console.warn(`Warning: Requested model '${model}' is not in the explicitly supported list. Proxy will still attempt.`);
    }
    if (!model || !messages || !Array.isArray(messages) || messages.length === 0) {
        console.error("Invalid request: Missing 'model' or 'messages' array.");
        return res.status(400).json({ error: 'Missing or invalid parameters: model and messages array are required.' });
    }

    try {
        // --- Prepare Input ---
        const { prompt, system_prompt } = convertMessagesToFalPrompt(messages);
        const falInput = {
            model: model,
            prompt: prompt,
            ...(system_prompt && { system_prompt: system_prompt }),
            reasoning: !!reasoning, // Ensure boolean
        };
        // console.log("Fal Input:", JSON.stringify(falInput, null, 2)); // Verbose logging
        console.log("Attempting Fal request with key rotation/retry...");

        // --- Handle Stream vs Non-Stream ---
        if (stream) {
            res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
            res.setHeader('Cache-Control', 'no-cache');
            res.setHeader('Connection', 'keep-alive');
            res.setHeader('Access-Control-Allow-Origin', '*'); // Consider restricting in production
            res.flushHeaders();

            let previousOutput = '';
            let falStream;

            try {
                 // Initiate stream using the retry logic
                 falStream = await makeFalRequestWithRetry(falInput, true);

                 // Process the stream events
                for await (const event of falStream) {
                    const currentOutput = (event && typeof event.output === 'string') ? event.output : '';
                    const isPartial = (event && typeof event.partial === 'boolean') ? event.partial : true;
                    const errorInfo = (event && event.error) ? event.error : null;

                     if (errorInfo) {
                        // Log error from within the stream, but continue processing if possible
                        console.error("Error received *within* fal stream event:", errorInfo);
                        // Send an error chunk to the client (optional, depends on desired behavior)
                        const errorChunk = { id: `chatcmpl-${Date.now()}-error`, object: "chat.completion.chunk", created: Math.floor(Date.now() / 1000), model: model, choices: [{ index: 0, delta: {}, finish_reason: "error", message: { role: 'assistant', content: `Fal Stream Event Error: ${JSON.stringify(errorInfo)}` } }] };
                        // Safety check before writing
                        if (!res.writableEnded) {
                             res.write(`data: ${JSON.stringify(errorChunk)}\n\n`);
                        } else {
                             console.warn("Stream already ended when trying to write stream event error.");
                        }
                        // Decide whether to break or continue based on error severity if needed
                    }

                    // Calculate delta (same logic as before)
                    let deltaContent = '';
                    if (currentOutput.startsWith(previousOutput)) {
                        deltaContent = currentOutput.substring(previousOutput.length);
                    } else if (currentOutput.length > 0) {
                         // console.warn("Fal stream output mismatch. Sending full current output as delta."); // Less verbose
                         deltaContent = currentOutput;
                         previousOutput = ''; // Reset previous output on mismatch
                    }
                    previousOutput = currentOutput;

                    // Send OpenAI compatible chunk
                    if (deltaContent || !isPartial) { // Send even if delta is empty when finishing
                        const openAIChunk = {
                            id: `chatcmpl-${Date.now()}`, // Consider more unique ID if needed
                            object: "chat.completion.chunk",
                            created: Math.floor(Date.now() / 1000),
                            model: model, // Echo back the requested model
                            choices: [{
                                index: 0,
                                delta: { content: deltaContent },
                                finish_reason: isPartial === false ? "stop" : null
                             }]
                        };
                        // Safety check before writing
                        if (!res.writableEnded) {
                            res.write(`data: ${JSON.stringify(openAIChunk)}\n\n`);
                        } else {
                             console.warn("Stream already ended when trying to write data chunk.");
                        }
                    }
                 } // End for-await loop

                 // Send the final [DONE] marker
                 if (!res.writableEnded) {
                     res.write(`data: [DONE]\n\n`);
                     res.end();
                     console.log("<-- Stream finished successfully.");
                 } else {
                      console.log("<-- Stream finished, but connection was already ended.");
                 }

            } catch (streamError) {
                // Catches errors from makeFalRequestWithRetry OR the stream iteration itself
                console.error('Error during stream request processing:', streamError.message || streamError);
                 try {
                     if (!res.headersSent) {
                         // Error likely occurred in makeFalRequestWithRetry before stream started
                         res.status(502).json({ // 502 Bad Gateway might be appropriate
                            error: 'Failed to initiate Fal stream',
                            details: streamError.message || 'Underlying Fal request failed or timed out.'
                         });
                         console.log("<-- Stream initiation failed response sent.");
                     } else if (!res.writableEnded) {
                         // Stream started but failed during processing
                         const errorDetails = (streamError instanceof Error) ? streamError.message : JSON.stringify(streamError);
                         // Send error details in the stream if possible
                         res.write(`data: ${JSON.stringify({ error: { message: "Stream processing error after initiation", type: "proxy_error", details: errorDetails } })}\n\n`);
                         res.write(`data: [DONE]\n\n`); // Still send DONE after error for client handling
                         res.end();
                         console.log("<-- Stream error sent, stream ended.");
                     } else {
                        console.log("<-- Stream error occurred, but connection already ended.");
                     }
                 } catch (finalError) {
                    console.error('Error sending stream error message to client:', finalError);
                    // Ensure response is ended if possible
                    if (!res.writableEnded) { res.end(); }
                 }
            }

        } else {
            // --- Non-Stream ---
            try {
                // Get the result using the retry logic
                const result = await makeFalRequestWithRetry(falInput, false);
                // console.log("Received non-stream result via retry function:", JSON.stringify(result, null, 2)); // Verbose

                // Construct OpenAI compatible response
                const openAIResponse = {
                    id: `chatcmpl-${result.requestId || Date.now()}`,
                    object: "chat.completion",
                    created: Math.floor(Date.now() / 1000),
                    model: model, // Echo back requested model
                    choices: [{
                        index: 0,
                        message: {
                            role: "assistant",
                            content: result.output || "" // Ensure content is string
                         },
                        finish_reason: "stop" // Assume stop for non-stream success
                    }],
                    usage: { // Provide null usage as Fal doesn't return it
                        prompt_tokens: null,
                        completion_tokens: null,
                        total_tokens: null
                     },
                    system_fingerprint: null, // Fal doesn't provide this
                     ...(result.reasoning && { fal_reasoning: result.reasoning }), // Include Fal specific reasoning if present
                };

                res.json(openAIResponse);
                console.log("<-- Non-stream response sent successfully.");

            } catch (error) {
                 // Catches errors from makeFalRequestWithRetry (e.g., all keys failed or non-key error)
                console.error('Error during non-stream request processing:', error.message || error);
                if (!res.headersSent) {
                    const errorMessage = (error instanceof Error) ? error.message : JSON.stringify(error);
                    // Check if it was the "all keys failed" error
                    const finalMessage = errorMessage.includes("No valid Fal AI keys available") || errorMessage.includes("Request failed after trying")
                        ? `Fal request failed after trying all available keys: ${errorMessage}`
                        : `Internal Server Error processing Fal request: ${errorMessage}`;
                    // Use 502 Bad Gateway if it's likely an upstream (Fal) failure
                    res.status(502).json({ error: 'Fal Request Failed', details: finalMessage });
                    console.log("<-- Non-stream error response sent.");
                } else {
                    // Should be rare for non-stream, but handle just in case
                    console.error("Headers already sent for non-stream error? This is unexpected.");
                    if (!res.writableEnded) { res.end(); }
                }
            }
        }

    } catch (error) {
        // Catch errors from parameter validation or prompt conversion *before* calling Fal
        console.error('Unhandled error before initiating Fal request:', error.message || error);
        if (!res.headersSent) {
            const errorMessage = (error instanceof Error) ? error.message : JSON.stringify(error);
            res.status(500).json({ error: 'Internal Server Error in Proxy Setup', details: errorMessage });
            console.log("<-- Proxy setup error response sent.");
        } else {
             console.error("Headers already sent when catching setup error. Ending response.");
             if (!res.writableEnded) { res.end(); }
        }
    }
});

// 启动服务器 (Updated startup message)
app.listen(PORT, () => {
    console.log(`=====================================================================`);
    console.log(` Fal OpenAI Proxy Server (Multi-Key Rotation & Failover)`);
    console.log(`---------------------------------------------------------------------`);
    console.log(` Listening on port : ${PORT}`);
    console.log(` Reading Fal Keys from : FAL_KEY environment variable (comma-separated)`);
    console.log(` Loaded Keys Count   : ${falKeys.length}`);
    console.log(` API Key Auth        : ${API_KEY ? 'Enabled (using API_KEY env var)' : 'Disabled'}`);
    console.log(` Input Limits        : System Prompt=${SYSTEM_PROMPT_LIMIT}, Prompt=${PROMPT_LIMIT}`);
    console.log(` Concurrency Warning : Global Fal client reconfigured per request.`);
    console.log(`---------------------------------------------------------------------`);
    console.log(` Endpoints:`);
    console.log(`   POST http://localhost:${PORT}/v1/chat/completions`);
    console.log(`   GET  http://localhost:${PORT}/v1/models`);
    console.log(`=====================================================================`);
});

// 根路径响应 (Updated message)
app.get('/', (req, res) => {
    res.send(`Fal OpenAI Proxy (Multi-Key Rotation from FAL_KEY) is running. Loaded ${falKeys.length} key(s).`);
});