Spaces:
Paused
Paused
File size: 9,082 Bytes
9f2b0ed 308d699 8e4e001 9f2b0ed 2ae57cb 9f2b0ed 2ae57cb 9f2b0ed 44441db 308d699 44441db 308d699 81341b4 2ae57cb 9f2b0ed 308d699 9f2b0ed 308d699 2ae57cb 9f2b0ed 44441db 308d699 44441db 308d699 44441db 308d699 44441db 308d699 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
import os
import uuid
import json
import logging
from flask import Blueprint, request, jsonify, send_file, url_for, current_app
from flask_login import login_required, current_user
from backend.models.database import db, Job, Application
from backend.services.interview_engine import (
generate_first_question,
edge_tts_to_file_sync,
whisper_stt,
evaluate_answer
)
interview_api = Blueprint("interview_api", __name__)
@interview_api.route("/start_interview", methods=["POST"])
@login_required
def start_interview():
"""
Start a new interview. Generates the first question based on the user's
resume/profile and the selected job. Always returns a JSON payload
containing the question text and, if available, a URL to an audio
rendition of the question.
"""
try:
data = request.get_json() or {}
job_id = data.get("job_id")
# Validate the job and the user's application
job = Job.query.get_or_404(job_id)
application = Application.query.filter_by(
user_id=current_user.id,
job_id=job_id
).first()
if not application or not application.extracted_features:
return jsonify({"error": "No application/profile data found."}), 400
# Parse the candidate's profile
try:
profile = json.loads(application.extracted_features)
except Exception as e:
logging.error(f"Invalid profile JSON: {e}")
return jsonify({"error": "Invalid profile JSON"}), 500
# Generate the first question using the LLM
question = generate_first_question(profile, job)
if not question:
question = "Tell me about yourself and why you're interested in this position."
# Attempt to generate a TTS audio file for the question
audio_url = None
try:
audio_dir = "/tmp/audio"
os.makedirs(audio_dir, exist_ok=True)
filename = f"q_{uuid.uuid4().hex}.wav"
audio_path = os.path.join(audio_dir, filename)
audio_result = edge_tts_to_file_sync(question, audio_path)
if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
audio_url = url_for("interview_api.get_audio", filename=filename)
logging.info(f"Audio generated successfully: {audio_url}")
else:
logging.warning("Audio generation failed or file too small")
except Exception as e:
logging.error(f"Error generating TTS audio: {e}")
audio_url = None
return jsonify({
"question": question,
"audio_url": audio_url
})
except Exception as e:
logging.error(f"Error in start_interview: {e}")
return jsonify({"error": "Internal server error"}), 500
@interview_api.route("/transcribe_audio", methods=["POST"])
@login_required
def transcribe_audio():
"""Transcribe uploaded audio with better error handling"""
try:
audio_file = request.files.get("audio")
if not audio_file:
return jsonify({"error": "No audio file received."}), 400
# Check if file has content
audio_file.seek(0, 2) # Seek to end
file_size = audio_file.tell()
audio_file.seek(0) # Seek back to start
if file_size == 0:
logging.error("Received empty audio file")
return jsonify({"error": "Empty audio file received."}), 400
logging.info(f"Received audio file: {file_size} bytes")
# Use /tmp directory which is writable in Hugging Face Spaces
temp_dir = "/tmp/interview_temp"
os.makedirs(temp_dir, exist_ok=True)
# Keep original extension for better compatibility
original_filename = audio_file.filename or "recording.webm"
file_extension = os.path.splitext(original_filename)[1] or ".webm"
filename = f"user_audio_{uuid.uuid4().hex}{file_extension}"
path = os.path.join(temp_dir, filename)
# Save the file
audio_file.save(path)
# Verify file was saved
if not os.path.exists(path) or os.path.getsize(path) == 0:
logging.error(f"Failed to save audio file or file is empty: {path}")
return jsonify({"error": "Failed to save audio file."}), 500
logging.info(f"Audio file saved: {path} ({os.path.getsize(path)} bytes)")
# Transcribe the audio
transcript = whisper_stt(path)
# Clean up
try:
os.remove(path)
except Exception as e:
logging.warning(f"Could not remove temp file {path}: {e}")
if not transcript or not transcript.strip():
return jsonify({"error": "No speech detected in audio. Please try again."}), 400
return jsonify({"transcript": transcript})
except Exception as e:
logging.error(f"Error in transcribe_audio: {e}")
return jsonify({"error": "Error processing audio. Please try again."}), 500
@interview_api.route("/process_answer", methods=["POST"])
@login_required
def process_answer():
"""
Process a user's answer and return a follow‑up question along with an
evaluation. Always responds with JSON.
"""
try:
data = request.get_json() or {}
answer = data.get("answer", "").strip()
question_idx = data.get("questionIndex", 0)
if not answer:
return jsonify({"error": "No answer provided."}), 400
# Get the current question for evaluation context
current_question = data.get("current_question", "Tell me about yourself")
# Evaluate the answer
evaluation_result = evaluate_answer(current_question, answer)
# Determine completion (3 questions in total, zero‑based index)
is_complete = question_idx >= 2
next_question_text = None
audio_url = None
if not is_complete:
# Generate next question based on question index
if question_idx == 0:
next_question_text = "Can you describe a challenging project you've worked on and how you overcame the difficulties?"
elif question_idx == 1:
next_question_text = "What are your career goals and how does this position align with them?"
else:
next_question_text = "Do you have any questions about the role or our company?"
# Try to generate audio for the next question
try:
audio_dir = "/tmp/audio"
os.makedirs(audio_dir, exist_ok=True)
filename = f"q_{uuid.uuid4().hex}.wav"
audio_path = os.path.join(audio_dir, filename)
audio_result = edge_tts_to_file_sync(next_question_text, audio_path)
if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
audio_url = url_for("interview_api.get_audio", filename=filename)
logging.info(f"Next question audio generated: {audio_url}")
except Exception as e:
logging.error(f"Error generating next question audio: {e}")
audio_url = None
return jsonify({
"success": True,
"next_question": next_question_text,
"audio_url": audio_url,
"evaluation": evaluation_result,
"is_complete": is_complete
})
except Exception as e:
logging.error(f"Error in process_answer: {e}")
return jsonify({"error": "Error processing answer. Please try again."}), 500
@interview_api.route("/audio/<string:filename>", methods=["GET"])
@login_required
def get_audio(filename: str):
"""Serve previously generated TTS audio from the /tmp/audio directory."""
try:
# Sanitize filename to prevent directory traversal
safe_name = os.path.basename(filename)
if not safe_name.endswith('.wav'):
return jsonify({"error": "Invalid audio file format."}), 400
audio_path = os.path.join("/tmp/audio", safe_name)
if not os.path.exists(audio_path):
logging.warning(f"Audio file not found: {audio_path}")
return jsonify({"error": "Audio file not found."}), 404
if os.path.getsize(audio_path) == 0:
logging.warning(f"Audio file is empty: {audio_path}")
return jsonify({"error": "Audio file is empty."}), 404
return send_file(
audio_path,
mimetype="audio/wav",
as_attachment=False,
conditional=True # Enable range requests for better audio streaming
)
except Exception as e:
logging.error(f"Error serving audio file {filename}: {e}")
return jsonify({"error": "Error serving audio file."}), 500 |