import requests
import os
import json
import re
import time
import logging
import traceback
import numpy as np
from collections import defaultdict
from enum import Enum
from textwrap import dedent
from dotenv import load_dotenv
from inputimeout import inputimeout, TimeoutOccurred
from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Qdrant
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import (
    VectorParams, Distance, Filter, FieldCondition, MatchValue,
    PointStruct, SearchRequest,
)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors
from transformers import AutoTokenizer, pipeline, BitsAndBytesConfig
# from langchain_huggingface import HuggingFaceEndpoint
import cohere
load_dotenv()
logging.basicConfig(level=logging.INFO)

cohere_api_key = os.getenv("COHERE_API_KEY")
chat_groq_api = os.getenv("GROQ_API_KEY")
hf_api_key = os.getenv("HF_API_KEY")
qdrant_api = os.getenv("QDRANT_API_KEY")
qdrant_url = os.getenv("QDRANT_API_URL")
# Avoid printing raw secrets; just confirm they are set.
print("GROQ API key set:", bool(chat_groq_api))
print("QDRANT API key set:", bool(qdrant_api))
print("QDRANT API URL:", qdrant_url)
print("Cohere API key set:", bool(cohere_api_key))
# Connectivity check — never hard-code the cluster URL or API key; use the env vars above.
qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api,
)
print(qdrant_client.get_collections())
class CustomChatGroq:
    def __init__(self, temperature, model_name, api_key):
        self.temperature = temperature
        self.model_name = model_name
        self.api_key = api_key
        self.api_url = "https://api.groq.com/openai/v1/chat/completions"

    def predict(self, prompt):
        """Send a request to the Groq API and return the generated response."""
        try:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            payload = {
                "model": self.model_name,
                "messages": [
                    {"role": "system", "content": "You are an AI interviewer."},
                    {"role": "user", "content": prompt}
                ],
                "temperature": self.temperature,
                "max_tokens": 150
            }
            response = requests.post(self.api_url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
            data = response.json()
            # Extract response text based on the Groq API response format
            if "choices" in data and len(data["choices"]) > 0:
                return data["choices"][0]["message"]["content"].strip()
            logging.warning("Unexpected response structure from Groq API")
            return "Interviewer: Could you tell me more about your relevant experience?"
        except requests.exceptions.RequestException as e:
            logging.error(f"ChatGroq API error: {e}")
            return "Interviewer: Due to a system issue, let's move on to another question."
groq_llm = ChatGroq(
    temperature=0.7,
    model_name="llama-3.3-70b-versatile",
    api_key=chat_groq_api
)
from huggingface_hub import login

HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    login(HF_TOKEN)
else:
    raise EnvironmentError("Missing HF_TOKEN environment variable.")
# Load the Mistral judge model
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch

print(torch.cuda.is_available())
MODEL_PATH = "mistralai/Mistral-7B-Instruct-v0.3"
# MODEL_PATH = "tiiuae/falcon-rw-1b"
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
)
mistral_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, token=hf_api_key)
judge_llm = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    quantization_config=bnb_config,
    torch_dtype=torch.float16,
    device_map="auto",
    token=hf_api_key
)
judge_llm.config.pad_token_id = judge_llm.config.eos_token_id
print(judge_llm.hf_device_map)
judge_pipeline = pipeline(
    "text-generation",
    model=judge_llm,
    tokenizer=mistral_tokenizer,
    max_new_tokens=128,
    temperature=0.3,
    top_p=0.9,
    do_sample=True,  # Recommended when using temperature/top_p
    repetition_penalty=1.1,
)
output = judge_pipeline("Q: What is Python?\nA:", max_new_tokens=128)[0]["generated_text"]
print(output)
# Embedding model
from sentence_transformers import SentenceTransformer

class LocalEmbeddings:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_query(self, text):
        return self.model.encode(text).tolist()

    def embed_documents(self, documents):
        return self.model.encode(documents).tolist()

embeddings = LocalEmbeddings()
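# Quick sanity check (assumption: the Qdrant collection created further down uses
# 384-dimensional vectors, which is what all-MiniLM-L6-v2 produces):
_dim = len(embeddings.embed_query("dimension sanity check"))
print(f"Embedding dimension: {_dim} (expected 384)")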
qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api, check_compatibility=False)
co = cohere.Client(api_key=cohere_api_key)
class EvaluationScore(str, Enum):
    POOR = "Poor"
    MEDIUM = "Medium"
    GOOD = "Good"
    EXCELLENT = "Excellent"
# Cohere reranker
class CohereReranker:
    def __init__(self, client):
        self.client = client

    def compress_documents(self, documents, query):
        if not documents:
            return []
        doc_texts = [doc.page_content for doc in documents]
        try:
            reranked = self.client.rerank(
                query=query,
                documents=doc_texts,
                model="rerank-english-v2.0",
                top_n=5
            )
            return [documents[result.index] for result in reranked.results]
        except Exception as e:
            logging.error(f"Error in CohereReranker.compress_documents: {e}")
            return documents[:5]

reranker = CohereReranker(co)
def load_data_from_json(file_path):
    """Load interview Q&A data from a JSON file, bucketed by job role."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        job_role_buckets = defaultdict(list)
        for idx, item in enumerate(data):
            try:
                job_role = item["Job Role"].lower().strip()
                question = item["Questions"].strip()
                answer = item["Answers"].strip()
                job_role_buckets[job_role].append({"question": question, "answer": answer})
            except KeyError as e:
                logging.warning(f"Skipping item {idx}: missing key {e}")
        return job_role_buckets
    except Exception as e:
        logging.error(f"Error loading data: {e}")
        raise
def verify_qdrant_collection(collection_name='interview_questions'):
    """Verify that a Qdrant collection exists with the correct configuration."""
    try:
        collection_info = qdrant_client.get_collection(collection_name)
        vector_size = collection_info.config.params.vectors.size
        logging.info(f"Collection '{collection_name}' exists with vector size: {vector_size}")
        return True
    except Exception as e:
        logging.warning(f"Collection '{collection_name}' not found: {e}")
        return False
def store_data_to_qdrant(data, collection_name='interview_questions', batch_size=100):
    """Store interview data in the Qdrant vector database."""
    try:
        # Check if the collection exists; otherwise create it
        if not verify_qdrant_collection(collection_name):
            try:
                qdrant_client.create_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
                )
                logging.info(f"Created collection '{collection_name}'")
            except Exception as e:
                logging.error(f"Error creating collection: {e}\n{traceback.format_exc()}")
                return False

        points = []
        point_id = 0
        total_points = sum(len(qa_list) for qa_list in data.values())
        processed = 0
        for job_role, qa_list in data.items():
            for entry in qa_list:
                try:
                    emb = embeddings.embed_query(entry["question"])
                    print(f"Embedding shape: {len(emb)}")
                    if not emb or len(emb) != 384:
                        logging.warning(f"Skipping point {point_id} due to invalid embedding length: {len(emb)}")
                        continue
                    points.append(PointStruct(
                        id=point_id,
                        vector=emb,
                        payload={
                            "job_role": job_role,
                            "question": entry["question"],
                            "answer": entry["answer"]
                        }
                    ))
                    point_id += 1
                    processed += 1
                    # Batch upload
                    if len(points) >= batch_size:
                        try:
                            qdrant_client.upsert(collection_name=collection_name, points=points)
                            logging.info(f"Stored {processed}/{total_points} points ({processed/total_points*100:.1f}%)")
                        except Exception as upsert_err:
                            logging.error(f"Error during upsert: {upsert_err}\n{traceback.format_exc()}")
                        points = []
                except Exception as embed_err:
                    logging.error(f"Embedding error for point {point_id}: {embed_err}\n{traceback.format_exc()}")

        # Final batch upload
        if points:
            try:
                qdrant_client.upsert(collection_name=collection_name, points=points)
                logging.info(f"Stored final batch of {len(points)} points")
            except Exception as final_upsert_err:
                logging.error(f"Error during final upsert: {final_upsert_err}\n{traceback.format_exc()}")

        # Final verification
        try:
            count = qdrant_client.count(collection_name=collection_name, exact=True).count
            print("Current count:", count)
            logging.info(f"✅ Successfully stored {count} points in Qdrant")
            if count != total_points:
                logging.warning(f"Expected {total_points} points but stored {count}")
        except Exception as count_err:
            logging.error(f"Error verifying stored points: {count_err}\n{traceback.format_exc()}")
        return True
    except Exception as e:
        logging.error(f"Error storing data to Qdrant: {e}\n{traceback.format_exc()}")
        return False
# Confirm the collection uses cosine similarity
info = qdrant_client.get_collection("interview_questions")
print(info.config.params.vectors.distance)
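# End-to-end ingestion sketch (hedged: 'interview_qa.json' is a placeholder path,
# so this stays commented out):
# qa_data = load_data_from_json("interview_qa.json")
# store_data_to_qdrant(qa_data, collection_name="interview_questions")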
def extract_all_roles_from_qdrant(collection_name='interview_questions'):
    """Extract all unique job roles from the Qdrant vector store."""
    try:
        all_roles = set()
        scroll_offset = None
        while True:
            response = qdrant_client.scroll(
                collection_name=collection_name,
                limit=200,
                offset=scroll_offset,
                with_payload=True
            )
            points, next_page_offset = response
            if not points:
                break
            for point in points:
                role = point.payload.get("job_role", "").strip().lower()
                if role:
                    all_roles.add(role)
            if not next_page_offset:
                break
            scroll_offset = next_page_offset
        if not all_roles:
            logging.warning("[Qdrant] No roles found in payloads.")
        else:
            logging.info(f"[Qdrant] Extracted {len(all_roles)} unique job roles.")
        return list(all_roles)
    except Exception as e:
        logging.error(f"Error extracting roles from Qdrant: {e}")
        return []
from sklearn.metrics.pairwise import cosine_similarity
def find_similar_roles(user_role, all_roles, top_k=3):
    """
    Find the most similar job roles to the given user_role using embeddings.
    """
    try:
        # Clean inputs
        user_role = user_role.strip().lower()
        if not user_role or not all_roles or not isinstance(all_roles, list):
            logging.warning("Invalid input for role similarity")
            return []
        # Embed the user role
        try:
            user_embedding = embeddings.embed_query(user_role)
            if user_embedding is None:
                logging.error("User embedding is None")
                return []
        except Exception as e:
            logging.error(f"Error embedding user role: {type(e).__name__}: {e}")
            return []
        # Embed all roles
        try:
            role_embeddings = []
            valid_roles = []
            for role in all_roles:
                emb = embeddings.embed_query(role.lower())
                if emb is not None:
                    role_embeddings.append(emb)
                    valid_roles.append(role)
                else:
                    logging.warning(f"Skipping role with no embedding: {role}")
        except Exception as e:
            logging.error(f"Error embedding all roles: {type(e).__name__}: {e}")
            return []
        if not role_embeddings:
            logging.error("All role embeddings failed")
            return []
        # Compute similarities
        similarities = cosine_similarity([user_embedding], role_embeddings)[0]
        top_indices = np.argsort(similarities)[::-1][:top_k]
        similar_roles = [valid_roles[i] for i in top_indices]
        logging.debug(f"Similar roles to '{user_role}': {similar_roles}")
        return similar_roles
    except Exception as e:
        logging.error(f"Error finding similar roles: {type(e).__name__}: {e}", exc_info=True)
        return []
# Retrieve ALL data for the job role, not just the top-k
def get_role_questions(job_role):
    try:
        if not job_role:
            logging.warning("Job role is empty.")
            return []
        filter_by_role = Filter(
            must=[FieldCondition(
                key="job_role",
                match=MatchValue(value=job_role.lower())
            )]
        )
        all_results = []
        offset = None
        while True:
            results, next_page_offset = qdrant_client.scroll(
                collection_name="interview_questions",
                scroll_filter=filter_by_role,
                with_payload=True,
                with_vectors=False,
                limit=100,  # batch size
                offset=offset
            )
            all_results.extend(results)
            if not next_page_offset:
                break
            offset = next_page_offset
        parsed_results = [{
            "question": r.payload.get("question"),
            "answer": r.payload.get("answer"),
            "job_role": r.payload.get("job_role")
        } for r in all_results]
        return parsed_results
    except Exception as e:
        logging.error(f"Error fetching role questions: {type(e).__name__}: {e}", exc_info=True)
        return []
def retrieve_interview_data(job_role, all_roles):
    """
    Retrieve all interview Q&A for a given job role.
    Falls back to similar roles if no data is found.
    Args:
        job_role (str): Input job role (can be misspelled)
        all_roles (list): Full list of available job roles
    Returns:
        list: List of QA dicts with keys: 'question', 'answer', 'job_role'
    """
    job_role = job_role.strip().lower()
    seen_questions = set()
    final_results = []

    # Step 1: Try an exact match (fetch all questions for the role)
    logging.info(f"Trying to fetch all data for exact role: '{job_role}'")
    exact_matches = get_role_questions(job_role)
    for qa in exact_matches:
        question = qa["question"]
        if question and question not in seen_questions:
            seen_questions.add(question)
            final_results.append(qa)
    if final_results:
        logging.info(f"Found {len(final_results)} QA pairs for exact role '{job_role}'")
        return final_results
    logging.warning(f"No data found for role '{job_role}'. Trying similar roles...")

    # Step 2: No matches — find similar roles
    similar_roles = find_similar_roles(job_role, all_roles, top_k=3)
    if not similar_roles:
        logging.warning("No similar roles found.")
        return []
    logging.info(f"Found similar roles: {similar_roles}")

    # Step 3: Retrieve data for each similar role (all questions)
    for role in similar_roles:
        logging.info(f"Fetching data for similar role: '{role}'")
        role_qa = get_role_questions(role)
        for qa in role_qa:
            question = qa["question"]
            if question and question not in seen_questions:
                seen_questions.add(question)
                final_results.append(qa)
    logging.info(f"Retrieved a total of {len(final_results)} QA pairs from similar roles")
    return final_results
import random

def random_context_chunks(retrieved_data, k=3):
    # Guard against sampling more chunks than are available
    k = min(k, len(retrieved_data))
    chunks = random.sample(retrieved_data, k)
    return "\n\n".join([f"Q: {item['question']}\nA: {item['answer']}" for item in chunks])
from typing import Dict

def eval_question_quality(
    question: str,
    job_role: str,
    seniority: str
) -> Dict[str, str]:
    """
    Evaluate the quality of a generated interview question using the Groq LLM.
    Returns a structured dict with score, reasoning, and suggestions.
    """
    from langchain_core.messages import AIMessage
    prompt = f"""
You are a senior AI hiring expert evaluating the quality of an interview question for a {seniority} {job_role} role.

Evaluate the question based on:
- Relevance to the role and level
- Clarity and conciseness
- Depth of technical insight
---
Question: {question}
---
Respond only with a valid JSON like:
{{
  "Score": "Poor" | "Medium" | "Good" | "Excellent",
  "Reasoning": "short justification",
  "Improvements": ["tip1", "tip2"]
}}
"""
    try:
        start = time.time()
        response = groq_llm.invoke(prompt)
        print("⏱️ eval_question_quality duration:", round(time.time() - start, 2), "s")
        # invoke() returns an AIMessage; extract the text before slicing for JSON
        output = response.content if isinstance(response, AIMessage) else str(response)
        start_idx = output.rfind("{")
        end_idx = output.rfind("}") + 1
        json_str = output[start_idx:end_idx]
        result = json.loads(json_str)
        if result.get("Score") in {"Poor", "Medium", "Good", "Excellent"}:
            return result
        else:
            raise ValueError("Invalid Score value in model output")
    except Exception as e:
        print(f"⚠️ eval_question_quality fallback: {e}")
        return {
            "Score": "Poor",
            "Reasoning": "Evaluation failed, using fallback.",
            "Improvements": [
                "Ensure the question is relevant and clear.",
                "Avoid vague or overly generic phrasing.",
                "Include role-specific context if needed."
            ]
        }
def evaluate_answer(
    question: str,
    answer: str,
    ref_answer: str,
    job_role: str,
    seniority: str,
) -> Dict[str, str]:
    """
    Fast, structured answer evaluation using the Groq LLM (e.g. Mixtral or LLaMA 3).
    """
    from langchain_core.messages import AIMessage
    prompt = f"""
You are a technical interviewer evaluating a candidate for a {seniority} {job_role} role.

Evaluate the response based on:
- Technical correctness
- Clarity
- Relevance
- Structure
---
Question: {question}
Candidate Answer: {answer}
Reference Answer: {ref_answer}
---
Respond ONLY with valid JSON in the following format:
{{
  "Score": "Poor" | "Medium" | "Good" | "Excellent",
  "Reasoning": "short justification",
  "Improvements": ["tip1", "tip2"]
}}
"""
    try:
        start = time.time()
        raw = groq_llm.invoke(prompt)
        print("⏱️ evaluate_answer duration:", round(time.time() - start, 2), "s")
        if isinstance(raw, AIMessage):
            output = raw.content
        else:
            output = str(raw)
        print("🔍 Raw Groq Response:\n", output)
        start_idx = output.rfind("{")
        end_idx = output.rfind("}") + 1
        json_str = output[start_idx:end_idx]
        result = json.loads(json_str)
        if result.get("Score") in {"Poor", "Medium", "Good", "Excellent"}:
            return result
        else:
            raise ValueError("Invalid score value")
    except Exception as e:
        print(f"⚠️ evaluate_answer fallback: {e}")
        return {
            "Score": "Poor",
            "Reasoning": "Failed to evaluate properly. Defaulted to Poor.",
            "Improvements": [
                "Be more specific",
                "Add technical details",
                "Structure the answer clearly"
            ]
        }
# Same flow, but using LLaMA 3.3 hosted on Groq
def generate_reference_answer(question, job_role, seniority):
    """
    Generates a high-quality reference answer using the Groq-hosted LLaMA model.
    Args:
        question (str): Interview question to answer.
        job_role (str): Target job role (e.g., "Frontend Developer").
        seniority (str): Experience level (e.g., "Mid-Level").
    Returns:
        str: Clean, generated reference answer or an error message.
    """
    try:
        # Clean, role-specific prompt
        prompt = f"""You are a {seniority} {job_role}.
Q: {question}
A:"""
        # Use the Groq-hosted model to generate the answer
        ref_answer = groq_llm.predict(prompt)
        if not ref_answer.strip():
            return "Reference answer not generated."
        return ref_answer.strip()
    except Exception as e:
        logging.error(f"Error generating reference answer: {e}", exc_info=True)
        return "Unable to generate reference answer due to an error."
def build_interview_prompt(conversation_history, user_response, context, job_role, skills, seniority,
                           difficulty_adjustment=None, voice_label=None, face_label=None, effective_confidence=None):
    """Build a prompt for generating the next interview question with adaptive difficulty and fairness logic."""
    interview_template = """
You are an AI interviewer conducting a real-time interview for a {job_role} position.
Your objective is to thoroughly evaluate the candidate's suitability for the role using smart, structured, and adaptive questioning.

---
Interview Rules and Principles:
- The **baseline difficulty** of questions must match the candidate’s seniority level (e.g., junior, mid-level, senior).
- Use your judgment to increase difficulty **slightly** if the candidate performs well, or simplify if they struggle — but never drop below the expected baseline for their level.
- Avoid asking extremely difficult questions to junior candidates unless they’ve clearly demonstrated advanced knowledge.
- Be fair: candidates for the same role should be evaluated within a consistent difficulty range.
- Adapt your line of questioning gradually and logically based on the **overall flow**, not just the last answer.
- Include real-world problem-solving scenarios to test how the candidate thinks and behaves practically.
- You must **lead** the interview and make intelligent decisions about what to ask next.

---
Context Use:
{context_instruction}

Retrieved Context (may be empty):
{context}

---
Job Role: {job_role}
Seniority Level: {seniority}
Skills Focus: {skills}
Difficulty Setting: {difficulty} (based on {difficulty_adjustment})

---
Recent Conversation History:
{history}

Candidate's Last Response:
"{user_response}"

Evaluation of Last Response:
{response_evaluation}
Voice Tone: {voice_label}

---
Important:
If no relevant context was retrieved or the previous answer is unclear or off-topic,
you must still generate a meaningful and fair interview question using your own knowledge and best practices.
Do not skip question generation or fall back to default/filler responses.

---
Guidelines for Next Question:
- If this is the beginning of the interview, start with a question about the candidate’s background or experience.
- Base the difficulty primarily on the seniority level, with light adjustment from recent performance.
- Focus on core skills, real-world applications, and depth of reasoning.
- Ask only one question. Be clear and concise.

Generate the next interview question now:
"""
    # Calculate the difficulty phrase
    if difficulty_adjustment == "harder":
        difficulty = f"slightly more challenging than typical for {seniority}"
    elif difficulty_adjustment == "easier":
        difficulty = f"slightly easier than typical for {seniority}"
    else:
        difficulty = f"appropriate for {seniority}"

    # Choose the context logic
    if context.strip():
        context_instruction = (
            "Use both your own expertise and the provided context from relevant interview datasets. "
            "You can either build on questions from the dataset or generate your own."
        )
        context = context.strip()
    else:
        context_instruction = (
            "No specific context retrieved. Use your own knowledge and best practices to craft a question."
        )
        context = ""  # Let it be actually empty

    # Format the conversation history (last 6 exchanges max)
    recent_history = conversation_history[-6:] if len(conversation_history) > 6 else conversation_history
    formatted_history = "\n".join([f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history])

    # Add an evaluation summary if available
    if conversation_history and conversation_history[-1].get("evaluation"):
        eval_data = conversation_history[-1]["evaluation"][-1]
        response_evaluation = f"""
- Score: {eval_data.get('Score', 'N/A')}
- Reasoning: {eval_data.get('Reasoning', 'N/A')}
- Improvements: {eval_data.get('Improvements', 'N/A')}
"""
    else:
        response_evaluation = "No evaluation available yet."

    # Fill the template
    prompt = interview_template.format(
        job_role=job_role,
        seniority=seniority,
        skills=skills,
        difficulty=difficulty,
        difficulty_adjustment=difficulty_adjustment if difficulty_adjustment else "default seniority",
        context_instruction=context_instruction,
        context=context,
        history=formatted_history,
        user_response=user_response,
        response_evaluation=response_evaluation.strip(),
        voice_label=voice_label or "unknown",
    )
    return prompt
def generate_llm_interview_report(
    interview_state, logged_samples, job_role, seniority
):
    from collections import Counter

    # Helper for converting an emotion label to a 1–5 score
    def score_label(label):
        mapping = {
            "confident": 5, "calm": 4, "neutral": 3, "nervous": 2, "anxious": 1, "unknown": 3
        }
        return mapping.get(label.lower(), 3)

    def section_score(vals):
        return round(sum(vals) / len(vals), 2) if vals else "N/A"

    # Aggregate info
    scores, voice_conf, face_conf, comm_scores = [], [], [], []
    tech_details, comm_details, emotion_details, relevance_details, problem_details = [], [], [], [], []
    for entry in logged_samples:
        # answer_evaluation can be None for skipped questions; fall back to {}
        answer_eval = entry.get("answer_evaluation") or {}
        score = answer_eval.get("Score", "Not Evaluated")
        reasoning = answer_eval.get("Reasoning", "")
        if score.lower() in ["excellent", "good", "medium", "poor"]:
            score_map = {"excellent": 5, "good": 4, "medium": 3, "poor": 2}
            scores.append(score_map[score.lower()])
        # Section details
        tech_details.append(reasoning)
        comm_details.append(reasoning)
        # Emotions/confidence
        voice_conf.append(score_label(entry.get("voice_label", "unknown")))
        face_conf.append(score_label(entry.get("face_label", "unknown")))
        # Communication estimate
        if entry["user_answer"]:
            length = len(entry["user_answer"].split())
            comm_score = min(5, max(2, length // 30))
            comm_scores.append(comm_score)

    # Compute averages for the sections
    avg_problem = section_score(scores)
    avg_tech = section_score(scores)
    avg_comm = section_score(comm_scores)
    avg_emotion = section_score([(v + f) / 2 for v, f in zip(voice_conf, face_conf)])

    # Compute decision heuristics
    section_averages = [avg_problem, avg_tech, avg_comm, avg_emotion]
    numeric_avgs = [v for v in section_averages if isinstance(v, (float, int))]
    avg_overall = round(sum(numeric_avgs) / len(numeric_avgs), 2) if numeric_avgs else 0

    # Hiring logic (thresholds are tunable)
    if avg_overall >= 4.5:
        verdict = "Strong Hire"
    elif avg_overall >= 4.0:
        verdict = "Hire"
    elif avg_overall >= 3.0:
        verdict = "Conditional Hire"
    else:
        verdict = "No Hire"

    # Build the LLM report prompt
    transcript = "\n\n".join([
        f"Q: {e['generated_question']}\nA: {e['user_answer']}\nScore: {(e.get('answer_evaluation') or {}).get('Score', '')}\nReasoning: {(e.get('answer_evaluation') or {}).get('Reasoning', '')}"
        for e in logged_samples
    ])
    prompt = f"""
You are a senior technical interviewer at a major tech company.
Write a structured, realistic hiring report for this {seniority} {job_role} interview, using these section scores (scale 1–5, with 5 best):

Section-wise Evaluation
1. *Problem Solving & Critical Thinking*: {avg_problem}
2. *Technical Depth & Knowledge*: {avg_tech}
3. *Communication & Clarity*: {avg_comm}
4. *Emotional Composure & Confidence*: {avg_emotion}
5. *Role Relevance*: 5

*Transcript*
{transcript}

Your report should have the following sections:
1. *Executive Summary* (realistic, hiring-committee style)
2. *Section-wise Comments* (for each numbered category above, with a short paragraph citing specifics)
3. *Strengths & Weaknesses* (list at least 2 for each)
4. *Final Verdict*: {verdict}
5. *Recommendations* (2–3 for future improvement)

Use realistic language. If some sections are N/A or lower than others, comment honestly.

Interview Report:
"""
    # LLM call (or return the prompt for review)
    return groq_llm.predict(prompt)
def get_user_info():
    """
    Collects essential information from the candidate before starting the interview.
    Returns a dictionary with keys: name, job_role, seniority, skills
    """
    logging.info("Collecting user information...")
    print("Welcome to the AI Interview Simulator!")
    print("Let’s set up your mock interview.\n")

    # Get the user's name
    name = input("What is your name? ").strip()
    while not name:
        print("Please enter your name.")
        name = input("What is your name? ").strip()

    # Get the job role
    job_role = input(f"Hi {name}, what job role are you preparing for? (e.g. Frontend Developer) ").strip()
    while not job_role:
        print("Please specify the job role.")
        job_role = input("What job role are you preparing for? ").strip()

    # Get the seniority level
    seniority_options = ["Entry-level", "Junior", "Mid-Level", "Senior", "Lead"]
    print("\nSelect your experience level:")
    for i, option in enumerate(seniority_options, 1):
        print(f"{i}. {option}")
    seniority_choice = None
    while seniority_choice not in range(1, len(seniority_options) + 1):
        try:
            seniority_choice = int(input("Enter the number corresponding to your level: "))
        except ValueError:
            print(f"Please enter a number between 1 and {len(seniority_options)}")
    seniority = seniority_options[seniority_choice - 1]

    # Get skills
    skills_input = input(f"\nWhat are your top skills relevant to {job_role}? (Separate with commas): ")
    skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()]
    while not skills:
        print("Please enter at least one skill.")
        skills_input = input("Your top skills (comma-separated): ")
        skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()]

    # Confirm the collected info
    print("\nInterview Setup Complete!")
    print(f"Name: {name}")
    print(f"Job Role: {job_role}")
    print(f"Experience Level: {seniority}")
    print(f"Skills: {', '.join(skills)}")
    print("\nStarting your mock interview...\n")
    return {
        "name": name,
        "job_role": job_role,
        "seniority": seniority,
        "skills": skills
    }
import threading

def wait_for_user_response(timeout=200):
    """Wait for user input with a timeout. Returns '' if no response arrives in time."""
    user_input = []

    def get_input():
        answer = input("Your Answer (within timeout): ").strip()
        user_input.append(answer)

    # Daemon thread so a timed-out input() cannot block interpreter shutdown
    thread = threading.Thread(target=get_input, daemon=True)
    thread.start()
    thread.join(timeout)
    return user_input[0] if user_input else ""
from datetime import datetime

def interview_loop(max_questions, timeout_seconds=300, collection_name="interview_questions", judge_pipeline=None, save_path="interview_log.json"):
    user_info = get_user_info()
    job_role = user_info['job_role']
    seniority = user_info['seniority']
    skills = user_info['skills']

    all_roles = extract_all_roles_from_qdrant(collection_name=collection_name)
    retrieved_data = retrieve_interview_data(job_role, all_roles)
    context_data = random_context_chunks(retrieved_data, k=4)

    conversation_history = []
    interview_state = {
        "questions": [],
        "user_answer": [],
        "job_role": job_role,
        "seniority": seniority,
        "start_time": time.time()
    }
    # Store a log for evaluation
    logged_samples = []
    difficulty_adjustment = None

    for i in range(max_questions):
        last_user_response = conversation_history[-1]['content'] if conversation_history else ""
        # Generate the question prompt
        prompt = build_interview_prompt(
            conversation_history=conversation_history,
            user_response=last_user_response,
            context=context_data,
            job_role=job_role,
            skills=skills,
            seniority=seniority,
            difficulty_adjustment=difficulty_adjustment
        )
        question = groq_llm.predict(prompt)
        question_eval = eval_question_quality(question, job_role, seniority)
        conversation_history.append({'role': "Interviewer", "content": question})
        print(f"Interviewer: Q{i + 1} : {question}")

        # Wait for the user's answer
        start_time = time.time()
        user_answer = wait_for_user_response(timeout=timeout_seconds)
        response_time = time.time() - start_time

        skipped = False
        answer_eval = None
        ref_answer = None
        if not user_answer:
            print("No response received, moving to the next question.")
            user_answer = None
            skipped = True
            difficulty_adjustment = None  # fall back to the seniority baseline
        else:
            conversation_history.append({"role": "Candidate", "content": user_answer})
            ref_answer = generate_reference_answer(question, job_role, seniority)
            answer_eval = evaluate_answer(
                question=question,
                answer=user_answer,
                ref_answer=ref_answer,
                job_role=job_role,
                seniority=seniority,
            )
            interview_state["user_answer"].append(user_answer)
            # Append an inline evaluation in the shape build_interview_prompt expects
            conversation_history[-1].setdefault('evaluation', []).append({
                "Score": answer_eval['Score'],
                "Reasoning": answer_eval['Reasoning']
            })
            # Adjust difficulty
            score = answer_eval['Score'].lower()
            if score == "excellent":
                difficulty_adjustment = "harder"
            elif score in ['poor', 'medium']:
                difficulty_adjustment = "easier"
            else:
                difficulty_adjustment = None

        # Store for local logging
        logged_samples.append({
            "job_role": job_role,
            "seniority": seniority,
            "skills": skills,
            "context": context_data,
            "prompt": prompt,
            "generated_question": question,
            "question_evaluation": question_eval,
            "user_answer": user_answer,
            "reference_answer": ref_answer,
            "answer_evaluation": answer_eval,
            "skipped": skipped
        })
        # Store state
        interview_state['questions'].append({
            "question": question,
            "question_evaluation": question_eval,
            "user_answer": user_answer,
            "answer_evaluation": answer_eval,
            "skipped": skipped
        })

    interview_state['end_time'] = time.time()
    report = generate_llm_interview_report(interview_state, logged_samples, job_role, seniority)
    print("Report : _____________________\n")
    print(report)
    print('______________________________________________')

    # Save the full interview log to JSON
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{save_path.replace('.json', '')}_{timestamp}.json"
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(logged_samples, f, indent=2, ensure_ascii=False)
    print(f"Interview log saved to {filename}")
    print("____________________________________\n")
    print(f"interview state : {interview_state}")
    return interview_state, report
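# Usage sketch (hedged: commented out because interview_loop blocks on stdin;
# run it manually from a terminal session):
# final_state, final_report = interview_loop(max_questions=5, timeout_seconds=300, judge_pipeline=judge_pipeline)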
# Build ground truth for testing the retrieval step
def build_ground_truth(all_roles):
    gt = {}
    for role in all_roles:
        qa_list = get_role_questions(role)
        gt[role] = set(q["question"] for q in qa_list if q["question"])
    return gt
def evaluate_retrieval(job_role, all_roles, k=10):
    """
    Evaluate retrieval quality using Precision@k, Recall@k, and F1@k.
    Args:
        job_role (str): The input job role to search for.
        all_roles (list): List of all available job roles in the system.
        k (int): Top-k retrieved questions to evaluate.
    Returns:
        dict: Evaluation metrics including precision, recall, and f1.
    """
    # Step 1: Ground truth (all exact questions stored for this role)
    ground_truth_qs = set(
        q["question"].strip()
        for q in get_role_questions(job_role)
        if q.get("question")
    )
    if not ground_truth_qs:
        print(f"[!] No ground truth found for role: {job_role}")
        return {}
    # Step 2: Retrieved questions (may include fallback roles)
    retrieved_qas = retrieve_interview_data(job_role, all_roles)
    retrieved_qs = [q["question"].strip() for q in retrieved_qas if q.get("question")]
    # Step 3: Take the top-k retrieved
    retrieved_top_k = retrieved_qs[:k]
    # Step 4: Binary relevance (1 if in the ground truth, 0 if not)
    y_true = [1 if q in ground_truth_qs else 0 for q in retrieved_top_k]
    relevant_retrieved = sum(y_true)
    # Precision@k: fraction of the retrieved items that are relevant.
    # Recall@k: fraction of the ground truth recovered in the top-k.
    # (Scoring recall against y_pred = all ones would always yield 1.0,
    # so recall is measured against the ground-truth set instead.)
    precision = relevant_retrieved / len(retrieved_top_k) if retrieved_top_k else 0.0
    recall = relevant_retrieved / len(ground_truth_qs)
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
    print(f"Retrieval Evaluation for role: '{job_role}' (Top-{k})")
    print(f"Precision@{k}: {precision:.2f}")
    print(f"Recall@{k}: {recall:.2f}")
    print(f"F1@{k}: {f1:.2f}")
    print(f"Relevant Retrieved: {relevant_retrieved}/{len(retrieved_top_k)}")
    print("–" * 40)
    return {
        "job_role": job_role,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "relevant_retrieved": relevant_retrieved,
        "total_retrieved": len(retrieved_top_k),
        "ground_truth_count": len(ground_truth_qs),
    }
k_values = [5, 10, 20]
all_roles = extract_all_roles_from_qdrant(collection_name="interview_questions")
results = []
for k in k_values:
    for role in all_roles:
        metrics = evaluate_retrieval(role, all_roles, k=k)
        if metrics:  # only if ground truth was found
            metrics["k"] = k
            results.append(metrics)

import pandas as pd
df = pd.DataFrame(results)
summary = df.groupby("k")[["precision", "recall", "f1"]].mean().round(3)
print(summary)
def extract_job_details(job_description):
    """Extract the job title, skills, experience level, and years of experience from a job description."""
    title_match = re.search(r"(?i)(?:seeking|hiring) a (.+?) to", job_description)
    job_title = title_match.group(1) if title_match else "Unknown"
    skills_match = re.findall(r"(?i)(?:Proficiency in|Experience with|Knowledge of) (.+?)(?:,|\.| and| or)", job_description)
    skills = list(set([skill.strip() for skill in skills_match])) if skills_match else []
    experience_match = re.search(r"(\d+)\+? years of experience", job_description)
    if experience_match:
        years_experience = int(experience_match.group(1))
        experience_level = "Senior" if years_experience >= 5 else "Mid" if years_experience >= 3 else "Junior"
    else:
        years_experience = None
        experience_level = "Unknown"
    return {
        "job_title": job_title,
        "skills": skills,
        "experience_level": experience_level,
        "years_experience": years_experience
    }
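# Minimal sanity check for the regex-based extractor (hedged: the sample job
# description below is made up for illustration):
_sample_jd = "We are seeking a Data Scientist to join us. Proficiency in Python, and 5+ years of experience required."
print(extract_job_details(_sample_jd))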
from docx import Document
import textract
from PyPDF2 import PdfReader

JOB_TITLES = [
    "Accountant", "Data Scientist", "Machine Learning Engineer", "Software Engineer",
    "Developer", "Analyst", "Researcher", "Intern", "Consultant", "Manager",
    "Engineer", "Specialist", "Project Manager", "Product Manager", "Administrator",
    "Director", "Officer", "Assistant", "Coordinator", "Supervisor"
]
def clean_filename_name(filename):
    # Remove the file extension
    base = re.sub(r"\.[^.]+$", "", filename)
    base = base.strip()
    # Remove 'cv' or 'CV' words
    base_clean = re.sub(r"\bcv\b", "", base, flags=re.IGNORECASE).strip()
    # If nothing remains after removing 'CV', return None
    if not base_clean:
        return None
    # If it contains any digit, return None (unreliable)
    if re.search(r"\d", base_clean):
        return None
    # Replace underscores/dashes with spaces and title-case the result
    base_clean = base_clean.replace("_", " ").replace("-", " ")
    return base_clean.title()

def looks_like_job_title(line):
    for title in JOB_TITLES:
        pattern = r"\b" + re.escape(title.lower()) + r"\b"
        if re.search(pattern, line.lower()):
            return True
    return False
def extract_name_from_text(lines):
    # Try the first 3 lines for a name, skipping job titles
    for i in range(min(3, len(lines))):
        line = lines[i].strip()
        if looks_like_job_title(line):
            continue
        if re.search(r"\d", line):  # skip lines with digits
            continue
        if len(line.split()) > 4 or len(line) > 40:  # too long or too many words
            continue
        # A line of all-uppercase words is probably a section header, not a name
        if line.isupper():
            continue
        # Passed all checks; return the title-cased line as the name
        return line.title()
    return None
def extract_text_from_file(file_path):
    if file_path.endswith('.pdf'):
        reader = PdfReader(file_path)
        text = "\n".join(page.extract_text() or '' for page in reader.pages)
    elif file_path.endswith('.docx'):
        doc = Document(file_path)
        text = "\n".join([para.text for para in doc.paragraphs])
    else:  # For .doc or as a fallback
        text = textract.process(file_path).decode('utf-8')
    return text.strip()
def extract_candidate_details(file_path):
    text = extract_text_from_file(file_path)
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    # Extract the name
    filename = file_path.split("/")[-1]  # just the filename, no path
    name = clean_filename_name(filename)
    if not name:
        name = extract_name_from_text(lines)
    if not name:
        name = "Unknown"
    # Extract skills (basic version)
    skills = []
    skills_section = re.search(r"Skills\s*[:\-]?\s*(.+)", text, re.IGNORECASE)
    if skills_section:
        raw_skills = skills_section.group(1)
        skills = [s.strip() for s in re.split(r",|\n|•|-", raw_skills) if s.strip()]
    return {
        "name": name,
        "skills": skills
    }
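# Usage sketch (hedged: 'Jane_Doe_CV.pdf' is a hypothetical file, so this stays
# commented out):
# details = extract_candidate_details("Jane_Doe_CV.pdf")
# print(details["name"], details["skills"])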
# --- Previous Bark/Whisper version of the Gradio app (kept commented out for reference) ---
# import gradio as gr
# import time
# import tempfile
# import numpy as np
# import scipy.io.wavfile as wavfile
# import os
# import json
# from transformers import BarkModel, AutoProcessor
# import torch, gc
# import whisper
# from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification
# import librosa
# import torch
# print(torch.cuda.is_available())  # ✅ Tells you if a GPU is available
# torch.cuda.empty_cache()
# gc.collect()
# # Bark TTS
# print("🔁 Loading Bark model...")
# model_bark = BarkModel.from_pretrained("suno/bark").to("cuda" if torch.cuda.is_available() else "cpu")
# print("✅ Bark model loaded")
# print("🔁 Loading Bark processor...")
# processor_bark = AutoProcessor.from_pretrained("suno/bark")
# print("✅ Bark processor loaded")
# bark_voice_preset = "v2/en_speaker_5"
# def bark_tts(text):
#     print(f"🔁 Synthesizing TTS for: {text}")
#     # Process the text
#     inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset)
#     # Move tensors to device
#     input_ids = inputs["input_ids"].to(model_bark.device)
#     start = time.time()
#     # Generate speech with only the required parameters
#     with torch.no_grad():
#         speech_values = model_bark.generate(
#             input_ids=input_ids,
#             do_sample=True,
#             fine_temperature=0.4,
#             coarse_temperature=0.8
#         )
#     print(f"✅ Bark finished in {round(time.time() - start, 2)}s")
#     # Convert to audio
#     speech = speech_values.cpu().numpy().squeeze()
#     speech = (speech * 32767).astype(np.int16)
#     temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
#     wavfile.write(temp_wav.name, 22050, speech)
#     return temp_wav.name
# # Whisper STT
# print("🔁 Loading Whisper model...")
# whisper_model = whisper.load_model("base", device="cuda")
# print("✅ Whisper model loaded")
# def whisper_stt(audio_path):
#     if not audio_path or not os.path.exists(audio_path): return ""
#     result = whisper_model.transcribe(audio_path)
#     return result["text"]
# seniority_mapping = {
#     "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5
# }
# # --- 2. Gradio App ---
# with gr.Blocks(theme=gr.themes.Soft()) as demo:
#     user_data = gr.State({})
#     interview_state = gr.State({})
#     missing_fields_state = gr.State([])
#     # --- UI Layout ---
#     with gr.Column(visible=True) as user_info_section:
#         gr.Markdown("## Candidate Information")
#         cv_file = gr.File(label="Upload CV")
#         job_desc = gr.Textbox(label="Job Description")
#         start_btn = gr.Button("Continue", interactive=False)
#     with gr.Column(visible=False) as missing_section:
#         gr.Markdown("## Missing Information")
#         name_in = gr.Textbox(label="Name", visible=False)
#         role_in = gr.Textbox(label="Job Role", visible=False)
#         seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False)
#         skills_in = gr.Textbox(label="Skills", visible=False)
#         submit_btn = gr.Button("Submit", interactive=False)
#     with gr.Column(visible=False) as interview_pre_section:
#         pre_interview_greeting_md = gr.Markdown()
#         start_interview_final_btn = gr.Button("Start Interview")
#     with gr.Column(visible=False) as interview_section:
#         gr.Markdown("## Interview in Progress")
#         question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True)
#         question_text = gr.Markdown()
#         user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer")
#         stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)")
#         confirm_btn = gr.Button("Confirm Answer")
#         evaluation_display = gr.Markdown()
#         interview_summary = gr.Markdown(visible=False)
#     # --- UI Logic ---
#     def validate_start_btn(cv_file, job_desc):
#         return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())))
#     cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn)
#     job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn)
#     def process_and_route_initial(cv_file, job_desc):
#         details = extract_candidate_details(cv_file.name)
#         job_info = extract_job_details(job_desc)
#         data = {
#             "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"),
#             "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", [])
#         }
#         missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v]
#         if missing:
#             return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
#         else:
#             greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
#             return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting)
#     start_btn.click(
#         process_and_route_initial,
#         [cv_file, job_desc],
#         [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md]
#     )
#     def show_missing(missing):
#         if missing is None: missing = []
#         return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing)
#     missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in])
#     def validate_fields(name, role, seniority, skills, missing):
#         if not missing: return gr.update(interactive=False)
#         all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip())),])
#         return gr.update(interactive=all_filled)
#     for inp in [name_in, role_in, seniority_in, skills_in]:
#         inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn)
#     def complete_manual(data, name, role, seniority, skills):
#         if data["name"].lower() == "unknown": data["name"] = name
#         if data["job_role"].lower() == "unknown": data["job_role"] = role
#         if data["seniority"].lower() == "unknown": data["seniority"] = seniority
#         if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")]
#         greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
#         return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)
#     submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md])
#     def start_interview(data):
#         # --- Advanced state with full logging ---
#         state = {
#             "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [],
#             "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [],
#             "conversation_history": [],
#             "difficulty_adjustment": None,
#             "question_idx": 0, "max_questions": 3, "q_start_time": time.time(),
#             "log": []
#         }
#         # --- Optionally: context retrieval here (currently just blank) ---
#         context = ""
#         prompt = build_interview_prompt(
#             conversation_history=[], user_response="", context=context, job_role=data["job_role"],
#             skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None,
#             voice_label="neutral", face_label="neutral"
#         )
#         # Original version:
#         # first_q = groq_llm.predict(prompt)
#         # # Evaluate the question for quality
#         # q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None)
#         # state["questions"].append(first_q)
#         # state["question_evaluations"].append(q_eval)
#         # Testing version (skips question evaluation):
#         first_q = groq_llm.predict(prompt)
#         q_eval = {
#             "Score": "N/A",
#             "Reasoning": "Skipped to reduce processing time",
#             "Improvements": []
#         }
#         state["questions"].append(first_q)
#         state["question_evaluations"].append(q_eval)
#         state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})
#         start = time.perf_counter()
#         audio_path = bark_tts(first_q)
#         print("⏱️ Bark TTS took", time.perf_counter() - start, "seconds")
#         # LOG
#         state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()})
#         return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}"
#     start_interview_final_btn.click(start_interview, [user_data], [interview_state, interview_pre_section, interview_section, question_audio, question_text])
#     def transcribe(audio_path):
#         return whisper_stt(audio_path)
#     user_audio_input.change(transcribe, user_audio_input, stt_transcript)
#     def process_answer(transcript, audio_path, state, data):
#         if not transcript:
#             return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
#         elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
#         state["timings"].append(elapsed)
#         state["answers"].append(transcript)
#         state["conversation_history"].append({'role': 'Candidate', 'content': transcript})
#         # --- 1. Emotion analysis (simplified for testing) ---
#         voice_label = "neutral"
#         face_label = "neutral"
#         state["voice_labels"].append(voice_label)
#         state["face_labels"].append(face_label)
#         # --- 2. Evaluate the previous question and answer ---
#         last_q = state["questions"][-1]
#         q_eval = state["question_evaluations"][-1]  # Already in state
#         ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"])
#         answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None)
#         state["answer_evaluations"].append(answer_eval)
#         answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium"
#         # --- 3. Adaptive difficulty ---
#         if answer_score == "excellent":
#             state["difficulty_adjustment"] = "harder"
#         elif answer_score in ("medium", "poor"):
#             state["difficulty_adjustment"] = "easier"
#         else:
#             state["difficulty_adjustment"] = None
#         # --- 4. Effective confidence (simplified) ---
#         eff_conf = {"effective_confidence": 0.6}
#         state["effective_confidences"].append(eff_conf)
#         # --- LOG ---
#         state["log"].append({
#             "type": "answer",
#             "question": last_q,
#             "answer": transcript,
#             "answer_eval": answer_eval,
#             "ref_answer": ref_answer,
#             "face_label": face_label,
#             "voice_label": voice_label,
#             "effective_confidence": eff_conf,
#             "timing": elapsed,
#             "timestamp": time.time()
#         })
#         # --- Next question or end ---
#         qidx = state["question_idx"] + 1
#         if qidx >= state["max_questions"]:
#             # Save as JSON (optionally)
#             timestamp = time.strftime("%Y%m%d_%H%M%S")
#             log_file = f"interview_log_{timestamp}.json"
#             with open(log_file, "w", encoding="utf-8") as f:
#                 json.dump(state["log"], f, indent=2, ensure_ascii=False)
#             # Report
#             summary = "# Interview Summary\n"
#             for i, q in enumerate(state["questions"]):
#                 summary += (f"\n### Q{i + 1}: {q}\n"
#                             f"- *Answer*: {state['answers'][i]}\n"
#                             f"- *Q Eval*: {state['question_evaluations'][i]}\n"
#                             f"- *A Eval*: {state['answer_evaluations'][i]}\n"
#                             f"- *Time*: {state['timings'][i]}s\n")
#             summary += f"\n\n⏺ Full log saved as {log_file}."
#             return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}"))
#         else:
#             # --- Build the next prompt using adaptive difficulty ---
#             state["question_idx"] = qidx
#             state["q_start_time"] = time.time()
#             context = ""  # Add context retrieval logic here if desired
#             prompt = build_interview_prompt(
#                 conversation_history=state["conversation_history"],
#                 user_response=transcript,
#                 context=context,
#                 job_role=data["job_role"],
#                 skills=data["skills"],
#                 seniority=data["seniority"],
#                 difficulty_adjustment=state["difficulty_adjustment"],
#                 voice_label=voice_label,
#             )
#             next_q = groq_llm.predict(prompt)
#             # Evaluate question quality
#             q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None)
#             state["questions"].append(next_q)
#             state["question_evaluations"].append(q_eval)
#             state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
#             state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})
#             audio_path = bark_tts(next_q)
#             # Display evaluations
#             eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}"
#             return (
#                 state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}",
#                 gr.update(value=None), gr.update(value=None),
#                 gr.update(visible=True, value=eval_md),
#             )
#     confirm_btn.click(
#         process_answer,
#         [stt_transcript, user_audio_input, interview_state, user_data],
#         [interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display]
#     ).then(
#         lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript]
#     )
# demo.launch(debug=True)
import gradio as gr | |
import time | |
import tempfile | |
import numpy as np | |
import scipy.io.wavfile as wavfile | |
import os | |
import json | |
import edge_tts | |
import torch, gc | |
from faster_whisper import WhisperModel | |
import asyncio | |
import threading | |
from concurrent.futures import ThreadPoolExecutor | |
# Report CUDA availability and free any stale GPU memory from previous runs
print("CUDA available:", torch.cuda.is_available())
torch.cuda.empty_cache()
gc.collect()
# Global variables for lazy loading | |
faster_whisper_model = None | |
tts_voice = "en-US-AriaNeural" | |
# Thread pool for async operations | |
executor = ThreadPoolExecutor(max_workers=2) | |
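# Blocking work (answer evaluation, background TTS) is handed to this pool;
# two workers suffice because the Gradio callbacks themselves are serialized
# with concurrency_limit=1 below.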
# Print GPU details and select the default device
if torch.cuda.is_available(): | |
print(f"🔥 CUDA Available: {torch.cuda.get_device_name(0)}") | |
print(f"🔥 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB") | |
# Set default device | |
torch.cuda.set_device(0) | |
else: | |
print("⚠️ CUDA not available, using CPU") | |
def load_models_lazy(): | |
"""Load models only when needed""" | |
global faster_whisper_model | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
print(f"🔁 Using device: {device}") | |
if faster_whisper_model is None: | |
print("🔁 Loading Faster-Whisper model...") | |
compute_type = "float16" if device == "cuda" else "int8" | |
faster_whisper_model = WhisperModel("base", device=device, compute_type=compute_type) | |
print(f"✅ Faster-Whisper model loaded on {device}") | |
async def edge_tts_to_file(text, output_path="tts.wav", voice=tts_voice):
    """Synthesize `text` with edge-tts and return the path of the saved wav."""
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(output_path)
    return output_path

def tts_async(text):
    """Run edge-tts on a pool thread; returns a Future resolving to the wav path."""
    # asyncio.run gives the worker thread its own short-lived event loop
    # and closes it when synthesis completes.
    return executor.submit(lambda: asyncio.run(edge_tts_to_file(text)))
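# Example (hypothetical text): start synthesis in the background, then block:
#   future = tts_async("Welcome to the interview.")
#   wav_path = future.result()  # "tts.wav" by default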
def whisper_stt(audio_path): | |
"""STT using Faster-Whisper""" | |
if not audio_path or not os.path.exists(audio_path): | |
return "" | |
load_models_lazy() | |
print("🔁 Transcribing with Faster-Whisper") | |
segments, _ = faster_whisper_model.transcribe(audio_path) | |
transcript = " ".join(segment.text for segment in segments) | |
return transcript.strip() | |
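# Example (hypothetical path): whisper_stt("answer.wav") returns the joined
# transcript, or "" when the path is None or missing.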
seniority_mapping = { | |
"Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 | |
} | |
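# Keys populate the seniority dropdown below; the 1-5 values give an ordinal
# scale that the rest of the pipeline can compare numerically.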
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
user_data = gr.State({}) | |
interview_state = gr.State({}) | |
missing_fields_state = gr.State([]) | |
tts_future = gr.State(None) # Store async TTS future | |
with gr.Column(visible=True) as user_info_section: | |
gr.Markdown("## Candidate Information") | |
cv_file = gr.File(label="Upload CV") | |
job_desc = gr.Textbox(label="Job Description") | |
start_btn = gr.Button("Continue", interactive=False) | |
with gr.Column(visible=False) as missing_section: | |
gr.Markdown("## Missing Information") | |
name_in = gr.Textbox(label="Name", visible=False) | |
role_in = gr.Textbox(label="Job Role", visible=False) | |
seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) | |
skills_in = gr.Textbox(label="Skills", visible=False) | |
submit_btn = gr.Button("Submit", interactive=False) | |
with gr.Column(visible=False) as interview_pre_section: | |
pre_interview_greeting_md = gr.Markdown() | |
start_interview_final_btn = gr.Button("Start Interview") | |
loading_status = gr.Markdown("", visible=False) | |
with gr.Column(visible=False) as interview_section: | |
gr.Markdown("## Interview in Progress") | |
question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) | |
question_text = gr.Markdown() | |
user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer") | |
stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") | |
confirm_btn = gr.Button("Confirm Answer") | |
evaluation_display = gr.Markdown() | |
interview_summary = gr.Markdown(visible=False) | |
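    # UI flow: user_info_section -> missing_section (only when the CV/job parse
    # leaves gaps) -> interview_pre_section -> interview_section; the handlers
    # below advance it by toggling Column visibility.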
    def validate_start_btn(cv_file, job_desc):
        """Enable Continue only once a CV is uploaded and a job description is entered."""
        ready = cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())
        return gr.update(interactive=ready)
cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) | |
job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) | |
def process_and_route_initial(cv_file, job_desc): | |
details = extract_candidate_details(cv_file.name) | |
job_info = extract_job_details(job_desc) | |
data = { | |
"name": details.get("name", "unknown"), | |
"job_role": job_info.get("job_title", "unknown"), | |
"seniority": job_info.get("experience_level", "unknown"), | |
"skills": job_info.get("skills", []) | |
} | |
missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v] | |
if missing: | |
return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) | |
else: | |
greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready." | |
return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting) | |
start_btn.click(process_and_route_initial, [cv_file, job_desc], [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md]) | |
    def show_missing(missing):
        """Reveal only the inputs for fields the automatic extraction failed to fill."""
        missing = missing or []
        return (gr.update(visible="name" in missing), gr.update(visible="job_role" in missing),
                gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing))
missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) | |
    def validate_fields(name, role, seniority, skills, missing):
        """Enable Submit once every still-missing field has a value."""
        if not missing:
            return gr.update(interactive=False)
        all_filled = all(("name" not in missing or bool(name.strip()), "job_role" not in missing or bool(role.strip()),
                          "seniority" not in missing or bool(seniority), "skills" not in missing or bool(skills.strip())))
        return gr.update(interactive=all_filled)
for inp in [name_in, role_in, seniority_in, skills_in]: | |
inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) | |
    def complete_manual(data, name, role, seniority, skills):
        """Fill any fields the automatic extraction left as 'unknown' from the manual inputs."""
        if data["name"].lower() == "unknown": data["name"] = name
        if data["job_role"].lower() == "unknown": data["job_role"] = role
        if data["seniority"].lower() == "unknown": data["seniority"] = seniority
        if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")]
        greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
        return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)
submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) | |
async def start_interview(data): | |
# Initialize interview state | |
state = { | |
"questions": [], | |
"answers": [], | |
"timings": [], | |
"question_evaluations": [], | |
"answer_evaluations": [], | |
"conversation_history": [], | |
"difficulty_adjustment": None, | |
"question_idx": 0, | |
"max_questions": 3, | |
"q_start_time": time.time(), | |
"log": [] | |
} | |
# Build prompt for first question | |
context = "" | |
prompt = build_interview_prompt( | |
conversation_history=[], | |
user_response="", | |
context=context, | |
job_role=data["job_role"], | |
skills=data["skills"], | |
seniority=data["seniority"], | |
difficulty_adjustment=None, | |
voice_label="neutral" | |
) | |
# Generate first question | |
start = time.time() | |
first_q = groq_llm.predict(prompt) | |
print("⏱️ Groq LLM Response Time:", round(time.time() - start, 2), "seconds") | |
q_eval = { | |
"Score": "N/A", | |
"Reasoning": "Skipped to reduce processing time", | |
"Improvements": [] | |
} | |
state["questions"].append(first_q) | |
state["question_evaluations"].append(q_eval) | |
state["conversation_history"].append({'role': 'Interviewer', 'content': first_q}) | |
        # Generate audio with edge-tts (awaited so the first question plays immediately)
        start = time.perf_counter()
        cleaned_text = first_q.strip().replace("\n", " ")
        audio_path = await edge_tts_to_file(cleaned_text)
        print("⏱️ TTS (edge-tts) took", round(time.perf_counter() - start, 2), "seconds")
# Log question | |
state["log"].append({ | |
"type": "question", | |
"question": first_q, | |
"question_eval": q_eval, | |
"timestamp": time.time() | |
}) | |
return ( | |
state, | |
gr.update(visible=False), # Hide interview_pre_section | |
gr.update(visible=True), # Show interview_section | |
audio_path, # Set audio | |
f"*Question 1:* {first_q}" # Set question text | |
) | |
# Hook into Gradio | |
start_interview_final_btn.click( | |
fn=start_interview, | |
inputs=[user_data], | |
outputs=[interview_state, interview_pre_section, interview_section, question_audio, question_text], | |
concurrency_limit=1 | |
) | |
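    # Auto-transcribe each new recording; the textbox stays editable so the
    # candidate can fix STT mistakes before confirming.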
def transcribe(audio_path): | |
return whisper_stt(audio_path) | |
user_audio_input.change(transcribe, user_audio_input, stt_transcript) | |
    async def process_answer(transcript, audio_path, state, data):
        t0 = time.time()  # total handler time; per-step timers below reuse `start`
        if not transcript:
            # Nothing to process: leave every output unchanged
            return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
elapsed = round(time.time() - state.get("q_start_time", time.time()), 2) | |
state["timings"].append(elapsed) | |
state["answers"].append(transcript) | |
state["conversation_history"].append({'role': 'Candidate', 'content': transcript}) | |
last_q = state["questions"][-1] | |
q_eval = state["question_evaluations"][-1] | |
ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"]) | |
answer_eval = await asyncio.get_event_loop().run_in_executor( | |
executor, | |
evaluate_answer, | |
last_q, transcript, ref_answer, data["job_role"], data["seniority"] | |
) | |
state["answer_evaluations"].append(answer_eval) | |
answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium" | |
if answer_score == "excellent": | |
state["difficulty_adjustment"] = "harder" | |
elif answer_score in ("medium", "poor"): | |
state["difficulty_adjustment"] = "easier" | |
else: | |
state["difficulty_adjustment"] = None | |
state["log"].append({ | |
"type": "answer", "question": last_q, "answer": transcript, | |
"answer_eval": answer_eval, "ref_answer": ref_answer, | |
"timing": elapsed, "timestamp": time.time() | |
}) | |
qidx = state["question_idx"] + 1 | |
if qidx >= state["max_questions"]: | |
timestamp = time.strftime("%Y%m%d_%H%M%S") | |
log_file = f"interview_log_{timestamp}.json" | |
with open(log_file, "w", encoding="utf-8") as f: | |
json.dump(state["log"], f, indent=2, ensure_ascii=False) | |
summary = "# Interview Summary\n" | |
for i, q in enumerate(state["questions"]): | |
summary += (f"\n### Q{i + 1}: {q}\n" | |
f"- *Answer*: {state['answers'][i]}\n" | |
f"- *Q Eval*: {state['question_evaluations'][i]}\n" | |
f"- *A Eval*: {state['answer_evaluations'][i]}\n" | |
f"- *Time*: {state['timings'][i]}s\n") | |
summary += f"\n\n⏺ Full log saved as {log_file}." | |
return state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=False) | |
else: | |
state["question_idx"] = qidx | |
state["q_start_time"] = time.time() | |
context = "" | |
prompt = build_interview_prompt( | |
conversation_history=state["conversation_history"], | |
user_response=transcript, context=context, | |
job_role=data["job_role"], skills=data["skills"], | |
seniority=data["seniority"], difficulty_adjustment=state["difficulty_adjustment"], | |
voice_label="neutral" | |
) | |
start = time.time() | |
next_q = groq_llm.predict(prompt) | |
print("⏱️ Groq LLM Response Time:", round(time.time() - start, 2), "seconds") | |
start = time.time() | |
q_eval_future = executor.submit( | |
eval_question_quality, | |
next_q, data["job_role"], data["seniority"] | |
) | |
q_eval = q_eval_future.result() | |
print("⏱️ Evaluation time:", round(time.time() - start, 2), "seconds") | |
state["questions"].append(next_q) | |
state["question_evaluations"].append(q_eval) | |
state["conversation_history"].append({'role': 'Interviewer', 'content': next_q}) | |
state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()}) | |
audio_path = await edge_tts_to_file(next_q) | |
eval_md = f"*Last Answer Eval:* {answer_eval}" | |
print("✅ process_answer time:", round(time.time() - start, 2), "s") | |
return state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=eval_md) | |
confirm_btn.click( | |
fn=process_answer, | |
inputs=[stt_transcript, user_audio_input, interview_state, user_data], | |
outputs=[interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, | |
evaluation_display], | |
concurrency_limit=1 | |
).then( | |
lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript] | |
) | |
demo.launch(debug=True) |