# eb2 / app.py
# nbugs's picture
# Update app.py
# e30c86f verified
import json
import uuid
import time
import asyncio
import logging
from datetime import datetime
from typing import Dict, List, Optional, Union, Any
import httpx
from fastapi import FastAPI, Request, Response, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
# Configure logging: INFO and above, "[timestamp] LEVEL: message" lines.
logging.basicConfig(
    datefmt="%Y-%m-%dT%H:%M:%S",
    format="[%(asctime)s] %(levelname)s: %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
# Import os for environment variables
import os
# Configuration constants


def _model_entry(model_id, provider, provider_id, name, *, max_tokens,
                 temperature=2, top_k=500, multi_modal=True):
    """Build one MODEL_CONFIG entry.

    Only the values that differ between models are parameters; the penalty
    and top_p ceilings and the empty system prompt are the same for all of
    them. Every call returns freshly created dicts, so entries share no state.
    """
    return {
        "id": model_id,
        "provider": provider,
        "providerId": provider_id,
        "name": name,
        "multiModal": multi_modal,
        "Systemprompt": "",
        "opt_max": {
            "temperatureMax": temperature,
            "max_tokensMax": max_tokens,
            "presence_penaltyMax": 2,
            "frequency_penaltyMax": 2,
            "top_pMax": 1,
            "top_kMax": top_k,
        },
    }


# (provider, providerId) pairs shared by several models.
_OPENAI = ("OpenAI", "openai")
_VERTEX = ("Google Vertex AI", "vertex")
_GOOGLE = ("Google Generative AI", "google")
_ANTHROPIC = ("Anthropic", "anthropic")

CONFIG = {
    "API": {
        "BASE_URL": os.environ.get("API_BASE_URL", "https://fragments.e2b.dev"),
        "API_KEY": os.environ.get("API_KEY", "sk-123456"),  # Customize your own authentication key
    },
    "RETRY": {
        "MAX_ATTEMPTS": 1,
        "DELAY_BASE": 1000,
    },
    # Keyed by the model name clients send; insertion order is the order
    # in which the models are listed.
    "MODEL_CONFIG": {
        "o1-preview": _model_entry(
            "o1", *_OPENAI, "o1", max_tokens=0),
        "o3-mini": _model_entry(
            "o3-mini", *_OPENAI, "o3 Mini", max_tokens=4096),
        "gpt-4o": _model_entry(
            "gpt-4o", *_OPENAI, "GPT-4o", max_tokens=16380),
        "gpt-4.5-preview": _model_entry(
            "gpt-4.5-preview", *_OPENAI, "GPT-4.5", max_tokens=16380),
        "gpt-4-turbo": _model_entry(
            "gpt-4-turbo", *_OPENAI, "GPT-4 Turbo", max_tokens=16380),
        "gemini-1.5-pro": _model_entry(
            "gemini-1.5-pro-002", *_VERTEX, "Gemini 1.5 Pro", max_tokens=8192),
        "gemini-2.5-pro-exp-03-25": _model_entry(
            "gemini-2.5-pro-exp-03-25", *_GOOGLE,
            "Gemini 2.5 Pro Experimental 03-25", max_tokens=8192, top_k=40),
        "gemini-exp-1121": _model_entry(
            "gemini-exp-1121", *_GOOGLE,
            "Gemini Experimental 1121", max_tokens=8192, top_k=40),
        "gemini-2.0-flash-exp": _model_entry(
            "models/gemini-2.0-flash-exp", *_GOOGLE,
            "Gemini 2.0 Flash", max_tokens=8192, top_k=40),
        "claude-3-5-sonnet-latest": _model_entry(
            "claude-3-5-sonnet-latest", *_ANTHROPIC,
            "Claude 3.5 Sonnet", max_tokens=8192, temperature=1),
        "claude-3-7-sonnet-latest": _model_entry(
            "claude-3-7-sonnet-latest", *_ANTHROPIC,
            "Claude 3.7 Sonnet", max_tokens=8192, temperature=1),
        "claude-3-5-haiku-latest": _model_entry(
            "claude-3-5-haiku-latest", *_ANTHROPIC,
            "Claude 3.5 Haiku", max_tokens=8192, temperature=1,
            multi_modal=False),
    },
    # Headers sent with every upstream request.
    "DEFAULT_HEADERS": {
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9",
        "content-type": "application/json",
        "priority": "u=1, i",
        "sec-ch-ua": '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "Referer": "https://fragments.e2b.dev/",
        "Referrer-Policy": "strict-origin-when-cross-origin",
    },
    "MODEL_PROMPT": "Chatting with users and starting role-playing, the most important thing is to pay attention to their latest messages, use only 'text' to output the chat text reply content generated for user messages, and finally output it in code",
}
# Utility functions
def generate_uuid():
    """Return a newly generated random UUID (version 4) as a string."""
    fresh = uuid.uuid4()
    return f"{fresh}"
async def config_opt(params: Dict[str, Any], model_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Clamp request options to the ceilings declared for a model.

    Args:
        params: Option name -> requested value (None means "not supplied").
        model_config: Model entry; its "opt_max" table holds the ceilings.

    Returns:
        None when the model has no (or an empty) "opt_max" table. Otherwise
        a dict containing only the recognised, supplied options, each
        limited to at most its ceiling.
    """
    limits = model_config.get("opt_max")
    if not limits:
        return None

    # Request option name -> name of its ceiling inside "opt_max".
    ceiling_name_for = {
        "temperature": "temperatureMax",
        "max_tokens": "max_tokensMax",
        "presence_penalty": "presence_penaltyMax",
        "frequency_penalty": "frequency_penaltyMax",
        "top_p": "top_pMax",
        "top_k": "top_kMax",
    }

    clamped = {}
    for option, requested in params.items():
        ceiling_name = ceiling_name_for.get(option)
        if not ceiling_name or ceiling_name not in limits:
            continue  # unknown option, or the model declares no ceiling for it
        if requested is None:
            continue  # option not supplied by the caller
        clamped[option] = min(requested, limits[ceiling_name])
    return clamped
# API client class
class ApiClient:
    """Translate OpenAI-style chat requests into the payload posted upstream.

    Attributes:
        model_config: The CONFIG["MODEL_CONFIG"] entry of the selected model.
        request_id: Identifier used as a prefix in this client's log lines.
    """

    # Incoming role -> role used in the upstream payload. Roles not listed
    # here are forwarded unchanged.
    _ROLE_MAP = {"system": "user", "user": "user", "assistant": "assistant"}

    def __init__(self, model_id: str, request_id: str = ""):
        """Select the model configuration.

        Raises:
            ValueError: If ``model_id`` is not a key of CONFIG["MODEL_CONFIG"].
        """
        if model_id not in CONFIG["MODEL_CONFIG"]:
            raise ValueError(f"Unsupported model: {model_id}")
        self.model_config = CONFIG["MODEL_CONFIG"][model_id]
        self.request_id = request_id

    def process_message_content(self, content: Any) -> Optional[str]:
        """Extract the text of a message's ``content`` field.

        Returns:
            The string itself for str content; the newline-joined "text" of
            every ``{"type": "text"}`` part for list content; the "text"
            value for dict content; None for anything else.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            # Non-text parts and items that are not dicts are skipped rather
            # than crashing on a missing ``.get``.
            return "\n".join(
                str(item.get("text") or "")
                for item in content
                if isinstance(item, dict) and item.get("type") == "text"
            )
        if isinstance(content, dict):
            return content.get("text")
        return None

    async def prepare_chat_request(self, request: Dict[str, Any], config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the JSON body for the upstream chat endpoint.

        Args:
            request: OpenAI-style request as a dict (reads "messages").
            config: Clamped sampling options; when None or empty, a dict
                holding only the upstream model id is sent instead.

        Returns:
            Dict with "userID", "messages", "template", "model" and "config".
        """
        messages = request.get("messages") or []
        logger.info(f"[{self.request_id}] Preparing chat request, model: {self.model_config['name']}, messages count: {len(messages)}")
        opt_config = config or {"model": self.model_config["id"]}
        transformed_messages = await self.transform_messages(request)
        logger.info(f"[{self.request_id}] Transformed messages count: {len(transformed_messages)}")
        return {
            "userID": generate_uuid(),
            "messages": transformed_messages,
            "template": {
                "text": {
                    "name": CONFIG["MODEL_PROMPT"],
                    "lib": [""],
                    "file": "pages/ChatWithUsers.txt",
                    "instructions": self.model_config["Systemprompt"],
                    "port": None
                }
            },
            "model": {
                "id": self.model_config["id"],
                "provider": self.model_config["provider"],
                "providerId": self.model_config["providerId"],
                "name": self.model_config["name"],
                "multiModal": self.model_config["multiModal"]
            },
            "config": opt_config
        }

    async def transform_messages(self, request: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Merge consecutive same-role messages and convert them for upstream.

        Messages whose content yields no text are dropped. Consecutive
        messages with the same role are joined with a newline. "system" and
        "user" messages become upstream "user" messages, "assistant" stays
        "assistant"; both carry a single text part. Messages with any other
        role are forwarded as they are.

        The dicts inside ``request["messages"]`` are never modified.
        """
        merged: List[Dict[str, Any]] = []
        for current in request.get("messages") or []:
            current_text = self.process_message_content(current.get("content"))
            if current_text is None:
                continue
            if merged and current.get("role") == merged[-1].get("role"):
                previous_text = self.process_message_content(merged[-1].get("content"))
                if previous_text is not None:
                    merged[-1]["content"] = f"{previous_text}\n{current_text}"
                    continue
            # Shallow copy: merging rewrites "content" on the stored entry and
            # must not leak into the caller's message.
            merged.append(dict(current))

        result = []
        for msg in merged:
            upstream_role = self._ROLE_MAP.get(msg.get("role", ""))
            if upstream_role is None:
                result.append(msg)
                continue
            # Always send extracted text: unmerged messages may still hold
            # list/dict content at this point.
            text = self.process_message_content(msg.get("content"))
            result.append({
                "role": upstream_role,
                "content": [{"type": "text", "text": text}]
            })
        return result
# Response handler class
class ResponseHandler:
    """Wrap an already complete reply text in OpenAI-style responses."""

    @staticmethod
    async def handle_stream_response(chat_message: str, model: str, request_id: str):
        """Replay ``chat_message`` as a server-sent-event stream of chunks.

        The text is cut into slices of 15-22 characters (larger at the
        start, smaller towards the end), one "chat.completion.chunk" event
        per slice with a 50 ms pause in between, followed by "[DONE]".
        The last slice carries ``finish_reason`` "stop".

        Args:
            chat_message: Full reply text to stream.
            model: Model name echoed in every chunk.
            request_id: Identifier used as a prefix in log lines.

        Returns:
            A StreamingResponse with media type "text/event-stream".
        """
        logger.info(f"[{request_id}] Handling streaming response, content length: {len(chat_message)} characters")

        # All chunks of one completion share the same id and creation time,
        # so clients can correlate them.
        completion_id = generate_uuid()
        created = int(time.time())
        total = len(chat_message)

        async def generate():
            index = 0
            while index < total:
                # Simulate chunking similar to the Deno implementation
                chunk_size = min(15 + int(15 * (0.5 - (0.5 * (index / total)))), 30)
                chunk = chat_message[index:index + chunk_size]
                event_data = {
                    "id": completion_id,
                    "object": "chat.completion.chunk",
                    "created": created,
                    "model": model,
                    "choices": [{
                        "index": 0,
                        "delta": {"content": chunk},
                        "finish_reason": "stop" if index + chunk_size >= total else None
                    }]
                }
                yield f"data: {json.dumps(event_data)}\n\n"
                index += chunk_size
                await asyncio.sleep(0.05)  # 50ms delay between chunks
            yield "data: [DONE]\n\n"
            logger.info(f"[{request_id}] Streaming response completed")

        return StreamingResponse(
            generate(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            }
        )

    @staticmethod
    async def handle_normal_response(chat_message: str, model: str, request_id: str):
        """Return ``chat_message`` as a single "chat.completion" JSON body.

        Args:
            chat_message: Full reply text.
            model: Model name echoed in the response.
            request_id: Identifier used as a prefix in log lines.

        Returns:
            A JSONResponse with one assistant choice, ``finish_reason``
            "stop" and ``usage`` set to null.
        """
        logger.info(f"[{request_id}] Handling normal response, content length: {len(chat_message)} characters")
        response_data = {
            "id": generate_uuid(),
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": chat_message
                },
                "finish_reason": "stop"
            }],
            "usage": None
        }
        return JSONResponse(content=response_data)
# Pydantic models for request validation
class Message(BaseModel):
    # One chat message of the incoming request body.
    role: str
    # Plain text, a list of content-part dicts, or a single content dict.
    content: Union[str, List[Dict[str, Any]], Dict[str, Any]]
class ChatCompletionRequest(BaseModel):
    # Request body of POST /hf/v1/chat/completions.
    # Model name; the handler looks it up in CONFIG["MODEL_CONFIG"].
    model: str
    messages: List[Message]
    # Optional sampling options. None means "not supplied"; config_opt skips
    # None values and clamps the rest to the model's ceilings.
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    top_p: Optional[float] = None
    top_k: Optional[int] = None
    # When true the handler answers with a text/event-stream response.
    stream: Optional[bool] = False
# Create FastAPI app
app = FastAPI(title="E2B API Proxy")

# Add CORS middleware: any origin, method and header, credentials enabled.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)

# Mount static files directory: files under ./static are served at /static.
_static_app = StaticFiles(directory="static")
app.mount("/static", _static_app, name="static")
# Dependency for API key validation
async def verify_api_key(request: Request):
    """FastAPI dependency that checks the request's API key.

    Reads the "authorization" header. A leading "Bearer" scheme (any
    letter case) is removed; a header without that scheme is treated as
    the bare key. The result is compared with CONFIG["API"]["API_KEY"].

    Returns:
        The accepted token.

    Raises:
        HTTPException: 401 when the header is missing or the key differs.
    """
    import hmac  # local import; only needed for the constant-time comparison

    auth_header = request.headers.get("authorization")
    if not auth_header:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing API key"
        )

    # Strip the scheme only when it is the first word, so a key that happens
    # to contain the text "Bearer " is not mangled.
    scheme, _, credentials = auth_header.partition(" ")
    token = credentials.strip() if scheme.lower() == "bearer" else auth_header

    expected = CONFIG["API"]["API_KEY"]
    # compare_digest takes time independent of where the inputs differ;
    # bytes are used because it rejects non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8")):
        logger.error(f"Authentication failed, provided token: {token[:8]}...")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key"
        )
    return token
# API endpoints
@app.get("/hf/v1/models")
async def get_models():
    """Return every configured model name in a "list" envelope."""
    logger.info("Getting model list")
    models = []
    for model_id in CONFIG["MODEL_CONFIG"]:
        entry = {
            "id": model_id,
            "object": "model",
            "created": int(time.time()),
            "owned_by": "e2b",
        }
        models.append(entry)
    logger.info(f"Model list returned successfully, model count: {len(models)}")
    return {"object": "list", "data": models}
def _extract_chat_message(response_data: Any) -> Optional[str]:
    """Pull the reply text out of a decoded upstream body.

    For a JSON object, a non-blank "code" string wins over a non-blank
    "text" string. A bare JSON string is used directly. The result is
    stripped; None is returned when nothing usable is present.
    """
    if isinstance(response_data, str):
        return response_data.strip() or None
    if not isinstance(response_data, dict):
        return None
    for field in ("code", "text"):
        value = response_data.get(field)
        # Null or non-string fields are ignored instead of crashing on .strip().
        if isinstance(value, str) and value.strip():
            return value.strip()
    return None


@app.post("/hf/v1/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest,
    api_key: str = Depends(verify_api_key)
):
    """Handle chat completions.

    Clamps the sampling options to the model's ceilings, posts the
    converted request to ``{BASE_URL}/api/chat`` and returns the reply
    either as one JSON completion or, when ``request.stream`` is true, as
    a server-sent-event stream.

    Returns:
        The completion response; a 400 JSON error for a model that is not
        configured; a 500 JSON error for any failure while processing.
    """
    request_id = generate_uuid()
    logger.info(f"[{request_id}] Processing chat completion request")

    # Reject unknown models up front instead of failing with a KeyError
    # that would be reported as a server error.
    model_config = CONFIG["MODEL_CONFIG"].get(request.model)
    if model_config is None:
        logger.error("[%s] Unsupported model requested: %s", request_id, request.model)
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={
                "error": {
                    "message": f"Unsupported model: {request.model}",
                    "type": "invalid_request_error",
                    "param": "model",
                    "code": "model_not_found"
                }
            }
        )

    try:
        # Details are passed through explicit %s placeholders: a dict given
        # as the only logging argument to a message without placeholders is
        # silently discarded.
        logger.info("[%s] User request body: %s", request_id, {
            "model": request.model,
            "messages_count": len(request.messages),
            "stream": request.stream,
            "temperature": request.temperature,
            "max_tokens": request.max_tokens
        })

        # Configure options based on model limits
        config_options = await config_opt(
            {
                "temperature": request.temperature,
                "max_tokens": request.max_tokens,
                "presence_penalty": request.presence_penalty,
                "frequency_penalty": request.frequency_penalty,
                "top_p": request.top_p,
                "top_k": request.top_k
            },
            model_config
        )

        # Prepare request for E2B API
        api_client = ApiClient(request.model, request_id)
        request_payload = await api_client.prepare_chat_request(
            request.dict(),
            config_options
        )
        logger.info("[%s] Sending request to E2B: %s", request_id, {
            "model": request_payload["model"]["name"],
            "messages_count": len(request_payload["messages"]),
            "config": request_payload["config"]
        })

        # Send request to E2B API
        fetch_start_time = time.time()
        async with httpx.AsyncClient() as client:
            fetch_response = await client.post(
                f"{CONFIG['API']['BASE_URL']}/api/chat",
                headers=CONFIG["DEFAULT_HEADERS"],
                json=request_payload,
                timeout=60.0
            )
        fetch_end_time = time.time()
        # Raw body goes to the debug log rather than stdout.
        logger.debug("[%s] Raw E2B response body: %s", request_id, fetch_response.text)

        # Process response
        response_data = fetch_response.json()
        chat_message = _extract_chat_message(response_data)
        fields = response_data if isinstance(response_data, dict) else {}
        logger.info(
            "[%s] Received E2B response: %s, time: %.0fms, details: %s",
            request_id,
            fetch_response.status_code,
            (fetch_end_time - fetch_start_time) * 1000,
            {
                "status": fetch_response.status_code,
                "has_code": bool(fields.get("code")),
                "has_text": bool(fields.get("text")),
                "response_preview": (chat_message or "")[:100] + "..."
            }
        )

        if not chat_message:
            logger.error(f"[{request_id}] E2B did not return a valid response")
            raise ValueError("No response from upstream service")

        # Return response based on streaming preference
        if request.stream:
            return await ResponseHandler.handle_stream_response(
                chat_message,
                request.model,
                request_id
            )
        return await ResponseHandler.handle_normal_response(
            chat_message,
            request.model,
            request_id
        )
    except Exception as e:
        # Top-level boundary: report every failure as an OpenAI-style error body.
        logger.error(f"[{request_id}] Error processing request:", exc_info=e)
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={
                "error": {
                    "message": f"{str(e)} Request failed, possibly due to context limit exceeded or other error. Please try again later.",
                    "type": "server_error",
                    "param": None,
                    "code": None
                }
            }
        )
@app.get("/", response_class=HTMLResponse)
async def root():
    """Root endpoint that serves the HTML UI from static/index.html."""
    # Decode explicitly as UTF-8 so the page does not depend on the host's
    # locale default encoding.
    with open("static/index.html", "r", encoding="utf-8") as f:
        html_content = f.read()
    return HTMLResponse(content=html_content)
@app.get("/health")
async def health_check():
    """Health check endpoint: reports a fixed "ok" status."""
    payload = dict(status="ok", message="E2B API Proxy is running")
    return payload
if __name__ == "__main__":
    import os
    import uvicorn

    # Listen on the port named by the PORT environment variable (default 7860).
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", port=listen_port, host="0.0.0.0", reload=True)