import base64
import json
import os
import secrets
import string
import time
import tempfile
import ast
from typing import List, Optional, Union, Any
import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field, model_validator
# Import for OCR functionality
from gradio_client import Client, handle_file
# --- Configuration ---
# Pull variables from a local .env file into the process environment first,
# so the lookups below can see them.
load_dotenv()

# Endpoints and credentials of the external services this adapter talks to.
IMAGE_API_URL = os.getenv("IMAGE_API_URL", "https://image.api.example.com")
SNAPZION_UPLOAD_URL = "https://upload.snapzion.com/api/public-upload"
SNAPZION_API_KEY = os.getenv("SNAP", "")
CHAT_API_URL = "https://www.chatwithmono.xyz/api/chat"
IMAGE_GEN_API_URL = "https://www.chatwithmono.xyz/api/image"
MODERATION_API_URL = "https://www.chatwithmono.xyz/api/moderation"

# --- Model Definitions ---
# Entries returned by GET /v1models-style listing; "created" is stamped with
# the time this module is imported.
AVAILABLE_MODELS = [
    {"id": model_id, "object": "model", "created": int(time.time()), "owned_by": "system"}
    for model_id in (
        "gpt-4-turbo",
        "gpt-4o",
        "gpt-3.5-turbo",
        "dall-e-3",
        "text-moderation-stable",
        "florence-2-ocr",
    )
]

# Optional mapping from a requested model name to the name sent upstream.
MODEL_ALIASES = {}
# --- FastAPI Application & Global Clients ---
app = FastAPI(
    title="OpenAI Compatible API",
    description="An adapter for various services to be compatible with the OpenAI API specification.",
    version="1.1.3",  # Version reflects final formatting and fixes
)

# The Gradio client is created once at import time so requests do not pay the
# initialization cost. When creation fails the module still loads and
# `ocr_client` stays None, which the OCR endpoint checks for.
ocr_client = None
try:
    ocr_client = Client("multimodalart/Florence-2-l4")
except Exception as e:
    print(f"Warning: Could not initialize Gradio client for OCR: {e}")
# --- Pydantic Models ---
class Message(BaseModel):
    """A single chat message: who is speaking and what they said."""

    # Speaker role; the chat endpoint compares it against "system".
    role: str
    # Plain-text body of the message.
    content: str
class ChatRequest(BaseModel):
    """Request body accepted by POST /v1/chat/completions."""

    # Conversation so far, oldest message first.
    messages: List[Message]
    # Requested model name (looked up in MODEL_ALIASES by the endpoint).
    model: str
    # When true the endpoint answers with a server-sent event stream.
    stream: Optional[bool] = False
    # Tool definitions; each item is rendered into the system prompt as text.
    tools: Optional[Any] = None
class ImageGenerationRequest(BaseModel):
    """Request body accepted by POST /v1/images/generations."""

    # Text description of the image to generate.
    prompt: str
    # Forwarded as `aspect_ratio` to the fallback image API.
    aspect_ratio: Optional[str] = "1:1"
    # Number of images to generate.
    n: Optional[int] = 1
    # Accepted for request compatibility; not read by the endpoint.
    user: Optional[str] = None
    # Selects which upstream image backend handles the request.
    model: Optional[str] = "default"
class ModerationRequest(BaseModel):
    """Request body accepted by POST /v1/moderations."""

    # One text or a list of texts; each is checked separately.
    input: Union[str, List[str]]
    # Echoed back in the response's "model" field.
    model: Optional[str] = "text-moderation-stable"
class OcrRequest(BaseModel):
    """Request body for POST /v1/ocr: exactly one image source must be given."""

    image_url: Optional[str] = Field(default=None, description="URL of the image to process.")
    image_b64: Optional[str] = Field(default=None, description="Base64 encoded string of the image to process.")

    @model_validator(mode='before')
    @classmethod
    def check_sources(cls, data: Any) -> Any:
        """Reject dict payloads that carry neither or both image sources.

        Non-dict input is passed through untouched.
        """
        if not isinstance(data, dict):
            return data
        has_url = bool(data.get('image_url'))
        has_b64 = bool(data.get('image_b64'))
        if not has_url and not has_b64:
            raise ValueError('Either image_url or image_b64 must be provided.')
        if has_url and has_b64:
            raise ValueError('Provide either image_url or image_b64, not both.')
        return data
class OcrResponse(BaseModel):
    """Response body returned by POST /v1/ocr."""

    # Text extracted from the image.
    ocr_text: str
    # The OCR service output after normalization to a dictionary.
    raw_response: dict
# --- Helper Function ---
def generate_random_id(prefix: str, length: int = 29) -> str:
    """Return `prefix` followed by `length` random ASCII letters and digits.

    Characters are drawn with the `secrets` module, so the ID is suitable
    where unpredictability matters.
    """
    alphabet = string.ascii_letters + string.digits
    chars = [secrets.choice(alphabet) for _ in range(length)]
    return f"{prefix}{''.join(chars)}"
# === API Endpoints ===
@app.get("/v1/models", tags=["Models"])
async def list_models():
    """Return the static model catalogue wrapped in a list envelope."""
    listing = {"object": "list"}
    listing["data"] = AVAILABLE_MODELS
    return listing
# Tags the upstream model is instructed to wrap tool-call JSON in.
_TOOL_CALL_OPEN = "<tool_call>"
_TOOL_CALL_CLOSE = "</tool_call>"


def _build_tool_call(raw_json: str) -> Optional[dict]:
    """Convert the text found between the tool-call tags into a tool_call dict.

    Returns None when the text is not a JSON object with a "name" key, so the
    caller can fall back to treating it as ordinary content.
    """
    try:
        parsed = json.loads(raw_json.strip())
    except json.JSONDecodeError:
        return None
    if not isinstance(parsed, dict) or "name" not in parsed:
        return None
    # Models answer with either key; accept both instead of raising KeyError.
    arguments = parsed.get("parameters", parsed.get("arguments", {}))
    if not isinstance(arguments, str):
        arguments = json.dumps(arguments)
    return {
        "id": generate_random_id("call_"),
        "type": "function",
        "function": {"name": parsed["name"], "arguments": arguments},
    }


def _held_back_length(text: str, tag: str) -> int:
    """Length of the longest suffix of `text` that is a proper prefix of `tag`."""
    for size in range(min(len(text), len(tag) - 1), 0, -1):
        if text.endswith(tag[:size]):
            return size
    return 0


def _consume_stream_text(pending: str, in_tool_call: bool) -> tuple:
    """Split buffered model text into plain content and complete tool calls.

    Returns `(events, pending, in_tool_call)`. Each event is
    `("content", text)` or `("tool_call", raw_json_text)`. `pending` is the
    text that cannot be classified yet: an unfinished tool call body, or a
    trailing fragment that may be the start of the opening tag.
    """
    events = []
    while True:
        if in_tool_call:
            end = pending.find(_TOOL_CALL_CLOSE)
            if end == -1:
                break  # keep buffering until the closing tag arrives
            events.append(("tool_call", pending[:end]))
            pending = pending[end + len(_TOOL_CALL_CLOSE):]
            in_tool_call = False
            continue
        start = pending.find(_TOOL_CALL_OPEN)
        if start != -1:
            if start:
                events.append(("content", pending[:start]))
            pending = pending[start + len(_TOOL_CALL_OPEN):]
            in_tool_call = True
            continue
        # No opening tag: emit everything except a possible partial tag.
        keep = _held_back_length(pending, _TOOL_CALL_OPEN)
        ready = pending[:len(pending) - keep]
        if ready:
            events.append(("content", ready))
        pending = pending[len(pending) - keep:]
        break
    return events, pending, in_tool_call


@app.post("/v1/chat/completions", tags=["Chat"])
async def chat_completion(request: ChatRequest):
    """Handles chat completion requests, supporting streaming and non-streaming.

    The upstream service streams lines prefixed with "0:" (a JSON-encoded text
    piece) and "e:"/"d:" (a JSON object that may carry "usage"). Text wrapped
    in <tool_call>...</tool_call> is converted into OpenAI-style tool calls.

    Returns a StreamingResponse of `chat.completion.chunk` events when
    `request.stream` is true, otherwise a JSONResponse with one
    `chat.completion` object. Upstream failures are reported in the body
    (streaming) or as an error JSONResponse (non-streaming).
    """
    model_id = MODEL_ALIASES.get(request.model, request.model)
    chat_id = generate_random_id("chatcmpl-")
    headers = {
        'accept': 'text/event-stream',
        'content-type': 'application/json',
        'origin': 'https://www.chatwithmono.xyz',
        'referer': 'https://www.chatwithmono.xyz/',
        'user-agent': 'Mozilla/5.0',
    }
    if request.tools:
        tool_prompt = f"""You have access to the following tools. To call a tool, please respond with JSON for a tool call within <tool_call></tool_call> XML tags. Respond in the format {{"name": tool name, "parameters": dictionary of argument name and its value}}. Do not use variables.
Tools: {";".join(f"{tool}" for tool in request.tools)}
Response Format for tool call:
<tool_call>
{{"name": <function-name>, "parameters": <args-json-object>}}
</tool_call>
"""
        # Guard against an empty message list before peeking at the first item.
        if request.messages and request.messages[0].role == "system":
            request.messages[0].content += "\n\n" + tool_prompt
        else:
            request.messages.insert(0, Message(role="system", content=tool_prompt))
    payload = {"messages": [msg.model_dump() for msg in request.messages], "model": model_id}

    if request.stream:
        async def event_stream():
            created = int(time.time())
            usage_info = None
            role_sent = False
            pending = ""
            in_tool_call = False
            tool_call_count = 0

            def make_chunk(delta, finish_reason=None, usage=None) -> str:
                chunk = {"id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id,
                         "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}], "usage": usage}
                return f"data: {json.dumps(chunk)}\n\n"

            def render(events) -> list:
                nonlocal role_sent, tool_call_count
                rendered = []
                for kind, text in events:
                    delta = {"content": text}
                    if kind == "tool_call":
                        call = _build_tool_call(text)
                        if call is None:
                            # Unparseable body: pass the raw text through
                            # rather than aborting the stream.
                            delta = {"content": f"{_TOOL_CALL_OPEN}{text}{_TOOL_CALL_CLOSE}"}
                        else:
                            delta = {"content": None, "tool_calls": [{"index": tool_call_count, **call}]}
                            tool_call_count += 1
                    if not role_sent:
                        delta["role"] = "assistant"
                        role_sent = True
                    rendered.append(make_chunk(delta))
                return rendered

            try:
                async with httpx.AsyncClient(timeout=120) as client:
                    async with client.stream("POST", CHAT_API_URL, headers=headers, json=payload) as response:
                        if response.status_code >= 400:
                            # A streamed body must be read before `.text`
                            # can be used in the error handler below.
                            await response.aread()
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            if line.startswith("0:"):
                                try:
                                    content_piece = json.loads(line[2:])
                                except json.JSONDecodeError:
                                    continue
                                if not isinstance(content_piece, str):
                                    continue
                                events, pending, in_tool_call = _consume_stream_text(
                                    pending + content_piece, in_tool_call
                                )
                                for sse in render(events):
                                    yield sse
                            elif line.startswith(("e:", "d:")):
                                try:
                                    usage_info = json.loads(line[2:]).get("usage")
                                except (json.JSONDecodeError, AttributeError):
                                    pass
                                break
                # Flush text still buffered when the stream ended (a held-back
                # partial tag or an unterminated tool call) as plain content.
                leftover = (_TOOL_CALL_OPEN + pending) if in_tool_call else pending
                if leftover:
                    for sse in render([("content", leftover)]):
                        yield sse
                final_usage = None
                if isinstance(usage_info, dict):
                    prompt_tokens = usage_info.get("promptTokens", 0)
                    completion_tokens = usage_info.get("completionTokens", 0)
                    final_usage = {
                        "prompt_tokens": prompt_tokens,
                        "completion_tokens": completion_tokens,
                        "total_tokens": prompt_tokens + completion_tokens
                    }
                finish_reason = "tool_calls" if tool_call_count else "stop"
                yield make_chunk({}, finish_reason, final_usage)
            except httpx.HTTPStatusError as e:
                error_content = {"error": {"message": f"Upstream API error: {e.response.status_code}. Details: {e.response.text}", "type": "upstream_error", "code": str(e.response.status_code)}}
                yield f"data: {json.dumps(error_content)}\n\n"
            except httpx.RequestError as e:
                error_content = {"error": {"message": f"Upstream request failed: {e}", "type": "upstream_error", "code": "502"}}
                yield f"data: {json.dumps(error_content)}\n\n"
            except Exception as e:
                # Stream boundary: headers are already sent, so report the
                # failure in-band instead of dropping the connection.
                error_content = {"error": {"message": f"Internal error: {e}", "type": "internal_error", "code": "500"}}
                yield f"data: {json.dumps(error_content)}\n\n"
            # Emitted after the try block rather than in `finally`: yielding
            # while an async generator is being closed raises RuntimeError.
            yield "data: [DONE]\n\n"

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    # Non-streaming response: collect the whole upstream stream first.
    pieces, usage_info = [], {}
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream("POST", CHAT_API_URL, headers=headers, json=payload) as response:
                if response.status_code >= 400:
                    await response.aread()
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if line.startswith("0:"):
                        try:
                            piece = json.loads(line[2:])
                        except json.JSONDecodeError:
                            continue
                        if isinstance(piece, str):
                            pieces.append(piece)
                    elif line.startswith(("e:", "d:")):
                        try:
                            usage = json.loads(line[2:]).get("usage")
                        except (json.JSONDecodeError, AttributeError):
                            continue
                        if isinstance(usage, dict):
                            usage_info = usage
    except httpx.HTTPStatusError as e:
        return JSONResponse(
            status_code=e.response.status_code,
            content={"error": {"message": f"Upstream API error. Details: {e.response.text}", "type": "upstream_error"}}
        )
    except httpx.RequestError as e:
        return JSONResponse(
            status_code=502,
            content={"error": {"message": f"Upstream request failed: {e}", "type": "upstream_error"}}
        )

    full_response = "".join(pieces)
    content_response = full_response
    finish_reason = "stop"
    events, _, _ = _consume_stream_text(full_response, False)
    built_calls = [_build_tool_call(text) for kind, text in events if kind == "tool_call"]
    tool_calls = [call for call in built_calls if call is not None] or None
    if tool_calls:
        # When a tool call is present the surrounding text is not returned.
        content_response = None
        finish_reason = "tool_calls"
    prompt_tokens = usage_info.get("promptTokens", 0)
    completion_tokens = usage_info.get("completionTokens", 0)
    return JSONResponse(content={
        "id": chat_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model_id,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": content_response,
                "tool_calls": tool_calls
            },
            "finish_reason": finish_reason
        }],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens
        }
    })
@app.post("/v1/images/generations", tags=["Images"])
async def generate_images(request: ImageGenerationRequest):
    """Handles image generation requests.

    Models in the chatwithmono set are generated through IMAGE_GEN_API_URL
    (base64 result, optionally re-hosted on Snapzion when SNAPZION_API_KEY is
    set); every other model goes to IMAGE_API_URL. One upstream call is made
    per requested image.

    Returns `{"created": ..., "data": [...]}` on success, or a JSONResponse
    with status 502 (upstream failure / missing image) or 500 (anything else).
    """
    mono_models = {"gpt-image-1", "dall-e-3", "dall-e-2", "nextlm-image-1"}
    model = request.model or "default"
    # `n` is Optional: an explicit null must not reach range().
    count = 1 if request.n is None else request.n
    results = []
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            for _ in range(count):
                if model in mono_models:
                    headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'}
                    payload = {"prompt": request.prompt, "model": model}
                    resp = await client.post(IMAGE_GEN_API_URL, headers=headers, json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                    b64_image = data.get("image")
                    if not b64_image:
                        return JSONResponse(status_code=502, content={"error": "Missing base64 image in response"})
                    image_url = f"data:image/png;base64,{b64_image}"
                    if SNAPZION_API_KEY:
                        # Re-hosting is best-effort: on any failure keep the
                        # data URL instead of failing the whole request.
                        try:
                            upload_headers = {"Authorization": SNAPZION_API_KEY}
                            upload_files = {'file': ('image.png', base64.b64decode(b64_image), 'image/png')}
                            upload_resp = await client.post(SNAPZION_UPLOAD_URL, headers=upload_headers, files=upload_files)
                            if upload_resp.status_code == 200:
                                image_url = upload_resp.json().get("url", image_url)
                        except (httpx.HTTPError, ValueError, AttributeError):
                            pass
                    results.append({"url": image_url, "b64_json": b64_image, "revised_prompt": data.get("revised_prompt")})
                else:
                    params = {"prompt": request.prompt, "aspect_ratio": request.aspect_ratio, "link": "typegpt.net"}
                    resp = await client.get(IMAGE_API_URL, params=params)
                    resp.raise_for_status()
                    data = resp.json()
                    results.append({"url": data.get("image_link"), "b64_json": data.get("base64_output")})
    except httpx.HTTPStatusError as e:
        return JSONResponse(status_code=502, content={"error": f"Image generation failed. Upstream error: {e.response.status_code}", "details": e.response.text})
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": "An internal error occurred.", "details": str(e)})
    return {"created": int(time.time()), "data": results}
def _ocr_output_to_dict(raw_output: Any) -> dict:
    """Normalize the first element of the OCR prediction into a dictionary.

    A dict is used as-is. A string is parsed as JSON, then as a Python
    literal (handles single quotes); if neither yields a dict, the string
    itself is taken to be the OCR text. Any other type raises a 502.
    """
    if isinstance(raw_output, dict):
        return raw_output
    if not isinstance(raw_output, str):
        raise HTTPException(status_code=502, detail=f"Unexpected data type from OCR service: {type(raw_output)}")
    for parser in (json.loads, ast.literal_eval):
        try:
            parsed = parser(raw_output)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            continue
        if isinstance(parsed, dict):
            return parsed
    # Text such as "2024" parses as a number, not a dict: keep it as text.
    return {"ocr_text_from_string": raw_output}


@app.post("/v1/ocr", response_model=OcrResponse, tags=["OCR"])
async def perform_ocr(request: OcrRequest):
    """
    Performs Optical Character Recognition (OCR) on an image using the Florence-2 model.
    Provide an image via a URL or a base64 encoded string.

    Raises HTTPException with status 503 when the OCR client is unavailable,
    400 when `image_b64` is not valid base64, 502 when the OCR service
    returns an unusable response, and 500 for any other failure.
    """
    import binascii
    from fastapi.concurrency import run_in_threadpool

    if not ocr_client:
        raise HTTPException(status_code=503, detail="OCR service is not available. Gradio client failed to initialize.")
    image_path, temp_file_path = None, None
    try:
        if request.image_url:
            image_path = request.image_url
        elif request.image_b64:
            encoded = request.image_b64
            # Accept a data URL by dropping its "data:...;base64," header.
            if encoded.startswith("data:") and "," in encoded:
                encoded = encoded.split(",", 1)[1]
            try:
                image_bytes = base64.b64decode(encoded)
            except (binascii.Error, ValueError) as exc:
                raise HTTPException(status_code=400, detail="image_b64 is not valid base64 data.") from exc
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                temp_file.write(image_bytes)
                temp_file_path = temp_file.name
            image_path = temp_file_path
        # predict() is a blocking call; run it in a worker thread so the
        # event loop keeps serving other requests meanwhile.
        prediction = await run_in_threadpool(
            ocr_client.predict,
            image=handle_file(image_path),
            task_prompt="OCR",
            api_name="/process_image",
        )
        if not prediction or not isinstance(prediction, (tuple, list)):
            raise HTTPException(status_code=502, detail="Invalid or empty response from OCR service.")
        raw_result_dict = _ocr_output_to_dict(prediction[0])
        # Extract text from the dictionary, with multiple fallbacks
        ocr_text = raw_result_dict.get(
            "OCR",
            raw_result_dict.get("ocr_text_from_string", str(raw_result_dict)),
        )
        if not isinstance(ocr_text, str):
            # The response model requires a string.
            ocr_text = str(ocr_text)
        return OcrResponse(ocr_text=ocr_text, raw_response=raw_result_dict)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred during OCR processing: {str(e)}")
    finally:
        # The temp file was created with delete=False; remove it ourselves.
        if temp_file_path and os.path.exists(temp_file_path):
            os.unlink(temp_file_path)
@app.post("/v1/moderations", tags=["Moderation"])
async def create_moderation(request: ModerationRequest):
    """Check each input text upstream and answer in OpenAI moderation format.

    A single string is treated as a one-item list. Only the "hate",
    "self-harm", "sexual" and "violence" flags are taken from the upstream
    answer; every other category is reported as False. Scores are 1.0 for a
    set flag and 0.0 otherwise.
    """
    if isinstance(request.input, str):
        input_texts = [request.input]
    else:
        input_texts = request.input
    if not input_texts:
        return JSONResponse(status_code=400, content={"error": {"message": "Request must have at least one input string."}})

    headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'}
    # Category names in the order they appear in the response.
    category_names = (
        "hate",
        "hate/threatening",
        "harassment",
        "harassment/threatening",
        "self-harm",
        "self-harm/intent",
        "self-harm/instructions",
        "sexual",
        "sexual/minors",
        "violence",
        "violence/graphic",
    )
    # Categories whose value is copied from the upstream answer.
    forwarded = {"hate", "self-harm", "sexual", "violence"}

    results = []
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            for text_input in input_texts:
                resp = await client.post(MODERATION_API_URL, headers=headers, json={"text": text_input})
                resp.raise_for_status()
                upstream_data = resp.json()
                upstream_categories = upstream_data.get("categories", {})
                openai_categories = {
                    name: upstream_categories.get(name, False) if name in forwarded else False
                    for name in category_names
                }
                scores = {}
                for name, is_set in openai_categories.items():
                    scores[name] = 1.0 if is_set else 0.0
                result_item = {
                    "flagged": upstream_data.get("overall_sentiment") == "flagged",
                    "categories": openai_categories,
                    "category_scores": scores,
                }
                reason = upstream_data.get("reason")
                if reason:
                    result_item["reason"] = reason
                results.append(result_item)
    except httpx.HTTPStatusError as e:
        return JSONResponse(
            status_code=502,
            content={"error": {"message": f"Moderation failed. Upstream error: {e.response.status_code}", "details": e.response.text}}
        )
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": {"message": "An internal error occurred during moderation.", "details": str(e)}}
        )

    return JSONResponse(content={
        "id": generate_random_id("modr-"),
        "model": request.model,
        "results": results,
    })
# --- Main Execution ---
# Running the file directly serves the app on every interface, port 8000.
if __name__ == "__main__":
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)