# Source-page header captured with this file (not part of the program):
# SAI2A / main.py
# Niansuh's picture
# Update main.py
# 77af3d8 verified
# raw
# history blame
# 11.7 kB
import hmac
import json
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, Response
# ==============================
# Configuration and Setup
# ==============================
# Logging setup: DEBUG level so that every raw upstream line logged by the
# handlers below is actually emitted.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# Read settings from a .env file before any os.environ lookup below.
load_dotenv()

# The FastAPI application that the routes in this file attach to.
app = FastAPI()

# Constants and configuration values.
# NOTE(review): BASE_URL is not referenced anywhere in the visible code.
BASE_URL = "https://aichatonlineorg.erweima.ai/aichatonline"
# Bearer token callers must present; defaults to "666" when APP_SECRET is unset.
APP_SECRET = os.environ.get("APP_SECRET", "666")
# Token sent in the upstream Authorization header; empty when unset.
ACCESS_TOKEN = os.environ.get("SD_ACCESS_TOKEN", "")
# Request headers sent with every upstream call (see chat_completions).
headers = {
    "accept": "*/*",
    "accept-language": "en-US,en;q=0.9",
    "authorization": "Bearer " + ACCESS_TOKEN,
    "cache-control": "no-cache",
    "origin": "chrome-extension://difoiogjjojoaoomphldepapgpbgkhkb",
    "pragma": "no-cache",
    "priority": "u=1, i",
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "none",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/130.0.0.0 Safari/537.36"
    ),
}
# Models this proxy accepts; each entry uses the model id as its display name.
ALLOWED_MODELS = [
    {"id": model_id, "name": model_id}
    for model_id in (
        "claude-3.5-sonnet",
        "claude-3-opus",
        "gemini-1.5-pro",
        "gpt-4o",
        "o1-preview",
        "o1-mini",
        "gpt-4o-mini",
    )
]
# CORS: accept any origin, method and header.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Bearer-token scheme consumed by verify_app_secret via Depends(security).
security = HTTPBearer()
# ==============================
# Pydantic Models
# ==============================
class Message(BaseModel):
    # One entry of the chat history supplied in a ChatRequest.
    # chat_completions treats role == "user" as the user speaking and every
    # other role as the assistant when it builds the upstream prompt.
    role: str
    # Text of the message.
    content: str
class ChatRequest(BaseModel):
    # Request body of POST /hf/v1/chat/completions.
    # Must be one of the ids listed in ALLOWED_MODELS (checked by the handler).
    model: str
    # Conversation so far, in order.
    messages: List[Message]
    # True -> reply as a text/event-stream; False/omitted -> a single JSON body.
    stream: Optional[bool] = False
# ==============================
# Helper Functions
# ==============================
def simulate_data(content: str, model: str) -> Dict[str, Any]:
    """Build an intermediate ``chat.completion.chunk`` dict.

    Args:
        content: Text placed in the chunk's assistant delta.
        model: Model identifier echoed back in the chunk.

    Returns:
        A chunk with a fresh ``chatcmpl-<uuid4>`` id, the current Unix
        timestamp, one choice whose ``finish_reason`` is ``None`` and
        ``usage`` set to ``None``.
    """
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    created_at = int(datetime.now().timestamp())
    choice = {
        "index": 0,
        "delta": {"content": content, "role": "assistant"},
        "finish_reason": None,
    }
    return {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": created_at,
        "model": model,
        "choices": [choice],
        "usage": None,
    }
def stop_data(content: str, model: str) -> Dict[str, Any]:
    """Build the closing ``chat.completion.chunk`` dict of a stream.

    Args:
        content: Text placed in the chunk's assistant delta.
        model: Model identifier echoed back in the chunk.

    Returns:
        A chunk shaped like the intermediate ones, except that the single
        choice carries ``finish_reason == "stop"``.
    """
    final_choice = dict(
        index=0,
        delta=dict(content=content, role="assistant"),
        finish_reason="stop",
    )
    return dict(
        id=f"chatcmpl-{uuid.uuid4()}",
        object="chat.completion.chunk",
        created=int(datetime.now().timestamp()),
        model=model,
        choices=[final_choice],
        usage=None,
    )
def create_chat_completion_data(content: str, model: str, finish_reason: Optional[str] = None) -> Dict[str, Any]:
    """Build a ``chat.completion.chunk`` dict with a caller-chosen finish reason.

    Args:
        content: Text placed in the chunk's assistant delta.
        model: Model identifier echoed back in the chunk.
        finish_reason: Value stored on the single choice; ``None`` (the
            default) for intermediate chunks.

    Returns:
        A chunk with a fresh ``chatcmpl-<uuid4>`` id, the current Unix
        timestamp, exactly one choice and ``usage`` set to ``None``.
    """
    delta = {"content": content, "role": "assistant"}
    chunk: Dict[str, Any] = {
        "id": "chatcmpl-" + str(uuid.uuid4()),
        "object": "chat.completion.chunk",
        "created": int(datetime.now().timestamp()),
        "model": model,
    }
    chunk["choices"] = [
        {"index": 0, "delta": delta, "finish_reason": finish_reason},
    ]
    chunk["usage"] = None
    return chunk
def verify_app_secret(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Check the caller's bearer token against ``APP_SECRET``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The presented token, unchanged, when it matches ``APP_SECRET``.

    Raises:
        HTTPException: 403 with detail ``"Invalid APP_SECRET"`` on mismatch.
    """
    # Compare in constant time so response timing does not reveal how much of
    # the secret a guess got right. Both sides are encoded to bytes because
    # compare_digest rejects str arguments containing non-ASCII characters.
    supplied = credentials.credentials.encode("utf-8")
    expected = APP_SECRET.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=403, detail="Invalid APP_SECRET")
    return credentials.credentials
def replace_escaped_newlines(input_string: str) -> str:
    """Turn every literal backslash-``n`` pair into a real line break.

    Args:
        input_string: Text that may contain the two-character sequence
            backslash + ``n``.

    Returns:
        The text with each such sequence replaced by a newline character;
        all other characters are left as they are.
    """
    escaped_newline = "\\n"
    pieces = input_string.split(escaped_newline)
    return "\n".join(pieces)
# ==============================
# API Endpoints
# ==============================
@app.options("/hf/v1/chat/completions")
async def chat_completions_options():
    """Handle CORS preflight requests."""
    # Answer with an empty 200 that advertises which methods and headers the
    # chat-completions route accepts from any origin.
    # NOTE(review): CORSMiddleware is also installed app-wide; confirm whether
    # this explicit handler is still needed.
    cors_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, Authorization",
    }
    return Response(headers=cors_headers, status_code=200)
@app.get("/hf/v1/models")
async def list_models():
    """List all allowed models."""
    # Returns a list envelope whose "data" entry is ALLOWED_MODELS itself.
    listing = dict(object="list", data=ALLOWED_MODELS)
    return listing
def _build_upstream_payload(request: ChatRequest) -> Dict[str, Any]:
    """Translate a chat request into the JSON body sent to the upstream API."""
    # The upstream takes one prompt string, so the chat history is flattened
    # into "User: ..." / "Assistant: ..." lines.
    # NOTE(review): every role other than "user" (including "system") is
    # labelled "Assistant" — confirm that is what the upstream expects.
    prompt = "\n".join(
        f"{'User' if msg.role == 'user' else 'Assistant'}: {msg.content}"
        for msg in request.messages
    )
    return {
        'prompt': prompt,
        'stream': True,  # the upstream is always consumed as a stream
        'app_name': 'ChitChat_Edge_Ext',
        'app_version': '4.28.0',
        'tz_name': 'Asia/Karachi',
        'cid': 'C092SEMXM9BJ',
        'model': request.model,
        'search': False,  # Disable search
        'auto_search': False,  # Disable auto_search
        'filter_search_history': False,
        'from': 'chat',
        'group_id': 'default',
        'chat_models': [],
        'files': [],
        'prompt_template': {
            'key': '',
            'attributes': {
                'lang': 'original',
            },
        },
        'tools': {
            'auto': [
                'search',  # Re-add search to maintain API expectations
                'text_to_image',
                'data_analysis',
            ],
        },
        'extra_info': {
            'origin_url': '',
            'origin_title': '',
        },
    }


def _sse_event(payload: Dict[str, Any]) -> str:
    """Serialise *payload* as a single ``data:`` server-sent event."""
    return f"data: {json.dumps(payload)}\n\n"


@app.post("/hf/v1/chat/completions")
async def chat_completions(
    request: ChatRequest, app_secret: str = Depends(verify_app_secret)
):
    """Handle chat completion requests."""
    # Parameters:
    #   request    - validated request body (model, messages, stream flag).
    #   app_secret - unused in the body; declared so that verify_app_secret
    #                runs and rejects callers with a wrong token.
    # Returns a StreamingResponse of chat.completion.chunk events when
    # request.stream is true, otherwise one chat.completion dict holding the
    # concatenated text.
    # Raises HTTPException 400 for a model outside ALLOWED_MODELS; in
    # non-streaming mode also the upstream status code on an HTTP error and
    # 500 on a transport error.
    logger.info("Received chat completion request for model: %s", request.model)

    # Validate model
    allowed_model_ids = [model['id'] for model in ALLOWED_MODELS]
    if request.model not in allowed_model_ids:
        raise HTTPException(
            status_code=400,
            detail=f"Model {request.model} is not allowed. Allowed models are: {', '.join(allowed_model_ids)}",
        )

    json_data = _build_upstream_payload(request)

    async def generate():
        """Yield SSE chunk strings translated from the upstream stream."""
        try:
            async with httpx.AsyncClient() as client:
                async with client.stream(
                    'POST',
                    'https://sider.ai/api/v3/completion/text',
                    headers=headers,
                    json=json_data,
                    timeout=120.0,
                ) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        if not line:
                            continue
                        logger.debug("Raw line received: %s", line)

                        # Accept both "data: x" and "data:x"; the space after
                        # the colon is optional in the event-stream format.
                        line_content = (
                            line[5:] if line.startswith("data:") else line
                        ).strip()
                        if not line_content:
                            logger.warning("Received an empty line, skipping.")
                            continue

                        # Only a payload that IS the marker ends the stream; a
                        # substring test would also trigger on model output
                        # that merely contains the text "[DONE]".
                        if line_content == "[DONE]":
                            break

                        logger.debug("Line content to parse: %s", line_content)
                        try:
                            parsed_json = json.loads(line_content)
                        except json.JSONDecodeError as e:
                            logger.error("JSON decode error: %s | Line: %s", e, line_content)
                            continue

                        if not isinstance(parsed_json, dict) or "data" not in parsed_json:
                            logger.error("'data' key not found in the response: %s", parsed_json)
                            continue
                        content_data = parsed_json["data"]
                        if not isinstance(content_data, dict):
                            # e.g. "data": null — there is no text to forward.
                            logger.warning("Skipping event whose 'data' is not an object: %s", parsed_json)
                            continue

                        text_content = content_data.get("text", "")
                        yield _sse_event(create_chat_completion_data(text_content, request.model))
        except httpx.HTTPStatusError as e:
            logger.error("HTTP error occurred: %s", e)
            raise HTTPException(status_code=e.response.status_code, detail=str(e)) from e
        except httpx.RequestError as e:
            logger.error("An error occurred while requesting: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e

        # Always close the stream, even when the upstream ended without
        # sending its own [DONE] marker.
        yield _sse_event(create_chat_completion_data('', request.model, 'stop'))
        yield "data: [DONE]\n\n"

    if request.stream:
        logger.info("Streaming response")

        async def stream_events():
            """Forward chunks; report upstream failures inside the stream."""
            try:
                async for chunk in generate():
                    yield chunk
            except HTTPException as exc:
                # The response has already started here, so an HTTPException
                # can no longer become an error status. Send the failure as an
                # event and terminate the stream cleanly instead.
                yield _sse_event(
                    {
                        "error": {
                            "message": str(exc.detail),
                            "type": "upstream_error",
                            "code": exc.status_code,
                        }
                    }
                )
                yield "data: [DONE]\n\n"

        return StreamingResponse(stream_events(), media_type="text/event-stream")

    logger.info("Non-streaming response")
    parts: List[str] = []
    async for chunk in generate():
        if not chunk.startswith("data: "):
            continue
        body = chunk[6:]
        if body.startswith("[DONE]"):
            continue
        try:
            data = json.loads(body)
        except json.JSONDecodeError as e:
            logger.error("JSON decode error in non-streaming response: %s", e)
            continue
        content = data["choices"][0]["delta"].get("content")
        if content:
            parts.append(content)

    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(datetime.now().timestamp()),
        "model": request.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": "".join(parts)},
                "finish_reason": "stop",
            }
        ],
        "usage": None,
    }
# ==============================
# Entry Point
# ==============================
if __name__ == "__main__":
    # Run as a script: serve the app on every interface, port 7860.
    uvicorn.run(
        app,
        port=7860,
        host="0.0.0.0",
    )