# File size: 6,292 Bytes | 8fb7841
import os
import cv2
import tempfile
from dotenv import load_dotenv
from mllm_tools.utils import _prepare_text_video_inputs
from eval_suite.prompts_raw import _video_eval_new
from eval_suite.utils import extract_json, convert_score_fields
# Runs at import time: python-dotenv's load_dotenv() reads a .env file (if one is found)
# into the process environment. NOTE(review): nothing in this module reads those
# variables directly; presumably the imported model/tooling code does - confirm.
load_dotenv()
def _open_video_writer(output_path, fps, frame_size):
    """
    Open a cv2.VideoWriter for output_path, trying several codecs in order of preference.

    Args:
        output_path (str): Desired output path. A '.mp4' suffix is replaced by '.avi'
            once the loop falls back to an AVI-only codec.
        fps (float): Frame rate the writer is created with
        frame_size (tuple): (width, height) of the frames that will be written

    Returns:
        tuple: (writer, output_path) where output_path is the path actually used

    Raises:
        RuntimeError: If no codec could initialize a writer
    """
    codecs = [
        ('avc1', '.mp4'),  # H.264 codec
        ('mp4v', '.mp4'),  # MP4V codec
        ('XVID', '.avi'),  # XVID codec
        ('MJPG', '.avi'),  # Motion JPEG codec
    ]
    for codec, ext in codecs:
        if output_path.endswith('.mp4') and not ext.endswith('.mp4'):
            # If we're switching to AVI format, change the extension
            output_path = output_path[:-4] + ext
        fourcc = cv2.VideoWriter_fourcc(*codec)
        writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
        if writer.isOpened():
            print(f"Successfully initialized video writer with codec: {codec}")
            return writer, output_path
        # This codec is unusable: drop the writer and any partial file before retrying
        writer.release()
        if os.path.exists(output_path):
            os.remove(output_path)
    raise RuntimeError("Could not initialize video writer with any available codec")


def reduce_video_framerate(input_path, target_fps=1, output_path=None):
    """
    Reduces the frame rate of a video by only keeping frames at the target interval.

    Every N-th frame of the input is written to the output, where
    N = int(original_fps / target_fps), clamped to at least 1 so that a target
    at or above the source frame rate keeps every frame instead of failing.

    Args:
        input_path (str): Path to the input video
        target_fps (int): Target frames per second (default: 1). Must be positive.
        output_path (str, optional): Path to save the processed video. If None, uses a temporary file.
            The returned path may end in '.avi' instead of '.mp4' if only an AVI codec is available.

    Returns:
        str: Path to the processed video

    Raises:
        ValueError: If target_fps is not positive, or the input video cannot be opened or has invalid FPS
        RuntimeError: If video writer initialization fails or output video creation fails
    """
    # Validate before touching any resource; a zero/negative target would otherwise
    # surface later as a ZeroDivisionError.
    if target_fps <= 0:
        raise ValueError(f"target_fps must be positive, got {target_fps}")

    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open input video: {input_path}")
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        if original_fps <= 0:
            raise ValueError(f"Invalid FPS ({original_fps}) detected in input video")

        # Clamp to 1: when target_fps > original_fps the integer ratio is 0, which
        # would make the modulo below divide by zero.
        frame_interval = max(1, int(original_fps / target_fps))
        # Never tag the output with a higher rate than the source actually has,
        # otherwise playback would be sped up when every frame is kept.
        output_fps = min(target_fps, original_fps)

        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Use provided output path or create temporary file
        if output_path is None:
            # Only the name is needed; close the handle right away so it is not leaked
            # and the writer can reopen the path.
            with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_output:
                output_path = temp_output.name

        # Ensure output directory exists (a bare filename has no directory part,
        # and os.makedirs('') would raise)
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        out, output_path = _open_video_writer(output_path, output_fps, (width, height))

        frame_count = 0
        frames_written = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Only write frames at the specified interval
            if frame_count % frame_interval == 0:
                out.write(frame)
                frames_written += 1
            frame_count += 1
    finally:
        # Release capture and writer on every path, including the error ones
        cap.release()
        if out is not None:
            out.release()

    # Verify the output
    verify_cap = cv2.VideoCapture(output_path)
    if not verify_cap.isOpened():
        raise RuntimeError(f"Failed to create output video at {output_path}")
    actual_fps = verify_cap.get(cv2.CAP_PROP_FPS)
    total_frames = verify_cap.get(cv2.CAP_PROP_FRAME_COUNT)
    verify_cap.release()

    if actual_fps <= 0:
        print("Warning: Output video reports invalid FPS. This might be a codec issue.")
        actual_fps = output_fps  # Use the rate the writer was created with for duration calculation

    print(f"Created video with {frames_written} frames at {actual_fps} FPS")
    print(f"Total duration: {total_frames/actual_fps:.2f} seconds")
    print(f"Video saved to: {output_path}")
    return output_path
def evaluate_video_chunk_new(model, video_path, transcript="No transcript provided", description="No description provided",
                             save_processed_video=None, target_fps=None, retry_limit=5):
    """
    Evaluate a single video chunk using a multimodal model.

    The video is optionally re-encoded at a lower frame rate, combined with the
    evaluation prompt, and sent to the model. The model call and response parsing
    are retried up to retry_limit times.

    Args:
        model: The multimodal model to use for evaluation; called as model(inputs)
        video_path (str): Path to the video file to evaluate
        transcript (str, optional): Video transcript text. Defaults to "No transcript provided".
            Currently not used by this function: only description is substituted into the prompt.
        description (str, optional): Video description text. Defaults to "No description provided"
        save_processed_video (str, optional): Path to save processed video. If None, uses a temporary file
            that is deleted before returning
        target_fps (int, optional): Target frames per second for video processing. If None, no processing
        retry_limit (int, optional): Maximum number of attempts. Must be at least 1. Defaults to 5

    Returns:
        dict: The JSON extracted from the model response after passing through convert_score_fields

    Raises:
        ValueError: If retry_limit is less than 1
        FileNotFoundError: If video file does not exist
        Exception: If evaluation fails after all retry attempts (the last error is re-raised)
    """
    # With no attempts allowed the retry loop would never run and the function
    # would silently return None; reject that up front.
    if retry_limit < 1:
        raise ValueError(f"retry_limit must be at least 1, got {retry_limit}")
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Only process video if target_fps is specified
    processed_video_path = None
    if target_fps is not None:
        processed_video_path = reduce_video_framerate(video_path, target_fps=target_fps, output_path=save_processed_video)
        video_to_use = processed_video_path
    else:
        video_to_use = video_path

    try:
        # Prompt/input preparation lives inside the try so that the temporary
        # video is still cleaned up if either of them raises.
        prompt = _video_eval_new.format(description=description)
        inputs = _prepare_text_video_inputs(prompt, video_to_use)
        for attempt in range(retry_limit):
            try:
                response = model(inputs)
                response_json = extract_json(response)
                response_json = convert_score_fields(response_json)
                return response_json
            except Exception as e:
                # Retry boundary: any failure in the model call or in parsing its
                # output triggers another attempt; the last one is re-raised.
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt + 1 == retry_limit:
                    print("Reached maximum retry limit. Evaluation failed.")
                    raise
    finally:
        # Clean up the temporary processed video if we created one
        created_temp_video = processed_video_path is not None and save_processed_video is None
        if created_temp_video and os.path.exists(processed_video_path):
            os.unlink(processed_video_path)