# Source: Hugging Face Space "mxrkai/test24" — main.py (rev 0d812a5).
# (Web-page scaffolding from the Spaces UI removed so the file parses as Python.)
from __future__ import annotations

import asyncio
import base64
import json
import random
import re
import string
from collections.abc import AsyncGenerator
from typing import Any, Dict, List, Optional, Union

from aiohttp import ClientSession
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
# Mock implementations for ImageResponse and to_data_uri
class ImageResponse:
    """Minimal stand-in pairing a generated image URL with its alt text."""

    def __init__(self, url: str, alt: str):
        # Keep both values exactly as given; no validation is performed.
        self.url, self.alt = url, alt
def to_data_uri(image: Any) -> str:
    """Encode *image* as a ``data:image/png;base64,...`` URI.

    Bytes-like input is actually base64-encoded (the original mock always
    returned a hard-coded placeholder).  Any other input still yields the
    old placeholder string, so existing callers keep working.

    Args:
        image: Raw image bytes (``bytes``/``bytearray``/``memoryview``)
            or any other object (ignored, placeholder returned).

    Returns:
        A PNG data URI string.
    """
    if isinstance(image, (bytes, bytearray, memoryview)):
        encoded = base64.b64encode(bytes(image)).decode("ascii")
        return f"data:image/png;base64,{encoded}"
    # Fallback preserves the previous mock behavior for unsupported inputs.
    return "data:image/png;base64,..."
class AsyncGeneratorProvider:
    """Marker base class standing in for g4f's async-generator provider API."""
    pass
class ProviderModelMixin:
    """Marker mixin standing in for g4f's model-selection helpers."""
    pass
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
    """Async provider for the blackbox.ai chat API.

    ``create_async_generator`` is a true async generator: it yields decoded
    text chunks when streaming, and yields exactly one parsed JSON dict when
    not streaming.  The original ``return await response.json()`` inside the
    async generator was a SyntaxError — ``return`` with a value is illegal
    in (async) generators — so the non-stream result is now yielded instead.
    """

    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"
    default_model = 'blackbox'
    # Model names accepted upstream; anything else falls back to the default.
    models = [
        'blackbox',
        'gemini-1.5-flash',
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'ImageGenerationLV45LJp',
        'gpt-4o',
        'gemini-pro',
        'claude-sonnet-3.5',
    ]

    @classmethod
    def get_model(cls, model: str) -> str:
        """Return *model* if it is supported, otherwise the provider default."""
        if model in cls.models:
            return model
        return cls.default_model

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        proxy: Optional[str] = None,
        image: Optional[Any] = None,
        image_name: Optional[str] = None,
        stream: bool = False,
        **kwargs
    ) -> AsyncGenerator[Union[str, Dict[str, Any]], None]:
        """POST *messages* to the blackbox.ai chat endpoint.

        Args:
            model: Requested model name (mapped through ``get_model``).
            messages: OpenAI-style ``{"role", "content"}`` dicts.
            proxy: Optional proxy URL forwarded to aiohttp.
            image: Optional image payload attached to the last message.
            image_name: Filename reported alongside *image*.
            stream: When true, yield raw text chunks as they arrive.

        Yields:
            ``str`` chunks when *stream* is true; otherwise a single parsed
            JSON ``dict`` (the whole response body).

        Raises:
            aiohttp.ClientResponseError: on a non-2xx HTTP status.
        """
        model = cls.get_model(model)
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0"
        }
        async with ClientSession(headers=headers) as session:
            if image is not None:
                # NOTE(review): mutates the caller's last message in place —
                # confirm callers do not reuse the messages list afterwards.
                messages[-1]["data"] = {
                    "fileText": image_name,
                    "imageBase64": to_data_uri(image)
                }
            data = {
                "model": model,
                "messages": messages,
                "max_tokens": 1024,
                "temperature": 0.7,
                "stream": stream
            }
            async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
                response.raise_for_status()
                if stream:
                    async for chunk in response.content.iter_any():
                        yield chunk.decode()
                else:
                    # ``return <value>`` is forbidden in an async generator;
                    # yield the parsed body as the single item instead.
                    yield await response.json()
# FastAPI app setup: single application instance serving the chat endpoint.
app = FastAPI()
class Message(BaseModel):
    """One chat message in the OpenAI-style request schema."""
    role: str  # e.g. "system", "user", "assistant"
    content: str  # message text
class ChatRequest(BaseModel):
    """Request body for ``POST /v1/chat/completions``."""
    model: str  # model name, resolved via Blackbox.get_model
    messages: List[Message]  # conversation history, oldest first
    stream: Optional[bool] = False  # Add stream option
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    """OpenAI-compatible chat endpoint backed by the Blackbox provider.

    Streams one JSON document per line when ``request.stream`` is set;
    otherwise returns a single ``{"choices": [{"text": ...}]}`` payload.

    Raises:
        HTTPException: 500 when the provider yields no usable response.
    """
    messages = [{"role": msg.role, "content": msg.content} for msg in request.messages]
    async_generator = Blackbox.create_async_generator(
        model=request.model,
        messages=messages,
        stream=request.stream,  # Pass the stream flag
    )
    if request.stream:
        async def event_stream():
            async for chunk in async_generator:
                # StreamingResponse requires str/bytes chunks, not dicts,
                # so each chunk is serialized as one JSON line.
                yield json.dumps({"choices": [{"text": str(chunk).strip()}]}) + "\n"
        # FastAPI has no ``app.streaming_response`` attribute; the real
        # class is fastapi.responses.StreamingResponse.
        return StreamingResponse(event_stream(), media_type="application/json")
    # Non-streaming: an async generator cannot be awaited directly — consume
    # it to obtain the single parsed-JSON item the provider yields.
    response: Optional[Dict[str, Any]] = None
    async for item in async_generator:
        response = item
        break
    if isinstance(response, dict) and response.get("choices"):
        clean_content = response["choices"][0]["message"]["content"]
        return {"choices": [{"text": clean_content.strip()}]}
    raise HTTPException(status_code=500, detail="No valid response received.")