website / app.py
Tim Luka Horstmann
Fixed
3bbf0cd
raw
history blame
9.4 kB
from datetime import datetime
import json
import time
import numpy as np
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from llama_cpp import Llama
from huggingface_hub import login, hf_hub_download
import logging
import os
import faiss
import asyncio
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# Global lock for model access
model_lock = asyncio.Lock()
# Authenticate with Hugging Face
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
logger.error("HF_TOKEN environment variable not set.")
raise ValueError("HF_TOKEN not set")
login(token=hf_token)
# Models Configuration
sentence_transformer_model = "all-MiniLM-L6-v2"
repo_id = "bartowski/deepcogito_cogito-v1-preview-llama-8B-GGUF"
filename = "deepcogito_cogito-v1-preview-llama-8B-Q4_K_M.gguf"
# Define FAQs
faqs = [
{"question": "What is your name?", "answer": "My name is Tim Luka Horstmann."},
{"question": "Where do you live?", "answer": "I live in Paris, France."},
{"question": "What is your education?", "answer": "I am currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris. I have an MPhil in Advanced Computer Science from the University of Cambridge, and a BSc in Business Informatics from RheinMain University of Applied Sciences."},
{"question": "What are your skills?", "answer": "I am proficient in Python, Java, SQL, Cypher, SPARQL, VBA, JavaScript, HTML/CSS, and Ruby. I also use tools like PyTorch, Hugging Face, Scikit-Learn, NumPy, Pandas, Matplotlib, Jupyter, Git, Bash, IoT, Ansible, QuickSight, and Wordpress."},
{"question": "How are you?", "answer": "I’m doing great, thanks for asking! I’m enjoying life in Paris and working on some exciting AI projects."},
{"question": "What do you do?", "answer": "I’m a Computer Scientist and AI enthusiast, currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris and interning as a Machine Learning Research Engineer at Hi! PARIS."},
{"question": "How’s it going?", "answer": "Things are going well, thanks! I’m busy with my studies and research, but I love the challenges and opportunities I get to explore."},
]
try:
# Load CV embeddings and build FAISS index
logger.info("Loading CV embeddings from cv_embeddings.json")
with open("cv_embeddings.json", "r", encoding="utf-8") as f:
cv_data = json.load(f)
cv_chunks = [item["chunk"] for item in cv_data]
cv_embeddings = np.array([item["embedding"] for item in cv_data]).astype('float32')
faiss.normalize_L2(cv_embeddings)
faiss_index = faiss.IndexFlatIP(cv_embeddings.shape[1])
faiss_index.add(cv_embeddings)
logger.info("FAISS index built successfully")
# Load embedding model
logger.info("Loading SentenceTransformer model")
embedder = SentenceTransformer(sentence_transformer_model, device="cpu")
logger.info("SentenceTransformer model loaded")
# Compute FAQ embeddings
faq_questions = [faq["question"] for faq in faqs]
faq_embeddings = embedder.encode(faq_questions, convert_to_numpy=True).astype("float32")
faiss.normalize_L2(faq_embeddings)
# Load the 8B Cogito model with optimized parameters
logger.info(f"Loading {filename} model")
model_path = hf_hub_download(
repo_id=repo_id,
filename=filename,
local_dir="/app/cache" if os.getenv("HF_HOME") else None,
token=hf_token,
)
generator = Llama(
model_path=model_path,
n_ctx=3072,
n_threads=2,
n_batch=128,
n_gpu_layers=0,
f16_kv=True,
verbose=True,
)
logger.info(f"{filename} model loaded")
except Exception as e:
logger.error(f"Startup error: {str(e)}", exc_info=True)
raise
def retrieve_context(query, top_k=2):
try:
query_embedding = embedder.encode(query, convert_to_numpy=True).astype("float32")
query_embedding = query_embedding.reshape(1, -1)
faiss.normalize_L2(query_embedding)
distances, indices = faiss_index.search(query_embedding, top_k)
return "\n".join([cv_chunks[i] for i in indices[0]])
except Exception as e:
logger.error(f"Error in retrieve_context: {str(e)}")
raise
# Load the full CV at startup with explicit UTF-8 handling
try:
with open("cv_text.txt", "r", encoding="utf-8") as f:
full_cv_text = f.read()
# Ensure full_cv_text is a string
if not isinstance(full_cv_text, str):
full_cv_text = str(full_cv_text)
logger.info("CV text loaded successfully")
except Exception as e:
logger.error(f"Error loading cv_text.txt: {str(e)}")
raise
async def stream_response(query, history):
logger.info(f"Processing query: {query}")
start_time = time.time()
first_token_logged = False
current_date = datetime.now().strftime("%Y-%m-%d")
system_prompt = (
"You are Tim Luka Horstmann, a Computer Scientist. A user is asking you a question. Respond as yourself, using the first person, in a friendly and concise manner. "
"For questions about your CV, base your answer *exclusively* on the provided CV information below and do not add any details not explicitly stated. "
"For casual questions not covered by the CV, respond naturally but limit answers to general truths about yourself (e.g., your current location is Paris, France, or your field is AI) "
"and say 'I don't have specific details to share about that' if pressed for specifics beyond the CV or FAQs. Do not invent facts, experiences, or opinions not supported by the CV or FAQs. "
f"Today’s date is {current_date}. "
f"CV: {full_cv_text}"
)
# Ensure system_prompt is a string and debug its state
if not isinstance(system_prompt, str):
system_prompt = str(system_prompt)
logger.info(f"System prompt type: {type(system_prompt)}, length: {len(system_prompt)}")
# Combine system prompt, history, and current query
messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": query}]
# Estimate token counts and truncate history if necessary
try:
system_tokens = len(generator.tokenize(system_prompt.encode('utf-8'), add_bos=True, special=True))
query_tokens = len(generator.tokenize(query.encode('utf-8'), add_bos=False, special=True))
history_tokens = [len(generator.tokenize(msg["content"].encode('utf-8'), add_bos=False, special=True)) for msg in history]
except Exception as e:
logger.error(f"Tokenization error: {str(e)}")
yield f"data: Sorry, I encountered a tokenization error: {str(e)}\n\n"
yield "data: [DONE]\n\n"
return
total_tokens = system_tokens + query_tokens + sum(history_tokens) + len(history) * 10 + 10 # Rough estimate for formatting
max_allowed_tokens = generator.n_ctx - 512 - 100 # max_tokens=512, safety_margin=100
while total_tokens > max_allowed_tokens and history:
removed_msg = history.pop(0)
removed_tokens = len(generator.tokenize(removed_msg["content"].encode('utf-8'), add_bos=False, special=True))
total_tokens -= (removed_tokens + 10)
# Reconstruct messages after possible truncation
messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": query}]
# Generate response with lock
async with model_lock:
try:
for chunk in generator.create_chat_completion(
messages=messages,
max_tokens=512,
stream=True,
temperature=0.3,
top_p=0.7,
repeat_penalty=1.2
):
token = chunk['choices'][0]['delta'].get('content', '')
if token:
if not first_token_logged:
logger.info(f"First token time: {time.time() - start_time:.2f}s")
first_token_logged = True
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Generation error: {str(e)}")
yield f"data: Sorry, I encountered an error during generation: {str(e)}\n\n"
yield "data: [DONE]\n\n"
class QueryRequest(BaseModel):
query: str
history: list[dict]
@app.post("/api/predict")
async def predict(request: QueryRequest):
query = request.query
history = request.history
return StreamingResponse(stream_response(query, history), media_type="text/event-stream")
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.get("/model_info")
async def model_info():
return {
"model_name": "deepcogito_cogito-v1-preview-llama-8B-GGUF",
"model_size": "8B",
"quantization": "Q4_K_M",
"embedding_model": sentence_transformer_model,
"faiss_index_size": len(cv_chunks),
"faiss_index_dim": cv_embeddings.shape[1],
}
@app.on_event("startup")
async def warm_up_model():
logger.info("Warming up the model...")
dummy_query = "Hello"
dummy_history = []
async for _ in stream_response(dummy_query, dummy_history):
pass
logger.info("Model warm-up completed.")