|
from datetime import datetime |
|
import json |
|
import time |
|
import numpy as np |
|
from sentence_transformers import SentenceTransformer |
|
from fastapi import FastAPI, HTTPException, BackgroundTasks |
|
from fastapi.responses import StreamingResponse |
|
from pydantic import BaseModel |
|
from llama_cpp import Llama |
|
from huggingface_hub import login, hf_hub_download |
|
import logging |
|
import os |
|
import faiss |
|
import asyncio |
|
import psutil |
|
|
|
|
|
# Module-wide logging at INFO level, plus this module's own logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ASGI application object that the route decorators in this file attach to.
app = FastAPI()

# Held by stream_response while the llama.cpp generator is producing tokens.
model_lock = asyncio.Lock()

# Hugging Face authentication: the import is aborted when no token is configured.
hf_token = os.getenv("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    logger.error("HF_TOKEN environment variable not set.")
    raise ValueError("HF_TOKEN not set")

# Names of the embedding model and of the GGUF chat model file to download.
sentence_transformer_model = "all-MiniLM-L6-v2"
repo_id = "bartowski/deepcogito_cogito-v1-preview-llama-3B-GGUF"
filename = "deepcogito_cogito-v1-preview-llama-3B-Q4_K_M.gguf"
|
|
|
|
|
# Hand-written FAQ entries; each item is a dict with "question" and "answer" keys.
faqs = [
    {"question": question, "answer": answer}
    for question, answer in (
        (
            "What is your name?",
            "My name is Tim Luka Horstmann.",
        ),
        (
            "Where do you live?",
            "I live in Paris, France.",
        ),
        (
            "What is your education?",
            "I am currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris. I have an MPhil in Advanced Computer Science from the University of Cambridge, and a BSc in Business Informatics from RheinMain University of Applied Sciences.",
        ),
        (
            "What are your skills?",
            "I am proficient in Python, Java, SQL, Cypher, SPARQL, VBA, JavaScript, HTML/CSS, and Ruby. I also use tools like PyTorch, Hugging Face, Scikit-Learn, NumPy, Pandas, Matplotlib, Jupyter, Git, Bash, IoT, Ansible, QuickSight, and Wordpress.",
        ),
        (
            "How are you?",
            "I’m doing great, thanks for asking! I’m enjoying life in Paris and working on some exciting AI projects.",
        ),
        (
            "What do you do?",
            "I’m a Computer Scientist and AI enthusiast, currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris and interning as a Machine Learning Research Engineer at Hi! PARIS.",
        ),
        (
            "How’s it going?",
            "Things are going well, thanks! I’m busy with my studies and research, but I love the challenges and opportunities I get to explore.",
        ),
    )
]
|
|
|
# Import-time loading of every heavyweight resource the endpoints rely on.
# Any failure is logged with its traceback and re-raised, aborting the import.
try:
    # --- CV chunks and their precomputed embeddings -> cosine-similarity index ---
    logger.info("Loading CV embeddings from cv_embeddings.json")
    with open("cv_embeddings.json", "r", encoding="utf-8") as embeddings_file:
        cv_data = json.load(embeddings_file)
    cv_chunks = [entry["chunk"] for entry in cv_data]
    cv_embeddings = np.array([entry["embedding"] for entry in cv_data]).astype("float32")
    # Vectors are L2-normalised in place, so the inner-product index scores by cosine similarity.
    faiss.normalize_L2(cv_embeddings)
    faiss_index = faiss.IndexFlatIP(cv_embeddings.shape[1])
    faiss_index.add(cv_embeddings)
    logger.info("FAISS index built successfully")

    # --- Sentence embedder (CPU) ---
    logger.info("Loading SentenceTransformer model")
    embedder = SentenceTransformer(sentence_transformer_model, device="cpu")
    logger.info("SentenceTransformer model loaded")

    # --- Normalised embeddings of the FAQ questions ---
    faq_questions = [entry["question"] for entry in faqs]
    faq_embeddings = embedder.encode(faq_questions, convert_to_numpy=True).astype("float32")
    faiss.normalize_L2(faq_embeddings)

    # --- GGUF chat model: download from the Hub, then load with llama.cpp ---
    logger.info(f"Loading {filename} model")
    model_path = hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        # A fixed cache directory is used only when HF_HOME is set in the environment.
        local_dir=("/app/cache" if os.getenv("HF_HOME") else None),
        token=hf_token,
    )
    generator = Llama(
        model_path=model_path,
        n_ctx=3072,
        n_threads=2,
        n_batch=64,
        n_gpu_layers=0,
        use_mlock=True,
        f16_kv=True,
        verbose=True,
        batch_prefill=True,
        prefill_logits=False,
    )
    logger.info(f"{filename} model loaded")
except Exception as e:
    logger.exception(f"Startup error: {e}")
    raise
|
|
|
def retrieve_context(query, top_k=2):
    """Return the CV chunks most similar to *query*, joined by newlines.

    The query is embedded with the module-level ``embedder``, L2-normalised
    and searched against the module-level ``faiss_index``.

    Args:
        query: Text to embed and search for.
        top_k: Maximum number of chunks to return.

    Returns:
        The matching entries of ``cv_chunks``, in the order FAISS returns
        them, joined with a newline. Fewer than ``top_k`` chunks are returned
        when the index holds fewer vectors.

    Raises:
        Exception: Any embedding or search failure is logged and re-raised.
    """
    try:
        query_embedding = embedder.encode(query, convert_to_numpy=True).astype("float32")
        # FAISS expects a 2-D (n_queries, dim) array; normalisation happens in place.
        query_embedding = query_embedding.reshape(1, -1)
        faiss.normalize_L2(query_embedding)
        _, indices = faiss_index.search(query_embedding, top_k)
        # FAISS fills missing results with -1 when fewer than top_k vectors
        # exist; indexing cv_chunks with -1 would silently return the last chunk.
        return "\n".join(cv_chunks[i] for i in indices[0] if i >= 0)
    except Exception as e:
        logger.error(f"Error in retrieve_context: {str(e)}")
        raise
|
|
|
|
|
# Load the complete CV text once at import time; stream_response embeds it in
# the system prompt. A missing or unreadable file aborts the import.
try:
    with open("cv_text.txt", "r", encoding="utf-8") as f:
        # A text-mode read() always returns str, so no type coercion is needed.
        full_cv_text = f.read()
    logger.info("CV text loaded successfully")
except Exception as e:
    # Include the traceback, matching the startup error handler in this file.
    logger.error(f"Error loading cv_text.txt: {str(e)}", exc_info=True)
    raise
|
|
|
async def stream_response(query, history):
    """Stream the model's answer to *query* as Server-Sent-Events strings.

    Builds a system prompt containing today's date and the full CV text,
    drops the oldest history messages until the estimated prompt size fits
    the context budget, then streams a chat completion token by token while
    holding ``model_lock``.

    Args:
        query: The user's question.
        history: Earlier chat messages as dicts; each must have a
            ``"content"`` string and is forwarded unchanged to
            ``create_chat_completion``. The caller's list is never modified.

    Yields:
        One ``data: <token>`` frame (terminated by a blank line) per
        non-empty token, then a final ``data: [DONE]`` frame. If tokenization
        or generation fails, one apology frame containing the error text is
        emitted, followed by ``data: [DONE]``.
    """
    logger.info(f"Processing query: {query}")
    start_time = time.time()
    first_token_logged = False

    max_new_tokens = 512       # generation budget passed to the model
    safety_margin = 100        # extra headroom kept free in the context window
    per_message_overhead = 10  # rough per-message allowance used by the size estimate

    current_date = datetime.now().strftime("%Y-%m-%d")
    system_prompt = (
        "You are Tim Luka Horstmann, a Computer Scientist. A user is asking you a question. Respond as yourself, using the first person, in a friendly and concise manner. "
        "For questions about your CV, base your answer *exclusively* on the provided CV information below and do not add any details not explicitly stated. "
        "For casual questions not covered by the CV, respond naturally but limit answers to general truths about yourself (e.g., your current location is Paris, France, or your field is AI) "
        "and say 'I don't have specific details to share about that' if pressed for specifics beyond the CV or FAQs. Do not invent facts, experiences, or opinions not supported by the CV or FAQs. "
        f"Today’s date is {current_date}. "
        f"CV: {full_cv_text}"
    )
    logger.info(f"System prompt type: {type(system_prompt)}, length: {len(system_prompt)}")

    # Count tokens once per message; the counts are reused while trimming below.
    try:
        system_tokens = len(generator.tokenize(system_prompt.encode("utf-8"), add_bos=True, special=True))
        query_tokens = len(generator.tokenize(query.encode("utf-8"), add_bos=False, special=True))
        history_tokens = [
            len(generator.tokenize(msg["content"].encode("utf-8"), add_bos=False, special=True))
            for msg in history
        ]
    except Exception as e:
        logger.error(f"Tokenization error: {str(e)}")
        yield f"data: Sorry, I encountered a tokenization error: {str(e)}\n\n"
        yield "data: [DONE]\n\n"
        return

    total_tokens = (
        system_tokens
        + query_tokens
        + sum(history_tokens)
        + len(history) * per_message_overhead
        + per_message_overhead
    )
    max_allowed_tokens = generator.n_ctx() - max_new_tokens - safety_margin

    # Skip the oldest messages until the estimate fits. Slicing (instead of
    # popping) leaves the caller's list untouched.
    dropped = 0
    while total_tokens > max_allowed_tokens and dropped < len(history):
        total_tokens -= history_tokens[dropped] + per_message_overhead
        dropped += 1

    messages = (
        [{"role": "system", "content": system_prompt}]
        + list(history[dropped:])
        + [{"role": "user", "content": query}]
    )

    # Only one generation may use the shared model at a time.
    async with model_lock:
        try:
            for chunk in generator.create_chat_completion(
                messages=messages,
                max_tokens=max_new_tokens,
                stream=True,
                temperature=0.3,
                top_p=0.7,
                repeat_penalty=1.2,
            ):
                token = chunk["choices"][0]["delta"].get("content", "")
                if token:
                    if not first_token_logged:
                        logger.info(f"First token time: {time.time() - start_time:.2f}s")
                        first_token_logged = True
                    yield f"data: {token}\n\n"
                    # The completion iterator is synchronous; hand control back
                    # to the event loop between tokens so other requests
                    # (e.g. /health) are not starved for the whole generation.
                    await asyncio.sleep(0)
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error(f"Generation error: {str(e)}")
            yield f"data: Sorry, I encountered an error during generation: {str(e)}\n\n"
            yield "data: [DONE]\n\n"
|
|
|
class QueryRequest(BaseModel):
    """JSON body accepted by the ``/api/predict`` endpoint."""

    # The user's current question.
    query: str
    # Earlier chat messages; handed to stream_response together with the query.
    history: list[dict]
|
|
|
|
|
def get_ram_usage():
    """Return a snapshot of system memory usage.

    Returns:
        A dict with ``total_ram_gb``, ``used_ram_gb`` and ``free_ram_gb``
        (bytes divided by 1024**3, rounded to two decimals) plus
        ``percent_used``. ``free_ram_gb`` is taken from psutil's
        ``available`` field.
    """
    bytes_per_gib = 1024 ** 3
    vm = psutil.virtual_memory()

    def to_gib(n_bytes):
        # Convert a byte count to GiB with two decimal places.
        return round(n_bytes / bytes_per_gib, 2)

    return {
        "total_ram_gb": to_gib(vm.total),
        "used_ram_gb": to_gib(vm.used),
        "free_ram_gb": to_gib(vm.available),
        "percent_used": vm.percent,
    }
|
|
|
@app.post("/api/predict")
async def predict(request: QueryRequest):
    """Answer the posted query as a ``text/event-stream`` streaming response."""
    event_stream = stream_response(request.query, request.history)
    return StreamingResponse(event_stream, media_type="text/event-stream")
|
|
|
@app.get("/health")
async def health_check():
    """Liveness probe; always reports a healthy status."""
    payload = {"status": "healthy"}
    return payload
|
|
|
@app.get("/model_info")
async def model_info():
    """Describe the loaded chat model, the embedding model and the CV index.

    Returns:
        A dict with the GGUF repository name, the model's parameter size and
        quantization, the sentence-embedding model name, the number of
        indexed CV chunks and the embedding dimensionality.
    """
    return {
        # Taken from repo_id so the report always names the model that is
        # actually downloaded (the 3B build, not the 8B one).
        "model_name": repo_id.split("/")[-1],
        "model_size": "3B",
        "quantization": "Q4_K_M",
        "embedding_model": sentence_transformer_model,
        "faiss_index_size": len(cv_chunks),
        "faiss_index_dim": cv_embeddings.shape[1],
    }
|
|
|
@app.get("/ram_usage")
async def ram_usage():
    """Report current RAM statistics; failures are logged and become an HTTP 500."""
    try:
        return get_ram_usage()
    except Exception as e:
        message = f"Error retrieving RAM usage: {str(e)}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)
|
|
|
@app.on_event("startup")
async def warm_up_model():
    """Run one throw-away generation at startup, then log the RAM usage."""
    logger.info("Warming up the model...")
    # Drain the stream; the generated tokens themselves are discarded.
    warm_up_stream = stream_response("Hello", [])
    async for _ in warm_up_stream:
        pass
    logger.info("Model warm-up completed.")

    logger.info(f"Initial RAM usage after startup: {get_ram_usage()}")
|
|
|
|
|
@app.on_event("startup")
async def setup_periodic_tasks():
    """Start the background task that periodically re-warms the model.

    The task handle is stored on ``app.state`` because the event loop keeps
    only weak references to tasks; a task nobody else references may be
    garbage-collected before it finishes.
    """
    app.state.keep_warm_task = asyncio.create_task(keep_model_warm())
    logger.info("Periodic model warm-up task scheduled")
|
|
|
async def keep_model_warm():
    """Run a tiny generation every 13 minutes, forever, to keep the model warm.

    Errors during a warm-up are logged and do not stop the loop.
    """
    pause_seconds = 13 * 60
    while True:
        try:
            logger.info("Performing periodic model warm-up")
            # Drain the stream; the generated tokens themselves are discarded.
            async for _ in stream_response("Say only the word 'ok.'", []):
                pass
            logger.info("Periodic warm-up completed")
        except Exception as e:
            logger.error(f"Error in periodic warm-up: {e}")

        await asyncio.sleep(pause_seconds)