"""
Minimal OpenAI-compatible local server that serves LiquidAI/LFM2-1.2B via Hugging Face
Transformers on CPU and exposes a subset of the OpenAI REST API (chat/completions,
completions, models).

Save as local_openai_compatible_server.py and run:
    pip install -r requirements.txt
    python local_openai_compatible_server.py

Or run with uvicorn directly (recommended for development/production):
    uvicorn local_openai_compatible_server:app --host 0.0.0.0 --port 7860

Requirements (requirements.txt):
    fastapi
    uvicorn[standard]
    transformers
    torch

Notes:
- CPU-only: the model loads on CPU (generation may be slow for a 1.2B model depending on your machine).
- Model repo id used: "LiquidAI/LFM2-1.2B"; adjust if you use a different repo or a local copy.
- This is a simplified compatibility layer. It is NOT feature-complete with OpenAI's API,
  but it implements the common fields: messages, max_tokens, temperature, top_p, n, stop, stream (basic).
"""

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, PlainTextResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Any, Dict
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import time
import json
import uuid

# -----------------------------
# Configuration
# -----------------------------
MODEL_ID = "/LiquidAI/LFM2-1.2B"  # change to your model location or HF repo
HOST = "0.0.0.0"
PORT = 7860
DEVICE = torch.device("cpu")  # CPU-only as requested
DEFAULT_MAX_TOKENS = 256

# -----------------------------
# Load model & tokenizer
# -----------------------------
print(f"Loading tokenizer and model '{MODEL_ID}' on device {DEVICE} (CPU-only)... this may take a while")
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
    model = AutoModelForCausalLM.from_pretrained(MODEL_ID, torch_dtype=torch.float32)
    model.to(DEVICE)
    model.eval()
except Exception as e:
    raise RuntimeError(f"Failed to load model/tokenizer for '{MODEL_ID}': {e}")

# If the tokenizer has no pad token, fall back to the EOS token so generation can pad
if tokenizer.pad_token_id is None:
    if tokenizer.eos_token_id is not None:
        tokenizer.pad_token_id = tokenizer.eos_token_id

# -----------------------------
# FastAPI app
# -----------------------------
app = FastAPI(title="Local OpenAI-compatible server (transformers)", version="0.1")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# -----------------------------
# Pydantic models (request bodies)
# -----------------------------
class Message(BaseModel):
    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    model: Optional[str] = MODEL_ID
    messages: List[Message]
    max_tokens: Optional[int] = DEFAULT_MAX_TOKENS
    temperature: Optional[float] = 0.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    stop: Optional[List[str]] = None
    stream: Optional[bool] = False

# -----------------------------
# Helpers
# -----------------------------
def build_prompt_from_messages(messages: List[Dict[str, Any]]) -> str:
    # Simple conversational prompt formatting. Adjust to suit model's expected format.
    parts = []
    for m in messages:
        role = m.get("role", "user")
        content = m.get("content", "")
        if role == "system":
            parts.append(f"<|system|> {content}\n")
        elif role == "user":
            parts.append(f"User: {content}\n")
        elif role == "assistant":
            parts.append(f"Assistant: {content}\n")
        else:
            parts.append(f"{role}: {content}\n")
    parts.append("Assistant: ")
    return "".join(parts)


def apply_stop_sequences(text: str, stops: Optional[List[str]]) -> str:
    if not stops:
        return text
    idx = None
    for s in stops:
        if s == "":
            continue
        pos = text.find(s)
        if pos != -1:
            if idx is None or pos < idx:
                idx = pos
    if idx is not None:
        return text[:idx]
    return text
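
# For example, apply_stop_sequences("Hello!\nUser: more", ["\nUser:"]) returns "Hello!",
# truncating at the earliest stop sequence found in the generated text.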

# -----------------------------
# Endpoints
# -----------------------------
@app.get("/", response_class=PlainTextResponse)
async def root():
    return "Local OpenAI-compatible server running. Use /v1/chat/completions or /v1/models"

@app.get("/v1/models")
async def list_models():
    return {"data": [{"id": MODEL_ID, "object": "model"}], "object": "list"}

@app.post("/v1/chat/completions")
async def chat_completions(request: Request, body: ChatCompletionRequest):
    # Basic validation: this server only serves the single locally loaded model
    if body.model is None or body.model != MODEL_ID:
        raise HTTPException(status_code=400, detail={"error": "invalid_model", "message": f"Only model {MODEL_ID} is available on this server."})

    # Pydantic v2 renamed .dict() to .model_dump(); support both
    prompt = build_prompt_from_messages([m.model_dump() if hasattr(m, "model_dump") else m.dict() for m in body.messages])

    # Tokenize and move tensors to the target device
    inputs = tokenizer(prompt, return_tensors="pt")
    input_ids = inputs["input_ids"].to(DEVICE)
    attention_mask = inputs["attention_mask"].to(DEVICE)
    input_len = input_ids.shape[-1]

    # Generation settings: greedy decoding when temperature is 0, sampling otherwise
    do_sample = bool(body.temperature and body.temperature > 0.0)
    gen_kwargs = {
        "max_new_tokens": body.max_tokens,
        "do_sample": do_sample,
        "num_return_sequences": int(body.n or 1),
        "pad_token_id": tokenizer.pad_token_id or tokenizer.eos_token_id,
        # note: on CPU large models may be slow
    }
    if do_sample:
        # sampling knobs are only meaningful (and only accepted without warnings) when sampling
        gen_kwargs["temperature"] = float(body.temperature)
        gen_kwargs["top_p"] = float(body.top_p or 1.0)

    # Synchronous generation (blocking; CPU generation for a 1.2B model can take a while)
    with torch.no_grad():
        outputs = model.generate(input_ids, attention_mask=attention_mask, **gen_kwargs)

    choices = []
    for i, out_ids in enumerate(outputs):
        # Decode only the newly generated tokens: for a decoder-only model the first
        # input_len positions of each output sequence are the prompt itself, so slicing
        # them off is more reliable than string-matching the decoded prompt.
        generated = tokenizer.decode(out_ids[input_len:], skip_special_tokens=True)

        # apply stop sequences
        generated = apply_stop_sequences(generated, body.stop)

        # build a choice structure similar to OpenAI's
        choice = {
            "index": i,
            "message": {"role": "assistant", "content": generated},
            "finish_reason": "stop" if body.stop else "length",
        }
        choices.append(choice)

    # approximate token usage (completion tokens are counted for a single returned sequence)
    completion_tokens = max(0, (outputs.shape[-1] - input_len) if outputs is not None else 0)
    usage = {"prompt_tokens": int(input_len), "completion_tokens": int(completion_tokens), "total_tokens": int(input_len + completion_tokens)}

    response = {
        "id": str(uuid.uuid4()),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": body.model,
        "choices": choices,
        "usage": usage,
    }

    # Streaming: rudimentary implementation that streams chunks of the final text as SSE
    if body.stream:
        # Only the first choice is streamed (requests with n > 1 still stream choice 0)
        text_to_stream = choices[0]["message"]["content"]
        def event_stream():
            # send the already-generated text in a few small chunks
            chunk_size = 128
            for start in range(0, len(text_to_stream), chunk_size):
                chunk = text_to_stream[start:start + chunk_size]
                payload = {"id": response["id"], "object": "chat.completion.chunk", "created": response["created"], "model": response["model"], "choices": [{"index": 0, "delta": {"content": chunk}, "finish_reason": None}]}
                yield f"data: {json.dumps(payload)}\n\n"
            # final chunk carries a finish_reason, then the OpenAI-style [DONE] terminator
            done_payload = {"id": response["id"], "object": "chat.completion.chunk", "created": response["created"], "model": response["model"], "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
            yield f"data: {json.dumps(done_payload)}\n\n"
            yield "data: [DONE]\n\n"
        return StreamingResponse(event_stream(), media_type="text/event-stream")

    return JSONResponse(response)

# A convenience POST /v1/completions that accepts 'prompt' (legacy completions API).
# Note: responses keep the chat.completion shape rather than the legacy text_completion shape.
class CompletionRequest(BaseModel):
    model: Optional[str] = MODEL_ID
    prompt: Optional[str] = ""
    max_tokens: Optional[int] = DEFAULT_MAX_TOKENS
    temperature: Optional[float] = 0.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    stop: Optional[List[str]] = None
    stream: Optional[bool] = False

@app.post("/v1/completions")
async def completions(req: CompletionRequest):
    # wrap prompt into the chat-format for our generator
    messages = [Message(role="user", content=req.prompt)]
    chat_req = ChatCompletionRequest(model=req.model, messages=messages, max_tokens=req.max_tokens, temperature=req.temperature, top_p=req.top_p, n=req.n, stop=req.stop, stream=req.stream)
    # call the chat_completions handler directly
    return await chat_completions(Request(scope={}), chat_req)

# -----------------------------
# If executed directly, run uvicorn
# -----------------------------
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("local_openai_compatible_server:app", host=HOST, port=PORT, log_level="info")
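
# -----------------------------
# Example client usage (sketch)
# -----------------------------
# Not executed by this module. A minimal sketch assuming the official `openai`
# Python package (v1+) is installed and the server above is running locally;
# the api_key value is arbitrary since this server performs no authentication.
#
#   from openai import OpenAI
#
#   client = OpenAI(base_url="http://localhost:7860/v1", api_key="not-needed")
#   resp = client.chat.completions.create(
#       model="LiquidAI/LFM2-1.2B",
#       messages=[{"role": "user", "content": "Say hello in one sentence."}],
#       max_tokens=32,
#   )
#   print(resp.choices[0].message.content)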