import os
import json
import asyncio
import edge_tts
from faster_whisper import WhisperModel
from langchain_groq import ChatGroq
import logging
import tempfile
import shutil

# Initialize models
chat_groq_api = os.getenv("GROQ_API_KEY")
if not chat_groq_api:
    raise ValueError("GROQ_API_KEY is not set in environment variables.")

groq_llm = ChatGroq(
    temperature=0.7,
    model_name="llama-3.3-70b-versatile",
    api_key=chat_groq_api
)

# Initialize Whisper model lazily on first use
whisper_model = None


def load_whisper_model():
    global whisper_model
    if whisper_model is None:
        try:
            device = "cuda" if os.system("nvidia-smi") == 0 else "cpu"
            compute_type = "float16" if device == "cuda" else "int8"
            whisper_model = WhisperModel("base", device=device, compute_type=compute_type)
            logging.info(f"Whisper model loaded on {device} with {compute_type}")
        except Exception as e:
            logging.error(f"Error loading Whisper model: {e}")
            # Fallback to CPU
            whisper_model = WhisperModel("base", device="cpu", compute_type="int8")
    return whisper_model


def generate_first_question(profile, job):
    """Generate the first interview question based on profile and job"""
    try:
        prompt = f"""
        You are conducting an interview for a {job.role} position at {job.company}.

        The candidate's profile shows:
        - Skills: {profile.get('skills', [])}
        - Experience: {profile.get('experience', [])}
        - Education: {profile.get('education', [])}

        Generate an appropriate opening interview question that is professional and relevant.
        Keep it concise and clear.
        Respond with ONLY the question text, no additional formatting.
        """

        response = groq_llm.invoke(prompt)

        # Handle AIMessage objects as well as plain strings
        if hasattr(response, 'content'):
            question = response.content.strip()
        elif isinstance(response, str):
            question = response.strip()
        else:
            question = str(response).strip()

        # Ensure we have a valid question
        if not question or len(question) < 10:
            question = "Tell me about yourself and why you're interested in this position."

        logging.info(f"Generated question: {question}")
        return question
    except Exception as e:
        logging.error(f"Error generating first question: {e}")
        return "Tell me about yourself and why you're interested in this position."
def edge_tts_to_file_sync(text, output_path, voice="en-US-AriaNeural"):
    """Synchronous wrapper for edge-tts with better error handling"""
    try:
        # Ensure text is not empty
        if not text or not text.strip():
            logging.error("Empty text provided for TTS")
            return None

        # Ensure the directory exists and is writable
        directory = os.path.dirname(output_path)
        if not directory:
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
        os.makedirs(directory, exist_ok=True)

        # Test write permissions with a temporary file
        test_file = os.path.join(directory, f"test_{os.getpid()}.tmp")
        try:
            with open(test_file, 'w') as f:
                f.write("test")
            os.remove(test_file)
            logging.info(f"Directory {directory} is writable")
        except OSError as e:  # covers PermissionError as well
            logging.error(f"Directory {directory} is not writable: {e}")
            # Fallback to /tmp
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
            os.makedirs(directory, exist_ok=True)

        async def generate_audio():
            try:
                communicate = edge_tts.Communicate(text, voice)
                await communicate.save(output_path)
                logging.info(f"TTS audio saved to: {output_path}")
            except Exception as e:
                logging.error(f"Error in async TTS generation: {e}")
                raise

        # Run the async function in a sync context
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # If a loop is already running, run a new one in a worker thread
                import concurrent.futures

                def run_in_thread():
                    new_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(new_loop)
                    try:
                        new_loop.run_until_complete(generate_audio())
                    finally:
                        new_loop.close()

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(run_in_thread)
                    future.result(timeout=30)  # 30 second timeout
            else:
                loop.run_until_complete(generate_audio())
        except RuntimeError:
            # No event loop exists
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(generate_audio())
            finally:
                loop.close()

        # Verify the file was created and has content
        if os.path.exists(output_path):
            file_size = os.path.getsize(output_path)
            if file_size > 1000:  # At least 1KB for a valid audio file
                logging.info(f"TTS file created successfully: {output_path} ({file_size} bytes)")
                return output_path
            else:
                logging.error(f"TTS file is too small: {output_path} ({file_size} bytes)")
                return None
        else:
            logging.error(f"TTS file was not created: {output_path}")
            return None
    except Exception as e:
        logging.error(f"Error in TTS generation: {e}")
        return None


def convert_webm_to_wav(webm_path, wav_path):
    """Convert WebM audio to WAV using ffmpeg if available"""
    try:
        import subprocess
        result = subprocess.run([
            'ffmpeg', '-i', webm_path, '-ar', '16000', '-ac', '1', '-y', wav_path
        ], capture_output=True, text=True, timeout=30)

        if result.returncode == 0 and os.path.exists(wav_path) and os.path.getsize(wav_path) > 0:
            logging.info(f"Successfully converted {webm_path} to {wav_path}")
            return wav_path
        else:
            logging.error(f"FFmpeg conversion failed: {result.stderr}")
            return None
    except Exception as e:  # includes subprocess.TimeoutExpired and FileNotFoundError (ffmpeg missing)
        logging.error(f"Error converting audio: {e}")
        return None


def whisper_stt(audio_path):
    """Speech-to-text using Faster-Whisper with better error handling"""
    try:
        if not audio_path or not os.path.exists(audio_path):
            logging.error(f"Audio file does not exist: {audio_path}")
            return ""

        # Check if the file has content
        file_size = os.path.getsize(audio_path)
        if file_size == 0:
            logging.error(f"Audio file is empty: {audio_path}")
            return ""

        logging.info(f"Processing audio file: {audio_path} ({file_size} bytes)")
{audio_path} ({file_size} bytes)") # If the file is WebM, try to convert it to WAV if audio_path.endswith('.webm'): wav_path = audio_path.replace('.webm', '.wav') converted_path = convert_webm_to_wav(audio_path, wav_path) if converted_path: audio_path = converted_path else: logging.warning("Could not convert WebM to WAV, trying with original file") model = load_whisper_model() # Add timeout and better error handling try: segments, info = model.transcribe( audio_path, language="en", # Specify language for better performance task="transcribe", vad_filter=True, # Voice activity detection vad_parameters=dict(min_silence_duration_ms=500) ) transcript_parts = [] for segment in segments: if hasattr(segment, 'text') and segment.text.strip(): transcript_parts.append(segment.text.strip()) transcript = " ".join(transcript_parts) if transcript: logging.info(f"Transcription successful: '{transcript[:100]}...'") else: logging.warning("No speech detected in audio file") return transcript.strip() except Exception as e: logging.error(f"Error during transcription: {e}") return "" except Exception as e: logging.error(f"Error in STT: {e}") return "" def evaluate_answer(question, answer, job_role="Software Developer", seniority="Mid-level"): """Evaluate candidate's answer with better error handling""" try: if not answer or not answer.strip(): return { "score": "Poor", "feedback": "No answer provided." } prompt = f""" You are evaluating a candidate's answer for a {seniority} {job_role} position. Question: {question} Candidate Answer: {answer} Evaluate based on technical correctness, clarity, and relevance. Provide a brief evaluation in 1-2 sentences. Rate the answer as one of: Poor, Medium, Good, Excellent Respond in this exact format: Score: [Poor/Medium/Good/Excellent] Feedback: [Your brief feedback here] """ response = groq_llm.invoke(prompt) # Handle AIMessage object properly if hasattr(response, 'content'): response_text = response.content.strip() elif isinstance(response, str): response_text = response.strip() else: response_text = str(response).strip() # Parse the response lines = response_text.split('\n') score = "Medium" # default feedback = "Good answer, but could be more detailed." # default for line in lines: line = line.strip() if line.startswith('Score:'): score = line.replace('Score:', '').strip() elif line.startswith('Feedback:'): feedback = line.replace('Feedback:', '').strip() # Ensure score is valid valid_scores = ["Poor", "Medium", "Good", "Excellent"] if score not in valid_scores: score = "Medium" return { "score": score, "feedback": feedback } except Exception as e: logging.error(f"Error evaluating answer: {e}") return { "score": "Medium", "feedback": "Unable to evaluate answer at this time." }