import base64 import json import os import secrets import string import time import tempfile import ast from typing import List, Optional, Union, Any import httpx from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field, model_validator # Import for OCR functionality from gradio_client import Client, handle_file # --- Configuration --- load_dotenv() # Environment variables for external services IMAGE_API_URL = os.environ.get("IMAGE_API_URL", "https://image.api.example.com") SNAPZION_UPLOAD_URL = "https://upload.snapzion.com/api/public-upload" SNAPZION_API_KEY = os.environ.get("SNAP", "") CHAT_API_URL = "https://www.chatwithmono.xyz/api/chat" IMAGE_GEN_API_URL = "https://www.chatwithmono.xyz/api/image" MODERATION_API_URL = "https://www.chatwithmono.xyz/api/moderation" # --- Model Definitions --- AVAILABLE_MODELS = [ {"id": "gpt-4-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "gpt-4o", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "gpt-3.5-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "dall-e-3", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "text-moderation-stable", "object": "model", "created": int(time.time()), "owned_by": "system"}, {"id": "florence-2-ocr", "object": "model", "created": int(time.time()), "owned_by": "system"}, ] MODEL_ALIASES = {} # --- FastAPI Application & Global Clients --- app = FastAPI( title="OpenAI Compatible API", description="An adapter for various services to be compatible with the OpenAI API specification.", version="1.1.3" # Version reflects final formatting and fixes ) # Initialize Gradio client globally to avoid re-initialization on each request try: ocr_client = Client("multimodalart/Florence-2-l4") except Exception as e: print(f"Warning: Could not initialize Gradio client for OCR: {e}") ocr_client = None # --- Pydantic Models --- class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): messages: List[Message] model: str stream: Optional[bool] = False tools: Optional[Any] = None class ImageGenerationRequest(BaseModel): prompt: str aspect_ratio: Optional[str] = "1:1" n: Optional[int] = 1 user: Optional[str] = None model: Optional[str] = "default" class ModerationRequest(BaseModel): input: Union[str, List[str]] model: Optional[str] = "text-moderation-stable" class OcrRequest(BaseModel): image_url: Optional[str] = Field(None, description="URL of the image to process.") image_b64: Optional[str] = Field(None, description="Base64 encoded string of the image to process.") @model_validator(mode='before') @classmethod def check_sources(cls, data: Any) -> Any: if isinstance(data, dict): if not (data.get('image_url') or data.get('image_b64')): raise ValueError('Either image_url or image_b64 must be provided.') if data.get('image_url') and data.get('image_b64'): raise ValueError('Provide either image_url or image_b64, not both.') return data class OcrResponse(BaseModel): ocr_text: str raw_response: dict # --- Helper Function --- def generate_random_id(prefix: str, length: int = 29) -> str: """Generates a cryptographically secure, random alphanumeric ID.""" population = string.ascii_letters + string.digits random_part = "".join(secrets.choice(population) for _ in range(length)) return f"{prefix}{random_part}" # === API Endpoints === @app.get("/v1/models", tags=["Models"]) async def list_models(): """Lists the available models.""" return {"object": "list", "data": AVAILABLE_MODELS} @app.post("/v1/chat/completions", tags=["Chat"]) async def chat_completion(request: ChatRequest): """Handles chat completion requests, supporting streaming and non-streaming.""" model_id = MODEL_ALIASES.get(request.model, request.model) chat_id = generate_random_id("chatcmpl-") headers = { 'accept': 'text/event-stream', 'content-type': 'application/json', 'origin': 'https://www.chatwithmono.xyz', 'referer': 'https://www.chatwithmono.xyz/', 'user-agent': 'Mozilla/5.0', } if request.tools: tool_prompt = f"""You have access to the following tools. To call a tool, please respond with JSON for a tool call within XML tags. Respond in the format {{"name": tool name, "parameters": dictionary of argument name and its value}}. Do not use variables. Tools: {";".join(f"{tool}" for tool in request.tools)} Response Format for tool call: {{"name": , "arguments": }} """ if request.messages[0].role == "system": request.messages[0].content += "\n\n" + tool_prompt else: request.messages.insert(0, Message(role="system", content=tool_prompt)) payload = {"messages": [msg.model_dump() for msg in request.messages], "model": model_id} if request.stream: async def event_stream(): created = int(time.time()) usage_info = None is_first_chunk = True tool_call_buffer = "" in_tool_call = False try: async with httpx.AsyncClient(timeout=120) as client: async with client.stream("POST", CHAT_API_URL, headers=headers, json=payload) as response: response.raise_for_status() async for line in response.aiter_lines(): if not line: continue if line.startswith("0:"): try: content_piece = json.loads(line[2:]) except json.JSONDecodeError: continue current_buffer = content_piece if in_tool_call: current_buffer = tool_call_buffer + content_piece if "" in current_buffer: tool_str = current_buffer.split("")[1].split("")[0] tool_json = json.loads(tool_str.strip()) delta = { "content": None, "tool_calls": [{"index": 0, "id": generate_random_id("call_"), "type": "function", "function": {"name": tool_json["name"], "arguments": json.dumps(tool_json["parameters"])}}] } chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None} yield f"data: {json.dumps(chunk)}\n\n" in_tool_call = False tool_call_buffer = "" remaining_text = current_buffer.split("", 1)[1] if remaining_text: content_piece = remaining_text else: continue if "" in content_piece: in_tool_call = True tool_call_buffer += content_piece.split("", 1)[1] text_before = content_piece.split("", 1)[0] if text_before: delta = {"content": text_before, "tool_calls": None} chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None} yield f"data: {json.dumps(chunk)}\n\n" if "" not in tool_call_buffer: continue if not in_tool_call: delta = {"content": content_piece} if is_first_chunk: delta["role"] = "assistant" is_first_chunk = False chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, "choices": [{"index": 0, "delta": delta, "finish_reason": None}], "usage": None} yield f"data: {json.dumps(chunk)}\n\n" elif line.startswith(("e:", "d:")): try: usage_info = json.loads(line[2:]).get("usage") except (json.JSONDecodeError, AttributeError): pass break final_usage = None if usage_info: prompt_tokens = usage_info.get("promptTokens", 0) completion_tokens = usage_info.get("completionTokens", 0) final_usage = { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens } finish_reason = "tool_calls" if in_tool_call else "stop" done_chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id, "choices": [{"index": 0, "delta": {}, "finish_reason": finish_reason}], "usage": final_usage} yield f"data: {json.dumps(done_chunk)}\n\n" except httpx.HTTPStatusError as e: error_content = {"error": {"message": f"Upstream API error: {e.response.status_code}. Details: {e.response.text}", "type": "upstream_error", "code": str(e.response.status_code)}} yield f"data: {json.dumps(error_content)}\n\n" finally: yield "data: [DONE]\n\n" return StreamingResponse(event_stream(), media_type="text/event-stream") else: # Non-streaming response full_response, usage_info = "", {} try: async with httpx.AsyncClient(timeout=120) as client: async with client.stream("POST", CHAT_API_URL, headers=headers, json=payload) as response: response.raise_for_status() async for chunk in response.aiter_lines(): if chunk.startswith("0:"): try: full_response += json.loads(chunk[2:]) except: continue elif chunk.startswith(("e:", "d:")): try: usage_info = json.loads(chunk[2:]).get("usage", {}) except: continue tool_calls = None content_response = full_response finish_reason = "stop" if "" in full_response and "" in full_response: tool_call_str = full_response.split("")[1].split("")[0] tool_call = json.loads(tool_call_str.strip()) tool_calls = [{ "id": generate_random_id("call_"), "type": "function", "function": { "name": tool_call["name"], "arguments": json.dumps(tool_call["parameters"]) } }] content_response = None finish_reason = "tool_calls" prompt_tokens = usage_info.get("promptTokens", 0) completion_tokens = usage_info.get("completionTokens", 0) return JSONResponse(content={ "id": chat_id, "object": "chat.completion", "created": int(time.time()), "model": model_id, "choices": [{ "index": 0, "message": { "role": "assistant", "content": content_response, "tool_calls": tool_calls }, "finish_reason": finish_reason }], "usage": { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens } }) except httpx.HTTPStatusError as e: return JSONResponse( status_code=e.response.status_code, content={"error": {"message": f"Upstream API error. Details: {e.response.text}", "type": "upstream_error"}} ) @app.post("/v1/images/generations", tags=["Images"]) async def generate_images(request: ImageGenerationRequest): """Handles image generation requests.""" results = [] try: async with httpx.AsyncClient(timeout=120) as client: for _ in range(request.n): model = request.model or "default" if model in ["gpt-image-1", "dall-e-3", "dall-e-2", "nextlm-image-1"]: headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'} payload = {"prompt": request.prompt, "model": model} resp = await client.post(IMAGE_GEN_API_URL, headers=headers, json=payload) resp.raise_for_status() data = resp.json() b64_image = data.get("image") if not b64_image: return JSONResponse(status_code=502, content={"error": "Missing base64 image in response"}) image_url = f"data:image/png;base64,{b64_image}" if SNAPZION_API_KEY: upload_headers = {"Authorization": SNAPZION_API_KEY} upload_files = {'file': ('image.png', base64.b64decode(b64_image), 'image/png')} upload_resp = await client.post(SNAPZION_UPLOAD_URL, headers=upload_headers, files=upload_files) if upload_resp.status_code == 200: image_url = upload_resp.json().get("url", image_url) results.append({"url": image_url, "b64_json": b64_image, "revised_prompt": data.get("revised_prompt")}) else: params = {"prompt": request.prompt, "aspect_ratio": request.aspect_ratio, "link": "typegpt.net"} resp = await client.get(IMAGE_API_URL, params=params) resp.raise_for_status() data = resp.json() results.append({"url": data.get("image_link"), "b64_json": data.get("base64_output")}) except httpx.HTTPStatusError as e: return JSONResponse(status_code=502, content={"error": f"Image generation failed. Upstream error: {e.response.status_code}", "details": e.response.text}) except Exception as e: return JSONResponse(status_code=500, content={"error": "An internal error occurred.", "details": str(e)}) return {"created": int(time.time()), "data": results} @app.post("/v1/ocr", response_model=OcrResponse, tags=["OCR"]) async def perform_ocr(request: OcrRequest): """ Performs Optical Character Recognition (OCR) on an image using the Florence-2 model. Provide an image via a URL or a base64 encoded string. """ if not ocr_client: raise HTTPException(status_code=503, detail="OCR service is not available. Gradio client failed to initialize.") image_path, temp_file_path = None, None try: if request.image_url: image_path = request.image_url elif request.image_b64: image_bytes = base64.b64decode(request.image_b64) with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file: temp_file.write(image_bytes) temp_file_path = temp_file.name image_path = temp_file_path prediction = ocr_client.predict(image=handle_file(image_path), task_prompt="OCR", api_name="/process_image") if not prediction or not isinstance(prediction, tuple) or len(prediction) == 0: raise HTTPException(status_code=502, detail="Invalid or empty response from OCR service.") raw_output = prediction[0] raw_result_dict = {} # --- Robust Parsing Logic --- if isinstance(raw_output, str): try: # First, try to parse as standard JSON raw_result_dict = json.loads(raw_output) except json.JSONDecodeError: try: # If JSON fails, try to evaluate as a Python literal (handles single quotes) parsed_output = ast.literal_eval(raw_output) if isinstance(parsed_output, dict): raw_result_dict = parsed_output else: # The literal is something else (e.g., a list), wrap it. raw_result_dict = {"result": str(parsed_output)} except (ValueError, SyntaxError): # If all parsing fails, assume the string is the direct OCR text. raw_result_dict = {"ocr_text_from_string": raw_output} elif isinstance(raw_output, dict): # It's already a dictionary, use it directly raw_result_dict = raw_output else: # Handle other unexpected data types raise HTTPException(status_code=502, detail=f"Unexpected data type from OCR service: {type(raw_output)}") # Extract text from the dictionary, with multiple fallbacks ocr_text = raw_result_dict.get("OCR", raw_result_dict.get("ocr_text_from_string", str(raw_result_dict))) return OcrResponse(ocr_text=ocr_text, raw_response=raw_result_dict) except Exception as e: if isinstance(e, HTTPException): raise e raise HTTPException(status_code=500, detail=f"An error occurred during OCR processing: {str(e)}") finally: if temp_file_path and os.path.exists(temp_file_path): os.unlink(temp_file_path) @app.post("/v1/moderations", tags=["Moderation"]) async def create_moderation(request: ModerationRequest): """Handles moderation requests, conforming to the OpenAI API specification.""" input_texts = [request.input] if isinstance(request.input, str) else request.input if not input_texts: return JSONResponse(status_code=400, content={"error": {"message": "Request must have at least one input string."}}) headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'} results = [] try: async with httpx.AsyncClient(timeout=30) as client: for text_input in input_texts: payload = {"text": text_input} resp = await client.post(MODERATION_API_URL, headers=headers, json=payload) resp.raise_for_status() upstream_data = resp.json() upstream_categories = upstream_data.get("categories", {}) openai_categories = { "hate": upstream_categories.get("hate", False), "hate/threatening": False, "harassment": False, "harassment/threatening": False, "self-harm": upstream_categories.get("self-harm", False), "self-harm/intent": False, "self-harm/instructions": False, "sexual": upstream_categories.get("sexual", False), "sexual/minors": False, "violence": upstream_categories.get("violence", False), "violence/graphic": False, } result_item = { "flagged": upstream_data.get("overall_sentiment") == "flagged", "categories": openai_categories, "category_scores": {k: 1.0 if v else 0.0 for k, v in openai_categories.items()}, } if reason := upstream_data.get("reason"): result_item["reason"] = reason results.append(result_item) except httpx.HTTPStatusError as e: return JSONResponse( status_code=502, content={"error": {"message": f"Moderation failed. Upstream error: {e.response.status_code}", "details": e.response.text}} ) except Exception as e: return JSONResponse( status_code=500, content={"error": {"message": "An internal error occurred during moderation.", "details": str(e)}} ) final_response = { "id": generate_random_id("modr-"), "model": request.model, "results": results, } return JSONResponse(content=final_response) # --- Main Execution --- if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)