|
import os |
|
import tempfile |
|
|
|
import numpy as np |
|
from PIL import Image, ImageOps |
|
from moviepy import VideoFileClip |
|
|
|
from eval_suite.prompts_raw import _image_eval |
|
from eval_suite.utils import extract_json, convert_score_fields, calculate_geometric_mean |
|
from mllm_tools.utils import _prepare_text_image_inputs |
|
from src.core.parse_video import image_with_most_non_black_space |
|
|
|
def extract_key_frames(video_path, output_dir, num_chunks):
    """Extract key frames from a video by dividing it into chunks and selecting representative frames.

    Args:
        video_path (str): Path to the input video file
        output_dir (str): Directory where extracted frames will be saved
        num_chunks (int): Number of chunks to divide the video into

    Returns:
        list: Paths to the extracted key frames. May contain fewer than
            num_chunks entries when the video yields fewer sampled frames
            or a chunk produces no usable image.
    """
    os.makedirs(output_dir, exist_ok=True)

    clip = VideoFileClip(video_path)
    try:
        # Sample at 1 fps so even long videos yield a manageable frame list.
        frames = list(clip.iter_frames(fps=1))
    finally:
        # Close the reader even if frame iteration fails, so the underlying
        # ffmpeg process / file handles are not leaked.
        clip.close()

    total_frames = len(frames)
    if total_frames == 0:
        print("No frames extracted from the video.")
        return []

    # Clamp num_chunks to the number of available frames. The previous code
    # computed frames_per_chunk = total_frames // num_chunks first, which is 0
    # when num_chunks > total_frames and caused a ZeroDivisionError.
    num_chunks = max(1, min(num_chunks, total_frames))
    frames_per_chunk = total_frames // num_chunks

    key_frames = []
    for i in range(num_chunks):
        start_idx = i * frames_per_chunk
        # Let the final chunk absorb the remainder so trailing frames are
        # not silently dropped when total_frames % num_chunks != 0.
        if i == num_chunks - 1:
            end_idx = total_frames
        else:
            end_idx = (i + 1) * frames_per_chunk
        chunk_frames = frames[start_idx:end_idx]

        if not chunk_frames:
            print(f"No frames in chunk {i+1}. Skipping.")
            continue

        output_path = os.path.join(output_dir, f"key_frame_{i+1}.jpg")
        # Pick the frame with the most non-black content as this chunk's
        # representative; the helper returns None when it cannot produce one.
        result = image_with_most_non_black_space(chunk_frames, output_path)
        if result is not None:
            key_frames.append(output_path)

    return key_frames
|
|
|
|
|
def evaluate_sampled_images(model, video_path, description="No description provided", num_chunks=10, output_folder=None):
    """Evaluate sampled frames from a video using an image evaluation model.

    Args:
        model: The image evaluation model to use (a callable that takes the
            prepared text+image inputs and returns a response string)
        video_path (str): Path to the input video file
        description (str, optional): Description of the video content. Defaults to "No description provided"
        num_chunks (int, optional): Number of chunks to divide the video into. Defaults to 10
        output_folder (str, optional): Directory for temporary files. Defaults to None

    Returns:
        dict: Dictionary containing evaluation scores and individual frame assessments with keys:
            - evaluation: Dictionary of averaged scores for each criterion
              (empty when no frames could be extracted)
            - image_chunks: List of individual frame evaluation results
    """
    with tempfile.TemporaryDirectory(dir=output_folder) as temp_dir:
        key_frames = extract_key_frames(video_path, temp_dir, num_chunks)

        prompt = _image_eval.format(description=description)

        # Evaluate each key frame independently; each response is parsed
        # into a JSON dict with an "evaluation" mapping of criterion -> score.
        responses = []
        for key_frame in key_frames:
            inputs = _prepare_text_image_inputs(prompt, key_frame)
            response = model(inputs)
            response_json = extract_json(response)
            response_json = convert_score_fields(response_json)
            responses.append(response_json)

        # Guard: the previous code indexed responses[0] unconditionally,
        # which raised IndexError when no key frames were extracted.
        if not responses:
            return {"evaluation": {}, "image_chunks": []}

        # Collect per-criterion score lists across all evaluated frames.
        criteria = list(responses[0]["evaluation"].keys())
        scores_dict = {c: [] for c in criteria}
        for response in responses:
            for key, val in response["evaluation"].items():
                scores_dict[key].append(val["score"])

        # Aggregate each criterion's scores with a geometric mean.
        res_score = {
            key: {"score": calculate_geometric_mean(scores)}
            for key, scores in scores_dict.items()
        }

        return {
            "evaluation": res_score,
            "image_chunks": responses
        }
|
|