|
from __future__ import annotations

import asyncio
import base64
import json
import random
import re
import string
from typing import Any, AsyncGenerator, Dict, List, Optional, Union

from aiohttp import ClientSession
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
|
|
|
|
|
class ImageResponse:
    """Value object pairing a generated image URL with its alt text."""

    def __init__(self, url: str, alt: str):
        # URL where the generated image can be fetched.
        self.url = url
        # Human-readable alt/description text for the image.
        self.alt = alt

    def __repr__(self) -> str:
        # Debug-friendly representation; the original class had none.
        return f"{type(self).__name__}(url={self.url!r}, alt={self.alt!r})"
|
|
|
def to_data_uri(image: Any) -> str:
    """Encode *image* as a ``data:`` URI.

    Bytes-like input is base64-encoded into a real data URI; any other
    input keeps the original stub's placeholder result, so existing
    callers see no behavior change.

    Args:
        image: Raw image bytes (``bytes``/``bytearray``/``memoryview``)
            or any other payload.

    Returns:
        A ``data:image/png;base64,...`` string.
    """
    if isinstance(image, (bytes, bytearray, memoryview)):
        encoded = base64.b64encode(bytes(image)).decode("ascii")
        return f"data:image/png;base64,{encoded}"
    # Fallback: the original stub's placeholder for non-bytes payloads.
    return "data:image/png;base64,..."
|
|
|
class AsyncGeneratorProvider:
    """Marker base class for providers whose responses arrive through an
    asynchronous generator; all concrete behavior lives in subclasses."""
|
|
|
class ProviderModelMixin:
    """Marker mixin for providers that expose a model list; it carries no
    behavior of its own here."""
|
|
|
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
    """Async provider for the Blackbox AI chat API.

    Exposes an OpenAI-style chat interface: when streaming, decoded text
    chunks are yielded as they arrive; otherwise the parsed JSON payload
    is yielded exactly once.
    """

    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"

    default_model = 'blackbox'
    # Model names accepted verbatim; anything else falls back to default_model.
    models = [
        'blackbox',
        'gemini-1.5-flash',
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'ImageGenerationLV45LJp',
        'gpt-4o',
        'gemini-pro',
        'claude-sonnet-3.5',
    ]

    @classmethod
    def get_model(cls, model: str) -> str:
        """Return *model* if it is a supported name, else the default model."""
        if model in cls.models:
            return model
        return cls.default_model

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        proxy: Optional[str] = None,
        image: Optional[Any] = None,
        image_name: Optional[str] = None,
        stream: bool = False,
        **kwargs
    ) -> AsyncGenerator[Union[str, Dict[str, Any]], None]:
        """POST *messages* to the Blackbox API and yield the response.

        Args:
            model: Requested model name; unknown names fall back to the default.
            messages: Chat history as ``{"role": ..., "content": ...}`` dicts.
            proxy: Optional proxy URL forwarded to aiohttp.
            image: Optional image payload attached to the last message.
            image_name: Filename reported alongside *image*.
            stream: If true, yield decoded text chunks as they arrive;
                otherwise yield the parsed JSON body once.
            **kwargs: Ignored; accepted for provider-interface compatibility.

        Yields:
            ``str`` chunks when streaming, or a single ``dict`` payload.

        Raises:
            aiohttp.ClientResponseError: If the API returns an error status.
        """
        model = cls.get_model(model)

        headers = {
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0"
        }

        async with ClientSession(headers=headers) as session:
            if image is not None:
                # Build a shallow copy instead of mutating the caller's list
                # and dict in place (the original clobbered messages[-1]).
                messages = list(messages)
                messages[-1] = {
                    **messages[-1],
                    "data": {
                        "fileText": image_name,
                        "imageBase64": to_data_uri(image)
                    }
                }

            data = {
                "model": model,
                "messages": messages,
                "max_tokens": 1024,
                "temperature": 0.7,
                "stream": stream
            }

            async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
                response.raise_for_status()
                if stream:
                    async for chunk in response.content.iter_any():
                        yield chunk.decode()
                else:
                    # ``return <value>`` is a SyntaxError inside an async
                    # generator (PEP 525); yield the payload once instead.
                    yield await response.json()
|
|
|
|
|
# ASGI application exposing the OpenAI-style chat-completions endpoint.
app = FastAPI()
|
|
|
class Message(BaseModel):
    """A single chat message in the OpenAI request format."""

    # Speaker role — presumably "system"/"user"/"assistant"; the value is
    # forwarded to the provider unvalidated (TODO confirm expected set).
    role: str
    # The message text.
    content: str
|
|
|
class ChatRequest(BaseModel):
    """Request body for the ``/v1/chat/completions`` endpoint."""

    # Requested model name; the provider falls back to its default for
    # unknown names.
    model: str
    # Conversation history, oldest message first.
    messages: List[Message]
    # When true, the endpoint streams chunks instead of one JSON payload.
    stream: Optional[bool] = False
|
|
|
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    """OpenAI-compatible chat-completion endpoint backed by Blackbox.

    Streams newline-delimited JSON chunks when ``request.stream`` is true,
    otherwise returns a single ``{"choices": [...]}`` payload.

    Raises:
        HTTPException: 500 when the provider returns no usable response.
    """
    messages = [{"role": msg.role, "content": msg.content} for msg in request.messages]

    async_generator = Blackbox.create_async_generator(
        model=request.model,
        messages=messages,
        stream=request.stream
    )

    if request.stream:
        async def event_stream():
            # StreamingResponse requires str/bytes chunks, so each chunk is
            # serialized to a JSON line (the original yielded raw dicts).
            async for chunk in async_generator:
                yield json.dumps({"choices": [{"text": str(chunk).strip()}]}) + "\n"

        # FastAPI has no ``app.streaming_response`` attribute; the correct
        # helper is fastapi.responses.StreamingResponse.
        return StreamingResponse(event_stream(), media_type="application/json")

    # Async generators are not awaitable (the original ``await`` raised
    # TypeError); iterate to obtain the single non-streaming payload.
    response: Optional[Dict[str, Any]] = None
    async for item in async_generator:
        response = item
        break

    if isinstance(response, dict) and response.get("choices"):
        clean_content = response["choices"][0]["message"]["content"]
        return {"choices": [{"text": clean_content.strip()}]}

    raise HTTPException(status_code=500, detail="No valid response received.")
|
|