import os
import json
import re
import time
import logging
import traceback
from collections import defaultdict
from enum import Enum
from textwrap import dedent

import numpy as np
import requests
import cohere
from dotenv import load_dotenv
from inputimeout import inputimeout, TimeoutOccurred
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors
from transformers import AutoTokenizer, pipeline, BitsAndBytesConfig

from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Qdrant
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
#from langchain_huggingface import HuggingFaceEndpoint

# Qdrant client and models
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import (
    VectorParams, Distance, Filter, FieldCondition, MatchValue,
    PointStruct, SearchRequest,
)
load_dotenv()
cohere_api_key = os.getenv("COHERE_API_KEY")
chat_groq_api = os.getenv("GROQ_API_KEY")
hf_api_key = os.getenv("HF_API_KEY")
qdrant_api = os.getenv("QDRANT_API_KEY")
qdrant_url = os.getenv("QDRANT_API_URL")
# Confirm the secrets were loaded without printing their values.
print("GROQ API key loaded:", bool(chat_groq_api))
print("QDRANT API key loaded:", bool(qdrant_api))
print("QDRANT API URL loaded:", bool(qdrant_url))
print("Cohere API key loaded:", bool(cohere_api_key))
qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api,
)
print(qdrant_client.get_collections())
class CustomChatGroq:
def __init__(self, temperature, model_name, api_key):
self.temperature = temperature
self.model_name = model_name
self.api_key = api_key
self.api_url = "https://api.groq.com/openai/v1/chat/completions"
def predict(self, prompt):
"""Send a request to the Groq API and return the generated response."""
try:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": self.model_name,
"messages": [{"role": "system", "content": "You are an AI interviewer."},
{"role": "user", "content": prompt}],
"temperature": self.temperature,
"max_tokens": 150
}
response = requests.post(self.api_url, headers=headers, json=payload, timeout=10)
response.raise_for_status() # Raise an error for HTTP codes 4xx/5xx
data = response.json()
# Extract response text based on Groq API response format
if "choices" in data and len(data["choices"]) > 0:
return data["choices"][0]["message"]["content"].strip()
logging.warning("Unexpected response structure from Groq API")
return "Interviewer: Could you tell me more about your relevant experience?"
except requests.exceptions.RequestException as e:
logging.error(f"ChatGroq API error: {e}")
return "Interviewer: Due to a system issue, let's move on to another question."
groq_llm = ChatGroq(
temperature=0.7,
model_name="llama-3.3-70b-versatile",
api_key=chat_groq_api
)
from huggingface_hub import login
import os
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
login(HF_TOKEN)
else:
raise EnvironmentError("Missing HF_TOKEN environment variable.")
# Load the Mistral-7B judge model
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
print(torch.cuda.is_available())
MODEL_PATH = "mistralai/Mistral-7B-Instruct-v0.3"
#MODEL_PATH = "tiiuae/falcon-rw-1b"
bnb_config = BitsAndBytesConfig(
load_in_8bit=True,
)
mistral_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH,token=hf_api_key)
judge_llm = AutoModelForCausalLM.from_pretrained(
MODEL_PATH,
quantization_config=bnb_config,torch_dtype=torch.float16,
device_map="auto",
token=hf_api_key
)
judge_llm.config.pad_token_id = judge_llm.config.eos_token_id
print(judge_llm.hf_device_map)
judge_pipeline = pipeline(
"text-generation",
model=judge_llm,
tokenizer=mistral_tokenizer,
max_new_tokens=128,
temperature=0.3,
top_p=0.9,
do_sample=True, # Optional but recommended with temperature/top_p
repetition_penalty=1.1,
)
output = judge_pipeline("Q: What is Python?\nA:", max_new_tokens=128)[0]['generated_text']
print(output)
# embedding model
from sentence_transformers import SentenceTransformer
class LocalEmbeddings:
def __init__(self, model_name="all-MiniLM-L6-v2"):
self.model = SentenceTransformer(model_name)
def embed_query(self, text):
return self.model.encode(text).tolist()
def embed_documents(self, documents):
return self.model.encode(documents).tolist()
embeddings = LocalEmbeddings()
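# Quick sanity check (illustrative, not executed): all-MiniLM-L6-v2 returns
# 384-dimensional vectors, which must match the Qdrant collection size used below.
#   vec = embeddings.embed_query("What is overfitting?")
#   print(len(vec))  # expected: 384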
# import cohere
qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api,check_compatibility=False)
co = cohere.Client(api_key=cohere_api_key)
class EvaluationScore(str, Enum):
POOR = "Poor"
MEDIUM = "Medium"
GOOD = "Good"
EXCELLENT = "Excellent"
# Cohere Reranker
class CohereReranker:
def __init__(self, client):
self.client = client
def compress_documents(self, documents, query):
if not documents:
return []
doc_texts = [doc.page_content for doc in documents]
try:
reranked = self.client.rerank(
query=query,
documents=doc_texts,
model="rerank-english-v2.0",
top_n=5
)
return [documents[result.index] for result in reranked.results]
except Exception as e:
logging.error(f"Error in CohereReranker.compress_documents: {e}")
return documents[:5]
reranker = CohereReranker(co)
def load_data_from_json(file_path):
"""Load interview Q&A data from a JSON file."""
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
job_role_buckets = defaultdict(list)
for idx, item in enumerate(data):
try:
job_role = item["Job Role"].lower().strip()
question = item["Questions"].strip()
answer = item["Answers"].strip()
job_role_buckets[job_role].append({"question": question, "answer": answer})
except KeyError as e:
logging.warning(f"Skipping item {idx}: missing key {e}")
        return job_role_buckets
except Exception as e:
logging.error(f"Error loading data: {e}")
raise
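# Usage sketch (hypothetical file name, for illustration): the JSON file is expected
# to be a list of objects with "Job Role", "Questions", and "Answers" keys.
#   buckets = load_data_from_json("interview_qa.json")
#   print({role: len(qas) for role, qas in buckets.items()})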
def verify_qdrant_collection(collection_name='interview_questions'):
"""Verify if a Qdrant collection exists with the correct configuration."""
try:
collection_info = qdrant_client.get_collection(collection_name)
vector_size = collection_info.config.params.vectors.size
logging.info(f"Collection '{collection_name}' exists with vector size: {vector_size}")
return True
except Exception as e:
logging.warning(f"Collection '{collection_name}' not found: {e}")
return False
def store_data_to_qdrant(data, collection_name='interview_questions', batch_size=100):
"""Store interview data in the Qdrant vector database."""
try:
# Check if collection exists, otherwise create it
if not verify_qdrant_collection(collection_name):
try:
qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)
logging.info(f"Created collection '{collection_name}'")
except Exception as e:
logging.error(f"Error creating collection: {e}\n{traceback.format_exc()}")
return False
points = []
point_id = 0
total_points = sum(len(qa_list) for qa_list in data.values())
processed = 0
for job_role, qa_list in data.items():
for entry in qa_list:
try:
emb = embeddings.embed_query(entry["question"])
print(f"Embedding shape: {len(emb)}")
if not emb or len(emb) != 384:
logging.warning(f"Skipping point {point_id} due to invalid embedding length: {len(emb)}")
continue
points.append(PointStruct(
id=point_id,
vector=emb,
payload={
"job_role": job_role,
"question": entry["question"],
"answer": entry["answer"]
}
))
point_id += 1
processed += 1
# Batch upload
if len(points) >= batch_size:
try:
qdrant_client.upsert(collection_name=collection_name, points=points)
logging.info(f"Stored {processed}/{total_points} points ({processed/total_points*100:.1f}%)")
except Exception as upsert_err:
logging.error(f"Error during upsert: {upsert_err}\n{traceback.format_exc()}")
points = []
except Exception as embed_err:
logging.error(f"Embedding error for point {point_id}: {embed_err}\n{traceback.format_exc()}")
# Final batch upload
if points:
try:
qdrant_client.upsert(collection_name=collection_name, points=points)
logging.info(f"Stored final batch of {len(points)} points")
except Exception as final_upsert_err:
logging.error(f"Error during final upsert: {final_upsert_err}\n{traceback.format_exc()}")
# Final verification
try:
count = qdrant_client.count(collection_name=collection_name, exact=True).count
print("Current count:", count)
logging.info(f"✅ Successfully stored {count} points in Qdrant")
if count != total_points:
logging.warning(f"Expected {total_points} points but stored {count}")
except Exception as count_err:
logging.error(f"Error verifying stored points: {count_err}\n{traceback.format_exc()}")
return True
except Exception as e:
logging.error(f"Error storing data to Qdrant: {e}\n{traceback.format_exc()}")
return False
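# Usage sketch (hypothetical file name; re-indexes the collection, so keep it
# commented out unless a rebuild is actually needed):
#   data = load_data_from_json("interview_qa.json")
#   store_data_to_qdrant(data, collection_name="interview_questions")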
# Sanity check: confirm the collection is configured for cosine distance
info = qdrant_client.get_collection("interview_questions")
print(info.config.params.vectors.distance)
def extract_all_roles_from_qdrant(collection_name='interview_questions'):
""" Extract all unique job roles from the Qdrant vector store """
try:
all_roles = set()
scroll_offset = None
while True:
response = qdrant_client.scroll(
collection_name=collection_name,
limit=200,
offset=scroll_offset,
with_payload=True
)
points, next_page_offset = response
if not points:
break
for point in points:
role = point.payload.get("job_role", "").strip().lower()
if role:
all_roles.add(role)
if not next_page_offset:
break
scroll_offset = next_page_offset
if not all_roles:
logging.warning("[Qdrant] No roles found in payloads.")
else:
logging.info(f"[Qdrant] Extracted {len(all_roles)} unique job roles.")
return list(all_roles)
except Exception as e:
logging.error(f"Error extracting roles from Qdrant: {e}")
return []
from sklearn.metrics.pairwise import cosine_similarity
def find_similar_roles(user_role, all_roles, top_k=3):
"""
Find the most similar job roles to the given user_role using embeddings.
"""
try:
# Clean inputs
user_role = user_role.strip().lower()
if not user_role or not all_roles or not isinstance(all_roles, list):
logging.warning("Invalid input for role similarity")
return []
# Embed user role
try:
user_embedding = embeddings.embed_query(user_role)
if user_embedding is None:
logging.error("User embedding is None")
return []
except Exception as e:
logging.error(f"Error embedding user role: {type(e).__name__}: {e}")
return []
# Embed all roles
try:
role_embeddings = []
valid_roles = []
for role in all_roles:
emb = embeddings.embed_query(role.lower())
if emb is not None:
role_embeddings.append(emb)
valid_roles.append(role)
else:
logging.warning(f"Skipping role with no embedding: {role}")
except Exception as e:
logging.error(f"Error embedding all roles: {type(e).__name__}: {e}")
return []
if not role_embeddings:
logging.error("All role embeddings failed")
return []
# Compute similarities
similarities = cosine_similarity([user_embedding], role_embeddings)[0]
top_indices = np.argsort(similarities)[::-1][:top_k]
similar_roles = [valid_roles[i] for i in top_indices]
logging.debug(f"Similar roles to '{user_role}': {similar_roles}")
return similar_roles
except Exception as e:
logging.error(f"Error finding similar roles: {type(e).__name__}: {e}", exc_info=True)
return []
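# Usage sketch (illustrative): fuzzy-match a possibly misspelled role against the
# roles currently stored in Qdrant.
#   roles = extract_all_roles_from_qdrant()
#   print(find_similar_roles("data scienist", roles, top_k=3))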
# Retrieve ALL data related to the job role, not just the top-k matches
def get_role_questions(job_role):
try:
if not job_role:
logging.warning("Job role is empty.")
return []
filter_by_role = Filter(
must=[FieldCondition(
key="job_role",
match=MatchValue(value=job_role.lower())
)]
)
all_results = []
offset = None
while True:
results, next_page_offset = qdrant_client.scroll(
collection_name="interview_questions",
scroll_filter=filter_by_role,
with_payload=True,
with_vectors=False,
limit=100, # batch size
offset=offset
)
all_results.extend(results)
if not next_page_offset:
break
offset = next_page_offset
parsed_results = [{
"question": r.payload.get("question"),
"answer": r.payload.get("answer"),
"job_role": r.payload.get("job_role")
} for r in all_results]
return parsed_results
except Exception as e:
logging.error(f"Error fetching role questions: {type(e).__name__}: {e}", exc_info=True)
return []
def retrieve_interview_data(job_role, all_roles):
"""
Retrieve all interview Q&A for a given job role.
Falls back to similar roles if no data found.
Args:
job_role (str): Input job role (can be misspelled)
all_roles (list): Full list of available job roles
Returns:
list: List of QA dicts with keys: 'question', 'answer', 'job_role'
"""
import logging
logging.basicConfig(level=logging.INFO)
job_role = job_role.strip().lower()
seen_questions = set()
final_results = []
# Step 1: Try exact match (fetch all questions for role)
logging.info(f"Trying to fetch all data for exact role: '{job_role}'")
exact_matches = get_role_questions(job_role)
for qa in exact_matches:
question = qa["question"]
if question and question not in seen_questions:
seen_questions.add(question)
final_results.append(qa)
if final_results:
logging.info(f"Found {len(final_results)} QA pairs for exact role '{job_role}'")
return final_results
logging.warning(f"No data found for role '{job_role}'. Trying similar roles...")
# Step 2: No matches — find similar roles
similar_roles = find_similar_roles(job_role, all_roles, top_k=3)
if not similar_roles:
logging.warning("No similar roles found.")
return []
logging.info(f"Found similar roles: {similar_roles}")
# Step 3: Retrieve data for each similar role (all questions)
for role in similar_roles:
logging.info(f"Fetching data for similar role: '{role}'")
role_qa = get_role_questions(role)
for qa in role_qa:
question = qa["question"]
if question and question not in seen_questions:
seen_questions.add(question)
final_results.append(qa)
logging.info(f"Retrieved total {len(final_results)} QA pairs from similar roles")
return final_results
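# Usage sketch (illustrative): exact-role retrieval first, falling back to the
# closest roles when the exact role has no stored questions.
#   all_roles = extract_all_roles_from_qdrant()
#   qa_pairs = retrieve_interview_data("machine learning engineer", all_roles)
#   print(len(qa_pairs))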
import random
def random_context_chunks(retrieved_data, k=3):
    # Sample up to k Q&A pairs; guard against asking for more than are available.
    k = min(k, len(retrieved_data))
    chunks = random.sample(retrieved_data, k) if k > 0 else []
    return "\n\n".join([f"Q: {item['question']}\nA: {item['answer']}" for item in chunks])
from typing import Dict
def eval_question_quality(
question: str,
job_role: str,
seniority: str,
judge_pipeline=None,
max_retries=1 # Allow at least 1 retry on parse fail
) -> Dict[str, str]:
import time
try:
# Use provided pipeline or fall back to global
if judge_pipeline is None:
judge_pipeline = globals().get("judge_pipeline")
if not judge_pipeline:
return {
"Score": "Error",
"Reasoning": "Judge pipeline not available",
"Improvements": "Please provide a valid language model pipeline"
}
prompt = f"""
... (same as your prompt) ...
Now evaluate this question:
\"{question}\"
"""
for attempt in range(max_retries + 1):
response = judge_pipeline(
prompt,
max_new_tokens=512,
do_sample=False,
temperature=0.1,
repetition_penalty=1.2
)[0]["generated_text"]
try:
# Fallback to last {...} block
match = re.search(r'\{.*\}', response, re.DOTALL)
if not match:
raise ValueError("Could not locate JSON structure in model output.")
json_str = match.group(0)
result = json.loads(json_str)
# Validate required fields and values
required_keys = ["Score", "Reasoning", "Improvements"]
valid_scores = {"Poor", "Medium", "Good", "Excellent"}
if not all(k in result for k in required_keys):
raise ValueError("Missing required fields.")
if result["Score"] not in valid_scores:
raise ValueError("Invalid score value.")
return result
except Exception as e:
logging.warning(f"Attempt {attempt+1} JSON parsing failed: {e}")
time.sleep(0.2) # Small delay before retry
# If all attempts fail, return a default valid dict
return {
"Score": "Poor",
"Reasoning": "The evaluation model failed to produce a valid score, so defaulted to 'Poor'. Check model output and prompt formatting.",
"Improvements": [
"Ensure the question is clear and role-relevant.",
"Double-check prompt and formatting.",
"Try rephrasing the question to match rubric."
]
}
except Exception as e:
logging.error(f"Error in eval_question_quality: {type(e).__name__}: {e}", exc_info=True)
return {
"Score": "Poor",
"Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.",
"Improvements": [
"Retry with a different question.",
"Check your judge pipeline connection.",
"Contact support if this persists."
]
}
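# Usage sketch (illustrative; note the rubric portion of the prompt above is elided,
# so real scores depend on the full prompt text):
#   print(eval_question_quality("Explain the bias-variance trade-off.",
#                               "Data Scientist", "Junior", judge_pipeline))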
def evaluate_answer(
question: str,
answer: str,
ref_answer: str,
job_role: str,
seniority: str,
judge_pipeline=None,
max_retries=1
) -> Dict[str, str]:
"""
Evaluates a candidate's answer to an interview question and returns a structured judgment.
Guarantees a valid, actionable result even if the model fails.
"""
import time
try:
if judge_pipeline is None:
judge_pipeline = globals().get("judge_pipeline")
if not judge_pipeline:
return {
"Score": "Error",
"Reasoning": "Judge pipeline not available",
"Improvements": [
"Please provide a valid language model pipeline"
]
}
# Enhanced prompt (your version)
prompt = f"""
You are an expert technical interviewer evaluating a candidate's response for a {job_role} position at the {seniority} level.
You are provided with:
- The question asked
- The candidate's response
- A reference answer that represents a high-quality expected answer
Evaluate the candidate's response based on:
- Technical correctness
- Clarity and depth of explanation
- Relevance to the job role and seniority
- Completeness and structure
Be objective, concise, and use professional language. Be fair but critical.
--------------------------
Question:
{question}
Candidate Answer:
{answer}
Reference Answer:
{ref_answer}
--------------------------
Now return your evaluation as a valid JSON object using exactly these keys:
- "Score": One of ["Poor", "Medium", "Good", "Excellent"]
- "Reasoning": 2-3 sentence explanation justifying the score, covering clarity, accuracy, completeness, or relevance
- "Improvements": A list of 2-3 specific and constructive suggestions to help the candidate improve this answer
Example:
{{
"Score": "Good",
"Reasoning": "The answer demonstrates a good understanding of the concept and touches on key ideas, but lacks depth in explaining the trade-offs between techniques.",
"Improvements": [
"Explain when this method might fail or produce biased results",
"Include examples or metrics to support the explanation",
"Clarify the specific business impact or outcome achieved"
]
}}
Respond only with the JSON:
"""
for attempt in range(max_retries + 1):
output = judge_pipeline(
prompt,
max_new_tokens=512,
temperature=0.3,
do_sample=False
)[0]["generated_text"]
# Try to extract JSON response from output robustly
try:
start_idx = output.rfind("{")
end_idx = output.rfind("}") + 1
if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
json_str = output[start_idx:end_idx]
result = json.loads(json_str)
valid_scores = {"Poor", "Medium", "Good", "Excellent"}
if result.get("Score") in valid_scores:
return {
"Score": result["Score"],
"Reasoning": result.get("Reasoning", "No explanation provided."),
"Improvements": result.get("Improvements", ["No improvement suggestions provided."])
}
else:
raise ValueError(f"Invalid Score value: {result.get('Score')}")
else:
raise ValueError("JSON format not found in output")
except Exception as e:
logging.warning(f"evaluate_answer: Attempt {attempt+1} failed to parse model output: {e}")
time.sleep(0.2) # Small wait before retry
# Fallback: always return a default 'Poor' score if all attempts fail
return {
"Score": "Poor",
"Reasoning": "The evaluation model failed to produce a valid score or parse output; defaulted to 'Poor'. Please check model output and prompt formatting.",
"Improvements": [
"Be more specific and detailed in the answer.",
"Structure your response with clear points.",
"Relate your answer more closely to the job role and question."
]
}
except Exception as e:
logging.error(f"Evaluation failed: {e}", exc_info=True)
return {
"Score": "Poor",
"Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.",
"Improvements": [
"Try again with a different answer.",
"Check your judge pipeline connection.",
"Contact support if the error persists."
]
}
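# Usage sketch (illustrative; the reference answer would normally come from
# generate_reference_answer below):
#   print(evaluate_answer(question="What is a REST API?",
#                         answer="An API that exposes resources over HTTP verbs.",
#                         ref_answer="A REST API models resources as URLs and ...",
#                         job_role="Backend Developer", seniority="Junior",
#                         judge_pipeline=judge_pipeline))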
# Same evaluation flow, but the reference answer is generated with the Groq-hosted LLaMA 3.3 model
def generate_reference_answer(question, job_role, seniority):
"""
Generates a high-quality reference answer using Groq-hosted LLaMA model.
Args:
question (str): Interview question to answer.
job_role (str): Target job role (e.g., "Frontend Developer").
seniority (str): Experience level (e.g., "Mid-Level").
Returns:
str: Clean, generated reference answer or error message.
"""
try:
# Clean, role-specific prompt
prompt = f"""You are a {seniority} {job_role}.
Q: {question}
A:"""
# Use Groq-hosted model to generate the answer
ref_answer = groq_llm.predict(prompt)
if not ref_answer.strip():
return "Reference answer not generated."
return ref_answer.strip()
except Exception as e:
logging.error(f"Error generating reference answer: {e}", exc_info=True)
return "Unable to generate reference answer due to an error"
def build_interview_prompt(conversation_history, user_response, context, job_role, skills, seniority,
difficulty_adjustment=None, voice_label=None, face_label=None, effective_confidence=None):
"""Build a prompt for generating the next interview question with adaptive difficulty and fairness logic."""
interview_template = """
You are an AI interviewer conducting a real-time interview for a {job_role} position.
Your objective is to thoroughly evaluate the candidate's suitability for the role using smart, structured, and adaptive questioning.
---
Interview Rules and Principles:
- The **baseline difficulty** of questions must match the candidate’s seniority level (e.g., junior, mid-level, senior).
- Use your judgment to increase difficulty **slightly** if the candidate performs well, or simplify if they struggle — but never drop below the expected baseline for their level.
- Avoid asking extremely difficult questions to junior candidates unless they’ve clearly demonstrated advanced knowledge.
- Be fair: candidates for the same role should be evaluated within a consistent difficulty range.
- Adapt your line of questioning gradually and logically based on the **overall flow**, not just the last answer.
- Include real-world problem-solving scenarios to test how the candidate thinks and behaves practically.
- You must **lead** the interview and make intelligent decisions about what to ask next.
---
Context Use:
{context_instruction}
Retrieved Q&A Context (may be empty):
{context}
Note:
If no relevant context was retrieved or the previous answer is unclear, you must still generate a thoughtful interview question using your own knowledge. Do not skip generation. Avoid default or fallback responses — always try to generate a meaningful and fair next question.
---
Job Role: {job_role}
Seniority Level: {seniority}
Skills Focus: {skills}
Difficulty Setting: {difficulty} (based on {difficulty_adjustment})
---
Recent Conversation History:
{history}
Candidate's Last Response:
"{user_response}"
Evaluation of Last Response:
{response_evaluation}
Voice Tone: {voice_label}
---
Important:
If no relevant context was retrieved or the previous answer is unclear or off-topic,
you must still generate a meaningful and fair interview question using your own knowledge and best practices.
Do not skip question generation or fall back to default/filler responses.
---
Guidelines for Next Question:
- If this is the beginning of the interview, start with a question about the candidate’s background or experience.
- Base the difficulty primarily on the seniority level, with light adjustment from recent performance.
- Focus on core skills, real-world applications, and depth of reasoning.
- Ask only one question. Be clear and concise.
Generate the next interview question now:
"""
# Calculate difficulty phrase
if difficulty_adjustment == "harder":
difficulty = f"slightly more challenging than typical for {seniority}"
elif difficulty_adjustment == "easier":
difficulty = f"slightly easier than typical for {seniority}"
else:
difficulty = f"appropriate for {seniority}"
# Choose context logic
if context.strip():
context_instruction = (
"Use both your own expertise and the provided context from relevant interview datasets. "
"You can either build on questions from the dataset or generate your own."
)
context = context.strip()
else:
context_instruction = (
"No specific context retrieved. Use your own knowledge and best practices to craft a question."
)
context = "" # Let it be actually empty!
# Format conversation history (last 6 exchanges max)
recent_history = conversation_history[-6:] if len(conversation_history) > 6 else conversation_history
formatted_history = "\n".join([f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history])
# Add evaluation summary if available
if conversation_history and conversation_history[-1].get("evaluation"):
eval_data = conversation_history[-1]["evaluation"][-1]
response_evaluation = f"""
- Score: {eval_data.get('Score', 'N/A')}
- Reasoning: {eval_data.get('Reasoning', 'N/A')}
- Improvements: {eval_data.get('Improvements', 'N/A')}
"""
else:
response_evaluation = "No evaluation available yet."
# Fill the template
prompt = interview_template.format(
job_role=job_role,
seniority=seniority,
skills=skills,
difficulty=difficulty,
difficulty_adjustment=difficulty_adjustment if difficulty_adjustment else "default seniority",
context_instruction=context_instruction,
context=context,
history=formatted_history,
user_response=user_response,
response_evaluation=response_evaluation.strip(),
voice_label=voice_label or "unknown",
)
return prompt
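# Usage sketch (illustrative): build the opening prompt for a fresh interview with
# no history and no retrieved context, then let the Groq model generate Q1.
#   first_prompt = build_interview_prompt(conversation_history=[], user_response="",
#                                         context="", job_role="Data Scientist",
#                                         skills=["Python", "SQL"], seniority="Junior")
#   print(groq_llm.predict(first_prompt))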
def generate_llm_interview_report(
interview_state, logged_samples, job_role, seniority
):
from collections import Counter
# Helper for converting score to 1–5
def score_label(label):
mapping = {
"confident": 5, "calm": 4, "neutral": 3, "nervous": 2, "anxious": 1, "unknown": 3
}
return mapping.get(label.lower(), 3)
def section_score(vals):
return round(sum(vals)/len(vals), 2) if vals else "N/A"
# Aggregate info
scores, voice_conf, face_conf, comm_scores = [], [], [], []
tech_details, comm_details, emotion_details, relevance_details, problem_details = [], [], [], [], []
for entry in logged_samples:
answer_eval = entry.get("answer_evaluation", {})
score = answer_eval.get("Score", "Not Evaluated")
reasoning = answer_eval.get("Reasoning", "")
if score.lower() in ["excellent", "good", "medium", "poor"]:
score_map = {"excellent": 5, "good": 4, "medium": 3, "poor": 2}
scores.append(score_map[score.lower()])
# Section details
tech_details.append(reasoning)
comm_details.append(reasoning)
# Emotions/confidence
voice_conf.append(score_label(entry.get("voice_label", "unknown")))
face_conf.append(score_label(entry.get("face_label", "unknown")))
# Communication estimate
if entry["user_answer"]:
length = len(entry["user_answer"].split())
comm_score = min(5, max(2, length // 30))
comm_scores.append(comm_score)
# Compute averages for sections
avg_problem = section_score(scores)
avg_tech = section_score(scores)
avg_comm = section_score(comm_scores)
avg_emotion = section_score([(v+f)/2 for v, f in zip(voice_conf, face_conf)])
# Compute decision heuristics
section_averages = [avg_problem, avg_tech, avg_comm, avg_emotion]
numeric_avgs = [v for v in section_averages if isinstance(v, (float, int))]
avg_overall = round(sum(numeric_avgs) / len(numeric_avgs), 2) if numeric_avgs else 0
# Hiring logic (you can customize thresholds)
if avg_overall >= 4.5:
verdict = "Strong Hire"
elif avg_overall >= 4.0:
verdict = "Hire"
elif avg_overall >= 3.0:
verdict = "Conditional Hire"
else:
verdict = "No Hire"
# Build LLM report prompt
transcript = "\n\n".join([
f"Q: {e['generated_question']}\nA: {e['user_answer']}\nScore: {e.get('answer_evaluation',{}).get('Score','')}\nReasoning: {e.get('answer_evaluation',{}).get('Reasoning','')}"
for e in logged_samples
])
prompt = f"""
You are a senior technical interviewer at a major tech company.
Write a structured, realistic hiring report for this {seniority} {job_role} interview, using these section scores (scale 1–5, with 5 best):
Section-wise Evaluation
1. *Problem Solving & Critical Thinking*: {avg_problem}
2. *Technical Depth & Knowledge*: {avg_tech}
3. *Communication & Clarity*: {avg_comm}
4. *Emotional Composure & Confidence*: {avg_emotion}
5. *Role Relevance*: 5
*Transcript*
{transcript}
Your report should have the following sections:
1. *Executive Summary* (realistic, hiring-committee style)
2. *Section-wise Comments* (for each numbered category above, with short paragraph citing specifics)
3. *Strengths & Weaknesses* (list at least 2 for each)
4. *Final Verdict*: {verdict}
5. *Recommendations* (2–3 for future improvement)
Use realistic language. If some sections are N/A or lower than others, comment honestly.
Interview Report:
"""
# LLM call, or just return prompt for review
return groq_llm.predict(prompt)
def get_user_info():
"""
Collects essential information from the candidate before starting the interview.
Returns a dictionary with keys: name, job_role, seniority, skills
"""
import logging
logging.info("Collecting user information...")
print("Welcome to the AI Interview Simulator!")
print("Let’s set up your mock interview.\n")
# Get user name
name = input("What is your name? ").strip()
while not name:
print("Please enter your name.")
name = input("What is your name? ").strip()
# Get job role
job_role = input(f"Hi {name}, what job role are you preparing for? (e.g. Frontend Developer) ").strip()
while not job_role:
print("Please specify the job role.")
job_role = input("What job role are you preparing for? ").strip()
# Get seniority level
seniority_options = ["Entry-level", "Junior", "Mid-Level", "Senior", "Lead"]
print("\nSelect your experience level:")
for i, option in enumerate(seniority_options, 1):
print(f"{i}. {option}")
seniority_choice = None
while seniority_choice not in range(1, len(seniority_options)+1):
try:
seniority_choice = int(input("Enter the number corresponding to your level: "))
except ValueError:
print(f"Please enter a number between 1 and {len(seniority_options)}")
seniority = seniority_options[seniority_choice - 1]
# Get skills
skills_input = input(f"\nWhat are your top skills relevant to {job_role}? (Separate with commas): ")
skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()]
while not skills:
print("Please enter at least one skill.")
skills_input = input("Your top skills (comma-separated): ")
skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()]
# Confirm collected info
print("\n Interview Setup Complete!")
print(f"Name: {name}")
print(f"Job Role: {job_role}")
print(f"Experience Level: {seniority}")
print(f"Skills: {', '.join(skills)}")
print("\nStarting your mock interview...\n")
return {
"name": name,
"job_role": job_role,
"seniority": seniority,
"skills": skills
}
import threading
def wait_for_user_response(timeout=200):
"""Wait for user input with timeout. Returns '' if no response."""
user_input = []
def get_input():
answer = input("Your Answer (within timeout): ").strip()
user_input.append(answer)
thread = threading.Thread(target=get_input)
thread.start()
thread.join(timeout)
return user_input[0] if user_input else ""
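# Note: the helper above returns "" once the timeout expires, but the background
# thread remains blocked on input() until the user eventually presses Enter; that is
# acceptable for this CLI flow but worth keeping in mind if the helper is reused.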
import json
from datetime import datetime
from time import time
import random
def interview_loop(max_questions, timeout_seconds=300, collection_name="interview_questions", judge_pipeline=None, save_path="interview_log.json"):
user_info = get_user_info()
job_role = user_info['job_role']
seniority = user_info['seniority']
skills = user_info['skills']
all_roles = extract_all_roles_from_qdrant(collection_name=collection_name)
retrieved_data = retrieve_interview_data(job_role, all_roles)
context_data = random_context_chunks(retrieved_data, k=4)
conversation_history = []
interview_state = {
"questions": [],
"user_answer": [],
"job_role": job_role,
"seniority": seniority,
"start_time": time()
}
# Store log for evaluation
logged_samples = []
difficulty_adjustment = None
for i in range(max_questions):
last_user_response = conversation_history[-1]['content'] if conversation_history else ""
# Generate question prompt
prompt = build_interview_prompt(
conversation_history=conversation_history,
user_response=last_user_response,
context=context_data,
job_role=job_role,
skills=skills,
seniority=seniority,
difficulty_adjustment=difficulty_adjustment
)
question = groq_llm.predict(prompt)
question_eval = eval_question_quality(question, job_role, seniority, judge_pipeline)
conversation_history.append({'role': "Interviewer", "content": question})
print(f"Interviewer: Q{i + 1} : {question}")
# Wait for user answer
start_time = time()
user_answer = wait_for_user_response(timeout=timeout_seconds)
response_time = time() - start_time
skipped = False
answer_eval = None
ref_answer = None
if not user_answer:
print("No Response Received, moving to next question.")
user_answer = None
skipped = True
difficulty_adjustment = "medium"
else:
conversation_history.append({"role": "Candidate", "content": user_answer})
ref_answer = generate_reference_answer(question, job_role, seniority)
answer_eval = evaluate_answer(
question=question,
answer=user_answer,
ref_answer=ref_answer,
job_role=job_role,
seniority=seniority,
judge_pipeline=judge_pipeline
)
interview_state["user_answer"].append(user_answer)
            # Append inline evaluation for history (keys match what
            # build_interview_prompt expects when summarizing the last answer)
            conversation_history[-1].setdefault('evaluation', []).append({
                "Score": answer_eval['Score'],
                "Reasoning": answer_eval['Reasoning'],
                "Improvements": answer_eval.get('Improvements', [])
            })
# Adjust difficulty
score = answer_eval['Score'].lower()
if score == "excellent":
difficulty_adjustment = "harder"
elif score in ['poor', 'medium']:
difficulty_adjustment = "easier"
else:
difficulty_adjustment = None
# Store for local logging
logged_samples.append({
"job_role": job_role,
"seniority": seniority,
"skills": skills,
"context": context_data,
"prompt": prompt,
"generated_question": question,
"question_evaluation": question_eval,
"user_answer": user_answer,
"reference_answer": ref_answer,
"answer_evaluation": answer_eval,
"skipped": skipped
})
# Store state
interview_state['questions'].append({
"question": question,
"question_evaluation": question_eval,
"user_answer": user_answer,
"answer_evaluation": answer_eval,
"skipped": skipped
})
interview_state['end_time'] = time()
    report = generate_llm_interview_report(interview_state, logged_samples, job_role, seniority)
print("Report : _____________________\n")
print(report)
print('______________________________________________')
# Save full interview logs to JSON
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{save_path.replace('.json', '')}_{timestamp}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(logged_samples, f, indent=2, ensure_ascii=False)
print(f" Interview log saved to {filename}")
print("____________________________________\n")
print(f"interview state : {interview_state}")
return interview_state, report
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np
# build ground truth for retrieving data for testing
def build_ground_truth(all_roles):
gt = {}
for role in all_roles:
qa_list = get_role_questions(role)
gt[role] = set(q["question"] for q in qa_list if q["question"])
return gt
def evaluate_retrieval(job_role, all_roles, k=10):
"""
Evaluate retrieval quality using Precision@k, Recall@k, and F1@k.
Args:
job_role (str): The input job role to search for.
all_roles (list): List of all available job roles in the system.
k (int): Top-k retrieved questions to evaluate.
Returns:
dict: Evaluation metrics including precision, recall, and f1.
"""
# Step 1: Ground Truth (all exact questions stored for this role)
ground_truth_qs = set(
q["question"].strip()
for q in get_role_questions(job_role)
if q.get("question")
)
if not ground_truth_qs:
print(f"[!] No ground truth found for role: {job_role}")
return {}
# Step 2: Retrieved Questions (may include fallback roles)
retrieved_qas = retrieve_interview_data(job_role, all_roles)
retrieved_qs = [q["question"].strip() for q in retrieved_qas if q.get("question")]
# Step 3: Take top-k retrieved (you can also do full if needed)
retrieved_top_k = retrieved_qs[:k]
    # Step 4: Binary relevance over the top-k retrieved questions (1 = in ground truth)
    y_true = [1 if q in ground_truth_qs else 0 for q in retrieved_top_k]
    relevant_retrieved = sum(y_true)
    # Precision@k: share of the top-k retrieved questions that are relevant.
    precision = relevant_retrieved / len(y_true) if y_true else 0.0
    # Recall@k: share of all ground-truth questions recovered within the top-k.
    recall = relevant_retrieved / len(ground_truth_qs) if ground_truth_qs else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
print(f" Retrieval Evaluation for role: '{job_role}' (Top-{k})")
print(f"Precision@{k}: {precision:.2f}")
print(f"Recall@{k}: {recall:.2f}")
print(f"F1@{k}: {f1:.2f}")
print(f"Relevant Retrieved: {sum(y_true)}/{len(y_true)}")
print("–" * 40)
return {
"job_role": job_role,
"precision": precision,
"recall": recall,
"f1": f1,
"relevant_retrieved": sum(y_true),
"total_retrieved": len(y_true),
"ground_truth_count": len(ground_truth_qs),
}
k_values = [5, 10, 20]
all_roles = extract_all_roles_from_qdrant(collection_name="interview_questions")
results = []
for k in k_values:
for role in all_roles:
metrics = evaluate_retrieval(role, all_roles, k=k)
if metrics: # only if we found ground truth
metrics["k"] = k
results.append(metrics)
import pandas as pd
df = pd.DataFrame(results)
summary = df.groupby("k")[["precision", "recall", "f1"]].mean().round(3)
print(summary)
def extract_job_details(job_description):
"""Extract job details such as title, skills, experience level, and years of experience from the job description."""
title_match = re.search(r"(?i)(?:seeking|hiring) a (.+?) to", job_description)
job_title = title_match.group(1) if title_match else "Unknown"
skills_match = re.findall(r"(?i)(?:Proficiency in|Experience with|Knowledge of) (.+?)(?:,|\.| and| or)", job_description)
skills = list(set([skill.strip() for skill in skills_match])) if skills_match else []
experience_match = re.search(r"(\d+)\+? years of experience", job_description)
if experience_match:
years_experience = int(experience_match.group(1))
experience_level = "Senior" if years_experience >= 5 else "Mid" if years_experience >= 3 else "Junior"
else:
years_experience = None
experience_level = "Unknown"
return {
"job_title": job_title,
"skills": skills,
"experience_level": experience_level,
"years_experience": years_experience
}
import re
from docx import Document
import textract
from PyPDF2 import PdfReader
JOB_TITLES = [
"Accountant", "Data Scientist", "Machine Learning Engineer", "Software Engineer",
"Developer", "Analyst", "Researcher", "Intern", "Consultant", "Manager",
"Engineer", "Specialist", "Project Manager", "Product Manager", "Administrator",
"Director", "Officer", "Assistant", "Coordinator", "Supervisor"
]
def clean_filename_name(filename):
# Remove file extension
base = re.sub(r"\.[^.]+$", "", filename)
base = base.strip()
# Remove 'cv' or 'CV' words
base_clean = re.sub(r"\bcv\b", "", base, flags=re.IGNORECASE).strip()
# If after removing CV it's empty, return None
if not base_clean:
return None
# If it contains any digit, return None (unreliable)
if re.search(r"\d", base_clean):
return None
# Replace underscores/dashes with spaces, capitalize
base_clean = base_clean.replace("_", " ").replace("-", " ")
return base_clean.title()
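# Illustrative behaviour of the filename heuristic above:
#   clean_filename_name("John Smith CV.pdf")  -> "John Smith"
#   clean_filename_name("resume_2024.pdf")    -> None (contains digits)
#   clean_filename_name("cv.pdf")             -> None (nothing left after removing "cv")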
def looks_like_job_title(line):
for title in JOB_TITLES:
pattern = r"\b" + re.escape(title.lower()) + r"\b"
if re.search(pattern, line.lower()):
return True
return False
def extract_name_from_text(lines):
# Try first 3 lines for a name, skipping job titles
    for i in range(min(3, len(lines))):
        line = lines[i].strip()
        if looks_like_job_title(line):
            continue  # skip job-title lines and keep looking for a real name
if re.search(r"\d", line): # skip lines with digits
continue
if len(line.split()) > 4 or len(line) > 40: # too long or many words
continue
# If line has only uppercase words, it's probably not a name
if line.isupper():
continue
# Passed checks, return title-cased line as name
return line.title()
return None
def extract_text_from_file(file_path):
if file_path.endswith('.pdf'):
reader = PdfReader(file_path)
text = "\n".join(page.extract_text() or '' for page in reader.pages)
elif file_path.endswith('.docx'):
doc = Document(file_path)
text = "\n".join([para.text for para in doc.paragraphs])
else: # For .doc or fallback
text = textract.process(file_path).decode('utf-8')
return text.strip()
def extract_candidate_details(file_path):
text = extract_text_from_file(file_path)
lines = [line.strip() for line in text.splitlines() if line.strip()]
# Extract name
    filename = os.path.basename(file_path)  # just the filename, independent of path separator
name = clean_filename_name(filename)
if not name:
name = extract_name_from_text(lines)
if not name:
name = "Unknown"
# Extract skills (basic version)
skills = []
skills_section = re.search(r"Skills\s*[:\-]?\s*(.+)", text, re.IGNORECASE)
if skills_section:
raw_skills = skills_section.group(1)
skills = [s.strip() for s in re.split(r",|\n|•|-", raw_skills) if s.strip()]
return {
"name": name,
"skills": skills
}
# import gradio as gr
# import time
# import tempfile
# import numpy as np
# import scipy.io.wavfile as wavfile
# import os
# import json
# from transformers import BarkModel, AutoProcessor
# import torch, gc
# import whisper
# from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification
# import librosa
# import torch
# print(torch.cuda.is_available()) # ✅ Tells you if GPU is available
# torch.cuda.empty_cache()
# gc.collect()
# # Bark TTS
# print("🔁 Loading Bark model...")
# model_bark = BarkModel.from_pretrained("suno/bark").to("cuda" if torch.cuda.is_available() else "cpu")
# print("✅ Bark model loaded")
# print("🔁 Loading Bark processor...")
# processor_bark = AutoProcessor.from_pretrained("suno/bark")
# print("✅ Bark processor loaded")
# bark_voice_preset = "v2/en_speaker_5"
# def bark_tts(text):
# print(f"🔁 Synthesizing TTS for: {text}")
# # Process the text
# inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset)
# # Move tensors to device
# input_ids = inputs["input_ids"].to(model_bark.device)
# start = time.time()
# # Generate speech with only the required parameters
# with torch.no_grad():
# speech_values = model_bark.generate(
# input_ids=input_ids,
# do_sample=True,
# fine_temperature=0.4,
# coarse_temperature=0.8
# )
# print(f"✅ Bark finished in {round(time.time() - start, 2)}s")
# # Convert to audio
# speech = speech_values.cpu().numpy().squeeze()
# speech = (speech * 32767).astype(np.int16)
# temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
# wavfile.write(temp_wav.name, 22050, speech)
# return temp_wav.name
# # Whisper STT
# print("🔁 Loading Whisper model...")
# whisper_model = whisper.load_model("base", device="cuda")
# print("✅ Whisper model loaded")
# def whisper_stt(audio_path):
# if not audio_path or not os.path.exists(audio_path): return ""
# result = whisper_model.transcribe(audio_path)
# return result["text"]
# seniority_mapping = {
# "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5
# }
# # --- 2. Gradio App ---
# with gr.Blocks(theme=gr.themes.Soft()) as demo:
# user_data = gr.State({})
# interview_state = gr.State({})
# missing_fields_state = gr.State([])
# # --- UI Layout ---
# with gr.Column(visible=True) as user_info_section:
# gr.Markdown("## Candidate Information")
# cv_file = gr.File(label="Upload CV")
# job_desc = gr.Textbox(label="Job Description")
# start_btn = gr.Button("Continue", interactive=False)
# with gr.Column(visible=False) as missing_section:
# gr.Markdown("## Missing Information")
# name_in = gr.Textbox(label="Name", visible=False)
# role_in = gr.Textbox(label="Job Role", visible=False)
# seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False)
# skills_in = gr.Textbox(label="Skills", visible=False)
# submit_btn = gr.Button("Submit", interactive=False)
# with gr.Column(visible=False) as interview_pre_section:
# pre_interview_greeting_md = gr.Markdown()
# start_interview_final_btn = gr.Button("Start Interview")
# with gr.Column(visible=False) as interview_section:
# gr.Markdown("## Interview in Progress")
# question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True)
# question_text = gr.Markdown()
# user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer")
# stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)")
# confirm_btn = gr.Button("Confirm Answer")
# evaluation_display = gr.Markdown()
# interview_summary = gr.Markdown(visible=False)
# # --- UI Logic ---
# def validate_start_btn(cv_file, job_desc):
# return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())))
# cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn)
# job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn)
# def process_and_route_initial(cv_file, job_desc):
# details = extract_candidate_details(cv_file.name)
# job_info = extract_job_details(job_desc)
# data = {
# "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"),
# "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", [])
# }
# missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v]
# if missing:
# return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# else:
# greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
# return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting)
# start_btn.click(
# process_and_route_initial,
# [cv_file, job_desc],
# [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md]
# )
# def show_missing(missing):
# if missing is None: missing = []
# return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing)
# missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in])
# def validate_fields(name, role, seniority, skills, missing):
# if not missing: return gr.update(interactive=False)
# all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip())),])
# return gr.update(interactive=all_filled)
# for inp in [name_in, role_in, seniority_in, skills_in]:
# inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn)
# def complete_manual(data, name, role, seniority, skills):
# if data["name"].lower() == "unknown": data["name"] = name
# if data["job_role"].lower() == "unknown": data["job_role"] = role
# if data["seniority"].lower() == "unknown": data["seniority"] = seniority
# if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")]
# greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
# return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)
# submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md])
# def start_interview(data):
# # --- Advanced state with full logging ---
# state = {
# "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [],
# "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [],
# "conversation_history": [],
# "difficulty_adjustment": None,
# "question_idx": 0, "max_questions": 3, "q_start_time": time.time(),
# "log": []
# }
# # --- Optionally: context retrieval here (currently just blank) ---
# context = ""
# prompt = build_interview_prompt(
# conversation_history=[], user_response="", context=context, job_role=data["job_role"],
# skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None,
# voice_label="neutral", face_label="neutral"
# )
# #here the original one
# # first_q = groq_llm.predict(prompt)
# # # Evaluate Q for quality
# # q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None)
# # state["questions"].append(first_q)
# # state["question_evaluations"].append(q_eval)
# #here the testing one
# first_q = groq_llm.predict(prompt)
# q_eval = {
# "Score": "N/A",
# "Reasoning": "Skipped to reduce processing time",
# "Improvements": []
# }
# state["questions"].append(first_q)
# state["question_evaluations"].append(q_eval)
# state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})
# start = time.perf_counter()
# audio_path = bark_tts(first_q)
# print("⏱️ Bark TTS took", time.perf_counter() - start, "seconds")
# # LOG
# state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()})
# return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}"
# start_interview_final_btn.click(start_interview, [user_data], [interview_state, interview_pre_section, interview_section, question_audio, question_text])
# def transcribe(audio_path):
# return whisper_stt(audio_path)
# user_audio_input.change(transcribe, user_audio_input, stt_transcript)
# def process_answer(transcript, audio_path, state, data):
# if not transcript:
# return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
# elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
# state["timings"].append(elapsed)
# state["answers"].append(transcript)
# state["conversation_history"].append({'role': 'Candidate', 'content': transcript})
# # --- 1. Emotion analysis (simplified for testing) ---
# voice_label = "neutral"
# face_label = "neutral"
# state["voice_labels"].append(voice_label)
# state["face_labels"].append(face_label)
# # --- 2. Evaluate previous Q and Answer ---
# last_q = state["questions"][-1]
# q_eval = state["question_evaluations"][-1] # Already in state
# ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"])
# answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None)
# state["answer_evaluations"].append(answer_eval)
# answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium"
# # --- 3. Adaptive difficulty ---
# if answer_score == "excellent":
# state["difficulty_adjustment"] = "harder"
# elif answer_score in ("medium", "poor"):
# state["difficulty_adjustment"] = "easier"
# else:
# state["difficulty_adjustment"] = None
# # --- 4. Effective confidence (simplified) ---
# eff_conf = {"effective_confidence": 0.6}
# state["effective_confidences"].append(eff_conf)
# # --- LOG ---
# state["log"].append({
# "type": "answer",
# "question": last_q,
# "answer": transcript,
# "answer_eval": answer_eval,
# "ref_answer": ref_answer,
# "face_label": face_label,
# "voice_label": voice_label,
# "effective_confidence": eff_conf,
# "timing": elapsed,
# "timestamp": time.time()
# })
# # --- Next or End ---
# qidx = state["question_idx"] + 1
# if qidx >= state["max_questions"]:
# # Save as JSON (optionally)
# timestamp = time.strftime("%Y%m%d_%H%M%S")
# log_file = f"interview_log_{timestamp}.json"
# with open(log_file, "w", encoding="utf-8") as f:
# json.dump(state["log"], f, indent=2, ensure_ascii=False)
# # Report
# summary = "# Interview Summary\n"
# for i, q in enumerate(state["questions"]):
# summary += (f"\n### Q{i + 1}: {q}\n"
# f"- *Answer*: {state['answers'][i]}\n"
# f"- *Q Eval*: {state['question_evaluations'][i]}\n"
# f"- *A Eval*: {state['answer_evaluations'][i]}\n"
# f"- *Time*: {state['timings'][i]}s\n")
# summary += f"\n\n⏺ Full log saved as {log_file}."
# return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}"))
# else:
# # --- Build next prompt using adaptive difficulty ---
# state["question_idx"] = qidx
# state["q_start_time"] = time.time()
# context = "" # You can add your context logic here
# prompt = build_interview_prompt(
# conversation_history=state["conversation_history"],
# user_response=transcript,
# context=context,
# job_role=data["job_role"],
# skills=data["skills"],
# seniority=data["seniority"],
# difficulty_adjustment=state["difficulty_adjustment"],
# voice_label=voice_label,
# )
# next_q = groq_llm.predict(prompt)
# # Evaluate Q quality
# q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None)
# state["questions"].append(next_q)
# state["question_evaluations"].append(q_eval)
# state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
# state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})
# audio_path = bark_tts(next_q)
# # Display evaluations
# eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}"
# return (
# state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}",
# gr.update(value=None), gr.update(value=None),
# gr.update(visible=True, value=eval_md),
# )
# # Replace your confirm_btn.click with this:
# confirm_btn.click(
# process_answer,
# [stt_transcript, user_audio_input, interview_state, user_data], # Added None for video_path
# [interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display]
# ).then(
# lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript]
# )
# demo.launch(debug=True)
import gradio as gr
import time
import tempfile
import numpy as np
import scipy.io.wavfile as wavfile
import os
import json
from transformers import BarkModel, AutoProcessor
import torch, gc
import whisper
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
print(torch.cuda.is_available())
torch.cuda.empty_cache()
gc.collect()
# Global variables for lazy loading
model_bark = None
processor_bark = None
whisper_model = None
bark_voice_preset = "v2/en_speaker_9"
# Thread pool for async operations
executor = ThreadPoolExecutor(max_workers=2)
# Add after your imports
if torch.cuda.is_available():
print(f"🔥 CUDA Available: {torch.cuda.get_device_name(0)}")
print(f"🔥 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
# Set default device
torch.cuda.set_device(0)
else:
print("⚠️ CUDA not available, using CPU")
def load_models_lazy():
"""Load models only when needed"""
global model_bark, processor_bark, whisper_model
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🔁 Using device: {device}")
if model_bark is None:
print("🔁 Loading Bark model...")
model_bark = BarkModel.from_pretrained("suno/bark").to(device)
print(f"✅ Bark model loaded on {device}")
if processor_bark is None:
print("🔁 Loading Bark processor...")
processor_bark = AutoProcessor.from_pretrained("suno/bark")
print("✅ Bark processor loaded")
if whisper_model is None:
print("🔁 Loading Whisper model...")
whisper_model = whisper.load_model("base", device=device)
print(f"✅ Whisper model loaded on {device}")
def bark_tts_async(text):
"""Async TTS generation"""
def _generate():
load_models_lazy() # Load only when needed
print(f"🔁 Synthesizing TTS for: {text}")
# Ensure we're using the correct device
device = next(model_bark.parameters()).device
print(f"🔁 Bark model is on device: {device}")
inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset)
input_ids = inputs["input_ids"].to(device) # Move to same device as model
start = time.time()
with torch.no_grad():
speech_values = model_bark.generate(
input_ids=input_ids,
do_sample=True,
fine_temperature=0.4,
coarse_temperature=0.8
)
print(f"✅ Bark finished in {round(time.time() - start, 2)}s on {device}")
speech = speech_values.cpu().numpy().squeeze()
speech = (speech * 32767).astype(np.int16)
temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
wavfile.write(temp_wav.name, 22050, speech)
return temp_wav.name
return executor.submit(_generate)
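# Usage sketch (illustrative): bark_tts_async returns a concurrent.futures.Future,
# so callers only block on .result() when the audio path is actually needed.
#   future = bark_tts_async("Welcome to the interview.")
#   wav_path = future.result()  # path to a temporary 22.05 kHz WAV file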
def whisper_stt(audio_path):
"""Lazy loading whisper STT"""
if not audio_path or not os.path.exists(audio_path):
return ""
load_models_lazy() # Load only when needed
# Check what device Whisper is actually using
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🔁 Whisper transcribing on {device}")
result = whisper_model.transcribe(audio_path)
return result["text"]
seniority_mapping = {
"Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5
}
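# UI flow: candidate info (CV + job description) -> manual entry for any fields
# the extractor could not fill -> pre-interview greeting -> interview loop -> summary.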
with gr.Blocks(theme=gr.themes.Soft()) as demo:
user_data = gr.State({})
interview_state = gr.State({})
missing_fields_state = gr.State([])
tts_future = gr.State(None) # Store async TTS future
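# gr.State values are kept per session on the server, so the interview log and
# the pending TTS Future survive across callback invocations.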
with gr.Column(visible=True) as user_info_section:
gr.Markdown("## Candidate Information")
cv_file = gr.File(label="Upload CV")
job_desc = gr.Textbox(label="Job Description")
start_btn = gr.Button("Continue", interactive=False)
with gr.Column(visible=False) as missing_section:
gr.Markdown("## Missing Information")
name_in = gr.Textbox(label="Name", visible=False)
role_in = gr.Textbox(label="Job Role", visible=False)
seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False)
skills_in = gr.Textbox(label="Skills", visible=False)
submit_btn = gr.Button("Submit", interactive=False)
with gr.Column(visible=False) as interview_pre_section:
pre_interview_greeting_md = gr.Markdown()
start_interview_final_btn = gr.Button("Start Interview")
loading_status = gr.Markdown("", visible=False)
with gr.Column(visible=False) as interview_section:
gr.Markdown("## Interview in Progress")
question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True)
question_text = gr.Markdown()
user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer")
stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)")
confirm_btn = gr.Button("Confirm Answer")
evaluation_display = gr.Markdown()
interview_summary = gr.Markdown(visible=False)
def validate_start_btn(cv_file, job_desc):
return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())))
cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn)
job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn)
def process_and_route_initial(cv_file, job_desc):
details = extract_candidate_details(cv_file.name)
job_info = extract_job_details(job_desc)
data = {
"name": details.get("name", "unknown"),
"job_role": job_info.get("job_title", "unknown"),
"seniority": job_info.get("experience_level", "unknown"),
"skills": job_info.get("skills", [])
}
missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v]
if missing:
# Some fields could not be extracted: show the manual-entry form.
return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update()
else:
# Everything was extracted: reveal the pre-interview section with its greeting.
greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)
# interview_pre_section is part of the outputs so the greeting's parent column
# becomes visible when no fields are missing.
start_btn.click(process_and_route_initial, [cv_file, job_desc], [user_data, missing_fields_state, user_info_section, missing_section, interview_pre_section, pre_interview_greeting_md])
def show_missing(missing):
if missing is None: missing = []
return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing)
missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in])
def validate_fields(name, role, seniority, skills, missing):
if not missing: return gr.update(interactive=False)
all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip()))])
return gr.update(interactive=all_filled)
for inp in [name_in, role_in, seniority_in, skills_in]:
inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn)
def complete_manual(data, name, role, seniority, skills):
if data["name"].lower() == "unknown": data["name"] = name
if data["job_role"].lower() == "unknown": data["job_role"] = role
if data["seniority"].lower() == "unknown": data["seniority"] = seniority
if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")]
greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)
submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md])
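# Starting the interview is split in two: the first question text comes from a
# fast LLM call and is shown immediately, while Bark audio is generated in the
# background and picked up by check_tts_ready below.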
def start_interview_immediate(data):
"""Start interview immediately, begin TTS generation in background"""
state = {
"questions": [], "answers": [], "timings": [], "question_evaluations": [], "answer_evaluations": [],
"conversation_history": [], "difficulty_adjustment": None, "question_idx": 0, "max_questions": 3,
"q_start_time": time.time(), "log": []
}
# Generate question text first (fast)
context = ""
prompt = build_interview_prompt(
conversation_history=[], user_response="", context=context, job_role=data["job_role"],
skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None, voice_label="neutral"
)
first_q = groq_llm.predict(prompt)
q_eval = {"Score": "N/A", "Reasoning": "Skipped to reduce processing time", "Improvements": []}
state["questions"].append(first_q)
state["question_evaluations"].append(q_eval)
state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})
state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()})
# Start TTS generation in background
tts_future_obj = bark_tts_async(first_q)
# Return immediately with loading message
return (state, tts_future_obj,
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=True, value="🔄 Generating audio..."),
gr.update(value=None),
f"*Question 1:* {first_q}")
def check_tts_ready(state, tts_future_obj):
"""Check if TTS is ready and update audio"""
if tts_future_obj and tts_future_obj.done():
try:
audio_path = tts_future_obj.result()
return gr.update(value=audio_path), gr.update(visible=False), None
except Exception as e:
print(f"TTS Error: {e}")
return gr.update(value=None), gr.update(value=f"Error generating audio: {e}"), None
else:
return gr.update(), gr.update(), tts_future_obj
start_interview_final_btn.click(
start_interview_immediate,
[user_data],
[interview_state, tts_future, interview_pre_section, interview_section, loading_status, question_audio, question_text]
).then(
# Check TTS status every 500ms
check_tts_ready,
[interview_state, tts_future],
[question_audio, loading_status, tts_future],
every=0.5
)
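# Recording (or clearing) the microphone input triggers transcription, which
# pre-fills the editable transcript box.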
def transcribe(audio_path):
return whisper_stt(audio_path)
user_audio_input.change(transcribe, user_audio_input, stt_transcript)
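# Interview loop: record the confirmed answer, evaluate it against a generated
# reference answer, adjust difficulty for the next question, then either emit
# the final summary or ask (and voice) the next question.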
def process_answer(transcript, audio_path, state, data):
if not transcript:
return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
state["timings"].append(elapsed)
state["answers"].append(transcript)
state["conversation_history"].append({'role': 'Candidate', 'content': transcript})
last_q = state["questions"][-1]
q_eval = state["question_evaluations"][-1]
ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"])
answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None)
state["answer_evaluations"].append(answer_eval)
answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium"
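# Simple adaptive difficulty: excellent answers make the next question harder,
# medium or poor answers make it easier.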
if answer_score == "excellent":
state["difficulty_adjustment"] = "harder"
elif answer_score in ("medium", "poor"):
state["difficulty_adjustment"] = "easier"
else:
state["difficulty_adjustment"] = None
state["log"].append({
"type": "answer", "question": last_q, "answer": transcript,
"answer_eval": answer_eval, "ref_answer": ref_answer,
"timing": elapsed, "timestamp": time.time()
})
qidx = state["question_idx"] + 1
if qidx >= state["max_questions"]:
timestamp = time.strftime("%Y%m%d_%H%M%S")
log_file = f"interview_log_{timestamp}.json"
with open(log_file, "w", encoding="utf-8") as f:
json.dump(state["log"], f, indent=2, ensure_ascii=False)
summary = "# Interview Summary\n"
for i, q in enumerate(state["questions"]):
summary += (f"\n### Q{i + 1}: {q}\n"
f"- *Answer*: {state['answers'][i]}\n"
f"- *Q Eval*: {state['question_evaluations'][i]}\n"
f"- *A Eval*: {state['answer_evaluations'][i]}\n"
f"- *Time*: {state['timings'][i]}s\n")
summary += f"\n\n⏺ Full log saved as {log_file}."
return state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=False)
else:
state["question_idx"] = qidx
state["q_start_time"] = time.time()
context = ""
prompt = build_interview_prompt(
conversation_history=state["conversation_history"],
user_response=transcript, context=context,
job_role=data["job_role"], skills=data["skills"],
seniority=data["seniority"], difficulty_adjustment=state["difficulty_adjustment"],
voice_label="neutral"
)
next_q = groq_llm.predict(prompt)
q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None)
state["questions"].append(next_q)
state["question_evaluations"].append(q_eval)
state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})
# Generate TTS asynchronously for next question too
audio_future = bark_tts_async(next_q)
# Block here until the audio is ready; this call could be made non-blocking like the first question
audio_path = audio_future.result()
eval_md = f"*Last Answer Eval:* {answer_eval}"
return state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=eval_md)
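# Confirming an answer runs process_answer and then clears the recorded audio
# and transcript box so the next question starts with empty inputs.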
confirm_btn.click(
process_answer,
[stt_transcript, user_audio_input, interview_state, user_data],
[interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display]
).then(
lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript]
)
demo.launch(debug=True)