import os
import json
import logging
import tempfile
import re
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template, send_file
from flask_cors import CORS
import google.generativeai as genai
from groq import Groq
import pandas as pd
from datetime import datetime
import io
import cv2

# TF_CPP_MIN_LOG_LEVEL only suppresses TensorFlow's C++ logging if it is set
# before TensorFlow is imported, so these assignments live here rather than below
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import img_to_array
from moviepy.editor import VideoFileClip
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
# Configure logger first before using it elsewhere
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log')
    ]
)
logger = logging.getLogger(__name__)
# Suppress TensorFlow's Python-side warnings (the C++ log level is set above,
# before the TensorFlow import, where it actually takes effect)
tf.get_logger().setLevel('ERROR')
# Load environment variables
load_dotenv()

# Configure Flask app
app = Flask(__name__)
app.config['TEMPLATES_AUTO_RELOAD'] = False
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024  # 500 MB max file size
# Configure API keys with validation
GROQ_API_KEY = os.getenv('GROQ_API_KEY')
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')

if not GROQ_API_KEY:
    logger.error("GROQ_API_KEY environment variable not set")
    raise ValueError("GROQ_API_KEY environment variable must be set")
if not GEMINI_API_KEY:
    logger.error("GEMINI_API_KEY environment variable not set")
    raise ValueError("GEMINI_API_KEY environment variable must be set")
# Initialize clients with proper configuration and error handling
try:
    # Construct the Groq client directly. The previous version hand-built an
    # httpx wrapper via groq._base_client (a private module) with a hard-coded
    # base_url, which is fragile; the SDK accepts a request timeout itself.
    groq_client = Groq(
        api_key=GROQ_API_KEY,
        timeout=60.0
    )
    # Initialize Gemini client
    genai.configure(api_key=GEMINI_API_KEY)
    MODEL_NAME = "gemini-1.5-flash"
    logger.info("API clients initialized successfully")
except Exception as e:
    logger.error(f"Error initializing API clients: {str(e)}")
    raise
# Emotion Detection Setup
MODEL_PATH = os.path.join(os.path.dirname(__file__), 'model.h5')
HAARCASCADE_PATH = os.path.join(os.path.dirname(__file__), 'haarcascade_frontalface_default.xml')
# Load models with optimized settings
try:
    # Configure TensorFlow for optimal CPU performance
    physical_devices = tf.config.list_physical_devices('CPU')
    if physical_devices:
        try:
            # Limit memory growth to prevent OOM errors
            tf.config.experimental.set_memory_growth(physical_devices[0], True)
        except Exception:
            # Not all devices support memory growth (CPU devices typically don't)
            pass
    tf.config.threading.set_inter_op_parallelism_threads(4)
    tf.config.threading.set_intra_op_parallelism_threads(4)

    # Load emotion model with optimized settings
    model = load_model(MODEL_PATH, compile=False)
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
        run_eagerly=False
    )

    # Load face cascade
    face_cascade = cv2.CascadeClassifier(HAARCASCADE_PATH)
    if face_cascade.empty():
        raise Exception("Error: Haar Cascade file could not be loaded")
    logger.info("Successfully loaded model and face cascade")
except Exception as e:
    logger.error(f"Error loading model or face cascade: {str(e)}")
    model = None
    face_cascade = None
EMOTIONS = ['Angry', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise']

# Video processing configuration
VIDEO_CHUNK_SIZE = 1024 * 1024  # 1MB chunks for video processing
MAX_VIDEO_DURATION = 120  # Maximum video duration in minutes
FRAME_SAMPLE_RATE = 5  # Process every 5th frame for long videos
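# Rough cost of the sampling rate, for intuition: at 30 fps with
# FRAME_SAMPLE_RATE = 5, about 6 frames per second reach the detector, so a
# 10-minute clip (~18,000 frames) yields ~3,600 sampled frames.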
def extract_json(text: str) -> Optional[str]:
    """Extract a JSON object from response text (first '{' through last '}')."""
    try:
        json_match = re.search(r'\{.*\}', text, re.DOTALL)
        if json_match:
            return json_match.group(0)
        return None
    except Exception as e:
        logger.error(f"Error extracting JSON: {str(e)}")
        return None
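# Illustrative only: extract_json salvages the JSON body from a model reply
# that may be wrapped in prose or markdown fences, e.g.:
#   extract_json('Here you go:\n```json\n{"score": 7}\n```')  ->  '{"score": 7}'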
def extract_audio_from_video(video_path: str) -> Optional[str]:
    """Extract audio from video file with optimized processing."""
    video_clip = None
    try:
        # splitext handles any container extension, not just .mp4
        temp_audio_path = os.path.splitext(video_path)[0] + '.mp3'

        # Load video clip with optimized settings
        video_clip = VideoFileClip(
            video_path,
            audio_buffersize=200000,
            verbose=False,
            audio_fps=44100
        )
        if video_clip.audio is None:
            logger.warning("Video has no audio track")
            return None

        # Extract audio with optimized settings
        video_clip.audio.write_audiofile(
            temp_audio_path,
            buffersize=2000,
            verbose=False,
            logger=None
        )
        logger.info(f"Successfully extracted audio to {temp_audio_path}")
        return temp_audio_path
    except Exception as e:
        logger.error(f"Error extracting audio: {str(e)}")
        return None
    finally:
        # Ensure the clip is closed exactly once, even if an exception occurs
        if video_clip is not None:
            try:
                video_clip.close()
            except Exception:
                pass
def transcribe_audio(audio_path: str) -> Optional[str]:
    """Transcribe audio using Groq."""
    if not audio_path or not os.path.exists(audio_path):
        logger.error(f"Audio file not found at {audio_path}")
        return None
    try:
        # Transcribe audio
        with open(audio_path, "rb") as file:
            transcription = groq_client.audio.transcriptions.create(
                file=(audio_path, file.read()),
                model="whisper-large-v3-turbo",
                response_format="json",
                language="en",
                temperature=0.0
            )
        logger.info(f"Transcription successful: {transcription.text[:100]}...")
        return transcription.text
    except Exception as e:
        logger.error(f"Transcription error: {str(e)}")
        return None
def process_video_chunk(frame_chunk: List[np.ndarray], start_frame: int) -> Dict[str, Any]:
    """Process a chunk of video frames efficiently."""
    results = {
        'emotion_counts': {emotion: 0 for emotion in EMOTIONS},
        'faces_detected': 0,
        'frames_with_faces': 0,
        'frames_processed': 0
    }
    for frame_idx, frame in enumerate(frame_chunk):
        try:
            # Skip empty frames
            if frame is None or frame.size == 0:
                continue

            # Resize frame for faster processing if too large
            height, width = frame.shape[:2]
            if width > 1280:
                scale = 1280 / width
                frame = cv2.resize(frame, None, fx=scale, fy=scale)

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30),
                flags=cv2.CASCADE_SCALE_IMAGE
            )
            results['frames_processed'] += 1

            if len(faces) > 0:
                results['frames_with_faces'] += 1
                results['faces_detected'] += len(faces)
                for (x, y, w, h) in faces:
                    # Skip detections that fall outside the frame bounds
                    if y + h > gray.shape[0] or x + w > gray.shape[1]:
                        continue
                    roi = gray[y:y + h, x:x + w]
                    roi = cv2.resize(roi, (48, 48), interpolation=cv2.INTER_AREA)
                    if np.sum(roi) == 0:
                        continue
                    roi = roi.astype("float32") / 255.0
                    roi = img_to_array(roi)
                    roi = np.expand_dims(roi, axis=0)
                    with tf.device('/CPU:0'):
                        preds = model.predict(roi, verbose=0)[0]
                    label = EMOTIONS[np.argmax(preds)]
                    results['emotion_counts'][label] += 1
        except Exception as e:
            logger.error(f"Error processing frame {start_frame + frame_idx}: {str(e)}")
            continue
    return results
def analyze_video_emotions(video_path: str) -> Dict[str, Any]:
    """Analyze emotions in a video with optimized processing for large files."""
    if model is None or face_cascade is None:
        logger.error("Model or face detector not properly loaded")
        return {
            'emotion_counts': {},
            'emotion_percentages': {},
            'total_faces': 0,
            'frames_processed': 0,
            'frames_with_faces': 0,
            'error': 'Models not properly loaded'
        }

    cap = None
    try:
        # Open video and get properties
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise Exception("Failed to open video file")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30  # Default to 30 if fps is 0
        duration = total_frames / max(fps, 1) / 60  # Duration in minutes; max() prevents division by zero

        # Check video duration
        if duration > MAX_VIDEO_DURATION:
            raise Exception(f"Video duration exceeds maximum limit of {MAX_VIDEO_DURATION} minutes")

        # Initialize results
        combined_results = {
            'emotion_counts': {emotion: 0 for emotion in EMOTIONS},
            'total_faces': 0,
            'frames_processed': 0,
            'frames_with_faces': 0,
            'processing_stats': {
                'total_video_frames': total_frames,
                'video_fps': fps,
                'video_duration_minutes': round(duration, 2)
            }
        }

        # Process video in chunks using ThreadPoolExecutor
        frame_buffer = []
        frame_count = 0
        chunk_size = 30  # Process 30 frames per chunk

        with ThreadPoolExecutor(max_workers=min(4, os.cpu_count() or 4)) as executor:
            future_to_chunk = {}
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1
                if frame_count % FRAME_SAMPLE_RATE != 0:
                    continue
                frame_buffer.append(frame)
                if len(frame_buffer) >= chunk_size:
                    # Submit chunk for processing
                    future = executor.submit(
                        process_video_chunk,
                        frame_buffer.copy(),
                        frame_count - len(frame_buffer)
                    )
                    future_to_chunk[future] = len(frame_buffer)
                    frame_buffer = []

            # Process remaining frames
            if frame_buffer:
                future = executor.submit(
                    process_video_chunk,
                    frame_buffer,
                    frame_count - len(frame_buffer)
                )
                future_to_chunk[future] = len(frame_buffer)

            # Collect results
            for future in as_completed(future_to_chunk):
                try:
                    chunk_results = future.result()
                    # Combine results
                    for emotion, count in chunk_results['emotion_counts'].items():
                        combined_results['emotion_counts'][emotion] += count
                    combined_results['total_faces'] += chunk_results['faces_detected']
                    combined_results['frames_processed'] += chunk_results['frames_processed']
                    combined_results['frames_with_faces'] += chunk_results['frames_with_faces']
                except Exception as e:
                    logger.error(f"Error processing chunk: {str(e)}")

        # Calculate percentages
        total_emotions = sum(combined_results['emotion_counts'].values())
        combined_results['emotion_percentages'] = {
            emotion: round((count / max(total_emotions, 1) * 100), 2)
            for emotion, count in combined_results['emotion_counts'].items()
        }

        # Add processing statistics
        combined_results['processing_stats'].update({
            'frames_sampled': combined_results['frames_processed'],
            'sampling_rate': f'1/{FRAME_SAMPLE_RATE}',
            'processing_complete': True
        })
        return combined_results
    except Exception as e:
        logger.error(f"Error in emotion analysis: {str(e)}")
        return {
            'error': str(e),
            'emotion_counts': {emotion: 0 for emotion in EMOTIONS},
            'emotion_percentages': {emotion: 0 for emotion in EMOTIONS},
            'total_faces': 0,
            'frames_processed': 0,
            'frames_with_faces': 0,
            'processing_stats': {
                'error_occurred': True,
                'error_message': str(e)
            }
        }
    finally:
        if cap is not None:
            cap.release()
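# Minimal local smoke test, assuming a sample clip exists at the path below
# (the filename is illustrative, not part of the app):
#
#   if model is not None:
#       stats = analyze_video_emotions("sample_interview.mp4")
#       print(stats["emotion_percentages"], stats["processing_stats"])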
def analyze_interview(conversation_text: str, role_applied: Optional[str] = None, tech_skills: Optional[List[str]] = None) -> Dict[str, Any]:
    """Analyze technical interview transcript."""
    if not conversation_text or len(conversation_text.strip()) < 50:
        logger.warning("Transcript too short for meaningful analysis")
        return create_default_assessment()
    try:
        # Named gemini_model so it doesn't shadow the global emotion model
        gemini_model = genai.GenerativeModel(MODEL_NAME)

        skills_context = ""
        if tech_skills and len(tech_skills) > 0:
            skills_context = f"Focus on evaluating these specific technical skills: {', '.join(tech_skills)}."

        role_context = ""
        if role_applied:
            role_context = f"The candidate is being interviewed for the role of {role_applied}."

        prompt = f"""
        Based on the following technical interview transcript, analyze the candidate's responses and provide a structured assessment in *valid JSON format*.
        {role_context}
        {skills_context}

        *JSON Format:*
        {{
            "candidate_assessment": {{
                "technical_knowledge": {{
                    "score": 0,  // Score from 1-10
                    "strengths": [],
                    "areas_for_improvement": []
                }},
                "problem_solving": {{
                    "score": 0,  // Score from 1-10
                    "strengths": [],
                    "areas_for_improvement": []
                }},
                "communication": {{
                    "score": 0,  // Score from 1-10
                    "strengths": [],
                    "areas_for_improvement": []
                }}
            }},
            "question_analysis": [
                {{
                    "question": "",
                    "answer_quality": "",  // Excellent, Good, Average, Poor
                    "feedback": ""
                }}
            ],
            "overall_recommendation": "",  // Hire, Strong Consider, Consider, Do Not Recommend
            "overall_feedback": ""
        }}

        *Interview Transcript:*
        {conversation_text}

        *Output Strictly JSON. Do NOT add explanations or extra text.*
        """

        # Safety and generation parameters
        safety_settings = [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        ]
        generation_config = {
            "temperature": 0.2,
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": 8192,
        }

        # Try to generate a response, with a retry mechanism
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = gemini_model.generate_content(
                    prompt,
                    safety_settings=safety_settings,
                    generation_config=generation_config
                )
                raw_response = response.text
                logger.info(f"Raw Gemini Response: {raw_response[:100]}...")
                break
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt == max_retries - 1:  # Last attempt
                    logger.error(f"All {max_retries} attempts failed")
                    return create_default_assessment()

        json_text = extract_json(raw_response)
        if json_text:
            try:
                assessment = json.loads(json_text)

                # Ensure the response has all required fields
                required_fields = {
                    'candidate_assessment': {
                        'technical_knowledge': ['score', 'strengths', 'areas_for_improvement'],
                        'problem_solving': ['score', 'strengths', 'areas_for_improvement'],
                        'communication': ['score', 'strengths', 'areas_for_improvement']
                    },
                    'question_analysis': ['question', 'answer_quality', 'feedback'],
                    'overall_recommendation': None,
                    'overall_feedback': None
                }

                # Validate and set defaults if needed
                if 'candidate_assessment' not in assessment:
                    assessment['candidate_assessment'] = {}
                for category in ['technical_knowledge', 'problem_solving', 'communication']:
                    if category not in assessment['candidate_assessment']:
                        assessment['candidate_assessment'][category] = {
                            'score': 5,
                            'strengths': ['Not enough information to assess.'],
                            'areas_for_improvement': ['Not enough information to assess.']
                        }
                    else:
                        cat_data = assessment['candidate_assessment'][category]
                        for field in required_fields['candidate_assessment'][category]:
                            if field not in cat_data:
                                if field == 'score':
                                    cat_data[field] = 5
                                else:
                                    cat_data[field] = ['Not enough information to assess.']

                if 'question_analysis' not in assessment or not assessment['question_analysis']:
                    assessment['question_analysis'] = [{
                        'question': 'General Interview',
                        'answer_quality': 'Average',
                        'feedback': 'Not enough specific questions to analyze.'
                    }]
                else:
                    for qa in assessment['question_analysis']:
                        for field in required_fields['question_analysis']:
                            if field not in qa:
                                qa[field] = 'Not available'

                if 'overall_recommendation' not in assessment or not assessment['overall_recommendation']:
                    assessment['overall_recommendation'] = 'Consider'
                if 'overall_feedback' not in assessment or not assessment['overall_feedback']:
                    assessment['overall_feedback'] = 'Not enough information to provide detailed feedback.'
                return assessment
            except json.JSONDecodeError as e:
                logger.error(f"Error parsing JSON response: {str(e)}")
                return create_default_assessment()
        else:
            logger.error("No valid JSON found in response")
            return create_default_assessment()
    except Exception as e:
        logger.error(f"Interview analysis error: {str(e)}")
        return create_default_assessment()
def create_default_assessment() -> Dict[str, Any]:
    """Create a default assessment when analysis fails."""
    return {
        "candidate_assessment": {
            "technical_knowledge": {
                "score": 5,
                "strengths": ["Unable to assess strengths from the provided transcript."],
                "areas_for_improvement": ["Unable to assess areas for improvement from the provided transcript."]
            },
            "problem_solving": {
                "score": 5,
                "strengths": ["Unable to assess strengths from the provided transcript."],
                "areas_for_improvement": ["Unable to assess areas for improvement from the provided transcript."]
            },
            "communication": {
                "score": 5,
                "strengths": ["Unable to assess strengths from the provided transcript."],
                "areas_for_improvement": ["Unable to assess areas for improvement from the provided transcript."]
            }
        },
        "question_analysis": [{
            "question": "General Interview",
            "answer_quality": "Average",
            "feedback": "Unable to assess specific questions from the transcript."
        }],
        "overall_recommendation": "Consider",
        "overall_feedback": "Unable to provide a detailed assessment based on the provided transcript."
    }
def process_video_and_audio_parallel(video_path: str, role_applied: Optional[str] = None, tech_skills: Optional[List[str]] = None) -> Tuple[Dict[str, Any], str, Dict[str, Any]]:
    """Process video and audio in parallel with optimized handling."""
    audio_path = None
    emotion_results = None
    transcript = None
    interview_assessment = None
    try:
        with ThreadPoolExecutor(max_workers=min(3, os.cpu_count() or 2)) as executor:
            # Submit emotion analysis task
            emotion_future = executor.submit(analyze_video_emotions, video_path)
            # Submit audio extraction task
            audio_future = executor.submit(extract_audio_from_video, video_path)

            # Wait for audio extraction to complete, with timeout
            try:
                audio_path = audio_future.result(timeout=120)  # 2-minute timeout
            except concurrent.futures.TimeoutError:
                logger.error("Audio extraction timeout exceeded")
                audio_path = None

            # Continue with transcription if audio was extracted
            transcript_future = None
            if audio_path:
                transcript_future = executor.submit(transcribe_audio, audio_path)

            # Wait for emotion analysis, with timeout
            try:
                emotion_results = emotion_future.result(timeout=300)  # 5-minute timeout
            except concurrent.futures.TimeoutError:
                logger.error("Emotion analysis timeout exceeded")
                emotion_results = {
                    'error': 'Processing timeout exceeded',
                    'emotion_counts': {emotion: 0 for emotion in EMOTIONS},
                    'emotion_percentages': {emotion: 0 for emotion in EMOTIONS},
                    'total_faces': 0,
                    'frames_processed': 0,
                    'frames_with_faces': 0
                }

            # Wait for transcription, with timeout
            if transcript_future:
                try:
                    transcript = transcript_future.result(timeout=300)  # 5-minute timeout
                except concurrent.futures.TimeoutError:
                    logger.error("Transcription timeout exceeded")
                    transcript = "Transcription failed due to timeout."
            else:
                transcript = "Audio extraction failed, no transcription available."

        # Analyze interview content if a usable transcript is available
        if transcript and len(transcript) > 50:
            interview_assessment = analyze_interview(transcript, role_applied, tech_skills)
        else:
            interview_assessment = create_default_assessment()

        # Clean up audio file
        if audio_path and os.path.exists(audio_path):
            try:
                os.unlink(audio_path)
            except Exception as e:
                logger.warning(f"Error cleaning up audio file: {str(e)}")

        return emotion_results, transcript, interview_assessment
    except Exception as e:
        logger.error(f"Error in parallel processing: {str(e)}")
        # Create default results for any component that failed
        if not emotion_results:
            emotion_results = {
                'error': str(e),
                'emotion_counts': {emotion: 0 for emotion in EMOTIONS},
                'emotion_percentages': {emotion: 0 for emotion in EMOTIONS},
                'total_faces': 0,
                'frames_processed': 0,
                'frames_with_faces': 0
            }
        if not transcript:
            transcript = f"Error processing audio: {str(e)}"
        if not interview_assessment:
            interview_assessment = create_default_assessment()
        # Clean up audio file if it exists
        if audio_path and os.path.exists(audio_path):
            try:
                os.unlink(audio_path)
            except Exception:
                pass
        return emotion_results, transcript, interview_assessment
# NOTE: Route decorators were missing from the handlers below; without them
# Flask never registers these views. The URL paths are reasonable assumptions
# inferred from the function names, not confirmed by the original source.
@app.route('/')
def index():
    """Render the main page."""
    return render_template('index.html')

@app.route('/test')
def test_endpoint():
    """Test endpoint to verify server is running."""
    return jsonify({"status": "ok", "message": "Server is running"}), 200
@app.route('/analyze_interview', methods=['POST', 'OPTIONS'])  # path assumed from the function name
def analyze_interview_route():
    """Main route for comprehensive interview analysis."""
    # Answer CORS preflight requests
    if request.method == 'OPTIONS':
        headers = {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'POST, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Max-Age': '86400'  # 24 hours
        }
        return ('', 204, headers)

    try:
        logger.info("Received analyze_interview request")

        # Check for required file
        if 'video' not in request.files:
            logger.error("No video file in request")
            return jsonify({"error": "Video file is required"}), 400
        video_file = request.files['video']
        if not video_file:
            logger.error("Empty video file")
            return jsonify({"error": "Empty video file"}), 400

        # Get additional form data
        role_applied = request.form.get('role_applied', '')
        tech_skills = request.form.get('tech_skills', '')
        candidate_name = request.form.get('candidate_name', 'Candidate')
        tech_skills_list = [skill.strip() for skill in tech_skills.split(',')] if tech_skills else []

        # Create temporary video file
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as video_temp:
                video_file.save(video_temp.name)
                video_temp_path = video_temp.name
            logger.info(f"Video saved to temporary file: {video_temp_path}")
        except Exception as e:
            logger.error(f"Error saving video file: {str(e)}")
            return jsonify({"error": f"Failed to save video file: {str(e)}"}), 500

        # Process video and audio in parallel
        try:
            emotion_analysis, transcript, interview_assessment = process_video_and_audio_parallel(
                video_temp_path, role_applied, tech_skills_list
            )
        except Exception as e:
            logger.error(f"Error during parallel processing: {str(e)}")
            return jsonify({"error": str(e)}), 500

        # Combine results
        combined_results = {
            "candidate_assessment": interview_assessment["candidate_assessment"],
            "question_analysis": interview_assessment["question_analysis"],
            "overall_recommendation": interview_assessment["overall_recommendation"],
            "overall_feedback": interview_assessment["overall_feedback"],
            "transcription": transcript,
            "candidate_name": candidate_name,
            "role_applied": role_applied,
            "interview_date": datetime.now().strftime('%Y-%m-%d'),
            "emotion_analysis": emotion_analysis
        }
        logger.info("Combined results created successfully")
        logger.debug(f"Response data: {json.dumps(combined_results, indent=2)}")

        # Clean up temporary video file
        try:
            os.unlink(video_temp_path)
            logger.info("Temporary files cleaned up")
        except Exception as e:
            logger.warning(f"Error cleaning up temporary files: {str(e)}")

        # Add CORS headers to response
        response = jsonify(combined_results)
        response.headers.add('Access-Control-Allow-Origin', '*')
        return response
    except Exception as e:
        logger.error(f"Error in analyze_interview_route: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.route('/download_assessment', methods=['POST', 'OPTIONS'])  # path assumed from the function name
def download_assessment():
    """Download comprehensive assessment report."""
    # Answer CORS preflight requests
    if request.method == 'OPTIONS':
        headers = {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'POST, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Max-Age': '86400'  # 24 hours
        }
        return ('', 204, headers)

    try:
        data = request.json
        if not data:
            return jsonify({"error": "No data provided"}), 400

        # Create Excel writer object
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            workbook = writer.book

            # Define formats
            header_format = workbook.add_format({
                'bold': True,
                'bg_color': '#CCCCCC',
                'border': 1
            })
            cell_format = workbook.add_format({'border': 1})

            # Summary sheet
            summary_data = {
                'Metric': [
                    'Technical Knowledge',
                    'Problem Solving',
                    'Communication',
                    'Overall Recommendation',
                    'Total Faces Detected'
                ],
                'Score/Rating': [
                    f"{data['candidate_assessment']['technical_knowledge']['score']}/10",
                    f"{data['candidate_assessment']['problem_solving']['score']}/10",
                    f"{data['candidate_assessment']['communication']['score']}/10",
                    data['overall_recommendation'],
                    data['emotion_analysis'].get('total_faces', 0)
                ]
            }
            summary_df = pd.DataFrame(summary_data)
            summary_df.to_excel(writer, sheet_name='Summary', index=False)

            # Format Summary sheet
            summary_sheet = writer.sheets['Summary']
            summary_sheet.set_column('A:A', 25)
            summary_sheet.set_column('B:B', 20)
            for col_num, value in enumerate(summary_df.columns.values):
                summary_sheet.write(0, col_num, value, header_format)
            for row_num in range(len(summary_df)):
                for col_num in range(len(summary_df.columns)):
                    summary_sheet.write(row_num + 1, col_num, summary_df.iloc[row_num, col_num], cell_format)

            # Technical Assessment sheet: one block per category with the
            # score, strengths, and areas for improvement (the three blocks
            # were previously written out by hand; a loop is equivalent)
            tech_data = []
            for label, key in [
                ('Technical Knowledge', 'technical_knowledge'),
                ('Problem Solving', 'problem_solving'),
                ('Communication', 'communication')
            ]:
                category = data['candidate_assessment'][key]
                tech_data.append([label, f"{category['score']}/10", ''])
                tech_data.append(['Strengths', '', ''])
                for strength in category['strengths']:
                    tech_data.append(['', '', strength])
                tech_data.append(['Areas for Improvement', '', ''])
                for area in category['areas_for_improvement']:
                    tech_data.append(['', '', area])

            tech_df = pd.DataFrame(tech_data, columns=['Category', 'Score', 'Details'])
            tech_df.to_excel(writer, sheet_name='Technical Assessment', index=False)

            # Format Technical Assessment sheet
            tech_sheet = writer.sheets['Technical Assessment']
            tech_sheet.set_column('A:A', 25)
            tech_sheet.set_column('B:B', 15)
            tech_sheet.set_column('C:C', 60)
            for col_num, value in enumerate(tech_df.columns.values):
                tech_sheet.write(0, col_num, value, header_format)

            # Question Analysis sheet
            question_data = [
                [qa['question'], qa['answer_quality'], qa['feedback']]
                for qa in data['question_analysis']
            ]
            question_df = pd.DataFrame(question_data, columns=['Question', 'Answer Quality', 'Feedback'])
            question_df.to_excel(writer, sheet_name='Question Analysis', index=False)

            # Format Question Analysis sheet
            qa_sheet = writer.sheets['Question Analysis']
            qa_sheet.set_column('A:A', 40)
            qa_sheet.set_column('B:B', 15)
            qa_sheet.set_column('C:C', 60)
            for col_num, value in enumerate(question_df.columns.values):
                qa_sheet.write(0, col_num, value, header_format)

            # Emotion Analysis sheet
            if 'emotion_analysis' in data and 'emotion_percentages' in data['emotion_analysis']:
                emotion_counts = data['emotion_analysis'].get('emotion_counts', {})
                emotion_data = {
                    'Emotion': list(data['emotion_analysis']['emotion_percentages'].keys()),
                    'Percentage': list(data['emotion_analysis']['emotion_percentages'].values()),
                    'Count': [emotion_counts.get(emotion, 0)
                              for emotion in data['emotion_analysis']['emotion_percentages'].keys()]
                }
                emotion_df = pd.DataFrame(emotion_data)
                emotion_df.to_excel(writer, sheet_name='Emotion Analysis', index=False)

                # Format Emotion Analysis sheet
                emotion_sheet = writer.sheets['Emotion Analysis']
                emotion_sheet.set_column('A:C', 15)
                for col_num, value in enumerate(emotion_df.columns.values):
                    emotion_sheet.write(0, col_num, value, header_format)

                # Add a pie chart of the emotion distribution
                chart = workbook.add_chart({'type': 'pie'})
                chart.add_series({
                    'name': 'Emotions',
                    'categories': ['Emotion Analysis', 1, 0, len(emotion_df), 0],
                    'values': ['Emotion Analysis', 1, 1, len(emotion_df), 1],
                    'data_labels': {'percentage': True}
                })
                chart.set_title({'name': 'Emotion Distribution'})
                chart.set_style(10)
                emotion_sheet.insert_chart('E2', chart, {'x_scale': 1.5, 'y_scale': 1.5})

            # Transcript sheet
            if 'transcription' in data:
                transcript_df = pd.DataFrame({'Transcript': [data['transcription']]})
                transcript_df.to_excel(writer, sheet_name='Transcript', index=False)
                transcript_sheet = writer.sheets['Transcript']
                transcript_sheet.set_column('A:A', 100)
                transcript_sheet.write(0, 0, 'Transcript', header_format)

            # Overall Feedback sheet
            overall_df = pd.DataFrame({'Overall Feedback': [data['overall_feedback']]})
            overall_df.to_excel(writer, sheet_name='Overall Feedback', index=False)
            overall_sheet = writer.sheets['Overall Feedback']
            overall_sheet.set_column('A:A', 100)
            overall_sheet.write(0, 0, 'Overall Feedback', header_format)

        # Prepare the output file for download
        output.seek(0)
        candidate_name = data.get('candidate_name', 'Candidate').replace(' ', '_')
        role_applied = data.get('role_applied', 'Role').replace(' ', '_')
        filename = f"{candidate_name}_{role_applied}_Assessment.xlsx"

        # Create response with appropriate headers
        response = send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=filename
        )
        # Add CORS headers
        response.headers.add('Access-Control-Allow-Origin', '*')
        return response
    except Exception as e:
        logger.error(f"Error generating assessment report: {str(e)}")
        return jsonify({"error": f"Failed to generate assessment report: {str(e)}"}), 500
if __name__ == "__main__":
    # Run the Flask app with settings suited to production-style hosting
    PORT = int(os.environ.get("PORT", 5000))
    app.run(host="0.0.0.0", port=PORT, debug=False, threaded=True)
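
# Example client call, as a sketch. It assumes the server is reachable on
# localhost:5000 and that the route paths guessed above are correct; the video
# filename is illustrative.
#
#   import requests
#   with open("interview.mp4", "rb") as f:
#       resp = requests.post(
#           "http://localhost:5000/analyze_interview",
#           files={"video": f},
#           data={"role_applied": "Backend Engineer",
#                 "tech_skills": "Python, SQL",
#                 "candidate_name": "Jane Doe"},
#           timeout=600,
#       )
#   print(resp.json()["overall_recommendation"])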