"""OpenAI-compatible adapter API.

Exposes ``/v1/models``, ``/v1/chat/completions``, ``/v1/images/generations``
and ``/v1/moderations`` and translates each request into calls against the
upstream services configured below.
"""
import base64
import json
import logging
import os
import secrets
import string
import time
from typing import Any, AsyncIterator, Dict, List, Optional, Union

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# --- Configuration ---
load_dotenv()

# Env variables for external services
IMAGE_API_URL = os.environ.get("IMAGE_API_URL", "https://image.api.example.com")
SNAPZION_UPLOAD_URL = "https://upload.snapzion.com/api/public-upload"
SNAPZION_API_KEY = os.environ.get("SNAP", "")

# Upstream endpoints used by the chat, image and moderation routes.
CHAT_UPSTREAM_URL = "https://www.chatwithmono.xyz/api/chat"
IMAGE_UPSTREAM_URL = "https://www.chatwithmono.xyz/api/image"
MODERATION_UPSTREAM_URL = "https://www.chatwithmono.xyz/api/moderation"

CHAT_HEADERS = {
    'accept': 'text/event-stream',
    'content-type': 'application/json',
    'origin': 'https://www.chatwithmono.xyz',
    'referer': 'https://www.chatwithmono.xyz/',
    'user-agent': 'Mozilla/5.0',
}

# Models routed to IMAGE_UPSTREAM_URL; everything else goes to IMAGE_API_URL.
MONO_IMAGE_MODELS = frozenset({"gpt-image-1", "dall-e-3", "dall-e-2", "nextlm-image-1"})

# Delimiters the model is instructed to wrap tool calls in.
# NOTE(review): the tag names had been stripped from this file (the code was
# testing for and splitting on an empty string, which raises ValueError).
# "<tool_call>" is a reconstruction; the prompt and the parser below share
# these constants, so any non-empty pair works as long as both agree.
TOOL_CALL_OPEN_TAG = "<tool_call>"
TOOL_CALL_CLOSE_TAG = "</tool_call>"

# Number of initial streamed chunks held back to decide whether the reply is
# a tool call before anything is forwarded to the client.
TOOL_CALL_SNIFF_CHUNKS = 4

# --- Dummy Model Definitions ---
# In a real application, these would be defined properly.
_MODELS_CREATED = int(time.time())
AVAILABLE_MODELS = [
    {"id": model_name, "object": "model", "created": _MODELS_CREATED, "owned_by": "system"}
    for model_name in (
        "gpt-4-turbo",
        "gpt-4o",
        "gpt-3.5-turbo",
        "dall-e-3",
        "text-moderation-stable",
    )
]
MODEL_ALIASES = {}

# --- FastAPI Application ---
app = FastAPI(
    title="OpenAI Compatible API",
    description="An adapter for various services to be compatible with the OpenAI API specification.",
    version="1.0.0"
)


# --- Helper Function for Random ID Generation ---
def generate_random_id(prefix: str, length: int = 29) -> str:
    """Return ``prefix`` followed by ``length`` random alphanumeric characters.

    Characters are drawn with :func:`secrets.choice`, so the ID is suitable
    for use as an unguessable identifier.
    """
    population = string.ascii_letters + string.digits
    random_part = "".join(secrets.choice(population) for _ in range(length))
    return f"{prefix}{random_part}"


# === API Endpoints ===
@app.get("/v1/models")
async def list_models():
    """Lists the available models."""
    return {"object": "list", "data": AVAILABLE_MODELS}


# === Chat Completion ===
class Message(BaseModel):
    """A single chat message (role plus plain-text content)."""

    role: str
    content: str


class ChatRequest(BaseModel):
    """Request body accepted by ``/v1/chat/completions``."""

    messages: List[Message]
    model: str
    stream: Optional[bool] = False
    tools: Optional[Any] = None


def _build_tool_prompt(tools: Any) -> str:
    """Render the system-prompt text that teaches the model how to call tools."""
    tool_list = ";".join(f"{tool}" for tool in tools)
    tags = f"{TOOL_CALL_OPEN_TAG}{TOOL_CALL_CLOSE_TAG}"
    lines = [
        "You have access to the following tools . To call a tool, please respond "
        f"with JSON for a tool call within {tags} XML tag. "
        'Respond in the format {"name": tool name, "parameters": dictionary of '
        "argument name and its value}. Do not use variables.",
        "Tools:",
        tool_list,
        "",
        "Response Format for tool call:",
        "For each function call, return a json object with function name and "
        f"arguments within {tags} XML tags:",
        TOOL_CALL_OPEN_TAG,
        '{"name": <function-name>, "parameters": <args-json-object>}',
        TOOL_CALL_CLOSE_TAG,
        "",
        "Example of tool calling:",
        TOOL_CALL_OPEN_TAG,
        '{"name": "get_weather", "parameters": {"city": "New York"}}',
        TOOL_CALL_CLOSE_TAG,
        "",
        "Using tools is recommended.",
    ]
    return "\n".join(lines) + "\n"


def _extract_tool_call(text: str) -> Optional[Dict[str, str]]:
    """Parse the first tagged tool call in ``text``.

    Returns ``{"name": ..., "arguments": <JSON string>}``, or ``None`` when the
    tags are missing/unterminated or the enclosed payload is not a JSON object
    with a string ``name``. Both ``"parameters"`` and ``"arguments"`` are
    accepted as the argument key.
    """
    _, opened, remainder = text.partition(TOOL_CALL_OPEN_TAG)
    if not opened:
        return None
    body, closed, _ = remainder.partition(TOOL_CALL_CLOSE_TAG)
    if not closed:
        return None
    try:
        call = json.loads(body.strip())
    except json.JSONDecodeError:
        return None
    if not isinstance(call, dict) or not isinstance(call.get("name"), str):
        return None
    arguments = call.get("parameters", call.get("arguments", {}))
    # An already-serialized argument string is passed through untouched.
    if not isinstance(arguments, str):
        arguments = json.dumps(arguments)
    return {"name": call["name"], "arguments": arguments}


def _tool_call_entry(call: Dict[str, str]) -> Dict[str, Any]:
    """Wrap a parsed tool call in the OpenAI ``tool_calls`` item shape."""
    return {
        "id": generate_random_id("call_"),
        "type": "function",
        "function": {"name": call["name"], "arguments": call["arguments"]},
    }


def _parse_text_piece(line: str) -> Optional[str]:
    """Decode the JSON string carried by an upstream ``0:`` line, else ``None``."""
    try:
        piece = json.loads(line[2:])
    except json.JSONDecodeError:
        return None
    return piece if isinstance(piece, str) else None


def _parse_usage(line: str) -> Optional[Dict[str, Any]]:
    """Return the ``usage`` object of an upstream ``e:``/``d:`` line, if any."""
    try:
        usage = json.loads(line[2:]).get("usage")
    except (json.JSONDecodeError, AttributeError):
        return None
    return usage if isinstance(usage, dict) else None


def _to_openai_usage(usage: Dict[str, Any]) -> Dict[str, int]:
    """Convert upstream camelCase token counts to OpenAI's usage shape."""
    prompt_tokens = usage.get("promptTokens", 0)
    completion_tokens = usage.get("completionTokens", 0)
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }


async def _raise_for_status_with_body(response: httpx.Response) -> None:
    """Like ``raise_for_status`` but safe for streamed responses.

    A streamed response's body is not loaded automatically; reading it first
    lets error handlers use ``e.response.text`` without ``ResponseNotRead``.
    """
    if not 200 <= response.status_code < 300:
        await response.aread()
    response.raise_for_status()


async def _stream_chat_events(
    payload: Dict[str, Any], model_id: str, chat_id: str
) -> AsyncIterator[str]:
    """Yield OpenAI-style SSE events for a streamed upstream chat reply.

    The first ``TOOL_CALL_SNIFF_CHUNKS`` text pieces are held back to detect a
    tagged tool call. A detected call is accumulated until its closing tag and
    emitted once; otherwise text is forwarded as it arrives. Text that was
    held back is always flushed before the final chunk, so short replies and
    unterminated or malformed tool calls are not lost.
    """
    created = int(time.time())
    held_back: List[str] = []
    sniffing = True
    collecting_tool_call = False
    tool_call_sent = False
    role_sent = False
    usage_info: Optional[Dict[str, Any]] = None

    def sse(delta: Dict[str, Any], finish_reason: Optional[str] = None,
            usage: Optional[Dict[str, int]] = None) -> str:
        chunk = {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model_id,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
            "usage": usage,
        }
        return f"data: {json.dumps(chunk)}\n\n"

    def first_delta_gets_role(delta: Dict[str, Any]) -> Dict[str, Any]:
        nonlocal role_sent
        if not role_sent:
            delta["role"] = "assistant"
            role_sent = True
        return delta

    def content_event(text: str) -> str:
        return sse(first_delta_gets_role({"content": text, "tool_calls": None}))

    def tool_call_event(call: Dict[str, str]) -> str:
        entry = _tool_call_entry(call)
        entry["index"] = 0
        return sse(first_delta_gets_role({"content": None, "tool_calls": [entry]}))

    try:
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream(
                "POST", CHAT_UPSTREAM_URL, headers=CHAT_HEADERS, json=payload
            ) as response:
                await _raise_for_status_with_body(response)
                async for line in response.aiter_lines():
                    if line.startswith(("e:", "d:")):
                        # The first end/data marker terminates the reply.
                        usage_info = _parse_usage(line)
                        break
                    if not line.startswith("0:"):
                        continue
                    piece = _parse_text_piece(line)
                    if piece is None or tool_call_sent:
                        # Text after a completed tool call is not forwarded.
                        continue

                    if not (sniffing or collecting_tool_call):
                        yield content_event(piece)
                        continue

                    held_back.append(piece)
                    if sniffing:
                        if len(held_back) < TOOL_CALL_SNIFF_CHUNKS:
                            continue
                        sniffing = False
                        collecting_tool_call = TOOL_CALL_OPEN_TAG in "".join(held_back)
                        if not collecting_tool_call:
                            yield content_event("".join(held_back))
                            held_back.clear()
                            continue
                        logger.debug("Tool call detected")

                    text = "".join(held_back)
                    if TOOL_CALL_CLOSE_TAG not in text:
                        continue
                    logger.debug("Tool call end detected")
                    held_back.clear()
                    collecting_tool_call = False
                    call = _extract_tool_call(text)
                    if call is None:
                        # Malformed payload: surface the raw text instead.
                        yield content_event(text)
                    else:
                        tool_call_sent = True
                        yield tool_call_event(call)

        # Flush anything still held back (reply shorter than the sniff
        # window, or a tool call whose closing tag never arrived).
        if held_back:
            text = "".join(held_back)
            call = _extract_tool_call(text)
            if call is None:
                yield content_event(text)
            else:
                tool_call_sent = True
                yield tool_call_event(call)

        final_usage = _to_openai_usage(usage_info) if usage_info else None
        yield sse(
            {"role": "assistant", "content": None, "function_call": None, "tool_calls": None},
            finish_reason="tool_calls" if tool_call_sent else "stop",
            usage=final_usage,
        )
    except httpx.HTTPStatusError as e:
        error_content = {
            "error": {
                "message": f"Upstream API error: {e.response.status_code}. Details: {e.response.text}",
                "type": "upstream_error",
                "code": str(e.response.status_code)
            }
        }
        yield f"data: {json.dumps(error_content)}\n\n"
    except httpx.RequestError as e:
        error_content = {
            "error": {"message": f"Upstream request failed: {e}", "type": "upstream_error"}
        }
        yield f"data: {json.dumps(error_content)}\n\n"
    except Exception:
        # Top-level boundary of the stream: log and report instead of
        # silently truncating the response.
        logger.exception("Unexpected error while streaming chat completion")
        error_content = {
            "error": {"message": "An internal error occurred.", "type": "internal_error"}
        }
        yield f"data: {json.dumps(error_content)}\n\n"

    # Deliberately not in a ``finally``: yielding there breaks generator
    # shutdown when the client disconnects mid-stream.
    yield "data: [DONE]\n\n"


async def _complete_chat(
    payload: Dict[str, Any], model_id: str, chat_id: str
) -> JSONResponse:
    """Collect the whole upstream reply and return one ``chat.completion``."""
    pieces: List[str] = []
    usage_info: Dict[str, Any] = {}
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream(
                "POST", CHAT_UPSTREAM_URL, headers=CHAT_HEADERS, json=payload
            ) as response:
                await _raise_for_status_with_body(response)
                async for line in response.aiter_lines():
                    if line.startswith("0:"):
                        piece = _parse_text_piece(line)
                        if piece is not None:
                            pieces.append(piece)
                    elif line.startswith(("e:", "d:")):
                        # Keep the last usage seen; ignore markers without one.
                        usage_info = _parse_usage(line) or usage_info
    except httpx.HTTPStatusError as e:
        return JSONResponse(
            status_code=e.response.status_code,
            content={"error": {"message": f"Upstream API error. Details: {e.response.text}", "type": "upstream_error"}},
        )
    except httpx.RequestError as e:
        return JSONResponse(
            status_code=502,
            content={"error": {"message": f"Upstream request failed: {e}", "type": "upstream_error"}},
        )

    assistant_response = "".join(pieces)
    call = _extract_tool_call(assistant_response)
    tool_calls = [_tool_call_entry(call)] if call is not None else None
    return JSONResponse(content={
        "id": chat_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model_id,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": assistant_response if tool_calls is None else None,
                "tool_calls": tool_calls,
            },
            "finish_reason": "stop" if tool_calls is None else "tool_calls",
        }],
        "usage": _to_openai_usage(usage_info),
    })


@app.post("/v1/chat/completions")
async def chat_completion(request: ChatRequest):
    """
    Handles chat completion requests, supporting both streaming and non-streaming responses.

    When ``tools`` are supplied, instructions for emitting tagged tool calls
    are added to the system prompt (appended to an existing leading system
    message, otherwise inserted as a new one), and a tagged tool call in the
    reply is converted to OpenAI ``tool_calls``.
    """
    model_id = MODEL_ALIASES.get(request.model, request.model)
    chat_id = generate_random_id("chatcmpl-")

    if request.tools:
        # Tools are handled purely through the system prompt.
        tool_prompt = _build_tool_prompt(request.tools)
        if request.messages and request.messages[0].role == "system":
            request.messages[0].content += "\n\n" + tool_prompt
        else:
            request.messages.insert(0, Message(role="system", content=tool_prompt))

    payload = {
        "messages": [message.model_dump() for message in request.messages],
        "model": model_id,
    }

    if request.stream:
        return StreamingResponse(
            _stream_chat_events(payload, model_id, chat_id),
            media_type="text/event-stream",
        )
    return await _complete_chat(payload, model_id, chat_id)


# === Image Generation ===
class ImageGenerationRequest(BaseModel):
    """Request body accepted by ``/v1/images/generations``."""

    prompt: str
    aspect_ratio: Optional[str] = "1:1"
    n: Optional[int] = 1
    user: Optional[str] = None
    model: Optional[str] = "default"


@app.post("/v1/images/generations")
async def generate_images(request: ImageGenerationRequest):
    """Handles image generation requests.

    Models in ``MONO_IMAGE_MODELS`` are generated via ``IMAGE_UPSTREAM_URL``
    (and uploaded to Snapzion when ``SNAPZION_API_KEY`` is set, otherwise
    returned as a data URI); any other model is sent to ``IMAGE_API_URL``.
    Upstream HTTP errors map to 502, anything else to 500.
    """
    model = request.model or "default"
    # An explicit ``"n": null`` is treated like the default of one image.
    image_count = 1 if request.n is None else request.n
    results = []
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            for _ in range(image_count):
                if model in MONO_IMAGE_MODELS:
                    headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'}
                    payload = {"prompt": request.prompt, "model": model}
                    resp = await client.post(IMAGE_UPSTREAM_URL, headers=headers, json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                    b64_image = data.get("image")
                    if not b64_image:
                        return JSONResponse(status_code=502, content={"error": "Missing base64 image in response"})

                    if SNAPZION_API_KEY:
                        upload_headers = {"Authorization": SNAPZION_API_KEY}
                        upload_files = {'file': ('image.png', base64.b64decode(b64_image), 'image/png')}
                        upload_resp = await client.post(SNAPZION_UPLOAD_URL, headers=upload_headers, files=upload_files)
                        upload_resp.raise_for_status()
                        image_url = upload_resp.json().get("url")
                    else:
                        image_url = f"data:image/png;base64,{b64_image}"

                    results.append({"url": image_url, "b64_json": b64_image, "revised_prompt": data.get("revised_prompt")})
                else:
                    params = {"prompt": request.prompt, "aspect_ratio": request.aspect_ratio, "link": "typegpt.net"}
                    resp = await client.get(IMAGE_API_URL, params=params)
                    resp.raise_for_status()
                    data = resp.json()
                    results.append({"url": data.get("image_link"), "b64_json": data.get("base64_output")})
    except httpx.HTTPStatusError as e:
        return JSONResponse(status_code=502, content={"error": f"Image generation failed. Upstream error: {e.response.status_code}", "details": e.response.text})
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": "An internal error occurred.", "details": str(e)})

    return {"created": int(time.time()), "data": results}


# === Moderation Endpoint ===
class ModerationRequest(BaseModel):
    """Request body accepted by ``/v1/moderations``."""

    input: Union[str, List[str]]
    model: Optional[str] = "text-moderation-stable"


@app.post("/v1/moderations")
async def create_moderation(request: ModerationRequest):
    """
    Handles moderation requests, conforming to the OpenAI API specification.
    Includes a custom 'reason' field in the result if provided by the upstream API.

    Each input string is sent upstream separately. Only the ``hate``,
    ``self-harm``, ``sexual`` and ``violence`` categories are taken from the
    upstream response; every sub-category is reported as ``False``, and the
    scores are 1.0/0.0 mirrors of the booleans.
    """
    input_texts = [request.input] if isinstance(request.input, str) else request.input
    if not input_texts:
        return JSONResponse(status_code=400, content={"error": {"message": "Request must have at least one input string.", "type": "invalid_request_error"}})

    headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Referer': 'https://www.chatwithmono.xyz/',
    }

    results = []
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            for text_input in input_texts:
                payload = {"text": text_input}
                resp = await client.post(MODERATION_UPSTREAM_URL, headers=headers, json=payload)
                resp.raise_for_status()
                upstream_data = resp.json()

                # --- Transform upstream response to OpenAI format ---
                upstream_categories = upstream_data.get("categories", {})
                openai_categories = {
                    "hate": upstream_categories.get("hate", False),
                    "hate/threatening": False,
                    "harassment": False,
                    "harassment/threatening": False,
                    "self-harm": upstream_categories.get("self-harm", False),
                    "self-harm/intent": False,
                    "self-harm/instructions": False,
                    "sexual": upstream_categories.get("sexual", False),
                    "sexual/minors": False,
                    "violence": upstream_categories.get("violence", False),
                    "violence/graphic": False,
                }
                category_scores = {k: 1.0 if v else 0.0 for k, v in openai_categories.items()}
                flagged = upstream_data.get("overall_sentiment") == "flagged"

                result_item = {
                    "flagged": flagged,
                    "categories": openai_categories,
                    "category_scores": category_scores,
                }

                # 'reason' is a custom extension to the OpenAI spec; it is
                # only included when the upstream response provides one.
                reason = upstream_data.get("reason")
                if reason:
                    result_item["reason"] = reason

                results.append(result_item)
    except httpx.HTTPStatusError as e:
        return JSONResponse(
            status_code=502,  # Bad Gateway
            content={"error": {"message": f"Moderation failed. Upstream error: {e.response.status_code}", "type": "upstream_error", "details": e.response.text}}
        )
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": {"message": "An internal error occurred during moderation.", "type": "internal_error", "details": str(e)}})

    # Build the final OpenAI-compatible response
    final_response = {
        "id": generate_random_id("modr-"),
        "model": request.model,
        "results": results,
    }
    return JSONResponse(content=final_response)


# --- Main Execution ---
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)