|
import json
import os
import shutil
import tempfile

import numpy as np
import pysrt
import speech_recognition as sr
from moviepy import VideoFileClip
from PIL import Image, ImageOps
|
|
|
def get_images_from_video(video_path, fps=0.2):
    """Extract frames from a video file at the specified sampling rate.

    The video is opened immediately. Frames are produced lazily while the
    returned generator is consumed, and the clip is closed as soon as the
    iteration finishes or the generator is closed early.

    Args:
        video_path (str): Path to the video file.
        fps (float, optional): Frames per second to extract. Defaults to 0.2
            (one frame every five seconds).

    Returns:
        Iterator[numpy.ndarray]: Generator yielding the sampled frames as
            numpy arrays. It can only be iterated once.
    """
    clip = VideoFileClip(video_path)

    def _frames_then_close():
        # Tie the clip's lifetime to the iteration so the underlying reader
        # is released instead of staying open for the rest of the process.
        try:
            yield from clip.iter_frames(fps=fps)
        finally:
            clip.close()

    return _frames_then_close()
|
|
|
def image_with_most_non_black_space(images, output_path, return_type="path",
                                    black_threshold=10):
    """Find and save the image with the most non-black pixels.

    Every input is converted to grayscale and the pixels brighter than
    ``black_threshold`` are counted. The image with the highest count is
    saved to ``output_path``. Inputs that cannot be processed are reported
    with a warning and skipped. An image without any pixel above the
    threshold is never selected.

    Args:
        images (Iterable): Image file paths, PIL Image objects, or numpy
            arrays. Any iterable works, including a generator of frames.
        output_path (str): Path where the output image should be saved.
        return_type (str, optional): Type of return value - "path" or
            "image". Any value other than "path" returns the image.
            Defaults to "path".
        black_threshold (int, optional): Grayscale value a pixel must exceed
            to count as non-black. Defaults to 10.

    Returns:
        Union[str, PIL.Image, None]: Path to saved image, PIL Image object,
            or None if no valid image found (nothing is saved in that case).
    """
    best_area = 0
    best_image = None

    for position, img in enumerate(images):
        # Short description for log messages: interpolating a raw numpy
        # array into the warning would dump pixel data to the console.
        label = img if isinstance(img, str) else f"#{position} ({type(img).__name__})"

        try:
            if isinstance(img, str):
                image = Image.open(img)
            elif isinstance(img, Image.Image):
                image = img
            elif isinstance(img, np.ndarray):
                image = Image.fromarray(img)
            else:
                print(f"Unsupported type: {type(img)}. Skipping.")
                continue

            gray_array = np.asarray(ImageOps.grayscale(image))
            non_black_pixels = int(np.count_nonzero(gray_array > black_threshold))
        except Exception as e:
            # Deliberately best-effort: one unreadable input must not abort
            # the scan of the remaining images.
            print(f"Warning: Unable to process image {label}: {e}")
            continue

        # Strict comparison keeps the earliest image on ties.
        if non_black_pixels > best_area:
            best_area = non_black_pixels
            best_image = image

    if best_image is None:
        return None

    best_image.save(output_path)
    print(f"Saved image with most non-black space to {output_path}")

    return output_path if return_type == "path" else best_image
|
|
|
def parse_srt_to_text(output_dir, topic_name):
    """Convert a topic's combined SRT subtitle file to a plain-text transcript.

    Reads ``<output_dir>/<topic>/<topic>_combined.srt``, replaces every
    ``...`` in the subtitle text with a single ``.``, joins the subtitles
    with single spaces and writes the result to
    ``<output_dir>/<topic>/<topic>_combined.txt``.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video. Spaces are replaced with
            underscores and the name is lower-cased to build the file paths.
    """
    topic_name = topic_name.replace(" ", "_").lower()
    topic_dir = os.path.join(output_dir, topic_name)
    srt_path = os.path.join(topic_dir, f"{topic_name}_combined.srt")
    txt_path = os.path.join(topic_dir, f"{topic_name}_combined.txt")
    subs = pysrt.open(srt_path)

    full_text = " ".join(sub.text.replace("...", ".") for sub in subs).strip()

    # Explicit UTF-8: subtitles often contain non-ASCII characters, and the
    # platform default encoding may be unable to represent them.
    with open(txt_path, 'w', encoding="utf-8") as f:
        f.write(full_text)
|
|
|
def parse_srt_and_extract_frames(output_dir, topic_name):
    """Extract one video frame per subtitle sentence and save it with its text.

    Consecutive subtitles are merged until the accumulated text ends with a
    period or the subtitles run out. For every merged sentence a frame is
    grabbed at the end time of the last subtitle in the group and saved as
    ``<index>.jpg`` next to ``<index>.txt``, where ``<index>`` is the SRT
    index of the first subtitle in the group. A ``pairs.json`` manifest
    describing all image/text pairs is written to the same directory.

    Any existing ``extract_images`` directory of the topic is deleted first.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video. Spaces are replaced with
            underscores and the name is lower-cased to locate
            ``<topic>/<topic>_combined.mp4`` and ``<topic>_combined.srt``.
    """
    topic_name = topic_name.replace(" ", "_").lower()
    topic_dir = os.path.join(output_dir, topic_name)
    video_path = os.path.join(topic_dir, f"{topic_name}_combined.mp4")
    srt_path = os.path.join(topic_dir, f"{topic_name}_combined.srt")
    subs = pysrt.open(srt_path)

    # Start from an empty directory so stale frames of a previous run cannot
    # mix with the new output.
    images_dir = os.path.join(topic_dir, "extract_images")
    if os.path.exists(images_dir):
        shutil.rmtree(images_dir)
    os.makedirs(images_dir, exist_ok=True)

    pairs = {}
    video = VideoFileClip(video_path)
    try:
        i = 0
        while i < len(subs):
            first_sub = subs[i]
            text = first_sub.text
            sub_indexes = [first_sub.index]

            # Keep appending following subtitles until the sentence is
            # complete (ends with a period) or there is nothing left.
            while i < len(subs) - 1 and not text.strip().endswith('.'):
                i += 1
                text += " " + subs[i].text
                sub_indexes.append(subs[i].index)

            # Use the end of the LAST merged subtitle: the frame should show
            # the video once the whole sentence has been shown, not only its
            # first fragment.
            end_time = subs[i].end.to_time()
            end_time_seconds = (
                end_time.hour * 3600
                + end_time.minute * 60
                + end_time.second
                + end_time.microsecond / 1e6
            )

            frame_name = f"{first_sub.index}.jpg"
            text_name = f"{first_sub.index}.txt"

            video.save_frame(os.path.join(images_dir, frame_name), t=end_time_seconds)

            with open(os.path.join(images_dir, text_name), 'w', encoding="utf-8") as f:
                f.write(text)

            pairs[str(first_sub.index)] = {
                "image_path": frame_name,
                "text": text,
                "text_path": text_name,
                "srt_index": sub_indexes,
            }

            i += 1
    finally:
        # Release the video reader even when a frame or text write fails.
        video.close()

    json_path = os.path.join(images_dir, "pairs.json")
    with open(json_path, 'w', encoding="utf-8") as f:
        json.dump(pairs, f, indent=4)
|
|
|
def extract_trasnscript(video_path):
    """Extract transcript from video audio using Google Speech Recognition.

    The audio track is exported to a WAV file inside a private temporary
    directory, loaded into memory and sent to the recognizer. The temporary
    file and the video clip are released before the recognition request.

    Args:
        video_path (str): Path to the video file.

    Returns:
        str: Transcribed text from the video audio.

    Raises:
        FileNotFoundError: If video file does not exist.
        ValueError: If the video has no audio track.

    Errors raised by ``recognize_google`` propagate unchanged.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    clip = VideoFileClip(video_path)
    try:
        if clip.audio is None:
            raise ValueError(f"Video file has no audio track: {video_path}")

        # A private temporary directory avoids overwriting (and afterwards
        # deleting) an unrelated "audio.wav" that may sit next to the video,
        # and also works when the video's directory is read-only.
        with tempfile.TemporaryDirectory() as tmp_dir:
            audio_path = os.path.join(tmp_dir, "audio.wav")
            clip.audio.write_audiofile(audio_path)

            recognizer = sr.Recognizer()
            with sr.AudioFile(audio_path) as source:
                audio = recognizer.record(source)
    finally:
        clip.close()

    return recognizer.recognize_google(audio)


# Correctly spelled alias; the original (misspelled) name is kept for
# backward compatibility with existing callers.
extract_transcript = extract_trasnscript
|
|
|
if __name__ == "__main__":
    import argparse

    def process_all_topics(output_folder):
        """Run both SRT post-processing steps for every topic folder.

        Every immediate subdirectory of ``output_folder`` is treated as a
        topic. A failure while processing one topic is printed and does not
        stop the remaining topics.

        Args:
            output_folder (str): Directory containing the topic folders.
        """
        def _is_topic_dir(entry_name):
            return os.path.isdir(os.path.join(output_folder, entry_name))

        # Resolve the full topic list up front, before any processing starts.
        topic_names = list(filter(_is_topic_dir, os.listdir(output_folder)))

        for topic_name in topic_names:
            print(f"\nProcessing topic: {topic_name}")
            try:
                parse_srt_to_text(output_folder, topic_name)
                parse_srt_and_extract_frames(output_folder, topic_name)
            except Exception as err:
                print(f"Error processing {topic_name}: {err}")

    cli = argparse.ArgumentParser(
        description='Process video files and extract frames with subtitles'
    )
    cli.add_argument(
        '--output_dir',
        type=str,
        default="output",
        help='Directory containing the topic folders',
    )

    process_all_topics(cli.parse_args().output_dir)