import base64 import json import os import secrets import string import time from typing import List, Optional, Union, Any, Literal import httpx from dotenv import load_dotenv from fastapi import FastAPI from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel # --- Configuration --- load_dotenv() # Env variables for external services IMAGE_API_URL = os.environ.get("IMAGE_API_URL", "https://image.api.example.com") SNAPZION_UPLOAD_URL = "https://upload.snapzion.com/api/public-upload" SNAPZION_API_KEY = os.environ.get("SNAP", "") # --- Dummy Model Definitions --- # In a real application, these would be defined properly. AVAILABLE_MODELS = [ {"id": "gpt-4-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "gpt-4o", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "gpt-3.5-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "dall-e-3", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "text-moderation-stable", "object": "model", "created": int(time.time()), "owned_by": "system"}, ] MODEL_ALIASES = {} # --- FastAPI Application --- app = FastAPI( title="OpenAI Compatible API", description="An adapter for various services to be compatible with the OpenAI API specification.", version="1.0.0" ) # --- Helper Function for Random ID Generation --- def generate_random_id(prefix: str, length: int = 29) -> str: """ Generates a cryptographically secure, random alphanumeric ID. """ population = string.ascii_letters + string.digits random_part = "".join(secrets.choice(population) for _ in range(length)) return f"{prefix}{random_part}" # === API Endpoints === @app.get("/v1/models") async def list_models(): """Lists the available models.""" return {"object": "list", "data": AVAILABLE_MODELS} # === Chat Completion === class FunctionCall(BaseModel): name: str arguments: str class ToolCall(BaseModel): id: str type: Literal["function"] = "function" function: FunctionCall class Message(BaseModel): role: str content: Optional[str] = None tool_calls: Optional[List[ToolCall]] = None name: Optional[str] = None class ChatRequest(BaseModel): messages: List[Message] model: str stream: Optional[bool] = False tools: Optional[Any] = None @app.post("/v1/chat/completions") async def chat_completion(request: ChatRequest): """ Handles chat completion requests, supporting both streaming and non-streaming responses. """ model_id = MODEL_ALIASES.get(request.model, request.model) chat_id = generate_random_id("chatcmpl-") headers = { 'accept': 'text/event-stream', 'content-type': 'application/json', 'origin': 'https://www.chatwithmono.xyz', 'referer': 'https://www.chatwithmono.xyz/', 'user-agent': 'Mozilla/5.0', } if request.tools: tool_prompt = """You have access to tools. To call a tool, respond with JSON within XML tags. Format: {"name":,"parameters":{...}}""" if request.messages and request.messages[0].role == "system": request.messages[0].content += "\n\n" + tool_prompt else: request.messages.insert(0, Message(role="system", content=tool_prompt)) request_data = request.model_dump(exclude_unset=True) payload = { "messages": request_data["messages"], "model": model_id } if request.stream: async def event_stream(): created = int(time.time()) is_first_chunk = True usage_info = None tool_call_buffer = "" in_tool_call = False try: async with httpx.AsyncClient(timeout=120) as client: async with client.stream("POST", "https://www.chatwithmono.xyz/api/chat", headers=headers, json=payload) as response: response.raise_for_status() async for line in response.aiter_lines(): if not line: continue if line.startswith("0:"): try: content_piece = json.loads(line[2:]) # Check for tool call tags if not in_tool_call and "" in content_piece: in_tool_call = True tool_call_buffer = "" if in_tool_call: tool_call_buffer += content_piece if "" in tool_call_buffer: # Process complete tool call try: # Extract tool call content start_idx = tool_call_buffer.find("") + len("") end_idx = tool_call_buffer.find("") tool_call_str = tool_call_buffer[start_idx:end_idx].strip() tool_call_json = json.loads(tool_call_str) delta = { "content": None, "tool_calls": [{ "index": 0, "id": generate_random_id("call_"), "type": "function", "function": { "name": tool_call_json["name"], "arguments": json.dumps(tool_call_json["parameters"]) } }] } chunk_data = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None } yield f"data: {json.dumps(chunk_data)}\n\n" in_tool_call = False tool_call_buffer = "" except (json.JSONDecodeError, KeyError): # Fallback to regular content if parsing fails in_tool_call = False tool_call_buffer = "" else: # Still building tool call - skip sending this chunk continue else: # Regular content delta = {"content": content_piece} if is_first_chunk: delta["role"] = "assistant" is_first_chunk = False chunk_data = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None } yield f"data: {json.dumps(chunk_data)}\n\n" except json.JSONDecodeError: continue elif line.startswith(("e:", "d:")): try: usage_info = json.loads(line[2:]).get("usage") except (json.JSONDecodeError, AttributeError): pass break # Final chunk done_chunk = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, "choices": [{ "index": 0, "delta": {}, "finish_reason": "stop" }], "usage": usage_info } yield f"data: {json.dumps(done_chunk)}\n\n" yield "data: [DONE]\n\n" except httpx.HTTPStatusError as e: error_content = { "error": { "message": f"Upstream API error: {e.response.status_code}", "type": "upstream_error", "code": str(e.response.status_code) } } yield f"data: {json.dumps(error_content)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(event_stream(), media_type="text/event-stream") else: # Non-streaming try: async with httpx.AsyncClient(timeout=120) as client: response = await client.post( "https://www.chatwithmono.xyz/api/chat", headers=headers, json=payload ) response.raise_for_status() assistant_response = "" usage_info = {} for line in response.text.splitlines(): if line.startswith("0:"): try: assistant_response += json.loads(line[2:]) except json.JSONDecodeError: continue elif line.startswith(("e:", "d:")): try: usage_info = json.loads(line[2:]).get("usage", {}) except json.JSONDecodeError: continue tool_calls = None if "" in assistant_response and "" in assistant_response: try: # Extract tool call content start_idx = assistant_response.find("") + len("") end_idx = assistant_response.find("") tool_call_str = assistant_response[start_idx:end_idx].strip() tool_call_json = json.loads(tool_call_str) tool_calls = [{ "id": generate_random_id("call_"), "type": "function", "function": { "name": tool_call_json["name"], "arguments": json.dumps(tool_call_json["parameters"]) } }] # Clear content for tool call response assistant_response = None except (json.JSONDecodeError, KeyError): # If parsing fails, treat as regular content tool_calls = None return JSONResponse(content={ "id": chat_id, "object": "chat.completion", "created": int(time.time()), "model": model_id, "choices": [{ "index": 0, "message": { "role": "assistant", "content": assistant_response, "tool_calls": tool_calls }, "finish_reason": "stop" }], "usage": { "prompt_tokens": usage_info.get("promptTokens", 0), "completion_tokens": usage_info.get("completionTokens", 0), "total_tokens": usage_info.get("promptTokens", 0) + usage_info.get("completionTokens", 0), } }) except httpx.HTTPStatusError as e: return JSONResponse( status_code=e.response.status_code, content={ "error": { "message": f"Upstream API error: {e.response.status_code}", "type": "upstream_error", "code": str(e.response.status_code) } } ) # === Image Generation === class ImageGenerationRequest(BaseModel): prompt: str aspect_ratio: Optional[str] = "1:1" n: Optional[int] = 1 user: Optional[str] = None model: Optional[str] = "default" @app.post("/v1/images/generations") async def generate_images(request: ImageGenerationRequest): """Handles image generation requests.""" results = [] try: async with httpx.AsyncClient(timeout=120) as client: for _ in range(request.n): model = request.model or "default" if model in ["gpt-image-1", "dall-e-3", "dall-e-2", "nextlm-image-1"]: headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'} payload = {"prompt": request.prompt, "model": model} resp = await client.post("https://www.chatwithmono.xyz/api/image", headers=headers, json=payload) resp.raise_for_status() data = resp.json() b64_image = data.get("image") if not b64_image: return JSONResponse(status_code=502, content={"error": "Missing base64 image in response"}) if SNAPZION_API_KEY: upload_headers = {"Authorization": SNAPZION_API_KEY} upload_files = {'file': ('image.png', base64.b64decode(b64_image), 'image/png')} upload_resp = await client.post(SNAPZION_UPLOAD_URL, headers=upload_headers, files=upload_files) upload_resp.raise_for_status() upload_data = upload_resp.json() image_url = upload_data.get("url") else: image_url = f"data:image/png;base64,{b64_image}" results.append({"url": image_url, "b64_json": b64_image, "revised_prompt": data.get("revised_prompt")}) else: params = {"prompt": request.prompt, "aspect_ratio": request.aspect_ratio, "link": "typegpt.net"} resp = await client.get(IMAGE_API_URL, params=params) resp.raise_for_status() data = resp.json() results.append({"url": data.get("image_link"), "b64_json": data.get("base64_output")}) except httpx.HTTPStatusError as e: return JSONResponse(status_code=502, content={"error": f"Image generation failed. Upstream error: {e.response.status_code}", "details": e.response.text}) except Exception as e: return JSONResponse(status_code=500, content={"error": "An internal error occurred.", "details": str(e)}) return {"created": int(time.time()), "data": results} # === Moderation Endpoint === class ModerationRequest(BaseModel): input: Union[str, List[str]] model: Optional[str] = "text-moderation-stable" @app.post("/v1/moderations") async def create_moderation(request: ModerationRequest): """ Handles moderation requests, conforming to the OpenAI API specification. Includes a custom 'reason' field in the result if provided by the upstream API. """ input_texts = [request.input] if isinstance(request.input, str) else request.input if not input_texts: return JSONResponse(status_code=400, content={"error": {"message": "Request must have at least one input string.", "type": "invalid_request_error"}}) moderation_url = "https://www.chatwithmono.xyz/api/moderation" headers = { 'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36', 'Referer': 'https://www.chatwithmono.xyz/', } results = [] try: async with httpx.AsyncClient(timeout=30) as client: for text_input in input_texts: payload = {"text": text_input} resp = await client.post(moderation_url, headers=headers, json=payload) resp.raise_for_status() upstream_data = resp.json() # --- Transform upstream response to OpenAI format --- upstream_categories = upstream_data.get("categories", {}) openai_categories = { "hate": upstream_categories.get("hate", False), "hate/threatening": False, "harassment": False, "harassment/threatening": False, "self-harm": upstream_categories.get("self-harm", False), "self-harm/intent": False, "self-harm/instructions": False, "sexual": upstream_categories.get("sexual", False), "sexual/minors": False, "violence": upstream_categories.get("violence", False), "violence/graphic": False, } category_scores = {k: 1.0 if v else 0.0 for k, v in openai_categories.items()} flagged = upstream_data.get("overall_sentiment") == "flagged" result_item = { "flagged": flagged, "categories": openai_categories, "category_scores": category_scores, } # --- NEW: Conditionally add the 'reason' field --- # This is a custom extension to the OpenAI spec to provide more detail. reason = upstream_data.get("reason") if reason: result_item["reason"] = reason results.append(result_item) except httpx.HTTPStatusError as e: return JSONResponse( status_code=502, # Bad Gateway content={"error": {"message": f"Moderation failed. Upstream error: {e.response.status_code}", "type": "upstream_error", "details": e.response.text}} ) except Exception as e: return JSONResponse(status_code=500, content={"error": {"message": "An internal error occurred during moderation.", "type": "internal_error", "details": str(e)}}) # Build the final OpenAI-compatible response final_response = { "id": generate_random_id("modr-"), "model": request.model, "results": results, } return JSONResponse(content=final_response) # --- Main Execution --- if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)