|
import json |
|
import os |
|
import cv2 |
|
|
|
from transformers import BlipProcessor, BlipForConditionalGeneration |
|
|
|
|
|
# BLIP image-captioning model used to describe sampled video frames.
# NOTE: loading happens at import time, so importing this module downloads
# (on first use) and instantiates the full model — this is slow and
# memory-heavy; keep that in mind before importing from lightweight code.
model_id = "Salesforce/blip-image-captioning-large"

captioning_processor = BlipProcessor.from_pretrained(model_id)

captioning_model = BlipForConditionalGeneration.from_pretrained(model_id)
|
|
|
|
|
def extract_frames(video_path, output_folder, interval_ms=2000) -> None:
    """
    Extract frames from a video into an output folder at a specified time
    interval. Frames are saved as *.jpg images named ``frame_NNNN.jpg``.

    Args:
        video_path: The file name of the video to sample.
        output_folder: The output directory for the extracted frames.
            Created (including parents) if it does not exist.
        interval_ms: The sampling interval in milliseconds.
        NOTE: No anti-aliasing filter is applied.
    """
    # exist_ok avoids the check-then-create race of exists()+makedirs().
    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Clamp to at least 1: fps can be 0 (corrupt/unreadable metadata),
        # and interval_ms shorter than one frame period truncates to 0 —
        # either would make the modulus below raise ZeroDivisionError.
        interval_frames = max(1, int(fps * interval_ms * 0.001))

        frame_count = 0
        saved_frame_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % interval_frames == 0:
                frame_filename = os.path.join(
                    output_folder,
                    f"frame_{saved_frame_count:04d}.jpg"
                )
                cv2.imwrite(frame_filename, frame)
                saved_frame_count += 1

            frame_count += 1
    finally:
        # Release the capture even if imwrite/read raises mid-loop.
        cap.release()
|
|
|
|
|
def extract_frame_captions(
    video_path,
    interval_ms=2000
) -> str:
    """
    Extract BLIP-generated captions for frames sampled from a video at a
    specified time interval.

    Args:
        video_path: The file name of the video to sample.
        interval_ms: The sampling interval in milliseconds.
        NOTE: No anti-aliasing filter is applied.

    Returns:
        The frame descriptions as a JSON-encoded list of strings
        (one caption per sampled frame).
    """
    cap = cv2.VideoCapture(video_path)
    captions = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Clamp to at least 1: fps == 0 (bad metadata) or a sub-frame
        # interval_ms truncates to 0 and would raise ZeroDivisionError
        # in the modulus below.
        interval_frames = max(1, int(fps * interval_ms * 0.001))

        frame_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % interval_frames == 0:
                # OpenCV decodes frames as BGR; the BLIP processor
                # expects RGB input.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                inputs = captioning_processor(
                    rgb_frame,
                    return_tensors="pt"
                )
                out = captioning_model.generate(**inputs)
                captions.append(
                    captioning_processor.decode(out[0], skip_special_tokens=True)
                )

            frame_count += 1
    finally:
        # Release the capture even if caption generation raises mid-loop.
        cap.release()

    return json.dumps(captions)
|
|