""" | |
Minimal OpenAI-compatible local server that serves /LiquidAI/LFM2-1.2B via Hugging Face | |
Transformers on CPU and exposes a subset of the OpenAI REST API (chat/completions, models). | |
Save as local_openai_compatible_server.py and run: | |
pip install -r requirements.txt | |
python local_openai_compatible_server.py | |
Or run with uvicorn directly (recommended for production/dev): | |
uvicorn local_openai_compatible_server:app --host 0.0.0.0 --port 7860 | |
Requirements (requirements.txt): | |
fastapi | |
"uvicorn[standard]" | |
transformers | |
torch | |
Notes: | |
- CPU-only: model loads on CPU (may be slow for a 1.2B model depending on your machine). | |
- Model repo id used: "/LiquidAI/LFM2-1.2B" — adjust if you have a different path or local copy. | |
- This provides a simplified compatibility layer. It is NOT feature-complete with OpenAI's API | |
but implements common fields: messages, max_tokens, temperature, top_p, n, stop, stream (basic). | |
""" | |
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, PlainTextResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Any, Dict

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

import time
import json
import uuid
# -----------------------------
# Configuration
# -----------------------------
MODEL_ID = "LiquidAI/LFM2-1.2B"  # change to your model location or HF repo id
HOST = "0.0.0.0"
PORT = 7860
DEVICE = torch.device("cpu")  # CPU-only as requested
DEFAULT_MAX_TOKENS = 256
# -----------------------------
# Load model & tokenizer
# -----------------------------
print(f"Loading tokenizer and model '{MODEL_ID}' on device {DEVICE} (CPU-only)... this may take a while")
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
    model = AutoModelForCausalLM.from_pretrained(MODEL_ID, torch_dtype=torch.float32)
    model.to(DEVICE)
    model.eval()
except Exception as e:
    raise RuntimeError(f"Failed to load model/tokenizer for '{MODEL_ID}': {e}") from e

# If the tokenizer has no pad token, fall back to the EOS token.
if tokenizer.pad_token_id is None and tokenizer.eos_token_id is not None:
    tokenizer.pad_token_id = tokenizer.eos_token_id
# -----------------------------
# FastAPI app
# -----------------------------
app = FastAPI(title="Local OpenAI-compatible server (transformers)", version="0.1")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# -----------------------------
# Pydantic models (request bodies)
# -----------------------------
class Message(BaseModel):
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    model: Optional[str] = MODEL_ID
    messages: List[Message]
    max_tokens: Optional[int] = DEFAULT_MAX_TOKENS
    temperature: Optional[float] = 0.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    stop: Optional[List[str]] = None
    stream: Optional[bool] = False
# -----------------------------
# Helpers
# -----------------------------
def build_prompt_from_messages(messages: List[Dict[str, Any]]) -> str:
    """Simple conversational prompt formatting; adjust to suit the model's expected chat format."""
    parts = []
    for m in messages:
        role = m.get("role", "user")
        content = m.get("content", "")
        if role == "system":
            parts.append(f"<|system|> {content}\n")
        elif role == "user":
            parts.append(f"User: {content}\n")
        elif role == "assistant":
            parts.append(f"Assistant: {content}\n")
        else:
            parts.append(f"{role}: {content}\n")
    parts.append("Assistant: ")
    return "".join(parts)

def apply_stop_sequences(text: str, stops: Optional[List[str]]) -> str:
    """Truncate text at the earliest occurrence of any stop sequence."""
    if not stops:
        return text
    idx = None
    for s in stops:
        if s == "":
            continue
        pos = text.find(s)
        if pos != -1:
            if idx is None or pos < idx:
                idx = pos
    if idx is not None:
        return text[:idx]
    return text
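
# Example (illustrative): the text is cut just before the earliest stop sequence:
#
#   >>> apply_stop_sequences("Sure, here you go.\nUser: thanks", ["\nUser:"])
#   'Sure, here you go.'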
# -----------------------------
# Endpoints
# -----------------------------
@app.get("/", response_class=PlainTextResponse)
async def root():
    return "Local OpenAI-compatible server running. Use /v1/chat/completions or /v1/models"


@app.get("/v1/models")
async def list_models():
    return {"data": [{"id": MODEL_ID, "object": "model"}], "object": "list"}


@app.post("/v1/chat/completions")
async def chat_completions(body: ChatCompletionRequest):
    # Basic validation: this server hosts a single model, so reject explicit mismatches.
    if body.model is not None and body.model != MODEL_ID:
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_model", "message": f"Only model {MODEL_ID} is available on this server."},
        )
    prompt = build_prompt_from_messages([m.dict() for m in body.messages])

    # Tokenize
    inputs = tokenizer(prompt, return_tensors="pt")
    input_ids = inputs["input_ids"].to(DEVICE)
    attention_mask = inputs["attention_mask"].to(DEVICE)
    input_len = input_ids.shape[-1]

    # Generation settings (temperature/top_p only apply when sampling)
    do_sample = bool(body.temperature and body.temperature > 0.0)
    gen_kwargs = {
        "max_new_tokens": body.max_tokens,
        "do_sample": do_sample,
        "num_return_sequences": int(body.n or 1),
        "pad_token_id": tokenizer.pad_token_id or tokenizer.eos_token_id,
        # note: on CPU large models may be slow
    }
    if do_sample:
        gen_kwargs["temperature"] = float(body.temperature)
        gen_kwargs["top_p"] = float(body.top_p or 1.0)

    # Synchronous generation
    with torch.no_grad():
        outputs = model.generate(input_ids, attention_mask=attention_mask, **gen_kwargs)
    choices = []
    for i, out_ids in enumerate(outputs):
        # Decode only the newly generated tokens (everything after the prompt),
        # so the prompt itself is not echoed back in the reply.
        generated = tokenizer.decode(out_ids[input_len:], skip_special_tokens=True)
        # apply stop sequences
        generated = apply_stop_sequences(generated, body.stop)
        # build a choice structure similar to OpenAI's
        choice = {
            "index": i,
            "message": {"role": "assistant", "content": generated},
            "finish_reason": "stop" if body.stop else "length",
        }
        choices.append(choice)

    # approximate token usage
    completion_tokens = max(0, int(outputs.shape[-1]) - int(input_len))
    usage = {
        "prompt_tokens": int(input_len),
        "completion_tokens": int(completion_tokens),
        "total_tokens": int(input_len + completion_tokens),
    }
    response = {
        "id": str(uuid.uuid4()),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": body.model or MODEL_ID,
        "choices": choices,
        "usage": usage,
    }

    # Streaming: rudimentary implementation that generates the full text first,
    # then streams it back in chunks as server-sent events.
    if body.stream:
        # Only a single response is streamed (with n > 1, only the first choice is sent).
        text_to_stream = choices[0]["message"]["content"]

        def event_stream():
            # send the text in small chunks
            chunk_size = 128
            for start in range(0, len(text_to_stream), chunk_size):
                chunk = text_to_stream[start:start + chunk_size]
                payload = {
                    "id": response["id"],
                    "object": "chat.completion.chunk",
                    "choices": [{"delta": {"content": chunk}, "index": 0, "finish_reason": None}],
                }
                yield f"data: {json.dumps(payload)}\n\n"
            # final chunk with a finish_reason, followed by the OpenAI-style [DONE] sentinel
            done_payload = {
                "id": response["id"],
                "object": "chat.completion.chunk",
                "choices": [{"delta": {}, "index": 0, "finish_reason": "stop"}],
            }
            yield f"data: {json.dumps(done_payload)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    return JSONResponse(response)
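
# A minimal sketch of consuming the stream above (illustrative; assumes the server
# is already running on http://localhost:7860 and uses only the standard library):
#
#   import json, urllib.request
#   req = urllib.request.Request(
#       "http://localhost:7860/v1/chat/completions",
#       data=json.dumps({
#           "model": "LiquidAI/LFM2-1.2B",
#           "messages": [{"role": "user", "content": "Hello"}],
#           "stream": True,
#       }).encode("utf-8"),
#       headers={"Content-Type": "application/json"},
#   )
#   with urllib.request.urlopen(req) as resp:
#       for raw in resp:
#           line = raw.decode("utf-8").strip()
#           if not line.startswith("data: ") or line == "data: [DONE]":
#               continue
#           chunk = json.loads(line[len("data: "):])
#           print(chunk["choices"][0]["delta"].get("content", ""), end="", flush=True)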

# A convenience POST /v1/completions that accepts 'prompt' (legacy completions API)
class CompletionRequest(BaseModel):
    model: Optional[str] = MODEL_ID
    prompt: Optional[str] = ""
    max_tokens: Optional[int] = DEFAULT_MAX_TOKENS
    temperature: Optional[float] = 0.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    stop: Optional[List[str]] = None
    stream: Optional[bool] = False


@app.post("/v1/completions")
async def completions(req: CompletionRequest):
    # wrap the prompt into the chat format used by our generator
    messages = [Message(role="user", content=req.prompt)]
    chat_req = ChatCompletionRequest(
        model=req.model,
        messages=messages,
        max_tokens=req.max_tokens,
        temperature=req.temperature,
        top_p=req.top_p,
        n=req.n,
        stop=req.stop,
        stream=req.stream,
    )
    # reuse the chat completions handler directly
    return await chat_completions(chat_req)

# -----------------------------
# If executed directly, run uvicorn
# -----------------------------
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("local_openai_compatible_server:app", host=HOST, port=PORT, log_level="info")
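
# Example client (illustrative sketch, referenced from the module docstring):
# assumes the server is already running on http://localhost:7860 and uses only
# the standard library.
#
#   import json, urllib.request
#   payload = {
#       "model": "LiquidAI/LFM2-1.2B",
#       "messages": [{"role": "user", "content": "Say hello in one sentence."}],
#       "max_tokens": 64,
#   }
#   req = urllib.request.Request(
#       "http://localhost:7860/v1/chat/completions",
#       data=json.dumps(payload).encode("utf-8"),
#       headers={"Content-Type": "application/json"},
#   )
#   with urllib.request.urlopen(req) as resp:
#       print(json.load(resp)["choices"][0]["message"]["content"])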