# t2m/src/core/parse_video.py
import json
import os
import shutil

import numpy as np
import pysrt
import speech_recognition as sr
from moviepy import VideoFileClip
from PIL import Image, ImageOps
def get_images_from_video(video_path, fps=0.2):
"""Extract frames from a video file at specified FPS.
Args:
video_path (str): Path to the video file.
fps (float, optional): Frames per second to extract. Defaults to 0.2.
Returns:
list: List of frames as numpy arrays.
"""
    clip = VideoFileClip(video_path)
    # Materialize the generator so the frames remain usable after the clip is closed
    images = list(clip.iter_frames(fps=fps))
    clip.close()
    return images
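
# Usage sketch (hypothetical path): each extracted frame is an H x W x 3 uint8 RGB array.
#   frames = get_images_from_video("output/my_topic/my_topic_combined.mp4", fps=0.2)
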
def image_with_most_non_black_space(images, output_path, return_type="path"):
"""Find and save the image with the most non-black space from a list of images.
Args:
images (list): List of image file paths, PIL Image objects, or numpy arrays.
output_path (str): Path where the output image should be saved.
return_type (str, optional): Type of return value - "path" or "image". Defaults to "path".
Returns:
Union[str, PIL.Image, None]: Path to saved image, PIL Image object, or None if no valid image found.
"""
max_non_black_area = 0
image_with_max_non_black_space = None
for img in images:
try:
# If img is a path, open the image
if isinstance(img, str):
image = Image.open(img)
elif isinstance(img, Image.Image):
image = img
elif isinstance(img, np.ndarray):
image = Image.fromarray(img)
else:
print(f"Unsupported type: {type(img)}. Skipping.")
continue
# Convert to grayscale
gray = ImageOps.grayscale(image)
# Convert to numpy array
gray_array = np.array(gray)
# Count non-black pixels (threshold to consider near-black as black)
non_black_pixels = np.sum(gray_array > 10) # Threshold 10 to account for slight variations in black
if non_black_pixels > max_non_black_area:
max_non_black_area = non_black_pixels
image_with_max_non_black_space = image
except Exception as e:
print(f"Warning: Unable to process image {img}: {e}")
    if image_with_max_non_black_space is not None:
        image_with_max_non_black_space.save(output_path)
        print(f"Saved image with most non-black space to {output_path}")
        if return_type == "path":
            return output_path
        return image_with_max_non_black_space

    print("Warning: no valid image found; nothing was saved.")
    return None
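
# Usage sketch (hypothetical paths): pick the most content-rich sampled frame
# from a rendered video and save it as a thumbnail.
#   frames = get_images_from_video("output/my_topic/my_topic_combined.mp4", fps=0.2)
#   thumbnail = image_with_most_non_black_space(frames, "output/my_topic/thumbnail.jpg")
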
def parse_srt_to_text(output_dir, topic_name):
"""Convert SRT subtitle file to plain text.
Args:
output_dir (str): Directory containing the topic folders.
topic_name (str): Name of the topic/video.
"""
topic_name = topic_name.replace(" ", "_").lower()
srt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.srt")
txt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.txt")
subs = pysrt.open(srt_path)
    with open(txt_path, 'w', encoding='utf-8') as f:
full_text = ""
for sub in subs:
sub.text = sub.text.replace("...", ".")
full_text += sub.text + " "
f.write(full_text.strip())
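
# Expected layout, derived from the paths built above, for a topic named "My Topic":
#   output/my_topic/my_topic_combined.srt  -> input subtitles
#   output/my_topic/my_topic_combined.txt  -> plain-text transcript written here
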
def parse_srt_and_extract_frames(output_dir, topic_name):
"""Extract frames from video at subtitle timestamps and save with corresponding text.
Args:
output_dir (str): Directory containing the topic folders.
topic_name (str): Name of the topic/video.
"""
topic_name = topic_name.replace(" ", "_").lower()
video_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.mp4")
srt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.srt")
subs = pysrt.open(srt_path)
    # Recreate the extract_images folder from scratch (remove any previous run's output)
images_dir = os.path.join(output_dir, topic_name, "extract_images")
if os.path.exists(images_dir):
shutil.rmtree(images_dir)
os.makedirs(images_dir, exist_ok=True)
# Load the video file
video = VideoFileClip(video_path)
# Dictionary to store image-text pairs
pairs = {}
i = 0
while i < len(subs):
sub = subs[i]
text = sub.text
sub_indexes = [sub.index]
# Check if we need to concatenate with next subtitle
while i < len(subs) - 1 and not text.strip().endswith('.'):
i += 1
next_sub = subs[i]
text += " " + next_sub.text
sub_indexes.append(next_sub.index)
        # Get the end time of the last concatenated subtitle (subs[i] after the inner loop above)
        end_time = subs[i].end.to_time()
        # Convert the end time to seconds
        end_time_seconds = (end_time.hour * 3600 + end_time.minute * 60
                            + end_time.second + end_time.microsecond / 1e6)
# Save the frame as an image in extract_images folder
frame_path = os.path.join(images_dir, f"{sub.index}.jpg")
video.save_frame(frame_path, t=end_time_seconds)
# Save the subtitle text to a txt file
text_path = os.path.join(images_dir, f"{sub.index}.txt")
        with open(text_path, 'w', encoding='utf-8') as f:
f.write(text)
# Add pair to dictionary
pairs[str(sub.index)] = {
"image_path": f"{sub.index}.jpg",
"text": text,
"text_path": f"{sub.index}.txt",
"srt_index": sub_indexes,
}
i += 1
    # Save the image-text pairs to a JSON index (json is imported at the top of the module)
json_path = os.path.join(images_dir, "pairs.json")
with open(json_path, 'w') as f:
json.dump(pairs, f, indent=4)
# Close the video file
video.close()
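
# pairs.json is keyed by the first subtitle index of each concatenated group,
# for example (illustrative values):
#   {
#       "1": {
#           "image_path": "1.jpg",
#           "text": "A sentence assembled from one or more subtitles.",
#           "text_path": "1.txt",
#           "srt_index": [1, 2]
#       }
#   }
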
def extract_transcript(video_path):
"""Extract transcript from video audio using Google Speech Recognition.
Args:
video_path (str): Path to the video file.
Returns:
str: Transcribed text from the video audio.
Raises:
FileNotFoundError: If video file does not exist.
"""
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path}")
    clip = VideoFileClip(video_path)
    # Write the audio track to a temporary WAV file next to the video
    audio_path = os.path.join(os.path.dirname(video_path), "audio.wav")
    clip.audio.write_audiofile(audio_path)
    clip.close()
try:
# extract the subtitles from the audio file
recognizer = sr.Recognizer()
with sr.AudioFile(audio_path) as source:
audio = recognizer.record(source)
return recognizer.recognize_google(audio)
finally:
# clean up the temporary audio file
if os.path.exists(audio_path):
os.remove(audio_path)
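
# Backward-compatible alias in case other modules import the original (misspelled) name.
extract_trasnscript = extract_transcript

# Usage sketch (hypothetical path). recognize_google() calls Google's free Web Speech API,
# so it needs network access and is best suited to short clips:
#   transcript = extract_transcript("output/my_topic/my_topic_combined.mp4")
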
if __name__ == "__main__":
import argparse
def process_all_topics(output_folder):
"""Process all topic folders in the output directory.
Args:
output_folder (str): Directory containing the topic folders.
"""
# Only get immediate subdirectories
topics = [d for d in os.listdir(output_folder)
if os.path.isdir(os.path.join(output_folder, d))]
for topic in topics:
print(f"\nProcessing topic: {topic}")
try:
parse_srt_to_text(output_folder, topic)
parse_srt_and_extract_frames(output_folder, topic)
except Exception as e:
print(f"Error processing {topic}: {str(e)}")
continue
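    # Expected directory layout (derived from the path patterns used above):
    #   output/
    #       my_topic/
    #           my_topic_combined.mp4
    #           my_topic_combined.srt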
# Set up argument parser
parser = argparse.ArgumentParser(description='Process video files and extract frames with subtitles')
parser.add_argument('--output_dir', type=str, default="output",
help='Directory containing the topic folders')
args = parser.parse_args()
# Process topics using provided output directory
process_all_topics(args.output_dir)
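    # Example invocation (assuming the script is run from the t2m project root):
    #   python src/core/parse_video.py --output_dir output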