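# Provenance (page header captured when this file was copied from the
# Hugging Face Spaces file viewer; kept as a comment so the module parses):
#   Spaces: mxrkai / test24 -- Space status at capture time: "Runtime error"
#   File: test24 / main.py (raw | history | blame), 17.4 kB
#   Last commit: "Update main.py", 36aebd6 (verified); avatar caption: "Niansuh's picture"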
import os
import re
import random
import string
import uuid
import json
import logging
import asyncio
import time
from collections import defaultdict
from typing import List, Dict, Any, Optional, AsyncGenerator
from datetime import datetime
from aiohttp import ClientSession, ClientTimeout, ClientError
from fastapi import FastAPI, HTTPException, Request, Depends, Header, UploadFile, File
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
# Configure logging: INFO and above, timestamped, emitted through a stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
# Load environment variables
# API_KEYS is a comma-separated list. Each entry is stripped of surrounding
# whitespace and empty entries are dropped, so values such as "a, b" or "a,,b"
# neither produce a key that can never match nor register the empty string as
# a valid key (which would let a bare "Bearer " header authenticate).
API_KEYS = [key.strip() for key in os.getenv('API_KEYS', '').split(',') if key.strip()]
RATE_LIMIT = int(os.getenv('RATE_LIMIT', '60'))  # Requests per minute
if not API_KEYS:
    # Refuse to start without credentials rather than run an API nobody can call.
    logger.error("No API keys found. Please set the API_KEYS environment variable. | NiansuhAI")
    raise Exception("API_KEYS environment variable not set. | NiansuhAI")
# Simple in-memory rate limiter: per-key request count plus the start time of
# the current window. A fresh entry starts at zero with the window opening "now".
rate_limit_store = defaultdict(lambda: {"count": 0, "timestamp": time.time()})
async def get_api_key(authorization: str = Header(...)) -> str:
    """Extract and validate the API key from the ``Authorization`` header.

    Args:
        authorization: Raw header value; must have the form ``Bearer <key>``.

    Returns:
        The API key (the header value with the ``Bearer `` prefix removed).

    Raises:
        HTTPException: 401 when the header does not start with ``Bearer `` or
            the key is not listed in ``API_KEYS``.
    """
    prefix = 'Bearer '
    if not authorization.startswith(prefix):
        logger.warning("Invalid authorization header format.")
        raise HTTPException(status_code=401, detail='Invalid authorization header format | NiansuhAI')
    api_key = authorization[len(prefix):]
    if api_key not in API_KEYS:
        # Do not echo the rejected value: it is untrusted input (log injection)
        # and is often a mistyped or foreign secret that must not land in logs.
        logger.warning("Invalid API key attempted (length %d).", len(api_key))
        raise HTTPException(status_code=401, detail='Invalid API key | NiansuhAI')
    return api_key
async def rate_limiter(api_key: str = Depends(get_api_key)):
    """Enforce a per-key request budget of ``RATE_LIMIT`` per 60-second window.

    The window opens at the timestamp stored for the key. Once more than 60
    seconds have passed, the entry is replaced by a new window that already
    counts the current request.

    Raises:
        HTTPException: 429 when the key has used up its budget for the window.
    """
    now = time.time()
    entry = rate_limit_store[api_key]

    # Window expired: start a new one that includes this request.
    if now - entry["timestamp"] > 60:
        rate_limit_store[api_key] = {"count": 1, "timestamp": now}
        return

    if entry["count"] >= RATE_LIMIT:
        logger.warning(f"Rate limit exceeded for API key: {api_key}")
        raise HTTPException(status_code=429, detail='Rate limit exceeded | NiansuhAI')

    entry["count"] += 1
# Custom exception for model not working
class ModelNotWorkingException(Exception):
    """Signals that the named model cannot currently serve requests.

    Attributes set on the instance: ``model`` (the name passed in) and
    ``message`` (the human-readable text, also used as the exception message).
    """

    def __init__(self, model: str):
        text = f"The model '{model}' is currently not working. Please try another model or wait for it to be fixed."
        super().__init__(text)
        self.model = model
        self.message = text
# Mock implementations for ImageResponse and to_data_uri
class ImageResponse:
    """Minimal stand-in for an image result: a URL plus its alt text."""

    def __init__(self, url: str, alt: str):
        self.url, self.alt = url, alt
def to_data_uri(image: Any) -> str:
    """Return a placeholder PNG data URI.

    This is a mock: ``image`` is ignored and the same literal is returned for
    every input.
    """
    placeholder = "data:image/png;base64,..."  # Replace with actual base64 data
    return placeholder
class Blackbox:
    """Static model catalogue and name resolution for the Blackbox backend.

    Everything here is class-level data plus the ``get_model`` classmethod;
    no instance state is used in the visible part of the class.
    """

    # Backend endpoints.
    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"

    # Capability flags.
    working = True
    supports_stream = True
    supports_system_message = True
    supports_message_history = True

    default_model = 'blackboxai'
    image_models = ['ImageGeneration']

    # Every canonical model name accepted by the API.
    models = [
        default_model,
        'blackboxai-pro',
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'gpt-4o',
        'gemini-pro',
        'gemini-1.5-flash',
        'claude-sonnet-3.5',
        'PythonAgent',
        'JavaAgent',
        'JavaScriptAgent',
        'HTMLAgent',
        'GoogleCloudAgent',
        'AndroidDeveloper',
        'SwiftDeveloper',
        'Next.jsAgent',
        'MongoDBAgent',
        'PyTorchAgent',
        'ReactAgent',
        'XcodeAgent',
        'AngularJSAgent',
        *image_models,
        'Niansuh',
    ]

    # Per-model settings tables, keyed by canonical model name.
    agentMode = {
        'ImageGeneration': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
        'Niansuh': {'mode': True, 'id': "NiansuhAIk1HgESy", 'name': "Niansuh"},
    }

    trendingAgentMode = {
        "blackboxai": {},
        "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
        "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
        'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
        'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
        'blackboxai-pro': {'mode': True, 'id': "BLACKBOXAI-PRO"},
        'PythonAgent': {'mode': True, 'id': "Python Agent"},
        'JavaAgent': {'mode': True, 'id': "Java Agent"},
        'JavaScriptAgent': {'mode': True, 'id': "JavaScript Agent"},
        'HTMLAgent': {'mode': True, 'id': "HTML Agent"},
        'GoogleCloudAgent': {'mode': True, 'id': "Google Cloud Agent"},
        'AndroidDeveloper': {'mode': True, 'id': "Android Developer"},
        'SwiftDeveloper': {'mode': True, 'id': "Swift Developer"},
        'Next.jsAgent': {'mode': True, 'id': "Next.js Agent"},
        'MongoDBAgent': {'mode': True, 'id': "MongoDB Agent"},
        'PyTorchAgent': {'mode': True, 'id': "PyTorch Agent"},
        'ReactAgent': {'mode': True, 'id': "React Agent"},
        'XcodeAgent': {'mode': True, 'id': "Xcode Agent"},
        'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"},
    }

    userSelectedModel = {
        "gpt-4o": "gpt-4o",
        "gemini-pro": "gemini-pro",
        'claude-sonnet-3.5': "claude-sonnet-3.5",
    }

    # "@..." prefix string associated with each model.
    model_prefixes = {
        'gpt-4o': '@GPT-4o',
        'gemini-pro': '@Gemini-PRO',
        'claude-sonnet-3.5': '@Claude-Sonnet-3.5',
        'PythonAgent': '@Python Agent',
        'JavaAgent': '@Java Agent',
        'JavaScriptAgent': '@JavaScript Agent',
        'HTMLAgent': '@HTML Agent',
        'GoogleCloudAgent': '@Google Cloud Agent',
        'AndroidDeveloper': '@Android Developer',
        'SwiftDeveloper': '@Swift Developer',
        'Next.jsAgent': '@Next.js Agent',
        'MongoDBAgent': '@MongoDB Agent',
        'PyTorchAgent': '@PyTorch Agent',
        'ReactAgent': '@React Agent',
        'XcodeAgent': '@Xcode Agent',
        'AngularJSAgent': '@AngularJS Agent',
        'blackboxai-pro': '@BLACKBOXAI-PRO',
        'ImageGeneration': '@Image Generation',
        'Niansuh': '@Niansuh',
    }

    # URL of the form "<url>/?model=<name>" for the models that have one.
    model_referers = {
        "blackboxai": f"{url}/?model=blackboxai",
        "gpt-4o": f"{url}/?model=gpt-4o",
        "gemini-pro": f"{url}/?model=gemini-pro",
        "claude-sonnet-3.5": f"{url}/?model=claude-sonnet-3.5"
    }

    # Alternative spellings mapped to canonical model names.
    model_aliases = {
        "gemini-flash": "gemini-1.5-flash",
        "claude-3.5-sonnet": "claude-sonnet-3.5",
        "flux": "ImageGeneration",
        "niansuh": "Niansuh",
    }

    @classmethod
    def get_model(cls, model: str) -> str:
        """Resolve ``model`` to a canonical name.

        Canonical and user-selectable names are returned unchanged, aliases
        are translated, and anything else falls back to ``default_model``.
        """
        if model in cls.models or model in cls.userSelectedModel:
            return model
        return cls.model_aliases.get(model, cls.default_model)

    # (Rest of the Blackbox class remains unchanged)
    # NOTE(review): chat_completions calls Blackbox.create_async_generator,
    # which is not defined in the part of the class shown here -- confirm it
    # exists in the omitted remainder.
# FastAPI app setup: the application object that the route decorators in this
# file register their handlers on, and that uvicorn serves when run as a script.
app = FastAPI()
# One chat message: who said it (role) and what was said (content).
# Used as an element of ChatRequest.messages and as a request body in its own right.
class Message(BaseModel):
    role: str
    content: str
# Request body for the chat completions endpoint.
class ChatRequest(BaseModel):
    # Model name or alias; checked against the Blackbox catalogue by the handler.
    model: str
    # Conversation so far, in order.
    messages: List[Message]
    # When true the handler answers with a text/event-stream response.
    stream: Optional[bool] = False
    # Passed straight through to the backend generator.
    webSearchMode: Optional[bool] = False
def create_response(content: str, model: str, finish_reason: Optional[str] = None) -> Dict[str, Any]:
    """Build one ``chat.completion.chunk`` payload.

    Args:
        content: Text placed in the chunk's delta.
        model: Model name echoed back in the payload.
        finish_reason: Value for the choice's ``finish_reason``; ``None`` by default.

    Returns:
        A dict with a fresh ``chatcmpl-<uuid4>`` id, the current time in whole
        seconds, a single choice at index 0, and ``usage`` set to ``None``.
    """
    delta = {"content": content, "role": "assistant"}
    choice = {
        "index": 0,
        "delta": delta,
        "finish_reason": finish_reason,
    }
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    created_at = int(datetime.now().timestamp())
    return {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": created_at,
        "model": model,
        "choices": [choice],
        "usage": None,
    }
# Chat completions endpoint.
#
# Validates the requested model, hands the conversation to the Blackbox
# generator, and returns either
#   * a text/event-stream of "chat.completion.chunk" payloads ending with
#     "data: [DONE]" (request.stream is true), or
#   * one "chat.completion" JSON object with whitespace-split word counts as
#     the usage figures (request.stream is false).
#
# Errors: 400 for an unknown model, 503 when the backend raises
# ModelNotWorkingException, 500 for anything unexpected. Failures that occur
# while streaming are reported in-band as a {"error": ...} event, because the
# response has already started by then.
@app.post("/niansuhai/v1/chat/completions", dependencies=[Depends(rate_limiter)])
async def chat_completions(request: ChatRequest, req: Request, api_key: str = Depends(get_api_key)):
    # Redact user messages only for logging purposes
    redacted_messages = [{"role": msg.role, "content": "[redacted]"} for msg in request.messages]
    logger.info(f"Received chat completions request from API key: {api_key} | Model: {request.model} | Messages: {redacted_messages}")
    try:
        # Validate that the requested model is available
        if request.model not in Blackbox.models and request.model not in Blackbox.model_aliases:
            logger.warning(f"Attempt to use unavailable model: {request.model}")
            raise HTTPException(status_code=400, detail="Requested model is not available. | NiansuhAI")

        # Process the request with actual message content, but don't log it
        async_generator = Blackbox.create_async_generator(
            model=request.model,
            messages=[{"role": msg.role, "content": msg.content} for msg in request.messages],  # Actual message content used here
            image=None,
            image_name=None,
            webSearchMode=request.webSearchMode
        )

        if request.stream:
            async def generate():
                try:
                    async for chunk in async_generator:
                        # Images are forwarded as a markdown image link.
                        if isinstance(chunk, ImageResponse):
                            text = f"![image]({chunk.url})"
                        else:
                            text = chunk
                        response_chunk = create_response(text, request.model)
                        yield f"data: {json.dumps(response_chunk)}\n\n"
                    yield "data: [DONE]\n\n"
                except HTTPException as he:
                    error_response = {"error": he.detail}
                    yield f"data: {json.dumps(error_response)}\n\n"
                except Exception as e:
                    logger.exception("Error during streaming response generation. | NiansuhAI")
                    error_response = {"error": str(e)}
                    yield f"data: {json.dumps(error_response)}\n\n"

            return StreamingResponse(generate(), media_type="text/event-stream")

        # Non-streaming: collect the pieces and join once instead of growing a
        # string with += on every chunk.
        parts = []
        async for chunk in async_generator:
            if isinstance(chunk, ImageResponse):
                parts.append(f"![image]({chunk.url})\n")
            else:
                parts.append(chunk)
        response_content = "".join(parts)
        logger.info(f"Completed non-streaming response generation for API key: {api_key}")

        # Word counts stand in for token counts; compute each figure once.
        prompt_tokens = sum(len(msg.content.split()) for msg in request.messages)
        completion_tokens = len(response_content.split())
        return {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(datetime.now().timestamp()),
            "model": request.model,
            "choices": [
                {
                    "message": {
                        "role": "assistant",
                        "content": response_content
                    },
                    "finish_reason": "stop",
                    "index": 0
                }
            ],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens
            },
        }
    except ModelNotWorkingException as e:
        logger.warning(f"Model not working: {e}")
        raise HTTPException(status_code=503, detail=str(e)) from e
    except HTTPException as he:
        logger.warning(f"HTTPException: {he.detail}")
        raise  # re-raise unchanged, keeping the original traceback
    except Exception as e:
        logger.exception("An unexpected error occurred while processing the chat completions request. | NiansuhAI")
        raise HTTPException(status_code=500, detail=str(e)) from e
# List every canonical model name as {"data": [{"id": <name>}, ...]}.
@app.get("/niansuhai/v1/models", dependencies=[Depends(rate_limiter)])
async def get_models(api_key: str = Depends(get_api_key)):
    logger.info(f"Fetching available models for API key: {api_key}")
    model_entries = []
    for model_id in Blackbox.models:
        model_entries.append({"id": model_id})
    return {"data": model_entries}
# Additional endpoints for better functionality
# Liveness probe: an authenticated, rate-limited call that always reports "ok".
@app.get("/niansuhai/v1/health", dependencies=[Depends(rate_limiter)])
async def health_check(api_key: str = Depends(get_api_key)):
    logger.info(f"Health check requested by API key: {api_key}")
    payload = {"status": "ok"}
    return payload
# Report whether a model name is known, either directly or through an alias.
# Unknown names yield 404.
@app.get("/niansuhai/v1/models/{model}/status", dependencies=[Depends(rate_limiter)])
async def model_status(model: str, api_key: str = Depends(get_api_key)):
    logger.info(f"Model status requested for '{model}' by API key: {api_key}")

    # Canonical name: report it as-is.
    if model in Blackbox.models:
        return {"model": model, "status": "available | NiansuhAI"}

    # Alias: report the canonical name it points at.
    if model in Blackbox.model_aliases:
        return {"model": Blackbox.model_aliases[model], "status": "available via alias | NiansuhAI"}

    logger.warning(f"Model not found: {model}")
    raise HTTPException(status_code=404, detail="Model not found | NiansuhAI")
# New endpoint to get model details
# Returns mock details for a known model (canonical name or alias) and 404 for
# anything else.
@app.get("/niansuhai/v1/models/{model}/details", dependencies=[Depends(rate_limiter)])
async def get_model_details(model: str, api_key: str = Depends(get_api_key)):
    logger.info(f"Model details requested for '{model}' by API key: {api_key}")
    # Blackbox.get_model() silently falls back to the default model for names it
    # does not recognise, so the existence check has to happen before resolving;
    # otherwise unknown names would return the default model's details and the
    # 404 below could never fire.
    if model not in Blackbox.models and model not in Blackbox.model_aliases:
        logger.warning(f"Model not found: {model}")
        raise HTTPException(status_code=404, detail="Model not found | NiansuhAI")
    actual_model = Blackbox.get_model(model)
    # For demonstration, we'll return mock details
    if actual_model in Blackbox.image_models:
        capabilities = ["chat", "completion", "image generation"]
    else:
        capabilities = ["chat", "completion"]
    model_details = {
        "id": actual_model,
        "description": f"Details about model {actual_model}",
        "capabilities": capabilities,
        "status": "available",
    }
    return {"data": model_details}
# Session history endpoints
# In-memory storage for session histories: session_id -> list of
# {"role": ..., "content": ...} dicts. Lives only in this process's memory.
session_histories = defaultdict(list) # In-memory storage for session histories
# Append one message to a session's history; the session is created on first use.
@app.post("/niansuhai/v1/sessions/{session_id}/messages", dependencies=[Depends(rate_limiter)])
async def add_message_to_session(session_id: str, message: Message, api_key: str = Depends(get_api_key)):
    logger.info(f"Adding message to session '{session_id}' by API key: {api_key}")
    entry = {"role": message.role, "content": message.content}
    history = session_histories[session_id]
    history.append(entry)
    return {"status": "message added"}
# Return all messages stored for a session, or 404 if the session was never written to.
@app.get("/niansuhai/v1/sessions/{session_id}/messages", dependencies=[Depends(rate_limiter)])
async def get_session_messages(session_id: str, api_key: str = Depends(get_api_key)):
    logger.info(f"Fetching messages for session '{session_id}' by API key: {api_key}")
    # Membership test rather than indexing first: indexing the defaultdict
    # would create an empty session as a side effect.
    if session_id not in session_histories:
        raise HTTPException(status_code=404, detail="Session not found | NiansuhAI")
    return {"data": session_histories[session_id]}
# User preferences endpoints
# In-memory storage for user preferences: user_id -> dict of preference values.
# Lives only in this process's memory.
user_preferences = defaultdict(dict) # In-memory storage for user preferences
# Request body for updating a user's preferences; both fields are optional.
class UserPreferences(BaseModel):
    # Defaults to "light" when omitted.
    theme: Optional[str] = "light"
    # Defaults to enabled when omitted.
    notifications_enabled: Optional[bool] = True
# Replace the stored preferences for a user with the submitted ones.
@app.post("/niansuhai/v1/users/{user_id}/preferences", dependencies=[Depends(rate_limiter)])
async def update_user_preferences(user_id: str, preferences: UserPreferences, api_key: str = Depends(get_api_key)):
    logger.info(f"Updating preferences for user '{user_id}' by API key: {api_key}")
    # Store a plain dict so the GET endpoint can return it directly.
    stored = preferences.dict()
    user_preferences[user_id] = stored
    return {"status": "preferences updated"}
# Return the stored preferences for a user, or 404 if none were ever saved.
@app.get("/niansuhai/v1/users/{user_id}/preferences", dependencies=[Depends(rate_limiter)])
async def get_user_preferences(user_id: str, api_key: str = Depends(get_api_key)):
    logger.info(f"Fetching preferences for user '{user_id}' by API key: {api_key}")
    # Membership test rather than indexing first: indexing the defaultdict
    # would create an empty entry as a side effect.
    if user_id not in user_preferences:
        raise HTTPException(status_code=404, detail="User not found | NiansuhAI")
    return {"data": user_preferences[user_id]}
# Image upload endpoint
# Accepts a multipart file, rejects anything whose declared content type is not
# image/* with a 400, and (for demonstration) echoes the filename back.
@app.post("/niansuhai/v1/images/upload", dependencies=[Depends(rate_limiter)])
async def upload_image(image: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    logger.info(f"Image upload requested by API key: {api_key}")
    # content_type is None when the client sends the part without a
    # Content-Type header; treat that as "not an image" instead of letting
    # None.startswith(...) raise and surface as a 500.
    content_type = image.content_type or ""
    if not content_type.startswith('image/'):
        logger.warning("Uploaded file is not an image.")
        raise HTTPException(status_code=400, detail="Uploaded file is not an image | NiansuhAI")
    # For demonstration, we'll just return the filename
    return {"filename": image.filename, "status": "image uploaded"}
# Component health check endpoint
# Looks the component up in a fixed mock table; unknown components yield 404.
@app.get("/niansuhai/v1/health/{component}", dependencies=[Depends(rate_limiter)])
async def component_health_check(component: str, api_key: str = Depends(get_api_key)):
    logger.info(f"Health check for component '{component}' requested by API key: {api_key}")
    # Mock health status for components
    components_status = {
        "database": "healthy",
        "message_queue": "healthy",
        "cache": "healthy",
    }
    if component not in components_status:
        logger.warning(f"Component not found: {component}")
        raise HTTPException(status_code=404, detail="Component not found | NiansuhAI")
    return {"component": component, "status": components_status[component]}
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so that importing this module does not require uvicorn.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")