Spaces:
Paused
Paused
File size: 8,115 Bytes
9f2b0ed 308d699 8e4e001 9f2b0ed 2ae57cb 9f2b0ed 2ae57cb 9f2b0ed 44441db 308d699 44441db 308d699 81341b4 330157f 9f2b0ed 330157f 9f2b0ed 330157f 308d699 330157f 2ae57cb 9f2b0ed 44441db 308d699 44441db 308d699 44441db 308d699 44441db 308d699 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import os
import uuid
import json
import logging
from flask import Blueprint, request, jsonify, send_file, url_for, current_app
from flask_login import login_required, current_user
from backend.models.database import db, Job, Application
from backend.services.interview_engine import (
generate_first_question,
edge_tts_to_file_sync,
whisper_stt,
evaluate_answer
)
# Blueprint on which this module's interview endpoints are registered
# through the @interview_api.route decorators.
interview_api = Blueprint("interview_api", __name__)
@interview_api.route("/start_interview", methods=["POST"])
@login_required
def start_interview():
    """
    Start a new interview for the logged-in user and the requested job.

    Reads ``job_id`` from the JSON request body, loads the user's
    application for that job, and asks ``generate_first_question`` for the
    opening question (a fixed fallback question is used when it returns
    nothing). A TTS rendition is attempted on a best-effort basis.

    Returns:
        JSON ``{"question": str, "audio_url": str | None}`` on success.
        400 JSON error when ``job_id`` is missing or the user has no
        application/profile data for the job; 404 when the job does not
        exist; 500 JSON error on invalid stored profile JSON or any other
        unexpected failure.
    """
    # silent=True: a missing or malformed JSON body yields None instead of
    # raising, so it is reported as a missing job_id rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    job_id = data.get("job_id")
    if job_id is None:
        return jsonify({"error": "Missing job_id."}), 400

    # Deliberately outside the broad try below: get_or_404 reports a missing
    # job by raising an HTTP 404, which `except Exception` would otherwise
    # swallow and turn into a 500.
    job = Job.query.get_or_404(job_id)

    try:
        application = Application.query.filter_by(
            user_id=current_user.id,
            job_id=job_id,
        ).first()
        if not application or not application.extracted_features:
            return jsonify({"error": "No application/profile data found."}), 400

        # Parse the candidate's stored profile
        try:
            profile = json.loads(application.extracted_features)
        except (TypeError, ValueError) as e:
            logging.error(f"Invalid profile JSON: {e}")
            return jsonify({"error": "Invalid profile JSON"}), 500

        # Generate the first question, falling back to a generic opener
        question = generate_first_question(profile, job)
        if not question:
            question = "Tell me about yourself and why you're interested in this position."

        # Best-effort TTS: any failure leaves audio_url as None and the
        # question text is still returned.
        audio_url = None
        try:
            audio_dir = "/tmp/audio"
            os.makedirs(audio_dir, exist_ok=True)
            filename = f"q_{uuid.uuid4().hex}.wav"
            audio_path = os.path.join(audio_dir, filename)
            audio_result = edge_tts_to_file_sync(question, audio_path)
            # Files of 1000 bytes or less are treated as failed generations.
            if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
                audio_url = url_for("interview_api.get_audio", filename=filename)
                logging.info(f"Audio generated successfully: {audio_url}")
            else:
                logging.warning("Audio generation failed or file too small")
        except Exception as e:
            logging.error(f"Error generating TTS audio: {e}")
            audio_url = None

        return jsonify({
            "question": question,
            "audio_url": audio_url,
        })
    except Exception as e:
        # Top-level boundary: log with traceback and answer with JSON.
        logging.exception(f"Error in start_interview: {e}")
        return jsonify({"error": "Internal server error"}), 500
import subprocess
@interview_api.route("/transcribe_audio", methods=["POST"])
@login_required
def transcribe_audio():
    """
    Transcribe an uploaded audio answer.

    The uploaded ``audio`` file is saved as ``.webm`` under
    ``/tmp/interview_temp``, converted to ``.wav`` with ffmpeg, and passed
    to ``whisper_stt``. Both temporary files are removed afterwards,
    whether or not the request succeeded.

    Returns:
        JSON ``{"transcript": str}`` on success.
        400 JSON error when no file was uploaded or no speech was detected;
        500 JSON error when the ffmpeg conversion or the transcription fails.
    """
    audio_file = request.files.get("audio")
    if not audio_file:
        return jsonify({"error": "No audio file received."}), 400

    temp_dir = "/tmp/interview_temp"
    os.makedirs(temp_dir, exist_ok=True)
    # Build both paths from one stem instead of str.replace() on the full
    # path, which would also rewrite ".webm" appearing in a directory name.
    base_path = os.path.join(temp_dir, f"user_audio_{uuid.uuid4().hex}")
    original_path = f"{base_path}.webm"
    wav_path = f"{base_path}.wav"

    try:
        audio_file.save(original_path)

        # Convert to WAV using ffmpeg. check=True makes a non-zero exit
        # status raise, so a failed conversion is reported instead of
        # handing a missing or partial file to the transcriber.
        try:
            subprocess.run(
                ["ffmpeg", "-y", "-i", original_path, wav_path],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True,
            )
        except (OSError, subprocess.SubprocessError) as e:
            logging.error(f"FFmpeg conversion failed: {e}")
            return jsonify({"error": "Failed to convert audio"}), 500

        # Transcribe; failures are reported as JSON like the other endpoints.
        try:
            transcript = whisper_stt(wav_path)
        except Exception as e:
            logging.exception(f"Transcription failed: {e}")
            return jsonify({"error": "Failed to transcribe audio"}), 500
    finally:
        # Cleanup runs on every exit path so temp files never accumulate.
        for path in (original_path, wav_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                # Nothing to clean up (e.g. ffmpeg never produced the wav).
                pass
            except OSError as e:
                logging.warning(f"Could not remove temp file {path}: {e}")

    if not transcript or not transcript.strip():
        return jsonify({"error": "No speech detected in audio. Please try again."}), 400
    return jsonify({"transcript": transcript})
@interview_api.route("/process_answer", methods=["POST"])
@login_required
def process_answer():
    """
    Evaluate a user's answer and return the follow-up question.

    Expects a JSON body with ``answer`` (non-empty string), optional
    ``questionIndex`` (zero-based, defaults to 0) and optional
    ``current_question`` (defaults to a generic opener). The answer is
    scored with ``evaluate_answer``; unless the interview is complete
    (``questionIndex >= 2``), a fixed follow-up question is chosen and a
    TTS rendition is attempted on a best-effort basis.

    Returns:
        JSON with ``success``, ``next_question``, ``audio_url``,
        ``evaluation`` and ``is_complete`` on success.
        400 JSON error when the answer is missing/blank/not a string or
        ``questionIndex`` is not an integer value; 500 JSON error on any
        unexpected failure.
    """
    try:
        # silent=True: a malformed body becomes an empty payload, which is
        # then rejected below as a missing answer (400) instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # A null or non-string answer must be a client error, not a crash
        # on .strip().
        raw_answer = data.get("answer")
        answer = raw_answer.strip() if isinstance(raw_answer, str) else ""
        if not answer:
            return jsonify({"error": "No answer provided."}), 400

        # Clients may send the index as a string; coerce it so the
        # comparison below cannot raise TypeError.
        try:
            question_idx = int(data.get("questionIndex", 0))
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid questionIndex."}), 400

        # Question text used as context for the evaluation
        current_question = data.get("current_question") or "Tell me about yourself"

        evaluation_result = evaluate_answer(current_question, answer)

        # Determine completion (3 questions in total, zero-based index)
        is_complete = question_idx >= 2
        next_question_text = None
        audio_url = None

        if not is_complete:
            # Fixed follow-ups keyed by the index of the question just
            # answered; any other (e.g. negative) index gets the closing
            # question, as before.
            follow_ups = {
                0: "Can you describe a challenging project you've worked on and how you overcame the difficulties?",
                1: "What are your career goals and how does this position align with them?",
            }
            next_question_text = follow_ups.get(
                question_idx,
                "Do you have any questions about the role or our company?",
            )

            # Best-effort TTS: any failure leaves audio_url as None.
            try:
                audio_dir = "/tmp/audio"
                os.makedirs(audio_dir, exist_ok=True)
                filename = f"q_{uuid.uuid4().hex}.wav"
                audio_path = os.path.join(audio_dir, filename)
                audio_result = edge_tts_to_file_sync(next_question_text, audio_path)
                # Files of 1000 bytes or less are treated as failed generations.
                if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
                    audio_url = url_for("interview_api.get_audio", filename=filename)
                    logging.info(f"Next question audio generated: {audio_url}")
            except Exception as e:
                logging.error(f"Error generating next question audio: {e}")
                audio_url = None

        return jsonify({
            "success": True,
            "next_question": next_question_text,
            "audio_url": audio_url,
            "evaluation": evaluation_result,
            "is_complete": is_complete,
        })
    except Exception as e:
        # Top-level boundary: log with traceback and answer with JSON.
        logging.exception(f"Error in process_answer: {e}")
        return jsonify({"error": "Error processing answer. Please try again."}), 500
@interview_api.route("/audio/<string:filename>", methods=["GET"])
@login_required
def get_audio(filename: str):
    """
    Stream a generated question recording out of /tmp/audio.

    Responds with the WAV file when it exists and is non-empty; otherwise
    with a JSON error: 400 for a name not ending in ``.wav``, 404 for a
    missing or zero-byte file, 500 if anything raises along the way.
    """
    try:
        # Keep only the final path component so the lookup cannot leave
        # the audio directory.
        wav_name = os.path.basename(filename)
        if not wav_name.endswith('.wav'):
            return jsonify({"error": "Invalid audio file format."}), 400

        wav_path = os.path.join("/tmp/audio", wav_name)
        if os.path.exists(wav_path):
            if os.path.getsize(wav_path) != 0:
                # conditional=True enables range requests for audio streaming
                return send_file(
                    wav_path,
                    mimetype="audio/wav",
                    as_attachment=False,
                    conditional=True,
                )
            logging.warning(f"Audio file is empty: {wav_path}")
            return jsonify({"error": "Audio file is empty."}), 404

        logging.warning(f"Audio file not found: {wav_path}")
        return jsonify({"error": "Audio file not found."}), 404
    except Exception as e:
        logging.error(f"Error serving audio file {filename}: {e}")
        return jsonify({"error": "Error serving audio file."}), 500