import json import uuid import time import asyncio import logging from datetime import datetime from typing import Dict, List, Optional, Union, Any import httpx from fastapi import FastAPI, Request, Response, Depends, HTTPException, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field # Configure logging logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S' ) logger = logging.getLogger(__name__) # Import os for environment variables import os # Configuration constants CONFIG = { "API": { "BASE_URL": os.environ.get("API_BASE_URL", "https://fragments.e2b.dev"), "API_KEY": os.environ.get("API_KEY", "sk-123456") # Customize your own authentication key }, "RETRY": { "MAX_ATTEMPTS": 1, "DELAY_BASE": 1000 }, "MODEL_CONFIG": { "o1-preview": { "id": "o1", "provider": "OpenAI", "providerId": "openai", "name": "o1", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 0, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "o3-mini": { "id": "o3-mini", "provider": "OpenAI", "providerId": "openai", "name": "o3 Mini", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 4096, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "gpt-4o": { "id": "gpt-4o", "provider": "OpenAI", "providerId": "openai", "name": "GPT-4o", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 16380, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "gpt-4.5-preview": { "id": "gpt-4.5-preview", "provider": "OpenAI", "providerId": "openai", "name": "GPT-4.5", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 16380, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "gpt-4-turbo": { "id": "gpt-4-turbo", "provider": "OpenAI", "providerId": "openai", "name": "GPT-4 Turbo", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 16380, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "gemini-1.5-pro": { "id": "gemini-1.5-pro-002", "provider": "Google Vertex AI", "providerId": "vertex", "name": "Gemini 1.5 Pro", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 8192, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "gemini-2.5-pro-exp-03-25": { "id": "gemini-2.5-pro-exp-03-25", "provider": "Google Generative AI", "providerId": "google", "name": "Gemini 2.5 Pro Experimental 03-25", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 8192, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 40 } }, "gemini-exp-1121": { "id": "gemini-exp-1121", "provider": "Google Generative AI", "providerId": "google", "name": "Gemini Experimental 1121", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 8192, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 40 } }, "gemini-2.0-flash-exp": { "id": "models/gemini-2.0-flash-exp", "provider": "Google Generative AI", "providerId": "google", "name": "Gemini 2.0 Flash", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 2, "max_tokensMax": 8192, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 40 } }, "claude-3-5-sonnet-latest": { "id": "claude-3-5-sonnet-latest", "provider": "Anthropic", "providerId": "anthropic", "name": "Claude 3.5 Sonnet", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 1, "max_tokensMax": 8192, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "claude-3-7-sonnet-latest": { "id": "claude-3-7-sonnet-latest", "provider": "Anthropic", "providerId": "anthropic", "name": "Claude 3.7 Sonnet", "multiModal": True, "Systemprompt": "", "opt_max": { "temperatureMax": 1, "max_tokensMax": 8192, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } }, "claude-3-5-haiku-latest": { "id": "claude-3-5-haiku-latest", "provider": "Anthropic", "providerId": "anthropic", "name": "Claude 3.5 Haiku", "multiModal": False, "Systemprompt": "", "opt_max": { "temperatureMax": 1, "max_tokensMax": 8192, "presence_penaltyMax": 2, "frequency_penaltyMax": 2, "top_pMax": 1, "top_kMax": 500 } } }, "DEFAULT_HEADERS": { "accept": "*/*", "accept-language": "zh-CN,zh;q=0.9", "content-type": "application/json", "priority": "u=1, i", "sec-ch-ua": "\"Microsoft Edge\";v=\"131\", \"Chromium\";v=\"131\", \"Not_A Brand\";v=\"24\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "Referer": "https://fragments.e2b.dev/", "Referrer-Policy": "strict-origin-when-cross-origin" }, "MODEL_PROMPT": "Chatting with users and starting role-playing, the most important thing is to pay attention to their latest messages, use only 'text' to output the chat text reply content generated for user messages, and finally output it in code" } # Utility functions def generate_uuid(): """Generate a UUID v4 string.""" return str(uuid.uuid4()) async def config_opt(params: Dict[str, Any], model_config: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Constrain parameters based on model configuration.""" if not model_config.get("opt_max"): return None options_map = { "temperature": "temperatureMax", "max_tokens": "max_tokensMax", "presence_penalty": "presence_penaltyMax", "frequency_penalty": "frequency_penaltyMax", "top_p": "top_pMax", "top_k": "top_kMax" } constrained_params = {} for key, value in params.items(): max_key = options_map.get(key) if (max_key and max_key in model_config["opt_max"] and value is not None): constrained_params[key] = min(value, model_config["opt_max"][max_key]) return constrained_params # API client class class ApiClient: def __init__(self, model_id: str, request_id: str = ""): if model_id not in CONFIG["MODEL_CONFIG"]: raise ValueError(f"Unsupported model: {model_id}") self.model_config = CONFIG["MODEL_CONFIG"][model_id] self.request_id = request_id def process_message_content(self, content: Any) -> Optional[str]: """Process message content to extract text.""" if isinstance(content, str): return content if isinstance(content, list): return "\n".join([item.get("text", "") for item in content if item.get("type") == "text"]) if isinstance(content, dict): return content.get("text") return None async def prepare_chat_request(self, request: Dict[str, Any], config: Optional[Dict[str, Any]]) -> Dict[str, Any]: """Prepare chat request for E2B API.""" logger.info(f"[{self.request_id}] Preparing chat request, model: {self.model_config['name']}, messages count: {len(request.get('messages', []))}") opt_config = config or {"model": self.model_config["id"]} transformed_messages = await self.transform_messages(request) logger.info(f"[{self.request_id}] Transformed messages count: {len(transformed_messages)}") return { "userID": generate_uuid(), "messages": transformed_messages, "template": { "text": { "name": CONFIG["MODEL_PROMPT"], "lib": [""], "file": "pages/ChatWithUsers.txt", "instructions": self.model_config["Systemprompt"], "port": None } }, "model": { "id": self.model_config["id"], "provider": self.model_config["provider"], "providerId": self.model_config["providerId"], "name": self.model_config["name"], "multiModal": self.model_config["multiModal"] }, "config": opt_config } async def transform_messages(self, request: Dict[str, Any]) -> List[Dict[str, Any]]: """Transform and merge messages for E2B API.""" messages = request.get("messages", []) merged_messages = [] for current in messages: current_content = self.process_message_content(current.get("content")) if current_content is None: continue if (merged_messages and current.get("role") == merged_messages[-1].get("role")): last_content = self.process_message_content(merged_messages[-1].get("content")) if last_content is not None: merged_messages[-1]["content"] = f"{last_content}\n{current_content}" continue merged_messages.append(current) result = [] for msg in merged_messages: role = msg.get("role", "") content = msg.get("content", "") if role in ["system", "user"]: result.append({ "role": "user", "content": [{"type": "text", "text": content}] }) elif role == "assistant": result.append({ "role": "assistant", "content": [{"type": "text", "text": content}] }) else: result.append(msg) return result # Response handler class class ResponseHandler: @staticmethod async def handle_stream_response(chat_message: str, model: str, request_id: str): """Handle streaming response.""" logger.info(f"[{request_id}] Handling streaming response, content length: {len(chat_message)} characters") async def generate(): index = 0 while index < len(chat_message): # Simulate chunking similar to the Deno implementation chunk_size = min(15 + int(15 * (0.5 - (0.5 * (index / len(chat_message))))), 30) chunk = chat_message[index:index + chunk_size] event_data = { "id": generate_uuid(), "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "delta": {"content": chunk}, "finish_reason": "stop" if index + chunk_size >= len(chat_message) else None }] } yield f"data: {json.dumps(event_data)}\n\n" index += chunk_size await asyncio.sleep(0.05) # 50ms delay between chunks yield "data: [DONE]\n\n" logger.info(f"[{request_id}] Streaming response completed") return StreamingResponse( generate(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", } ) @staticmethod async def handle_normal_response(chat_message: str, model: str, request_id: str): """Handle normal (non-streaming) response.""" logger.info(f"[{request_id}] Handling normal response, content length: {len(chat_message)} characters") response_data = { "id": generate_uuid(), "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "message": { "role": "assistant", "content": chat_message }, "finish_reason": "stop" }], "usage": None } return JSONResponse(content=response_data) # Pydantic models for request validation class Message(BaseModel): role: str content: Union[str, List[Dict[str, Any]], Dict[str, Any]] class ChatCompletionRequest(BaseModel): model: str messages: List[Message] temperature: Optional[float] = None max_tokens: Optional[int] = None presence_penalty: Optional[float] = None frequency_penalty: Optional[float] = None top_p: Optional[float] = None top_k: Optional[int] = None stream: Optional[bool] = False # Create FastAPI app app = FastAPI(title="E2B API Proxy") # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Mount static files directory app.mount("/static", StaticFiles(directory="static"), name="static") # Dependency for API key validation async def verify_api_key(request: Request): auth_header = request.headers.get("authorization") if not auth_header: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key" ) token = auth_header.replace("Bearer ", "") if token != CONFIG["API"]["API_KEY"]: logger.error(f"Authentication failed, provided token: {token[:8]}...") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key" ) return token # API endpoints @app.get("/hf/v1/models") async def get_models(): """Get available models.""" logger.info("Getting model list") models = [ { "id": model_id, "object": "model", "created": int(time.time()), "owned_by": "e2b" } for model_id in CONFIG["MODEL_CONFIG"].keys() ] logger.info(f"Model list returned successfully, model count: {len(models)}") return {"object": "list", "data": models} @app.post("/hf/v1/chat/completions") async def chat_completions( request: ChatCompletionRequest, api_key: str = Depends(verify_api_key) ): """Handle chat completions.""" request_id = generate_uuid() logger.info(f"[{request_id}] Processing chat completion request") try: logger.info(f"[{request_id}] User request body:", { "model": request.model, "messages_count": len(request.messages), "stream": request.stream, "temperature": request.temperature, "max_tokens": request.max_tokens }) # Configure options based on model limits config_options = await config_opt( { "temperature": request.temperature, "max_tokens": request.max_tokens, "presence_penalty": request.presence_penalty, "frequency_penalty": request.frequency_penalty, "top_p": request.top_p, "top_k": request.top_k }, CONFIG["MODEL_CONFIG"][request.model] ) # Prepare request for E2B API api_client = ApiClient(request.model, request_id) request_payload = await api_client.prepare_chat_request( request.dict(), config_options ) logger.info(f"[{request_id}] Sending request to E2B:", { "model": request_payload["model"]["name"], "messages_count": len(request_payload["messages"]), "config": request_payload["config"] }) # Send request to E2B API fetch_start_time = time.time() async with httpx.AsyncClient() as client: fetch_response = await client.post( f"{CONFIG['API']['BASE_URL']}/api/chat", headers=CONFIG["DEFAULT_HEADERS"], json=request_payload, timeout=60.0 ) fetch_end_time = time.time() print(fetch_response.text) # Process response response_data = fetch_response.json() logger.info( f"[{request_id}] Received E2B response: {fetch_response.status_code}, " f"time: {(fetch_end_time - fetch_start_time) * 1000:.0f}ms", { "status": fetch_response.status_code, "has_code": bool(response_data.get("code")), "has_text": bool(response_data.get("text")), "response_preview": (response_data.get("code", "") or response_data.get("text", "") or "")[:100] + "..." } ) # Extract message content chat_message = ( response_data.get("code", "").strip() or response_data.get("text", "").strip() or (response_data.strip() if isinstance(response_data, str) else None) ) #chat_message = fetch_response.text if not chat_message: logger.error(f"[{request_id}] E2B did not return a valid response") raise ValueError("No response from upstream service") # Return response based on streaming preference if request.stream: return await ResponseHandler.handle_stream_response( chat_message, request.model, request_id ) else: return await ResponseHandler.handle_normal_response( chat_message, request.model, request_id ) except Exception as e: logger.error(f"[{request_id}] Error processing request:", exc_info=e) return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ "error": { "message": f"{str(e)} Request failed, possibly due to context limit exceeded or other error. Please try again later.", "type": "server_error", "param": None, "code": None } } ) @app.get("/", response_class=HTMLResponse) async def root(): """Root endpoint that serves the HTML UI.""" with open("static/index.html", "r") as f: html_content = f.read() return HTMLResponse(content=html_content) @app.get("/health") async def health_check(): """Health check endpoint for Hugging Face.""" return {"status": "ok", "message": "E2B API Proxy is running"} if __name__ == "__main__": import uvicorn import os port = int(os.environ.get("PORT", 7860)) uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)