|
import os |
|
import re |
|
import random |
|
import string |
|
import uuid |
|
import json |
|
import logging |
|
import asyncio |
|
import time |
|
from collections import defaultdict |
|
from typing import List, Dict, Any, Optional, AsyncGenerator |
|
from datetime import datetime |
|
|
|
from aiohttp import ClientSession, ClientTimeout, ClientError |
|
from fastapi import FastAPI, HTTPException, Request, Depends, Header, UploadFile, File |
|
from fastapi.responses import StreamingResponse |
|
from pydantic import BaseModel |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", |
|
handlers=[logging.StreamHandler()] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
API_KEYS = os.getenv('API_KEYS', '').split(',') |
|
RATE_LIMIT = int(os.getenv('RATE_LIMIT', '60')) |
|
|
|
if not API_KEYS or API_KEYS == ['']: |
|
logger.error("No API keys found. Please set the API_KEYS environment variable. | NiansuhAI") |
|
raise Exception("API_KEYS environment variable not set. | NiansuhAI") |
|
|
|
|
|
rate_limit_store = defaultdict(lambda: {"count": 0, "timestamp": time.time()}) |
|
|
|
async def get_api_key(authorization: str = Header(...)) -> str: |
|
if not authorization.startswith('Bearer '): |
|
logger.warning("Invalid authorization header format.") |
|
raise HTTPException(status_code=401, detail='Invalid authorization header format | NiansuhAI') |
|
api_key = authorization[7:] |
|
if api_key not in API_KEYS: |
|
logger.warning(f"Invalid API key attempted: {api_key}") |
|
raise HTTPException(status_code=401, detail='Invalid API key | NiansuhAI') |
|
return api_key |
|
|
|
async def rate_limiter(api_key: str = Depends(get_api_key)): |
|
current_time = time.time() |
|
window_start = rate_limit_store[api_key]["timestamp"] |
|
if current_time - window_start > 60: |
|
rate_limit_store[api_key] = {"count": 1, "timestamp": current_time} |
|
else: |
|
if rate_limit_store[api_key]["count"] >= RATE_LIMIT: |
|
logger.warning(f"Rate limit exceeded for API key: {api_key}") |
|
raise HTTPException(status_code=429, detail='Rate limit exceeded | NiansuhAI') |
|
rate_limit_store[api_key]["count"] += 1 |
|
|
|
|
|
class ModelNotWorkingException(Exception): |
|
def __init__(self, model: str): |
|
self.model = model |
|
self.message = f"The model '{model}' is currently not working. Please try another model or wait for it to be fixed." |
|
super().__init__(self.message) |
|
|
|
|
|
class ImageResponse: |
|
def __init__(self, url: str, alt: str): |
|
self.url = url |
|
self.alt = alt |
|
|
|
def to_data_uri(image: Any) -> str: |
|
return "data:image/png;base64,..." |
|
|
|
class Blackbox: |
|
url = "https://www.blackbox.ai" |
|
api_endpoint = "https://www.blackbox.ai/api/chat" |
|
working = True |
|
supports_stream = True |
|
supports_system_message = True |
|
supports_message_history = True |
|
|
|
default_model = 'blackboxai' |
|
image_models = ['ImageGeneration'] |
|
models = [ |
|
default_model, |
|
'blackboxai-pro', |
|
"llama-3.1-8b", |
|
'llama-3.1-70b', |
|
'llama-3.1-405b', |
|
'gpt-4o', |
|
'gemini-pro', |
|
'gemini-1.5-flash', |
|
'claude-sonnet-3.5', |
|
'PythonAgent', |
|
'JavaAgent', |
|
'JavaScriptAgent', |
|
'HTMLAgent', |
|
'GoogleCloudAgent', |
|
'AndroidDeveloper', |
|
'SwiftDeveloper', |
|
'Next.jsAgent', |
|
'MongoDBAgent', |
|
'PyTorchAgent', |
|
'ReactAgent', |
|
'XcodeAgent', |
|
'AngularJSAgent', |
|
*image_models, |
|
'Niansuh', |
|
] |
|
|
|
agentMode = { |
|
'ImageGeneration': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"}, |
|
'Niansuh': {'mode': True, 'id': "NiansuhAIk1HgESy", 'name': "Niansuh"}, |
|
} |
|
trendingAgentMode = { |
|
"blackboxai": {}, |
|
"gemini-1.5-flash": {'mode': True, 'id': 'Gemini'}, |
|
"llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"}, |
|
'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"}, |
|
'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"}, |
|
'blackboxai-pro': {'mode': True, 'id': "BLACKBOXAI-PRO"}, |
|
'PythonAgent': {'mode': True, 'id': "Python Agent"}, |
|
'JavaAgent': {'mode': True, 'id': "Java Agent"}, |
|
'JavaScriptAgent': {'mode': True, 'id': "JavaScript Agent"}, |
|
'HTMLAgent': {'mode': True, 'id': "HTML Agent"}, |
|
'GoogleCloudAgent': {'mode': True, 'id': "Google Cloud Agent"}, |
|
'AndroidDeveloper': {'mode': True, 'id': "Android Developer"}, |
|
'SwiftDeveloper': {'mode': True, 'id': "Swift Developer"}, |
|
'Next.jsAgent': {'mode': True, 'id': "Next.js Agent"}, |
|
'MongoDBAgent': {'mode': True, 'id': "MongoDB Agent"}, |
|
'PyTorchAgent': {'mode': True, 'id': "PyTorch Agent"}, |
|
'ReactAgent': {'mode': True, 'id': "React Agent"}, |
|
'XcodeAgent': {'mode': True, 'id': "Xcode Agent"}, |
|
'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"}, |
|
} |
|
|
|
userSelectedModel = { |
|
"gpt-4o": "gpt-4o", |
|
"gemini-pro": "gemini-pro", |
|
'claude-sonnet-3.5': "claude-sonnet-3.5", |
|
} |
|
|
|
model_prefixes = { |
|
'gpt-4o': '@GPT-4o', |
|
'gemini-pro': '@Gemini-PRO', |
|
'claude-sonnet-3.5': '@Claude-Sonnet-3.5', |
|
'PythonAgent': '@Python Agent', |
|
'JavaAgent': '@Java Agent', |
|
'JavaScriptAgent': '@JavaScript Agent', |
|
'HTMLAgent': '@HTML Agent', |
|
'GoogleCloudAgent': '@Google Cloud Agent', |
|
'AndroidDeveloper': '@Android Developer', |
|
'SwiftDeveloper': '@Swift Developer', |
|
'Next.jsAgent': '@Next.js Agent', |
|
'MongoDBAgent': '@MongoDB Agent', |
|
'PyTorchAgent': '@PyTorch Agent', |
|
'ReactAgent': '@React Agent', |
|
'XcodeAgent': '@Xcode Agent', |
|
'AngularJSAgent': '@AngularJS Agent', |
|
'blackboxai-pro': '@BLACKBOXAI-PRO', |
|
'ImageGeneration': '@Image Generation', |
|
'Niansuh': '@Niansuh', |
|
} |
|
|
|
model_referers = { |
|
"blackboxai": f"{url}/?model=blackboxai", |
|
"gpt-4o": f"{url}/?model=gpt-4o", |
|
"gemini-pro": f"{url}/?model=gemini-pro", |
|
"claude-sonnet-3.5": f"{url}/?model=claude-sonnet-3.5" |
|
} |
|
|
|
model_aliases = { |
|
"gemini-flash": "gemini-1.5-flash", |
|
"claude-3.5-sonnet": "claude-sonnet-3.5", |
|
"flux": "ImageGeneration", |
|
"niansuh": "Niansuh", |
|
} |
|
|
|
@classmethod |
|
def get_model(cls, model: str) -> str: |
|
if model in cls.models: |
|
return model |
|
elif model in cls.userSelectedModel: |
|
return model |
|
elif model in cls.model_aliases: |
|
return cls.model_aliases[model] |
|
else: |
|
return cls.default_model |
|
|
|
|
|
|
|
|
|
app = FastAPI() |
|
|
|
class Message(BaseModel): |
|
role: str |
|
content: str |
|
|
|
class ChatRequest(BaseModel): |
|
model: str |
|
messages: List[Message] |
|
stream: Optional[bool] = False |
|
webSearchMode: Optional[bool] = False |
|
|
|
def create_response(content: str, model: str, finish_reason: Optional[str] = None) -> Dict[str, Any]: |
|
return { |
|
"id": f"chatcmpl-{uuid.uuid4()}", |
|
"object": "chat.completion.chunk", |
|
"created": int(datetime.now().timestamp()), |
|
"model": model, |
|
"choices": [ |
|
{ |
|
"index": 0, |
|
"delta": {"content": content, "role": "assistant"}, |
|
"finish_reason": finish_reason, |
|
} |
|
], |
|
"usage": None, |
|
} |
|
|
|
@app.post("/niansuhai/v1/chat/completions", dependencies=[Depends(rate_limiter)]) |
|
async def chat_completions(request: ChatRequest, req: Request, api_key: str = Depends(get_api_key)): |
|
|
|
redacted_messages = [{"role": msg.role, "content": "[redacted]"} for msg in request.messages] |
|
|
|
logger.info(f"Received chat completions request from API key: {api_key} | Model: {request.model} | Messages: {redacted_messages}") |
|
|
|
try: |
|
|
|
if request.model not in Blackbox.models and request.model not in Blackbox.model_aliases: |
|
logger.warning(f"Attempt to use unavailable model: {request.model}") |
|
raise HTTPException(status_code=400, detail="Requested model is not available. | NiansuhAI") |
|
|
|
|
|
async_generator = Blackbox.create_async_generator( |
|
model=request.model, |
|
messages=[{"role": msg.role, "content": msg.content} for msg in request.messages], |
|
image=None, |
|
image_name=None, |
|
webSearchMode=request.webSearchMode |
|
) |
|
|
|
if request.stream: |
|
async def generate(): |
|
try: |
|
async for chunk in async_generator: |
|
if isinstance(chunk, ImageResponse): |
|
image_markdown = f"" |
|
response_chunk = create_response(image_markdown, request.model) |
|
else: |
|
response_chunk = create_response(chunk, request.model) |
|
|
|
yield f"data: {json.dumps(response_chunk)}\n\n" |
|
|
|
yield "data: [DONE]\n\n" |
|
except HTTPException as he: |
|
error_response = {"error": he.detail} |
|
yield f"data: {json.dumps(error_response)}\n\n" |
|
except Exception as e: |
|
logger.exception("Error during streaming response generation. | NiansuhAI") |
|
error_response = {"error": str(e)} |
|
yield f"data: {json.dumps(error_response)}\n\n" |
|
|
|
return StreamingResponse(generate(), media_type="text/event-stream") |
|
else: |
|
response_content = "" |
|
async for chunk in async_generator: |
|
if isinstance(chunk, ImageResponse): |
|
response_content += f"\n" |
|
else: |
|
response_content += chunk |
|
|
|
logger.info(f"Completed non-streaming response generation for API key: {api_key}") |
|
return { |
|
"id": f"chatcmpl-{uuid.uuid4()}", |
|
"object": "chat.completion", |
|
"created": int(datetime.now().timestamp()), |
|
"model": request.model, |
|
"choices": [ |
|
{ |
|
"message": { |
|
"role": "assistant", |
|
"content": response_content |
|
}, |
|
"finish_reason": "stop", |
|
"index": 0 |
|
} |
|
], |
|
"usage": { |
|
"prompt_tokens": sum(len(msg.content.split()) for msg in request.messages), |
|
"completion_tokens": len(response_content.split()), |
|
"total_tokens": sum(len(msg.content.split()) for msg in request.messages) + len(response_content.split()) |
|
}, |
|
} |
|
except ModelNotWorkingException as e: |
|
logger.warning(f"Model not working: {e}") |
|
raise HTTPException(status_code=503, detail=str(e)) |
|
except HTTPException as he: |
|
logger.warning(f"HTTPException: {he.detail}") |
|
raise he |
|
except Exception as e: |
|
logger.exception("An unexpected error occurred while processing the chat completions request. | NiansuhAI") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@app.get("/niansuhai/v1/models", dependencies=[Depends(rate_limiter)]) |
|
async def get_models(api_key: str = Depends(get_api_key)): |
|
logger.info(f"Fetching available models for API key: {api_key}") |
|
return {"data": [{"id": model} for model in Blackbox.models]} |
|
|
|
|
|
|
|
@app.get("/niansuhai/v1/health", dependencies=[Depends(rate_limiter)]) |
|
async def health_check(api_key: str = Depends(get_api_key)): |
|
logger.info(f"Health check requested by API key: {api_key}") |
|
return {"status": "ok"} |
|
|
|
@app.get("/niansuhai/v1/models/{model}/status", dependencies=[Depends(rate_limiter)]) |
|
async def model_status(model: str, api_key: str = Depends(get_api_key)): |
|
logger.info(f"Model status requested for '{model}' by API key: {api_key}") |
|
if model in Blackbox.models: |
|
return {"model": model, "status": "available | NiansuhAI"} |
|
elif model in Blackbox.model_aliases: |
|
actual_model = Blackbox.model_aliases[model] |
|
return {"model": actual_model, "status": "available via alias | NiansuhAI"} |
|
else: |
|
logger.warning(f"Model not found: {model}") |
|
raise HTTPException(status_code=404, detail="Model not found | NiansuhAI") |
|
|
|
|
|
@app.get("/niansuhai/v1/models/{model}/details", dependencies=[Depends(rate_limiter)]) |
|
async def get_model_details(model: str, api_key: str = Depends(get_api_key)): |
|
logger.info(f"Model details requested for '{model}' by API key: {api_key}") |
|
actual_model = Blackbox.get_model(model) |
|
if actual_model not in Blackbox.models: |
|
logger.warning(f"Model not found: {model}") |
|
raise HTTPException(status_code=404, detail="Model not found | NiansuhAI") |
|
|
|
model_details = { |
|
"id": actual_model, |
|
"description": f"Details about model {actual_model}", |
|
"capabilities": ["chat", "completion", "image generation"] if actual_model in Blackbox.image_models else ["chat", "completion"], |
|
"status": "available", |
|
} |
|
return {"data": model_details} |
|
|
|
|
|
session_histories = defaultdict(list) |
|
|
|
@app.post("/niansuhai/v1/sessions/{session_id}/messages", dependencies=[Depends(rate_limiter)]) |
|
async def add_message_to_session(session_id: str, message: Message, api_key: str = Depends(get_api_key)): |
|
logger.info(f"Adding message to session '{session_id}' by API key: {api_key}") |
|
session_histories[session_id].append({"role": message.role, "content": message.content}) |
|
return {"status": "message added"} |
|
|
|
@app.get("/niansuhai/v1/sessions/{session_id}/messages", dependencies=[Depends(rate_limiter)]) |
|
async def get_session_messages(session_id: str, api_key: str = Depends(get_api_key)): |
|
logger.info(f"Fetching messages for session '{session_id}' by API key: {api_key}") |
|
messages = session_histories.get(session_id) |
|
if messages is None: |
|
raise HTTPException(status_code=404, detail="Session not found | NiansuhAI") |
|
return {"data": messages} |
|
|
|
|
|
user_preferences = defaultdict(dict) |
|
|
|
class UserPreferences(BaseModel): |
|
theme: Optional[str] = "light" |
|
notifications_enabled: Optional[bool] = True |
|
|
|
@app.post("/niansuhai/v1/users/{user_id}/preferences", dependencies=[Depends(rate_limiter)]) |
|
async def update_user_preferences(user_id: str, preferences: UserPreferences, api_key: str = Depends(get_api_key)): |
|
logger.info(f"Updating preferences for user '{user_id}' by API key: {api_key}") |
|
user_preferences[user_id] = preferences.dict() |
|
return {"status": "preferences updated"} |
|
|
|
@app.get("/niansuhai/v1/users/{user_id}/preferences", dependencies=[Depends(rate_limiter)]) |
|
async def get_user_preferences(user_id: str, api_key: str = Depends(get_api_key)): |
|
logger.info(f"Fetching preferences for user '{user_id}' by API key: {api_key}") |
|
preferences = user_preferences.get(user_id) |
|
if preferences is None: |
|
raise HTTPException(status_code=404, detail="User not found | NiansuhAI") |
|
return {"data": preferences} |
|
|
|
|
|
@app.post("/niansuhai/v1/images/upload", dependencies=[Depends(rate_limiter)]) |
|
async def upload_image(image: UploadFile = File(...), api_key: str = Depends(get_api_key)): |
|
logger.info(f"Image upload requested by API key: {api_key}") |
|
if not image.content_type.startswith('image/'): |
|
logger.warning("Uploaded file is not an image.") |
|
raise HTTPException(status_code=400, detail="Uploaded file is not an image | NiansuhAI") |
|
|
|
return {"filename": image.filename, "status": "image uploaded"} |
|
|
|
|
|
@app.get("/niansuhai/v1/health/{component}", dependencies=[Depends(rate_limiter)]) |
|
async def component_health_check(component: str, api_key: str = Depends(get_api_key)): |
|
logger.info(f"Health check for component '{component}' requested by API key: {api_key}") |
|
|
|
components_status = { |
|
"database": "healthy", |
|
"message_queue": "healthy", |
|
"cache": "healthy", |
|
} |
|
status = components_status.get(component) |
|
if status is None: |
|
logger.warning(f"Component not found: {component}") |
|
raise HTTPException(status_code=404, detail="Component not found | NiansuhAI") |
|
return {"component": component, "status": status} |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=8000) |
|
|