Spaces:
Paused
Paused
import os | |
import uuid | |
import json | |
from flask import Blueprint, request, jsonify, send_file, url_for, current_app | |
from flask_login import login_required, current_user | |
from backend.models.database import db, Job, Application | |
from backend.services.interview_engine import ( | |
generate_first_question, | |
edge_tts_to_file_sync, | |
whisper_stt, | |
evaluate_answer | |
) | |
interview_api = Blueprint("interview_api", __name__) | |
def start_interview(): | |
""" | |
Start a new interview. Generates the first question based on the user's | |
resume/profile and the selected job. Always returns a JSON payload | |
containing the question text and, if available, a URL to an audio | |
rendition of the question. | |
Previously this endpoint returned a raw audio file when TTS generation | |
succeeded. This prevented the client from displaying the actual question | |
and forced it to fall back to a hard‑coded default. By always returning | |
structured JSON we ensure the UI can show the generated question and | |
optionally play the associated audio. | |
""" | |
data = request.get_json() or {} | |
job_id = data.get("job_id") | |
# Validate the job and the user's application | |
job = Job.query.get_or_404(job_id) | |
application = Application.query.filter_by( | |
user_id=current_user.id, | |
job_id=job_id | |
).first() | |
if not application or not application.extracted_features: | |
return jsonify({"error": "No application/profile data found."}), 400 | |
# Parse the candidate's profile | |
try: | |
profile = json.loads(application.extracted_features) | |
except Exception: | |
return jsonify({"error": "Invalid profile JSON"}), 500 | |
# Generate the first question using the LLM | |
question = generate_first_question(profile, job) | |
# Attempt to generate a TTS audio file for the question. If successful | |
# we'll return a URL that the client can call to retrieve it; otherwise | |
# audio_url remains None. | |
audio_url = None | |
try: | |
audio_dir = "/tmp/audio" | |
os.makedirs(audio_dir, exist_ok=True) | |
filename = f"q_{uuid.uuid4().hex}.wav" | |
audio_path = os.path.join(audio_dir, filename) | |
audio_out = edge_tts_to_file_sync(question, audio_path) | |
if audio_out and os.path.exists(audio_path): | |
audio_url = url_for("interview_api.get_audio", filename=filename) | |
except Exception: | |
audio_url = None | |
return jsonify({ | |
"question": question, | |
"audio_url": audio_url | |
}) | |
def transcribe_audio(): | |
audio_file = request.files.get("audio") | |
if not audio_file: | |
return jsonify({"error": "No audio file received."}), 400 | |
# Use /tmp directory which is writable in Hugging Face Spaces | |
temp_dir = "/tmp/interview_temp" | |
os.makedirs(temp_dir, exist_ok=True) | |
filename = f"user_audio_{uuid.uuid4().hex}.wav" | |
path = os.path.join(temp_dir, filename) | |
audio_file.save(path) | |
transcript = whisper_stt(path) | |
# Clean up | |
try: | |
os.remove(path) | |
except: | |
pass | |
return jsonify({"transcript": transcript}) | |
def process_answer(): | |
""" | |
Process a user's answer and return a follow‑up question along with an | |
evaluation. Always responds with JSON containing: | |
- success: boolean indicating the operation succeeded | |
- next_question: the text of the next question | |
- audio_url: optional URL to the TTS audio for the next question | |
- evaluation: a dict with a score and feedback | |
- is_complete: boolean indicating if the interview is finished | |
Returning JSON even when audio generation succeeds simplifies client | |
handling and prevents errors when parsing the response. | |
""" | |
data = request.get_json() or {} | |
answer = data.get("answer", "") | |
question_idx = data.get("questionIndex", 0) | |
# Construct the next question. In a full implementation this would | |
# depend on the user's answer and job description. | |
next_question_text = f"Follow‑up question {question_idx + 2}: Can you elaborate on your experience with relevant technologies?" | |
# Stubbed evaluation of the answer. Replace with a call to evaluate_answer() | |
evaluation_result = { | |
"score": "medium", | |
"feedback": "Good answer, but be more specific." | |
} | |
# Determine completion (3 questions in total, zero‑based index) | |
is_complete = question_idx >= 2 | |
# Try to generate audio for the next question | |
audio_url = None | |
try: | |
audio_dir = "/tmp/audio" | |
os.makedirs(audio_dir, exist_ok=True) | |
filename = f"q_{uuid.uuid4().hex}.wav" | |
audio_path = os.path.join(audio_dir, filename) | |
audio_out = edge_tts_to_file_sync(next_question_text, audio_path) | |
if audio_out and os.path.exists(audio_path): | |
audio_url = url_for("interview_api.get_audio", filename=filename) | |
except Exception: | |
audio_url = None | |
return jsonify({ | |
"success": True, | |
"next_question": next_question_text, | |
"audio_url": audio_url, | |
"evaluation": evaluation_result, | |
"is_complete": is_complete | |
}) | |
def get_audio(filename: str): | |
"""Serve previously generated TTS audio from the /tmp/audio directory.""" | |
safe_name = os.path.basename(filename) | |
audio_path = os.path.join("/tmp/audio", safe_name) | |
if not os.path.exists(audio_path): | |
return jsonify({"error": "Audio file not found."}), 404 | |
return send_file(audio_path, mimetype="audio/wav", as_attachment=False) |