import asyncio
import json
import logging
import os
import random
import string
import time
import uuid
from http import HTTPStatus
from typing import AsyncGenerator, Dict, List, Any, Optional

import aiohttp
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    RetryError,
)

# ─── Logging ───
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("proxy")

# ─── Config ───
BLACKBOX_URL = "https://www.blackbox.ai/api/chat"
CONNECTION_LIMIT = 200
CONNECTION_LIMIT_PER_HOST = 50
REQUEST_TIMEOUT = 300
WORKER_COUNT = 10

# ─── Static Headers ───
HEADERS = {
    "accept": "*/*",
    "content-type": "application/json",
    "origin": "https://www.blackbox.ai",
    "referer": "https://www.blackbox.ai/",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/136.0.0.0 Safari/537.36"
    ),
}

# ─── FastAPI Setup ───
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

HTTP_SESSION: Optional[aiohttp.ClientSession] = None
REQUEST_QUEUE: asyncio.Queue = asyncio.Queue()
WORKER_TASKS: List[asyncio.Task] = []

# ─── Retryable Error ───
class RetryableStatusError(Exception):
    def __init__(self, status: int, text: str):
        self.status = status
        self.text = text
        super().__init__(f"status={status} body={text[:100]}...")

RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504}

# ─── Random Data ───
_ascii = string.ascii_letters + string.digits

def _rand(n, pool=_ascii):
    return "".join(random.choice(pool) for _ in range(n))

def random_email():
    return _rand(12) + "@gmail.com"

def random_id():
    return _rand(21, string.digits)

def random_customer_id():
    return "cus_" + _rand(12)

# ─── Payload Generator ───
def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    return {
        "messages": messages,
        "id": _rand(8),
        "agentMode": {},
        "codeModelMode": True,
        "trendingAgentMode": {},
        "isMicMode": False,
        "userSystemPrompt": None,
        "maxTokens": 1024,
        "playgroundTopP": None,
        "playgroundTemperature": None,
        "isChromeExt": False,
        "githubToken": "",
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "visitFromDelta": False,
        "isMemoryEnabled": False,
        "mobileClient": False,
        "userSelectedModel": None,
        "validated": str(uuid.uuid4()),
        "imageGenerationMode": False,
        "webSearchModePrompt": False,
        "deepSearchMode": True,
        "domains": None,
        "vscodeClient": False,
        "codeInterpreterMode": False,
        "customProfile": {
            "name": "",
            "occupation": "",
            "traits": [],
            "additionalInfo": "",
            "enableNewChats": False,
        },
        "session": {
            "user": {
                "name": "S.C gaming",
                "email": random_email(),
                "image": "https://lh3.googleusercontent.com/a/default",
                "id": random_id(),
            },
            "expires": "2025-12-31T23:59:59Z",
            "isNewUser": False,
        },
        "isPremium": True,
        "subscriptionCache": {
            "status": "PREMIUM",
            "customerId": random_customer_id(),
            "expiryTimestamp": int(time.time()) + 30 * 86400,
            "lastChecked": int(time.time()),
            "isTrialSubscription": False,
        },
        "beastMode": False,
        "reasoningMode": False,
        "designerMode": False,
    }
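# Illustrative example: build_payload takes OpenAI-style chat messages and
# embeds them, unchanged, in a Blackbox request body with freshly randomized
# session/subscription fields, e.g.
#
#     payload = build_payload([{"role": "user", "content": "Hello"}])
#     payload["messages"]                  # -> [{"role": "user", "content": "Hello"}]
#     payload["session"]["user"]["email"]  # -> random @gmail.com address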
# ─── Retry Wrapper ───
def log_retry(retry_state):
    rid = retry_state.kwargs.get("request_id", "unknown")
    attempt = retry_state.attempt_number
    err = retry_state.outcome.exception()
    logger.warning("[%s] retry %s/10 due to %s", rid, attempt, err)

@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(min=1, max=10),
    retry=retry_if_exception_type(
        (
            aiohttp.ClientConnectionError,
            aiohttp.ClientResponseError,
            asyncio.TimeoutError,
            RetryableStatusError,
        )
    ),
    before_sleep=log_retry,
)
async def get_blackbox_response(*, data, stream: bool, request_id: str) -> AsyncGenerator[str, None]:
    assert HTTP_SESSION
    async with HTTP_SESSION.post(
        BLACKBOX_URL, json=data, headers=HEADERS, timeout=REQUEST_TIMEOUT
    ) as resp:
        if resp.status != 200:
            body = await resp.text()
            logger.error("[%s] Upstream %s error: %s", request_id, BLACKBOX_URL, resp.status)
            if resp.status in RETRYABLE_STATUSES:
                raise RetryableStatusError(resp.status, body)
            raise HTTPException(status_code=502, detail=f"Upstream error {resp.status}")
        if stream:
            async for chunk in resp.content.iter_any():
                if chunk:
                    yield chunk.decode("utf-8", "ignore")
        else:
            yield await resp.text()

# ─── Worker Tasks ───
async def _worker():
    while True:
        try:
            data, request_id, out_q = await REQUEST_QUEUE.get()
            try:
                async for piece in get_blackbox_response(
                    data=data, stream=False, request_id=request_id
                ):
                    await out_q.put(piece)
            except Exception as e:
                await out_q.put(f"Error:{e}")
            finally:
                # Sentinel tells the waiting handler the response is complete.
                await out_q.put(None)
                REQUEST_QUEUE.task_done()
        except asyncio.CancelledError:
            break

# ─── Middleware ───
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request.state.request_id = rid = str(uuid.uuid4())
    logger.info("[%s] %s %s", rid, request.method, request.url.path)
    start = time.perf_counter()
    resp = await call_next(request)
    logger.info("[%s] finished in %.2fs", rid, time.perf_counter() - start)
    return resp

# ─── Root & Health ───
@app.get("/")
async def root():
    return {"message": "API is running"}

@app.get("/health")
async def health():
    return {"status": "ok"}
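# Example request (illustrative; host and port depend on how the app is served):
#
#   curl -N http://localhost:8000/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"messages": [{"role": "user", "content": "Hi"}], "stream": true}'
#
# With "stream": true the endpoint emits OpenAI-style "chat.completion.chunk"
# SSE events ending with "data: [DONE]"; without it, a single "chat.completion"
# JSON object is returned.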
# ─── Chat Completion ───
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    rid = request.state.request_id
    try:
        body = await request.json()
        messages = body.get("messages", [])
        if not messages:
            raise HTTPException(status_code=400, detail="Missing 'messages'")
        stream = body.get("stream", False)
        payload = build_payload(messages)

        if not stream:
            q: asyncio.Queue = asyncio.Queue()
            await REQUEST_QUEUE.put((payload, rid, q))
            chunks: List[str] = []
            while True:
                part = await q.get()
                if part is None:
                    break
                if isinstance(part, str) and part.startswith("Error:"):
                    raise HTTPException(status_code=502, detail=part)
                chunks.append(part)
            answer = "".join(chunks) or "No response."
            return {
                "id": str(uuid.uuid4()),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": "DeepResearch",
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": answer},
                        "finish_reason": "stop",
                    }
                ],
            }

        async def event_stream():
            try:
                async for chunk in get_blackbox_response(
                    data=payload, stream=True, request_id=rid
                ):
                    msg = {
                        "id": str(uuid.uuid4()),
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": "DeepResearch",
                        "choices": [{"index": 0, "delta": {"content": chunk}}],
                    }
                    yield f"data: {json.dumps(msg)}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                # The response has already started, so an HTTP error status can
                # no longer be sent; report the failure as a final SSE event.
                logger.error("[%s] stream error: %s", rid, e)
                yield f"data: {json.dumps({'error': 'streaming error'})}\n\n"
                yield "data: [DONE]\n\n"

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    except HTTPException:
        # Re-raise deliberate HTTP errors (400, 502) instead of masking them as 500s.
        raise
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    except RetryError as err:
        logger.error("[%s] retries failed: %s", rid, err)
        raise HTTPException(status_code=502, detail="Blackbox upstream failed")
    except Exception:
        logger.exception("[%s] error", rid)
        raise HTTPException(status_code=500, detail="Internal proxy error")

# ─── Startup & Shutdown ───
@app.on_event("startup")
async def startup():
    global HTTP_SESSION, WORKER_TASKS
    HTTP_SESSION = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(
            limit=CONNECTION_LIMIT, limit_per_host=CONNECTION_LIMIT_PER_HOST
        ),
        timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
    )
    WORKER_TASKS = [asyncio.create_task(_worker()) for _ in range(WORKER_COUNT)]
    logger.info("Started %d workers", WORKER_COUNT)

@app.on_event("shutdown")
async def shutdown():
    for t in WORKER_TASKS:
        t.cancel()
    await asyncio.gather(*WORKER_TASKS, return_exceptions=True)
    if HTTP_SESSION:
        await HTTP_SESSION.close()
    logger.info("Shutdown complete")
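# ─── Entry Point (optional) ───
# Minimal sketch for running the proxy directly; it assumes `uvicorn` is
# installed and that this module is saved as `proxy.py`. In practice the app is
# more likely launched externally, e.g. `uvicorn proxy:app --host 0.0.0.0 --port 8000`.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "8000")))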