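"""OpenAI-compatible adapter service.

Exposes /v1/models, /v1/chat/completions, /v1/images/generations and
/v1/moderations on top of chatwithmono.xyz and a configurable image API.
"""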
import base64
import json
import os
import secrets
import string
import time
from typing import Any, List, Optional, Union

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

load_dotenv()

# Upstream configuration. IMAGE_API_URL falls back to a placeholder host, and
# SNAPZION_API_KEY is read from the "SNAP" environment variable; it is
# optional -- without it, generated images are returned as data URIs.
IMAGE_API_URL = os.environ.get("IMAGE_API_URL", "https://image.api.example.com")
SNAPZION_UPLOAD_URL = "https://upload.snapzion.com/api/public-upload"
SNAPZION_API_KEY = os.environ.get("SNAP", "")

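# Static catalog returned by /v1/models. A requested model ID may be remapped
# to an upstream ID via MODEL_ALIASES before it is forwarded.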
AVAILABLE_MODELS = [
    {"id": "gpt-4-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"},
    {"id": "gpt-4o", "object": "model", "created": int(time.time()), "owned_by": "system"},
    {"id": "gpt-3.5-turbo", "object": "model", "created": int(time.time()), "owned_by": "system"},
    {"id": "dall-e-3", "object": "model", "created": int(time.time()), "owned_by": "system"},
    {"id": "text-moderation-stable", "object": "model", "created": int(time.time()), "owned_by": "system"},
]
MODEL_ALIASES = {}

app = FastAPI(
    title="OpenAI Compatible API",
    description="An adapter for various services to be compatible with the OpenAI API specification.",
    version="1.0.0",
)

def generate_random_id(prefix: str, length: int = 29) -> str:
    """Generates a cryptographically secure, random alphanumeric ID."""
    population = string.ascii_letters + string.digits
    random_part = "".join(secrets.choice(population) for _ in range(length))
    return f"{prefix}{random_part}"

@app.get("/v1/models") |
|
async def list_models(): |
|
"""Lists the available models.""" |
|
return {"object": "list", "data": AVAILABLE_MODELS} |
|
|
|
|
|
class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    messages: List[Message]
    model: str
    stream: Optional[bool] = False
    tools: Optional[Any] = None

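# Upstream wire format (assumed, based on the prefixes parsed below): the chat
# endpoint streams Vercel AI SDK-style data lines, where "0:" lines carry
# JSON-encoded text deltas and "e:"/"d:" lines carry end-of-stream metadata
# such as token usage.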
@app.post("/v1/chat/completions") |
|
async def chat_completion(request: ChatRequest): |
|
""" |
|
Handles chat completion requests, supporting both streaming and non-streaming responses. |
|
""" |
|
model_id = MODEL_ALIASES.get(request.model, request.model) |
|
chat_id = generate_random_id("chatcmpl-") |
|
headers = { |
|
'accept': 'text/event-stream', |
|
'content-type': 'application/json', |
|
'origin': 'https://www.chatwithmono.xyz', |
|
'referer': 'https://www.chatwithmono.xyz/', |
|
'user-agent': 'Mozilla/5.0', |
|
} |
|
if request.tools: |
|
|
|
|
|
tool_prompt = f"""You have access to the following tools . To call a tool, please respond with JSON for a tool call within <tool_call><tool_call> XML tag. Respond in the format {{"name": tool name, "parameters": dictionary of argument name and its value}}. Do not use variables. |
|
Tools: |
|
{";".join(f"<tool>{tool}</tool>" for tool in request.tools)} |
|
|
|
Response Format for tool call: |
|
For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags: |
|
<tool_call> |
|
{{"name": <function-name>, "arguments": <args-json-object>}} |
|
</tool_call> |
|
|
|
Example of tool calling: |
|
<tool_call> |
|
{{"name": "get_weather", "parameters": {{"city": "New York"}}}} |
|
</tool_call> |
|
|
|
Using tools is recommended. |
|
""" |
|
if request.messages[0].role == "system": |
|
request.messages[0].content += "\n\n" + tool_prompt |
|
else: |
|
request.messages.insert(0, {"role": "system", "content": tool_prompt}) |
|
request_data = request.model_dump(exclude_unset=True) |
|
|
|
payload = { |
|
"messages": request_data["messages"], |
|
"model": model_id |
|
} |
|
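
    # From here the handler either re-streams the upstream response as
    # OpenAI-style SSE chunks, or accumulates it into one chat.completion.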
    if request.stream:
        async def event_stream():
            created = int(time.time())
            is_first_chunk = True
            usage_info = None
            is_tool_call = False
            # Buffer the first few chunks so a leading <tool_call> tag can be
            # detected before any text is streamed to the client.
            chunks_buffer = []
            max_initial_chunks = 4
            try:
                async with httpx.AsyncClient(timeout=120) as client:
                    async with client.stream("POST", "https://www.chatwithmono.xyz/api/chat", headers=headers, json=payload) as response:
                        if response.status_code >= 400:
                            # Read the error body now; a streamed response cannot
                            # be read after the connection has been closed.
                            await response.aread()
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            if not line:
                                continue
                            if line.startswith("0:"):
                                try:
                                    content_piece = json.loads(line[2:])

                                    if len(chunks_buffer) < max_initial_chunks:
                                        chunks_buffer.append(content_piece)
                                        continue

                                    if chunks_buffer and not is_tool_call:
                                        full_buffer = ''.join(chunks_buffer)
                                        if "<tool_call>" in full_buffer:
                                            is_tool_call = True

                                    if is_tool_call:
                                        chunks_buffer.append(content_piece)
                                        full_buffer = ''.join(chunks_buffer)

                                        if "</tool_call>" in full_buffer:
                                            # The tool call is complete: parse it and emit a
                                            # single OpenAI-style tool_calls delta.
                                            tool_call_str = full_buffer.split("<tool_call>")[1].split("</tool_call>")[0]
                                            tool_call_json = json.loads(tool_call_str.strip())
                                            delta = {
                                                "content": None,
                                                "tool_calls": [{
                                                    "index": 0,
                                                    "id": generate_random_id("call_"),
                                                    "type": "function",
                                                    "function": {
                                                        "name": tool_call_json["name"],
                                                        "arguments": json.dumps(tool_call_json["parameters"]),
                                                    },
                                                }],
                                            }
                                            chunk_data = {
                                                "id": chat_id, "object": "chat.completion.chunk", "created": created,
                                                "model": model_id,
                                                "choices": [{"index": 0, "delta": delta, "finish_reason": None}],
                                                "usage": None,
                                            }
                                            yield f"data: {json.dumps(chunk_data)}\n\n"
                                        else:
                                            continue
                                    else:
                                        if is_first_chunk:
                                            # First plain-text chunk: flush the detection buffer
                                            # along with the assistant role marker.
                                            delta = {"role": "assistant", "content": "".join(chunks_buffer), "tool_calls": None}
                                            is_first_chunk = False
                                            chunk_data = {
                                                "id": chat_id, "object": "chat.completion.chunk", "created": created,
                                                "model": model_id,
                                                "choices": [{"index": 0, "delta": delta, "finish_reason": None}],
                                                "usage": None,
                                            }
                                            yield f"data: {json.dumps(chunk_data)}\n\n"

                                        delta = {"content": content_piece, "tool_calls": None}
                                        chunk_data = {
                                            "id": chat_id, "object": "chat.completion.chunk", "created": created,
                                            "model": model_id,
                                            "choices": [{"index": 0, "delta": delta, "finish_reason": None}],
                                            "usage": None,
                                        }
                                        yield f"data: {json.dumps(chunk_data)}\n\n"
                                except json.JSONDecodeError:
                                    continue
                            elif line.startswith(("e:", "d:")):
                                try:
                                    usage_info = json.loads(line[2:]).get("usage")
                                except (json.JSONDecodeError, AttributeError):
                                    pass
                                break

                # A response shorter than max_initial_chunks never leaves the
                # detection buffer above, so flush it here.
                if is_first_chunk and chunks_buffer and not is_tool_call:
                    chunk_data = {
                        "id": chat_id, "object": "chat.completion.chunk", "created": created,
                        "model": model_id,
                        "choices": [{"index": 0, "delta": {"role": "assistant", "content": "".join(chunks_buffer), "tool_calls": None}, "finish_reason": None}],
                        "usage": None,
                    }
                    yield f"data: {json.dumps(chunk_data)}\n\n"

                final_usage = None
                if usage_info:
                    prompt_tokens = usage_info.get("promptTokens", 0)
                    completion_tokens = usage_info.get("completionTokens", 0)
                    final_usage = {
                        "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens,
                        "total_tokens": prompt_tokens + completion_tokens,
                    }
                done_chunk = {
                    "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model_id,
                    "choices": [{
                        "index": 0,
                        "delta": {"role": "assistant", "content": None, "function_call": None, "tool_calls": None},
                        "finish_reason": "stop",
                    }],
                    "usage": final_usage,
                }
                yield f"data: {json.dumps(done_chunk)}\n\n"
            except httpx.HTTPStatusError as e:
                error_content = {
                    "error": {
                        "message": f"Upstream API error: {e.response.status_code}. Details: {e.response.text}",
                        "type": "upstream_error", "code": str(e.response.status_code),
                    }
                }
                yield f"data: {json.dumps(error_content)}\n\n"
            finally:
                yield "data: [DONE]\n\n"

        return StreamingResponse(event_stream(), media_type="text/event-stream")
    else:
        assistant_response, usage_info = "", {}
        tool_call_json = None
        try:
            async with httpx.AsyncClient(timeout=120) as client:
                async with client.stream("POST", "https://www.chatwithmono.xyz/api/chat", headers=headers, json=payload) as response:
                    if response.status_code >= 400:
                        # Buffer the error body before raise_for_status(), while
                        # the stream is still open.
                        await response.aread()
                    response.raise_for_status()
                    async for chunk in response.aiter_lines():
                        if chunk.startswith("0:"):
                            try:
                                assistant_response += json.loads(chunk[2:])
                            except json.JSONDecodeError:
                                continue
                        elif chunk.startswith(("e:", "d:")):
                            try:
                                usage_info = json.loads(chunk[2:]).get("usage", {})
                            except (json.JSONDecodeError, AttributeError):
                                continue

            # Recover an embedded tool call, if the model emitted one.
            if "<tool_call>" in assistant_response and "</tool_call>" in assistant_response:
                tool_call_str = assistant_response.split("<tool_call>")[1].split("</tool_call>")[0]
                tool_call = json.loads(tool_call_str.strip())
                tool_call_json = [{
                    "index": 0,
                    "id": generate_random_id("call_"),
                    "type": "function",
                    "function": {"name": tool_call["name"], "arguments": json.dumps(tool_call["parameters"])},
                }]

            return JSONResponse(content={
                "id": chat_id, "object": "chat.completion", "created": int(time.time()), "model": model_id,
                "choices": [{
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": assistant_response if tool_call_json is None else None,
                        "tool_calls": tool_call_json,
                    },
                    "finish_reason": "stop" if tool_call_json is None else "tool_calls",
                }],
                "usage": {
                    "prompt_tokens": usage_info.get("promptTokens", 0),
                    "completion_tokens": usage_info.get("completionTokens", 0),
                    "total_tokens": usage_info.get("promptTokens", 0) + usage_info.get("completionTokens", 0),
                },
            })
        except httpx.HTTPStatusError as e:
            return JSONResponse(status_code=e.response.status_code, content={"error": {"message": f"Upstream API error. Details: {e.response.text}", "type": "upstream_error"}})

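# Example chat request (a sketch, assuming the server is run as in __main__
# below, i.e. listening on localhost:8000):
#
#   curl http://localhost:8000/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "gpt-4o", "messages": [{"role": "user", "content": "Hi"}]}'
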
class ImageGenerationRequest(BaseModel):
    prompt: str
    aspect_ratio: Optional[str] = "1:1"
    n: Optional[int] = 1
    user: Optional[str] = None
    model: Optional[str] = "default"

@app.post("/v1/images/generations") |
|
async def generate_images(request: ImageGenerationRequest): |
|
"""Handles image generation requests.""" |
|
results = [] |
|
try: |
|
async with httpx.AsyncClient(timeout=120) as client: |
|
for _ in range(request.n): |
|
model = request.model or "default" |
|
if model in ["gpt-image-1", "dall-e-3", "dall-e-2", "nextlm-image-1"]: |
|
headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://www.chatwithmono.xyz/'} |
|
payload = {"prompt": request.prompt, "model": model} |
|
resp = await client.post("https://www.chatwithmono.xyz/api/image", headers=headers, json=payload) |
|
resp.raise_for_status() |
|
data = resp.json() |
|
b64_image = data.get("image") |
|
if not b64_image: return JSONResponse(status_code=502, content={"error": "Missing base64 image in response"}) |
|
if SNAPZION_API_KEY: |
|
upload_headers = {"Authorization": SNAPZION_API_KEY} |
|
upload_files = {'file': ('image.png', base64.b64decode(b64_image), 'image/png')} |
|
upload_resp = await client.post(SNAPZION_UPLOAD_URL, headers=upload_headers, files=upload_files) |
|
upload_resp.raise_for_status() |
|
upload_data = upload_resp.json() |
|
image_url = upload_data.get("url") |
|
else: |
|
image_url = f"data:image/png;base64,{b64_image}" |
|
results.append({"url": image_url, "b64_json": b64_image, "revised_prompt": data.get("revised_prompt")}) |
|
else: |
|
params = {"prompt": request.prompt, "aspect_ratio": request.aspect_ratio, "link": "typegpt.net"} |
|
resp = await client.get(IMAGE_API_URL, params=params) |
|
resp.raise_for_status() |
|
data = resp.json() |
|
results.append({"url": data.get("image_link"), "b64_json": data.get("base64_output")}) |
|
except httpx.HTTPStatusError as e: |
|
return JSONResponse(status_code=502, content={"error": f"Image generation failed. Upstream error: {e.response.status_code}", "details": e.response.text}) |
|
except Exception as e: |
|
return JSONResponse(status_code=500, content={"error": "An internal error occurred.", "details": str(e)}) |
|
return {"created": int(time.time()), "data": results} |
|
|
|
|
|
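# Example image request (a sketch, same local-server assumption as above):
#
#   curl http://localhost:8000/v1/images/generations \
#     -H "Content-Type: application/json" \
#     -d '{"model": "dall-e-3", "prompt": "a lighthouse at dusk", "n": 1}'
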
class ModerationRequest(BaseModel):
    input: Union[str, List[str]]
    model: Optional[str] = "text-moderation-stable"

@app.post("/v1/moderations") |
|
async def create_moderation(request: ModerationRequest): |
|
""" |
|
Handles moderation requests, conforming to the OpenAI API specification. |
|
Includes a custom 'reason' field in the result if provided by the upstream API. |
|
""" |
|
input_texts = [request.input] if isinstance(request.input, str) else request.input |
|
if not input_texts: |
|
return JSONResponse(status_code=400, content={"error": {"message": "Request must have at least one input string.", "type": "invalid_request_error"}}) |
|
moderation_url = "https://www.chatwithmono.xyz/api/moderation" |
|
headers = { |
|
'Content-Type': 'application/json', |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36', |
|
'Referer': 'https://www.chatwithmono.xyz/', |
|
} |
|
results = [] |
|
try: |
|
async with httpx.AsyncClient(timeout=30) as client: |
|
for text_input in input_texts: |
|
payload = {"text": text_input} |
|
resp = await client.post(moderation_url, headers=headers, json=payload) |
|
resp.raise_for_status() |
|
upstream_data = resp.json() |
|
|
|
upstream_categories = upstream_data.get("categories", {}) |
|
openai_categories = { |
|
"hate": upstream_categories.get("hate", False), "hate/threatening": False, |
|
"harassment": False, "harassment/threatening": False, |
|
"self-harm": upstream_categories.get("self-harm", False), "self-harm/intent": False, "self-harm/instructions": False, |
|
"sexual": upstream_categories.get("sexual", False), "sexual/minors": False, |
|
"violence": upstream_categories.get("violence", False), "violence/graphic": False, |
|
} |
|
category_scores = {k: 1.0 if v else 0.0 for k, v in openai_categories.items()} |
|
flagged = upstream_data.get("overall_sentiment") == "flagged" |
|
result_item = { |
|
"flagged": flagged, |
|
"categories": openai_categories, |
|
"category_scores": category_scores, |
|
} |
|
|
|
|
|
|
|
reason = upstream_data.get("reason") |
|
if reason: |
|
result_item["reason"] = reason |
|
|
|
results.append(result_item) |
|
except httpx.HTTPStatusError as e: |
|
return JSONResponse( |
|
status_code=502, |
|
content={"error": {"message": f"Moderation failed. Upstream error: {e.response.status_code}", "type": "upstream_error", "details": e.response.text}} |
|
) |
|
except Exception as e: |
|
return JSONResponse(status_code=500, content={"error": {"message": "An internal error occurred during moderation.", "type": "internal_error", "details": str(e)}}) |
|
|
|
final_response = { |
|
"id": generate_random_id("modr-"), |
|
"model": request.model, |
|
"results": results, |
|
} |
|
return JSONResponse(content=final_response) |
|
|
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=8000) |
|
|