|
import os |
|
import json |
|
import argparse |
|
import tempfile |
|
from typing import Dict, List, Union |
|
from datetime import datetime |
|
|
|
from dotenv import load_dotenv |
|
from moviepy import VideoFileClip |
|
|
|
from mllm_tools.litellm import LiteLLMWrapper |
|
from mllm_tools.gemini import GeminiWrapper |
|
from eval_suite.utils import calculate_geometric_mean |
|
from eval_suite.text_utils import parse_srt_to_text, fix_transcript, evaluate_text |
|
from eval_suite.video_utils import evaluate_video_chunk_new |
|
from eval_suite.image_utils import evaluate_sampled_images |
|
|
|
load_dotenv() |
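# Models offered for --model_text / --model_image are restricted to the whitelist in src/utils/allowed_models.json.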
|
|
|
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "src", "utils", "allowed_models.json")) as f: |
|
ALLOWED_MODELS = json.load(f)["allowed_models"] |
|
|
|
|
|
def combine_results(output_folder: str, combined_file: str, results: Dict[str, Dict]) -> None: |
|
""" |
|
Combine all evaluation results into a single file. |
|
|
|
Args: |
|
output_folder (str): Directory to store the combined file. |
|
combined_file (str): Name of the combined file. |
|
results (Dict[str, Dict]): Dictionary of evaluation results with file names as keys. |
|
|
|
Returns: |
|
None |
|
""" |
|
combined_path = os.path.join(output_folder, combined_file) |
|
with open(combined_path, 'w') as output_file: |
|
json.dump(results, output_file, indent=4) |
|
|
|
|
|
def save_individual_result(output_folder: str, file_name: str, result: Dict) -> None: |
|
""" |
|
Save individual evaluation result to a file. |
|
|
|
Args: |
|
output_folder (str): Directory to store the evaluation file. |
|
file_name (str): Name of the file. |
|
result (Dict): Evaluation result. |
|
|
|
Returns: |
|
None |
|
""" |
|
current_time = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
result_file = f"evaluation_{file_name}_{current_time}.json" |
|
os.makedirs(output_folder, exist_ok=True) |
|
result_path = os.path.join(output_folder, result_file) |
|
with open(result_path, 'w') as output_file: |
|
json.dump(result, output_file, indent=4) |
|
|
|
|
|
def evaluate_text_file(model, transcript_path, retry_limit): |
|
""" |
|
Evaluate a text file using the provided model. |
|
|
|
Args: |
|
model: The model to use for evaluation. |
|
transcript_path (str): Path to the transcript file (.srt or .txt). |
|
retry_limit (int): Number of retry attempts for evaluation. |
|
|
|
Returns: |
|
Dict or None: Evaluation results if successful, None if file format unsupported. |
|
""" |
|
if not transcript_path.endswith(('.srt', '.txt')): |
|
print(f"Skipping {transcript_path}: Unsupported file format for text evaluation.") |
|
return None |
|
|
|
if transcript_path.endswith(".srt"): |
|
transcript = parse_srt_to_text(transcript_path) |
|
elif transcript_path.endswith(".txt"): |
|
with open(transcript_path) as f: |
|
transcript = f.read().strip() |
|
else: |
|
raise ValueError("Unrecognized transcript file format.") |
|
|
|
    alpha_count = sum(1 for c in transcript if c.isalpha())
    capital_letter_proportion = sum(1 for c in transcript if c.isupper()) / alpha_count if alpha_count else 0.0
    if alpha_count and capital_letter_proportion < 0.01:
        transcript = fix_transcript(model, transcript)
|
|
|
print(f"Performing text evaluation: {os.path.basename(transcript_path)}") |
|
result = evaluate_text(model, transcript, retry_limit) |
|
return result |
|
|
|
|
|
def evaluate_video_file(model, video_path, transcript_path, description_path, target_fps=None, output_folder=None): |
|
""" |
|
Evaluate a video file using the provided model. |
|
|
|
Args: |
|
model: The model to use for evaluation. |
|
video_path (str): Path to the video file. |
|
transcript_path (str): Path to the transcript file. |
|
        description_path (str): Description or topic text (callers pass the processed topic name) forwarded to the chunk evaluator.
|
target_fps (int, optional): Target frames per second for video processing. |
|
output_folder (str, optional): Directory to store output files. |
|
|
|
Returns: |
|
Dict or None: Evaluation results if successful, None if file format unsupported. |
|
""" |
|
if not video_path.endswith(('.mp4', '.mkv')): |
|
print(f"Skipping {video_path}: Unsupported file format for video evaluation.") |
|
return None |
|
|
|
    moviepy_temp_dir = os.path.join(output_folder, "moviepy_temp")
    os.makedirs(moviepy_temp_dir, exist_ok=True)  # main() normally creates this, but guard for standalone use
|
|
|
|
|
num_chunks = 10 |
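    # Evaluate the video in num_chunks equal-length pieces and aggregate the per-chunk scores afterwards.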
|
with VideoFileClip(video_path) as clip: |
|
duration = clip.duration |
|
chunk_duration = duration / num_chunks |
|
results = [] |
|
|
|
|
|
temp_dir_parent = output_folder or os.getcwd() |
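        # Chunk files go into a throwaway directory that is deleted automatically when this block exits.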
|
with tempfile.TemporaryDirectory(dir=temp_dir_parent) as temp_dir: |
|
            for i in range(num_chunks):
|
start = i * chunk_duration |
|
end = min(start + chunk_duration, duration) |
|
chunk = clip.subclipped(start, end) |
|
chunk_path = os.path.join(temp_dir, f"chunk_{i+1}.mp4") |
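                # Route MoviePy's intermediate audio file into the shared moviepy_temp directory.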
|
|
|
temp_audiofile = os.path.join(moviepy_temp_dir, f"temp_audio_chunk_{i+1}.m4a") |
|
chunk.write_videofile( |
|
chunk_path, |
|
codec="libx264", |
|
audio_codec="aac", |
|
temp_audiofile=temp_audiofile, |
|
audio_bitrate="192k", |
|
preset="ultrafast", |
|
logger=None |
|
) |
|
|
|
processed_videos_dir = os.path.join(output_folder, "processed_videos") |
|
save_path = os.path.join(processed_videos_dir, f"processed_chunk_{i+1}.mp4") |
|
result = evaluate_video_chunk_new( |
|
model, |
|
chunk_path, |
|
transcript_path, |
|
description_path, |
|
target_fps=target_fps, |
|
save_processed_video=save_path |
|
) |
|
results.append(result) |
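    # Aggregate: collect each metric's score across all chunks, then reduce with a geometric mean.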
|
|
|
score_dict = {} |
|
for key in results[0]["evaluation"].keys(): |
|
score_dict[key] = [] |
|
for result in results: |
|
score_dict[key].append(result["evaluation"][key]["score"]) |
|
|
|
evaluation = {} |
|
for key, scores in score_dict.items(): |
|
evaluation[key] = {"score": calculate_geometric_mean(scores)} |
|
|
|
result_json = { |
|
"evaluation": evaluation, |
|
"video_chunks": results |
|
} |
|
return result_json |
|
|
|
|
|
def extract_scores(data: Union[Dict, List]) -> List[float]:
|
""" |
|
Extract all score values from a nested dictionary or list structure. |
|
|
|
Args: |
|
data (Union[Dict, List]): The data structure to extract scores from. |
|
|
|
Returns: |
|
        List[float]: List of extracted score values.
|
""" |
|
scores = [] |
|
if isinstance(data, dict): |
|
for key, value in data.items(): |
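            # Keys containing "chunks" (e.g. "video_chunks") hold per-chunk details; skip them so chunk scores are not counted twice.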
|
if "chunks" in key: |
|
continue |
|
elif isinstance(value, dict) or isinstance(value, list): |
|
scores.extend(extract_scores(value)) |
|
elif key == 'score': |
|
scores.append(value) |
|
elif isinstance(data, list): |
|
for item in data: |
|
scores.extend(extract_scores(item)) |
|
return scores |
|
|
|
|
|
def calculate_overall_score(result: Dict) -> float: |
|
""" |
|
Calculate the overall score from evaluation results. |
|
|
|
Args: |
|
result (Dict): Dictionary containing evaluation results. |
|
|
|
Returns: |
|
float: The calculated overall score. |
|
""" |
|
scores = extract_scores(result) |
|
overall_score = calculate_geometric_mean(scores) |
|
return overall_score |
|
|
|
|
|
def process_topic_name(topic_name: str) -> str: |
|
""" |
|
Process a topic name by capitalizing words and handling special characters. |
|
|
|
Args: |
|
topic_name (str): The topic name to process. |
|
|
|
Returns: |
|
str: The processed topic name. |
|
""" |
|
words = topic_name.replace("_s_", "'s_").split("_") |
|
return " ".join([word.capitalize() for word in words]) |
|
|
|
|
|
def merge_dicts(dict1: dict, dict2: dict) -> dict: |
|
""" |
|
Recursively merge two dictionaries. |
|
|
|
Args: |
|
dict1 (dict): First dictionary. |
|
dict2 (dict): Second dictionary. |
|
|
|
Returns: |
|
dict: Merged dictionary. |
|
""" |
|
merged = dict1.copy() |
|
for key, value in dict2.items(): |
|
if key in merged and isinstance(merged[key], dict) and isinstance(value, dict): |
|
merged[key] = merge_dicts(merged[key], value) |
|
else: |
|
merged[key] = value |
|
return merged |
|
|
|
|
|
def process_theorem(models, file_path: str, eval_type: str, retry_limit: int, |
|
target_fps: int = None, use_parent_folder_as_topic: bool = False, |
|
output_folder: str = None) -> tuple[str, dict]: |
|
""" |
|
Process a theorem file or directory for evaluation. |
|
|
|
Args: |
|
models: Dictionary of models for different evaluation types. |
|
file_path (str): Path to the file or directory to evaluate. |
|
eval_type (str): Type of evaluation to perform. |
|
retry_limit (int): Number of retry attempts. |
|
target_fps (int, optional): Target frames per second for video processing. |
|
use_parent_folder_as_topic (bool, optional): Use parent folder name as topic. |
|
output_folder (str, optional): Directory to store output files. |
|
|
|
Returns: |
|
tuple[str, dict]: Tuple of file name and evaluation results. |
|
""" |
|
ext_map = { |
|
'text': ('.txt', '.srt'), |
|
'video': ('.mp4', '.mkv') |
|
} |
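    # Single-file mode: dispatch on the file extension and the requested evaluation type.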
|
|
|
|
|
if os.path.isfile(file_path): |
|
file_ext = os.path.splitext(file_path)[1].lower() |
|
file_name = os.path.basename(file_path) |
|
|
|
if eval_type == "text" and file_ext in ext_map['text']: |
|
return file_name, evaluate_text_file(models['text'], file_path, retry_limit) |
|
elif eval_type == "video" and file_ext in ext_map['video']: |
|
            if use_parent_folder_as_topic:
                topic_name = os.path.basename(os.path.dirname(file_path))
            else:
                topic_name = None
            if topic_name is not None:
                topic_name = process_topic_name(topic_name)
|
return file_name, evaluate_video_file(models['video'], file_path, None, topic_name, target_fps, output_folder) |
|
elif eval_type == "image" and file_ext in ext_map['video']: |
|
            if use_parent_folder_as_topic:
                topic_name = os.path.basename(os.path.dirname(file_path))
            else:
                topic_name = None
            if topic_name is not None:
                topic_name = process_topic_name(topic_name)
|
return file_name, evaluate_sampled_images(models['image'], file_path, topic_name, num_chunks=10, output_folder=output_folder) |
|
elif eval_type == "all": |
|
raise ValueError("Evaluation type 'all' is not supported for a single file. Try passing a folder with both a video and a subtitle file.") |
|
else: |
|
raise ValueError(f"File type of {file_path} does not match evaluation type {eval_type!r}") |
|
|
|
|
|
theorem_dir = file_path |
|
all_files = os.listdir(theorem_dir) |
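    # Transcript candidates: ignore *_scene_outline.txt files and prefer an .srt subtitle over a plain .txt transcript.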
|
|
|
|
|
transcript_file_candidates = [f for f in all_files if f.endswith(ext_map['text']) and not f.endswith('_scene_outline.txt')] |
|
srt_files = [f for f in transcript_file_candidates if f.endswith('.srt')] |
|
txt_files = [f for f in transcript_file_candidates if f.endswith('.txt')] |
|
|
|
transcript_path = None |
|
if srt_files: |
|
transcript_path = os.path.join(theorem_dir, srt_files[0]) |
|
elif txt_files: |
|
transcript_path = os.path.join(theorem_dir, txt_files[0]) |
|
|
|
video_file_candidates = [f for f in all_files if f.endswith(ext_map['video'])] |
|
video_path = os.path.join(theorem_dir, video_file_candidates[0]) if len(video_file_candidates) == 1 else None |
|
|
|
topic_name = os.path.basename(theorem_dir) |
|
topic_name = process_topic_name(topic_name) |
|
|
|
if not video_path: |
|
print(f"Skipping {theorem_dir}: No video file found") |
|
return None, None |
|
|
|
text_result = video_result = image_result = None |
|
if eval_type == "text" or eval_type == "all": |
|
if transcript_path is None: |
|
print(f"Warning: No suitable transcript file found in {theorem_dir}") |
|
else: |
|
text_result = evaluate_text_file(models['text'], transcript_path, retry_limit) |
|
if eval_type == "video" or eval_type == "all": |
|
assert video_path is not None, f"Expected 1 video file, got {len(video_file_candidates)} for {theorem_dir}" |
|
video_result = evaluate_video_file(models['video'], video_path, transcript_path, topic_name, target_fps, output_folder) |
|
if eval_type == "image" or eval_type == "all": |
|
assert video_path is not None, f"Expected 1 video file, got {len(video_file_candidates)} for {theorem_dir}" |
|
image_result = evaluate_sampled_images(models['image'], video_path, topic_name, num_chunks=10, output_folder=output_folder) |
|
|
|
if eval_type == "all": |
|
result = {} |
|
if text_result: |
|
result = merge_dicts(result, text_result) |
|
if video_result: |
|
result = merge_dicts(result, video_result) |
|
if image_result: |
|
result = merge_dicts(result, image_result) |
|
if result: |
|
result["evaluation"]["overall_score"] = calculate_overall_score(result) |
|
else: |
|
        result = {"text": text_result, "video": video_result, "image": image_result}.get(eval_type)
|
|
|
file_name = os.path.basename(theorem_dir) |
|
return file_name, result |
|
|
|
|
|
def main(): |
|
""" |
|
Main function to run the evaluation script. |
|
|
|
Parses command line arguments and orchestrates the evaluation process |
|
for text, video, and image content using specified AI models. |
|
""" |
|
parser = argparse.ArgumentParser(description='Automatic evaluation of theorem explanation videos with LLMs') |
|
parser.add_argument('--model_text', type=str, |
|
choices=ALLOWED_MODELS, |
|
default='azure/gpt-4o', |
|
help='Select the AI model to use for text evaluation') |
|
parser.add_argument('--model_video', type=str, |
|
choices=['gemini/gemini-1.5-pro-002', |
|
'gemini/gemini-2.0-flash-exp', |
|
'gemini/gemini-2.0-pro-exp-02-05'], |
|
default='gemini/gemini-1.5-pro-002', |
|
help='Select the AI model to use for video evaluation') |
|
parser.add_argument('--model_image', type=str, |
|
choices=ALLOWED_MODELS, |
|
default='azure/gpt-4o', |
|
help='Select the AI model to use for image evaluation') |
|
parser.add_argument('--eval_type', type=str, choices=['text', 'video', 'image', 'all'], default='all', help='Type of evaluation to perform') |
|
parser.add_argument('--file_path', type=str, help='Path to a file or a theorem folder', required=True) |
|
parser.add_argument('--output_folder', type=str, help='Directory to store the evaluation files', required=True) |
|
parser.add_argument('--retry_limit', type=int, default=3, help='Number of retry attempts for each inference') |
|
parser.add_argument('--combine', action='store_true', help='Combine all results into a single JSON file') |
|
parser.add_argument('--bulk_evaluate', action='store_true', help='Evaluate a folder of theorems together', default=False) |
|
parser.add_argument('--target_fps', type=int, help='Target FPS for video processing. If not set, original video FPS will be used', required=False) |
|
parser.add_argument('--use_parent_folder_as_topic', action='store_true', help='Use parent folder name as topic name for single file evaluation', default=True) |
|
parser.add_argument('--max_workers', type=int, default=4, help='Maximum number of concurrent workers for parallel processing') |
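    # Example invocation (hypothetical paths, assuming this script is saved as evaluate.py):
    #   python evaluate.py --file_path output/pythagorean_theorem --output_folder eval_output --eval_type all --combine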
|
|
|
args = parser.parse_args() |
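    # One evaluator per modality; temperature 0 keeps scoring as deterministic as the providers allow.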
|
|
|
|
|
text_model = LiteLLMWrapper( |
|
model_name=args.model_text, |
|
temperature=0.0, |
|
) |
|
video_model = GeminiWrapper( |
|
model_name=args.model_video, |
|
temperature=0.0, |
|
) |
|
image_model = LiteLLMWrapper( |
|
model_name=args.model_image, |
|
temperature=0.0, |
|
) |
|
|
|
models = { |
|
'text': text_model, |
|
'video': video_model, |
|
'image': image_model |
|
} |
|
|
|
theorem_dirs = [] |
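    # Collect the theorem directories to evaluate: in bulk mode, every subfolder containing an .mp4 counts as one theorem.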
|
if args.bulk_evaluate: |
|
assert os.path.isdir(args.file_path), "File path must be a folder for --bulk_evaluate" |
|
        for root, _, filenames in os.walk(args.file_path):
            if not any(f.endswith(".mp4") for f in filenames):
                continue
|
|
|
theorem_dirs.append(root) |
|
elif os.path.isdir(args.file_path): |
|
assert any(f.endswith(".mp4") for f in os.listdir(args.file_path)), "The provided folder must contain a video file" |
|
|
|
theorem_dirs.append(args.file_path) |
|
|
|
|
|
os.makedirs(args.output_folder, exist_ok=True) |
|
moviepy_temp_dir = os.path.join(args.output_folder, "moviepy_temp") |
|
os.makedirs(moviepy_temp_dir, exist_ok=True) |
|
VideoFileClip.DEFAULT_TEMP_DIR = moviepy_temp_dir |
|
|
|
processed_videos_dir = os.path.join(args.output_folder, "processed_videos") |
|
os.makedirs(processed_videos_dir, exist_ok=True) |
|
|
|
results = {} |
|
if theorem_dirs: |
|
for theorem_dir in theorem_dirs: |
|
file_name, result = process_theorem( |
|
models, |
|
theorem_dir, |
|
args.eval_type, |
|
args.retry_limit, |
|
args.target_fps, |
|
args.use_parent_folder_as_topic, |
|
args.output_folder |
|
) |
|
|
|
if result is not None: |
|
results[file_name] = result |
|
|
|
if not args.combine: |
|
save_individual_result(args.output_folder, file_name, result) |
|
else: |
|
file_name, result = process_theorem( |
|
models, |
|
args.file_path, |
|
args.eval_type, |
|
args.retry_limit, |
|
args.target_fps, |
|
args.use_parent_folder_as_topic, |
|
args.output_folder |
|
) |
|
|
|
if result is not None: |
|
results[file_name] = result |
|
|
|
if not args.combine: |
|
save_individual_result(args.output_folder, file_name, result) |
|
|
|
if args.combine: |
|
if len(results) > 1: |
|
current_time = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
combined_file = f"evaluation_{current_time}.json" |
|
combine_results(args.output_folder, combined_file, results) |
|
print("Combining results completed.") |
|
else: |
|
for file_name, result in results.items(): |
|
save_individual_result(args.output_folder, file_name, result) |
|
|
|
    # Remove the MoviePy temp directory; ignore the error if leftover temp files prevent removal.
    try:
        os.rmdir(moviepy_temp_dir)
    except OSError:
        pass
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|