# Interview API routes: first-question generation, speech-to-text transcription,
# answer evaluation with follow-up questions, TTS audio serving, and recruiter
# PDF report downloads.
import os | |
import uuid | |
import json | |
import logging | |
from flask import Blueprint, request, jsonify, send_file, url_for, current_app | |
from flask_login import login_required, current_user | |
from backend.models.database import db, Job, Application | |
from backend.services.interview_engine import ( | |
generate_first_question, | |
edge_tts_to_file_sync, | |
whisper_stt, | |
evaluate_answer | |
) | |
# Additional imports for report generation | |
from backend.models.database import Application | |
from backend.services.report_generator import generate_llm_interview_report, create_pdf_report | |
from flask import abort | |
# Blueprint grouping all interview-related endpoints; registered by the app factory.
# NOTE(review): no @interview_api.route decorators are visible on the handlers
# below — confirm routes are attached here or elsewhere (url_for references
# "interview_api.get_audio", so registration must exist somewhere).
interview_api = Blueprint("interview_api", __name__)
def start_interview():
    """
    Start a new interview for the authenticated user.

    Expects JSON ``{"job_id": <int>}``. Validates the job and the user's
    application, generates the first interview question from the candidate's
    extracted profile, and (best-effort) renders the question to a WAV file
    served via the ``get_audio`` endpoint.

    Returns:
        JSON ``{"question": str, "audio_url": str | None}`` on success;
        400 when request/application data is missing; 404 for an unknown
        job; 500 on unexpected failure.
    """
    try:
        # silent=True: a missing/incorrect Content-Type yields None (handled
        # by the fallback) instead of Flask raising before ``or {}`` runs.
        data = request.get_json(silent=True) or {}
        job_id = data.get("job_id")
        if job_id is None:
            # Explicit 400 rather than letting get_or_404(None) report a
            # misleading "job not found" 404.
            return jsonify({"error": "job_id is required."}), 400
        job = Job.query.get_or_404(job_id)
        application = Application.query.filter_by(
            user_id=current_user.id,
            job_id=job_id
        ).first()
        if not application or not application.extracted_features:
            return jsonify({"error": "No application/profile data found."}), 400
        # The candidate profile is stored as a JSON string on the application.
        try:
            profile = json.loads(application.extracted_features)
        except (TypeError, ValueError) as e:
            logging.error(f"Invalid profile JSON: {e}")
            return jsonify({"error": "Invalid profile JSON"}), 500
        # Generate the first question via the LLM, with a safe fallback.
        question = generate_first_question(profile, job)
        if not question:
            question = "Tell me about yourself and why you're interested in this position."
        # Best-effort TTS: any failure degrades to a text-only question.
        audio_url = None
        try:
            audio_dir = "/tmp/audio"
            os.makedirs(audio_dir, exist_ok=True)
            filename = f"q_{uuid.uuid4().hex}.wav"
            audio_path = os.path.join(audio_dir, filename)
            audio_result = edge_tts_to_file_sync(question, audio_path)
            # Files under ~1 KB are assumed to be silent/corrupt renders.
            if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
                audio_url = url_for("interview_api.get_audio", filename=filename)
                logging.info(f"Audio generated successfully: {audio_url}")
            else:
                logging.warning("Audio generation failed or file too small")
        except Exception as e:
            logging.error(f"Error generating TTS audio: {e}")
            audio_url = None
        return jsonify({
            "question": question,
            "audio_url": audio_url
        })
    except Exception as e:
        # Top-level boundary: log the traceback, never leak it to the client.
        logging.exception(f"Error in start_interview: {e}")
        return jsonify({"error": "Internal server error"}), 500
import subprocess | |
def transcribe_audio():
    """Transcribe an uploaded ``.webm`` recording.

    Converts the upload to WAV via ffmpeg, runs Faster-Whisper STT, and
    returns JSON ``{"transcript": str}``. Responds 400 when no audio is
    supplied or no speech is detected, 500 when conversion fails.
    """
    audio_file = request.files.get("audio")
    if not audio_file:
        return jsonify({"error": "No audio file received."}), 400
    temp_dir = "/tmp/interview_temp"
    os.makedirs(temp_dir, exist_ok=True)
    original_path = os.path.join(temp_dir, f"user_audio_{uuid.uuid4().hex}.webm")
    wav_path = original_path.replace(".webm", ".wav")
    audio_file.save(original_path)
    # Convert to WAV. check=True surfaces ffmpeg failures instead of
    # silently handing a missing/empty file to the STT step (the original
    # ignored the return code entirely).
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", original_path, wav_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True
        )
    except (subprocess.CalledProcessError, OSError) as e:
        logging.error(f"FFmpeg conversion failed: {e}")
        _remove_temp_files(original_path, wav_path)
        return jsonify({"error": "Failed to convert audio"}), 500
    transcript = whisper_stt(wav_path)
    _remove_temp_files(original_path, wav_path)
    if not transcript or not transcript.strip():
        return jsonify({"error": "No speech detected in audio. Please try again."}), 400
    return jsonify({"transcript": transcript})


def _remove_temp_files(*paths):
    """Best-effort removal of temporary audio files; missing files are fine."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            # Narrowed from a bare except: only filesystem errors are expected.
            pass
# ---------------------------------------------------------------------------- | |
# Interview report download | |
# | |
# Recruiters can download a PDF summarising a candidate's interview performance. | |
# This route performs several checks: it verifies that the current user has | |
# recruiter or admin privileges, ensures that the requested application exists | |
# and belongs to one of the recruiter's jobs, generates a textual report via | |
# the ``generate_llm_interview_report`` helper, converts it into a PDF, and | |
# finally sends the PDF as a file attachment. The heavy lifting is | |
# encapsulated in ``services/report_generator.py`` to keep this route | |
# lightweight. | |
def download_report(application_id: int):
    """Generate and send a PDF interview report for one application.

    Access is limited to admins and to the recruiter who owns the job that
    the application targets. Returns the PDF as an attachment; aborts with
    403/404 on authorisation or lookup failures and answers 500 when report
    generation itself fails.
    """
    application = Application.query.get_or_404(application_id)
    # Only recruiters and admins may ever view reports.
    if current_user.role not in ('recruiter', 'admin'):
        return abort(403)
    # The application must be attached to a job we can authorise against.
    job = getattr(application, 'job', None)
    if job is None:
        return abort(404)
    # Non-admin recruiters must own the job in question.
    if current_user.role != 'admin' and job.recruiter_id != current_user.id:
        return abort(403)
    try:
        # Build the textual report, then render it to an in-memory PDF.
        # Interview answers/evaluations are not persisted server-side yet,
        # so the report draws on application data and the computed skill
        # match; ``generate_llm_interview_report`` can be extended later
        # without touching this route.
        report_text = generate_llm_interview_report(application)
        pdf_buffer = create_pdf_report(report_text)
        pdf_buffer.seek(0)
        return send_file(
            pdf_buffer,
            download_name=f"interview_report_{application.id}.pdf",
            as_attachment=True,
            mimetype='application/pdf'
        )
    except Exception as exc:
        # Log for debugging; the client only sees a generic failure.
        logging.error(f"Error generating report for application {application_id}: {exc}")
        return jsonify({"error": "Failed to generate report"}), 500
# Follow-up question bank used for indices 1 .. total-2; the final slot is
# always the salary/logistics question below.
_FOLLOW_UP_QUESTIONS = [
    "Can you describe a challenging project you've worked on and how you overcame the difficulties?",
    "What is your favorite machine learning algorithm and why?",
    "How do you stay up-to-date with advancements in AI?",
    "Describe a time you had to learn a new technology quickly. How did you approach it?"
]

_FINAL_QUESTION = (
    "What are your salary expectations? Are you looking for a full-time or part-time role, "
    "and do you prefer remote or on-site work?"
)


def process_answer():
    """
    Process a candidate's answer and return a follow-up question plus an
    evaluation. Always responds with JSON.

    Expects JSON with ``answer``, ``questionIndex`` (zero-based), and
    optionally ``current_question`` and ``job_id`` (used to look up how many
    questions the job is configured for; defaults to a 3-question interview).

    Returns:
        JSON with ``evaluation``, ``next_question`` (text, or None when the
        interview is over), an optional ``audio_url``, and ``is_complete``.
    """
    try:
        data = request.get_json() or {}
        answer = data.get("answer", "").strip()
        question_idx = data.get("questionIndex", 0)
        job_id = data.get("job_id")
        if not answer:
            return jsonify({"error": "No answer provided."}), 400
        # Evaluation needs the question the candidate was answering.
        current_question = data.get("current_question", "Tell me about yourself")
        evaluation_result = evaluate_answer(current_question, answer)
        total_questions = _configured_question_count(job_id)
        # question_idx is zero-based, so the interview is complete once it
        # reaches the last index (total_questions - 1).
        is_complete = question_idx >= (total_questions - 1)
        next_question_text = None
        audio_url = None
        if not is_complete:
            next_question_text = _select_next_question(question_idx + 1, total_questions)
            audio_url = _question_audio_url(next_question_text)
        return jsonify({
            "success": True,
            "next_question": next_question_text,
            "audio_url": audio_url,
            "evaluation": evaluation_result,
            "is_complete": is_complete
        })
    except Exception as e:
        # Top-level boundary: log the traceback, return a generic error.
        logging.exception(f"Error in process_answer: {e}")
        return jsonify({"error": "Error processing answer. Please try again."}), 500


def _configured_question_count(job_id, default=3):
    """Number of questions configured for the job; falls back to ``default``."""
    if job_id is None:
        return default
    try:
        job = Job.query.get(int(job_id))
        if job and job.num_questions and job.num_questions > 0:
            return job.num_questions
    except Exception:
        # Bad id or DB error: keep the default interview length.
        pass
    return default


def _select_next_question(next_idx, total_questions):
    """Pick the question text for the zero-based index ``next_idx``.

    The last index always gets the salary/logistics question; earlier
    indices cycle through the follow-up bank (bank index 0 serves the
    interview's second question, hence ``next_idx - 1``).
    """
    if next_idx == (total_questions - 1):
        return _FINAL_QUESTION
    if _FOLLOW_UP_QUESTIONS:
        return _FOLLOW_UP_QUESTIONS[(next_idx - 1) % len(_FOLLOW_UP_QUESTIONS)]
    # Fallback if the bank is ever emptied.
    return "Do you have any questions about the role or our company?"


def _question_audio_url(question_text):
    """Best-effort TTS render of ``question_text``; returns a URL or None."""
    try:
        audio_dir = "/tmp/audio"
        os.makedirs(audio_dir, exist_ok=True)
        filename = f"q_{uuid.uuid4().hex}.wav"
        audio_path = os.path.join(audio_dir, filename)
        result = edge_tts_to_file_sync(question_text, audio_path)
        # Files under ~1 KB are treated as failed/empty renders.
        if result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
            url = url_for("interview_api.get_audio", filename=filename)
            logging.info(f"Next question audio generated: {url}")
            return url
    except Exception as e:
        logging.error(f"Error generating next question audio: {e}")
    return None
def get_audio(filename: str):
    """Serve a previously generated TTS WAV file from ``/tmp/audio``.

    The name is sanitised with ``os.path.basename`` to block directory
    traversal, and only ``.wav`` files are served. Responds 400 for a bad
    name, 404 for a missing or empty file, 500 on unexpected errors.
    """
    try:
        # Sanitize filename to prevent directory traversal.
        safe_name = os.path.basename(filename)
        if not safe_name.endswith('.wav'):
            return jsonify({"error": "Invalid audio file format."}), 400
        audio_path = os.path.join("/tmp/audio", safe_name)
        if not os.path.exists(audio_path):
            logging.warning(f"Audio file not found: {audio_path}")
            return jsonify({"error": "Audio file not found."}), 404
        if os.path.getsize(audio_path) == 0:
            logging.warning(f"Audio file is empty: {audio_path}")
            return jsonify({"error": "Audio file is empty."}), 404
        return send_file(
            audio_path,
            mimetype="audio/wav",
            as_attachment=False,
            conditional=True  # Enable range requests for smoother streaming
        )
    except Exception as e:
        # Bug fix: include the actual filename instead of the literal "(unknown)".
        logging.error(f"Error serving audio file {filename!r}: {e}")
        return jsonify({"error": "Error serving audio file."}), 500