import os
import json
import asyncio
import subprocess
import edge_tts
from faster_whisper import WhisperModel
from langchain_groq import ChatGroq
import logging
import tempfile
import shutil
import torch
if torch.cuda.is_available():
    print("CUDA Available")
    print("GPU:", torch.cuda.get_device_name(0))
    print("cuDNN version:", torch.backends.cudnn.version())
    print("cuDNN enabled:", torch.backends.cudnn.is_available())
else:
    print("CUDA Not Available")
# Initialize models
chat_groq_api = os.getenv("GROQ_API_KEY")

# Attempt to initialize the Groq LLM only if an API key is provided. When
# running in environments where the key is unavailable (such as local
# development or automated testing), fall back to a simple stub that
# generates generic responses. This avoids raising an exception at import
# time and allows the rest of the application to run without external
# dependencies. See the DummyGroq class defined below.
if chat_groq_api:
    try:
        groq_llm = ChatGroq(
            temperature=0.7,
            model_name="llama-3.3-70b-versatile",
            api_key=chat_groq_api
        )
    except Exception as e:
        logging.error(f"Error initializing ChatGroq LLM: {e}. Falling back to dummy model.")
        groq_llm = None
else:
    groq_llm = None
if groq_llm is None:
    class DummyGroq:
        """A fallback language model used when no Groq API key is set.

        The ``invoke`` method of this class returns a simple canned response
        rather than calling an external API. This ensures that the
        interview functionality still produces a sensible prompt, albeit
        without advanced LLM behaviour.
        """

        def invoke(self, prompt: str):
            # Provide a very generic question based on the prompt. This
            # implementation ignores the prompt contents entirely; in a more
            # sophisticated fallback you could parse ``prompt`` to tailor
            # responses.
            return "Tell me about yourself and why you're interested in this position."

    groq_llm = DummyGroq()
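
# Illustrative sketch, not called anywhere in this module: the rest of the
# code only relies on ``groq_llm.invoke(prompt)`` returning either an object
# with a ``.content`` attribute (as LangChain chat models do) or a plain
# string (as DummyGroq does). Any stand-in that honours this contract can be
# swapped in for offline testing.
def _demo_llm_contract(prompt="Say hello in one short sentence."):
    response = groq_llm.invoke(prompt)
    return response.content.strip() if hasattr(response, "content") else str(response).strip()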
# Initialize Whisper model
#
# Loading the Whisper model can take several seconds on first use because the
# model weights must be downloaded from Hugging Face. This delay can cause
# the API call to ``/api/transcribe_audio`` to appear stuck while the model
# downloads. To mitigate this, we allow the model size to be configured via
# the ``WHISPER_MODEL_NAME`` environment variable and preload the model when
# this module is imported. Using a smaller model (e.g. "tiny" or "base.en")
# reduces download size and inference time considerably.
whisper_model = None


def load_whisper_model():
    global whisper_model
    if whisper_model is None:
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            compute_type = "float16" if device == "cuda" else "int8"
            # Allow overriding the model size via environment. Default to a
            # lightweight model to improve startup times. Available options
            # include: tiny, base, base.en, small, medium, large. See
            # https://huggingface.co/ggerganov/whisper.cpp for details.
            model_name = os.getenv("WHISPER_MODEL_NAME", "tiny")
            whisper_model = WhisperModel(model_name, device=device, compute_type=compute_type)
            logging.info(f"Whisper model '{model_name}' loaded on {device} with {compute_type}")
        except Exception as e:
            logging.error(f"Error loading Whisper model: {e}")
            # Fallback to CPU
            whisper_model = WhisperModel(model_name if 'model_name' in locals() else "tiny", device="cpu", compute_type="int8")
    return whisper_model


load_whisper_model()
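
# Configuration sketch (comments only, nothing below executes): because the
# loaded model is cached in the module-level ``whisper_model`` variable, the
# ``WHISPER_MODEL_NAME`` override must be in place before this module is
# imported, for example:
#
#   export WHISPER_MODEL_NAME=base.en      # shell / Space settings
#
# or, hypothetically, from the launching process before the import:
#
#   import os
#   os.environ["WHISPER_MODEL_NAME"] = "base.en"
#   # ...then import this module, whatever it is named in the project.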
def generate_first_question(profile, job):
    """Generate the first interview question based on profile and job."""
    try:
        prompt = f"""
        You are conducting an interview for a {job.role} position at {job.company}.
        The candidate's profile shows:
        - Skills: {profile.get('skills', [])}
        - Experience: {profile.get('experience', [])}
        - Education: {profile.get('education', [])}
        Generate an appropriate opening interview question that is professional and relevant.
        Keep it concise and clear. Respond with ONLY the question text, no additional formatting.
        """

        response = groq_llm.invoke(prompt)

        # Handle AIMessage objects as well as plain strings
        if hasattr(response, 'content'):
            question = response.content.strip()
        elif isinstance(response, str):
            question = response.strip()
        else:
            question = str(response).strip()

        # Ensure we have a valid question
        if not question or len(question) < 10:
            question = "Tell me about yourself and why you're interested in this position."

        logging.info(f"Generated question: {question}")
        return question
    except Exception as e:
        logging.error(f"Error generating first question: {e}")
        return "Tell me about yourself and why you're interested in this position."
def edge_tts_to_file_sync(text, output_path, voice="en-US-AriaNeural"):
    """Synchronous wrapper for edge-tts with better error handling."""
    try:
        # Ensure text is not empty
        if not text or not text.strip():
            logging.error("Empty text provided for TTS")
            return None

        # Ensure the directory exists and is writable
        directory = os.path.dirname(output_path)
        if not directory:
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
        os.makedirs(directory, exist_ok=True)

        # Test write permissions with a temporary file
        test_file = os.path.join(directory, f"test_{os.getpid()}.tmp")
        try:
            with open(test_file, 'w') as f:
                f.write("test")
            os.remove(test_file)
            logging.info(f"Directory {directory} is writable")
        except (PermissionError, OSError) as e:
            logging.error(f"Directory {directory} is not writable: {e}")
            # Fall back to /tmp
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
            os.makedirs(directory, exist_ok=True)

        async def generate_audio():
            try:
                communicate = edge_tts.Communicate(text, voice)
                await communicate.save(output_path)
                logging.info(f"TTS audio saved to: {output_path}")
            except Exception as e:
                logging.error(f"Error in async TTS generation: {e}")
                raise

        # Run the async function from this synchronous context
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # If a loop is already running, run the coroutine in a separate thread
                import concurrent.futures

                def run_in_thread():
                    new_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(new_loop)
                    try:
                        new_loop.run_until_complete(generate_audio())
                    finally:
                        new_loop.close()

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(run_in_thread)
                    future.result(timeout=30)  # 30 second timeout
            else:
                loop.run_until_complete(generate_audio())
        except RuntimeError:
            # No event loop exists in this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(generate_audio())
            finally:
                loop.close()

        # Verify the file was created and has content
        if os.path.exists(output_path):
            file_size = os.path.getsize(output_path)
            if file_size > 1000:  # At least 1KB for a valid audio file
                logging.info(f"TTS file created successfully: {output_path} ({file_size} bytes)")
                return output_path
            else:
                logging.error(f"TTS file is too small: {output_path} ({file_size} bytes)")
                return None
        else:
            logging.error(f"TTS file was not created: {output_path}")
            return None
    except Exception as e:
        logging.error(f"Error in TTS generation: {e}")
        return None
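
# Minimal usage sketch, not called anywhere in this module: synthesise a short
# question into a file under /tmp/audio. The output path and default voice are
# illustrative; the function returns the path on success and None on failure.
def _demo_tts():
    path = edge_tts_to_file_sync(
        "Welcome to the interview. Please introduce yourself.",
        "/tmp/audio/question_1.mp3",
    )
    print(f"TTS audio written to {path}" if path else "TTS generation failed; see logs")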
def convert_webm_to_wav(webm_path, wav_path):
    """Convert WebM audio to WAV using ffmpeg if available."""
    try:
        result = subprocess.run([
            'ffmpeg', '-i', webm_path, '-ar', '16000', '-ac', '1', '-y', wav_path
        ], capture_output=True, text=True, timeout=30)
        if result.returncode == 0 and os.path.exists(wav_path) and os.path.getsize(wav_path) > 0:
            logging.info(f"Successfully converted {webm_path} to {wav_path}")
            return wav_path
        else:
            logging.error(f"FFmpeg conversion failed: {result.stderr}")
            return None
    except Exception as e:  # covers subprocess.TimeoutExpired, FileNotFoundError, etc.
        logging.error(f"Error converting audio: {e}")
        return None
def whisper_stt(audio_path):
    """Speech-to-text using Faster-Whisper."""
    try:
        if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0:
            logging.error(f"Audio file is empty or missing: {audio_path}")
            return ""

        # Convert webm to wav using ffmpeg
        wav_path = audio_path.replace(".webm", ".wav")
        cmd = [
            "ffmpeg",
            "-y",  # overwrite
            "-i", audio_path,
            "-ar", "16000",
            "-ac", "1",
            "-f", "wav",
            wav_path
        ]
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        if not os.path.exists(wav_path) or os.path.getsize(wav_path) == 0:
            logging.error(f"FFmpeg conversion failed or produced empty file: {wav_path}")
            return ""

        model = load_whisper_model()
        segments, _ = model.transcribe(wav_path)
        transcript = " ".join(segment.text for segment in segments)
        return transcript.strip()
    except Exception as e:
        logging.error(f"Error in STT: {e}")
        return ""
def evaluate_answer(question, answer, job_role="Software Developer", seniority="Mid-level"):
    """Evaluate candidate's answer with better error handling."""
    try:
        if not answer or not answer.strip():
            return {
                "score": "Poor",
                "feedback": "No answer provided."
            }

        prompt = f"""
        You are evaluating a candidate's answer for a {seniority} {job_role} position.
        Question: {question}
        Candidate Answer: {answer}
        Evaluate based on technical correctness, clarity, and relevance.
        Provide a brief evaluation in 1-2 sentences.
        Rate the answer as one of: Poor, Medium, Good, Excellent
        Respond in this exact format:
        Score: [Poor/Medium/Good/Excellent]
        Feedback: [Your brief feedback here]
        """

        response = groq_llm.invoke(prompt)

        # Handle AIMessage objects as well as plain strings
        if hasattr(response, 'content'):
            response_text = response.content.strip()
        elif isinstance(response, str):
            response_text = response.strip()
        else:
            response_text = str(response).strip()

        # Parse the response
        lines = response_text.split('\n')
        score = "Medium"  # default
        feedback = "Good answer, but could be more detailed."  # default
        for line in lines:
            line = line.strip()
            if line.startswith('Score:'):
                score = line.replace('Score:', '').strip()
            elif line.startswith('Feedback:'):
                feedback = line.replace('Feedback:', '').strip()

        # Ensure score is valid
        valid_scores = ["Poor", "Medium", "Good", "Excellent"]
        if score not in valid_scores:
            score = "Medium"

        return {
            "score": score,
            "feedback": feedback
        }
    except Exception as e:
        logging.error(f"Error evaluating answer: {e}")
        return {
            "score": "Medium",
            "feedback": "Unable to evaluate answer at this time."
        }
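
# Minimal end-to-end sketch, guarded so it only runs when this file is executed
# directly rather than imported. All data below is illustrative; with no
# GROQ_API_KEY set, the DummyGroq fallback answers, so no external services or
# audio files are required.
if __name__ == "__main__":
    from types import SimpleNamespace

    demo_job = SimpleNamespace(role="Software Developer", company="ExampleCo")
    demo_profile = {"skills": ["Python"], "experience": [], "education": []}

    first_question = generate_first_question(demo_profile, demo_job)
    print("Question:", first_question)

    evaluation = evaluate_answer(
        first_question,
        "I have three years of Python experience building REST APIs.",
        job_role=demo_job.role,
        seniority="Mid-level",
    )
    print("Score:", evaluation["score"])
    print("Feedback:", evaluation["feedback"])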