|
import asyncio |
|
import json |
|
import logging |
|
import random |
|
import string |
|
import time |
|
import uuid |
|
from typing import AsyncGenerator, Dict, List, Any |
|
|
|
import aiohttp |
|
import brotli |
|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import StreamingResponse |
|
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryError |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s %(levelname)s %(message)s", |
|
datefmt="%H:%M:%S", |
|
) |
|
logger = logging.getLogger("proxy") |
|
|
|
|
|
BLACKBOX_URL = "https://www.blackbox.ai/api/chat" |
|
REQUEST_TIMEOUT = 300 |
|
HEADERS = { |
|
"authority": "www.blackbox.ai", |
|
"method": "POST", |
|
"path": "/api/chat", |
|
"scheme": "https", |
|
"accept": "*/*", |
|
"accept-encoding": "gzip, deflate, br, zstd", |
|
"accept-language": "en-US,en;q=0.9", |
|
"origin": "https://www.blackbox.ai", |
|
"priority": "u=1, i", |
|
"referer": "https://www.blackbox.ai/", |
|
"sec-ch-ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"', |
|
"sec-ch-ua-mobile": "?0", |
|
"sec-ch-ua-platform": '"Windows"', |
|
"sec-fetch-dest": "empty", |
|
"sec-fetch-mode": "cors", |
|
"sec-fetch-site": "same-origin", |
|
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36", |
|
"content-type": "application/json", |
|
} |
|
|
|
|
|
app = FastAPI() |
|
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) |
|
HTTP_SESSION: aiohttp.ClientSession = None |
|
RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504} |
|
_ascii = string.ascii_letters + string.digits |
|
|
|
def _rand(n, pool=_ascii): return ''.join(random.choice(pool) for _ in range(n)) |
|
def random_email(): return _rand(12) + "@gmail.com" |
|
def random_id(): return _rand(21, string.digits) |
|
def random_customer_id(): return "cus_" + _rand(12) |
|
def generate_7char_id(): return _rand(7) |
|
|
|
def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]: |
|
msg_id = generate_7char_id() |
|
if messages: |
|
messages[-1]["id"] = msg_id |
|
now = int(time.time()) |
|
return { |
|
"messages": messages, |
|
"agentMode": {}, |
|
"id": msg_id, |
|
"previewToken": None, |
|
"userId": None, |
|
"codeModelMode": True, |
|
"trendingAgentMode": {}, |
|
"isMicMode": False, |
|
"userSystemPrompt": None, |
|
"maxTokens": 1024, |
|
"playgroundTopP": None, |
|
"playgroundTemperature": None, |
|
"isChromeExt": False, |
|
"githubToken": "", |
|
"clickedAnswer2": False, |
|
"clickedAnswer3": False, |
|
"clickedForceWebSearch": False, |
|
"visitFromDelta": False, |
|
"isMemoryEnabled": False, |
|
"mobileClient": False, |
|
"userSelectedModel": None, |
|
"validated": "00f37b34-a166-4efb-bce5-1312d87f2f94", |
|
"imageGenerationMode": False, |
|
"webSearchModePrompt": False, |
|
"deepSearchMode": True, |
|
"domains": None, |
|
"vscodeClient": False, |
|
"codeInterpreterMode": False, |
|
"customProfile": {"name": "", "occupation": "", "traits": [], "additionalInfo": "", "enableNewChats": False}, |
|
"session": { |
|
"user": { |
|
"name": _rand(10), |
|
"email": random_email(), |
|
"image": "https://lh3.googleusercontent.com/a/default", |
|
"id": random_id() |
|
}, |
|
"expires": "2025-06-09T19:36:08.220Z", |
|
"isNewUser": False |
|
}, |
|
"isPremium": True, |
|
"subscriptionCache": { |
|
"status": "PREMIUM", |
|
"customerId": random_customer_id(), |
|
"expiryTimestamp": now + 60 * 86400, |
|
"lastChecked": int(time.time() * 1000), |
|
"isTrialSubscription": False |
|
}, |
|
"beastMode": False, |
|
"reasoningMode": False, |
|
"designerMode": False |
|
} |
|
|
|
class RetryableStatusError(Exception): |
|
def __init__(self, status: int, text: str): |
|
super().__init__(f"status={status} body={text[:100]}...") |
|
|
|
def log_retry(retry_state): |
|
rid = retry_state.kwargs.get("request_id", "unknown") |
|
logger.warning("[%s] retry %s/3 due to %s", rid, retry_state.attempt_number, retry_state.outcome.exception()) |
|
|
|
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10), retry=retry_if_exception_type( |
|
(aiohttp.ClientConnectionError, aiohttp.ClientResponseError, asyncio.TimeoutError, RetryableStatusError)), |
|
before_sleep=log_retry) |
|
async def get_blackbox_response(*, data, stream: bool, request_id: str) -> AsyncGenerator[str, None]: |
|
global HTTP_SESSION |
|
if not HTTP_SESSION: |
|
HTTP_SESSION = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)) |
|
async with HTTP_SESSION.post(BLACKBOX_URL, json=data, headers=HEADERS, timeout=REQUEST_TIMEOUT) as resp: |
|
if resp.status != 200: |
|
body = await resp.text() |
|
logger.error("[%s] Upstream %s error: %s", request_id, BLACKBOX_URL, resp.status) |
|
if resp.status in RETRYABLE_STATUSES: |
|
raise RetryableStatusError(resp.status, body) |
|
raise HTTPException(status_code=502, detail=f"Upstream error {resp.status}") |
|
if stream: |
|
async for chunk in resp.content.iter_any(): |
|
if chunk: |
|
yield chunk.decode("utf-8", "ignore") |
|
else: |
|
yield await resp.text() |
|
|
|
@app.middleware("http") |
|
async def add_request_id(request: Request, call_next): |
|
request.state.request_id = rid = str(uuid.uuid4()) |
|
logger.info("[%s] %s %s", rid, request.method, request.url.path) |
|
start = time.perf_counter() |
|
resp = await call_next(request) |
|
logger.info("[%s] finished in %.2fs", rid, time.perf_counter() - start) |
|
return resp |
|
|
|
@app.get("/") |
|
async def root(): |
|
return {"message": "API is running"} |
|
|
|
@app.get("/health") |
|
async def health(): |
|
return {"status": "ok"} |
|
|
|
@app.post("/v1/chat/completions") |
|
async def chat_completions(request: Request): |
|
rid = request.state.request_id |
|
try: |
|
body = await request.json() |
|
messages = body.get("messages", []) |
|
if not messages: |
|
raise HTTPException(status_code=400, detail="Missing 'messages'") |
|
stream = body.get("stream", False) |
|
|
|
|
|
if body.get("test_payload") == "deepsearch": |
|
payload = { |
|
"messages": [{"id": "s2eB86t", "content": "google", "role": "user"}], |
|
"agentMode": {}, |
|
"id": "s2eB86t", |
|
"previewToken": None, |
|
"userId": None, |
|
"codeModelMode": True, |
|
"trendingAgentMode": {}, |
|
"isMicMode": False, |
|
"userSystemPrompt": None, |
|
"maxTokens": 1024, |
|
"playgroundTopP": None, |
|
"playgroundTemperature": None, |
|
"isChromeExt": False, |
|
"githubToken": "", |
|
"clickedAnswer2": False, |
|
"clickedAnswer3": False, |
|
"clickedForceWebSearch": False, |
|
"visitFromDelta": False, |
|
"isMemoryEnabled": False, |
|
"mobileClient": False, |
|
"userSelectedModel": None, |
|
"validated": "00f37b34-a166-4efb-bce5-1312d87f2f94", |
|
"imageGenerationMode": False, |
|
"webSearchModePrompt": False, |
|
"deepSearchMode": True, |
|
"domains": None, |
|
"vscodeClient": False, |
|
"codeInterpreterMode": False, |
|
"customProfile": { |
|
"name": "", |
|
"occupation": "", |
|
"traits": [], |
|
"additionalInfo": "", |
|
"enableNewChats": False |
|
}, |
|
"session": { |
|
"user": { |
|
"name": "S.C gaming", |
|
"email": "[email protected]", |
|
"image": "https://lh3.googleusercontent.com/a/ACg8ocI-ze5Qe42S-j8xaCL6X7KSVwfiOae4fONqpTxzt0d2_a2FIld1=s96-c", |
|
"id": "100846841133312010974" |
|
}, |
|
"expires": "2025-06-09T19:36:08.220Z", |
|
"isNewUser": False |
|
}, |
|
"isPremium": True, |
|
"subscriptionCache": { |
|
"status": "PREMIUM", |
|
"customerId": "cus_Rtiok4sPQNoo1c", |
|
"expiryTimestamp": 1749108685, |
|
"lastChecked": 1746822333827, |
|
"isTrialSubscription": True |
|
}, |
|
"beastMode": False, |
|
"reasoningMode": False, |
|
"designerMode": False |
|
} |
|
else: |
|
payload = build_payload(messages) |
|
|
|
if not stream: |
|
chunks = [] |
|
async for part in get_blackbox_response(data=payload, stream=False, request_id=rid): |
|
if part.startswith("Error:"): |
|
raise HTTPException(status_code=502, detail=part) |
|
chunks.append(part) |
|
return { |
|
"id": str(uuid.uuid4()), |
|
"object": "chat.completion", |
|
"created": int(time.time()), |
|
"model": "DeepResearch", |
|
"choices": [{ |
|
"index": 0, |
|
"message": {"role": "assistant", "content": "".join(chunks)}, |
|
"finish_reason": "stop" |
|
}] |
|
} |
|
|
|
async def event_stream(): |
|
async for chunk in get_blackbox_response(data=payload, stream=True, request_id=rid): |
|
msg = { |
|
"id": str(uuid.uuid4()), |
|
"object": "chat.completion.chunk", |
|
"created": int(time.time()), |
|
"model": "DeepResearch", |
|
"choices": [{"index": 0, "delta": {"content": chunk}}], |
|
} |
|
yield f"data: {json.dumps(msg)}\n\n" |
|
yield "data: [DONE]\n\n" |
|
|
|
return StreamingResponse(event_stream(), media_type="text/event-stream") |
|
|
|
except json.JSONDecodeError: |
|
raise HTTPException(status_code=400, detail="Invalid JSON") |
|
except RetryError as re: |
|
logger.error("[%s] retries failed: %s", rid, re) |
|
raise HTTPException(status_code=502, detail="Blackbox upstream failed") |
|
except Exception as e: |
|
logger.exception("[%s] error", rid) |
|
raise HTTPException(status_code=500, detail="Internal proxy error") |
|
|