SAI2A / main.py
Niansuh's picture
Update main.py
a932354 verified
raw
history blame
10.8 kB
import json
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
import httpx
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, Response
from dotenv import load_dotenv
# Retry Mechanism Libraries
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
# Initialize Logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Load Environment Variables for Sensitive Information
load_dotenv()
app = FastAPI()
# Configuration Constants (Hardcoded as per requirements)
APP_NAME = "ChitChat_Chrome_Ext"
APP_VERSION = "4.28.0"
ORIGIN_URL = "chrome-extension://difoiogjjojoaoomphldepapgpbgkhkb/standalone.html?from=sidebar"
ORIGIN_TITLE = "Sider"
TZ_NAME = "Asia/Karachi"
PROMPT_TEMPLATE_KEY = "artifacts"
PROMPT_TEMPLATE_LANG = "original"
TOOLS_AUTO = ["search", "text_to_image", "data_analysis"]
# Security Configuration
APP_SECRET = os.getenv("APP_SECRET", "666")
ACCESS_TOKEN = os.getenv("SD_ACCESS_TOKEN", "")
if not ACCESS_TOKEN:
logger.error("SD_ACCESS_TOKEN is not set in the environment variables.")
raise RuntimeError("SD_ACCESS_TOKEN is required but not set.")
# Outgoing Request Headers (As per provided sample)
OUTGOING_HEADERS = {
'accept': '*/*',
'accept-encoding': 'gzip, deflate, br, zstd',
'accept-language': 'en-US,en;q=0.9',
'authorization': f'Bearer {ACCESS_TOKEN}',
'content-type': 'application/json',
'origin': 'chrome-extension://difoiogjjojoaoomphldepapgpbgkhkb',
'priority': 'u=1, i',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'none',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/130.0.0.0 Safari/537.36',
}
# Updated ALLOWED_MODELS List (No Duplicates)
ALLOWED_MODELS = [
{"id": "claude-3.5-sonnet", "name": "Claude 3.5 Sonnet"},
{"id": "sider", "name": "Sider"},
{"id": "gpt-4o-mini", "name": "GPT-4o Mini"},
{"id": "claude-3-haiku", "name": "Claude 3 Haiku"},
{"id": "claude-3.5-haiku", "name": "Claude 3.5 Haiku"},
{"id": "gemini-1.5-flash", "name": "Gemini 1.5 Flash"},
{"id": "llama-3", "name": "Llama 3"},
{"id": "gpt-4o", "name": "GPT-4o"},
{"id": "gemini-1.5-pro", "name": "Gemini 1.5 Pro"},
{"id": "llama-3.1-405b", "name": "Llama 3.1 405b"},
]
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # ⚠️ IMPORTANT: Restrict this to specific origins in production for security
allow_credentials=True,
allow_methods=["*"], # Allow all HTTP methods
allow_headers=["*"], # Allow all headers
)
# Security Dependency
security = HTTPBearer()
# Pydantic Models
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
model: str
messages: List[Message]
stream: Optional[bool] = False
# Utility Functions
def create_chat_completion_data(content: str, model: str, finish_reason: Optional[str] = None) -> Dict[str, Any]:
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": content, "role": "assistant"},
"finish_reason": finish_reason,
}
],
"usage": None,
}
def verify_app_secret(credentials: HTTPAuthorizationCredentials = Depends(security)):
if credentials.credentials != APP_SECRET:
logger.warning(f"Invalid APP_SECRET provided: {credentials.credentials}")
raise HTTPException(status_code=403, detail="Invalid APP_SECRET")
logger.info("APP_SECRET verified successfully.")
return credentials.credentials
# Retry Configuration using Tenacity
def is_retryable_exception(exception):
return isinstance(exception, httpx.HTTPStatusError) and exception.response.status_code == 429
@retry(
retry=retry_if_exception_type(httpx.HTTPStatusError) & is_retryable_exception,
wait=wait_exponential(multiplier=1, min=2, max=10),
stop=stop_after_attempt(5),
reraise=True
)
async def send_request_with_retry(json_data: Dict[str, Any]) -> httpx.Response:
async with httpx.AsyncClient() as client:
response = await client.post(
'https://sider.ai/api/v3/completion/text', # Updated endpoint
headers=OUTGOING_HEADERS,
json=json_data,
timeout=120.0
)
response.raise_for_status()
return response
# CORS Preflight Options Endpoint
@app.options("/hf/v1/chat/completions")
async def chat_completions_options():
return Response(
status_code=200,
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
},
)
# List Available Models
@app.get("/hf/v1/models")
async def list_models():
return {"object": "list", "data": ALLOWED_MODELS}
# Chat Completions Endpoint
@app.post("/hf/v1/chat/completions")
async def chat_completions(
request: ChatRequest, app_secret: str = Depends(verify_app_secret), req: Request = None
):
client_ip = req.client.host if req else "unknown"
logger.info(f"Received chat completion request from {client_ip} for model: {request.model}")
# Validate Selected Model
if request.model not in [model['id'] for model in ALLOWED_MODELS]:
allowed = ', '.join(model['id'] for model in ALLOWED_MODELS)
logger.error(f"Model '{request.model}' is not allowed.")
raise HTTPException(
status_code=400,
detail=f"Model '{request.model}' is not allowed. Allowed models are: {allowed}",
)
logger.info(f"Using model: {request.model}")
# Generate a unique CID for each request
cid = str(uuid.uuid4()).replace("-", "").upper()[:12] # Example: C0MES13070J1
logger.debug(f"Generated CID: {cid}")
# Prepare JSON Payload for External API
if not request.messages:
prompt_text = "make a dog"
else:
prompt_text = "\n".join(
[
f"{'User' if msg.role == 'user' else 'Assistant'}: {msg.content}"
for msg in request.messages
]
)
json_data = {
'prompt': prompt_text,
'stream': request.stream,
'app_name': APP_NAME,
'app_version': APP_VERSION,
'tz_name': TZ_NAME,
'cid': cid,
'model': request.model,
'search': False,
'auto_search': False,
'filter_search_history': False,
'from': 'chat',
'group_id': 'default',
'chat_models': [], # As per the sample payload
'files': [],
'prompt_template': {
'key': PROMPT_TEMPLATE_KEY,
'attributes': {
'lang': PROMPT_TEMPLATE_LANG,
},
},
'tools': {
'auto': TOOLS_AUTO,
},
'extra_info': {
'origin_url': ORIGIN_URL,
'origin_title': ORIGIN_TITLE,
},
}
logger.debug(f"JSON Data Sent to External API: {json.dumps(json_data, indent=2)}")
try:
response = await send_request_with_retry(json_data)
except httpx.HTTPStatusError as e:
status_code = e.response.status_code
if status_code == 429:
retry_after = e.response.headers.get("Retry-After", "60")
logger.warning(f"Rate limited by Sider AI. Retry after {retry_after} seconds.")
raise HTTPException(
status_code=429,
detail=f"Rate limited by external service. Please retry after {retry_after} seconds."
)
else:
logger.error(f"HTTP error occurred: {e} - Response: {e.response.text}")
raise HTTPException(status_code=status_code, detail=str(e))
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def generate():
async for line in response.aiter_lines():
if line and ("[DONE]" not in line):
# Assuming the line starts with 'data: ' followed by JSON
if line.startswith("data: "):
json_line = line[6:]
if json_line.startswith("{"):
try:
data = json.loads(json_line)
content = data.get("data", {}).get("text", "")
logger.debug(f"Received content: {content}")
yield f"data: {json.dumps(create_chat_completion_data(content, request.model))}\n\n"
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e} - Line: {json_line}")
# Send the stop signal
yield f"data: {json.dumps(create_chat_completion_data('', request.model, 'stop'))}\n\n"
yield "data: [DONE]\n\n"
if request.stream:
logger.info("Streaming response initiated.")
return StreamingResponse(generate(), media_type="text/event-stream")
else:
logger.info("Non-streaming response initiated.")
full_response = ""
async for chunk in generate():
if chunk.startswith("data: ") and not chunk[6:].startswith("[DONE]"):
# Parse the JSON part after 'data: '
try:
data = json.loads(chunk[6:])
if data["choices"][0]["delta"].get("content"):
full_response += data["choices"][0]["delta"]["content"]
except json.JSONDecodeError:
logger.warning(f"Failed to decode JSON from chunk: {chunk}")
# Final Response Structure
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": full_response},
"finish_reason": "stop",
}
],
"usage": None,
}
# Entry Point
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)