#!/usr/bin/env python3
"""
Working Gemma 3n GGUF Backend Service
Minimal FastAPI backend using only llama-cpp-python for GGUF models
"""

import os
import logging
import time
from contextlib import asynccontextmanager
from typing import List, Dict, Any, Optional
import uuid
import sys
import signal
import subprocess
import threading
from pathlib import Path

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator

# Import llama-cpp-python for GGUF model support
try:
    from llama_cpp import Llama
    llama_cpp_available = True
except ImportError:
    llama_cpp_available = False

import uvicorn

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pydantic models for OpenAI-compatible API
class ChatMessage(BaseModel):
    role: str = Field(..., description="The role of the message author")
    content: str = Field(..., description="The content of the message")
    
    @field_validator('role')
    @classmethod
    def validate_role(cls, v: str) -> str:
        if v not in ["system", "user", "assistant"]:
            raise ValueError("Role must be one of: system, user, assistant")
        return v

class ChatCompletionRequest(BaseModel):
    model: str = Field(default="gemma-3n-e4b-it", description="The model to use for completion")
    messages: List[ChatMessage] = Field(..., description="List of messages in the conversation")
    max_tokens: Optional[int] = Field(default=512, ge=1, le=2048, description="Maximum tokens to generate")
    temperature: Optional[float] = Field(default=1.0, ge=0.0, le=2.0, description="Sampling temperature")
    top_p: Optional[float] = Field(default=0.95, ge=0.0, le=1.0, description="Top-p sampling")
    top_k: Optional[int] = Field(default=64, ge=1, le=100, description="Top-k sampling")
    stream: Optional[bool] = Field(default=False, description="Whether to stream responses")

class ChatCompletionChoice(BaseModel):
    index: int
    message: ChatMessage
    finish_reason: str

class ChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[ChatCompletionChoice]

class HealthResponse(BaseModel):
    status: str
    model: str
    version: str
    backend: str

# Global variables for model management
current_model = os.environ.get("AI_MODEL", "unsloth/gemma-3n-E4B-it-GGUF")
llm = None

def convert_messages_to_gemma_prompt(messages: List[ChatMessage]) -> str:
    """Convert OpenAI messages format to Gemma 3n chat format."""
    # Gemma 3n delimits turns with <start_of_turn>/<end_of_turn>. The official
    # template defines only "user" and "model" roles; passing a "system" turn
    # through as below is a pragmatic extension, not part of the official format.
    prompt_parts = ["<bos>"]
    
    for message in messages:
        role = message.role
        content = message.content
        
        if role == "system":
            prompt_parts.append(f"<start_of_turn>system\n{content}<end_of_turn>")
        elif role == "user":
            prompt_parts.append(f"<start_of_turn>user\n{content}<end_of_turn>")
        elif role == "assistant":
            prompt_parts.append(f"<start_of_turn>model\n{content}<end_of_turn>")
    
    # Add the start for model response
    prompt_parts.append("<start_of_turn>model\n")
    
    return "\n".join(prompt_parts)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager for startup and shutdown events"""
    global llm
    logger.info("πŸš€ Starting Gemma 3n GGUF Backend Service...")
    if os.environ.get("DEMO_MODE", "").strip() not in ("", "0", "false", "False"):
        logger.info("πŸ§ͺ DEMO_MODE enabled: skipping model load")
        llm = None
        yield
        logger.info("πŸ”„ Shutting down Gemma 3n Backend Service (demo mode)...")
        return
    
    if not llama_cpp_available:
        logger.error("❌ llama-cpp-python is not available. Please install with: pip install llama-cpp-python")
        raise RuntimeError("llama-cpp-python not available")
    
    try:
        logger.info(f"πŸ“₯ Loading Gemma 3n GGUF model from {current_model}...")
        
        # Configure model parameters for Gemma 3n
        llm = Llama.from_pretrained(
            repo_id=current_model,
            filename="*Q4_K_M.gguf",  # Use Q4_K_M quantization for good performance
            verbose=True,
            # Gemma 3n specific settings
            n_ctx=4096,  # Start with 4K context (can be increased to 32K)
            n_threads=4,  # Adjust based on your CPU
            n_gpu_layers=-1,  # Use all GPU layers if CUDA available, otherwise CPU
            # Chat template for Gemma 3n format
            chat_format="gemma",  # Try built-in gemma format first
        )
        
        logger.info("βœ… Successfully loaded Gemma 3n GGUF model")
        
    except Exception as e:
        logger.error(f"❌ Failed to initialize Gemma 3n model: {e}")
        logger.warning("⚠️ Please download the GGUF model file locally and update the path")
        logger.warning("⚠️ You can download from: https://huggingface.co/unsloth/gemma-3n-E4B-it-GGUF")
        
        # For demo purposes, we'll continue without the model
        logger.info("πŸ”„ Starting service in demo mode (responses will be mocked)")
    
    yield
    logger.info("πŸ”„ Shutting down Gemma 3n Backend Service...")
    if llm:
        # Clean up model resources
        llm = None

# Initialize FastAPI app
app = FastAPI(
    title="Gemma 3n GGUF Backend Service",
    description="OpenAI-compatible chat completion API powered by Gemma-3n-E4B-it-GGUF",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
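
# A tighter, production-style CORS configuration might look like (origins illustrative):
#   app.add_middleware(
#       CORSMiddleware,
#       allow_origins=["https://app.example.com"],
#       allow_credentials=True,
#       allow_methods=["GET", "POST"],
#       allow_headers=["Content-Type", "Authorization"],
#   )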

def ensure_model_ready():
    """Check if model is loaded and ready"""
    # For demo mode, we'll allow the service to run even without a model
    pass

def generate_response_gguf(messages: List[ChatMessage], max_tokens: int = 512, temperature: float = 1.0, top_p: float = 0.95, top_k: int = 64) -> str:
    """Generate response using GGUF model via llama-cpp-python."""
    if llm is None:
        # Demo mode response
        return "πŸ€– Demo mode: Gemma 3n model not loaded. This would be a real response from the Gemma 3n model. Please download the GGUF model from https://huggingface.co/unsloth/gemma-3n-E4B-it-GGUF"
    
    try:
        # Use the chat completion method if available
        if hasattr(llm, 'create_chat_completion'):
            # Convert to dict format for llama-cpp-python
            messages_dict = [{"role": msg.role, "content": msg.content} for msg in messages]
            
            response = llm.create_chat_completion(
                messages=messages_dict,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                top_k=top_k,
                stop=["<end_of_turn>", "<eos>", "</s>"]  # Gemma 3n stop tokens
            )
            
            return response['choices'][0]['message']['content'].strip()
        
        else:
            # Fallback to direct prompt completion
            prompt = convert_messages_to_gemma_prompt(messages)
            
            response = llm(
                prompt,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                top_k=top_k,
                stop=["<end_of_turn>", "<eos>", "</s>"],
                echo=False
            )
            
            return response['choices'][0]['text'].strip()
            
    except Exception as e:
        logger.error(f"GGUF generation failed: {e}")
        return "I apologize, but I'm having trouble generating a response right now. Please try again."

@app.get("/", response_class=JSONResponse)
async def root() -> Dict[str, Any]:
    """Root endpoint with service information"""
    return {
        "message": "Gemma 3n GGUF Backend Service is running!",
        "model": current_model,
        "version": "1.0.0",
        "backend": "llama-cpp-python",
        "model_loaded": llm is not None,
        "endpoints": {
            "health": "/health",
            "chat_completions": "/v1/chat/completions"
        }
    }

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return HealthResponse(
        status="healthy" if (llm is not None) else "demo_mode",
        model=current_model,
        version="1.0.0",
        backend="llama-cpp-python"
    )

@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def create_chat_completion(
    request: ChatCompletionRequest
) -> ChatCompletionResponse:
    """Create a chat completion (OpenAI-compatible) using Gemma 3n GGUF"""
    try:
        ensure_model_ready()
        
        if not request.messages:
            raise HTTPException(status_code=400, detail="Messages cannot be empty")
        
        logger.info(f"Generating Gemma 3n response for {len(request.messages)} messages")
        
        response_text = generate_response_gguf(
            request.messages,
            request.max_tokens or 512,
            request.temperature or 1.0,
            request.top_p or 0.95,
            request.top_k or 64
        )
        
        response_text = response_text.strip() if response_text else "No response generated."
        
        return ChatCompletionResponse(
            id=f"chatcmpl-{int(time.time())}",
            created=int(time.time()),
            model=request.model,
            choices=[ChatCompletionChoice(
                index=0,
                message=ChatMessage(role="assistant", content=response_text),
                finish_reason="stop"
            )]
        )
        
    except HTTPException:
        # Propagate intentional HTTP errors (e.g., the 400 above) instead of converting them to 500s
        raise
    except Exception as e:
        logger.error(f"Error in chat completion: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
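
# Sketch of a client call against the endpoint above (assumes the `openai`
# package and the default port; any api_key is accepted since there is no auth):
#   from openai import OpenAI
#   client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")
#   resp = client.chat.completions.create(
#       model="gemma-3n-e4b-it",
#       messages=[{"role": "user", "content": "Write a haiku about autumn."}],
#   )
#   print(resp.choices[0].message.content)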

# -----------------------------
# Training Job Management (Unsloth)
# -----------------------------

# Jobs are tracked in-memory; logs and artifacts are written to disk
TRAIN_JOBS: Dict[str, Dict[str, Any]] = {}
TRAIN_DIR = Path(os.environ.get("TRAIN_DIR", "./training_runs")).resolve()
TRAIN_DIR.mkdir(parents=True, exist_ok=True)

def _start_training_subprocess(job_id: str, args: Dict[str, Any]) -> subprocess.Popen[Any]:
    """Spawn a subprocess to run the Unsloth fine-tuning script."""
    logs_dir = TRAIN_DIR / job_id
    logs_dir.mkdir(parents=True, exist_ok=True)
    log_file = open(logs_dir / "train.log", "w", encoding="utf-8")

    # Build absolute script path to avoid module/package resolution issues
    script_path = (Path(__file__).parent / "training" / "train_gemma_unsloth.py").resolve()
    python_exec = sys.executable

    cmd = [
        python_exec,
        str(script_path),
        "--job-id", job_id,
        "--output-dir", str(logs_dir),
    ]

    # Optional user-specified args
    def _extend(k: str, v: Any):
        if v is None:
            return
        if isinstance(v, bool):
            cmd.extend([f"--{k}"] if v else [])
        else:
            cmd.extend([f"--{k}", str(v)])

    _extend("dataset", args.get("dataset"))
    _extend("text-field", args.get("text_field"))
    _extend("prompt-field", args.get("prompt_field"))
    _extend("response-field", args.get("response_field"))
    _extend("max-steps", args.get("max_steps"))
    _extend("epochs", args.get("epochs"))
    _extend("lr", args.get("lr"))
    _extend("batch-size", args.get("batch_size"))
    _extend("gradient-accumulation", args.get("gradient_accumulation"))
    _extend("lora-r", args.get("lora_r"))
    _extend("lora-alpha", args.get("lora_alpha"))
    _extend("cutoff-len", args.get("cutoff_len"))
    _extend("model-id", args.get("model_id"))
    _extend("use-bf16", args.get("use_bf16"))
    _extend("use-fp16", args.get("use_fp16"))
    _extend("seed", args.get("seed"))
    _extend("dry-run", args.get("dry_run"))

    logger.info(f"🧡 Starting training subprocess for job {job_id}: {' '.join(cmd)}")
    logger.info(f"🐍 Using interpreter: {python_exec}")
    proc = subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT, cwd=str(Path(__file__).parent))
    log_file.close()  # the child holds its own copy of the descriptor; close ours to avoid a leak
    return proc
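
# The assembled command looks roughly like this (paths and values illustrative):
#   <python> .../training/train_gemma_unsloth.py --job-id <id> \
#       --output-dir <TRAIN_DIR>/<id> --dataset <name> --epochs 1 ...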

def _watch_process(job_id: str, proc: subprocess.Popen[Any]):
    """Monitor a training process and update job state on exit."""
    return_code = proc.wait()
    status = "completed" if return_code == 0 else "failed"
    TRAIN_JOBS[job_id]["status"] = status
    TRAIN_JOBS[job_id]["return_code"] = return_code
    TRAIN_JOBS[job_id]["ended_at"] = int(time.time())
    logger.info(f"🏁 Training job {job_id} finished with status={status}, code={return_code}")

class StartTrainingRequest(BaseModel):
    dataset: str = Field(..., description="HF dataset name or path to local JSONL/JSON file")
    model_id: Optional[str] = Field(default="unsloth/gemma-3n-E4B-it", description="Base model for training (HF Transformers format)")
    text_field: Optional[str] = Field(default=None, description="Single text field name (SFT)")
    prompt_field: Optional[str] = Field(default=None, description="Prompt/instruction field (chat data)")
    response_field: Optional[str] = Field(default=None, description="Response/output field (chat data)")
    max_steps: Optional[int] = Field(default=None)
    epochs: Optional[int] = Field(default=1)
    lr: Optional[float] = Field(default=2e-4)
    batch_size: Optional[int] = Field(default=1)
    gradient_accumulation: Optional[int] = Field(default=8)
    lora_r: Optional[int] = Field(default=16)
    lora_alpha: Optional[int] = Field(default=32)
    cutoff_len: Optional[int] = Field(default=4096)
    use_bf16: Optional[bool] = Field(default=True)
    use_fp16: Optional[bool] = Field(default=False)
    seed: Optional[int] = Field(default=42)
    dry_run: Optional[bool] = Field(default=False, description="Write DONE and exit without running (for CI/macOS)")

class StartTrainingResponse(BaseModel):
    job_id: str
    status: str
    output_dir: str

class TrainStatusResponse(BaseModel):
    job_id: str
    status: str
    created_at: int
    started_at: Optional[int] = None
    ended_at: Optional[int] = None
    output_dir: Optional[str] = None
    return_code: Optional[int] = None

@app.post("/train/start", response_model=StartTrainingResponse)
def start_training(req: StartTrainingRequest):
    """Start a background Unsloth fine-tuning job. Returns a job_id to poll."""
    job_id = uuid.uuid4().hex[:12]
    now = int(time.time())
    output_dir = str((TRAIN_DIR / job_id).resolve())
    TRAIN_JOBS[job_id] = {
        "status": "starting",
        "created_at": now,
        "started_at": now,
        "args": req.model_dump(),
        "output_dir": output_dir,
    }

    try:
        proc = _start_training_subprocess(job_id, req.model_dump())
        TRAIN_JOBS[job_id]["status"] = "running"
        TRAIN_JOBS[job_id]["pid"] = proc.pid
        watcher = threading.Thread(target=_watch_process, args=(job_id, proc), daemon=True)
        watcher.start()
        return StartTrainingResponse(job_id=job_id, status="running", output_dir=output_dir)
    except Exception as e:
        logger.exception("Failed to start training job")
        TRAIN_JOBS[job_id]["status"] = "failed_to_start"
        raise HTTPException(status_code=500, detail=f"Failed to start training: {e}")

@app.get("/train/status/{job_id}", response_model=TrainStatusResponse)
def train_status(job_id: str):
    job = TRAIN_JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return TrainStatusResponse(
        job_id=job_id,
        status=job.get("status", "unknown"),
        created_at=job.get("created_at", 0),
        started_at=job.get("started_at"),
        ended_at=job.get("ended_at"),
        output_dir=job.get("output_dir"),
        return_code=job.get("return_code"),
    )

@app.get("/train/logs/{job_id}")
def train_logs(job_id: str, tail: int = 200):
    job = TRAIN_JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    log_path = Path(job["output_dir"]) / "train.log"
    if not log_path.exists():
        return {"job_id": job_id, "logs": "(no logs yet)"}
    try:
        with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
            lines = f.readlines()
        lines = lines[-tail:] if tail > 0 else lines  # guard: tail <= 0 returns the full log
        return {"job_id": job_id, "logs": "".join(lines)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read logs: {e}")

@app.post("/train/stop/{job_id}")
def train_stop(job_id: str):
    job = TRAIN_JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    pid = job.get("pid")
    if not pid:
        raise HTTPException(status_code=400, detail="Job does not have an active PID")
    try:
        os.kill(pid, signal.SIGTERM)
        job["status"] = "stopping"
        return {"job_id": job_id, "status": "stopping"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to stop job: {e}")

# Main entry point
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)