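"""Audio/LLM utilities for an AI voice interview app.

Covers four steps: generating an opening question with a Groq-hosted LLM,
text-to-speech via edge-tts, speech-to-text via Faster-Whisper, and
LLM-based evaluation of a candidate's answer.
"""
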
import os
import json
import asyncio
import logging
import subprocess
import tempfile
import shutil

import edge_tts
from faster_whisper import WhisperModel
from langchain_groq import ChatGroq

# Initialize the Groq LLM client
chat_groq_api = os.getenv("GROQ_API_KEY")
if not chat_groq_api:
    raise ValueError("GROQ_API_KEY is not set in environment variables.")

groq_llm = ChatGroq(
    temperature=0.7,
    model_name="llama-3.3-70b-versatile",
    api_key=chat_groq_api,
)

# Lazily initialized Whisper model (loaded on first use)
whisper_model = None

def load_whisper_model():
    """Load the Faster-Whisper model once, preferring GPU when available."""
    global whisper_model
    if whisper_model is None:
        try:
            # Probe for an NVIDIA GPU via nvidia-smi's exit code,
            # suppressing its console output
            device = "cuda" if os.system("nvidia-smi > /dev/null 2>&1") == 0 else "cpu"
            compute_type = "float16" if device == "cuda" else "int8"
            whisper_model = WhisperModel("base", device=device, compute_type=compute_type)
            logging.info(f"Whisper model loaded on {device} with {compute_type}")
        except Exception as e:
            logging.error(f"Error loading Whisper model: {e}")
            # Fall back to CPU with int8 quantization
            whisper_model = WhisperModel("base", device="cpu", compute_type="int8")
    return whisper_model

def generate_first_question(profile, job):
    """Generate the first interview question based on profile and job."""
    try:
        prompt = f"""
        You are conducting an interview for a {job.role} position at {job.company}.
        The candidate's profile shows:
        - Skills: {profile.get('skills', [])}
        - Experience: {profile.get('experience', [])}
        - Education: {profile.get('education', [])}
        Generate an appropriate opening interview question that is professional and relevant.
        Keep it concise and clear. Respond with ONLY the question text, no additional formatting.
        """
        response = groq_llm.invoke(prompt)
        # ChatGroq returns an AIMessage; extract the text content safely
        if hasattr(response, 'content'):
            question = response.content.strip()
        elif isinstance(response, str):
            question = response.strip()
        else:
            question = str(response).strip()
        # Fall back to a generic opener if the model returned nothing usable
        if not question or len(question) < 10:
            question = "Tell me about yourself and why you're interested in this position."
        logging.info(f"Generated question: {question}")
        return question
    except Exception as e:
        logging.error(f"Error generating first question: {e}")
        return "Tell me about yourself and why you're interested in this position."

def edge_tts_to_file_sync(text, output_path, voice="en-US-AriaNeural"):
    """Synchronous wrapper around edge-tts with error handling and path fallbacks."""
    try:
        # Reject empty input early
        if not text or not text.strip():
            logging.error("Empty text provided for TTS")
            return None

        # Ensure the target directory exists and is writable
        directory = os.path.dirname(output_path)
        if not directory:
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
        os.makedirs(directory, exist_ok=True)

        # Probe write permissions with a throwaway file
        test_file = os.path.join(directory, f"test_{os.getpid()}.tmp")
        try:
            with open(test_file, 'w') as f:
                f.write("test")
            os.remove(test_file)
            logging.info(f"Directory {directory} is writable")
        except (PermissionError, OSError) as e:
            logging.error(f"Directory {directory} is not writable: {e}")
            # Fall back to /tmp/audio
            directory = "/tmp/audio"
            output_path = os.path.join(directory, os.path.basename(output_path))
            os.makedirs(directory, exist_ok=True)

        async def generate_audio():
            try:
                communicate = edge_tts.Communicate(text, voice)
                await communicate.save(output_path)
                logging.info(f"TTS audio saved to: {output_path}")
            except Exception as e:
                logging.error(f"Error in async TTS generation: {e}")
                raise

        # Run the coroutine from sync code, whether or not a loop is already running
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            running_loop = None

        if running_loop is not None:
            # A loop is already running (e.g. inside an async web framework),
            # so run the coroutine on its own fresh loop in a worker thread
            import concurrent.futures

            def run_in_thread():
                # asyncio.run creates, runs, and closes a new loop in this thread
                asyncio.run(generate_audio())

            with concurrent.futures.ThreadPoolExecutor() as executor:
                executor.submit(run_in_thread).result(timeout=30)  # 30 second timeout
        else:
            # No running loop: asyncio.run handles setup and teardown
            asyncio.run(generate_audio())

        # Verify the file was created and is plausibly a real audio file
        if os.path.exists(output_path):
            file_size = os.path.getsize(output_path)
            if file_size > 1000:  # at least ~1 KB for a valid audio file
                logging.info(f"TTS file created successfully: {output_path} ({file_size} bytes)")
                return output_path
            logging.error(f"TTS file is too small: {output_path} ({file_size} bytes)")
            return None
        logging.error(f"TTS file was not created: {output_path}")
        return None
    except Exception as e:
        logging.error(f"Error in TTS generation: {e}")
        return None

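# Illustrative usage: returns the output path on success, None on failure.
# (The path below is an assumption for the example, not required by the function.)
#
#     path = edge_tts_to_file_sync("Welcome to the interview.", "/tmp/audio/q1.mp3")
#     if path:
#         ...  # serve or play the MP3
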
def convert_webm_to_wav(webm_path, wav_path):
    """Convert WebM audio to 16 kHz mono WAV using ffmpeg, if available."""
    try:
        result = subprocess.run(
            ['ffmpeg', '-i', webm_path, '-ar', '16000', '-ac', '1', '-y', wav_path],
            capture_output=True, text=True, timeout=30,
        )
        if result.returncode == 0 and os.path.exists(wav_path) and os.path.getsize(wav_path) > 0:
            logging.info(f"Successfully converted {webm_path} to {wav_path}")
            return wav_path
        logging.error(f"FFmpeg conversion failed: {result.stderr}")
        return None
    except Exception as e:
        # Covers subprocess.TimeoutExpired and FileNotFoundError (ffmpeg missing)
        logging.error(f"Error converting audio: {e}")
        return None

def whisper_stt(audio_path):
    """Speech-to-text using Faster-Whisper with error handling."""
    try:
        if not audio_path or not os.path.exists(audio_path):
            logging.error(f"Audio file does not exist: {audio_path}")
            return ""

        # Reject empty files
        file_size = os.path.getsize(audio_path)
        if file_size == 0:
            logging.error(f"Audio file is empty: {audio_path}")
            return ""
        logging.info(f"Processing audio file: {audio_path} ({file_size} bytes)")

        # If the file is WebM, try to convert it to WAV first
        if audio_path.endswith('.webm'):
            wav_path = os.path.splitext(audio_path)[0] + '.wav'
            converted_path = convert_webm_to_wav(audio_path, wav_path)
            if converted_path:
                audio_path = converted_path
            else:
                logging.warning("Could not convert WebM to WAV, trying with original file")

        model = load_whisper_model()
        try:
            segments, info = model.transcribe(
                audio_path,
                language="en",  # fixing the language speeds up decoding
                task="transcribe",
                vad_filter=True,  # voice activity detection trims silence
                vad_parameters=dict(min_silence_duration_ms=500),
            )
            transcript_parts = []
            for segment in segments:
                if hasattr(segment, 'text') and segment.text.strip():
                    transcript_parts.append(segment.text.strip())
            transcript = " ".join(transcript_parts)
            if transcript:
                logging.info(f"Transcription successful: '{transcript[:100]}...'")
            else:
                logging.warning("No speech detected in audio file")
            return transcript.strip()
        except Exception as e:
            logging.error(f"Error during transcription: {e}")
            return ""
    except Exception as e:
        logging.error(f"Error in STT: {e}")
        return ""

def evaluate_answer(question, answer, job_role="Software Developer", seniority="Mid-level"):
    """Evaluate a candidate's answer and return a score plus brief feedback."""
    try:
        if not answer or not answer.strip():
            return {
                "score": "Poor",
                "feedback": "No answer provided.",
            }

        prompt = f"""
        You are evaluating a candidate's answer for a {seniority} {job_role} position.
        Question: {question}
        Candidate Answer: {answer}
        Evaluate based on technical correctness, clarity, and relevance.
        Provide a brief evaluation in 1-2 sentences.
        Rate the answer as one of: Poor, Medium, Good, Excellent
        Respond in this exact format:
        Score: [Poor/Medium/Good/Excellent]
        Feedback: [Your brief feedback here]
        """
        response = groq_llm.invoke(prompt)
        # ChatGroq returns an AIMessage; extract the text content safely
        if hasattr(response, 'content'):
            response_text = response.content.strip()
        elif isinstance(response, str):
            response_text = response.strip()
        else:
            response_text = str(response).strip()

        # Parse the "Score:" and "Feedback:" lines, with sensible defaults
        score = "Medium"
        feedback = "Good answer, but could be more detailed."
        for line in response_text.split('\n'):
            line = line.strip()
            if line.startswith('Score:'):
                score = line.split(':', 1)[1].strip()
            elif line.startswith('Feedback:'):
                feedback = line.split(':', 1)[1].strip()

        # Clamp the score to the allowed ratings
        valid_scores = ["Poor", "Medium", "Good", "Excellent"]
        if score not in valid_scores:
            score = "Medium"

        return {
            "score": score,
            "feedback": feedback,
        }
    except Exception as e:
        logging.error(f"Error evaluating answer: {e}")
        return {
            "score": "Medium",
            "feedback": "Unable to evaluate answer at this time.",
        }

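
if __name__ == "__main__":
    # Minimal smoke test (an assumption for local debugging, not part of the
    # original app flow): requires GROQ_API_KEY, network access, and ffmpeg.
    logging.basicConfig(level=logging.INFO)

    result = evaluate_answer(
        question="What is a Python decorator?",
        answer="A decorator is a callable that wraps another function to extend its behavior.",
        job_role="Software Developer",
        seniority="Mid-level",
    )
    print("Evaluation:", result)

    audio = edge_tts_to_file_sync("This is a test of the interview voice.", "/tmp/audio/smoke_test.mp3")
    print("TTS output:", audio)
    if audio:
        # Round-trip: transcribe the audio we just synthesized
        print("Transcript:", whisper_stt(audio))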