"""Module setup for the Blackbox chat proxy: imports, configuration and shared helpers."""

import asyncio
import importlib
import json
import logging
import random
import string
import sys
import time
import uuid
from typing import Any, AsyncGenerator, Dict, List, Optional

import aiohttp
import brotli  # noqa: F401  (not referenced by name; "br" is advertised in HEADERS)
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from tenacity import (
    RetryError,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# ─── Logging ───
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("proxy")

# ─── Config ───
BLACKBOX_URL = "https://www.blackbox.ai/api/chat"
# Passed as the ``total`` of the aiohttp client timeout, i.e. seconds.
REQUEST_TIMEOUT = 300

# ─── Headers ───
# Sent verbatim with every upstream request.
# NOTE(review): "accept-encoding" advertises zstd; confirm the installed
# aiohttp can decode it, otherwise the upstream may answer with a body this
# client cannot read.
HEADERS = {
    "authority": "www.blackbox.ai",
    "method": "POST",
    "path": "/api/chat",
    "scheme": "https",
    "accept": "*/*",
    "accept-encoding": "gzip, deflate, br, zstd",
    "accept-language": "en-US,en;q=0.9",
    "origin": "https://www.blackbox.ai",
    "priority": "u=1, i",
    "referer": "https://www.blackbox.ai/",
    "sec-ch-ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/136.0.0.0 Safari/537.36"
    ),
    "content-type": "application/json",
}

# ─── FastAPI ───
app = FastAPI()
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy; confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared upstream client session; stays ``None`` until something assigns it.
HTTP_SESSION: Optional[aiohttp.ClientSession] = None


class RetryableStatusError(Exception):
    """Signals an upstream HTTP status for which the request may be retried.

    Attributes:
        status: The HTTP status code that was received.
        text: The full response body that came with it.
    """

    def __init__(self, status: int, text: str):
        # Only the first 100 characters of the body go into the message;
        # the complete body remains available on ``self.text``.
        preview = text[:100]
        super().__init__(f"status={status} body={preview}...")
        self.status = status
        self.text = text


# Upstream status codes considered worth retrying.
RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504}

# Default character pool for ``_rand``: ASCII letters followed by digits.
_ascii = string.ascii_letters + string.digits


def _rand(n, pool=_ascii):
    """Return a string of ``n`` characters, each picked at random from ``pool``."""
    picked = [random.choice(pool) for _ in range(n)]
    return "".join(picked)
def random_email():
    """Return a random address: 12 alphanumeric characters + ``@gmail.com``."""
    return _rand(12) + "@gmail.com"


def random_id():
    """Return a random string of 21 decimal digits."""
    return _rand(21, string.digits)


def random_customer_id():
    """Return a random identifier: ``cus_`` followed by 12 alphanumeric characters."""
    return "cus_" + _rand(12)


def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the JSON body posted to the upstream chat endpoint.

    Args:
        messages: Chat messages, forwarded unchanged under ``"messages"``.

    Returns:
        A new dict on every call. Everything except ``"messages"`` and the
        freshly generated ``"validated"`` UUID is a fixed constant.
    """
    # NOTE(review): the "session" and "subscriptionCache" values below are
    # hard-coded, including their timestamps, while random_email(),
    # random_id() and random_customer_id() are not used here. Confirm whether
    # those helpers were meant to populate these fields.
    return {
        "messages": messages,
        "agentMode": {},
        "id": None,
        "previewToken": None,
        "userId": None,
        "codeModelMode": True,
        "trendingAgentMode": {},
        "isMicMode": False,
        "userSystemPrompt": None,
        "maxTokens": 1024,
        "playgroundTopP": None,
        "playgroundTemperature": None,
        "isChromeExt": False,
        "githubToken": "",
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "visitFromDelta": False,
        "isMemoryEnabled": False,
        "mobileClient": False,
        "userSelectedModel": None,
        "validated": str(uuid.uuid4()),
        "imageGenerationMode": False,
        "webSearchModePrompt": False,
        "deepSearchMode": True,
        "domains": None,
        "vscodeClient": False,
        "codeInterpreterMode": False,
        "customProfile": {
            "name": "",
            "occupation": "",
            "traits": [],
            "additionalInfo": "",
            "enableNewChats": False,
        },
        "session": {
            "user": {
                "name": "S.C gaming",
                "email": "simarmanbir@gmail.com",
                "image": "https://lh3.googleusercontent.com/a/ACg8ocI-ze5Qe42S-j8xaCL6X7KSVwfiOae4fONqpTxzt0d2_a2FIld1=s96-c",
                "id": "100846841133312010974",
            },
            "expires": "2025-06-09T19:36:08.220Z",
            "isNewUser": False,
        },
        "isPremium": True,
        "subscriptionCache": {
            "status": "PREMIUM",
            "customerId": "cus_Rtiok41PQ2oo1c",
            "expiryTimestamp": 1749108685,
            "lastChecked": 1746822333827,
            "isTrialSubscription": False,
        },
        "beastMode": False,
        "reasoningMode": False,
        "designerMode": False,
    }


# Total number of attempts made to open the upstream request.
MAX_UPSTREAM_ATTEMPTS = 3


def log_retry(retry_state):
    """tenacity ``before_sleep`` hook: log the failed attempt and its cause.

    The request id is read from the keyword arguments of the retried call,
    so the retried function must receive ``request_id`` as a keyword.
    """
    rid = retry_state.kwargs.get("request_id", "unknown")
    err = retry_state.outcome.exception()
    logger.warning(
        "[%s] retry %s/%s due to %s",
        rid,
        retry_state.attempt_number,
        MAX_UPSTREAM_ATTEMPTS,
        err,
    )


def _get_session() -> aiohttp.ClientSession:
    """Return the shared client session, creating it when missing or closed.

    Raises:
        HTTPException: 502 when the ``brotli`` module cannot be imported.
    """
    global HTTP_SESSION
    if HTTP_SESSION is None or HTTP_SESSION.closed:
        try:
            importlib.import_module("brotli")
        except ImportError:
            logger.error("Missing Brotli module. Install with: pip install brotli")
            raise HTTPException(status_code=502, detail="Missing Brotli module on server")
        HTTP_SESSION = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
        )
    return HTTP_SESSION


# The retry decorator lives on this coroutine rather than on the public async
# generator: calling an async generator function only creates the generator
# object and cannot raise, so a decorator placed there never retries anything.
@retry(
    stop=stop_after_attempt(MAX_UPSTREAM_ATTEMPTS),
    wait=wait_exponential(min=1, max=10),
    retry=retry_if_exception_type(
        (
            aiohttp.ClientConnectionError,
            aiohttp.ClientResponseError,
            asyncio.TimeoutError,
            RetryableStatusError,
        )
    ),
    before_sleep=log_retry,
)
async def _open_upstream(*, data, request_id: str):
    """POST ``data`` upstream and return the still-open 200 response.

    The caller owns the returned response and must release it (for example
    with ``async with resp:``).

    Raises:
        RetryableStatusError: Status in ``RETRYABLE_STATUSES`` (triggers a retry).
        HTTPException: 502 for any other non-200 status (not retried).
    """
    session = _get_session()
    # The session already carries the total timeout, so none is passed here.
    resp = await session.post(BLACKBOX_URL, json=data, headers=HEADERS)
    if resp.status == 200:
        return resp

    # Error path: read the body, then release the connection before raising.
    async with resp:
        status = resp.status
        body = await resp.text()
    logger.error("[%s] Upstream %s error: %s", request_id, BLACKBOX_URL, status)
    if status in RETRYABLE_STATUSES:
        raise RetryableStatusError(status, body)
    raise HTTPException(status_code=502, detail=f"Upstream error {status}")


async def get_blackbox_response(*, data, stream: bool, request_id: str) -> AsyncGenerator[str, None]:
    """Yield the upstream answer for ``data`` as text.

    Opening the request is retried up to ``MAX_UPSTREAM_ATTEMPTS`` times.
    Once body transfer has begun nothing is retried, because part of the
    answer may already have been handed to the consumer.

    Args:
        data: JSON-serialisable request body.
        stream: When true, yield text as it arrives; otherwise yield the
            whole body as a single string.
        request_id: Identifier used in log lines.

    Raises:
        RetryError: Every attempt to open the request failed retryably.
        HTTPException: 502 for a non-retryable upstream status or a transport
            failure while reading the body.
    """
    import codecs  # function-scope: only needed on the streaming path

    resp = await _open_upstream(data=data, request_id=request_id)
    try:
        async with resp:
            if not stream:
                yield await resp.text()
                return

            # Incremental decoding keeps a multi-byte UTF-8 character intact
            # when it is split across two network chunks.
            decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
            async for chunk in resp.content.iter_any():
                text = decoder.decode(chunk)
                if text:
                    yield text
            tail = decoder.decode(b"", final=True)
            if tail:
                yield tail
    except (aiohttp.ClientError, asyncio.TimeoutError) as err:
        logger.exception("[%s] Unexpected upstream error", request_id)
        raise HTTPException(status_code=502, detail="Failed to contact upstream") from err


@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Attach a fresh UUID to ``request.state.request_id`` and log timing."""
    request.state.request_id = rid = str(uuid.uuid4())
    logger.info("[%s] %s %s", rid, request.method, request.url.path)
    start = time.perf_counter()
    resp = await call_next(request)
    logger.info("[%s] finished in %.2fs", rid, time.perf_counter() - start)
    return resp


@app.get("/")
async def root():
    """Return a fixed banner message."""
    return {"message": "API is running"}


@app.get("/health")
async def health():
    """Return a fixed ``{"status": "ok"}`` body."""
    return {"status": "ok"}


def _sse_chunk(content: str) -> str:
    """Format one piece of text as a ``chat.completion.chunk`` SSE event."""
    msg = {
        "id": str(uuid.uuid4()),
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "DeepResearch",
        "choices": [{"index": 0, "delta": {"content": content}}],
    }
    return f"data: {json.dumps(msg)}\n\n"


async def _event_stream(upstream, first, rid: str):
    """Turn an already-primed upstream generator into SSE events.

    ``first`` is the chunk obtained while priming (``None`` when the upstream
    produced nothing). A failure after streaming has begun is logged and
    reported as an error event, since the HTTP status can no longer change.
    """
    try:
        if first is not None:
            yield _sse_chunk(first)
        async for chunk in upstream:
            yield _sse_chunk(chunk)
    except Exception:
        logger.exception("[%s] upstream failed mid-stream", rid)
        err = {"error": {"message": "Blackbox upstream failed", "type": "upstream_error"}}
        yield f"data: {json.dumps(err)}\n\n"
        return
    finally:
        await upstream.aclose()
    yield "data: [DONE]\n\n"


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Chat-completions endpoint backed by the Blackbox upstream.

    Reads ``messages`` (required, non-empty list) and ``stream`` (optional)
    from the JSON body. Returns a ``chat.completion`` object, or a
    ``text/event-stream`` response when ``stream`` is truthy.

    Raises:
        HTTPException: 400 for an invalid body, 502 for upstream failures,
            500 for any other unexpected error.
    """
    rid = request.state.request_id

    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        raise HTTPException(status_code=400, detail="Invalid JSON")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    messages = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=400, detail="Missing 'messages'")
    if not isinstance(messages, list):
        raise HTTPException(status_code=400, detail="'messages' must be a list")

    stream = body.get("stream", False)
    payload = build_payload(messages)

    try:
        if not stream:
            chunks: List[str] = []
            async for part in get_blackbox_response(data=payload, stream=False, request_id=rid):
                if part.startswith("Error:"):
                    raise HTTPException(status_code=502, detail=part)
                chunks.append(part)
            answer = "".join(chunks) or "No response."
            return {
                "id": str(uuid.uuid4()),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": "DeepResearch",
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": answer},
                        "finish_reason": "stop",
                    }
                ],
            }

        # Fetch the first chunk before responding, so that a failure to reach
        # the upstream still produces a proper HTTP error status instead of a
        # 200 stream that breaks immediately.
        upstream = get_blackbox_response(data=payload, stream=True, request_id=rid)
        try:
            first = await upstream.__anext__()
        except StopAsyncIteration:
            first = None
    except HTTPException:
        # Deliberate HTTP errors keep their own status and detail.
        raise
    except RetryError as err:
        logger.error("[%s] retries failed: %s", rid, err)
        raise HTTPException(status_code=502, detail="Blackbox upstream failed")
    except Exception:
        logger.exception("[%s] error", rid)
        raise HTTPException(status_code=500, detail="Internal proxy error")

    return StreamingResponse(
        _event_stream(upstream, first, rid), media_type="text/event-stream"
    )