Spaces:
mxrkai
/
Runtime error

test24 / main.py
Niansuh's picture
Update main.py
8b177d4 verified
raw
history blame
6.92 kB
import os
import re
import random
import string
import uuid
import json
import logging
import asyncio
import time
from collections import defaultdict
from typing import List, Dict, Any, Optional, AsyncGenerator, Union
from datetime import datetime
from aiohttp import ClientSession, ClientTimeout, ClientError
from fastapi import FastAPI, HTTPException, Request, Depends, Header
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel
# Logging setup: timestamped records to stderr at INFO level.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Runtime configuration pulled from the environment.
API_KEYS = os.getenv('API_KEYS', '').split(',')
RATE_LIMIT = int(os.getenv('RATE_LIMIT', '60'))
AVAILABLE_MODELS = os.getenv('AVAILABLE_MODELS', '')

# Refuse to start without at least one configured API key.
if not API_KEYS or API_KEYS == ['']:
    logger.error("No API keys found. Please set the API_KEYS environment variable.")
    raise Exception("API_KEYS environment variable not set.")

# Normalise the optional comma-separated model whitelist into a clean list.
AVAILABLE_MODELS = (
    [name.strip() for name in AVAILABLE_MODELS.split(',') if name.strip()]
    if AVAILABLE_MODELS
    else []
)

# Per-client-IP request counters; entries auto-create on first access.
rate_limit_store = defaultdict(lambda: {"count": 0, "timestamp": time.time()})
CLEANUP_INTERVAL = 60      # seconds between sweeps of stale counters
RATE_LIMIT_WINDOW = 60     # seconds over which RATE_LIMIT applies
async def cleanup_rate_limit_stores():
    """Background task: periodically evict per-IP counters that have been
    idle for more than twice the rate-limit window."""
    while True:
        now = time.time()
        stale_ips = [
            ip
            for ip, entry in rate_limit_store.items()
            if now - entry["timestamp"] > RATE_LIMIT_WINDOW * 2
        ]
        for ip in stale_ips:
            del rate_limit_store[ip]
        await asyncio.sleep(CLEANUP_INTERVAL)
async def rate_limiter_per_ip(request: Request):
    """FastAPI dependency enforcing RATE_LIMIT requests per client IP
    per RATE_LIMIT_WINDOW seconds; raises 429 when exceeded."""
    ip = request.client.host
    now = time.time()
    entry = rate_limit_store[ip]
    if now - entry["timestamp"] > RATE_LIMIT_WINDOW:
        # Window elapsed: restart the count for this IP.
        rate_limit_store[ip] = {"count": 1, "timestamp": now}
        return
    if entry["count"] >= RATE_LIMIT:
        raise HTTPException(status_code=429, detail='Rate limit exceeded')
    entry["count"] += 1
async def get_api_key(request: Request, authorization: str = Header(None)) -> str:
    """FastAPI dependency that validates the Bearer token in the
    Authorization header.

    Returns the API key on success.
    Raises HTTPException(401) on a missing/malformed header or unknown key.

    Fixes: removed the unused `client_ip` local; replaced the magic
    slice `[7:]` with an explicit prefix-length slice.
    """
    if authorization is None or not authorization.startswith('Bearer '):
        raise HTTPException(status_code=401, detail='Invalid authorization header format')
    api_key = authorization[len('Bearer '):]
    if api_key not in API_KEYS:
        raise HTTPException(status_code=401, detail='Invalid API key')
    return api_key
class ImageResponse:
    """Lightweight container pairing an image URL with its alt text."""

    def __init__(self, url: str, alt: str):
        self.url, self.alt = url, alt
def to_data_uri(image_base64: str) -> str:
    """Wrap an already-base64-encoded payload in a JPEG data-URI prefix."""
    return "data:image/jpeg;base64," + image_base64
class Blackbox:
    """Async client for the blackbox.ai chat endpoint.

    Streams raw response chunks from the upstream API. Unknown model names
    silently fall back to `default_model`.
    """

    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"
    working = True
    supports_stream = True
    default_model = 'blackboxai'
    models = [default_model, 'ImageGeneration', 'gpt-4o', 'llama-3.1-8b']

    @classmethod
    def get_model(cls, model: str) -> Optional[str]:
        """Return `model` if it is supported, otherwise the default model.

        NOTE: this never returns None — the None check in
        create_async_generator is defensive only.
        """
        return model if model in cls.models else cls.default_model

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        image_base64: Optional[str] = None,
        **kwargs
    ) -> AsyncGenerator[Any, None]:
        """Yield decoded text chunks streamed from the upstream chat API.

        Args:
            model: requested model id (falls back to default if unknown).
            messages: OpenAI-style message dicts ({"role", "content"}).
            image_base64: optional base64 image attached to the last message.

        Raises:
            HTTPException(400) if the resolved model is unavailable
            (defensive — see get_model), plus aiohttp errors on HTTP failure.
        """
        model = cls.get_model(model)
        if model is None:  # defensive; get_model always falls back
            raise HTTPException(status_code=400, detail="Model not available")
        headers = {
            "accept": "*/*",
            "content-type": "application/json",
            "origin": cls.url,
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
            "referer": f"{cls.url}/?model={model}"
        }
        random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
        # Fix: shallow-copy each message so attaching image data below never
        # mutates the caller's dicts in place.
        payload_messages = [dict(message) for message in messages]
        data = {
            "messages": payload_messages,
            "id": random_id,
            "previewToken": None,
            "userId": None,
            "codeModelMode": True,
            "agentMode": {},
            "trendingAgentMode": {},
            "isMicMode": False,
            "userSystemPrompt": None,
            "maxTokens": 1024,
            "playgroundTopP": 0.9,
            "playgroundTemperature": 0.5,
            "isChromeExt": False,
            "githubToken": None,
            "clickedAnswer2": False,
            "clickedAnswer3": False,
            "clickedForceWebSearch": False,
            "visitFromDelta": False,
            "mobileClient": False,
            "userSelectedModel": model,
            "webSearchMode": False,
        }
        if image_base64:
            # Attach the image to the final message using the upstream's
            # FILE:BB marker convention.
            last = payload_messages[-1]
            last['data'] = {
                'imageBase64': to_data_uri(image_base64),
                'fileText': '',
                'title': 'Uploaded Image'
            }
            last['content'] = 'FILE:BB\n$#$\n\n$#$\n' + last['content']
        timeout = ClientTimeout(total=60)
        async with ClientSession(headers=headers, timeout=timeout) as session:
            async with session.post(cls.api_endpoint, json=data) as response:
                response.raise_for_status()
                async for chunk in response.content.iter_any():
                    # Upstream may split multibyte sequences across chunks;
                    # undecodable bytes are dropped rather than raising.
                    yield chunk.decode(errors='ignore')
# ASGI application instance.
app = FastAPI()


@app.on_event("startup")
async def startup_event():
    # Launch the rate-limit janitor once the server comes up; the task
    # runs for the life of the event loop.
    cleanup_task = asyncio.create_task(cleanup_rate_limit_stores())
class Message(BaseModel):
    # A single chat turn: role (e.g. "user"/"assistant") plus its text content.
    role: str
    content: str
class ChatRequest(BaseModel):
    # Request body for POST /v1/chat/completions.
    model: str  # model id; unknown ids fall back to the provider default
    messages: List[Message]
    image_base64: Optional[str] = None  # optional base64 image for the last message
@app.post("/v1/chat/completions", dependencies=[Depends(rate_limiter_per_ip)])
async def chat_completions(request: ChatRequest, req: Request, api_key: str = Depends(get_api_key)):
    """Proxy a chat request upstream and return the aggregated response.

    Collects the upstream stream into one string and returns
    {"response": <text>}. HTTPExceptions propagate unchanged; any other
    failure becomes a generic 500.

    Fixes: unexpected exceptions are now logged with a traceback before the
    500 is raised (previously all detail was silently discarded); bare
    `raise` re-raises HTTPExceptions; chunks are joined instead of using
    quadratic string concatenation.
    """
    try:
        messages = [{"role": msg.role, "content": msg.content} for msg in request.messages]
        stream = Blackbox.create_async_generator(
            model=request.model,
            messages=messages,
            image_base64=request.image_base64
        )
        chunks = []
        async for chunk in stream:
            chunks.append(chunk)
        return {"response": "".join(chunks)}
    except HTTPException:
        raise
    except Exception:
        logger.exception("chat_completions failed for model %s", request.model)
        raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/v1/models", dependencies=[Depends(rate_limiter_per_ip)])
async def get_models():
    """List the models this proxy exposes, in OpenAI-style envelope."""
    entries = []
    for model_id in Blackbox.models:
        entries.append({"id": model_id, "object": "model"})
    return {"data": entries}
@app.get("/v1/health")
async def health_check():
    """Liveness probe; always reports OK (no dependency checks)."""
    return dict(status="ok")
# Local development entry point: serve on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)