# Codingo: backend/services/interview_engine.py
import os
import json
import asyncio
import edge_tts
from faster_whisper import WhisperModel
from langchain_groq import ChatGroq
import logging
import tempfile
import shutil
import subprocess
import torch
from backend.services.interview_retrieval import (
extract_all_roles_from_qdrant,
retrieve_interview_data,
random_context_chunks,
    get_role_questions,  # 🔍 For sample questions
    qdrant_client  # 🔍 For collection info
)
try:
print("πŸ” Qdrant Collections:", qdrant_client.get_collections())
info = qdrant_client.get_collection("interview_questions")
print("βœ… Vector size:", info.config.params.vectors.size)
print("βœ… Distance metric:", info.config.params.vectors.distance)
all_roles_debug = extract_all_roles_from_qdrant()
print(f"βœ… Found {len(all_roles_debug)} roles:", all_roles_debug)
if all_roles_debug:
sample_questions_debug = get_role_questions(all_roles_debug[0])
print(f"βœ… Sample questions for '{all_roles_debug[0]}': {len(sample_questions_debug)} found")
except Exception as e:
print("⚠️ Qdrant check failed:", e)
if torch.cuda.is_available():
    print("🔥 CUDA Available")
    print("🧠 GPU:", torch.cuda.get_device_name(0))
    print("💡 cuDNN version:", torch.backends.cudnn.version())
    print("💥 cuDNN enabled:", torch.backends.cudnn.is_available())
else:
    print("❌ CUDA Not Available")
# Initialize models
chat_groq_api = os.getenv("GROQ_API_KEY")
# Attempt to initialize the Groq LLM only if an API key is provided. When
# running in environments where the key is unavailable (such as local
# development or automated testing), fall back to a simple stub that
# generates generic responses. This avoids raising an exception at import
# time and allows the rest of the application to run without external
# dependencies. See the DummyGroq class defined below.
if chat_groq_api:
try:
groq_llm = ChatGroq(
temperature=0.7,
model_name="llama-3.3-70b-versatile",
api_key=chat_groq_api
)
except Exception as e:
logging.error(f"Error initializing ChatGroq LLM: {e}. Falling back to dummy model.")
groq_llm = None
else:
groq_llm = None
if groq_llm is None:
class DummyGroq:
"""A fallback language model used when no Groq API key is set.
The ``invoke`` method of this class returns a simple canned response
rather than calling an external API. This ensures that the
interview functionality still produces a sensible prompt, albeit
without advanced LLM behaviour.
"""
def invoke(self, prompt: str):
# Provide a very generic question based on the prompt. This
# implementation ignores the prompt contents entirely; in a more
# sophisticated fallback you could parse ``prompt`` to tailor
# responses.
return "Tell me about yourself and why you're interested in this position."
groq_llm = DummyGroq()
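# Note: downstream callers normalize both return shapes, so either backend can be
# used interchangeably. A minimal sketch (illustrative prompt text, not from the
# question bank):
#
#     response = groq_llm.invoke("Ask one short interview question.")
#     text = response.content if hasattr(response, "content") else str(response)
#
# With ChatGroq, ``invoke`` returns an AIMessage whose ``.content`` holds the text;
# the DummyGroq fallback returns a plain string.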
# Initialize Whisper model
#
# Loading the Whisper model can take several seconds on first use because the
# model weights must be downloaded from Hugging Face. This delay can cause
# the API call to ``/api/transcribe_audio`` to appear stuck while the model
# downloads. To mitigate this, we allow the model size to be configured via
# the ``WHISPER_MODEL_NAME`` environment variable and preload the model when
# this module is imported. Using a smaller model (e.g. "tiny" or "base.en")
# reduces download size and inference time considerably.
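# A minimal configuration sketch (illustrative values): the model name can be set
# before the server starts, e.g.
#
#     export WHISPER_MODEL_NAME=base.en        # shell, before launching the app
#
# or from Python, before this module is imported:
#
#     os.environ.setdefault("WHISPER_MODEL_NAME", "base.en")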
whisper_model = None
def load_whisper_model():
global whisper_model
if whisper_model is None:
try:
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"
            # Allow overriding the model size via environment. Default to a
            # lightweight model to improve startup times. Available options
            # include: tiny, base, base.en, small, medium, large (see the
            # faster-whisper documentation for the full list of model names).
model_name = os.getenv("WHISPER_MODEL_NAME", "tiny")
whisper_model = WhisperModel(model_name, device=device, compute_type=compute_type)
logging.info(f"Whisper model '{model_name}' loaded on {device} with {compute_type}")
except Exception as e:
logging.error(f"Error loading Whisper model: {e}")
# Fallback to CPU
            whisper_model = WhisperModel(os.getenv("WHISPER_MODEL_NAME", "tiny"), device="cpu", compute_type="int8")
return whisper_model
load_whisper_model()
def generate_first_question(profile, job):
"""Generate the first interview question based on profile and job"""
all_roles = extract_all_roles_from_qdrant()
logging.info(f"[QDRANT DEBUG] Available Roles: {all_roles}")
retrieved_data = retrieve_interview_data(job.role.lower(), all_roles)
logging.info(f"[QDRANT DEBUG] Role requested: {job.role.lower()}")
logging.info(f"[QDRANT DEBUG] Questions retrieved: {len(retrieved_data)}")
if retrieved_data:
logging.info(f"[QDRANT DEBUG] Sample Q: {retrieved_data[0]['question']}")
else:
logging.warning("[QDRANT DEBUG] No questions retrieved, falling back to defaults")
context_data = random_context_chunks(retrieved_data, k=4) if retrieved_data else ""
try:
prompt = f"""
You are LUNA, an AI recruiter conducting an interview for a {job.role} position at {job.company}.
Candidate profile:
- Skills: {profile.get('skills', [])}
- Experience: {profile.get('experience', [])}
- Education: {profile.get('education', [])}
Interview style:
- Start the interview in a friendly but professional way.
- Always begin with: "Hi, how are you? I'm LUNA, your AI recruiter."
- If the candidate has previous experience, reference their most recent role or company:
Example: "I see you previously worked at {profile.get('experience', [''])[0]}.
Can you tell me more about your time there, along with your education and overall background?"
- If no experience is available, simply ask them to tell you about their background, education, and experience.
Respond ONLY with the question text, no formatting or extra notes.
"""
response = groq_llm.invoke(prompt)
# Fix: Handle AIMessage object properly
if hasattr(response, 'content'):
question = response.content.strip()
elif isinstance(response, str):
question = response.strip()
else:
question = str(response).strip()
# Ensure we have a valid question
if not question or len(question) < 10:
question = "Tell me about yourself and why you're interested in this position."
logging.info(f"Generated question: {question}")
return question
except Exception as e:
logging.error(f"Error generating first question: {e}")
return "Tell me about yourself and why you're interested in this position."
def generate_next_question(profile, job, conversation_history, last_answer):
"""Generate the next interview question based on profile, job, and conversation so far"""
all_roles = extract_all_roles_from_qdrant()
logging.info(f"[QDRANT DEBUG] Available Roles: {all_roles}")
retrieved_data = retrieve_interview_data(job.role.lower(), all_roles)
logging.info(f"[QDRANT DEBUG] Role requested: {job.role.lower()}")
logging.info(f"[QDRANT DEBUG] Questions retrieved: {len(retrieved_data)}")
if retrieved_data:
logging.info(f"[QDRANT DEBUG] Sample Next Q: {retrieved_data[0]['question']}")
else:
logging.warning("[QDRANT DEBUG] No questions retrieved, falling back to defaults")
context_data = random_context_chunks(retrieved_data, k=4) if retrieved_data else ""
try:
prompt = f"""
You are LUNA, an AI recruiter continuing an interview for a {job.role} position at {job.company}.
Candidate profile:
- Skills: {profile.get('skills', [])}
- Experience: {profile.get('experience', [])}
- Education: {profile.get('education', [])}
Conversation so far:
{conversation_history}
Candidate's last answer:
{last_answer}
Use the following retrieved context to inform your next question:
{context_data}
Interview style:
- Acknowledge the candidate's last answer naturally (e.g., "That's a great point", "I see what you mean").
- Then ask a related follow-up question that connects to what they just said.
- Keep the tone professional, concise, and relevant to the role.
- If technical, dig deeper into skills or tools they mentioned.
- If behavioral, expand on situations or experiences they described.
Respond ONLY with the next question text (no formatting, no commentary).
"""
response = groq_llm.invoke(prompt)
if hasattr(response, 'content'):
question = response.content.strip()
elif isinstance(response, str):
question = response.strip()
else:
question = str(response).strip()
if not question or len(question) < 10:
question = "Could you elaborate more on your last point?"
logging.info(f"Generated next question: {question}")
return question
except Exception as e:
logging.error(f"Error generating next question: {e}")
return "Could you elaborate more on your last point?"
def edge_tts_to_file_sync(text, output_path, voice="en-US-AriaNeural"):
"""Synchronous wrapper for edge-tts with better error handling"""
try:
# Ensure text is not empty
if not text or not text.strip():
logging.error("Empty text provided for TTS")
return None
# Ensure the directory exists and is writable
directory = os.path.dirname(output_path)
if not directory:
directory = "/tmp/audio"
output_path = os.path.join(directory, os.path.basename(output_path))
os.makedirs(directory, exist_ok=True)
# Test write permissions with a temporary file
test_file = os.path.join(directory, f"test_{os.getpid()}.tmp")
try:
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
logging.info(f"Directory {directory} is writable")
except (PermissionError, OSError) as e:
logging.error(f"Directory {directory} is not writable: {e}")
# Fallback to /tmp
directory = "/tmp/audio"
output_path = os.path.join(directory, os.path.basename(output_path))
os.makedirs(directory, exist_ok=True)
async def generate_audio():
try:
communicate = edge_tts.Communicate(text, voice)
await communicate.save(output_path)
logging.info(f"TTS audio saved to: {output_path}")
except Exception as e:
logging.error(f"Error in async TTS generation: {e}")
raise
# Run async function in sync context
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is already running, create a new one in a thread
import threading
import concurrent.futures
def run_in_thread():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
new_loop.run_until_complete(generate_audio())
finally:
new_loop.close()
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_in_thread)
future.result(timeout=30) # 30 second timeout
else:
loop.run_until_complete(generate_audio())
except RuntimeError:
# No event loop exists
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(generate_audio())
finally:
loop.close()
# Verify file was created and has content
if os.path.exists(output_path):
file_size = os.path.getsize(output_path)
if file_size > 1000: # At least 1KB for a valid audio file
logging.info(f"TTS file created successfully: {output_path} ({file_size} bytes)")
return output_path
else:
logging.error(f"TTS file is too small: {output_path} ({file_size} bytes)")
return None
else:
logging.error(f"TTS file was not created: {output_path}")
return None
except Exception as e:
logging.error(f"Error in TTS generation: {e}")
return None
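# Illustrative usage sketch (the path is an example; the default voice is
# "en-US-AriaNeural" as above). The function returns the output path on success
# and None on failure, so callers should handle a text-only fallback:
#
#     audio_path = edge_tts_to_file_sync(question_text, "/tmp/audio/question_1.mp3")
#     if audio_path is None:
#         ...  # serve the question as text only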
def convert_webm_to_wav(webm_path, wav_path):
"""Convert WebM audio to WAV using ffmpeg if available"""
try:
result = subprocess.run([
'ffmpeg', '-i', webm_path, '-ar', '16000', '-ac', '1', '-y', wav_path
], capture_output=True, text=True, timeout=30)
if result.returncode == 0 and os.path.exists(wav_path) and os.path.getsize(wav_path) > 0:
logging.info(f"Successfully converted {webm_path} to {wav_path}")
return wav_path
else:
logging.error(f"FFmpeg conversion failed: {result.stderr}")
return None
    except Exception as e:
logging.error(f"Error converting audio: {e}")
return None
def whisper_stt(audio_path):
"""Speech-to-text using Faster-Whisper"""
try:
if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0:
logging.error(f"Audio file is empty or missing: {audio_path}")
return ""
        # Convert the uploaded audio (typically WebM from the browser) to 16 kHz mono WAV
        wav_path = os.path.splitext(audio_path)[0] + ".wav"
cmd = [
"ffmpeg",
"-y", # overwrite
"-i", audio_path,
"-ar", "16000",
"-ac", "1",
"-f", "wav",
wav_path
]
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if not os.path.exists(wav_path) or os.path.getsize(wav_path) == 0:
logging.error(f"FFmpeg conversion failed or produced empty file: {wav_path}")
return ""
model = load_whisper_model()
segments, _ = model.transcribe(wav_path)
transcript = " ".join(segment.text for segment in segments)
return transcript.strip()
except Exception as e:
logging.error(f"Error in STT: {e}")
return ""
def evaluate_answer(question, answer, job_role="Software Developer", seniority="Mid-level"):
"""Evaluate candidate's answer with better error handling"""
try:
if not answer or not answer.strip():
return {
"score": "Poor",
"feedback": "No answer provided."
}
prompt = f"""
You are evaluating a candidate's answer for a {seniority} {job_role} position.
Question: {question}
Candidate Answer: {answer}
Evaluate based on technical correctness, clarity, and relevance.
Provide a brief evaluation in 1-2 sentences.
Rate the answer as one of: Poor, Medium, Good, Excellent
Respond in this exact format:
Score: [Poor/Medium/Good/Excellent]
Feedback: [Your brief feedback here]
"""
response = groq_llm.invoke(prompt)
# Handle AIMessage object properly
if hasattr(response, 'content'):
response_text = response.content.strip()
elif isinstance(response, str):
response_text = response.strip()
else:
response_text = str(response).strip()
# Parse the response
lines = response_text.split('\n')
score = "Medium" # default
feedback = "Good answer, but could be more detailed." # default
for line in lines:
line = line.strip()
if line.startswith('Score:'):
score = line.replace('Score:', '').strip()
elif line.startswith('Feedback:'):
feedback = line.replace('Feedback:', '').strip()
# Ensure score is valid
valid_scores = ["Poor", "Medium", "Good", "Excellent"]
if score not in valid_scores:
score = "Medium"
return {
"score": score,
"feedback": feedback
}
except Exception as e:
logging.error(f"Error evaluating answer: {e}")
return {
"score": "Medium",
"feedback": "Unable to evaluate answer at this time."
}
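# Minimal smoke test (illustrative question/answer text). With no GROQ_API_KEY set
# this exercises the DummyGroq fallback and the default score parsing above:
if __name__ == "__main__":
    sample = evaluate_answer(
        "What is a REST API?",
        "An HTTP-based interface that exposes resources via standard verbs.",
        job_role="Backend Developer",
        seniority="Mid-level",
    )
    print("Evaluation:", sample)  # e.g. {"score": "Medium", "feedback": "..."}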