|
import os |
|
import cv2 |
|
import tempfile |
|
|
|
from dotenv import load_dotenv |
|
|
|
from mllm_tools.utils import _prepare_text_video_inputs |
|
from eval_suite.prompts_raw import _video_eval_new |
|
from eval_suite.utils import extract_json, convert_score_fields |
|
|
|
load_dotenv() |
|
|
|
|
|
def reduce_video_framerate(input_path, target_fps=1, output_path=None):
    """
    Reduce the frame rate of a video by keeping only every N-th frame.

    N is ``int(original_fps / target_fps)``, clamped to at least 1 so that a
    target above the source frame rate keeps every frame instead of dividing
    by zero. The output is written at ``min(target_fps, original_fps)`` so the
    playback duration is not compressed when the source is already slower
    than the target.

    Args:
        input_path (str): Path to the input video
        target_fps (int): Target frames per second (default: 1). Must be > 0.
        output_path (str, optional): Path to save the processed video. If None,
            a temporary ``.mp4`` file is created. If no MP4 codec can be
            initialised, a ``.mp4`` suffix is replaced with ``.avi``, so always
            use the returned path rather than the one passed in.

    Returns:
        str: Path to the processed video

    Raises:
        ValueError: If target_fps is not positive, the input video cannot be
            opened, or the input video reports an invalid FPS
        RuntimeError: If video writer initialization fails or output video creation fails
    """
    # Validate before touching any resource so bad arguments fail fast.
    if target_fps <= 0:
        raise ValueError(f"target_fps must be a positive number, got {target_fps}")

    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open input video: {input_path}")

        original_fps = cap.get(cv2.CAP_PROP_FPS)
        if original_fps <= 0:
            raise ValueError(f"Invalid FPS ({original_fps}) detected in input video")

        # Keep one frame out of every `frame_interval`; never let it reach 0,
        # which would make the modulo below raise ZeroDivisionError.
        frame_interval = max(1, int(original_fps / target_fps))
        # When the source is slower than the target every frame is kept, so
        # write at the source rate to preserve the original duration.
        output_fps = min(target_fps, original_fps)

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        if output_path is None:
            # mkstemp returns an open descriptor; close it immediately so the
            # handle is not leaked and the writer can (re)open the path.
            fd, output_path = tempfile.mkstemp(suffix='.mp4')
            os.close(fd)
        else:
            # dirname is '' for a bare filename, and os.makedirs('') raises.
            output_dir = os.path.dirname(output_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

        # Tried in order; the first codec whose writer opens wins.
        codecs = [
            ('avc1', '.mp4'),
            ('mp4v', '.mp4'),
            ('XVID', '.avi'),
            ('MJPG', '.avi'),
        ]

        for codec, ext in codecs:
            # Falling back to an AVI codec: swap the container extension too.
            if output_path.endswith('.mp4') and not ext.endswith('.mp4'):
                output_path = output_path[:-4] + ext

            fourcc = cv2.VideoWriter_fourcc(*codec)
            writer = cv2.VideoWriter(output_path, fourcc, output_fps, (width, height))

            if writer.isOpened():
                out = writer
                print(f"Successfully initialized video writer with codec: {codec}")
                break

            # Discard the failed writer and any partial file it left behind.
            writer.release()
            if os.path.exists(output_path):
                os.remove(output_path)

        if out is None:
            raise RuntimeError("Could not initialize video writer with any available codec")

        frame_count = 0
        frames_written = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                out.write(frame)
                frames_written += 1
            frame_count += 1
    finally:
        # Always release native handles, including on the error paths above.
        cap.release()
        if out is not None:
            out.release()

    # Re-open the result to confirm a readable video was actually produced.
    verify_cap = cv2.VideoCapture(output_path)
    if not verify_cap.isOpened():
        raise RuntimeError(f"Failed to create output video at {output_path}")

    actual_fps = verify_cap.get(cv2.CAP_PROP_FPS)
    total_frames = verify_cap.get(cv2.CAP_PROP_FRAME_COUNT)
    verify_cap.release()

    if actual_fps <= 0:
        print("Warning: Output video reports invalid FPS. This might be a codec issue.")
        actual_fps = target_fps

    print(f"Created video with {frames_written} frames at {actual_fps} FPS")
    print(f"Total duration: {total_frames/actual_fps:.2f} seconds")
    print(f"Video saved to: {output_path}")

    return output_path
|
|
|
|
|
def evaluate_video_chunk_new(model, video_path, transcript="No transcript provided", description="No description provided",
                             save_processed_video=None, target_fps=None, retry_limit=5):
    """
    Evaluate a single video chunk using a multimodal model.

    Args:
        model: The multimodal model to use for evaluation; called as ``model(inputs)``
        video_path (str): Path to the video file to evaluate
        transcript (str, optional): Video transcript text. Defaults to "No transcript provided".
            NOTE: currently accepted for interface compatibility only; it is not
            inserted into the prompt (only ``description`` is).
        description (str, optional): Video description text. Defaults to "No description provided"
        save_processed_video (str, optional): Path to save processed video. If None, a temporary
            file is used and deleted before returning
        target_fps (int, optional): Target frames per second for video processing. If None, no processing
        retry_limit (int, optional): Maximum number of attempts. Must be >= 1. Defaults to 5

    Returns:
        dict: Evaluation results as a JSON object with scores converted to integers

    Raises:
        ValueError: If retry_limit is less than 1
        FileNotFoundError: If video file does not exist
        Exception: If evaluation fails after all retry attempts (the last error is re-raised)
    """
    # With zero attempts the loop would never run and the function would
    # silently return None after doing all the preprocessing work.
    if retry_limit < 1:
        raise ValueError(f"retry_limit must be at least 1, got {retry_limit}")

    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Stays None when no frame-rate reduction is requested, so the cleanup
    # in `finally` never touches an unbound name.
    processed_video_path = None
    if target_fps is not None:
        processed_video_path = reduce_video_framerate(video_path, target_fps=target_fps, output_path=save_processed_video)
        video_to_use = processed_video_path
    else:
        video_to_use = video_path

    try:
        # Prepared inside the try so a failure here still cleans up the
        # temporary processed video.
        prompt = _video_eval_new.format(description=description)
        inputs = _prepare_text_video_inputs(prompt, video_to_use)

        for attempt in range(retry_limit):
            try:
                response = model(inputs)
                response_json = extract_json(response)
                return convert_score_fields(response_json)
            except Exception as e:
                # Broad catch is deliberate: any model/parsing failure is retried,
                # and the final failure is re-raised to the caller.
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt + 1 == retry_limit:
                    print("Reached maximum retry limit. Evaluation failed.")
                    raise
    finally:
        # Only delete the processed video when it was a temp file we created,
        # i.e. the caller did not ask for it to be saved.
        if processed_video_path is not None and save_processed_video is None and os.path.exists(processed_video_path):
            os.unlink(processed_video_path)