|
import asyncio |
|
import json |
|
import logging |
|
import os |
|
import random |
|
import string |
|
import time |
|
import uuid |
|
from http import HTTPStatus |
|
from typing import AsyncGenerator, Dict, List, Any |
|
|
|
import aiohttp |
|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import StreamingResponse |
|
from tenacity import ( |
|
retry, |
|
stop_after_attempt, |
|
wait_exponential, |
|
retry_if_exception_type, |
|
RetryError, |
|
) |
|
|
|
|
|
# Process-wide logging configuration; all proxy messages go through the
# "proxy" logger created below.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("proxy")
|
|
|
|
|
# Upstream endpoint every chat request is forwarded to.
BLACKBOX_URL = "https://www.blackbox.ai/api/chat"
# aiohttp connection-pool sizing: total connections / connections per host.
CONNECTION_LIMIT = 200
CONNECTION_LIMIT_PER_HOST = 50
# Total request timeout in seconds (used both per-request and for the
# session-level ClientTimeout in the startup hook).
REQUEST_TIMEOUT = 300
# Number of background queue workers started on app startup.
WORKER_COUNT = 10


# Browser-like headers sent with every upstream request; presumably required
# so the upstream accepts the call as if it came from its own web UI --
# TODO confirm which of these the upstream actually validates.
HEADERS = {
    "accept": "*/*",
    "content-type": "application/json",
    "origin": "https://www.blackbox.ai",
    "referer": "https://www.blackbox.ai/",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/136.0.0.0 Safari/537.36"
    ),
}
|
|
|
|
|
app = FastAPI()
# Wide-open CORS: any origin, method, and header, with credentials allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared aiohttp session; remains None until the startup hook creates it.
HTTP_SESSION: aiohttp.ClientSession = None
# Work queue for non-streaming requests, consumed by the background workers.
# NOTE(review): created at import time -- on Python < 3.10 asyncio.Queue
# binds the then-current event loop here; verify against the target runtime.
REQUEST_QUEUE: asyncio.Queue = asyncio.Queue()
# Handles to the background worker tasks, populated on startup.
WORKER_TASKS: List[asyncio.Task] = []
|
|
|
|
|
class RetryableStatusError(Exception):
    """Raised for upstream HTTP statuses that are worth retrying.

    Carries the status code and (truncated in the message) response body so
    retry logging can show what the upstream returned.
    """

    def __init__(self, status: int, text: str):
        # Truncate the body in the exception message; keep the full text
        # available on the instance.
        super().__init__(f"status={status} body={text[:100]}...")
        self.status = status
        self.text = text
|
|
|
RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504} |
|
|
|
|
|
_ascii = string.ascii_letters + string.digits |
|
def _rand(n, pool=_ascii): return "".join(random.choice(pool) for _ in range(n)) |
|
def random_email(): return _rand(12) + "@gmail.com" |
|
def random_id(): return _rand(21, string.digits) |
|
def random_customer_id(): return "cus_" + _rand(12) |
|
|
|
|
|
def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the request body expected by the Blackbox chat endpoint.

    Wraps the OpenAI-style *messages* list in the many upstream-specific
    flags, plus a randomized fake user session and premium subscription
    block. Field semantics mirror the upstream web client and are largely
    undocumented -- the values below were presumably captured from real
    traffic; verify against the upstream API before changing any of them.
    """
    return {
        "messages": messages,
        # Random 8-char conversation id, fresh per request.
        "id": _rand(8),
        "agentMode": {},
        "codeModelMode": True,
        "trendingAgentMode": {},
        "isMicMode": False,
        "userSystemPrompt": None,
        "maxTokens": 1024,
        "playgroundTopP": None,
        "playgroundTemperature": None,
        "isChromeExt": False,
        "githubToken": "",
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "visitFromDelta": False,
        "isMemoryEnabled": False,
        "mobileClient": False,
        "userSelectedModel": None,
        # Fresh UUID per request; presumably a client-side request token --
        # TODO confirm how the upstream uses it.
        "validated": str(uuid.uuid4()),
        "imageGenerationMode": False,
        "webSearchModePrompt": False,
        # Deep-search is force-enabled for every proxied request.
        "deepSearchMode": True,
        "domains": None,
        "vscodeClient": False,
        "codeInterpreterMode": False,
        "customProfile": {
            "name": "",
            "occupation": "",
            "traits": [],
            "additionalInfo": "",
            "enableNewChats": False
        },
        # Randomized fake user identity: new email/id pair on every call.
        "session": {
            "user": {
                "name": "S.C gaming",
                "email": random_email(),
                "image": "https://lh3.googleusercontent.com/a/default",
                "id": random_id()
            },
            # NOTE(review): hard-coded expiry date will go stale; confirm
            # whether the upstream validates it.
            "expires": "2025-12-31T23:59:59Z",
            "isNewUser": False
        },
        # Claims premium status with a random customer id and a 30-day
        # subscription window computed from the current time.
        "isPremium": True,
        "subscriptionCache": {
            "status": "PREMIUM",
            "customerId": random_customer_id(),
            "expiryTimestamp": int(time.time()) + 30 * 86400,
            "lastChecked": int(time.time()),
            "isTrialSubscription": False
        },
        "beastMode": False,
        "reasoningMode": False,
        "designerMode": False
    }
|
|
|
|
|
def log_retry(retry_state):
    """Tenacity ``before_sleep`` hook: log the failed attempt before backoff.

    Relies on the wrapped function being called with ``request_id`` as a
    keyword argument so the retry can be correlated with the request logs.
    """
    request_id = retry_state.kwargs.get("request_id", "unknown")
    logger.warning(
        "[%s] retry %s/10 due to %s",
        request_id,
        retry_state.attempt_number,
        retry_state.outcome.exception(),
    )
|
|
|
# Retry transient failures up to 10 times with exponential backoff.
# NOTE(review): applying tenacity's @retry to an async *generator* function
# is suspect -- calling the function only creates the generator without
# raising, so exceptions thrown while iterating may never trigger a retry.
# Confirm against the installed tenacity version.
@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(min=1, max=10),
    retry=retry_if_exception_type(
        (aiohttp.ClientConnectionError, aiohttp.ClientResponseError, asyncio.TimeoutError, RetryableStatusError)
    ),
    before_sleep=log_retry
)
async def get_blackbox_response(*, data: Dict[str, Any], stream: bool, request_id: str) -> AsyncGenerator[str, None]:
    """Yield the upstream response body.

    When *stream* is true, yields decoded chunks as they arrive; otherwise
    yields the whole body as a single string.

    Raises:
        RetryableStatusError: for non-200 statuses listed in RETRYABLE_STATUSES.
        HTTPException: 502 for any other non-200 upstream status.
    """
    # The shared session is created by the startup hook; it must exist here.
    assert HTTP_SESSION
    # NOTE(review): plain int timeout -- aiohttp treats this as total seconds,
    # but newer versions prefer an explicit ClientTimeout; confirm version.
    async with HTTP_SESSION.post(BLACKBOX_URL, json=data, headers=HEADERS, timeout=REQUEST_TIMEOUT) as resp:
        if resp.status != 200:
            body = await resp.text()
            logger.error("[%s] Upstream %s error: %s", request_id, BLACKBOX_URL, resp.status)
            if resp.status in RETRYABLE_STATUSES:
                raise RetryableStatusError(resp.status, body)
            raise HTTPException(status_code=502, detail=f"Upstream error {resp.status}")
        if stream:
            # Relay raw chunks as they arrive, decoding leniently so a split
            # multi-byte sequence cannot raise mid-stream.
            async for chunk in resp.content.iter_any():
                if chunk:
                    yield chunk.decode("utf-8", "ignore")
        else:
            yield await resp.text()
|
|
|
|
|
async def _worker():
    """Background consumer for non-streaming requests.

    Pulls ``(payload, request_id, output_queue)`` tuples off REQUEST_QUEUE,
    forwards each to the upstream, and pushes the response pieces into the
    request's private output queue. ``None`` marks end-of-response; failures
    are reported as an ``"Error:<exc>"`` string. Exits on cancellation.
    """
    while True:
        try:
            payload, rid, result_q = await REQUEST_QUEUE.get()
            try:
                async for fragment in get_blackbox_response(data=payload, stream=False, request_id=rid):
                    await result_q.put(fragment)
            except Exception as exc:
                # Surface the failure to the waiting request handler.
                await result_q.put(f"Error:{exc}")
            finally:
                # Always signal end-of-response, even after a failure.
                await result_q.put(None)
                REQUEST_QUEUE.task_done()
        except asyncio.CancelledError:
            # Shutdown hook cancelled us: leave the loop cleanly.
            break
|
|
|
|
|
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Attach a UUID request id to each request and log its duration."""
    rid = str(uuid.uuid4())
    request.state.request_id = rid
    logger.info("[%s] %s %s", rid, request.method, request.url.path)
    started = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started
    logger.info("[%s] finished in %.2fs", rid, elapsed)
    return response
|
|
|
|
|
@app.get("/")
async def root():
    """Basic liveness probe for the proxy."""
    status_msg = {"message": "API is running"}
    return status_msg
|
|
|
@app.get("/health")
async def health():
    """Health-check endpoint used by orchestrators / load balancers."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
|
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """OpenAI-compatible chat endpoint proxied to Blackbox.

    Non-streaming requests are funnelled through the shared worker queue;
    streaming requests call the upstream directly and relay SSE chunks.

    Raises:
        HTTPException 400: invalid JSON body or missing 'messages'.
        HTTPException 502: upstream failure (including exhausted retries).
        HTTPException 500: any other unexpected proxy error.
    """
    rid = request.state.request_id
    try:
        body = await request.json()
        messages = body.get("messages", [])
        if not messages:
            raise HTTPException(status_code=400, detail="Missing 'messages'")
        stream = body.get("stream", False)

        payload = build_payload(messages)

        if not stream:
            # Hand the request to a background worker and collect the pieces
            # it pushes back; None marks end-of-response.
            q: asyncio.Queue = asyncio.Queue()
            await REQUEST_QUEUE.put((payload, rid, q))
            chunks: List[str] = []
            while True:
                part = await q.get()
                if part is None:
                    break
                # Workers report failures as "Error:<exc>" strings.
                if isinstance(part, str) and part.startswith("Error:"):
                    raise HTTPException(status_code=502, detail=part)
                chunks.append(part)
            answer = "".join(chunks) or "No response."
            return {
                "id": str(uuid.uuid4()),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": "DeepResearch",
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": answer},
                        "finish_reason": "stop",
                    }
                ],
            }

        async def event_stream():
            # Streaming path bypasses the worker queue and talks to the
            # upstream directly, relaying chunks as OpenAI-style SSE events.
            try:
                async for chunk in get_blackbox_response(data=payload, stream=True, request_id=rid):
                    msg = {
                        "id": str(uuid.uuid4()),
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": "DeepResearch",
                        "choices": [{"index": 0, "delta": {"content": chunk}}],
                    }
                    yield f"data: {json.dumps(msg)}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                # NOTE(review): once streaming has begun, the status line has
                # already been sent -- this raise can only abort the
                # connection, not deliver a 500 to the client.
                logger.error("[%s] stream error: %s", rid, e)
                raise HTTPException(status_code=500, detail="streaming error")

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    except HTTPException:
        # Bug fix: the catch-all below previously converted deliberate
        # HTTPExceptions (e.g. the 400 for missing 'messages' or the 502
        # worker error) into generic 500s. Let them propagate unchanged.
        raise
    except RetryError as re:
        logger.error("[%s] retries failed: %s", rid, re)
        raise HTTPException(status_code=502, detail="Blackbox upstream failed")
    except Exception:
        logger.exception("[%s] error", rid)
        raise HTTPException(status_code=500, detail="Internal proxy error")
|
|
|
|
|
@app.on_event("startup")
async def startup():
    """Create the shared aiohttp session and spawn the worker pool."""
    global HTTP_SESSION, WORKER_TASKS
    connector = aiohttp.TCPConnector(
        limit=CONNECTION_LIMIT,
        limit_per_host=CONNECTION_LIMIT_PER_HOST,
    )
    HTTP_SESSION = aiohttp.ClientSession(
        connector=connector,
        timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
    )
    WORKER_TASKS = [asyncio.create_task(_worker()) for _ in range(WORKER_COUNT)]
    logger.info("Started %d workers", WORKER_COUNT)
|
|
|
@app.on_event("shutdown")
async def shutdown():
    """Cancel the worker pool, wait for it to drain, and close the session."""
    for task in WORKER_TASKS:
        task.cancel()
    # Swallow the CancelledErrors from the cancelled workers.
    await asyncio.gather(*WORKER_TASKS, return_exceptions=True)
    if HTTP_SESSION:
        await HTTP_SESSION.close()
    logger.info("Shutdown complete")
|
|