# File metadata: 7,932 bytes, revision 9b5ca29
import os
import pysrt
from moviepy import VideoFileClip
import shutil
from PIL import Image, ImageOps
import numpy as np
import speech_recognition as sr
def get_images_from_video(video_path, fps=0.2):
    """Extract frames from a video file at a given sampling rate.

    Args:
        video_path (str): Path to the video file.
        fps (float, optional): Frames per second to extract. Defaults to 0.2.

    Returns:
        list: Frames as numpy arrays.
    """
    clip = VideoFileClip(video_path)
    try:
        # Materialize the frames before closing the clip. The original code
        # returned the lazy iterator from iter_frames() while leaking the
        # clip handle, and its docstring already promised a list.
        return list(clip.iter_frames(fps=fps))
    finally:
        clip.close()
def image_with_most_non_black_space(images, output_path, return_type="path", threshold=10):
    """Find and save the image with the most non-black space from a list of images.

    Args:
        images (list): List of image file paths, PIL Image objects, or numpy arrays.
        output_path (str): Path where the output image should be saved.
        return_type (str, optional): Type of return value - "path" or "image". Defaults to "path".
        threshold (int, optional): Grayscale value at or below which a pixel counts
            as black. Defaults to 10 (accounts for slight variations in black).

    Returns:
        Union[str, PIL.Image, None]: Path to saved image, PIL Image object, or None if no valid image found.
    """
    max_non_black_area = 0
    best_image = None
    for img in images:
        try:
            # Normalize the input to a PIL Image.
            if isinstance(img, str):
                image = Image.open(img)
                # Force the pixel data to load now so PIL can release the
                # underlying file handle (the original leaked one fd per path).
                image.load()
            elif isinstance(img, Image.Image):
                image = img
            elif isinstance(img, np.ndarray):
                image = Image.fromarray(img)
            else:
                print(f"Unsupported type: {type(img)}. Skipping.")
                continue
            # Count non-black pixels on the grayscale version.
            gray_array = np.array(ImageOps.grayscale(image))
            non_black_pixels = int(np.sum(gray_array > threshold))
            if non_black_pixels > max_non_black_area:
                max_non_black_area = non_black_pixels
                best_image = image
        except Exception as e:
            print(f"Warning: Unable to process image {img}: {e}")
    if best_image is not None:
        best_image.save(output_path)
        print(f"Saved image with most non-black space to {output_path}")
        return output_path if return_type == "path" else best_image
    # No processable image found.
    return None
def parse_srt_to_text(output_dir, topic_name):
    """Convert an SRT subtitle file to a single plain-text file.

    Reads <topic>/<topic>_combined.srt and writes the concatenated subtitle
    text (with "..." collapsed to ".") to <topic>/<topic>_combined.txt.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video (spaces become underscores).
    """
    topic_name = topic_name.replace(" ", "_").lower()
    base = os.path.join(output_dir, topic_name, f"{topic_name}_combined")
    subs = pysrt.open(f"{base}.srt")
    # Join once instead of accumulating with += (quadratic growth).
    full_text = " ".join(sub.text.replace("...", ".") for sub in subs)
    # Explicit encoding so output does not depend on the platform default.
    with open(f"{base}.txt", 'w', encoding="utf-8") as f:
        f.write(full_text.strip())
def parse_srt_and_extract_frames(output_dir, topic_name):
    """Extract a video frame at the end of each sentence-merged subtitle and
    save frame/text pairs plus a pairs.json index.

    Consecutive subtitles are merged until the accumulated text ends with a
    period, so each extracted frame corresponds to a complete sentence. The
    frame and text files are named after the FIRST subtitle of each group.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video (spaces become underscores).
    """
    import json

    topic_name = topic_name.replace(" ", "_").lower()
    topic_dir = os.path.join(output_dir, topic_name)
    video_path = os.path.join(topic_dir, f"{topic_name}_combined.mp4")
    subs = pysrt.open(os.path.join(topic_dir, f"{topic_name}_combined.srt"))

    # Recreate the extract_images folder from scratch on every run.
    images_dir = os.path.join(topic_dir, "extract_images")
    if os.path.exists(images_dir):
        shutil.rmtree(images_dir)
    os.makedirs(images_dir, exist_ok=True)

    video = VideoFileClip(video_path)
    try:
        # Dictionary mapping first-subtitle index -> image/text pair info.
        pairs = {}
        i = 0
        while i < len(subs):
            sub = subs[i]
            text = sub.text
            sub_indexes = [sub.index]
            # Merge with following subtitles until the text ends a sentence.
            while i < len(subs) - 1 and not text.strip().endswith('.'):
                i += 1
                next_sub = subs[i]
                text += " " + next_sub.text
                sub_indexes.append(next_sub.index)
            # End time of the LAST merged subtitle. The original read
            # sub.end (the first subtitle of the group) despite its comment
            # saying "last" — subs[i] is the last one after the inner loop.
            end_time = subs[i].end.to_time()
            end_time_seconds = (end_time.hour * 3600 + end_time.minute * 60
                                + end_time.second + end_time.microsecond / 1e6)
            # Save the frame at that timestamp.
            frame_path = os.path.join(images_dir, f"{sub.index}.jpg")
            video.save_frame(frame_path, t=end_time_seconds)
            # Save the merged subtitle text alongside it.
            text_path = os.path.join(images_dir, f"{sub.index}.txt")
            with open(text_path, 'w') as f:
                f.write(text)
            pairs[str(sub.index)] = {
                "image_path": f"{sub.index}.jpg",
                "text": text,
                "text_path": f"{sub.index}.txt",
                "srt_index": sub_indexes,
            }
            i += 1
        # Persist the index of all pairs.
        json_path = os.path.join(images_dir, "pairs.json")
        with open(json_path, 'w') as f:
            json.dump(pairs, f, indent=4)
    finally:
        # Release the video handle even if frame extraction fails.
        video.close()
def extract_trasnscript(video_path):
    """Extract a transcript from a video's audio track using Google Speech
    Recognition.

    NOTE(review): the name contains a typo ("trasnscript") but is kept
    unchanged for backward compatibility with existing callers.

    Args:
        video_path (str): Path to the video file.

    Returns:
        str: Transcribed text from the video audio.

    Raises:
        FileNotFoundError: If video file does not exist.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")
    clip = VideoFileClip(video_path)
    # Write the audio track to a temporary WAV file for the recognizer.
    audio_path = os.path.join(os.path.dirname(video_path), "audio.wav")
    try:
        clip.audio.write_audiofile(audio_path)
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_path) as source:
            audio = recognizer.record(source)
        return recognizer.recognize_google(audio)
    finally:
        # Close the clip (the original leaked the handle) and clean up the
        # temporary audio file.
        clip.close()
        if os.path.exists(audio_path):
            os.remove(audio_path)
if __name__ == "__main__":
    import argparse

    def process_all_topics(output_folder):
        """Run subtitle-to-text conversion and frame extraction for every
        topic folder found directly under *output_folder*.

        Args:
            output_folder (str): Directory containing the topic folders.
        """
        for entry in os.listdir(output_folder):
            # Guard clause: only immediate subdirectories are topics.
            if not os.path.isdir(os.path.join(output_folder, entry)):
                continue
            print(f"\nProcessing topic: {entry}")
            try:
                parse_srt_to_text(output_folder, entry)
                parse_srt_and_extract_frames(output_folder, entry)
            except Exception as e:
                print(f"Error processing {entry}: {str(e)}")

    # Command-line interface.
    arg_parser = argparse.ArgumentParser(
        description='Process video files and extract frames with subtitles')
    arg_parser.add_argument('--output_dir', type=str, default="output",
                            help='Directory containing the topic folders')
    cli_args = arg_parser.parse_args()
    process_all_topics(cli_args.output_dir)