Spaces:
Paused
Paused
import os | |
import uuid | |
import json | |
from flask import Blueprint, request, jsonify, send_file, url_for, current_app | |
from flask_login import login_required, current_user | |
from backend.models.database import db, Job, Application | |
from backend.services.interview_engine import ( | |
generate_first_question, | |
edge_tts_to_file_sync, | |
whisper_stt, | |
evaluate_answer | |
) | |
# Flask blueprint object for the interview API defined in this module.
interview_api = Blueprint(name="interview_api", import_name=__name__)
def start_interview():
    """Generate the first interview question for a job and return it as audio.

    Reads ``job_id`` from the JSON request body, looks up the current user's
    application for that job, parses its ``extracted_features`` JSON into a
    profile, and passes the profile and the job to
    ``generate_first_question``.

    Returns:
        The synthesized question as an ``audio/wav`` file response, or a JSON
        body ``{"question": ...}`` when speech synthesis does not produce a
        file. Error responses: 400 when ``job_id`` or the application/profile
        data is missing, 404 when the job does not exist, 500 when the stored
        profile is not valid JSON.

    NOTE(review): no route or ``login_required`` decorator is visible on this
    function in the reviewed text, although it reads ``current_user.id``.
    Confirm it is registered and protected as intended.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body; a non-object body (e.g. a JSON list) is treated as empty too.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    job_id = payload.get("job_id")
    if job_id is None:
        return jsonify({"error": "Missing job_id."}), 400

    job = Job.query.get_or_404(job_id)
    application = Application.query.filter_by(
        user_id=current_user.id,
        job_id=job_id,
    ).first()
    if not application or not application.extracted_features:
        return jsonify({"error": "No application/profile data found."}), 400

    try:
        profile = json.loads(application.extracted_features)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass; TypeError covers a
        # stored value that is not str/bytes.
        return jsonify({"error": "Invalid profile JSON"}), 500

    question = generate_first_question(profile, job)

    # Use /tmp directory which is writable in Hugging Face Spaces.
    audio_dir = "/tmp/audio"
    os.makedirs(audio_dir, exist_ok=True)
    audio_path = os.path.join(audio_dir, f"q_{uuid.uuid4().hex}.wav")

    # Per the original comment, the TTS helper returns None on error.
    audio_out = edge_tts_to_file_sync(question, audio_path)
    if audio_out and os.path.exists(audio_path):
        return send_file(audio_path, mimetype="audio/wav", as_attachment=False)

    # Fallback to a JSON response if audio generation fails.
    return jsonify({"question": question})
def transcribe_audio():
    """Transcribe an uploaded audio clip and return the text as JSON.

    Expects a multipart upload under the ``audio`` form field. The upload is
    saved to a uniquely named temporary file, passed to ``whisper_stt``, and
    the temporary file is removed afterwards whether or not transcription
    succeeded.

    Returns:
        JSON ``{"transcript": ...}`` on success, or a 400 JSON error when no
        ``audio`` file was received.
    """
    audio_file = request.files.get("audio")
    if not audio_file:
        return jsonify({"error": "No audio file received."}), 400

    # Use /tmp directory which is writable in Hugging Face Spaces.
    temp_dir = "/tmp/interview_temp"
    os.makedirs(temp_dir, exist_ok=True)
    path = os.path.join(temp_dir, f"user_audio_{uuid.uuid4().hex}.wav")

    try:
        audio_file.save(path)
        transcript = whisper_stt(path)
    finally:
        # Best-effort cleanup that also runs when saving or transcription
        # raises, so failed requests do not leave files behind.
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError:
            current_app.logger.warning(
                "Could not remove temporary audio file %s", path, exc_info=True
            )

    return jsonify({"transcript": transcript})
def process_answer():
    """Return the next interview question, as audio when possible.

    Reads ``questionIndex`` from the JSON request body (defaulting to 0 when
    missing or not convertible to an integer) and builds a placeholder
    follow-up question from it. The candidate's answer is not used yet: a
    full implementation would generate the follow-up from it with a model.

    Returns:
        The synthesized question as an ``audio/wav`` file response, or, when
        speech synthesis does not produce a file, a JSON body with
        ``success``, ``nextQuestion``, a fixed placeholder ``evaluation``,
        ``isComplete`` (true once ``questionIndex`` is at least 2) and an
        empty ``summary``.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body; a non-object body is treated as empty.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    # Clients may send the index as a string or null; coerce it so the
    # arithmetic and comparison below cannot raise TypeError.
    try:
        question_idx = int(payload.get("questionIndex", 0))
    except (TypeError, ValueError):
        question_idx = 0

    # Generate next question (simplified for now).
    next_question = f"Follow‑up question {question_idx + 2}: Can you elaborate on your experience with relevant technologies?"

    # Use /tmp directory for audio files.
    audio_dir = "/tmp/audio"
    os.makedirs(audio_dir, exist_ok=True)
    audio_path = os.path.join(audio_dir, f"q_{uuid.uuid4().hex}.wav")

    # Attempt to generate speech for the next question. If audio generation
    # fails, ``audio_out`` will be None and we return a JSON response instead.
    audio_out = edge_tts_to_file_sync(next_question, audio_path)
    if audio_out and os.path.exists(audio_path):
        return send_file(audio_path, mimetype="audio/wav", as_attachment=False)

    # Fallback to JSON response.
    return jsonify({
        "success": True,
        "nextQuestion": next_question,
        "evaluation": {
            "score": "medium",
            "feedback": "Good answer, but be more specific.",
        },
        "isComplete": question_idx >= 2,
        "summary": [],
    })