import json
import logging
import os
import re
import time
import traceback
from collections import defaultdict
from enum import Enum
from textwrap import dedent

import cohere
import numpy as np
import requests
from dotenv import load_dotenv
from inputimeout import inputimeout, TimeoutOccurred
from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Qdrant
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
# from langchain_huggingface import HuggingFaceEndpoint
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors
from transformers import AutoTokenizer, BitsAndBytesConfig, pipeline

# Import Qdrant client and models (adjust based on your environment)
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import (
    VectorParams, Distance, Filter, FieldCondition, MatchValue,
    PointStruct, SearchRequest,
)

# Load API credentials from the environment (.env file).
load_dotenv()

cohere_api_key = os.getenv("COHERE_API_KEY")
chat_groq_api = os.getenv("GROQ_API_KEY")
hf_api_key = os.getenv("HF_API_KEY")
qdrant_api = os.getenv("QDRANT_API_KEY")
qdrant_url = os.getenv("QDRANT_API_URL")

# Report which keys are configured without echoing the secrets themselves.
print("GROQ API key set:", bool(chat_groq_api))
print("QDRANT API key set:", bool(qdrant_api))
print("QDRANT API URL:", qdrant_url)
print("Cohere API key set:", bool(cohere_api_key))

# Connect to Qdrant using the credentials from the environment
# (never hard-code the cluster URL or API key in source).
qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api,
)
print(qdrant_client.get_collections())


class CustomChatGroq:
    def __init__(self, temperature, model_name, api_key):
        self.temperature = temperature
        self.model_name = model_name
        self.api_key = api_key
        self.api_url = "https://api.groq.com/openai/v1/chat/completions"

    def predict(self, prompt):
        """Send a request to the Groq API and return the generated response."""
        try:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            payload = {
                "model": self.model_name,
                "messages": [
                    {"role": "system", "content": "You are an AI interviewer."},
                    {"role": "user", "content": prompt}
                ],
                "temperature": self.temperature,
                "max_tokens": 150
            }

            response = requests.post(self.api_url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
            data = response.json()

            # Extract response text based on the Groq API response format
            if "choices" in data and len(data["choices"]) > 0:
                return data["choices"][0]["message"]["content"].strip()

            logging.warning("Unexpected response structure from Groq API")
            return "Interviewer: Could you tell me more about your relevant experience?"

        except requests.exceptions.RequestException as e:
            logging.error(f"ChatGroq API error: {e}")
            return "Interviewer: Due to a system issue, let's move on to another question."
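# Optional smoke test for the manual Groq wrapper above — a minimal sketch, not
# part of the original pipeline. It assumes GROQ_API_KEY is set and that the
# "llama-3.3-70b-versatile" model (used later in this script) is available on
# your account; swap in whatever model you actually have access to.
# CustomChatGroq is a drop-in alternative to the LangChain ChatGroq client
# created below, so this is safe to skip entirely.
if chat_groq_api:
    _probe_llm = CustomChatGroq(
        temperature=0.7,
        model_name="llama-3.3-70b-versatile",
        api_key=chat_groq_api,
    )
    # predict() returns a canned interviewer line on any API failure,
    # so this call doubles as a harmless connectivity check.
    print(_probe_llm.predict("Ask me one short warm-up interview question."))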
groq_llm = ChatGroq(
    temperature=0.7,
    model_name="llama-3.3-70b-versatile",
    api_key=chat_groq_api
)

from huggingface_hub import login

HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    login(HF_TOKEN)
else:
    raise EnvironmentError("Missing HF_TOKEN environment variable.")

# Load the Mistral judge model
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch

print(torch.cuda.is_available())

MODEL_PATH = "mistralai/Mistral-7B-Instruct-v0.3"
# MODEL_PATH = "tiiuae/falcon-rw-1b"

bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
)

mistral_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, token=hf_api_key)

judge_llm = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    quantization_config=bnb_config,
    torch_dtype=torch.float16,
    device_map="auto",
    token=hf_api_key
)
judge_llm.config.pad_token_id = judge_llm.config.eos_token_id
print(judge_llm.hf_device_map)

judge_pipeline = pipeline(
    "text-generation",
    model=judge_llm,
    tokenizer=mistral_tokenizer,
    max_new_tokens=128,
    temperature=0.3,
    top_p=0.9,
    do_sample=True,  # Optional but recommended with temperature/top_p
    repetition_penalty=1.1,
)

output = judge_pipeline("Q: What is Python?\nA:", max_new_tokens=128)[0]['generated_text']
print(output)

# Embedding model
from sentence_transformers import SentenceTransformer


class LocalEmbeddings:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_query(self, text):
        return self.model.encode(text).tolist()

    def embed_documents(self, documents):
        return self.model.encode(documents).tolist()


embeddings = LocalEmbeddings()

qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api, check_compatibility=False)
co = cohere.Client(api_key=cohere_api_key)


class EvaluationScore(str, Enum):
    POOR = "Poor"
    MEDIUM = "Medium"
    GOOD = "Good"
    EXCELLENT = "Excellent"


# Cohere reranker
class CohereReranker:
    def __init__(self, client):
        self.client = client

    def compress_documents(self, documents, query):
        if not documents:
            return []
        doc_texts = [doc.page_content for doc in documents]
        try:
            reranked = self.client.rerank(
                query=query,
                documents=doc_texts,
                model="rerank-english-v2.0",
                top_n=5
            )
            return [documents[result.index] for result in reranked.results]
        except Exception as e:
            logging.error(f"Error in CohereReranker.compress_documents: {e}")
            return documents[:5]


reranker = CohereReranker(co)


def load_data_from_json(file_path):
    """Load interview Q&A data from a JSON file."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        job_role_buckets = defaultdict(list)
        for idx, item in enumerate(data):
            try:
                job_role = item["Job Role"].lower().strip()
                question = item["Questions"].strip()
                answer = item["Answers"].strip()
                job_role_buckets[job_role].append({"question": question, "answer": answer})
            except KeyError as e:
                logging.warning(f"Skipping item {idx}: missing key {e}")

        return job_role_buckets
    except Exception as e:
        logging.error(f"Error loading data: {e}")
        raise


def verify_qdrant_collection(collection_name='interview_questions'):
    """Verify if a Qdrant collection exists with the correct configuration."""
    try:
        collection_info = qdrant_client.get_collection(collection_name)
        vector_size = collection_info.config.params.vectors.size
        logging.info(f"Collection '{collection_name}' exists with vector size: {vector_size}")
        return True
    except Exception as e:
        logging.warning(f"Collection '{collection_name}' not found: {e}")
        return False


def store_data_to_qdrant(data, collection_name='interview_questions', batch_size=100):
    """Store interview data in the Qdrant vector database."""
    try:
        # Check if collection exists, otherwise create it
        if not verify_qdrant_collection(collection_name):
            try:
                qdrant_client.create_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
                )
                logging.info(f"Created collection '{collection_name}'")
            except Exception as e:
                logging.error(f"Error creating collection: {e}\n{traceback.format_exc()}")
                return False

        points = []
        point_id = 0
        total_points = sum(len(qa_list) for qa_list in data.values())
        processed = 0

        for job_role, qa_list in data.items():
            for entry in qa_list:
                try:
                    emb = embeddings.embed_query(entry["question"])
                    print(f"Embedding shape: {len(emb)}")
                    if not emb or len(emb) != 384:
                        logging.warning(f"Skipping point {point_id} due to invalid embedding length: {len(emb)}")
                        continue

                    points.append(PointStruct(
                        id=point_id,
                        vector=emb,
                        payload={
                            "job_role": job_role,
                            "question": entry["question"],
                            "answer": entry["answer"]
                        }
                    ))
                    point_id += 1
                    processed += 1

                    # Batch upload
                    if len(points) >= batch_size:
                        try:
                            qdrant_client.upsert(collection_name=collection_name, points=points)
                            logging.info(f"Stored {processed}/{total_points} points ({processed/total_points*100:.1f}%)")
                        except Exception as upsert_err:
                            logging.error(f"Error during upsert: {upsert_err}\n{traceback.format_exc()}")
                        points = []

                except Exception as embed_err:
                    logging.error(f"Embedding error for point {point_id}: {embed_err}\n{traceback.format_exc()}")

        # Final batch upload
        if points:
            try:
                qdrant_client.upsert(collection_name=collection_name, points=points)
                logging.info(f"Stored final batch of {len(points)} points")
            except Exception as final_upsert_err:
                logging.error(f"Error during final upsert: {final_upsert_err}\n{traceback.format_exc()}")

        # Final verification
        try:
            count = qdrant_client.count(collection_name=collection_name, exact=True).count
            print("Current count:", count)
            logging.info(f"✅ Successfully stored {count} points in Qdrant")
            if count != total_points:
                logging.warning(f"Expected {total_points} points but stored {count}")
        except Exception as count_err:
            logging.error(f"Error verifying stored points: {count_err}\n{traceback.format_exc()}")

        return True

    except Exception as e:
        logging.error(f"Error storing data to Qdrant: {e}\n{traceback.format_exc()}")
        return False


# Sanity check: the collection should be configured with cosine distance
info = qdrant_client.get_collection("interview_questions")
print(info.config.params.vectors.distance)


def extract_all_roles_from_qdrant(collection_name='interview_questions'):
    """Extract all unique job roles from the Qdrant vector store."""
    try:
        all_roles = set()
        scroll_offset = None
        while True:
            response = qdrant_client.scroll(
                collection_name=collection_name,
                limit=200,
                offset=scroll_offset,
                with_payload=True
            )
            points, next_page_offset = response
            if not points:
                break
            for point in points:
                role = point.payload.get("job_role", "").strip().lower()
                if role:
all_roles.add(role) if not next_page_offset: break scroll_offset = next_page_offset if not all_roles: logging.warning("[Qdrant] No roles found in payloads.") else: logging.info(f"[Qdrant] Extracted {len(all_roles)} unique job roles.") return list(all_roles) except Exception as e: logging.error(f"Error extracting roles from Qdrant: {e}") return [] import numpy as np import logging from sklearn.metrics.pairwise import cosine_similarity def find_similar_roles(user_role, all_roles, top_k=3): """ Find the most similar job roles to the given user_role using embeddings. """ try: # Clean inputs user_role = user_role.strip().lower() if not user_role or not all_roles or not isinstance(all_roles, list): logging.warning("Invalid input for role similarity") return [] # Embed user role try: user_embedding = embeddings.embed_query(user_role) if user_embedding is None: logging.error("User embedding is None") return [] except Exception as e: logging.error(f"Error embedding user role: {type(e).__name__}: {e}") return [] # Embed all roles try: role_embeddings = [] valid_roles = [] for role in all_roles: emb = embeddings.embed_query(role.lower()) if emb is not None: role_embeddings.append(emb) valid_roles.append(role) else: logging.warning(f"Skipping role with no embedding: {role}") except Exception as e: logging.error(f"Error embedding all roles: {type(e).__name__}: {e}") return [] if not role_embeddings: logging.error("All role embeddings failed") return [] # Compute similarities similarities = cosine_similarity([user_embedding], role_embeddings)[0] top_indices = np.argsort(similarities)[::-1][:top_k] similar_roles = [valid_roles[i] for i in top_indices] logging.debug(f"Similar roles to '{user_role}': {similar_roles}") return similar_roles except Exception as e: logging.error(f"Error finding similar roles: {type(e).__name__}: {e}", exc_info=True) return [] # RETREIVE ALL DATA RELATED TO THE JOB ROLE NOT JUST TOP_K def get_role_questions(job_role): try: if not job_role: logging.warning("Job role is empty.") return [] filter_by_role = Filter( must=[FieldCondition( key="job_role", match=MatchValue(value=job_role.lower()) )] ) all_results = [] offset = None while True: results, next_page_offset = qdrant_client.scroll( collection_name="interview_questions", scroll_filter=filter_by_role, with_payload=True, with_vectors=False, limit=100, # batch size offset=offset ) all_results.extend(results) if not next_page_offset: break offset = next_page_offset parsed_results = [{ "question": r.payload.get("question"), "answer": r.payload.get("answer"), "job_role": r.payload.get("job_role") } for r in all_results] return parsed_results except Exception as e: logging.error(f"Error fetching role questions: {type(e).__name__}: {e}", exc_info=True) return [] def retrieve_interview_data(job_role, all_roles): """ Retrieve all interview Q&A for a given job role. Falls back to similar roles if no data found. 
Args: job_role (str): Input job role (can be misspelled) all_roles (list): Full list of available job roles Returns: list: List of QA dicts with keys: 'question', 'answer', 'job_role' """ import logging logging.basicConfig(level=logging.INFO) job_role = job_role.strip().lower() seen_questions = set() final_results = [] # Step 1: Try exact match (fetch all questions for role) logging.info(f"Trying to fetch all data for exact role: '{job_role}'") exact_matches = get_role_questions(job_role) for qa in exact_matches: question = qa["question"] if question and question not in seen_questions: seen_questions.add(question) final_results.append(qa) if final_results: logging.info(f"Found {len(final_results)} QA pairs for exact role '{job_role}'") return final_results logging.warning(f"No data found for role '{job_role}'. Trying similar roles...") # Step 2: No matches — find similar roles similar_roles = find_similar_roles(job_role, all_roles, top_k=3) if not similar_roles: logging.warning("No similar roles found.") return [] logging.info(f"Found similar roles: {similar_roles}") # Step 3: Retrieve data for each similar role (all questions) for role in similar_roles: logging.info(f"Fetching data for similar role: '{role}'") role_qa = get_role_questions(role) for qa in role_qa: question = qa["question"] if question and question not in seen_questions: seen_questions.add(question) final_results.append(qa) logging.info(f"Retrieved total {len(final_results)} QA pairs from similar roles") return final_results import random def random_context_chunks(retrieved_data, k=3): chunks = random.sample(retrieved_data, k) return "\n\n".join([f"Q: {item['question']}\nA: {item['answer']}" for item in chunks]) import json import logging import re from typing import Dict def eval_question_quality( question: str, job_role: str, seniority: str, judge_pipeline=None, max_retries=1 # Allow at least 1 retry on parse fail ) -> Dict[str, str]: import time try: # Use provided pipeline or fall back to global if judge_pipeline is None: judge_pipeline = globals().get("judge_pipeline") if not judge_pipeline: return { "Score": "Error", "Reasoning": "Judge pipeline not available", "Improvements": "Please provide a valid language model pipeline" } prompt = f""" ... (same as your prompt) ... Now evaluate this question: \"{question}\" """ for attempt in range(max_retries + 1): response = judge_pipeline( prompt, max_new_tokens=512, do_sample=False, temperature=0.1, repetition_penalty=1.2 )[0]["generated_text"] try: # Fallback to last {...} block match = re.search(r'\{.*\}', response, re.DOTALL) if not match: raise ValueError("Could not locate JSON structure in model output.") json_str = match.group(0) result = json.loads(json_str) # Validate required fields and values required_keys = ["Score", "Reasoning", "Improvements"] valid_scores = {"Poor", "Medium", "Good", "Excellent"} if not all(k in result for k in required_keys): raise ValueError("Missing required fields.") if result["Score"] not in valid_scores: raise ValueError("Invalid score value.") return result except Exception as e: logging.warning(f"Attempt {attempt+1} JSON parsing failed: {e}") time.sleep(0.2) # Small delay before retry # If all attempts fail, return a default valid dict return { "Score": "Poor", "Reasoning": "The evaluation model failed to produce a valid score, so defaulted to 'Poor'. 
Check model output and prompt formatting.", "Improvements": [ "Ensure the question is clear and role-relevant.", "Double-check prompt and formatting.", "Try rephrasing the question to match rubric." ] } except Exception as e: logging.error(f"Error in eval_question_quality: {type(e).__name__}: {e}", exc_info=True) return { "Score": "Poor", "Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.", "Improvements": [ "Retry with a different question.", "Check your judge pipeline connection.", "Contact support if this persists." ] } def evaluate_answer( question: str, answer: str, ref_answer: str, job_role: str, seniority: str, judge_pipeline=None, max_retries=1 ) -> Dict[str, str]: """ Evaluates a candidate's answer to an interview question and returns a structured judgment. Guarantees a valid, actionable result even if the model fails. """ import time try: if judge_pipeline is None: judge_pipeline = globals().get("judge_pipeline") if not judge_pipeline: return { "Score": "Error", "Reasoning": "Judge pipeline not available", "Improvements": [ "Please provide a valid language model pipeline" ] } # Enhanced prompt (your version) prompt = f""" You are an expert technical interviewer evaluating a candidate's response for a {job_role} position at the {seniority} level. You are provided with: - The question asked - The candidate's response - A reference answer that represents a high-quality expected answer Evaluate the candidate's response based on: - Technical correctness - Clarity and depth of explanation - Relevance to the job role and seniority - Completeness and structure Be objective, concise, and use professional language. Be fair but critical. -------------------------- Question: {question} Candidate Answer: {answer} Reference Answer: {ref_answer} -------------------------- Now return your evaluation as a valid JSON object using exactly these keys: - "Score": One of ["Poor", "Medium", "Good", "Excellent"] - "Reasoning": 2-3 sentence explanation justifying the score, covering clarity, accuracy, completeness, or relevance - "Improvements": A list of 2-3 specific and constructive suggestions to help the candidate improve this answer Example: {{ "Score": "Good", "Reasoning": "The answer demonstrates a good understanding of the concept and touches on key ideas, but lacks depth in explaining the trade-offs between techniques.", "Improvements": [ "Explain when this method might fail or produce biased results", "Include examples or metrics to support the explanation", "Clarify the specific business impact or outcome achieved" ] }} Respond only with the JSON: """ for attempt in range(max_retries + 1): output = judge_pipeline( prompt, max_new_tokens=512, temperature=0.3, do_sample=False )[0]["generated_text"] # Try to extract JSON response from output robustly try: start_idx = output.rfind("{") end_idx = output.rfind("}") + 1 if start_idx != -1 and end_idx != -1 and end_idx > start_idx: json_str = output[start_idx:end_idx] result = json.loads(json_str) valid_scores = {"Poor", "Medium", "Good", "Excellent"} if result.get("Score") in valid_scores: return { "Score": result["Score"], "Reasoning": result.get("Reasoning", "No explanation provided."), "Improvements": result.get("Improvements", ["No improvement suggestions provided."]) } else: raise ValueError(f"Invalid Score value: {result.get('Score')}") else: raise ValueError("JSON format not found in output") except Exception as e: logging.warning(f"evaluate_answer: Attempt {attempt+1} failed to parse model output: {e}") time.sleep(0.2) # 
Small wait before retry # Fallback: always return a default 'Poor' score if all attempts fail return { "Score": "Poor", "Reasoning": "The evaluation model failed to produce a valid score or parse output; defaulted to 'Poor'. Please check model output and prompt formatting.", "Improvements": [ "Be more specific and detailed in the answer.", "Structure your response with clear points.", "Relate your answer more closely to the job role and question." ] } except Exception as e: logging.error(f"Evaluation failed: {e}", exc_info=True) return { "Score": "Poor", "Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.", "Improvements": [ "Try again with a different answer.", "Check your judge pipeline connection.", "Contact support if the error persists." ] } # SAME BUT USING LLAMA 3.3 FROM GROQ def generate_reference_answer(question, job_role, seniority): """ Generates a high-quality reference answer using Groq-hosted LLaMA model. Args: question (str): Interview question to answer. job_role (str): Target job role (e.g., "Frontend Developer"). seniority (str): Experience level (e.g., "Mid-Level"). Returns: str: Clean, generated reference answer or error message. """ try: # Clean, role-specific prompt prompt = f"""You are a {seniority} {job_role}. Q: {question} A:""" # Use Groq-hosted model to generate the answer ref_answer = groq_llm.predict(prompt) if not ref_answer.strip(): return "Reference answer not generated." return ref_answer.strip() except Exception as e: logging.error(f"Error generating reference answer: {e}", exc_info=True) return "Unable to generate reference answer due to an error" def build_interview_prompt(conversation_history, user_response, context, job_role, skills, seniority, difficulty_adjustment=None, voice_label=None, face_label=None, effective_confidence=None): """Build a prompt for generating the next interview question with adaptive difficulty and fairness logic.""" interview_template = """ You are an AI interviewer conducting a real-time interview for a {job_role} position. Your objective is to thoroughly evaluate the candidate's suitability for the role using smart, structured, and adaptive questioning. --- Interview Rules and Principles: - The **baseline difficulty** of questions must match the candidate’s seniority level (e.g., junior, mid-level, senior). - Use your judgment to increase difficulty **slightly** if the candidate performs well, or simplify if they struggle — but never drop below the expected baseline for their level. - Avoid asking extremely difficult questions to junior candidates unless they’ve clearly demonstrated advanced knowledge. - Be fair: candidates for the same role should be evaluated within a consistent difficulty range. - Adapt your line of questioning gradually and logically based on the **overall flow**, not just the last answer. - Include real-world problem-solving scenarios to test how the candidate thinks and behaves practically. - You must **lead** the interview and make intelligent decisions about what to ask next. --- Context Use: {context_instruction} Note: If no relevant context was retrieved or the previous answer is unclear, you must still generate a thoughtful interview question using your own knowledge. Do not skip generation. Avoid default or fallback responses — always try to generate a meaningful and fair next question. 
--- Job Role: {job_role} Seniority Level: {seniority} Skills Focus: {skills} Difficulty Setting: {difficulty} (based on {difficulty_adjustment}) --- Recent Conversation History: {history} Candidate's Last Response: "{user_response}" Evaluation of Last Response: {response_evaluation} Voice Tone: {voice_label} --- --- Important: If no relevant context was retrieved or the previous answer is unclear or off-topic, you must still generate a meaningful and fair interview question using your own knowledge and best practices. Do not skip question generation or fall back to default/filler responses. --- Guidelines for Next Question: - If this is the beginning of the interview, start with a question about the candidate’s background or experience. - Base the difficulty primarily on the seniority level, with light adjustment from recent performance. - Focus on core skills, real-world applications, and depth of reasoning. - Ask only one question. Be clear and concise. Generate the next interview question now: """ # Calculate difficulty phrase if difficulty_adjustment == "harder": difficulty = f"slightly more challenging than typical for {seniority}" elif difficulty_adjustment == "easier": difficulty = f"slightly easier than typical for {seniority}" else: difficulty = f"appropriate for {seniority}" # Choose context logic if context.strip(): context_instruction = ( "Use both your own expertise and the provided context from relevant interview datasets. " "You can either build on questions from the dataset or generate your own." ) context = context.strip() else: context_instruction = ( "No specific context retrieved. Use your own knowledge and best practices to craft a question." ) context = "" # Let it be actually empty! # Format conversation history (last 6 exchanges max) recent_history = conversation_history[-6:] if len(conversation_history) > 6 else conversation_history formatted_history = "\n".join([f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history]) # Add evaluation summary if available if conversation_history and conversation_history[-1].get("evaluation"): eval_data = conversation_history[-1]["evaluation"][-1] response_evaluation = f""" - Score: {eval_data.get('Score', 'N/A')} - Reasoning: {eval_data.get('Reasoning', 'N/A')} - Improvements: {eval_data.get('Improvements', 'N/A')} """ else: response_evaluation = "No evaluation available yet." 
# Fill the template prompt = interview_template.format( job_role=job_role, seniority=seniority, skills=skills, difficulty=difficulty, difficulty_adjustment=difficulty_adjustment if difficulty_adjustment else "default seniority", context_instruction=context_instruction, context=context, history=formatted_history, user_response=user_response, response_evaluation=response_evaluation.strip(), voice_label=voice_label or "unknown", ) return prompt def generate_llm_interview_report( interview_state, logged_samples, job_role, seniority ): from collections import Counter # Helper for converting score to 1–5 def score_label(label): mapping = { "confident": 5, "calm": 4, "neutral": 3, "nervous": 2, "anxious": 1, "unknown": 3 } return mapping.get(label.lower(), 3) def section_score(vals): return round(sum(vals)/len(vals), 2) if vals else "N/A" # Aggregate info scores, voice_conf, face_conf, comm_scores = [], [], [], [] tech_details, comm_details, emotion_details, relevance_details, problem_details = [], [], [], [], [] for entry in logged_samples: answer_eval = entry.get("answer_evaluation", {}) score = answer_eval.get("Score", "Not Evaluated") reasoning = answer_eval.get("Reasoning", "") if score.lower() in ["excellent", "good", "medium", "poor"]: score_map = {"excellent": 5, "good": 4, "medium": 3, "poor": 2} scores.append(score_map[score.lower()]) # Section details tech_details.append(reasoning) comm_details.append(reasoning) # Emotions/confidence voice_conf.append(score_label(entry.get("voice_label", "unknown"))) face_conf.append(score_label(entry.get("face_label", "unknown"))) # Communication estimate if entry["user_answer"]: length = len(entry["user_answer"].split()) comm_score = min(5, max(2, length // 30)) comm_scores.append(comm_score) # Compute averages for sections avg_problem = section_score(scores) avg_tech = section_score(scores) avg_comm = section_score(comm_scores) avg_emotion = section_score([(v+f)/2 for v, f in zip(voice_conf, face_conf)]) # Compute decision heuristics section_averages = [avg_problem, avg_tech, avg_comm, avg_emotion] numeric_avgs = [v for v in section_averages if isinstance(v, (float, int))] avg_overall = round(sum(numeric_avgs) / len(numeric_avgs), 2) if numeric_avgs else 0 # Hiring logic (you can customize thresholds) if avg_overall >= 4.5: verdict = "Strong Hire" elif avg_overall >= 4.0: verdict = "Hire" elif avg_overall >= 3.0: verdict = "Conditional Hire" else: verdict = "No Hire" # Build LLM report prompt transcript = "\n\n".join([ f"Q: {e['generated_question']}\nA: {e['user_answer']}\nScore: {e.get('answer_evaluation',{}).get('Score','')}\nReasoning: {e.get('answer_evaluation',{}).get('Reasoning','')}" for e in logged_samples ]) prompt = f""" You are a senior technical interviewer at a major tech company. Write a structured, realistic hiring report for this {seniority} {job_role} interview, using these section scores (scale 1–5, with 5 best): Section-wise Evaluation 1. *Problem Solving & Critical Thinking*: {avg_problem} 2. *Technical Depth & Knowledge*: {avg_tech} 3. *Communication & Clarity*: {avg_comm} 4. *Emotional Composure & Confidence*: {avg_emotion} 5. *Role Relevance*: 5 *Transcript* {transcript} Your report should have the following sections: 1. *Executive Summary* (realistic, hiring-committee style) 2. *Section-wise Comments* (for each numbered category above, with short paragraph citing specifics) 3. *Strengths & Weaknesses* (list at least 2 for each) 4. *Final Verdict*: {verdict} 5. 
*Recommendations* (2–3 for future improvement) Use realistic language. If some sections are N/A or lower than others, comment honestly. Interview Report: """ # LLM call, or just return prompt for review return groq_llm.predict(prompt) def get_user_info(): """ Collects essential information from the candidate before starting the interview. Returns a dictionary with keys: name, job_role, seniority, skills """ import logging logging.info("Collecting user information...") print("Welcome to the AI Interview Simulator!") print("Let’s set up your mock interview.\n") # Get user name name = input("What is your name? ").strip() while not name: print("Please enter your name.") name = input("What is your name? ").strip() # Get job role job_role = input(f"Hi {name}, what job role are you preparing for? (e.g. Frontend Developer) ").strip() while not job_role: print("Please specify the job role.") job_role = input("What job role are you preparing for? ").strip() # Get seniority level seniority_options = ["Entry-level", "Junior", "Mid-Level", "Senior", "Lead"] print("\nSelect your experience level:") for i, option in enumerate(seniority_options, 1): print(f"{i}. {option}") seniority_choice = None while seniority_choice not in range(1, len(seniority_options)+1): try: seniority_choice = int(input("Enter the number corresponding to your level: ")) except ValueError: print(f"Please enter a number between 1 and {len(seniority_options)}") seniority = seniority_options[seniority_choice - 1] # Get skills skills_input = input(f"\nWhat are your top skills relevant to {job_role}? (Separate with commas): ") skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] while not skills: print("Please enter at least one skill.") skills_input = input("Your top skills (comma-separated): ") skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] # Confirm collected info print("\n Interview Setup Complete!") print(f"Name: {name}") print(f"Job Role: {job_role}") print(f"Experience Level: {seniority}") print(f"Skills: {', '.join(skills)}") print("\nStarting your mock interview...\n") return { "name": name, "job_role": job_role, "seniority": seniority, "skills": skills } import threading def wait_for_user_response(timeout=200): """Wait for user input with timeout. 
    Returns '' if no response."""
    user_input = []

    def get_input():
        answer = input("Your Answer (within timeout): ").strip()
        user_input.append(answer)

    # Daemon thread so a still-pending input() does not keep the process alive
    # after the timeout expires.
    thread = threading.Thread(target=get_input, daemon=True)
    thread.start()
    thread.join(timeout)
    return user_input[0] if user_input else ""


import json
from datetime import datetime
from time import time
import random


def interview_loop(max_questions, timeout_seconds=300, collection_name="interview_questions",
                   judge_pipeline=None, save_path="interview_log.json"):
    user_info = get_user_info()
    job_role = user_info['job_role']
    seniority = user_info['seniority']
    skills = user_info['skills']

    all_roles = extract_all_roles_from_qdrant(collection_name=collection_name)
    retrieved_data = retrieve_interview_data(job_role, all_roles)
    context_data = random_context_chunks(retrieved_data, k=4)

    conversation_history = []
    interview_state = {
        "questions": [],
        "user_answer": [],
        "job_role": job_role,
        "seniority": seniority,
        "start_time": time()
    }

    # Store log for evaluation
    logged_samples = []
    difficulty_adjustment = None

    for i in range(max_questions):
        last_user_response = conversation_history[-1]['content'] if conversation_history else ""

        # Generate question prompt
        prompt = build_interview_prompt(
            conversation_history=conversation_history,
            user_response=last_user_response,
            context=context_data,
            job_role=job_role,
            skills=skills,
            seniority=seniority,
            difficulty_adjustment=difficulty_adjustment
        )
        question = groq_llm.predict(prompt)
        question_eval = eval_question_quality(question, job_role, seniority, judge_pipeline)
        conversation_history.append({'role': "Interviewer", "content": question})
        print(f"Interviewer: Q{i + 1} : {question}")

        # Wait for user answer
        start_time = time()
        user_answer = wait_for_user_response(timeout=timeout_seconds)
        response_time = time() - start_time

        skipped = False
        answer_eval = None
        ref_answer = None

        if not user_answer:
            print("No Response Received, moving to next question.")
            user_answer = None
            skipped = True
            difficulty_adjustment = "medium"
        else:
            conversation_history.append({"role": "Candidate", "content": user_answer})
            ref_answer = generate_reference_answer(question, job_role, seniority)
            answer_eval = evaluate_answer(
                question=question,
                answer=user_answer,
                ref_answer=ref_answer,
                job_role=job_role,
                seniority=seniority,
                judge_pipeline=judge_pipeline
            )
            interview_state["user_answer"].append(user_answer)

            # Append inline evaluation for history
            conversation_history[-1].setdefault('evaluation', []).append({
                "technical_depth": {
                    "score": answer_eval['Score'],
                    "Reasoning": answer_eval['Reasoning']
                }
            })

            # Adjust difficulty
            score = answer_eval['Score'].lower()
            if score == "excellent":
                difficulty_adjustment = "harder"
            elif score in ['poor', 'medium']:
                difficulty_adjustment = "easier"
            else:
                difficulty_adjustment = None

        # Store for local logging
        logged_samples.append({
            "job_role": job_role,
            "seniority": seniority,
            "skills": skills,
            "context": context_data,
            "prompt": prompt,
            "generated_question": question,
            "question_evaluation": question_eval,
            "user_answer": user_answer,
            "reference_answer": ref_answer,
            "answer_evaluation": answer_eval,
            "skipped": skipped
        })

        # Store state
        interview_state['questions'].append({
            "question": question,
            "question_evaluation": question_eval,
            "user_answer": user_answer,
            "answer_evaluation": answer_eval,
            "skipped": skipped
        })

    interview_state['end_time'] = time()
    # generate_llm_interview_report expects the per-question log as its second argument
    report = generate_llm_interview_report(interview_state, logged_samples, job_role, seniority)
    print("Report : _____________________\n")
    print(report)
    print('______________________________________________')

    # Save
full interview logs to JSON timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{save_path.replace('.json', '')}_{timestamp}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(logged_samples, f, indent=2, ensure_ascii=False) print(f" Interview log saved to {filename}") print("____________________________________\n") print(f"interview state : {interview_state}") return interview_state, report from sklearn.metrics import precision_score, recall_score, f1_score import numpy as np # build ground truth for retrieving data for testing def build_ground_truth(all_roles): gt = {} for role in all_roles: qa_list = get_role_questions(role) gt[role] = set(q["question"] for q in qa_list if q["question"]) return gt def evaluate_retrieval(job_role, all_roles, k=10): """ Evaluate retrieval quality using Precision@k, Recall@k, and F1@k. Args: job_role (str): The input job role to search for. all_roles (list): List of all available job roles in the system. k (int): Top-k retrieved questions to evaluate. Returns: dict: Evaluation metrics including precision, recall, and f1. """ # Step 1: Ground Truth (all exact questions stored for this role) ground_truth_qs = set( q["question"].strip() for q in get_role_questions(job_role) if q.get("question") ) if not ground_truth_qs: print(f"[!] No ground truth found for role: {job_role}") return {} # Step 2: Retrieved Questions (may include fallback roles) retrieved_qas = retrieve_interview_data(job_role, all_roles) retrieved_qs = [q["question"].strip() for q in retrieved_qas if q.get("question")] # Step 3: Take top-k retrieved (you can also do full if needed) retrieved_top_k = retrieved_qs[:k] # Step 4: Binary relevance (1 if in ground truth, 0 if not) y_true = [1 if q in ground_truth_qs else 0 for q in retrieved_top_k] y_pred = [1] * len(y_true) # all retrieved are treated as predicted relevant precision = precision_score(y_true, y_pred, zero_division=0) recall = recall_score(y_true, y_pred, zero_division=0) f1 = f1_score(y_true, y_pred, zero_division=0) print(f" Retrieval Evaluation for role: '{job_role}' (Top-{k})") print(f"Precision@{k}: {precision:.2f}") print(f"Recall@{k}: {recall:.2f}") print(f"F1@{k}: {f1:.2f}") print(f"Relevant Retrieved: {sum(y_true)}/{len(y_true)}") print("–" * 40) return { "job_role": job_role, "precision": precision, "recall": recall, "f1": f1, "relevant_retrieved": sum(y_true), "total_retrieved": len(y_true), "ground_truth_count": len(ground_truth_qs), } k_values = [5, 10, 20] all_roles = extract_all_roles_from_qdrant(collection_name="interview_questions") results = [] for k in k_values: for role in all_roles: metrics = evaluate_retrieval(role, all_roles, k=k) if metrics: # only if we found ground truth metrics["k"] = k results.append(metrics) import pandas as pd df = pd.DataFrame(results) summary = df.groupby("k")[["precision", "recall", "f1"]].mean().round(3) print(summary) def extract_job_details(job_description): """Extract job details such as title, skills, experience level, and years of experience from the job description.""" title_match = re.search(r"(?i)(?:seeking|hiring) a (.+?) to", job_description) job_title = title_match.group(1) if title_match else "Unknown" skills_match = re.findall(r"(?i)(?:Proficiency in|Experience with|Knowledge of) (.+?)(?:,|\.| and| or)", job_description) skills = list(set([skill.strip() for skill in skills_match])) if skills_match else [] experience_match = re.search(r"(\d+)\+? 
years of experience", job_description) if experience_match: years_experience = int(experience_match.group(1)) experience_level = "Senior" if years_experience >= 5 else "Mid" if years_experience >= 3 else "Junior" else: years_experience = None experience_level = "Unknown" return { "job_title": job_title, "skills": skills, "experience_level": experience_level, "years_experience": years_experience } import re from docx import Document import textract from PyPDF2 import PdfReader JOB_TITLES = [ "Accountant", "Data Scientist", "Machine Learning Engineer", "Software Engineer", "Developer", "Analyst", "Researcher", "Intern", "Consultant", "Manager", "Engineer", "Specialist", "Project Manager", "Product Manager", "Administrator", "Director", "Officer", "Assistant", "Coordinator", "Supervisor" ] def clean_filename_name(filename): # Remove file extension base = re.sub(r"\.[^.]+$", "", filename) base = base.strip() # Remove 'cv' or 'CV' words base_clean = re.sub(r"\bcv\b", "", base, flags=re.IGNORECASE).strip() # If after removing CV it's empty, return None if not base_clean: return None # If it contains any digit, return None (unreliable) if re.search(r"\d", base_clean): return None # Replace underscores/dashes with spaces, capitalize base_clean = base_clean.replace("_", " ").replace("-", " ") return base_clean.title() def looks_like_job_title(line): for title in JOB_TITLES: pattern = r"\b" + re.escape(title.lower()) + r"\b" if re.search(pattern, line.lower()): return True return False def extract_name_from_text(lines): # Try first 3 lines for a name, skipping job titles for i in range(min(1, len(lines))): line = lines[i].strip() if looks_like_job_title(line): return "unknown" if re.search(r"\d", line): # skip lines with digits continue if len(line.split()) > 4 or len(line) > 40: # too long or many words continue # If line has only uppercase words, it's probably not a name if line.isupper(): continue # Passed checks, return title-cased line as name return line.title() return None def extract_text_from_file(file_path): if file_path.endswith('.pdf'): reader = PdfReader(file_path) text = "\n".join(page.extract_text() or '' for page in reader.pages) elif file_path.endswith('.docx'): doc = Document(file_path) text = "\n".join([para.text for para in doc.paragraphs]) else: # For .doc or fallback text = textract.process(file_path).decode('utf-8') return text.strip() def extract_candidate_details(file_path): text = extract_text_from_file(file_path) lines = [line.strip() for line in text.splitlines() if line.strip()] # Extract name filename = file_path.split("/")[-1] # just filename, no path name = clean_filename_name(filename) if not name: name = extract_name_from_text(lines) if not name: name = "Unknown" # Extract skills (basic version) skills = [] skills_section = re.search(r"Skills\s*[:\-]?\s*(.+)", text, re.IGNORECASE) if skills_section: raw_skills = skills_section.group(1) skills = [s.strip() for s in re.split(r",|\n|•|-", raw_skills) if s.strip()] return { "name": name, "skills": skills } # import gradio as gr # import time # import tempfile # import numpy as np # import scipy.io.wavfile as wavfile # import os # import json # from transformers import BarkModel, AutoProcessor # import torch, gc # import whisper # from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification # import librosa # import torch # print(torch.cuda.is_available()) # ✅ Tells you if GPU is available # torch.cuda.empty_cache() # gc.collect() # # Bark TTS # print("🔁 Loading Bark model...") # model_bark = 
BarkModel.from_pretrained("suno/bark").to("cuda" if torch.cuda.is_available() else "cpu") # print("✅ Bark model loaded") # print("🔁 Loading Bark processor...") # processor_bark = AutoProcessor.from_pretrained("suno/bark") # print("✅ Bark processor loaded") # bark_voice_preset = "v2/en_speaker_5" # def bark_tts(text): # print(f"🔁 Synthesizing TTS for: {text}") # # Process the text # inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset) # # Move tensors to device # input_ids = inputs["input_ids"].to(model_bark.device) # start = time.time() # # Generate speech with only the required parameters # with torch.no_grad(): # speech_values = model_bark.generate( # input_ids=input_ids, # do_sample=True, # fine_temperature=0.4, # coarse_temperature=0.8 # ) # print(f"✅ Bark finished in {round(time.time() - start, 2)}s") # # Convert to audio # speech = speech_values.cpu().numpy().squeeze() # speech = (speech * 32767).astype(np.int16) # temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") # wavfile.write(temp_wav.name, 22050, speech) # return temp_wav.name # # Whisper STT # print("🔁 Loading Whisper model...") # whisper_model = whisper.load_model("base", device="cuda") # print("✅ Whisper model loaded") # def whisper_stt(audio_path): # if not audio_path or not os.path.exists(audio_path): return "" # result = whisper_model.transcribe(audio_path) # return result["text"] # seniority_mapping = { # "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 # } # # --- 2. Gradio App --- # with gr.Blocks(theme=gr.themes.Soft()) as demo: # user_data = gr.State({}) # interview_state = gr.State({}) # missing_fields_state = gr.State([]) # # --- UI Layout --- # with gr.Column(visible=True) as user_info_section: # gr.Markdown("## Candidate Information") # cv_file = gr.File(label="Upload CV") # job_desc = gr.Textbox(label="Job Description") # start_btn = gr.Button("Continue", interactive=False) # with gr.Column(visible=False) as missing_section: # gr.Markdown("## Missing Information") # name_in = gr.Textbox(label="Name", visible=False) # role_in = gr.Textbox(label="Job Role", visible=False) # seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) # skills_in = gr.Textbox(label="Skills", visible=False) # submit_btn = gr.Button("Submit", interactive=False) # with gr.Column(visible=False) as interview_pre_section: # pre_interview_greeting_md = gr.Markdown() # start_interview_final_btn = gr.Button("Start Interview") # with gr.Column(visible=False) as interview_section: # gr.Markdown("## Interview in Progress") # question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) # question_text = gr.Markdown() # user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. 
Record Audio Answer") # stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") # confirm_btn = gr.Button("Confirm Answer") # evaluation_display = gr.Markdown() # interview_summary = gr.Markdown(visible=False) # # --- UI Logic --- # def validate_start_btn(cv_file, job_desc): # return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip()))) # cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) # job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) # def process_and_route_initial(cv_file, job_desc): # details = extract_candidate_details(cv_file.name) # job_info = extract_job_details(job_desc) # data = { # "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"), # "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", []) # } # missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v] # if missing: # return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) # else: # greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready." # return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting) # start_btn.click( # process_and_route_initial, # [cv_file, job_desc], # [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md] # ) # def show_missing(missing): # if missing is None: missing = [] # return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing) # missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) # def validate_fields(name, role, seniority, skills, missing): # if not missing: return gr.update(interactive=False) # all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip())),]) # return gr.update(interactive=all_filled) # for inp in [name_in, role_in, seniority_in, skills_in]: # inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) # def complete_manual(data, name, role, seniority, skills): # if data["name"].lower() == "unknown": data["name"] = name # if data["job_role"].lower() == "unknown": data["job_role"] = role # if data["seniority"].lower() == "unknown": data["seniority"] = seniority # if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")] # greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin." 
# return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting) # submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) # def start_interview(data): # # --- Advanced state with full logging --- # state = { # "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [], # "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [], # "conversation_history": [], # "difficulty_adjustment": None, # "question_idx": 0, "max_questions": 3, "q_start_time": time.time(), # "log": [] # } # # --- Optionally: context retrieval here (currently just blank) --- # context = "" # prompt = build_interview_prompt( # conversation_history=[], user_response="", context=context, job_role=data["job_role"], # skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None, # voice_label="neutral", face_label="neutral" # ) # #here the original one # # first_q = groq_llm.predict(prompt) # # # Evaluate Q for quality # # q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None) # # state["questions"].append(first_q) # # state["question_evaluations"].append(q_eval) # #here the testing one # first_q = groq_llm.predict(prompt) # q_eval = { # "Score": "N/A", # "Reasoning": "Skipped to reduce processing time", # "Improvements": [] # } # state["questions"].append(first_q) # state["question_evaluations"].append(q_eval) # state["conversation_history"].append({'role': 'Interviewer', 'content': first_q}) # start = time.perf_counter() # audio_path = bark_tts(first_q) # print("⏱️ Bark TTS took", time.perf_counter() - start, "seconds") # # LOG # state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()}) # return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}" # start_interview_final_btn.click(start_interview, [user_data], [interview_state, interview_pre_section, interview_section, question_audio, question_text]) # def transcribe(audio_path): # return whisper_stt(audio_path) # user_audio_input.change(transcribe, user_audio_input, stt_transcript) # def process_answer(transcript, audio_path, state, data): # if not transcript: # return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() # elapsed = round(time.time() - state.get("q_start_time", time.time()), 2) # state["timings"].append(elapsed) # state["answers"].append(transcript) # state["conversation_history"].append({'role': 'Candidate', 'content': transcript}) # # --- 1. Emotion analysis (simplified for testing) --- # voice_label = "neutral" # face_label = "neutral" # state["voice_labels"].append(voice_label) # state["face_labels"].append(face_label) # # --- 2. Evaluate previous Q and Answer --- # last_q = state["questions"][-1] # q_eval = state["question_evaluations"][-1] # Already in state # ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"]) # answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None) # state["answer_evaluations"].append(answer_eval) # answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium" # # --- 3. 
Adaptive difficulty --- # if answer_score == "excellent": # state["difficulty_adjustment"] = "harder" # elif answer_score in ("medium", "poor"): # state["difficulty_adjustment"] = "easier" # else: # state["difficulty_adjustment"] = None # # --- 4. Effective confidence (simplified) --- # eff_conf = {"effective_confidence": 0.6} # state["effective_confidences"].append(eff_conf) # # --- LOG --- # state["log"].append({ # "type": "answer", # "question": last_q, # "answer": transcript, # "answer_eval": answer_eval, # "ref_answer": ref_answer, # "face_label": face_label, # "voice_label": voice_label, # "effective_confidence": eff_conf, # "timing": elapsed, # "timestamp": time.time() # }) # # --- Next or End --- # qidx = state["question_idx"] + 1 # if qidx >= state["max_questions"]: # # Save as JSON (optionally) # timestamp = time.strftime("%Y%m%d_%H%M%S") # log_file = f"interview_log_{timestamp}.json" # with open(log_file, "w", encoding="utf-8") as f: # json.dump(state["log"], f, indent=2, ensure_ascii=False) # # Report # summary = "# Interview Summary\n" # for i, q in enumerate(state["questions"]): # summary += (f"\n### Q{i + 1}: {q}\n" # f"- *Answer*: {state['answers'][i]}\n" # f"- *Q Eval*: {state['question_evaluations'][i]}\n" # f"- *A Eval*: {state['answer_evaluations'][i]}\n" # f"- *Time*: {state['timings'][i]}s\n") # summary += f"\n\n⏺ Full log saved as {log_file}." # return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}")) # else: # # --- Build next prompt using adaptive difficulty --- # state["question_idx"] = qidx # state["q_start_time"] = time.time() # context = "" # You can add your context logic here # prompt = build_interview_prompt( # conversation_history=state["conversation_history"], # user_response=transcript, # context=context, # job_role=data["job_role"], # skills=data["skills"], # seniority=data["seniority"], # difficulty_adjustment=state["difficulty_adjustment"], # voice_label=voice_label, # ) # next_q = groq_llm.predict(prompt) # # Evaluate Q quality # q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None) # state["questions"].append(next_q) # state["question_evaluations"].append(q_eval) # state["conversation_history"].append({'role': 'Interviewer', 'content': next_q}) # state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()}) # audio_path = bark_tts(next_q) # # Display evaluations # eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}" # return ( # state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", # gr.update(value=None), gr.update(value=None), # gr.update(visible=True, value=eval_md), # ) # # Replace your confirm_btn.click with this: # confirm_btn.click( # process_answer, # [stt_transcript, user_audio_input, interview_state, user_data], # Added None for video_path # [interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display] # ).then( # lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript] # ) # demo.launch(debug=True) import gradio as gr import time import tempfile import numpy as np import scipy.io.wavfile as wavfile import os import json import edge_tts import torch, gc from faster_whisper import WhisperModel import asyncio import threading from 
concurrent.futures import ThreadPoolExecutor print(torch.cuda.is_available()) torch.cuda.empty_cache() gc.collect() # Global variables for lazy loading faster_whisper_model = None tts_voice = "en-US-AriaNeural" # Thread pool for async operations executor = ThreadPoolExecutor(max_workers=2) # Add after your imports if torch.cuda.is_available(): print(f"🔥 CUDA Available: {torch.cuda.get_device_name(0)}") print(f"🔥 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB") # Set default device torch.cuda.set_device(0) else: print("⚠️ CUDA not available, using CPU") def load_models_lazy(): """Load models only when needed""" global faster_whisper_model device = "cuda" if torch.cuda.is_available() else "cpu" print(f"🔁 Using device: {device}") if faster_whisper_model is None: print("🔁 Loading Faster-Whisper model...") compute_type = "float16" if device == "cuda" else "int8" faster_whisper_model = WhisperModel("base", device=device, compute_type=compute_type) print(f"✅ Faster-Whisper model loaded on {device}") async def edge_tts_to_file(text, output_path="tts.wav", voice=tts_voice): communicate = edge_tts.Communicate(text, voice) await communicate.save(output_path) return output_path def tts_async(text): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return executor.submit(loop.run_until_complete, edge_tts_to_file(text)) def whisper_stt(audio_path): """STT using Faster-Whisper""" if not audio_path or not os.path.exists(audio_path): return "" load_models_lazy() print("🔁 Transcribing with Faster-Whisper") segments, _ = faster_whisper_model.transcribe(audio_path) transcript = " ".join(segment.text for segment in segments) return transcript.strip() seniority_mapping = { "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 } with gr.Blocks(theme=gr.themes.Soft()) as demo: user_data = gr.State({}) interview_state = gr.State({}) missing_fields_state = gr.State([]) tts_future = gr.State(None) # Store async TTS future with gr.Column(visible=True) as user_info_section: gr.Markdown("## Candidate Information") cv_file = gr.File(label="Upload CV") job_desc = gr.Textbox(label="Job Description") start_btn = gr.Button("Continue", interactive=False) with gr.Column(visible=False) as missing_section: gr.Markdown("## Missing Information") name_in = gr.Textbox(label="Name", visible=False) role_in = gr.Textbox(label="Job Role", visible=False) seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) skills_in = gr.Textbox(label="Skills", visible=False) submit_btn = gr.Button("Submit", interactive=False) with gr.Column(visible=False) as interview_pre_section: pre_interview_greeting_md = gr.Markdown() start_interview_final_btn = gr.Button("Start Interview") loading_status = gr.Markdown("", visible=False) with gr.Column(visible=False) as interview_section: gr.Markdown("## Interview in Progress") question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) question_text = gr.Markdown() user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. 
        stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)")
        confirm_btn = gr.Button("Confirm Answer")
        evaluation_display = gr.Markdown()
        interview_summary = gr.Markdown(visible=False)

    def validate_start_btn(cv_file, job_desc):
        return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())))

    cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn)
    job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn)

    def process_and_route_initial(cv_file, job_desc):
        details = extract_candidate_details(cv_file.name)
        job_info = extract_job_details(job_desc)
        data = {
            "name": details.get("name", "unknown"),
            "job_role": job_info.get("job_title", "unknown"),
            "seniority": job_info.get("experience_level", "unknown"),
            "skills": job_info.get("skills", [])
        }
        missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v]
        if missing:
            return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
        else:
            greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
            # Nothing is missing, so reveal the pre-interview section with the greeting
            return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True, value=greeting)

    start_btn.click(
        process_and_route_initial,
        [cv_file, job_desc],
        [user_data, missing_fields_state, user_info_section, missing_section, interview_pre_section, pre_interview_greeting_md]
    )

    def show_missing(missing):
        if missing is None:
            missing = []
        return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing)

    missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in])

    def validate_fields(name, role, seniority, skills, missing):
        if not missing:
            return gr.update(interactive=False)
        all_filled = all([
            (not ("name" in missing) or bool(name.strip())),
            (not ("job_role" in missing) or bool(role.strip())),
            (not ("seniority" in missing) or bool(seniority)),
            (not ("skills" in missing) or bool(skills.strip()))
        ])
        return gr.update(interactive=all_filled)

    for inp in [name_in, role_in, seniority_in, skills_in]:
        inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn)

    def complete_manual(data, name, role, seniority, skills):
        if data["name"].lower() == "unknown":
            data["name"] = name
        if data["job_role"].lower() == "unknown":
            data["job_role"] = role
        if data["seniority"].lower() == "unknown":
            data["seniority"] = seniority
        if not data["skills"]:
            data["skills"] = [s.strip() for s in skills.split(",")]
        greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
        return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)

    submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md])

    def start_interview(data):
        # Initialize interview state
        state = {
            "questions": [], "answers": [], "timings": [],
            "question_evaluations": [], "answer_evaluations": [],
            "conversation_history": [],
            "difficulty_adjustment": None,
            "question_idx": 0, "max_questions": 3,
            "q_start_time": time.time(),
            "log": []
        }

        # Build prompt for first question
        context = ""
        prompt = build_interview_prompt(
            conversation_history=[],
            user_response="",
            context=context,
            job_role=data["job_role"],
            skills=data["skills"],
            seniority=data["seniority"],
            difficulty_adjustment=None,
            voice_label="neutral"
        )

        # Generate first question
        first_q = groq_llm.predict(prompt)
        q_eval = {
            "Score": "N/A",
            "Reasoning": "Skipped to reduce processing time",
            "Improvements": []
        }
        state["questions"].append(first_q)
        state["question_evaluations"].append(q_eval)
        state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})

        # Generate audio with edge-tts (block until the file is ready)
        start = time.perf_counter()
        cleaned_text = first_q.strip().replace("\n", " ")
        audio_future = tts_async(cleaned_text)
        audio_path = audio_future.result()
        print("⏱️ TTS (edge-tts) took", round(time.perf_counter() - start, 2), "seconds")

        # Log question
        state["log"].append({
            "type": "question",
            "question": first_q,
            "question_eval": q_eval,
            "timestamp": time.time()
        })

        return (
            state,
            gr.update(visible=False),   # Hide interview_pre_section
            gr.update(visible=True),    # Show interview_section
            audio_path,                 # Set audio
            f"*Question 1:* {first_q}"  # Set question text
        )

    # Hook into Gradio
    start_interview_final_btn.click(
        start_interview,
        [user_data],
        [interview_state, interview_pre_section, interview_section, question_audio, question_text]
    )

    def transcribe(audio_path):
        return whisper_stt(audio_path)

    user_audio_input.change(transcribe, user_audio_input, stt_transcript)

    def process_answer(transcript, audio_path, state, data):
        if not transcript:
            return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()

        elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
        state["timings"].append(elapsed)
        state["answers"].append(transcript)
        state["conversation_history"].append({'role': 'Candidate', 'content': transcript})

        last_q = state["questions"][-1]
        q_eval = state["question_evaluations"][-1]
        ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"])
        answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None)
        state["answer_evaluations"].append(answer_eval)
        # Normalize case so capitalized scores (e.g. "Excellent") still match the comparisons below
        answer_score = str(answer_eval.get("Score", "medium")).lower() if answer_eval else "medium"

        if answer_score == "excellent":
            state["difficulty_adjustment"] = "harder"
        elif answer_score in ("medium", "poor"):
            state["difficulty_adjustment"] = "easier"
        else:
            state["difficulty_adjustment"] = None

        state["log"].append({
            "type": "answer",
            "question": last_q,
            "answer": transcript,
            "answer_eval": answer_eval,
            "ref_answer": ref_answer,
            "timing": elapsed,
            "timestamp": time.time()
        })

        qidx = state["question_idx"] + 1
        if qidx >= state["max_questions"]:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            log_file = f"interview_log_{timestamp}.json"
            with open(log_file, "w", encoding="utf-8") as f:
                json.dump(state["log"], f, indent=2, ensure_ascii=False)

            summary = "# Interview Summary\n"
            for i, q in enumerate(state["questions"]):
                summary += (f"\n### Q{i + 1}: {q}\n"
                            f"- *Answer*: {state['answers'][i]}\n"
                            f"- *Q Eval*: {state['question_evaluations'][i]}\n"
                            f"- *A Eval*: {state['answer_evaluations'][i]}\n"
                            f"- *Time*: {state['timings'][i]}s\n")
            summary += f"\n\n⏺ Full log saved as {log_file}."

            return state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=False)
        else:
            state["question_idx"] = qidx
            state["q_start_time"] = time.time()
            context = ""
            prompt = build_interview_prompt(
                conversation_history=state["conversation_history"],
                user_response=transcript,
                context=context,
                job_role=data["job_role"],
                skills=data["skills"],
                seniority=data["seniority"],
                difficulty_adjustment=state["difficulty_adjustment"],
                voice_label="neutral"
            )
            next_q = groq_llm.predict(prompt)
            q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None)
            state["questions"].append(next_q)
            state["question_evaluations"].append(q_eval)
            state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
            state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})

            # Generate TTS asynchronously for the next question too.
            # For now, we wait on the future (this could be made fully async later).
            audio_future = tts_async(next_q)
            audio_path = audio_future.result()

            eval_md = f"*Last Answer Eval:* {answer_eval}"
            return state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=eval_md)

    confirm_btn.click(
        process_answer,
        [stt_transcript, user_audio_input, interview_state, user_data],
        [interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display]
    ).then(
        lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript]
    )

demo.launch(debug=True)
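
# Usage note: demo.launch(debug=True) blocks until the server is stopped, so any code
# placed after this line will not run while the app is live. When a public URL is
# needed (for example, from a notebook or a remote machine), Gradio's share flag can
# be passed as well, e.g. demo.launch(debug=True, share=True).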