|
import os
import json
import re
import time
import logging
import traceback
from collections import defaultdict
from enum import Enum
from textwrap import dedent

import requests
import numpy as np
import cohere

from dotenv import load_dotenv
from inputimeout import inputimeout, TimeoutOccurred

from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Qdrant
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank

from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import (
    VectorParams, Distance, Filter, FieldCondition,
    MatchValue, PointStruct, SearchRequest,
)

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors
from transformers import AutoTokenizer, pipeline, BitsAndBytesConfig

# Load API keys from a local .env file before any os.getenv() call below,
# and configure logging once for the whole module.
load_dotenv()
logging.basicConfig(level=logging.INFO)
|
cohere_api_key = os.getenv("COHERE_API_KEY") |
|
chat_groq_api = os.getenv("GROQ_API_KEY") |
|
hf_api_key = os.getenv("HF_API_KEY") |
|
qdrant_api = os.getenv("QDRANT_API_KEY") |
|
qdrant_url = os.getenv("QDRANT_API_URL") |
|
|
|
# Avoid printing raw secrets; just confirm each key was loaded.
for key_name, key_value in [
    ("GROQ_API_KEY", chat_groq_api),
    ("QDRANT_API_KEY", qdrant_api),
    ("QDRANT_API_URL", qdrant_url),
    ("COHERE_API_KEY", cohere_api_key),
]:
    print(f"{key_name}: {'set' if key_value else 'MISSING'}")
|
|
|
|
|
# Never hardcode Qdrant credentials in source; read them from the environment.
qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api,
)
|
|
|
print(qdrant_client.get_collections()) |
|
|
|
class CustomChatGroq: |
|
def __init__(self, temperature, model_name, api_key): |
|
self.temperature = temperature |
|
self.model_name = model_name |
|
self.api_key = api_key |
|
self.api_url = "https://api.groq.com/openai/v1/chat/completions" |
|
|
|
def predict(self, prompt): |
|
"""Send a request to the Groq API and return the generated response.""" |
|
try: |
|
headers = { |
|
"Authorization": f"Bearer {self.api_key}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
payload = { |
|
"model": self.model_name, |
|
"messages": [{"role": "system", "content": "You are an AI interviewer."}, |
|
{"role": "user", "content": prompt}], |
|
"temperature": self.temperature, |
|
"max_tokens": 150 |
|
} |
|
|
|
response = requests.post(self.api_url, headers=headers, json=payload, timeout=10) |
|
response.raise_for_status() |
|
|
|
data = response.json() |
|
|
|
|
|
if "choices" in data and len(data["choices"]) > 0: |
|
return data["choices"][0]["message"]["content"].strip() |
|
|
|
logging.warning("Unexpected response structure from Groq API") |
|
return "Interviewer: Could you tell me more about your relevant experience?" |
|
|
|
except requests.exceptions.RequestException as e: |
|
logging.error(f"ChatGroq API error: {e}") |
|
return "Interviewer: Due to a system issue, let's move on to another question." |
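
# Minimal usage sketch for the raw-requests client above (illustrative; assumes
# GROQ_API_KEY is set — the pipeline below actually uses the LangChain ChatGroq wrapper):
# fallback_llm = CustomChatGroq(temperature=0.7, model_name="llama-3.3-70b-versatile", api_key=chat_groq_api)
# print(fallback_llm.predict("Ask me one question about Python decorators."))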
|
|
|
groq_llm = ChatGroq( |
|
temperature=0.7, |
|
model_name="llama-3.3-70b-versatile", |
|
api_key=chat_groq_api |
|
) |
|
|
|
from huggingface_hub import login

HF_TOKEN = os.getenv("HF_TOKEN")
|
|
|
if HF_TOKEN: |
|
login(HF_TOKEN) |
|
else: |
|
raise EnvironmentError("Missing HF_TOKEN environment variable.") |
|
|
|
|
|
from transformers import AutoModelForCausalLM
import torch

print(torch.cuda.is_available())
|
|
|
MODEL_PATH = "mistralai/Mistral-7B-Instruct-v0.3" |
|
|
|
|
|
bnb_config = BitsAndBytesConfig( |
|
load_in_8bit=True, |
|
) |
|
|
|
mistral_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, token=hf_api_key)
|
|
|
judge_llm = AutoModelForCausalLM.from_pretrained( |
|
MODEL_PATH, |
|
    quantization_config=bnb_config,
    torch_dtype=torch.float16,
|
device_map="auto", |
|
token=hf_api_key |
|
) |
|
judge_llm.config.pad_token_id = judge_llm.config.eos_token_id |
|
|
|
|
|
print(judge_llm.hf_device_map) |
|
|
|
judge_pipeline = pipeline( |
|
"text-generation", |
|
model=judge_llm, |
|
tokenizer=mistral_tokenizer, |
|
max_new_tokens=128, |
|
temperature=0.3, |
|
top_p=0.9, |
|
do_sample=True, |
|
repetition_penalty=1.1, |
|
) |
|
|
|
|
|
output = judge_pipeline("Q: What is Python?\nA:", max_new_tokens=128)[0]['generated_text'] |
|
print(output) |
|
|
|
|
|
|
|
|
|
from sentence_transformers import SentenceTransformer |
|
|
|
class LocalEmbeddings: |
|
def __init__(self, model_name="all-MiniLM-L6-v2"): |
|
self.model = SentenceTransformer(model_name) |
|
|
|
def embed_query(self, text): |
|
return self.model.encode(text).tolist() |
|
|
|
def embed_documents(self, documents): |
|
return self.model.encode(documents).tolist() |
|
|
|
|
|
embeddings = LocalEmbeddings() |
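
# Sanity check (illustrative): all-MiniLM-L6-v2 produces 384-dimensional vectors,
# which must match the Qdrant collection size used in store_data_to_qdrant below.
# vec = embeddings.embed_query("What is overfitting?")
# assert len(vec) == 384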
|
|
|
|
|
qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api, check_compatibility=False)
|
co = cohere.Client(api_key=cohere_api_key) |
|
|
|
class EvaluationScore(str, Enum): |
|
POOR = "Poor" |
|
MEDIUM = "Medium" |
|
GOOD = "Good" |
|
EXCELLENT = "Excellent" |
|
|
|
|
|
class CohereReranker: |
|
def __init__(self, client): |
|
self.client = client |
|
|
|
def compress_documents(self, documents, query): |
|
if not documents: |
|
return [] |
|
doc_texts = [doc.page_content for doc in documents] |
|
try: |
|
reranked = self.client.rerank( |
|
query=query, |
|
documents=doc_texts, |
|
model="rerank-english-v2.0", |
|
top_n=5 |
|
) |
|
return [documents[result.index] for result in reranked.results] |
|
except Exception as e: |
|
logging.error(f"Error in CohereReranker.compress_documents: {e}") |
|
return documents[:5] |
|
|
|
reranker = CohereReranker(co) |
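
# Usage sketch (illustrative; assumes `docs` is a list of objects with a
# .page_content attribute, e.g. LangChain Documents):
# top_docs = reranker.compress_documents(docs, query="explain gradient descent")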
|
|
|
def load_data_from_json(file_path): |
|
"""Load interview Q&A data from a JSON file.""" |
|
try: |
|
with open(file_path, "r", encoding="utf-8") as f: |
|
data = json.load(f) |
|
job_role_buckets = defaultdict(list) |
|
for idx, item in enumerate(data): |
|
try: |
|
job_role = item["Job Role"].lower().strip() |
|
question = item["Questions"].strip() |
|
answer = item["Answers"].strip() |
|
job_role_buckets[job_role].append({"question": question, "answer": answer}) |
|
except KeyError as e: |
|
logging.warning(f"Skipping item {idx}: missing key {e}") |
|
return job_role_buckets |
|
except Exception as e: |
|
logging.error(f"Error loading data: {e}") |
|
raise |
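
# The loader above assumes each JSON record has this shape (illustrative sample):
# [
#   {"Job Role": "Data Scientist",
#    "Questions": "What is regularization?",
#    "Answers": "A technique that penalizes model complexity..."}
# ]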
|
|
|
|
|
def verify_qdrant_collection(collection_name='interview_questions'): |
|
"""Verify if a Qdrant collection exists with the correct configuration.""" |
|
try: |
|
collection_info = qdrant_client.get_collection(collection_name) |
|
vector_size = collection_info.config.params.vectors.size |
|
logging.info(f"Collection '{collection_name}' exists with vector size: {vector_size}") |
|
return True |
|
except Exception as e: |
|
logging.warning(f"Collection '{collection_name}' not found: {e}") |
|
return False |
|
|
|
|
|
|
|
|
|
def store_data_to_qdrant(data, collection_name='interview_questions', batch_size=100): |
|
"""Store interview data in the Qdrant vector database.""" |
|
try: |
|
|
|
if not verify_qdrant_collection(collection_name): |
|
try: |
|
qdrant_client.create_collection( |
|
collection_name=collection_name, |
|
vectors_config=VectorParams(size=384, distance=Distance.COSINE) |
|
) |
|
logging.info(f"Created collection '{collection_name}'") |
|
except Exception as e: |
|
logging.error(f"Error creating collection: {e}\n{traceback.format_exc()}") |
|
return False |
|
|
|
points = [] |
|
point_id = 0 |
|
total_points = sum(len(qa_list) for qa_list in data.values()) |
|
processed = 0 |
|
|
|
for job_role, qa_list in data.items(): |
|
for entry in qa_list: |
|
try: |
|
emb = embeddings.embed_query(entry["question"]) |
|
print(f"Embedding shape: {len(emb)}") |
|
|
|
if not emb or len(emb) != 384: |
|
logging.warning(f"Skipping point {point_id} due to invalid embedding length: {len(emb)}") |
|
continue |
|
|
|
points.append(PointStruct( |
|
id=point_id, |
|
vector=emb, |
|
payload={ |
|
"job_role": job_role, |
|
"question": entry["question"], |
|
"answer": entry["answer"] |
|
} |
|
)) |
|
point_id += 1 |
|
processed += 1 |
|
|
|
|
|
if len(points) >= batch_size: |
|
try: |
|
qdrant_client.upsert(collection_name=collection_name, points=points) |
|
logging.info(f"Stored {processed}/{total_points} points ({processed/total_points*100:.1f}%)") |
|
except Exception as upsert_err: |
|
logging.error(f"Error during upsert: {upsert_err}\n{traceback.format_exc()}") |
|
points = [] |
|
|
|
except Exception as embed_err: |
|
logging.error(f"Embedding error for point {point_id}: {embed_err}\n{traceback.format_exc()}") |
|
|
|
|
|
if points: |
|
try: |
|
qdrant_client.upsert(collection_name=collection_name, points=points) |
|
logging.info(f"Stored final batch of {len(points)} points") |
|
except Exception as final_upsert_err: |
|
logging.error(f"Error during final upsert: {final_upsert_err}\n{traceback.format_exc()}") |
|
|
|
|
|
try: |
|
count = qdrant_client.count(collection_name=collection_name, exact=True).count |
|
print("Current count:", count) |
|
logging.info(f"✅ Successfully stored {count} points in Qdrant") |
|
if count != total_points: |
|
logging.warning(f"Expected {total_points} points but stored {count}") |
|
except Exception as count_err: |
|
logging.error(f"Error verifying stored points: {count_err}\n{traceback.format_exc()}") |
|
|
|
return True |
|
|
|
except Exception as e: |
|
logging.error(f"Error storing data to Qdrant: {e}\n{traceback.format_exc()}") |
|
return False |
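
# End-to-end ingestion sketch (illustrative; "interview_data.json" is a placeholder
# path — point it at your own Q&A dataset):
# data = load_data_from_json("interview_data.json")
# store_data_to_qdrant(data)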
|
|
|
|
|
info = qdrant_client.get_collection("interview_questions") |
|
print(info.config.params.vectors.distance) |
|
|
|
def extract_all_roles_from_qdrant(collection_name='interview_questions'): |
|
""" Extract all unique job roles from the Qdrant vector store """ |
|
try: |
|
all_roles = set() |
|
scroll_offset = None |
|
|
|
while True: |
|
response = qdrant_client.scroll( |
|
collection_name=collection_name, |
|
limit=200, |
|
offset=scroll_offset, |
|
with_payload=True |
|
) |
|
points, next_page_offset = response |
|
|
|
if not points: |
|
break |
|
|
|
for point in points: |
|
role = point.payload.get("job_role", "").strip().lower() |
|
if role: |
|
all_roles.add(role) |
|
|
|
if not next_page_offset: |
|
break |
|
|
|
scroll_offset = next_page_offset |
|
|
|
if not all_roles: |
|
logging.warning("[Qdrant] No roles found in payloads.") |
|
else: |
|
logging.info(f"[Qdrant] Extracted {len(all_roles)} unique job roles.") |
|
|
|
return list(all_roles) |
|
except Exception as e: |
|
logging.error(f"Error extracting roles from Qdrant: {e}") |
|
return [] |
|
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
def find_similar_roles(user_role, all_roles, top_k=3): |
|
""" |
|
Find the most similar job roles to the given user_role using embeddings. |
|
""" |
|
try: |
|
|
|
user_role = user_role.strip().lower() |
|
if not user_role or not all_roles or not isinstance(all_roles, list): |
|
logging.warning("Invalid input for role similarity") |
|
return [] |
|
|
|
|
|
try: |
|
user_embedding = embeddings.embed_query(user_role) |
|
if user_embedding is None: |
|
logging.error("User embedding is None") |
|
return [] |
|
except Exception as e: |
|
logging.error(f"Error embedding user role: {type(e).__name__}: {e}") |
|
return [] |
|
|
|
|
|
try: |
|
role_embeddings = [] |
|
valid_roles = [] |
|
for role in all_roles: |
|
emb = embeddings.embed_query(role.lower()) |
|
if emb is not None: |
|
role_embeddings.append(emb) |
|
valid_roles.append(role) |
|
else: |
|
logging.warning(f"Skipping role with no embedding: {role}") |
|
except Exception as e: |
|
logging.error(f"Error embedding all roles: {type(e).__name__}: {e}") |
|
return [] |
|
|
|
if not role_embeddings: |
|
logging.error("All role embeddings failed") |
|
return [] |
|
|
|
|
|
similarities = cosine_similarity([user_embedding], role_embeddings)[0] |
|
top_indices = np.argsort(similarities)[::-1][:top_k] |
|
|
|
similar_roles = [valid_roles[i] for i in top_indices] |
|
logging.debug(f"Similar roles to '{user_role}': {similar_roles}") |
|
return similar_roles |
|
|
|
except Exception as e: |
|
logging.error(f"Error finding similar roles: {type(e).__name__}: {e}", exc_info=True) |
|
return [] |
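
# Usage sketch (illustrative): typo-tolerant role matching via embedding similarity.
# find_similar_roles("dta scienctist", all_roles) might return something like
# ["data scientist", "data analyst", "machine learning engineer"].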
|
|
|
|
|
def get_role_questions(job_role): |
|
try: |
|
if not job_role: |
|
logging.warning("Job role is empty.") |
|
return [] |
|
|
|
filter_by_role = Filter( |
|
must=[FieldCondition( |
|
key="job_role", |
|
match=MatchValue(value=job_role.lower()) |
|
)] |
|
) |
|
|
|
all_results = [] |
|
offset = None |
|
while True: |
|
results, next_page_offset = qdrant_client.scroll( |
|
collection_name="interview_questions", |
|
scroll_filter=filter_by_role, |
|
with_payload=True, |
|
with_vectors=False, |
|
limit=100, |
|
offset=offset |
|
) |
|
all_results.extend(results) |
|
|
|
if not next_page_offset: |
|
break |
|
offset = next_page_offset |
|
|
|
parsed_results = [{ |
|
"question": r.payload.get("question"), |
|
"answer": r.payload.get("answer"), |
|
"job_role": r.payload.get("job_role") |
|
} for r in all_results] |
|
|
|
return parsed_results |
|
|
|
except Exception as e: |
|
logging.error(f"Error fetching role questions: {type(e).__name__}: {e}", exc_info=True) |
|
return [] |
|
|
|
def retrieve_interview_data(job_role, all_roles): |
|
""" |
|
Retrieve all interview Q&A for a given job role. |
|
Falls back to similar roles if no data found. |
|
Args: |
|
job_role (str): Input job role (can be misspelled) |
|
all_roles (list): Full list of available job roles |
|
Returns: |
|
list: List of QA dicts with keys: 'question', 'answer', 'job_role' |
|
""" |
|
|
|
|
job_role = job_role.strip().lower() |
|
seen_questions = set() |
|
final_results = [] |
|
|
|
|
|
logging.info(f"Trying to fetch all data for exact role: '{job_role}'") |
|
exact_matches = get_role_questions(job_role) |
|
|
|
for qa in exact_matches: |
|
question = qa["question"] |
|
if question and question not in seen_questions: |
|
seen_questions.add(question) |
|
final_results.append(qa) |
|
|
|
if final_results: |
|
logging.info(f"Found {len(final_results)} QA pairs for exact role '{job_role}'") |
|
return final_results |
|
|
|
logging.warning(f"No data found for role '{job_role}'. Trying similar roles...") |
|
|
|
|
|
similar_roles = find_similar_roles(job_role, all_roles, top_k=3) |
|
|
|
if not similar_roles: |
|
logging.warning("No similar roles found.") |
|
return [] |
|
|
|
logging.info(f"Found similar roles: {similar_roles}") |
|
|
|
|
|
for role in similar_roles: |
|
logging.info(f"Fetching data for similar role: '{role}'") |
|
role_qa = get_role_questions(role) |
|
|
|
for qa in role_qa: |
|
question = qa["question"] |
|
if question and question not in seen_questions: |
|
seen_questions.add(question) |
|
final_results.append(qa) |
|
|
|
logging.info(f"Retrieved total {len(final_results)} QA pairs from similar roles") |
|
return final_results |
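
# Usage sketch (illustrative):
# all_roles = extract_all_roles_from_qdrant()
# qa_pairs = retrieve_interview_data("backend developer", all_roles)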
|
|
|
import random |
|
|
|
def random_context_chunks(retrieved_data, k=3):
    """Sample up to k Q&A pairs and join them into a single context string."""
    if not retrieved_data:
        return ""
    chunks = random.sample(retrieved_data, min(k, len(retrieved_data)))
    return "\n\n".join(f"Q: {item['question']}\nA: {item['answer']}" for item in chunks)
|
|
|
from typing import Dict
|
|
|
def eval_question_quality( |
|
question: str, |
|
job_role: str, |
|
seniority: str, |
|
judge_pipeline=None, |
|
max_retries=1 |
|
) -> Dict[str, str]: |
|
    try:
|
|
|
if judge_pipeline is None: |
|
judge_pipeline = globals().get("judge_pipeline") |
|
|
|
if not judge_pipeline: |
|
return { |
|
"Score": "Error", |
|
"Reasoning": "Judge pipeline not available", |
|
"Improvements": "Please provide a valid language model pipeline" |
|
} |
|
|
|
        prompt = f"""
        ... (question-quality rubric prompt omitted here) ...
        Now evaluate this question:
        \"{question}\"
        """
|
|
|
for attempt in range(max_retries + 1): |
|
response = judge_pipeline( |
|
prompt, |
|
max_new_tokens=512, |
|
do_sample=False, |
|
temperature=0.1, |
|
repetition_penalty=1.2 |
|
)[0]["generated_text"] |
|
|
|
try: |
|
|
|
match = re.search(r'\{.*\}', response, re.DOTALL) |
|
if not match: |
|
raise ValueError("Could not locate JSON structure in model output.") |
|
json_str = match.group(0) |
|
result = json.loads(json_str) |
|
|
|
|
|
required_keys = ["Score", "Reasoning", "Improvements"] |
|
valid_scores = {"Poor", "Medium", "Good", "Excellent"} |
|
if not all(k in result for k in required_keys): |
|
raise ValueError("Missing required fields.") |
|
if result["Score"] not in valid_scores: |
|
raise ValueError("Invalid score value.") |
|
return result |
|
|
|
except Exception as e: |
|
logging.warning(f"Attempt {attempt+1} JSON parsing failed: {e}") |
|
time.sleep(0.2) |
|
|
|
|
|
return { |
|
"Score": "Poor", |
|
"Reasoning": "The evaluation model failed to produce a valid score, so defaulted to 'Poor'. Check model output and prompt formatting.", |
|
"Improvements": [ |
|
"Ensure the question is clear and role-relevant.", |
|
"Double-check prompt and formatting.", |
|
"Try rephrasing the question to match rubric." |
|
] |
|
} |
|
|
|
except Exception as e: |
|
logging.error(f"Error in eval_question_quality: {type(e).__name__}: {e}", exc_info=True) |
|
return { |
|
"Score": "Poor", |
|
"Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.", |
|
"Improvements": [ |
|
"Retry with a different question.", |
|
"Check your judge pipeline connection.", |
|
"Contact support if this persists." |
|
] |
|
} |
|
|
|
def evaluate_answer( |
|
question: str, |
|
answer: str, |
|
ref_answer: str, |
|
job_role: str, |
|
seniority: str, |
|
judge_pipeline=None, |
|
max_retries=1 |
|
) -> Dict[str, str]: |
|
""" |
|
Evaluates a candidate's answer to an interview question and returns a structured judgment. |
|
Guarantees a valid, actionable result even if the model fails. |
|
""" |
|
|
|
    try:
|
if judge_pipeline is None: |
|
judge_pipeline = globals().get("judge_pipeline") |
|
|
|
if not judge_pipeline: |
|
return { |
|
"Score": "Error", |
|
"Reasoning": "Judge pipeline not available", |
|
"Improvements": [ |
|
"Please provide a valid language model pipeline" |
|
] |
|
} |
|
|
|
|
|
prompt = f""" |
|
You are an expert technical interviewer evaluating a candidate's response for a {job_role} position at the {seniority} level. |
|
You are provided with: |
|
- The question asked |
|
- The candidate's response |
|
- A reference answer that represents a high-quality expected answer |
|
Evaluate the candidate's response based on: |
|
- Technical correctness |
|
- Clarity and depth of explanation |
|
- Relevance to the job role and seniority |
|
- Completeness and structure |
|
Be objective, concise, and use professional language. Be fair but critical. |
|
-------------------------- |
|
Question: |
|
{question} |
|
Candidate Answer: |
|
{answer} |
|
Reference Answer: |
|
{ref_answer} |
|
-------------------------- |
|
Now return your evaluation as a valid JSON object using exactly these keys: |
|
- "Score": One of ["Poor", "Medium", "Good", "Excellent"] |
|
- "Reasoning": 2-3 sentence explanation justifying the score, covering clarity, accuracy, completeness, or relevance |
|
- "Improvements": A list of 2-3 specific and constructive suggestions to help the candidate improve this answer |
|
Example: |
|
{{ |
|
"Score": "Good", |
|
"Reasoning": "The answer demonstrates a good understanding of the concept and touches on key ideas, but lacks depth in explaining the trade-offs between techniques.", |
|
"Improvements": [ |
|
"Explain when this method might fail or produce biased results", |
|
"Include examples or metrics to support the explanation", |
|
"Clarify the specific business impact or outcome achieved" |
|
] |
|
}} |
|
Respond only with the JSON: |
|
""" |
|
for attempt in range(max_retries + 1): |
|
output = judge_pipeline( |
|
prompt, |
|
max_new_tokens=512, |
|
temperature=0.3, |
|
do_sample=False |
|
)[0]["generated_text"] |
|
|
|
|
|
try: |
|
start_idx = output.rfind("{") |
|
end_idx = output.rfind("}") + 1 |
|
|
|
if start_idx != -1 and end_idx != -1 and end_idx > start_idx: |
|
json_str = output[start_idx:end_idx] |
|
result = json.loads(json_str) |
|
valid_scores = {"Poor", "Medium", "Good", "Excellent"} |
|
if result.get("Score") in valid_scores: |
|
return { |
|
"Score": result["Score"], |
|
"Reasoning": result.get("Reasoning", "No explanation provided."), |
|
"Improvements": result.get("Improvements", ["No improvement suggestions provided."]) |
|
} |
|
else: |
|
raise ValueError(f"Invalid Score value: {result.get('Score')}") |
|
else: |
|
raise ValueError("JSON format not found in output") |
|
except Exception as e: |
|
logging.warning(f"evaluate_answer: Attempt {attempt+1} failed to parse model output: {e}") |
|
time.sleep(0.2) |
|
|
|
|
|
return { |
|
"Score": "Poor", |
|
"Reasoning": "The evaluation model failed to produce a valid score or parse output; defaulted to 'Poor'. Please check model output and prompt formatting.", |
|
"Improvements": [ |
|
"Be more specific and detailed in the answer.", |
|
"Structure your response with clear points.", |
|
"Relate your answer more closely to the job role and question." |
|
] |
|
} |
|
except Exception as e: |
|
logging.error(f"Evaluation failed: {e}", exc_info=True) |
|
return { |
|
"Score": "Poor", |
|
"Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.", |
|
"Improvements": [ |
|
"Try again with a different answer.", |
|
"Check your judge pipeline connection.", |
|
"Contact support if the error persists." |
|
] |
|
} |
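
# Usage sketch (illustrative — actual output depends on the judge model):
# evaluate_answer(
#     question="What is overfitting?",
#     answer="When a model memorizes training data and generalizes poorly.",
#     ref_answer="Overfitting occurs when a model fits noise in the training set...",
#     job_role="Data Scientist", seniority="Junior",
#     judge_pipeline=judge_pipeline,
# )
# -> {"Score": "Good", "Reasoning": "...", "Improvements": [...]}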
|
|
|
|
|
def generate_reference_answer(question, job_role, seniority): |
|
""" |
|
Generates a high-quality reference answer using Groq-hosted LLaMA model. |
|
Args: |
|
question (str): Interview question to answer. |
|
job_role (str): Target job role (e.g., "Frontend Developer"). |
|
seniority (str): Experience level (e.g., "Mid-Level"). |
|
Returns: |
|
str: Clean, generated reference answer or error message. |
|
""" |
|
try: |
|
|
|
prompt = f"""You are a {seniority} {job_role}. |
|
Q: {question} |
|
A:""" |
|
|
|
|
|
ref_answer = groq_llm.predict(prompt) |
|
|
|
if not ref_answer.strip(): |
|
return "Reference answer not generated." |
|
|
|
return ref_answer.strip() |
|
|
|
except Exception as e: |
|
logging.error(f"Error generating reference answer: {e}", exc_info=True) |
|
return "Unable to generate reference answer due to an error" |
|
|
|
|
|
|
|
def build_interview_prompt(conversation_history, user_response, context, job_role, skills, seniority, |
|
difficulty_adjustment=None, voice_label=None, face_label=None, effective_confidence=None): |
|
"""Build a prompt for generating the next interview question with adaptive difficulty and fairness logic.""" |
|
|
|
interview_template = """ |
|
You are an AI interviewer conducting a real-time interview for a {job_role} position. |
|
Your objective is to thoroughly evaluate the candidate's suitability for the role using smart, structured, and adaptive questioning. |
|
--- |
|
Interview Rules and Principles: |
|
- The **baseline difficulty** of questions must match the candidate’s seniority level (e.g., junior, mid-level, senior). |
|
- Use your judgment to increase difficulty **slightly** if the candidate performs well, or simplify if they struggle — but never drop below the expected baseline for their level. |
|
- Avoid asking extremely difficult questions to junior candidates unless they’ve clearly demonstrated advanced knowledge. |
|
- Be fair: candidates for the same role should be evaluated within a consistent difficulty range. |
|
- Adapt your line of questioning gradually and logically based on the **overall flow**, not just the last answer. |
|
- Include real-world problem-solving scenarios to test how the candidate thinks and behaves practically. |
|
- You must **lead** the interview and make intelligent decisions about what to ask next. |
|
--- |
|
Context Use:
{context_instruction}
Retrieved Context:
{context}
---
|
Job Role: {job_role} |
|
Seniority Level: {seniority} |
|
Skills Focus: {skills} |
|
Difficulty Setting: {difficulty} (based on {difficulty_adjustment}) |
|
--- |
|
Recent Conversation History: |
|
{history} |
|
Candidate's Last Response: |
|
"{user_response}" |
|
Evaluation of Last Response: |
|
{response_evaluation} |
|
Voice Tone: {voice_label} |
|
---
|
Important: |
|
If no relevant context was retrieved or the previous answer is unclear or off-topic, |
|
you must still generate a meaningful and fair interview question using your own knowledge and best practices. |
|
Do not skip question generation or fall back to default/filler responses. |
|
--- |
|
Guidelines for Next Question: |
|
- If this is the beginning of the interview, start with a question about the candidate’s background or experience. |
|
- Base the difficulty primarily on the seniority level, with light adjustment from recent performance. |
|
- Focus on core skills, real-world applications, and depth of reasoning. |
|
- Ask only one question. Be clear and concise. |
|
Generate the next interview question now: |
|
""" |
|
|
|
|
|
if difficulty_adjustment == "harder": |
|
difficulty = f"slightly more challenging than typical for {seniority}" |
|
elif difficulty_adjustment == "easier": |
|
difficulty = f"slightly easier than typical for {seniority}" |
|
else: |
|
difficulty = f"appropriate for {seniority}" |
|
|
|
|
|
if context.strip(): |
|
context_instruction = ( |
|
"Use both your own expertise and the provided context from relevant interview datasets. " |
|
"You can either build on questions from the dataset or generate your own." |
|
) |
|
context = context.strip() |
|
else: |
|
context_instruction = ( |
|
"No specific context retrieved. Use your own knowledge and best practices to craft a question." |
|
) |
|
context = "" |
|
|
|
|
|
|
|
recent_history = conversation_history[-6:] if len(conversation_history) > 6 else conversation_history |
|
formatted_history = "\n".join([f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history]) |
|
|
|
|
|
|
|
if conversation_history and conversation_history[-1].get("evaluation"): |
|
eval_data = conversation_history[-1]["evaluation"][-1] |
|
response_evaluation = f""" |
|
- Score: {eval_data.get('Score', 'N/A')} |
|
- Reasoning: {eval_data.get('Reasoning', 'N/A')} |
|
- Improvements: {eval_data.get('Improvements', 'N/A')} |
|
""" |
|
else: |
|
response_evaluation = "No evaluation available yet." |
|
|
|
|
|
|
|
prompt = interview_template.format( |
|
job_role=job_role, |
|
seniority=seniority, |
|
skills=skills, |
|
difficulty=difficulty, |
|
difficulty_adjustment=difficulty_adjustment if difficulty_adjustment else "default seniority", |
|
context_instruction=context_instruction, |
|
context=context, |
|
history=formatted_history, |
|
user_response=user_response, |
|
response_evaluation=response_evaluation.strip(), |
|
voice_label=voice_label or "unknown", |
|
) |
|
|
|
return prompt |
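
# Usage sketch (illustrative): build the opening question prompt of a session.
# prompt = build_interview_prompt(
#     conversation_history=[], user_response="", context="",
#     job_role="Frontend Developer", skills=["React", "CSS"],
#     seniority="Junior",
# )
# first_question = groq_llm.predict(prompt)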
|
|
|
|
|
def generate_llm_interview_report( |
|
interview_state, logged_samples, job_role, seniority |
|
): |
|
|
|
|
|
|
def score_label(label): |
|
mapping = { |
|
"confident": 5, "calm": 4, "neutral": 3, "nervous": 2, "anxious": 1, "unknown": 3 |
|
} |
|
return mapping.get(label.lower(), 3) |
|
|
|
def section_score(vals): |
|
return round(sum(vals)/len(vals), 2) if vals else "N/A" |
|
|
|
|
|
scores, voice_conf, face_conf, comm_scores = [], [], [], [] |
|
tech_details, comm_details, emotion_details, relevance_details, problem_details = [], [], [], [], [] |
|
|
|
for entry in logged_samples: |
|
        answer_eval = entry.get("answer_evaluation") or {}
|
score = answer_eval.get("Score", "Not Evaluated") |
|
reasoning = answer_eval.get("Reasoning", "") |
|
if score.lower() in ["excellent", "good", "medium", "poor"]: |
|
score_map = {"excellent": 5, "good": 4, "medium": 3, "poor": 2} |
|
scores.append(score_map[score.lower()]) |
|
|
|
tech_details.append(reasoning) |
|
comm_details.append(reasoning) |
|
|
|
voice_conf.append(score_label(entry.get("voice_label", "unknown"))) |
|
face_conf.append(score_label(entry.get("face_label", "unknown"))) |
|
|
|
if entry["user_answer"]: |
|
length = len(entry["user_answer"].split()) |
|
comm_score = min(5, max(2, length // 30)) |
|
comm_scores.append(comm_score) |
|
|
|
|
|
avg_problem = section_score(scores) |
|
avg_tech = section_score(scores) |
|
avg_comm = section_score(comm_scores) |
|
avg_emotion = section_score([(v+f)/2 for v, f in zip(voice_conf, face_conf)]) |
|
|
|
|
|
section_averages = [avg_problem, avg_tech, avg_comm, avg_emotion] |
|
numeric_avgs = [v for v in section_averages if isinstance(v, (float, int))] |
|
avg_overall = round(sum(numeric_avgs) / len(numeric_avgs), 2) if numeric_avgs else 0 |
|
|
|
|
|
if avg_overall >= 4.5: |
|
verdict = "Strong Hire" |
|
elif avg_overall >= 4.0: |
|
verdict = "Hire" |
|
elif avg_overall >= 3.0: |
|
verdict = "Conditional Hire" |
|
else: |
|
verdict = "No Hire" |
|
|
|
|
|
transcript = "\n\n".join([ |
|
f"Q: {e['generated_question']}\nA: {e['user_answer']}\nScore: {e.get('answer_evaluation',{}).get('Score','')}\nReasoning: {e.get('answer_evaluation',{}).get('Reasoning','')}" |
|
for e in logged_samples |
|
]) |
|
|
|
prompt = f""" |
|
You are a senior technical interviewer at a major tech company. |
|
Write a structured, realistic hiring report for this {seniority} {job_role} interview, using these section scores (scale 1–5, with 5 best): |
|
Section-wise Evaluation |
|
1. *Problem Solving & Critical Thinking*: {avg_problem} |
|
2. *Technical Depth & Knowledge*: {avg_tech} |
|
3. *Communication & Clarity*: {avg_comm} |
|
4. *Emotional Composure & Confidence*: {avg_emotion} |
|
5. *Role Relevance*: 5 |
|
*Transcript* |
|
{transcript} |
|
Your report should have the following sections: |
|
1. *Executive Summary* (realistic, hiring-committee style) |
|
2. *Section-wise Comments* (for each numbered category above, with short paragraph citing specifics) |
|
3. *Strengths & Weaknesses* (list at least 2 for each) |
|
4. *Final Verdict*: {verdict} |
|
5. *Recommendations* (2–3 for future improvement) |
|
Use realistic language. If some sections are N/A or lower than others, comment honestly. |
|
Interview Report: |
|
""" |
|
|
|
return groq_llm.predict(prompt) |
|
|
|
def get_user_info(): |
|
""" |
|
Collects essential information from the candidate before starting the interview. |
|
Returns a dictionary with keys: name, job_role, seniority, skills |
|
""" |
|
import logging |
|
logging.info("Collecting user information...") |
|
|
|
print("Welcome to the AI Interview Simulator!") |
|
print("Let’s set up your mock interview.\n") |
|
|
|
|
|
name = input("What is your name? ").strip() |
|
while not name: |
|
print("Please enter your name.") |
|
name = input("What is your name? ").strip() |
|
|
|
|
|
job_role = input(f"Hi {name}, what job role are you preparing for? (e.g. Frontend Developer) ").strip() |
|
while not job_role: |
|
print("Please specify the job role.") |
|
job_role = input("What job role are you preparing for? ").strip() |
|
|
|
|
|
seniority_options = ["Entry-level", "Junior", "Mid-Level", "Senior", "Lead"] |
|
print("\nSelect your experience level:") |
|
for i, option in enumerate(seniority_options, 1): |
|
print(f"{i}. {option}") |
|
|
|
seniority_choice = None |
|
while seniority_choice not in range(1, len(seniority_options)+1): |
|
try: |
|
seniority_choice = int(input("Enter the number corresponding to your level: ")) |
|
except ValueError: |
|
print(f"Please enter a number between 1 and {len(seniority_options)}") |
|
|
|
seniority = seniority_options[seniority_choice - 1] |
|
|
|
|
|
skills_input = input(f"\nWhat are your top skills relevant to {job_role}? (Separate with commas): ") |
|
skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] |
|
|
|
while not skills: |
|
print("Please enter at least one skill.") |
|
skills_input = input("Your top skills (comma-separated): ") |
|
skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] |
|
|
|
|
|
    print("\nInterview Setup Complete!")
|
print(f"Name: {name}") |
|
print(f"Job Role: {job_role}") |
|
print(f"Experience Level: {seniority}") |
|
print(f"Skills: {', '.join(skills)}") |
|
print("\nStarting your mock interview...\n") |
|
|
|
return { |
|
"name": name, |
|
"job_role": job_role, |
|
"seniority": seniority, |
|
"skills": skills |
|
} |
|
|
|
import threading |
|
|
|
def wait_for_user_response(timeout=200): |
|
"""Wait for user input with timeout. Returns '' if no response.""" |
|
user_input = [] |
|
|
|
def get_input(): |
|
answer = input("Your Answer (within timeout): ").strip() |
|
user_input.append(answer) |
|
|
|
    # Daemon thread so a pending input() cannot keep the process alive after timeout.
    thread = threading.Thread(target=get_input, daemon=True)
    thread.start()
    thread.join(timeout)
|
|
|
return user_input[0] if user_input else "" |
|
|
|
from datetime import datetime
from time import time
|
|
|
def interview_loop(max_questions, timeout_seconds=300, collection_name="interview_questions", judge_pipeline=None, save_path="interview_log.json"): |
|
|
|
|
|
user_info = get_user_info() |
|
job_role = user_info['job_role'] |
|
seniority = user_info['seniority'] |
|
skills = user_info['skills'] |
|
|
|
all_roles = extract_all_roles_from_qdrant(collection_name=collection_name) |
|
retrieved_data = retrieve_interview_data(job_role, all_roles) |
|
context_data = random_context_chunks(retrieved_data, k=4) |
|
|
|
conversation_history = [] |
|
interview_state = { |
|
"questions": [], |
|
"user_answer": [], |
|
"job_role": job_role, |
|
"seniority": seniority, |
|
"start_time": time() |
|
} |
|
|
|
|
|
logged_samples = [] |
|
|
|
difficulty_adjustment = None |
|
|
|
for i in range(max_questions): |
|
last_user_response = conversation_history[-1]['content'] if conversation_history else "" |
|
|
|
|
|
prompt = build_interview_prompt( |
|
conversation_history=conversation_history, |
|
user_response=last_user_response, |
|
context=context_data, |
|
job_role=job_role, |
|
skills=skills, |
|
seniority=seniority, |
|
difficulty_adjustment=difficulty_adjustment |
|
) |
|
question = groq_llm.predict(prompt) |
|
question_eval = eval_question_quality(question, job_role, seniority, judge_pipeline) |
|
|
|
conversation_history.append({'role': "Interviewer", "content": question}) |
|
print(f"Interviewer: Q{i + 1} : {question}") |
|
|
|
|
|
start_time = time() |
|
user_answer = wait_for_user_response(timeout=timeout_seconds) |
|
response_time = time() - start_time |
|
|
|
skipped = False |
|
answer_eval = None |
|
ref_answer = None |
|
|
|
        if not user_answer:
            print("No response received, moving to the next question.")
            user_answer = None
            skipped = True
            difficulty_adjustment = None  # keep the baseline difficulty after a skip
|
else: |
|
conversation_history.append({"role": "Candidate", "content": user_answer}) |
|
|
|
ref_answer = generate_reference_answer(question, job_role, seniority) |
|
answer_eval = evaluate_answer( |
|
question=question, |
|
answer=user_answer, |
|
ref_answer=ref_answer, |
|
job_role=job_role, |
|
seniority=seniority, |
|
judge_pipeline=judge_pipeline |
|
) |
|
|
|
|
|
        interview_state["user_answer"].append(user_answer)

        # Only attach an evaluation and adjust difficulty when an answer was given;
        # answer_eval is None for skipped questions and would otherwise crash here.
        if answer_eval:
            conversation_history[-1].setdefault('evaluation', []).append({
                "technical_depth": {
                    "score": answer_eval['Score'],
                    "Reasoning": answer_eval['Reasoning']
                }
            })

            score = answer_eval['Score'].lower()
            if score == "excellent":
                difficulty_adjustment = "harder"
            elif score in ('poor', 'medium'):
                difficulty_adjustment = "easier"
            else:
                difficulty_adjustment = None
|
|
|
|
|
logged_samples.append({ |
|
"job_role": job_role, |
|
"seniority": seniority, |
|
"skills": skills, |
|
"context": context_data, |
|
"prompt": prompt, |
|
"generated_question": question, |
|
"question_evaluation": question_eval, |
|
"user_answer": user_answer, |
|
"reference_answer": ref_answer, |
|
"answer_evaluation": answer_eval, |
|
"skipped": skipped |
|
}) |
|
|
|
|
|
interview_state['questions'].append({ |
|
"question": question, |
|
"question_evaluation": question_eval, |
|
"user_answer": user_answer, |
|
"answer_evaluation": answer_eval, |
|
"skipped": skipped |
|
}) |
|
|
|
interview_state['end_time'] = time() |
|
    report = generate_llm_interview_report(interview_state, logged_samples, job_role, seniority)
|
print("Report : _____________________\n") |
|
print(report) |
|
print('______________________________________________') |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
filename = f"{save_path.replace('.json', '')}_{timestamp}.json" |
|
with open(filename, "w", encoding="utf-8") as f: |
|
json.dump(logged_samples, f, indent=2, ensure_ascii=False) |
|
|
|
print(f" Interview log saved to {filename}") |
|
print("____________________________________\n") |
|
|
|
print(f"interview state : {interview_state}") |
|
return interview_state, report |
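
# Console entry point sketch (illustrative):
# interview_state, report = interview_loop(max_questions=5, judge_pipeline=judge_pipeline)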
|
|
|
|
|
|
|
|
def build_ground_truth(all_roles): |
|
gt = {} |
|
for role in all_roles: |
|
qa_list = get_role_questions(role) |
|
gt[role] = set(q["question"] for q in qa_list if q["question"]) |
|
return gt |
|
|
|
|
|
def evaluate_retrieval(job_role, all_roles, k=10): |
|
""" |
|
Evaluate retrieval quality using Precision@k, Recall@k, and F1@k. |
|
Args: |
|
job_role (str): The input job role to search for. |
|
all_roles (list): List of all available job roles in the system. |
|
k (int): Top-k retrieved questions to evaluate. |
|
Returns: |
|
dict: Evaluation metrics including precision, recall, and f1. |
|
""" |
|
|
|
|
|
ground_truth_qs = set( |
|
q["question"].strip() |
|
for q in get_role_questions(job_role) |
|
if q.get("question") |
|
) |
|
|
|
if not ground_truth_qs: |
|
print(f"[!] No ground truth found for role: {job_role}") |
|
return {} |
|
|
|
|
|
retrieved_qas = retrieve_interview_data(job_role, all_roles) |
|
retrieved_qs = [q["question"].strip() for q in retrieved_qas if q.get("question")] |
|
|
|
|
|
retrieved_top_k = retrieved_qs[:k] |
|
|
|
|
|
    # A retrieved question counts as relevant if it appears in the ground-truth set.
    y_true = [1 if q in ground_truth_qs else 0 for q in retrieved_top_k]
    relevant = sum(y_true)

    # sklearn's recall_score against an all-ones y_pred would always return 1.0,
    # so compute Precision@k / Recall@k directly against the ground truth.
    precision = relevant / len(retrieved_top_k) if retrieved_top_k else 0.0
    recall = relevant / len(ground_truth_qs) if ground_truth_qs else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
|
|
|
print(f" Retrieval Evaluation for role: '{job_role}' (Top-{k})") |
|
print(f"Precision@{k}: {precision:.2f}") |
|
print(f"Recall@{k}: {recall:.2f}") |
|
print(f"F1@{k}: {f1:.2f}") |
|
print(f"Relevant Retrieved: {sum(y_true)}/{len(y_true)}") |
|
print("–" * 40) |
|
|
|
return { |
|
"job_role": job_role, |
|
"precision": precision, |
|
"recall": recall, |
|
"f1": f1, |
|
"relevant_retrieved": sum(y_true), |
|
"total_retrieved": len(y_true), |
|
"ground_truth_count": len(ground_truth_qs), |
|
} |
|
|
|
|
|
k_values = [5, 10, 20] |
|
all_roles = extract_all_roles_from_qdrant(collection_name="interview_questions") |
|
|
|
results = [] |
|
|
|
for k in k_values: |
|
for role in all_roles: |
|
metrics = evaluate_retrieval(role, all_roles, k=k) |
|
if metrics: |
|
metrics["k"] = k |
|
results.append(metrics) |
|
|
|
import pandas as pd |
|
|
|
df = pd.DataFrame(results) |
|
summary = df.groupby("k")[["precision", "recall", "f1"]].mean().round(3) |
|
print(summary) |
|
|
|
|
|
def extract_job_details(job_description): |
|
"""Extract job details such as title, skills, experience level, and years of experience from the job description.""" |
|
title_match = re.search(r"(?i)(?:seeking|hiring) a (.+?) to", job_description) |
|
job_title = title_match.group(1) if title_match else "Unknown" |
|
|
|
skills_match = re.findall(r"(?i)(?:Proficiency in|Experience with|Knowledge of) (.+?)(?:,|\.| and| or)", job_description) |
|
skills = list(set([skill.strip() for skill in skills_match])) if skills_match else [] |
|
|
|
experience_match = re.search(r"(\d+)\+? years of experience", job_description) |
|
if experience_match: |
|
years_experience = int(experience_match.group(1)) |
|
experience_level = "Senior" if years_experience >= 5 else "Mid" if years_experience >= 3 else "Junior" |
|
else: |
|
years_experience = None |
|
experience_level = "Unknown" |
|
|
|
return { |
|
"job_title": job_title, |
|
"skills": skills, |
|
"experience_level": experience_level, |
|
"years_experience": years_experience |
|
} |
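
# Example (illustrative — skill order may vary because a set is used internally):
# extract_job_details("We are seeking a Data Scientist to join us. "
#                     "Proficiency in Python, Experience with SQL. "
#                     "5+ years of experience required.")
# -> {"job_title": "Data Scientist", "skills": ["Python", "SQL"],
#     "experience_level": "Senior", "years_experience": 5}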
|
|
|
from docx import Document
import textract
from PyPDF2 import PdfReader
|
|
|
JOB_TITLES = [ |
|
"Accountant", "Data Scientist", "Machine Learning Engineer", "Software Engineer", |
|
"Developer", "Analyst", "Researcher", "Intern", "Consultant", "Manager", |
|
"Engineer", "Specialist", "Project Manager", "Product Manager", "Administrator", |
|
"Director", "Officer", "Assistant", "Coordinator", "Supervisor" |
|
] |
|
|
|
def clean_filename_name(filename): |
|
|
|
base = re.sub(r"\.[^.]+$", "", filename) |
|
base = base.strip() |
|
|
|
|
|
base_clean = re.sub(r"\bcv\b", "", base, flags=re.IGNORECASE).strip() |
|
|
|
|
|
if not base_clean: |
|
return None |
|
|
|
|
|
if re.search(r"\d", base_clean): |
|
return None |
|
|
|
|
|
base_clean = base_clean.replace("_", " ").replace("-", " ") |
|
return base_clean.title() |
|
|
|
def looks_like_job_title(line): |
|
for title in JOB_TITLES: |
|
pattern = r"\b" + re.escape(title.lower()) + r"\b" |
|
if re.search(pattern, line.lower()): |
|
return True |
|
return False |
|
|
|
def extract_name_from_text(lines): |
|
|
|
for i in range(min(1, len(lines))): |
|
line = lines[i].strip() |
|
if looks_like_job_title(line): |
|
return "unknown" |
|
if re.search(r"\d", line): |
|
continue |
|
if len(line.split()) > 4 or len(line) > 40: |
|
continue |
|
|
|
if line.isupper(): |
|
continue |
|
|
|
return line.title() |
|
return None |
|
|
|
def extract_text_from_file(file_path): |
|
if file_path.endswith('.pdf'): |
|
reader = PdfReader(file_path) |
|
text = "\n".join(page.extract_text() or '' for page in reader.pages) |
|
elif file_path.endswith('.docx'): |
|
doc = Document(file_path) |
|
text = "\n".join([para.text for para in doc.paragraphs]) |
|
else: |
|
text = textract.process(file_path).decode('utf-8') |
|
return text.strip() |
|
|
|
def extract_candidate_details(file_path): |
|
text = extract_text_from_file(file_path) |
|
lines = [line.strip() for line in text.splitlines() if line.strip()] |
|
|
|
|
|
    filename = os.path.basename(file_path)
|
name = clean_filename_name(filename) |
|
if not name: |
|
name = extract_name_from_text(lines) |
|
if not name: |
|
name = "Unknown" |
|
|
|
|
|
skills = [] |
|
skills_section = re.search(r"Skills\s*[:\-]?\s*(.+)", text, re.IGNORECASE) |
|
if skills_section: |
|
raw_skills = skills_section.group(1) |
|
skills = [s.strip() for s in re.split(r",|\n|•|-", raw_skills) if s.strip()] |
|
|
|
return { |
|
"name": name, |
|
"skills": skills |
|
    }


import gradio as gr
import time
import tempfile
import scipy.io.wavfile as wavfile
import edge_tts
import torch, gc
import asyncio
from faster_whisper import WhisperModel
from concurrent.futures import ThreadPoolExecutor
|
|
|
print(torch.cuda.is_available()) |
|
torch.cuda.empty_cache() |
|
gc.collect() |
|
|
|
|
|
faster_whisper_model = None |
|
tts_voice = "en-US-AriaNeural" |
|
|
|
|
|
|
|
executor = ThreadPoolExecutor(max_workers=2) |
|
|
|
|
|
if torch.cuda.is_available(): |
|
print(f"🔥 CUDA Available: {torch.cuda.get_device_name(0)}") |
|
print(f"🔥 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB") |
|
|
|
torch.cuda.set_device(0) |
|
else: |
|
print("⚠️ CUDA not available, using CPU") |
|
|
|
def load_models_lazy(): |
|
"""Load models only when needed""" |
|
global faster_whisper_model |
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
print(f"🔁 Using device: {device}") |
|
|
|
if faster_whisper_model is None: |
|
print("🔁 Loading Faster-Whisper model...") |
|
compute_type = "float16" if device == "cuda" else "int8" |
|
faster_whisper_model = WhisperModel("base", device=device, compute_type=compute_type) |
|
print(f"✅ Faster-Whisper model loaded on {device}") |
|
|
|
|
|
async def edge_tts_to_file(text, output_path="tts.wav", voice=tts_voice): |
|
communicate = edge_tts.Communicate(text, voice) |
|
await communicate.save(output_path) |
|
return output_path |
|
|
|
def tts_async(text): |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
return executor.submit(loop.run_until_complete, edge_tts_to_file(text)) |
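
# Usage sketch (illustrative): synthesize speech off the main thread, then block
# for the WAV path only when it is actually needed.
# future = tts_async("Tell me about yourself.")
# wav_path = future.result()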
|
|
|
|
|
|
|
|
|
def whisper_stt(audio_path): |
|
"""STT using Faster-Whisper""" |
|
if not audio_path or not os.path.exists(audio_path): |
|
return "" |
|
|
|
load_models_lazy() |
|
print("🔁 Transcribing with Faster-Whisper") |
|
|
|
segments, _ = faster_whisper_model.transcribe(audio_path) |
|
transcript = " ".join(segment.text for segment in segments) |
|
return transcript.strip() |
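
# Usage sketch (illustrative; "answer.wav" is a placeholder recording path):
# text = whisper_stt("answer.wav")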
|
|
|
|
|
seniority_mapping = { |
|
"Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 |
|
} |
|
|
|
with gr.Blocks(theme=gr.themes.Soft()) as demo: |
|
user_data = gr.State({}) |
|
interview_state = gr.State({}) |
|
missing_fields_state = gr.State([]) |
|
tts_future = gr.State(None) |
|
|
|
with gr.Column(visible=True) as user_info_section: |
|
gr.Markdown("## Candidate Information") |
|
cv_file = gr.File(label="Upload CV") |
|
job_desc = gr.Textbox(label="Job Description") |
|
start_btn = gr.Button("Continue", interactive=False) |
|
|
|
with gr.Column(visible=False) as missing_section: |
|
gr.Markdown("## Missing Information") |
|
name_in = gr.Textbox(label="Name", visible=False) |
|
role_in = gr.Textbox(label="Job Role", visible=False) |
|
seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) |
|
skills_in = gr.Textbox(label="Skills", visible=False) |
|
submit_btn = gr.Button("Submit", interactive=False) |
|
|
|
with gr.Column(visible=False) as interview_pre_section: |
|
pre_interview_greeting_md = gr.Markdown() |
|
start_interview_final_btn = gr.Button("Start Interview") |
|
loading_status = gr.Markdown("", visible=False) |
|
|
|
with gr.Column(visible=False) as interview_section: |
|
gr.Markdown("## Interview in Progress") |
|
question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) |
|
question_text = gr.Markdown() |
|
user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer") |
|
stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") |
|
confirm_btn = gr.Button("Confirm Answer") |
|
evaluation_display = gr.Markdown() |
|
interview_summary = gr.Markdown(visible=False) |
|
|
|
def validate_start_btn(cv_file, job_desc): |
|
return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip()))) |
|
|
|
cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) |
|
job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) |
|
|
|
def process_and_route_initial(cv_file, job_desc): |
|
details = extract_candidate_details(cv_file.name) |
|
job_info = extract_job_details(job_desc) |
|
data = { |
|
"name": details.get("name", "unknown"), |
|
"job_role": job_info.get("job_title", "unknown"), |
|
"seniority": job_info.get("experience_level", "unknown"), |
|
"skills": job_info.get("skills", []) |
|
} |
|
missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v] |
|
if missing: |
|
return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) |
|
else: |
|
greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready." |
|
return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting) |
|
|
|
start_btn.click(process_and_route_initial, [cv_file, job_desc], [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md]) |
|
|
|
    def show_missing(missing):
        missing = missing or []
        return (
            gr.update(visible="name" in missing),
            gr.update(visible="job_role" in missing),
            gr.update(visible="seniority" in missing),
            gr.update(visible="skills" in missing),
        )
|
|
|
missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) |
|
|
|
    def validate_fields(name, role, seniority, skills, missing):
        if not missing:
            return gr.update(interactive=False)
        all_filled = all([
            not ("name" in missing) or bool(name.strip()),
            not ("job_role" in missing) or bool(role.strip()),
            not ("seniority" in missing) or bool(seniority),
            not ("skills" in missing) or bool(skills.strip()),
        ])
        return gr.update(interactive=all_filled)
|
|
|
for inp in [name_in, role_in, seniority_in, skills_in]: |
|
inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) |
|
|
|
    def complete_manual(data, name, role, seniority, skills):
        if data["name"].lower() == "unknown":
            data["name"] = name
        if data["job_role"].lower() == "unknown":
            data["job_role"] = role
        if data["seniority"].lower() == "unknown":
            data["seniority"] = seniority
        if not data["skills"]:
            data["skills"] = [s.strip() for s in skills.split(",") if s.strip()]
        greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
        return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)
|
|
|
submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) |
|
|
|
def start_interview(data): |
|
|
|
state = { |
|
"questions": [], |
|
"answers": [], |
|
"timings": [], |
|
"question_evaluations": [], |
|
"answer_evaluations": [], |
|
"conversation_history": [], |
|
"difficulty_adjustment": None, |
|
"question_idx": 0, |
|
"max_questions": 3, |
|
"q_start_time": time.time(), |
|
"log": [] |
|
} |
|
|
|
|
|
context = "" |
|
prompt = build_interview_prompt( |
|
conversation_history=[], |
|
user_response="", |
|
context=context, |
|
job_role=data["job_role"], |
|
skills=data["skills"], |
|
seniority=data["seniority"], |
|
difficulty_adjustment=None, |
|
voice_label="neutral" |
|
) |
|
|
|
|
|
first_q = groq_llm.predict(prompt) |
|
q_eval = { |
|
"Score": "N/A", |
|
"Reasoning": "Skipped to reduce processing time", |
|
"Improvements": [] |
|
} |
|
|
|
state["questions"].append(first_q) |
|
state["question_evaluations"].append(q_eval) |
|
state["conversation_history"].append({'role': 'Interviewer', 'content': first_q}) |
|
|
|
|
|
start = time.perf_counter() |
|
cleaned_text = first_q.strip().replace("\n", " ") |
|
audio_future = tts_async(cleaned_text) |
|
audio_path = audio_future.result() |
|
print("⏱️ TTS (edge-tts) took", round(time.perf_counter() - start, 2), "seconds") |
|
|
|
|
|
state["log"].append({ |
|
"type": "question", |
|
"question": first_q, |
|
"question_eval": q_eval, |
|
"timestamp": time.time() |
|
}) |
|
|
|
return ( |
|
state, |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
audio_path, |
|
f"*Question 1:* {first_q}" |
|
) |
|
|
|
|
|
start_interview_final_btn.click( |
|
start_interview, |
|
[user_data], |
|
[interview_state, interview_pre_section, interview_section, question_audio, question_text] |
|
) |
|
|
|
|
|
def transcribe(audio_path): |
|
return whisper_stt(audio_path) |
|
|
|
user_audio_input.change(transcribe, user_audio_input, stt_transcript) |
|
|
|
def process_answer(transcript, audio_path, state, data): |
|
if not transcript: |
|
return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() |
|
|
|
elapsed = round(time.time() - state.get("q_start_time", time.time()), 2) |
|
state["timings"].append(elapsed) |
|
state["answers"].append(transcript) |
|
state["conversation_history"].append({'role': 'Candidate', 'content': transcript}) |
|
|
|
last_q = state["questions"][-1] |
|
q_eval = state["question_evaluations"][-1] |
|
ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"]) |
|
answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None) |
|
state["answer_evaluations"].append(answer_eval) |
|
        # Scores come back capitalized ("Excellent"), so normalize before comparing.
        answer_score = (answer_eval.get("Score", "medium") if answer_eval else "medium").lower()

        if answer_score == "excellent":
            state["difficulty_adjustment"] = "harder"
        elif answer_score in ("medium", "poor"):
            state["difficulty_adjustment"] = "easier"
        else:
            state["difficulty_adjustment"] = None
|
|
|
state["log"].append({ |
|
"type": "answer", "question": last_q, "answer": transcript, |
|
"answer_eval": answer_eval, "ref_answer": ref_answer, |
|
"timing": elapsed, "timestamp": time.time() |
|
}) |
|
|
|
qidx = state["question_idx"] + 1 |
|
if qidx >= state["max_questions"]: |
|
timestamp = time.strftime("%Y%m%d_%H%M%S") |
|
log_file = f"interview_log_{timestamp}.json" |
|
with open(log_file, "w", encoding="utf-8") as f: |
|
json.dump(state["log"], f, indent=2, ensure_ascii=False) |
|
summary = "# Interview Summary\n" |
|
for i, q in enumerate(state["questions"]): |
|
summary += (f"\n### Q{i + 1}: {q}\n" |
|
f"- *Answer*: {state['answers'][i]}\n" |
|
f"- *Q Eval*: {state['question_evaluations'][i]}\n" |
|
f"- *A Eval*: {state['answer_evaluations'][i]}\n" |
|
f"- *Time*: {state['timings'][i]}s\n") |
|
summary += f"\n\n⏺ Full log saved as {log_file}." |
|
return state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=False) |
|
else: |
|
state["question_idx"] = qidx |
|
state["q_start_time"] = time.time() |
|
context = "" |
|
prompt = build_interview_prompt( |
|
conversation_history=state["conversation_history"], |
|
user_response=transcript, context=context, |
|
job_role=data["job_role"], skills=data["skills"], |
|
seniority=data["seniority"], difficulty_adjustment=state["difficulty_adjustment"], |
|
voice_label="neutral" |
|
) |
|
next_q = groq_llm.predict(prompt) |
|
q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None) |
|
state["questions"].append(next_q) |
|
state["question_evaluations"].append(q_eval) |
|
state["conversation_history"].append({'role': 'Interviewer', 'content': next_q}) |
|
state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()}) |
|
|
|
|
|
audio_future = tts_async(next_q) |
|
|
|
audio_path = audio_future.result() |
|
|
|
eval_md = f"*Last Answer Eval:* {answer_eval}" |
|
return state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=eval_md) |
|
|
|
confirm_btn.click( |
|
process_answer, |
|
[stt_transcript, user_audio_input, interview_state, user_data], |
|
[interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display] |
|
).then( |
|
lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript] |
|
) |
|
|
|
demo.launch(debug=True) |