import os
import json
import asyncio
import subprocess
import edge_tts
from faster_whisper import WhisperModel
from langchain_groq import ChatGroq
import logging
import tempfile
import shutil
import torch
from backend.services.interview_retrieval import (
    extract_all_roles_from_qdrant,
    retrieve_interview_data,
    random_context_chunks,
    get_role_questions,  # 🔍 For sample questions
    qdrant_client  # 🔍 For collection info
)
try:
    print("🔍 Qdrant Collections:", qdrant_client.get_collections())
    info = qdrant_client.get_collection("interview_questions")
    print("✅ Vector size:", info.config.params.vectors.size)
    print("✅ Distance metric:", info.config.params.vectors.distance)
    all_roles_debug = extract_all_roles_from_qdrant()
    print(f"✅ Found {len(all_roles_debug)} roles:", all_roles_debug)
    if all_roles_debug:
        sample_questions_debug = get_role_questions(all_roles_debug[0])
        print(f"✅ Sample questions for '{all_roles_debug[0]}': {len(sample_questions_debug)} found")
except Exception as e:
    print("⚠️ Qdrant check failed:", e)
if torch.cuda.is_available():
    print("🔥 CUDA Available")
    print("🧠 GPU:", torch.cuda.get_device_name(0))
    print("📡 cuDNN version:", torch.backends.cudnn.version())
    print("🔥 cuDNN enabled:", torch.backends.cudnn.is_available())
else:
    print("❌ CUDA Not Available")
# Initialize models
chat_groq_api = os.getenv("GROQ_API_KEY")

# Attempt to initialize the Groq LLM only if an API key is provided. When
# running in environments where the key is unavailable (such as local
# development or automated testing), fall back to a simple stub that
# generates generic responses. This avoids raising an exception at import
# time and allows the rest of the application to run without external
# dependencies. See the DummyGroq class defined below.
if chat_groq_api:
    try:
        groq_llm = ChatGroq(
            temperature=0.7,
            model_name="llama-3.3-70b-versatile",
            api_key=chat_groq_api
        )
    except Exception as e:
        logging.error(f"Error initializing ChatGroq LLM: {e}. Falling back to dummy model.")
        groq_llm = None
else:
    groq_llm = None

if groq_llm is None:
    class DummyGroq:
        """A fallback language model used when no Groq API key is set.

        The ``invoke`` method of this class returns a simple canned response
        rather than calling an external API. This ensures that the
        interview functionality still produces a sensible prompt, albeit
        without advanced LLM behaviour.
        """
        def invoke(self, prompt: str):
            # Provide a very generic question based on the prompt. This
            # implementation ignores the prompt contents entirely; in a more
            # sophisticated fallback you could parse ``prompt`` to tailor
            # responses.
            return "Tell me about yourself and why you're interested in this position."

    groq_llm = DummyGroq()
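# Note: the real ChatGroq client returns an AIMessage exposing ``.content``,
# while DummyGroq returns a plain str. The callers below normalize both
# shapes, e.g. (illustrative):
#   resp = groq_llm.invoke("...")
#   text = resp.content if hasattr(resp, "content") else str(resp)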
# Initialize Whisper model
#
# Loading the Whisper model can take several seconds on first use because the
# model weights must be downloaded from Hugging Face. This delay can cause
# the API call to ``/api/transcribe_audio`` to appear stuck while the model
# downloads. To mitigate this, we allow the model size to be configured via
# the ``WHISPER_MODEL_NAME`` environment variable and preload the model when
# this module is imported. Using a smaller model (e.g. "tiny" or "base.en")
# reduces download size and inference time considerably.
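# For example (hypothetical deployment command; adjust to your actual entrypoint):
#   WHISPER_MODEL_NAME=base.en uvicorn backend.main:app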
whisper_model = None

def load_whisper_model():
    global whisper_model
    if whisper_model is None:
        # Allow overriding the model size via the environment. Default to a
        # lightweight model to improve startup times. Available options
        # include: tiny, base, base.en, small, medium, large. See
        # https://huggingface.co/ggerganov/whisper.cpp for details.
        model_name = os.getenv("WHISPER_MODEL_NAME", "tiny")
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            compute_type = "float16" if device == "cuda" else "int8"
            whisper_model = WhisperModel(model_name, device=device, compute_type=compute_type)
            logging.info(f"Whisper model '{model_name}' loaded on {device} with {compute_type}")
        except Exception as e:
            logging.error(f"Error loading Whisper model: {e}")
            # Fall back to CPU inference with the same model.
            whisper_model = WhisperModel(model_name, device="cpu", compute_type="int8")
    return whisper_model

load_whisper_model()
def generate_first_question(profile, job):
    """Generate the first interview question based on profile and job"""
    all_roles = extract_all_roles_from_qdrant()
    logging.info(f"[QDRANT DEBUG] Available Roles: {all_roles}")

    retrieved_data = retrieve_interview_data(job.role.lower(), all_roles)
    logging.info(f"[QDRANT DEBUG] Role requested: {job.role.lower()}")
    logging.info(f"[QDRANT DEBUG] Questions retrieved: {len(retrieved_data)}")
    if retrieved_data:
        logging.info(f"[QDRANT DEBUG] Sample Q: {retrieved_data[0]['question']}")
    else:
        logging.warning("[QDRANT DEBUG] No questions retrieved, falling back to defaults")

    context_data = random_context_chunks(retrieved_data, k=4) if retrieved_data else ""

    try:
        # Guard against an empty experience list when referencing the most
        # recent role in the prompt.
        most_recent_experience = (profile.get('experience') or [''])[0]
        prompt = f"""
You are LUNA, an AI recruiter conducting an interview for a {job.role} position at {job.company}.

Candidate profile:
- Skills: {profile.get('skills', [])}
- Experience: {profile.get('experience', [])}
- Education: {profile.get('education', [])}

Interview style:
- Start the interview in a friendly but professional way.
- Always begin with: "Hi, how are you? I'm LUNA, your AI recruiter."
- If the candidate has previous experience, reference their most recent role or company:
  Example: "I see you previously worked at {most_recent_experience}.
  Can you tell me more about your time there, along with your education and overall background?"
- If no experience is available, simply ask them to tell you about their background, education, and experience.

Respond ONLY with the question text, no formatting or extra notes.
"""
        response = groq_llm.invoke(prompt)

        # Handle both AIMessage objects and plain strings.
        if hasattr(response, 'content'):
            question = response.content.strip()
        elif isinstance(response, str):
            question = response.strip()
        else:
            question = str(response).strip()

        # Ensure we have a valid question
        if not question or len(question) < 10:
            question = "Tell me about yourself and why you're interested in this position."

        logging.info(f"Generated question: {question}")
        return question
    except Exception as e:
        logging.error(f"Error generating first question: {e}")
        return "Tell me about yourself and why you're interested in this position."
def generate_next_question(profile, job, conversation_history, last_answer):
    """Generate the next interview question based on profile, job, and conversation so far"""
    all_roles = extract_all_roles_from_qdrant()
    logging.info(f"[QDRANT DEBUG] Available Roles: {all_roles}")

    retrieved_data = retrieve_interview_data(job.role.lower(), all_roles)
    logging.info(f"[QDRANT DEBUG] Role requested: {job.role.lower()}")
    logging.info(f"[QDRANT DEBUG] Questions retrieved: {len(retrieved_data)}")
    if retrieved_data:
        logging.info(f"[QDRANT DEBUG] Sample Next Q: {retrieved_data[0]['question']}")
    else:
        logging.warning("[QDRANT DEBUG] No questions retrieved, falling back to defaults")

    context_data = random_context_chunks(retrieved_data, k=4) if retrieved_data else ""

    try:
        prompt = f"""
You are LUNA, an AI recruiter continuing an interview for a {job.role} position at {job.company}.

Candidate profile:
- Skills: {profile.get('skills', [])}
- Experience: {profile.get('experience', [])}
- Education: {profile.get('education', [])}

Conversation so far:
{conversation_history}

Candidate's last answer:
{last_answer}

Use the following retrieved context when choosing the next question:
{context_data}

Interview style:
- Acknowledge the candidate's last answer naturally (e.g., "That's a great point", "I see what you mean").
- Then ask a related follow-up question that connects to what they just said.
- Keep the tone professional, concise, and relevant to the role.
- If technical, dig deeper into skills or tools they mentioned.
- If behavioral, expand on situations or experiences they described.

Respond ONLY with the next question text (no formatting, no commentary).
"""
        response = groq_llm.invoke(prompt)

        if hasattr(response, 'content'):
            question = response.content.strip()
        elif isinstance(response, str):
            question = response.strip()
        else:
            question = str(response).strip()

        if not question or len(question) < 10:
            question = "Could you elaborate more on your last point?"

        logging.info(f"Generated next question: {question}")
        return question
    except Exception as e:
        logging.error(f"Error generating next question: {e}")
        return "Could you elaborate more on your last point?"
def edge_tts_to_file_sync(text, output_path, voice="en-US-AriaNeural"):
    """Synchronous wrapper for edge-tts with better error handling"""
    try:
        # Ensure text is not empty
        if not text or not text.strip():
            logging.error("Empty text provided for TTS")
            return None

        # Ensure the directory exists and is writable
        directory = os.path.dirname(output_path)
        if not directory:
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
        os.makedirs(directory, exist_ok=True)

        # Test write permissions with a temporary file
        test_file = os.path.join(directory, f"test_{os.getpid()}.tmp")
        try:
            with open(test_file, 'w') as f:
                f.write("test")
            os.remove(test_file)
            logging.info(f"Directory {directory} is writable")
        except (PermissionError, OSError) as e:
            logging.error(f"Directory {directory} is not writable: {e}")
            # Fall back to /tmp
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
            os.makedirs(directory, exist_ok=True)

        async def generate_audio():
            try:
                communicate = edge_tts.Communicate(text, voice)
                await communicate.save(output_path)
                logging.info(f"TTS audio saved to: {output_path}")
            except Exception as e:
                logging.error(f"Error in async TTS generation: {e}")
                raise

        # Run the async coroutine from this synchronous context
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # A loop is already running (e.g. inside an async web server),
                # so run the coroutine on a fresh loop in a worker thread.
                import concurrent.futures

                def run_in_thread():
                    new_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(new_loop)
                    try:
                        new_loop.run_until_complete(generate_audio())
                    finally:
                        new_loop.close()

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(run_in_thread)
                    future.result(timeout=30)  # 30 second timeout
            else:
                loop.run_until_complete(generate_audio())
        except RuntimeError:
            # No event loop exists in this thread; create one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(generate_audio())
            finally:
                loop.close()

        # Verify the file was created and has content
        if os.path.exists(output_path):
            file_size = os.path.getsize(output_path)
            if file_size > 1000:  # At least 1KB for a valid audio file
                logging.info(f"TTS file created successfully: {output_path} ({file_size} bytes)")
                return output_path
            logging.error(f"TTS file is too small: {output_path} ({file_size} bytes)")
            return None
        logging.error(f"TTS file was not created: {output_path}")
        return None
    except Exception as e:
        logging.error(f"Error in TTS generation: {e}")
        return None
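# Illustrative usage (the voice name must be a valid edge-tts voice; the
# package's CLI can list them, e.g. ``edge-tts --list-voices``):
#   path = edge_tts_to_file_sync("Welcome to the interview.", "/tmp/audio/intro.mp3")
#   if path is None:
#       logging.warning("TTS failed; fall back to text-only question delivery")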
def convert_webm_to_wav(webm_path, wav_path):
    """Convert WebM audio to WAV using ffmpeg if available"""
    try:
        result = subprocess.run([
            'ffmpeg', '-i', webm_path, '-ar', '16000', '-ac', '1', '-y', wav_path
        ], capture_output=True, text=True, timeout=30)

        if result.returncode == 0 and os.path.exists(wav_path) and os.path.getsize(wav_path) > 0:
            logging.info(f"Successfully converted {webm_path} to {wav_path}")
            return wav_path
        logging.error(f"FFmpeg conversion failed: {result.stderr}")
        return None
    except Exception as e:
        # Covers subprocess.TimeoutExpired, FileNotFoundError (ffmpeg missing), etc.
        logging.error(f"Error converting audio: {e}")
        return None
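# Illustrative usage (hypothetical paths; requires the ffmpeg binary on PATH):
#   wav = convert_webm_to_wav("/tmp/audio/answer.webm", "/tmp/audio/answer.wav")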
def whisper_stt(audio_path):
    """Speech-to-text using Faster-Whisper"""
    try:
        if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0:
            logging.error(f"Audio file is empty or missing: {audio_path}")
            return ""

        # Convert webm to wav using ffmpeg
        wav_path = audio_path.replace(".webm", ".wav")
        cmd = [
            "ffmpeg",
            "-y",  # overwrite
            "-i", audio_path,
            "-ar", "16000",
            "-ac", "1",
            "-f", "wav",
            wav_path
        ]
        # Cap conversion time so a stuck ffmpeg process cannot hang the request.
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=30)

        if not os.path.exists(wav_path) or os.path.getsize(wav_path) == 0:
            logging.error(f"FFmpeg conversion failed or produced empty file: {wav_path}")
            return ""

        model = load_whisper_model()
        segments, _ = model.transcribe(wav_path)
        transcript = " ".join(segment.text for segment in segments)
        return transcript.strip()
    except Exception as e:
        logging.error(f"Error in STT: {e}")
        return ""
def evaluate_answer(question, answer, job_role="Software Developer", seniority="Mid-level"):
    """Evaluate candidate's answer with better error handling"""
    try:
        if not answer or not answer.strip():
            return {
                "score": "Poor",
                "feedback": "No answer provided."
            }

        prompt = f"""
You are evaluating a candidate's answer for a {seniority} {job_role} position.

Question: {question}
Candidate Answer: {answer}

Evaluate based on technical correctness, clarity, and relevance.
Provide a brief evaluation in 1-2 sentences.
Rate the answer as one of: Poor, Medium, Good, Excellent

Respond in this exact format:
Score: [Poor/Medium/Good/Excellent]
Feedback: [Your brief feedback here]
"""
        response = groq_llm.invoke(prompt)

        # Handle both AIMessage objects and plain strings.
        if hasattr(response, 'content'):
            response_text = response.content.strip()
        elif isinstance(response, str):
            response_text = response.strip()
        else:
            response_text = str(response).strip()

        # Parse the "Score:" / "Feedback:" lines from the response.
        lines = response_text.split('\n')
        score = "Medium"  # default
        feedback = "Good answer, but could be more detailed."  # default

        for line in lines:
            line = line.strip()
            if line.startswith('Score:'):
                score = line.replace('Score:', '').strip()
            elif line.startswith('Feedback:'):
                feedback = line.replace('Feedback:', '').strip()

        # Ensure score is valid
        valid_scores = ["Poor", "Medium", "Good", "Excellent"]
        if score not in valid_scores:
            score = "Medium"

        return {
            "score": score,
            "feedback": feedback
        }
    except Exception as e:
        logging.error(f"Error evaluating answer: {e}")
        return {
            "score": "Medium",
            "feedback": "Unable to evaluate answer at this time."
        }
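if __name__ == "__main__":
    # Minimal smoke test (illustrative). With no GROQ_API_KEY set, this
    # exercises the DummyGroq fallback and the score-parsing defaults.
    result = evaluate_answer(
        "What is a Python decorator?",
        "A decorator is a callable that wraps another function to extend its behaviour.",
    )
    print(result["score"], "-", result["feedback"])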