import os
import re
import json
import time
import logging
import traceback
from collections import defaultdict
from enum import Enum
from textwrap import dedent

import requests
import numpy as np
import cohere
from dotenv import load_dotenv
from inputimeout import inputimeout, TimeoutOccurred

from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Qdrant
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
# from langchain_huggingface import HuggingFaceEndpoint

from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import (
    VectorParams, Distance, Filter, FieldCondition, MatchValue,
    PointStruct, SearchRequest,
)

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors
from transformers import AutoTokenizer, pipeline, BitsAndBytesConfig

# Load credentials from a local .env file instead of hardcoding or printing raw keys.
load_dotenv()

cohere_api_key = os.getenv("COHERE_API_KEY")
chat_groq_api = os.getenv("GROQ_API_KEY")
hf_api_key = os.getenv("HF_API_KEY")
qdrant_api = os.getenv("QDRANT_API_KEY")
qdrant_url = os.getenv("QDRANT_API_URL")

# Confirm the keys are present without exposing their values.
print("GROQ API Key loaded:", bool(chat_groq_api))
print("QDRANT API Key loaded:", bool(qdrant_api))
print("QDRANT API URL loaded:", bool(qdrant_url))
print("Cohere API Key loaded:", bool(cohere_api_key))

qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api)
print(qdrant_client.get_collections())


class CustomChatGroq:
    def __init__(self, temperature, model_name, api_key):
        self.temperature = temperature
        self.model_name = model_name
        self.api_key = api_key
        self.api_url = "https://api.groq.com/openai/v1/chat/completions"

    def predict(self, prompt):
        """Send a request to the Groq API and return the generated response."""
        try:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            }
            payload = {
                "model": self.model_name,
                "messages": [
                    {"role": "system", "content": "You are an AI interviewer."},
                    {"role": "user", "content": prompt},
                ],
                "temperature": self.temperature,
                "max_tokens": 150,
            }
            response = requests.post(self.api_url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
            data = response.json()

            # Extract response text based on the Groq API response format
            if "choices" in data and len(data["choices"]) > 0:
                return data["choices"][0]["message"]["content"].strip()

            logging.warning("Unexpected response structure from Groq API")
            return "Interviewer: Could you tell me more about your relevant experience?"
        except requests.exceptions.RequestException as e:
            logging.error(f"ChatGroq API error: {e}")
            return "Interviewer: Due to a system issue, let's move on to another question."
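# Illustrative sketch (not part of the original flow): CustomChatGroq mirrors the
# `.predict()` interface of the LangChain ChatGroq client used below, so it can act
# as a lightweight drop-in when the wrapper is unavailable. RUN_GROQ_SMOKE_TEST is
# a hypothetical opt-in flag, not an existing setting.
if os.getenv("RUN_GROQ_SMOKE_TEST"):
    _probe_llm = CustomChatGroq(
        temperature=0.7,
        model_name="llama-3.3-70b-versatile",
        api_key=chat_groq_api,
    )
    # A bad key or network issue surfaces here as the fallback interviewer line.
    print(_probe_llm.predict("Ask one short warm-up interview question."))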
groq_llm = ChatGroq( temperature=0.7, model_name="llama-3.3-70b-versatile", api_key=chat_groq_api ) from huggingface_hub import login import os HF_TOKEN = os.getenv("HF_TOKEN") if HF_TOKEN: login(HF_TOKEN) else: raise EnvironmentError("Missing HF_TOKEN environment variable.") #Load mistral Model from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline import torch print(torch.cuda.is_available()) MODEL_PATH = "mistralai/Mistral-7B-Instruct-v0.3" #MODEL_PATH = "tiiuae/falcon-rw-1b" bnb_config = BitsAndBytesConfig( load_in_8bit=True, ) mistral_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH,token=hf_api_key) judge_llm = AutoModelForCausalLM.from_pretrained( MODEL_PATH, quantization_config=bnb_config,torch_dtype=torch.float16, device_map="auto", token=hf_api_key ) judge_llm.config.pad_token_id = judge_llm.config.eos_token_id print(judge_llm.hf_device_map) judge_pipeline = pipeline( "text-generation", model=judge_llm, tokenizer=mistral_tokenizer, max_new_tokens=128, temperature=0.3, top_p=0.9, do_sample=True, # Optional but recommended with temperature/top_p repetition_penalty=1.1, ) output = judge_pipeline("Q: What is Python?\nA:", max_new_tokens=128)[0]['generated_text'] print(output) # embedding model from sentence_transformers import SentenceTransformer class LocalEmbeddings: def __init__(self, model_name="all-MiniLM-L6-v2"): self.model = SentenceTransformer(model_name) def embed_query(self, text): return self.model.encode(text).tolist() def embed_documents(self, documents): return self.model.encode(documents).tolist() embeddings = LocalEmbeddings() # import cohere qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api,check_compatibility=False) co = cohere.Client(api_key=cohere_api_key) class EvaluationScore(str, Enum): POOR = "Poor" MEDIUM = "Medium" GOOD = "Good" EXCELLENT = "Excellent" # Cohere Reranker class CohereReranker: def __init__(self, client): self.client = client def compress_documents(self, documents, query): if not documents: return [] doc_texts = [doc.page_content for doc in documents] try: reranked = self.client.rerank( query=query, documents=doc_texts, model="rerank-english-v2.0", top_n=5 ) return [documents[result.index] for result in reranked.results] except Exception as e: logging.error(f"Error in CohereReranker.compress_documents: {e}") return documents[:5] reranker = CohereReranker(co) def load_data_from_json(file_path): """Load interview Q&A data from a JSON file.""" try: with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) job_role_buckets = defaultdict(list) for idx, item in enumerate(data): try: job_role = item["Job Role"].lower().strip() question = item["Questions"].strip() answer = item["Answers"].strip() job_role_buckets[job_role].append({"question": question, "answer": answer}) except KeyError as e: logging.warning(f"Skipping item {idx}: missing key {e}") return job_role_buckets # <--- You missed this! 
except Exception as e: logging.error(f"Error loading data: {e}") raise def verify_qdrant_collection(collection_name='interview_questions'): """Verify if a Qdrant collection exists with the correct configuration.""" try: collection_info = qdrant_client.get_collection(collection_name) vector_size = collection_info.config.params.vectors.size logging.info(f"Collection '{collection_name}' exists with vector size: {vector_size}") return True except Exception as e: logging.warning(f"Collection '{collection_name}' not found: {e}") return False def store_data_to_qdrant(data, collection_name='interview_questions', batch_size=100): """Store interview data in the Qdrant vector database.""" try: # Check if collection exists, otherwise create it if not verify_qdrant_collection(collection_name): try: qdrant_client.create_collection( collection_name=collection_name, vectors_config=VectorParams(size=384, distance=Distance.COSINE) ) logging.info(f"Created collection '{collection_name}'") except Exception as e: logging.error(f"Error creating collection: {e}\n{traceback.format_exc()}") return False points = [] point_id = 0 total_points = sum(len(qa_list) for qa_list in data.values()) processed = 0 for job_role, qa_list in data.items(): for entry in qa_list: try: emb = embeddings.embed_query(entry["question"]) print(f"Embedding shape: {len(emb)}") if not emb or len(emb) != 384: logging.warning(f"Skipping point {point_id} due to invalid embedding length: {len(emb)}") continue points.append(PointStruct( id=point_id, vector=emb, payload={ "job_role": job_role, "question": entry["question"], "answer": entry["answer"] } )) point_id += 1 processed += 1 # Batch upload if len(points) >= batch_size: try: qdrant_client.upsert(collection_name=collection_name, points=points) logging.info(f"Stored {processed}/{total_points} points ({processed/total_points*100:.1f}%)") except Exception as upsert_err: logging.error(f"Error during upsert: {upsert_err}\n{traceback.format_exc()}") points = [] except Exception as embed_err: logging.error(f"Embedding error for point {point_id}: {embed_err}\n{traceback.format_exc()}") # Final batch upload if points: try: qdrant_client.upsert(collection_name=collection_name, points=points) logging.info(f"Stored final batch of {len(points)} points") except Exception as final_upsert_err: logging.error(f"Error during final upsert: {final_upsert_err}\n{traceback.format_exc()}") # Final verification try: count = qdrant_client.count(collection_name=collection_name, exact=True).count print("Current count:", count) logging.info(f"✅ Successfully stored {count} points in Qdrant") if count != total_points: logging.warning(f"Expected {total_points} points but stored {count}") except Exception as count_err: logging.error(f"Error verifying stored points: {count_err}\n{traceback.format_exc()}") return True except Exception as e: logging.error(f"Error storing data to Qdrant: {e}\n{traceback.format_exc()}") return False # to ensure cosine similarity use info = qdrant_client.get_collection("interview_questions") print(info.config.params.vectors.distance) def extract_all_roles_from_qdrant(collection_name='interview_questions'): """ Extract all unique job roles from the Qdrant vector store """ try: all_roles = set() scroll_offset = None while True: response = qdrant_client.scroll( collection_name=collection_name, limit=200, offset=scroll_offset, with_payload=True ) points, next_page_offset = response if not points: break for point in points: role = point.payload.get("job_role", "").strip().lower() if role: 
all_roles.add(role) if not next_page_offset: break scroll_offset = next_page_offset if not all_roles: logging.warning("[Qdrant] No roles found in payloads.") else: logging.info(f"[Qdrant] Extracted {len(all_roles)} unique job roles.") return list(all_roles) except Exception as e: logging.error(f"Error extracting roles from Qdrant: {e}") return [] import numpy as np import logging from sklearn.metrics.pairwise import cosine_similarity def find_similar_roles(user_role, all_roles, top_k=3): """ Find the most similar job roles to the given user_role using embeddings. """ try: # Clean inputs user_role = user_role.strip().lower() if not user_role or not all_roles or not isinstance(all_roles, list): logging.warning("Invalid input for role similarity") return [] # Embed user role try: user_embedding = embeddings.embed_query(user_role) if user_embedding is None: logging.error("User embedding is None") return [] except Exception as e: logging.error(f"Error embedding user role: {type(e).__name__}: {e}") return [] # Embed all roles try: role_embeddings = [] valid_roles = [] for role in all_roles: emb = embeddings.embed_query(role.lower()) if emb is not None: role_embeddings.append(emb) valid_roles.append(role) else: logging.warning(f"Skipping role with no embedding: {role}") except Exception as e: logging.error(f"Error embedding all roles: {type(e).__name__}: {e}") return [] if not role_embeddings: logging.error("All role embeddings failed") return [] # Compute similarities similarities = cosine_similarity([user_embedding], role_embeddings)[0] top_indices = np.argsort(similarities)[::-1][:top_k] similar_roles = [valid_roles[i] for i in top_indices] logging.debug(f"Similar roles to '{user_role}': {similar_roles}") return similar_roles except Exception as e: logging.error(f"Error finding similar roles: {type(e).__name__}: {e}", exc_info=True) return [] # RETREIVE ALL DATA RELATED TO THE JOB ROLE NOT JUST TOP_K def get_role_questions(job_role): try: if not job_role: logging.warning("Job role is empty.") return [] filter_by_role = Filter( must=[FieldCondition( key="job_role", match=MatchValue(value=job_role.lower()) )] ) all_results = [] offset = None while True: results, next_page_offset = qdrant_client.scroll( collection_name="interview_questions", scroll_filter=filter_by_role, with_payload=True, with_vectors=False, limit=100, # batch size offset=offset ) all_results.extend(results) if not next_page_offset: break offset = next_page_offset parsed_results = [{ "question": r.payload.get("question"), "answer": r.payload.get("answer"), "job_role": r.payload.get("job_role") } for r in all_results] return parsed_results except Exception as e: logging.error(f"Error fetching role questions: {type(e).__name__}: {e}", exc_info=True) return [] def retrieve_interview_data(job_role, all_roles): """ Retrieve all interview Q&A for a given job role. Falls back to similar roles if no data found. 
Args: job_role (str): Input job role (can be misspelled) all_roles (list): Full list of available job roles Returns: list: List of QA dicts with keys: 'question', 'answer', 'job_role' """ import logging logging.basicConfig(level=logging.INFO) job_role = job_role.strip().lower() seen_questions = set() final_results = [] # Step 1: Try exact match (fetch all questions for role) logging.info(f"Trying to fetch all data for exact role: '{job_role}'") exact_matches = get_role_questions(job_role) for qa in exact_matches: question = qa["question"] if question and question not in seen_questions: seen_questions.add(question) final_results.append(qa) if final_results: logging.info(f"Found {len(final_results)} QA pairs for exact role '{job_role}'") return final_results logging.warning(f"No data found for role '{job_role}'. Trying similar roles...") # Step 2: No matches — find similar roles similar_roles = find_similar_roles(job_role, all_roles, top_k=3) if not similar_roles: logging.warning("No similar roles found.") return [] logging.info(f"Found similar roles: {similar_roles}") # Step 3: Retrieve data for each similar role (all questions) for role in similar_roles: logging.info(f"Fetching data for similar role: '{role}'") role_qa = get_role_questions(role) for qa in role_qa: question = qa["question"] if question and question not in seen_questions: seen_questions.add(question) final_results.append(qa) logging.info(f"Retrieved total {len(final_results)} QA pairs from similar roles") return final_results import random def random_context_chunks(retrieved_data, k=3): chunks = random.sample(retrieved_data, k) return "\n\n".join([f"Q: {item['question']}\nA: {item['answer']}" for item in chunks]) import json import logging import re from typing import Dict def eval_question_quality( question: str, job_role: str, seniority: str, judge_pipeline=None, max_retries=1 # Allow at least 1 retry on parse fail ) -> Dict[str, str]: import time try: # Use provided pipeline or fall back to global if judge_pipeline is None: judge_pipeline = globals().get("judge_pipeline") if not judge_pipeline: return { "Score": "Error", "Reasoning": "Judge pipeline not available", "Improvements": "Please provide a valid language model pipeline" } prompt = f""" ... (same as your prompt) ... Now evaluate this question: \"{question}\" """ for attempt in range(max_retries + 1): response = judge_pipeline( prompt, max_new_tokens=512, do_sample=False, temperature=0.1, repetition_penalty=1.2 )[0]["generated_text"] try: # Fallback to last {...} block match = re.search(r'\{.*\}', response, re.DOTALL) if not match: raise ValueError("Could not locate JSON structure in model output.") json_str = match.group(0) result = json.loads(json_str) # Validate required fields and values required_keys = ["Score", "Reasoning", "Improvements"] valid_scores = {"Poor", "Medium", "Good", "Excellent"} if not all(k in result for k in required_keys): raise ValueError("Missing required fields.") if result["Score"] not in valid_scores: raise ValueError("Invalid score value.") return result except Exception as e: logging.warning(f"Attempt {attempt+1} JSON parsing failed: {e}") time.sleep(0.2) # Small delay before retry # If all attempts fail, return a default valid dict return { "Score": "Poor", "Reasoning": "The evaluation model failed to produce a valid score, so defaulted to 'Poor'. 
Check model output and prompt formatting.", "Improvements": [ "Ensure the question is clear and role-relevant.", "Double-check prompt and formatting.", "Try rephrasing the question to match rubric." ] } except Exception as e: logging.error(f"Error in eval_question_quality: {type(e).__name__}: {e}", exc_info=True) return { "Score": "Poor", "Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.", "Improvements": [ "Retry with a different question.", "Check your judge pipeline connection.", "Contact support if this persists." ] } def evaluate_answer( question: str, answer: str, ref_answer: str, job_role: str, seniority: str, judge_pipeline=None, max_retries=1 ) -> Dict[str, str]: """ Evaluates a candidate's answer to an interview question and returns a structured judgment. Guarantees a valid, actionable result even if the model fails. """ import time try: if judge_pipeline is None: judge_pipeline = globals().get("judge_pipeline") if not judge_pipeline: return { "Score": "Error", "Reasoning": "Judge pipeline not available", "Improvements": [ "Please provide a valid language model pipeline" ] } # Enhanced prompt (your version) prompt = f""" You are an expert technical interviewer evaluating a candidate's response for a {job_role} position at the {seniority} level. You are provided with: - The question asked - The candidate's response - A reference answer that represents a high-quality expected answer Evaluate the candidate's response based on: - Technical correctness - Clarity and depth of explanation - Relevance to the job role and seniority - Completeness and structure Be objective, concise, and use professional language. Be fair but critical. -------------------------- Question: {question} Candidate Answer: {answer} Reference Answer: {ref_answer} -------------------------- Now return your evaluation as a valid JSON object using exactly these keys: - "Score": One of ["Poor", "Medium", "Good", "Excellent"] - "Reasoning": 2-3 sentence explanation justifying the score, covering clarity, accuracy, completeness, or relevance - "Improvements": A list of 2-3 specific and constructive suggestions to help the candidate improve this answer Example: {{ "Score": "Good", "Reasoning": "The answer demonstrates a good understanding of the concept and touches on key ideas, but lacks depth in explaining the trade-offs between techniques.", "Improvements": [ "Explain when this method might fail or produce biased results", "Include examples or metrics to support the explanation", "Clarify the specific business impact or outcome achieved" ] }} Respond only with the JSON: """ for attempt in range(max_retries + 1): output = judge_pipeline( prompt, max_new_tokens=512, temperature=0.3, do_sample=False )[0]["generated_text"] # Try to extract JSON response from output robustly try: start_idx = output.rfind("{") end_idx = output.rfind("}") + 1 if start_idx != -1 and end_idx != -1 and end_idx > start_idx: json_str = output[start_idx:end_idx] result = json.loads(json_str) valid_scores = {"Poor", "Medium", "Good", "Excellent"} if result.get("Score") in valid_scores: return { "Score": result["Score"], "Reasoning": result.get("Reasoning", "No explanation provided."), "Improvements": result.get("Improvements", ["No improvement suggestions provided."]) } else: raise ValueError(f"Invalid Score value: {result.get('Score')}") else: raise ValueError("JSON format not found in output") except Exception as e: logging.warning(f"evaluate_answer: Attempt {attempt+1} failed to parse model output: {e}") time.sleep(0.2) # 
Small wait before retry # Fallback: always return a default 'Poor' score if all attempts fail return { "Score": "Poor", "Reasoning": "The evaluation model failed to produce a valid score or parse output; defaulted to 'Poor'. Please check model output and prompt formatting.", "Improvements": [ "Be more specific and detailed in the answer.", "Structure your response with clear points.", "Relate your answer more closely to the job role and question." ] } except Exception as e: logging.error(f"Evaluation failed: {e}", exc_info=True) return { "Score": "Poor", "Reasoning": f"Critical error occurred: {str(e)}. Defaulted to 'Poor'.", "Improvements": [ "Try again with a different answer.", "Check your judge pipeline connection.", "Contact support if the error persists." ] } # SAME BUT USING LLAMA 3.3 FROM GROQ def generate_reference_answer(question, job_role, seniority): """ Generates a high-quality reference answer using Groq-hosted LLaMA model. Args: question (str): Interview question to answer. job_role (str): Target job role (e.g., "Frontend Developer"). seniority (str): Experience level (e.g., "Mid-Level"). Returns: str: Clean, generated reference answer or error message. """ try: # Clean, role-specific prompt prompt = f"""You are a {seniority} {job_role}. Q: {question} A:""" # Use Groq-hosted model to generate the answer ref_answer = groq_llm.predict(prompt) if not ref_answer.strip(): return "Reference answer not generated." return ref_answer.strip() except Exception as e: logging.error(f"Error generating reference answer: {e}", exc_info=True) return "Unable to generate reference answer due to an error" def build_interview_prompt(conversation_history, user_response, context, job_role, skills, seniority, difficulty_adjustment=None, voice_label=None, face_label=None, effective_confidence=None): """Build a prompt for generating the next interview question with adaptive difficulty and fairness logic.""" interview_template = """ You are an AI interviewer conducting a real-time interview for a {job_role} position. Your objective is to thoroughly evaluate the candidate's suitability for the role using smart, structured, and adaptive questioning. --- Interview Rules and Principles: - The **baseline difficulty** of questions must match the candidate’s seniority level (e.g., junior, mid-level, senior). - Use your judgment to increase difficulty **slightly** if the candidate performs well, or simplify if they struggle — but never drop below the expected baseline for their level. - Avoid asking extremely difficult questions to junior candidates unless they’ve clearly demonstrated advanced knowledge. - Be fair: candidates for the same role should be evaluated within a consistent difficulty range. - Adapt your line of questioning gradually and logically based on the **overall flow**, not just the last answer. - Include real-world problem-solving scenarios to test how the candidate thinks and behaves practically. - You must **lead** the interview and make intelligent decisions about what to ask next. --- Context Use: {context_instruction} Note: If no relevant context was retrieved or the previous answer is unclear, you must still generate a thoughtful interview question using your own knowledge. Do not skip generation. Avoid default or fallback responses — always try to generate a meaningful and fair next question. 
--- Job Role: {job_role} Seniority Level: {seniority} Skills Focus: {skills} Difficulty Setting: {difficulty} (based on {difficulty_adjustment}) --- Recent Conversation History: {history} Candidate's Last Response: "{user_response}" Evaluation of Last Response: {response_evaluation} Voice Tone: {voice_label} --- --- Important: If no relevant context was retrieved or the previous answer is unclear or off-topic, you must still generate a meaningful and fair interview question using your own knowledge and best practices. Do not skip question generation or fall back to default/filler responses. --- Guidelines for Next Question: - If this is the beginning of the interview, start with a question about the candidate’s background or experience. - Base the difficulty primarily on the seniority level, with light adjustment from recent performance. - Focus on core skills, real-world applications, and depth of reasoning. - Ask only one question. Be clear and concise. Generate the next interview question now: """ # Calculate difficulty phrase if difficulty_adjustment == "harder": difficulty = f"slightly more challenging than typical for {seniority}" elif difficulty_adjustment == "easier": difficulty = f"slightly easier than typical for {seniority}" else: difficulty = f"appropriate for {seniority}" # Choose context logic if context.strip(): context_instruction = ( "Use both your own expertise and the provided context from relevant interview datasets. " "You can either build on questions from the dataset or generate your own." ) context = context.strip() else: context_instruction = ( "No specific context retrieved. Use your own knowledge and best practices to craft a question." ) context = "" # Let it be actually empty! # Format conversation history (last 6 exchanges max) recent_history = conversation_history[-6:] if len(conversation_history) > 6 else conversation_history formatted_history = "\n".join([f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history]) # Add evaluation summary if available if conversation_history and conversation_history[-1].get("evaluation"): eval_data = conversation_history[-1]["evaluation"][-1] response_evaluation = f""" - Score: {eval_data.get('Score', 'N/A')} - Reasoning: {eval_data.get('Reasoning', 'N/A')} - Improvements: {eval_data.get('Improvements', 'N/A')} """ else: response_evaluation = "No evaluation available yet." 
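    # Note: face_label and effective_confidence are accepted by this function's signature
    # but are not referenced in the template above; only voice_label (defaulting to
    # "unknown") is interpolated. Extend the template before wiring in new signals.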
# Fill the template prompt = interview_template.format( job_role=job_role, seniority=seniority, skills=skills, difficulty=difficulty, difficulty_adjustment=difficulty_adjustment if difficulty_adjustment else "default seniority", context_instruction=context_instruction, context=context, history=formatted_history, user_response=user_response, response_evaluation=response_evaluation.strip(), voice_label=voice_label or "unknown", ) return prompt def generate_llm_interview_report( interview_state, logged_samples, job_role, seniority ): from collections import Counter # Helper for converting score to 1–5 def score_label(label): mapping = { "confident": 5, "calm": 4, "neutral": 3, "nervous": 2, "anxious": 1, "unknown": 3 } return mapping.get(label.lower(), 3) def section_score(vals): return round(sum(vals)/len(vals), 2) if vals else "N/A" # Aggregate info scores, voice_conf, face_conf, comm_scores = [], [], [], [] tech_details, comm_details, emotion_details, relevance_details, problem_details = [], [], [], [], [] for entry in logged_samples: answer_eval = entry.get("answer_evaluation", {}) score = answer_eval.get("Score", "Not Evaluated") reasoning = answer_eval.get("Reasoning", "") if score.lower() in ["excellent", "good", "medium", "poor"]: score_map = {"excellent": 5, "good": 4, "medium": 3, "poor": 2} scores.append(score_map[score.lower()]) # Section details tech_details.append(reasoning) comm_details.append(reasoning) # Emotions/confidence voice_conf.append(score_label(entry.get("voice_label", "unknown"))) face_conf.append(score_label(entry.get("face_label", "unknown"))) # Communication estimate if entry["user_answer"]: length = len(entry["user_answer"].split()) comm_score = min(5, max(2, length // 30)) comm_scores.append(comm_score) # Compute averages for sections avg_problem = section_score(scores) avg_tech = section_score(scores) avg_comm = section_score(comm_scores) avg_emotion = section_score([(v+f)/2 for v, f in zip(voice_conf, face_conf)]) # Compute decision heuristics section_averages = [avg_problem, avg_tech, avg_comm, avg_emotion] numeric_avgs = [v for v in section_averages if isinstance(v, (float, int))] avg_overall = round(sum(numeric_avgs) / len(numeric_avgs), 2) if numeric_avgs else 0 # Hiring logic (you can customize thresholds) if avg_overall >= 4.5: verdict = "Strong Hire" elif avg_overall >= 4.0: verdict = "Hire" elif avg_overall >= 3.0: verdict = "Conditional Hire" else: verdict = "No Hire" # Build LLM report prompt transcript = "\n\n".join([ f"Q: {e['generated_question']}\nA: {e['user_answer']}\nScore: {e.get('answer_evaluation',{}).get('Score','')}\nReasoning: {e.get('answer_evaluation',{}).get('Reasoning','')}" for e in logged_samples ]) prompt = f""" You are a senior technical interviewer at a major tech company. Write a structured, realistic hiring report for this {seniority} {job_role} interview, using these section scores (scale 1–5, with 5 best): Section-wise Evaluation 1. *Problem Solving & Critical Thinking*: {avg_problem} 2. *Technical Depth & Knowledge*: {avg_tech} 3. *Communication & Clarity*: {avg_comm} 4. *Emotional Composure & Confidence*: {avg_emotion} 5. *Role Relevance*: 5 *Transcript* {transcript} Your report should have the following sections: 1. *Executive Summary* (realistic, hiring-committee style) 2. *Section-wise Comments* (for each numbered category above, with short paragraph citing specifics) 3. *Strengths & Weaknesses* (list at least 2 for each) 4. *Final Verdict*: {verdict} 5. 
*Recommendations* (2–3 for future improvement) Use realistic language. If some sections are N/A or lower than others, comment honestly. Interview Report: """ # LLM call, or just return prompt for review return groq_llm.predict(prompt) def get_user_info(): """ Collects essential information from the candidate before starting the interview. Returns a dictionary with keys: name, job_role, seniority, skills """ import logging logging.info("Collecting user information...") print("Welcome to the AI Interview Simulator!") print("Let’s set up your mock interview.\n") # Get user name name = input("What is your name? ").strip() while not name: print("Please enter your name.") name = input("What is your name? ").strip() # Get job role job_role = input(f"Hi {name}, what job role are you preparing for? (e.g. Frontend Developer) ").strip() while not job_role: print("Please specify the job role.") job_role = input("What job role are you preparing for? ").strip() # Get seniority level seniority_options = ["Entry-level", "Junior", "Mid-Level", "Senior", "Lead"] print("\nSelect your experience level:") for i, option in enumerate(seniority_options, 1): print(f"{i}. {option}") seniority_choice = None while seniority_choice not in range(1, len(seniority_options)+1): try: seniority_choice = int(input("Enter the number corresponding to your level: ")) except ValueError: print(f"Please enter a number between 1 and {len(seniority_options)}") seniority = seniority_options[seniority_choice - 1] # Get skills skills_input = input(f"\nWhat are your top skills relevant to {job_role}? (Separate with commas): ") skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] while not skills: print("Please enter at least one skill.") skills_input = input("Your top skills (comma-separated): ") skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] # Confirm collected info print("\n Interview Setup Complete!") print(f"Name: {name}") print(f"Job Role: {job_role}") print(f"Experience Level: {seniority}") print(f"Skills: {', '.join(skills)}") print("\nStarting your mock interview...\n") return { "name": name, "job_role": job_role, "seniority": seniority, "skills": skills } import threading def wait_for_user_response(timeout=200): """Wait for user input with timeout. 
Returns '' if no response.""" user_input = [] def get_input(): answer = input("Your Answer (within timeout): ").strip() user_input.append(answer) thread = threading.Thread(target=get_input) thread.start() thread.join(timeout) return user_input[0] if user_input else "" import json from datetime import datetime from time import time import random def interview_loop(max_questions, timeout_seconds=300, collection_name="interview_questions", judge_pipeline=None, save_path="interview_log.json"): user_info = get_user_info() job_role = user_info['job_role'] seniority = user_info['seniority'] skills = user_info['skills'] all_roles = extract_all_roles_from_qdrant(collection_name=collection_name) retrieved_data = retrieve_interview_data(job_role, all_roles) context_data = random_context_chunks(retrieved_data, k=4) conversation_history = [] interview_state = { "questions": [], "user_answer": [], "job_role": job_role, "seniority": seniority, "start_time": time() } # Store log for evaluation logged_samples = [] difficulty_adjustment = None for i in range(max_questions): last_user_response = conversation_history[-1]['content'] if conversation_history else "" # Generate question prompt prompt = build_interview_prompt( conversation_history=conversation_history, user_response=last_user_response, context=context_data, job_role=job_role, skills=skills, seniority=seniority, difficulty_adjustment=difficulty_adjustment ) question = groq_llm.predict(prompt) question_eval = eval_question_quality(question, job_role, seniority, judge_pipeline) conversation_history.append({'role': "Interviewer", "content": question}) print(f"Interviewer: Q{i + 1} : {question}") # Wait for user answer start_time = time() user_answer = wait_for_user_response(timeout=timeout_seconds) response_time = time() - start_time skipped = False answer_eval = None ref_answer = None if not user_answer: print("No Response Received, moving to next question.") user_answer = None skipped = True difficulty_adjustment = "medium" else: conversation_history.append({"role": "Candidate", "content": user_answer}) ref_answer = generate_reference_answer(question, job_role, seniority) answer_eval = evaluate_answer( question=question, answer=user_answer, ref_answer=ref_answer, job_role=job_role, seniority=seniority, judge_pipeline=judge_pipeline ) interview_state["user_answer"].append(user_answer) # Append inline evaluation for history conversation_history[-1].setdefault('evaluation', []).append({ "technical_depth": { "score": answer_eval['Score'], "Reasoning": answer_eval['Reasoning'] } }) # Adjust difficulty score = answer_eval['Score'].lower() if score == "excellent": difficulty_adjustment = "harder" elif score in ['poor', 'medium']: difficulty_adjustment = "easier" else: difficulty_adjustment = None # Store for local logging logged_samples.append({ "job_role": job_role, "seniority": seniority, "skills": skills, "context": context_data, "prompt": prompt, "generated_question": question, "question_evaluation": question_eval, "user_answer": user_answer, "reference_answer": ref_answer, "answer_evaluation": answer_eval, "skipped": skipped }) # Store state interview_state['questions'].append({ "question": question, "question_evaluation": question_eval, "user_answer": user_answer, "answer_evaluation": answer_eval, "skipped": skipped }) interview_state['end_time'] = time() report = generate_llm_interview_report(interview_state, job_role, seniority) print("Report : _____________________\n") print(report) print('______________________________________________') # Save 
full interview logs to JSON timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{save_path.replace('.json', '')}_{timestamp}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(logged_samples, f, indent=2, ensure_ascii=False) print(f" Interview log saved to {filename}") print("____________________________________\n") print(f"interview state : {interview_state}") return interview_state, report from sklearn.metrics import precision_score, recall_score, f1_score import numpy as np # build ground truth for retrieving data for testing def build_ground_truth(all_roles): gt = {} for role in all_roles: qa_list = get_role_questions(role) gt[role] = set(q["question"] for q in qa_list if q["question"]) return gt def evaluate_retrieval(job_role, all_roles, k=10): """ Evaluate retrieval quality using Precision@k, Recall@k, and F1@k. Args: job_role (str): The input job role to search for. all_roles (list): List of all available job roles in the system. k (int): Top-k retrieved questions to evaluate. Returns: dict: Evaluation metrics including precision, recall, and f1. """ # Step 1: Ground Truth (all exact questions stored for this role) ground_truth_qs = set( q["question"].strip() for q in get_role_questions(job_role) if q.get("question") ) if not ground_truth_qs: print(f"[!] No ground truth found for role: {job_role}") return {} # Step 2: Retrieved Questions (may include fallback roles) retrieved_qas = retrieve_interview_data(job_role, all_roles) retrieved_qs = [q["question"].strip() for q in retrieved_qas if q.get("question")] # Step 3: Take top-k retrieved (you can also do full if needed) retrieved_top_k = retrieved_qs[:k] # Step 4: Binary relevance (1 if in ground truth, 0 if not) y_true = [1 if q in ground_truth_qs else 0 for q in retrieved_top_k] y_pred = [1] * len(y_true) # all retrieved are treated as predicted relevant precision = precision_score(y_true, y_pred, zero_division=0) recall = recall_score(y_true, y_pred, zero_division=0) f1 = f1_score(y_true, y_pred, zero_division=0) print(f" Retrieval Evaluation for role: '{job_role}' (Top-{k})") print(f"Precision@{k}: {precision:.2f}") print(f"Recall@{k}: {recall:.2f}") print(f"F1@{k}: {f1:.2f}") print(f"Relevant Retrieved: {sum(y_true)}/{len(y_true)}") print("–" * 40) return { "job_role": job_role, "precision": precision, "recall": recall, "f1": f1, "relevant_retrieved": sum(y_true), "total_retrieved": len(y_true), "ground_truth_count": len(ground_truth_qs), } k_values = [5, 10, 20] all_roles = extract_all_roles_from_qdrant(collection_name="interview_questions") results = [] for k in k_values: for role in all_roles: metrics = evaluate_retrieval(role, all_roles, k=k) if metrics: # only if we found ground truth metrics["k"] = k results.append(metrics) import pandas as pd df = pd.DataFrame(results) summary = df.groupby("k")[["precision", "recall", "f1"]].mean().round(3) print(summary) def extract_job_details(job_description): """Extract job details such as title, skills, experience level, and years of experience from the job description.""" title_match = re.search(r"(?i)(?:seeking|hiring) a (.+?) to", job_description) job_title = title_match.group(1) if title_match else "Unknown" skills_match = re.findall(r"(?i)(?:Proficiency in|Experience with|Knowledge of) (.+?)(?:,|\.| and| or)", job_description) skills = list(set([skill.strip() for skill in skills_match])) if skills_match else [] experience_match = re.search(r"(\d+)\+? 
years of experience", job_description) if experience_match: years_experience = int(experience_match.group(1)) experience_level = "Senior" if years_experience >= 5 else "Mid" if years_experience >= 3 else "Junior" else: years_experience = None experience_level = "Unknown" return { "job_title": job_title, "skills": skills, "experience_level": experience_level, "years_experience": years_experience } import re from docx import Document import textract from PyPDF2 import PdfReader JOB_TITLES = [ "Accountant", "Data Scientist", "Machine Learning Engineer", "Software Engineer", "Developer", "Analyst", "Researcher", "Intern", "Consultant", "Manager", "Engineer", "Specialist", "Project Manager", "Product Manager", "Administrator", "Director", "Officer", "Assistant", "Coordinator", "Supervisor" ] def clean_filename_name(filename): # Remove file extension base = re.sub(r"\.[^.]+$", "", filename) base = base.strip() # Remove 'cv' or 'CV' words base_clean = re.sub(r"\bcv\b", "", base, flags=re.IGNORECASE).strip() # If after removing CV it's empty, return None if not base_clean: return None # If it contains any digit, return None (unreliable) if re.search(r"\d", base_clean): return None # Replace underscores/dashes with spaces, capitalize base_clean = base_clean.replace("_", " ").replace("-", " ") return base_clean.title() def looks_like_job_title(line): for title in JOB_TITLES: pattern = r"\b" + re.escape(title.lower()) + r"\b" if re.search(pattern, line.lower()): return True return False def extract_name_from_text(lines): # Try first 3 lines for a name, skipping job titles for i in range(min(1, len(lines))): line = lines[i].strip() if looks_like_job_title(line): return "unknown" if re.search(r"\d", line): # skip lines with digits continue if len(line.split()) > 4 or len(line) > 40: # too long or many words continue # If line has only uppercase words, it's probably not a name if line.isupper(): continue # Passed checks, return title-cased line as name return line.title() return None def extract_text_from_file(file_path): if file_path.endswith('.pdf'): reader = PdfReader(file_path) text = "\n".join(page.extract_text() or '' for page in reader.pages) elif file_path.endswith('.docx'): doc = Document(file_path) text = "\n".join([para.text for para in doc.paragraphs]) else: # For .doc or fallback text = textract.process(file_path).decode('utf-8') return text.strip() def extract_candidate_details(file_path): text = extract_text_from_file(file_path) lines = [line.strip() for line in text.splitlines() if line.strip()] # Extract name filename = file_path.split("/")[-1] # just filename, no path name = clean_filename_name(filename) if not name: name = extract_name_from_text(lines) if not name: name = "Unknown" # Extract skills (basic version) skills = [] skills_section = re.search(r"Skills\s*[:\-]?\s*(.+)", text, re.IGNORECASE) if skills_section: raw_skills = skills_section.group(1) skills = [s.strip() for s in re.split(r",|\n|•|-", raw_skills) if s.strip()] return { "name": name, "skills": skills } # import gradio as gr # import time # import tempfile # import numpy as np # import scipy.io.wavfile as wavfile # import os # import json # from transformers import BarkModel, AutoProcessor # import torch, gc # import whisper # from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification # import librosa # import torch # print(torch.cuda.is_available()) # ✅ Tells you if GPU is available # torch.cuda.empty_cache() # gc.collect() # # Bark TTS # print("🔁 Loading Bark model...") # model_bark = 
BarkModel.from_pretrained("suno/bark").to("cuda" if torch.cuda.is_available() else "cpu") # print("✅ Bark model loaded") # print("🔁 Loading Bark processor...") # processor_bark = AutoProcessor.from_pretrained("suno/bark") # print("✅ Bark processor loaded") # bark_voice_preset = "v2/en_speaker_5" # def bark_tts(text): # print(f"🔁 Synthesizing TTS for: {text}") # # Process the text # inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset) # # Move tensors to device # input_ids = inputs["input_ids"].to(model_bark.device) # start = time.time() # # Generate speech with only the required parameters # with torch.no_grad(): # speech_values = model_bark.generate( # input_ids=input_ids, # do_sample=True, # fine_temperature=0.4, # coarse_temperature=0.8 # ) # print(f"✅ Bark finished in {round(time.time() - start, 2)}s") # # Convert to audio # speech = speech_values.cpu().numpy().squeeze() # speech = (speech * 32767).astype(np.int16) # temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") # wavfile.write(temp_wav.name, 22050, speech) # return temp_wav.name # # Whisper STT # print("🔁 Loading Whisper model...") # whisper_model = whisper.load_model("base", device="cuda") # print("✅ Whisper model loaded") # def whisper_stt(audio_path): # if not audio_path or not os.path.exists(audio_path): return "" # result = whisper_model.transcribe(audio_path) # return result["text"] # seniority_mapping = { # "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 # } # # --- 2. Gradio App --- # with gr.Blocks(theme=gr.themes.Soft()) as demo: # user_data = gr.State({}) # interview_state = gr.State({}) # missing_fields_state = gr.State([]) # # --- UI Layout --- # with gr.Column(visible=True) as user_info_section: # gr.Markdown("## Candidate Information") # cv_file = gr.File(label="Upload CV") # job_desc = gr.Textbox(label="Job Description") # start_btn = gr.Button("Continue", interactive=False) # with gr.Column(visible=False) as missing_section: # gr.Markdown("## Missing Information") # name_in = gr.Textbox(label="Name", visible=False) # role_in = gr.Textbox(label="Job Role", visible=False) # seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) # skills_in = gr.Textbox(label="Skills", visible=False) # submit_btn = gr.Button("Submit", interactive=False) # with gr.Column(visible=False) as interview_pre_section: # pre_interview_greeting_md = gr.Markdown() # start_interview_final_btn = gr.Button("Start Interview") # with gr.Column(visible=False) as interview_section: # gr.Markdown("## Interview in Progress") # question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) # question_text = gr.Markdown() # user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. 
Record Audio Answer") # stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") # confirm_btn = gr.Button("Confirm Answer") # evaluation_display = gr.Markdown() # interview_summary = gr.Markdown(visible=False) # # --- UI Logic --- # def validate_start_btn(cv_file, job_desc): # return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip()))) # cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) # job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) # def process_and_route_initial(cv_file, job_desc): # details = extract_candidate_details(cv_file.name) # job_info = extract_job_details(job_desc) # data = { # "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"), # "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", []) # } # missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v] # if missing: # return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) # else: # greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready." # return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting) # start_btn.click( # process_and_route_initial, # [cv_file, job_desc], # [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md] # ) # def show_missing(missing): # if missing is None: missing = [] # return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing) # missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) # def validate_fields(name, role, seniority, skills, missing): # if not missing: return gr.update(interactive=False) # all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip())),]) # return gr.update(interactive=all_filled) # for inp in [name_in, role_in, seniority_in, skills_in]: # inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) # def complete_manual(data, name, role, seniority, skills): # if data["name"].lower() == "unknown": data["name"] = name # if data["job_role"].lower() == "unknown": data["job_role"] = role # if data["seniority"].lower() == "unknown": data["seniority"] = seniority # if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")] # greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin." 
# return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting) # submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) # def start_interview(data): # # --- Advanced state with full logging --- # state = { # "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [], # "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [], # "conversation_history": [], # "difficulty_adjustment": None, # "question_idx": 0, "max_questions": 3, "q_start_time": time.time(), # "log": [] # } # # --- Optionally: context retrieval here (currently just blank) --- # context = "" # prompt = build_interview_prompt( # conversation_history=[], user_response="", context=context, job_role=data["job_role"], # skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None, # voice_label="neutral", face_label="neutral" # ) # #here the original one # # first_q = groq_llm.predict(prompt) # # # Evaluate Q for quality # # q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None) # # state["questions"].append(first_q) # # state["question_evaluations"].append(q_eval) # #here the testing one # first_q = groq_llm.predict(prompt) # q_eval = { # "Score": "N/A", # "Reasoning": "Skipped to reduce processing time", # "Improvements": [] # } # state["questions"].append(first_q) # state["question_evaluations"].append(q_eval) # state["conversation_history"].append({'role': 'Interviewer', 'content': first_q}) # start = time.perf_counter() # audio_path = bark_tts(first_q) # print("⏱️ Bark TTS took", time.perf_counter() - start, "seconds") # # LOG # state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()}) # return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}" # start_interview_final_btn.click(start_interview, [user_data], [interview_state, interview_pre_section, interview_section, question_audio, question_text]) # def transcribe(audio_path): # return whisper_stt(audio_path) # user_audio_input.change(transcribe, user_audio_input, stt_transcript) # def process_answer(transcript, audio_path, state, data): # if not transcript: # return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() # elapsed = round(time.time() - state.get("q_start_time", time.time()), 2) # state["timings"].append(elapsed) # state["answers"].append(transcript) # state["conversation_history"].append({'role': 'Candidate', 'content': transcript}) # # --- 1. Emotion analysis (simplified for testing) --- # voice_label = "neutral" # face_label = "neutral" # state["voice_labels"].append(voice_label) # state["face_labels"].append(face_label) # # --- 2. Evaluate previous Q and Answer --- # last_q = state["questions"][-1] # q_eval = state["question_evaluations"][-1] # Already in state # ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"]) # answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None) # state["answer_evaluations"].append(answer_eval) # answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium" # # --- 3. 
Adaptive difficulty --- # if answer_score == "excellent": # state["difficulty_adjustment"] = "harder" # elif answer_score in ("medium", "poor"): # state["difficulty_adjustment"] = "easier" # else: # state["difficulty_adjustment"] = None # # --- 4. Effective confidence (simplified) --- # eff_conf = {"effective_confidence": 0.6} # state["effective_confidences"].append(eff_conf) # # --- LOG --- # state["log"].append({ # "type": "answer", # "question": last_q, # "answer": transcript, # "answer_eval": answer_eval, # "ref_answer": ref_answer, # "face_label": face_label, # "voice_label": voice_label, # "effective_confidence": eff_conf, # "timing": elapsed, # "timestamp": time.time() # }) # # --- Next or End --- # qidx = state["question_idx"] + 1 # if qidx >= state["max_questions"]: # # Save as JSON (optionally) # timestamp = time.strftime("%Y%m%d_%H%M%S") # log_file = f"interview_log_{timestamp}.json" # with open(log_file, "w", encoding="utf-8") as f: # json.dump(state["log"], f, indent=2, ensure_ascii=False) # # Report # summary = "# Interview Summary\n" # for i, q in enumerate(state["questions"]): # summary += (f"\n### Q{i + 1}: {q}\n" # f"- *Answer*: {state['answers'][i]}\n" # f"- *Q Eval*: {state['question_evaluations'][i]}\n" # f"- *A Eval*: {state['answer_evaluations'][i]}\n" # f"- *Time*: {state['timings'][i]}s\n") # summary += f"\n\n⏺ Full log saved as {log_file}." # return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}")) # else: # # --- Build next prompt using adaptive difficulty --- # state["question_idx"] = qidx # state["q_start_time"] = time.time() # context = "" # You can add your context logic here # prompt = build_interview_prompt( # conversation_history=state["conversation_history"], # user_response=transcript, # context=context, # job_role=data["job_role"], # skills=data["skills"], # seniority=data["seniority"], # difficulty_adjustment=state["difficulty_adjustment"], # voice_label=voice_label, # ) # next_q = groq_llm.predict(prompt) # # Evaluate Q quality # q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None) # state["questions"].append(next_q) # state["question_evaluations"].append(q_eval) # state["conversation_history"].append({'role': 'Interviewer', 'content': next_q}) # state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()}) # audio_path = bark_tts(next_q) # # Display evaluations # eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}" # return ( # state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", # gr.update(value=None), gr.update(value=None), # gr.update(visible=True, value=eval_md), # ) # # Replace your confirm_btn.click with this: # confirm_btn.click( # process_answer, # [stt_transcript, user_audio_input, interview_state, user_data], # Added None for video_path # [interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display] # ).then( # lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript] # ) # demo.launch(debug=True) import gradio as gr import time import tempfile import numpy as np import scipy.io.wavfile as wavfile import os import json from transformers import BarkModel, AutoProcessor import torch, gc import whisper import asyncio import 
threading from concurrent.futures import ThreadPoolExecutor print(torch.cuda.is_available()) torch.cuda.empty_cache() gc.collect() # Global variables for lazy loading model_bark = None processor_bark = None whisper_model = None bark_voice_preset = "v2/en_speaker_1" # Thread pool for async operations executor = ThreadPoolExecutor(max_workers=2) # Add after your imports if torch.cuda.is_available(): print(f"🔥 CUDA Available: {torch.cuda.get_device_name(0)}") print(f"🔥 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB") # Set default device torch.cuda.set_device(0) else: print("⚠️ CUDA not available, using CPU") def load_models_lazy(): """Load models only when needed""" global model_bark, processor_bark, whisper_model device = "cuda" if torch.cuda.is_available() else "cpu" print(f"🔁 Using device: {device}") if model_bark is None: print("🔁 Loading Bark model...") model_bark = BarkModel.from_pretrained("suno/bark").to(device) print(f"✅ Bark model loaded on {device}") if processor_bark is None: print("🔁 Loading Bark processor...") processor_bark = AutoProcessor.from_pretrained("suno/bark") print("✅ Bark processor loaded") if whisper_model is None: print("🔁 Loading Whisper model...") whisper_model = whisper.load_model("base", device=device) print(f"✅ Whisper model loaded on {device}") def bark_tts_async(text): """Async TTS generation""" def _generate(): load_models_lazy() # Load only when needed print(f"🔁 Synthesizing TTS for: {text}") # Ensure we're using the correct device device = next(model_bark.parameters()).device print(f"🔁 Bark model is on device: {device}") inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset) input_ids = inputs["input_ids"].to(device) # Move to same device as model start = time.time() with torch.no_grad(): speech_values = model_bark.generate(input_ids=input_ids) print(f"✅ Bark finished in {round(time.time() - start, 2)}s on {device}") speech = speech_values.cpu().numpy().squeeze() speech = (speech * 32767).astype(np.int16) temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") wavfile.write(temp_wav.name, 22050, speech) return temp_wav.name return executor.submit(_generate) def whisper_stt(audio_path): """Lazy loading whisper STT""" if not audio_path or not os.path.exists(audio_path): return "" load_models_lazy() # Load only when needed # Check what device Whisper is actually using device = "cuda" if torch.cuda.is_available() else "cpu" print(f"🔁 Whisper transcribing on {device}") result = whisper_model.transcribe(audio_path) return result["text"] seniority_mapping = { "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 } with gr.Blocks(theme=gr.themes.Soft()) as demo: user_data = gr.State({}) interview_state = gr.State({}) missing_fields_state = gr.State([]) tts_future = gr.State(None) # Store async TTS future with gr.Column(visible=True) as user_info_section: gr.Markdown("## Candidate Information") cv_file = gr.File(label="Upload CV") job_desc = gr.Textbox(label="Job Description") start_btn = gr.Button("Continue", interactive=False) with gr.Column(visible=False) as missing_section: gr.Markdown("## Missing Information") name_in = gr.Textbox(label="Name", visible=False) role_in = gr.Textbox(label="Job Role", visible=False) seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) skills_in = gr.Textbox(label="Skills", visible=False) submit_btn = gr.Button("Submit", interactive=False) with gr.Column(visible=False) as interview_pre_section: 
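        # Pre-interview screen: greeting text plus the button that kicks off question generation.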
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    user_data = gr.State({})
    interview_state = gr.State({})
    missing_fields_state = gr.State([])
    tts_future = gr.State(None)  # Store async TTS future

    with gr.Column(visible=True) as user_info_section:
        gr.Markdown("## Candidate Information")
        cv_file = gr.File(label="Upload CV")
        job_desc = gr.Textbox(label="Job Description")
        start_btn = gr.Button("Continue", interactive=False)

    with gr.Column(visible=False) as missing_section:
        gr.Markdown("## Missing Information")
        name_in = gr.Textbox(label="Name", visible=False)
        role_in = gr.Textbox(label="Job Role", visible=False)
        seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False)
        skills_in = gr.Textbox(label="Skills", visible=False)
        submit_btn = gr.Button("Submit", interactive=False)

    with gr.Column(visible=False) as interview_pre_section:
        pre_interview_greeting_md = gr.Markdown()
        start_interview_final_btn = gr.Button("Start Interview")
        loading_status = gr.Markdown("", visible=False)

    with gr.Column(visible=False) as interview_section:
        gr.Markdown("## Interview in Progress")
        question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True)
        question_text = gr.Markdown()
        user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer")
        stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)")
        confirm_btn = gr.Button("Confirm Answer")
        evaluation_display = gr.Markdown()
        interview_summary = gr.Markdown(visible=False)

    def validate_start_btn(cv_file, job_desc):
        return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())))

    cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn)
    job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn)

    def process_and_route_initial(cv_file, job_desc):
        details = extract_candidate_details(cv_file.name)
        job_info = extract_job_details(job_desc)
        data = {
            "name": details.get("name", "unknown"),
            "job_role": job_info.get("job_title", "unknown"),
            "seniority": job_info.get("experience_level", "unknown"),
            "skills": job_info.get("skills", [])
        }
        missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v]
        if missing:
            return (data, missing, gr.update(visible=False), gr.update(visible=True),
                    gr.update(visible=False), gr.update(visible=False))
        else:
            greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
            # Bug fix: the pre-interview column itself must also be shown, otherwise the
            # greeting and the 'Start Interview' button stay hidden when nothing is missing.
            return (data, missing, gr.update(visible=False), gr.update(visible=False),
                    gr.update(visible=True), gr.update(visible=True, value=greeting))

    start_btn.click(
        process_and_route_initial,
        [cv_file, job_desc],
        [user_data, missing_fields_state, user_info_section, missing_section, interview_pre_section, pre_interview_greeting_md]
    )

    def show_missing(missing):
        if missing is None:
            missing = []
        return (gr.update(visible="name" in missing), gr.update(visible="job_role" in missing),
                gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing))

    missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in])

    def validate_fields(name, role, seniority, skills, missing):
        if not missing:
            return gr.update(interactive=False)
        all_filled = all([
            (not ("name" in missing) or bool(name.strip())),
            (not ("job_role" in missing) or bool(role.strip())),
            (not ("seniority" in missing) or bool(seniority)),
            (not ("skills" in missing) or bool(skills.strip()))
        ])
        return gr.update(interactive=all_filled)

    for inp in [name_in, role_in, seniority_in, skills_in]:
        inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn)
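    # Hedged example (an assumption, not wired to any component): mirrors the
    # "unknown or empty" rule that process_and_route_initial uses to decide which manual
    # fields to request, applied to a hypothetical candidate dict.
    def _missing_fields_example():
        data = {"name": "unknown", "job_role": "Data Scientist",
                "seniority": "Senior", "skills": []}
        missing = [k for k, v in data.items()
                   if (isinstance(v, str) and v.lower() == "unknown") or not v]
        # With the sample dict above this yields ["name", "skills"].
        return missing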
    def complete_manual(data, name, role, seniority, skills):
        if data["name"].lower() == "unknown":
            data["name"] = name
        if data["job_role"].lower() == "unknown":
            data["job_role"] = role
        if data["seniority"].lower() == "unknown":
            data["seniority"] = seniority
        if not data["skills"]:
            data["skills"] = [s.strip() for s in skills.split(",")]
        greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
        return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)

    submit_btn.click(
        complete_manual,
        [user_data, name_in, role_in, seniority_in, skills_in],
        [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]
    )

    def start_interview(data):
        # Initialize interview state
        state = {
            "questions": [], "answers": [], "timings": [],
            "question_evaluations": [], "answer_evaluations": [],
            "conversation_history": [],
            "difficulty_adjustment": None,
            "question_idx": 0, "max_questions": 3,
            "q_start_time": time.time(),
            "log": []
        }

        # Build prompt for first question
        context = ""
        prompt = build_interview_prompt(
            conversation_history=[],
            user_response="",
            context=context,
            job_role=data["job_role"],
            skills=data["skills"],
            seniority=data["seniority"],
            difficulty_adjustment=None,
            voice_label="neutral"
        )

        # Generate first question
        first_q = groq_llm.predict(prompt)
        q_eval = {
            "Score": "N/A",
            "Reasoning": "Skipped to reduce processing time",
            "Improvements": []
        }
        state["questions"].append(first_q)
        state["question_evaluations"].append(q_eval)
        state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})

        # Generate audio with Bark (wait for it)
        start = time.perf_counter()
        cleaned_text = first_q.strip().replace("\n", " ")
        audio_future = bark_tts_async(cleaned_text)
        audio_path = audio_future.result()
        print("⏱️ Bark TTS took", round(time.perf_counter() - start, 2), "seconds")

        # Log question
        state["log"].append({
            "type": "question",
            "question": first_q,
            "question_eval": q_eval,
            "timestamp": time.time()
        })

        return (
            state,
            gr.update(visible=False),   # Hide interview_pre_section
            gr.update(visible=True),    # Show interview_section
            audio_path,                 # Set audio
            f"*Question 1:* {first_q}"  # Set question text
        )

    # Hook into Gradio
    start_interview_final_btn.click(
        start_interview,
        [user_data],
        [interview_state, interview_pre_section, interview_section, question_audio, question_text]
    )

    def transcribe(audio_path):
        return whisper_stt(audio_path)

    user_audio_input.change(transcribe, user_audio_input, stt_transcript)
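    # Hedged helper sketch: _difficulty_from_score is an assumption and is not called by
    # process_answer below; it only restates the adaptive-difficulty rule so the mapping
    # from answer score to the next question's difficulty can be read and tested in isolation.
    def _difficulty_from_score(answer_score):
        if answer_score == "excellent":
            return "harder"          # strong answer -> probe deeper
        if answer_score in ("medium", "poor"):
            return "easier"          # weak answer -> ease off
        return None                  # unrecognised score -> keep current difficulty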
    def process_answer(transcript, audio_path, state, data):
        if not transcript:
            return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()

        elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
        state["timings"].append(elapsed)
        state["answers"].append(transcript)
        state["conversation_history"].append({'role': 'Candidate', 'content': transcript})

        last_q = state["questions"][-1]
        q_eval = state["question_evaluations"][-1]
        ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"])
        answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None)
        state["answer_evaluations"].append(answer_eval)
        answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium"

        if answer_score == "excellent":
            state["difficulty_adjustment"] = "harder"
        elif answer_score in ("medium", "poor"):
            state["difficulty_adjustment"] = "easier"
        else:
            state["difficulty_adjustment"] = None

        state["log"].append({
            "type": "answer",
            "question": last_q,
            "answer": transcript,
            "answer_eval": answer_eval,
            "ref_answer": ref_answer,
            "timing": elapsed,
            "timestamp": time.time()
        })

        qidx = state["question_idx"] + 1
        if qidx >= state["max_questions"]:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            log_file = f"interview_log_{timestamp}.json"
            with open(log_file, "w", encoding="utf-8") as f:
                json.dump(state["log"], f, indent=2, ensure_ascii=False)

            summary = "# Interview Summary\n"
            for i, q in enumerate(state["questions"]):
                summary += (f"\n### Q{i + 1}: {q}\n"
                            f"- *Answer*: {state['answers'][i]}\n"
                            f"- *Q Eval*: {state['question_evaluations'][i]}\n"
                            f"- *A Eval*: {state['answer_evaluations'][i]}\n"
                            f"- *Time*: {state['timings'][i]}s\n")
            summary += f"\n\n⏺ Full log saved as {log_file}."
            return (state, gr.update(visible=True, value=summary), gr.update(value=None),
                    gr.update(value=None), gr.update(value=None), gr.update(value=None),
                    gr.update(visible=False))
        else:
            state["question_idx"] = qidx
            state["q_start_time"] = time.time()
            context = ""
            prompt = build_interview_prompt(
                conversation_history=state["conversation_history"],
                user_response=transcript,
                context=context,
                job_role=data["job_role"],
                skills=data["skills"],
                seniority=data["seniority"],
                difficulty_adjustment=state["difficulty_adjustment"],
                voice_label="neutral"
            )
            next_q = groq_llm.predict(prompt)
            q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None)
            state["questions"].append(next_q)
            state["question_evaluations"].append(q_eval)
            state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
            state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})

            # Generate TTS asynchronously for next question too
            audio_future = bark_tts_async(next_q)
            # For now, we'll wait for it (you can make this async too)
            audio_path = audio_future.result()

            eval_md = f"*Last Answer Eval:* {answer_eval}"
            return (state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}",
                    gr.update(value=None), gr.update(value=None),
                    gr.update(visible=True, value=eval_md))

    confirm_btn.click(
        process_answer,
        [stt_transcript, user_audio_input, interview_state, user_data],
        [interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display]
    ).then(
        lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript]
    )

demo.launch(debug=True)
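# Hedged note (not from the original script): demo.launch(debug=True) blocks this process,
# so anything placed after it only runs once the Gradio server shuts down. If this file is
# ever imported from tests or another entry point, the usual pattern is to guard the launch:
#
# if __name__ == "__main__":
#     demo.launch(debug=True)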