website / app.py
Tim Luka Horstmann
Updated voice settings
b8961cc
from datetime import datetime
import json
import time
import numpy as np
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import StreamingResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from llama_cpp import Llama
from huggingface_hub import login, hf_hub_download
import logging
import os
import faiss
import asyncio
import psutil # Added for RAM tracking
from google import genai
from google.genai import types
import httpx
from elevenlabs import ElevenLabs, VoiceSettings
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# Custom rate limit exceeded handler with logging
async def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
client_ip = get_remote_address(request)
logger.warning(f"Rate limit exceeded for IP {client_ip} on endpoint {request.url.path}")
# Return a proper JSON response for rate limiting
return Response(
content=json.dumps({
"error": "rate_limit_exceeded",
"message": "Too many requests. Please wait a moment before trying again.",
"retry_after": 60 # seconds
}),
status_code=429,
headers={
"Content-Type": "application/json",
"Retry-After": "60"
}
)
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)
# Add CORS middleware to handle cross-origin requests
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your domain
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global lock for model access
model_lock = asyncio.Lock()
# Authenticate with Hugging Face
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
logger.error("HF_TOKEN environment variable not set.")
raise ValueError("HF_TOKEN not set")
login(token=hf_token)
# Models Configuration
USE_GEMINI = os.getenv("USE_GEMINI", "false").lower() == "true"
sentence_transformer_model = "all-MiniLM-L6-v2"
repo_id = "unsloth/Qwen3-1.7B-GGUF" # "bartowski/deepcogito_cogito-v1-preview-llama-3B-GGUF" # "bartowski/deepcogito_cogito-v1-preview-llama-8B-GGUF"
filename = "Qwen3-1.7B-Q4_K_M.gguf" # "deepcogito_cogito-v1-preview-llama-3B-Q4_K_M.gguf"
# Gemini Configuration
if USE_GEMINI:
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY environment variable not set but USE_GEMINI is true.")
raise ValueError("GEMINI_API_KEY not set")
gemini_client = genai.Client(api_key=gemini_api_key)
gemini_model = "gemini-2.5-flash-preview-05-20"
logger.info("Gemini API client initialized")
else:
gemini_client = None
logger.info("Using local model (Gemini disabled)")
# ElevenLabs Configuration
elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
if elevenlabs_api_key:
elevenlabs_client = ElevenLabs(api_key=elevenlabs_api_key)
# You can set a specific voice ID here or use the default voice
# Get your voice ID from ElevenLabs dashboard after cloning your voice
tts_voice_id = os.getenv("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM") # Default voice, replace with your cloned voice ID
logger.info("ElevenLabs TTS client initialized")
else:
elevenlabs_client = None
logger.info("ElevenLabs TTS disabled (no API key provided)")
# Define FAQs
faqs = [
{"question": "What is your name?", "answer": "My name is Tim Luka Horstmann."},
{"question": "Where do you live?", "answer": "I live in Paris, France."},
{"question": "What is your education?", "answer": "I am currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris. I have an MPhil in Advanced Computer Science from the University of Cambridge, and a BSc in Business Informatics from RheinMain University of Applied Sciences."},
{"question": "What are your skills?", "answer": "I am proficient in Python, Java, SQL, Cypher, SPARQL, VBA, JavaScript, HTML/CSS, and Ruby. I also use tools like PyTorch, Hugging Face, Scikit-Learn, NumPy, Pandas, Matplotlib, Jupyter, Git, Bash, IoT, Ansible, QuickSight, and Wordpress."},
{"question": "How are you?", "answer": "I’m doing great, thanks for asking! I’m enjoying life in Paris and working on some exciting AI projects."},
{"question": "What do you do?", "answer": "I’m a Computer Scientist and AI enthusiast, currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris and interning as a Machine Learning Research Engineer at Hi! PARIS."},
{"question": "How’s it going?", "answer": "Things are going well, thanks! I’m busy with my studies and research, but I love the challenges and opportunities I get to explore."},
]
try:
# Load CV embeddings and build FAISS index
logger.info("Loading CV embeddings from cv_embeddings.json")
with open("cv_embeddings.json", "r", encoding="utf-8") as f:
cv_data = json.load(f)
cv_chunks = [item["chunk"] for item in cv_data]
cv_embeddings = np.array([item["embedding"] for item in cv_data]).astype('float32')
faiss.normalize_L2(cv_embeddings)
faiss_index = faiss.IndexFlatIP(cv_embeddings.shape[1])
faiss_index.add(cv_embeddings)
logger.info("FAISS index built successfully")
# Load embedding model
logger.info("Loading SentenceTransformer model")
embedder = SentenceTransformer(sentence_transformer_model, device="cpu")
logger.info("SentenceTransformer model loaded")
# Compute FAQ embeddings
faq_questions = [faq["question"] for faq in faqs]
faq_embeddings = embedder.encode(faq_questions, convert_to_numpy=True).astype("float32")
faiss.normalize_L2(faq_embeddings)
# Load the local model only if not using Gemini
if not USE_GEMINI:
logger.info(f"Loading {filename} model")
model_path = hf_hub_download(
repo_id=repo_id,
filename=filename,
local_dir="/app/cache" if os.getenv("HF_HOME") else None,
token=hf_token,
)
generator = Llama(
model_path=model_path,
n_ctx=3072,
n_threads=2,
n_batch=64,
n_gpu_layers=0,
use_mlock=True,
f16_kv=True,
verbose=True,
batch_prefill=True,
prefill_logits=False,
)
logger.info(f"{filename} model loaded")
else:
generator = None
logger.info("Skipping local model loading (using Gemini API)")
except Exception as e:
logger.error(f"Startup error: {str(e)}", exc_info=True)
raise
def retrieve_context(query, top_k=2):
try:
query_embedding = embedder.encode(query, convert_to_numpy=True).astype("float32")
query_embedding = query_embedding.reshape(1, -1)
faiss.normalize_L2(query_embedding)
distances, indices = faiss_index.search(query_embedding, top_k)
return "\n".join([cv_chunks[i] for i in indices[0]])
except Exception as e:
logger.error(f"Error in retrieve_context: {str(e)}")
raise
# Load the full CV at startup with explicit UTF-8 handling
try:
with open("cv_text.txt", "r", encoding="utf-8") as f:
full_cv_text = f.read()
if not isinstance(full_cv_text, str):
full_cv_text = str(full_cv_text)
logger.info("CV text loaded successfully")
except Exception as e:
logger.error(f"Error loading cv_text.txt: {str(e)}")
raise
async def stream_response(query, history):
"""Main streaming response function that routes to either Gemini or local model"""
if USE_GEMINI:
async for chunk in stream_response_gemini(query, history):
yield chunk
else:
async for chunk in stream_response_local(query, history):
yield chunk
async def stream_response_gemini(query, history):
"""Stream response using Gemini API with a proper system_instruction."""
logger.info(f"Processing query with Gemini: {query}")
start_time = time.time()
first_token_logged = False
# 1) Build your system prompt once
current_date = datetime.now().strftime("%Y-%m-%d")
system_prompt = (
"You are Tim Luka Horstmann, a Computer Scientist. "
"A user is asking you a question. Respond as yourself, using the first person, in a friendly and concise manner. "
"For questions about your CV, base your answer *exclusively* on the provided CV information below and do not add any details not explicitly stated. "
"For casual questions not covered by the CV, respond naturally but limit answers to general truths about yourself (e.g., your current location is Paris, France) "
"and say 'I don't have specific details to share about that' if pressed for specifics beyond the CV or FAQs. "
f"Today's date is {current_date}. CV: {full_cv_text}"
)
# 2) Build only user/model history as `contents`
contents = []
for msg in history:
# Ensure the role is compatible with Gemini API ('user' or 'model')
api_role = ""
if msg["role"] == "user":
api_role = "user"
elif msg["role"] == "assistant": # Map "assistant" from client to "model" for API
api_role = "model"
elif msg["role"] == "model": # Already correct
api_role = "model"
else:
# Log a warning or handle unrecognized roles as needed
logger.warning(f"Unrecognized role '{msg['role']}' in history. Skipping message.")
continue
contents.append(
types.Content(
role=api_role,
parts=[ types.Part.from_text(text=msg["content"]) ]
)
)
# finally append the new user question
contents.append(
types.Content(
role="user",
parts=[ types.Part.from_text(text=query) ]
)
)
# 3) Call Gemini with `system_instruction`
try:
response = gemini_client.models.generate_content_stream(
model=gemini_model,
contents=contents,
config=types.GenerateContentConfig(
system_instruction=system_prompt,
temperature=0.3,
top_p=0.7,
max_output_tokens=1024,
response_mime_type="text/plain",
)
)
for chunk in response:
if chunk.text:
if not first_token_logged:
logger.info(f"First token time (Gemini): {time.time() - start_time:.2f}s")
first_token_logged = True
yield f"data: {chunk.text}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Gemini API error: {str(e)}")
yield f"data: Sorry, I encountered an error with Gemini API: {str(e)}\n\n"
yield "data: [DONE]\n\n"
async def stream_response_local(query, history):
"""Stream response using local model"""
logger.info(f"Processing query with local model: {query}")
start_time = time.time()
first_token_logged = False
current_date = datetime.now().strftime("%Y-%m-%d")
system_prompt = (
"/no_think You are Tim Luka Horstmann, a Computer Scientist. A user is asking you a question. Respond as yourself, using the first person, in a friendly and concise manner. "
"For questions about your CV, base your answer *exclusively* on the provided CV information below and do not add any details not explicitly stated. "
"For casual questions not covered by the CV, respond naturally but limit answers to general truths about yourself (e.g., your current location is Paris, France, or your field is AI) "
"and say 'I don't have specific details to share about that' if pressed for specifics beyond the CV or FAQs. Do not invent facts, experiences, or opinions not supported by the CV or FAQs. "
f"Today's date is {current_date}. "
f"CV: {full_cv_text}"
)
if not isinstance(system_prompt, str):
system_prompt = str(system_prompt)
logger.info(f"System prompt type: {type(system_prompt)}, length: {len(system_prompt)}")
messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": "/no_think" + query}]
try:
system_tokens = len(generator.tokenize(system_prompt.encode('utf-8'), add_bos=True, special=True))
query_tokens = len(generator.tokenize(query.encode('utf-8'), add_bos=False, special=True))
history_tokens = [len(generator.tokenize(msg["content"].encode('utf-8'), add_bos=False, special=True)) for msg in history]
except Exception as e:
logger.error(f"Tokenization error: {str(e)}")
yield f"data: Sorry, I encountered a tokenization error: {str(e)}\n\n"
yield "data: [DONE]\n\n"
return
total_tokens = system_tokens + query_tokens + sum(history_tokens) + len(history) * 10 + 10
max_allowed_tokens = generator.n_ctx() - 512 - 100
while total_tokens > max_allowed_tokens and history:
removed_msg = history.pop(0)
removed_tokens = len(generator.tokenize(removed_msg["content"].encode('utf-8'), add_bos=False, special=True))
total_tokens -= (removed_tokens + 10)
messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": query}]
async with model_lock:
try:
for chunk in generator.create_chat_completion(
messages=messages,
max_tokens=512,
stream=True,
temperature=0.3,
top_p=0.7,
repeat_penalty=1.2
):
token = chunk['choices'][0]['delta'].get('content', '')
if token:
if not first_token_logged:
logger.info(f"First token time (local): {time.time() - start_time:.2f}s")
first_token_logged = True
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Generation error: {str(e)}")
yield f"data: Sorry, I encountered an error during generation: {str(e)}\n\n"
yield "data: [DONE]\n\n"
class QueryRequest(BaseModel):
query: str
history: list
class TTSRequest(BaseModel):
text: str
# RAM Usage Tracking Function
def get_ram_usage():
memory = psutil.virtual_memory()
total_ram = memory.total / (1024 ** 3) # Convert to GB
used_ram = memory.used / (1024 ** 3) # Convert to GB
free_ram = memory.available / (1024 ** 3) # Convert to GB
percent_used = memory.percent
return {
"total_ram_gb": round(total_ram, 2),
"used_ram_gb": round(used_ram, 2),
"free_ram_gb": round(free_ram, 2),
"percent_used": percent_used
}
@app.post("/api/predict")
@limiter.limit("5/minute") # Allow 5 chat requests per minute per IP
async def predict(request: Request, query_request: QueryRequest):
query = query_request.query
history = query_request.history
return StreamingResponse(stream_response(query, history), media_type="text/event-stream")
@app.post("/api/tts")
@limiter.limit("5/minute") # Allow 5 TTS requests per minute per IP
async def text_to_speech(request: Request, tts_request: TTSRequest):
"""Convert text to speech using ElevenLabs API"""
if not elevenlabs_client:
raise HTTPException(status_code=503, detail="TTS service not available")
try:
# Clean the text for TTS (remove markdown and special characters)
clean_text = tts_request.text.replace("**", "").replace("*", "").replace("\n", " ").strip()
if not clean_text:
raise HTTPException(status_code=400, detail="No text provided for TTS")
if len(clean_text) > 1000: # Limit text length to avoid long processing times
clean_text = clean_text[:1000] + "..."
# Generate speech
response = elevenlabs_client.text_to_speech.convert(
voice_id=tts_voice_id,
model_id="eleven_flash_v2_5",
text=clean_text,
voice_settings=VoiceSettings(
stability=0.7, # More stability = less variability; best: 0.7–0.85
similarity_boost=0.9, # Boost similarity to original voice
style=0.2, # Keep subtle emotion; increase for expressive output
use_speaker_boost=True # Helps preserve speaker identity better
)
)
# Convert generator to bytes
audio_bytes = b"".join(response)
return Response(
content=audio_bytes,
media_type="audio/mpeg",
headers={
"Content-Disposition": "inline; filename=tts_audio.mp3",
"Cache-Control": "no-cache"
}
)
except Exception as e:
logger.error(f"TTS error: {str(e)}")
raise HTTPException(status_code=500, detail=f"TTS conversion failed: {str(e)}")
@app.get("/health")
@limiter.limit("30/minute") # Allow frequent health checks
async def health_check(request: Request):
return {"status": "healthy"}
@app.get("/model_info")
@limiter.limit("10/minute") # Limit model info requests
async def model_info(request: Request):
base_info = {
"embedding_model": sentence_transformer_model,
"faiss_index_size": len(cv_chunks),
"faiss_index_dim": cv_embeddings.shape[1],
"tts_available": elevenlabs_client is not None,
}
if USE_GEMINI:
base_info.update({
"model_type": "gemini",
"model_name": gemini_model,
"provider": "Google Gemini API",
})
else:
base_info.update({
"model_type": "local",
"model_name": filename,
"repo_id": repo_id,
"model_size": "1.7B",
"quantization": "Q4_K_M",
})
return base_info
@app.get("/ram_usage")
@limiter.limit("20/minute") # Allow moderate monitoring requests
async def ram_usage(request: Request):
"""Endpoint to get current RAM usage."""
try:
ram_stats = get_ram_usage()
return ram_stats
except Exception as e:
logger.error(f"Error retrieving RAM usage: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error retrieving RAM usage: {str(e)}")
# @app.on_event("startup")
# async def warm_up_model():
# logger.info("Warming up the model...")
# dummy_query = "Hello"
# dummy_history = []
# async for _ in stream_response(dummy_query, dummy_history):
# pass
# logger.info("Model warm-up completed.")
# # Log initial RAM usage
# ram_stats = get_ram_usage()
# logger.info(f"Initial RAM usage after startup: {ram_stats}")
# Add a background task to keep the model warm
@app.on_event("startup")
async def setup_periodic_tasks():
if not USE_GEMINI: # Only warm up local models
asyncio.create_task(keep_model_warm())
logger.info("Periodic model warm-up task scheduled for local model")
else:
logger.info("Gemini API in use - no warm-up needed")
async def keep_model_warm():
"""Background task that keeps the local model warm by sending periodic requests"""
while True:
try:
logger.info("Performing periodic local model warm-up")
dummy_query = "Say only the word 'ok.'"
dummy_history = []
# Process a dummy query through the generator to keep it warm
async for _ in stream_response(dummy_query, dummy_history):
pass
logger.info("Periodic warm-up completed")
except Exception as e:
logger.error(f"Error in periodic warm-up: {str(e)}")
# Wait for 13 minutes before the next warm-up
await asyncio.sleep(13 * 60)