AIMaster7 commited on
Commit
ccb8513
Β·
verified Β·
1 Parent(s): cbea1f1

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +23 -72
main.py CHANGED
@@ -1,12 +1,10 @@
1
  import asyncio
2
  import json
3
  import logging
4
- import os
5
  import random
6
  import string
7
  import time
8
  import uuid
9
- from http import HTTPStatus
10
  from typing import AsyncGenerator, Dict, List, Any
11
 
12
  import aiohttp
@@ -31,12 +29,10 @@ logger = logging.getLogger("proxy")
31
 
32
  # ─── Config ───
33
  BLACKBOX_URL = "https://www.blackbox.ai/api/chat"
34
- CONNECTION_LIMIT = 200
35
- CONNECTION_LIMIT_PER_HOST = 50
36
  REQUEST_TIMEOUT = 300
37
  WORKER_COUNT = 10
38
 
39
- # ─── Required Headers ───
40
  HEADERS = {
41
  "authority": "www.blackbox.ai",
42
  "method": "POST",
@@ -62,7 +58,7 @@ HEADERS = {
62
  "content-type": "application/json",
63
  }
64
 
65
- # ─── FastAPI Setup ───
66
  app = FastAPI()
67
  app.add_middleware(
68
  CORSMiddleware,
@@ -76,7 +72,6 @@ HTTP_SESSION: aiohttp.ClientSession = None
76
  REQUEST_QUEUE: asyncio.Queue = asyncio.Queue()
77
  WORKER_TASKS: List[asyncio.Task] = []
78
 
79
- # ─── Retryable Error ───
80
  class RetryableStatusError(Exception):
81
  def __init__(self, status: int, text: str):
82
  self.status = status
@@ -86,18 +81,21 @@ class RetryableStatusError(Exception):
86
  RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504}
87
 
88
  _ascii = string.ascii_letters + string.digits
 
89
  def _rand(n, pool=_ascii): return "".join(random.choice(pool) for _ in range(n))
90
  def random_email(): return _rand(12) + "@gmail.com"
91
  def random_id(): return _rand(21, string.digits)
92
  def random_customer_id(): return "cus_" + _rand(12)
93
 
94
- # ─── Payload Generator ───
95
  def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
96
  unique_id = str(uuid.uuid4())
 
 
 
97
  return {
98
  "messages": messages,
99
  "agentMode": {},
100
- "id": messages[-1]["id"] if messages else _rand(8),
101
  "previewToken": None,
102
  "userId": None,
103
  "codeModelMode": True,
@@ -153,7 +151,6 @@ def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
153
  "designerMode": False
154
  }
155
 
156
- # ─── Retry Wrapper ───
157
  def log_retry(retry_state):
158
  rid = retry_state.kwargs.get("request_id", "unknown")
159
  attempt = retry_state.attempt_number
@@ -161,7 +158,7 @@ def log_retry(retry_state):
161
  logger.warning("[%s] retry %s/10 due to %s", rid, attempt, err)
162
 
163
  @retry(
164
- stop=stop_after_attempt(10),
165
  wait=wait_exponential(min=1, max=10),
166
  retry=retry_if_exception_type(
167
  (aiohttp.ClientConnectionError, aiohttp.ClientResponseError, asyncio.TimeoutError, RetryableStatusError)
@@ -169,7 +166,9 @@ def log_retry(retry_state):
169
  before_sleep=log_retry
170
  )
171
  async def get_blackbox_response(*, data, stream: bool, request_id: str) -> AsyncGenerator[str, None]:
172
- assert HTTP_SESSION
 
 
173
  async with HTTP_SESSION.post(BLACKBOX_URL, json=data, headers=HEADERS, timeout=REQUEST_TIMEOUT) as resp:
174
  if resp.status != 200:
175
  body = await resp.text()
@@ -184,23 +183,6 @@ async def get_blackbox_response(*, data, stream: bool, request_id: str) -> Async
184
  else:
185
  yield await resp.text()
186
 
187
- # ─── Worker Thread ───
188
- async def _worker():
189
- while True:
190
- try:
191
- data, request_id, out_q = await REQUEST_QUEUE.get()
192
- try:
193
- async for piece in get_blackbox_response(data=data, stream=False, request_id=request_id):
194
- await out_q.put(piece)
195
- except Exception as e:
196
- await out_q.put(f"Error:{e}")
197
- finally:
198
- await out_q.put(None)
199
- REQUEST_QUEUE.task_done()
200
- except asyncio.CancelledError:
201
- break
202
-
203
- # ─── Middleware ───
204
  @app.middleware("http")
205
  async def add_request_id(request: Request, call_next):
206
  request.state.request_id = rid = str(uuid.uuid4())
@@ -210,7 +192,6 @@ async def add_request_id(request: Request, call_next):
210
  logger.info("[%s] finished in %.2fs", rid, time.perf_counter() - start)
211
  return resp
212
 
213
- # ─── Root & Health ───
214
  @app.get("/")
215
  async def root():
216
  return {"message": "API is running"}
@@ -219,7 +200,6 @@ async def root():
219
  async def health():
220
  return {"status": "ok"}
221
 
222
- # ─── Chat Completion ───
223
  @app.post("/v1/chat/completions")
224
  async def chat_completions(request: Request):
225
  rid = request.state.request_id
@@ -232,14 +212,9 @@ async def chat_completions(request: Request):
232
  payload = build_payload(messages)
233
 
234
  if not stream:
235
- q: asyncio.Queue = asyncio.Queue()
236
- await REQUEST_QUEUE.put((payload, rid, q))
237
  chunks: List[str] = []
238
- while True:
239
- part = await q.get()
240
- if part is None:
241
- break
242
- if isinstance(part, str) and part.startswith("Error:"):
243
  raise HTTPException(status_code=502, detail=part)
244
  chunks.append(part)
245
  answer = "".join(chunks) or "No response."
@@ -258,20 +233,16 @@ async def chat_completions(request: Request):
258
  }
259
 
260
  async def event_stream():
261
- try:
262
- async for chunk in get_blackbox_response(data=payload, stream=True, request_id=rid):
263
- msg = {
264
- "id": str(uuid.uuid4()),
265
- "object": "chat.completion.chunk",
266
- "created": int(time.time()),
267
- "model": "DeepResearch",
268
- "choices": [{"index": 0, "delta": {"content": chunk}}],
269
- }
270
- yield f"data: {json.dumps(msg)}\n\n"
271
- yield "data: [DONE]\n\n"
272
- except Exception as e:
273
- logger.error("[%s] stream error: %s", rid, e)
274
- raise HTTPException(status_code=500, detail="streaming error")
275
 
276
  return StreamingResponse(event_stream(), media_type="text/event-stream")
277
 
@@ -283,23 +254,3 @@ async def chat_completions(request: Request):
283
  except Exception as e:
284
  logger.exception("[%s] error", rid)
285
  raise HTTPException(status_code=500, detail="Internal proxy error")
286
-
287
- # ─── Startup & Shutdown ───
288
- @app.on_event("startup")
289
- async def startup():
290
- global HTTP_SESSION, WORKER_TASKS
291
- HTTP_SESSION = aiohttp.ClientSession(
292
- connector=aiohttp.TCPConnector(limit=CONNECTION_LIMIT, limit_per_host=CONNECTION_LIMIT_PER_HOST),
293
- timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
294
- )
295
- WORKER_TASKS = [asyncio.create_task(_worker()) for _ in range(WORKER_COUNT)]
296
- logger.info("Started %d workers", WORKER_COUNT)
297
-
298
- @app.on_event("shutdown")
299
- async def shutdown():
300
- for t in WORKER_TASKS:
301
- t.cancel()
302
- await asyncio.gather(*WORKER_TASKS, return_exceptions=True)
303
- if HTTP_SESSION:
304
- await HTTP_SESSION.close()
305
- logger.info("Shutdown complete")
 
1
  import asyncio
2
  import json
3
  import logging
 
4
  import random
5
  import string
6
  import time
7
  import uuid
 
8
  from typing import AsyncGenerator, Dict, List, Any
9
 
10
  import aiohttp
 
29
 
30
  # ─── Config ───
31
  BLACKBOX_URL = "https://www.blackbox.ai/api/chat"
 
 
32
  REQUEST_TIMEOUT = 300
33
  WORKER_COUNT = 10
34
 
35
+ # ─── Headers ───
36
  HEADERS = {
37
  "authority": "www.blackbox.ai",
38
  "method": "POST",
 
58
  "content-type": "application/json",
59
  }
60
 
61
+ # ─── FastAPI ───
62
  app = FastAPI()
63
  app.add_middleware(
64
  CORSMiddleware,
 
72
  REQUEST_QUEUE: asyncio.Queue = asyncio.Queue()
73
  WORKER_TASKS: List[asyncio.Task] = []
74
 
 
75
  class RetryableStatusError(Exception):
76
  def __init__(self, status: int, text: str):
77
  self.status = status
 
81
  RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504}
82
 
83
  _ascii = string.ascii_letters + string.digits
84
+
85
  def _rand(n, pool=_ascii): return "".join(random.choice(pool) for _ in range(n))
86
  def random_email(): return _rand(12) + "@gmail.com"
87
  def random_id(): return _rand(21, string.digits)
88
  def random_customer_id(): return "cus_" + _rand(12)
89
 
 
90
  def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
91
  unique_id = str(uuid.uuid4())
92
+ msg_id = _rand(8)
93
+ if messages and isinstance(messages[-1], dict):
94
+ msg_id = messages[-1].get("id", msg_id)
95
  return {
96
  "messages": messages,
97
  "agentMode": {},
98
+ "id": msg_id,
99
  "previewToken": None,
100
  "userId": None,
101
  "codeModelMode": True,
 
151
  "designerMode": False
152
  }
153
 
 
154
  def log_retry(retry_state):
155
  rid = retry_state.kwargs.get("request_id", "unknown")
156
  attempt = retry_state.attempt_number
 
158
  logger.warning("[%s] retry %s/10 due to %s", rid, attempt, err)
159
 
160
  @retry(
161
+ stop=stop_after_attempt(3),
162
  wait=wait_exponential(min=1, max=10),
163
  retry=retry_if_exception_type(
164
  (aiohttp.ClientConnectionError, aiohttp.ClientResponseError, asyncio.TimeoutError, RetryableStatusError)
 
166
  before_sleep=log_retry
167
  )
168
  async def get_blackbox_response(*, data, stream: bool, request_id: str) -> AsyncGenerator[str, None]:
169
+ global HTTP_SESSION
170
+ if not HTTP_SESSION:
171
+ HTTP_SESSION = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT))
172
  async with HTTP_SESSION.post(BLACKBOX_URL, json=data, headers=HEADERS, timeout=REQUEST_TIMEOUT) as resp:
173
  if resp.status != 200:
174
  body = await resp.text()
 
183
  else:
184
  yield await resp.text()
185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
  @app.middleware("http")
187
  async def add_request_id(request: Request, call_next):
188
  request.state.request_id = rid = str(uuid.uuid4())
 
192
  logger.info("[%s] finished in %.2fs", rid, time.perf_counter() - start)
193
  return resp
194
 
 
195
  @app.get("/")
196
  async def root():
197
  return {"message": "API is running"}
 
200
  async def health():
201
  return {"status": "ok"}
202
 
 
203
  @app.post("/v1/chat/completions")
204
  async def chat_completions(request: Request):
205
  rid = request.state.request_id
 
212
  payload = build_payload(messages)
213
 
214
  if not stream:
 
 
215
  chunks: List[str] = []
216
+ async for part in get_blackbox_response(data=payload, stream=False, request_id=rid):
217
+ if part.startswith("Error:"):
 
 
 
218
  raise HTTPException(status_code=502, detail=part)
219
  chunks.append(part)
220
  answer = "".join(chunks) or "No response."
 
233
  }
234
 
235
  async def event_stream():
236
+ async for chunk in get_blackbox_response(data=payload, stream=True, request_id=rid):
237
+ msg = {
238
+ "id": str(uuid.uuid4()),
239
+ "object": "chat.completion.chunk",
240
+ "created": int(time.time()),
241
+ "model": "DeepResearch",
242
+ "choices": [{"index": 0, "delta": {"content": chunk}}],
243
+ }
244
+ yield f"data: {json.dumps(msg)}\n\n"
245
+ yield "data: [DONE]\n\n"
 
 
 
 
246
 
247
  return StreamingResponse(event_stream(), media_type="text/event-stream")
248
 
 
254
  except Exception as e:
255
  logger.exception("[%s] error", rid)
256
  raise HTTPException(status_code=500, detail="Internal proxy error")