import os
import json
import shutil

import numpy as np
import pysrt
import speech_recognition as sr
from moviepy import VideoFileClip
from PIL import Image, ImageOps


def get_images_from_video(video_path, fps=0.2):
    """Extract frames from a video file at the specified FPS.

    Args:
        video_path (str): Path to the video file.
        fps (float, optional): Frames per second to extract. Defaults to 0.2.

    Returns:
        list: List of frames as numpy arrays.
    """
    clip = VideoFileClip(video_path)
    images = list(clip.iter_frames(fps=fps))
    clip.close()
    return images


def image_with_most_non_black_space(images, output_path, return_type="path"):
    """Find and save the image with the most non-black space from a list of images.

    Args:
        images (list): List of image file paths, PIL Image objects, or numpy arrays.
        output_path (str): Path where the output image should be saved.
        return_type (str, optional): Type of return value - "path" or "image".
            Defaults to "path".

    Returns:
        Union[str, PIL.Image, None]: Path to the saved image, the PIL Image object,
            or None if no valid image was found.
    """
    max_non_black_area = 0
    image_with_max_non_black_space = None

    for img in images:
        try:
            # Normalize the input to a PIL Image
            if isinstance(img, str):
                image = Image.open(img)
            elif isinstance(img, Image.Image):
                image = img
            elif isinstance(img, np.ndarray):
                image = Image.fromarray(img)
            else:
                print(f"Unsupported type: {type(img)}. Skipping.")
                continue

            # Convert to grayscale, then to a numpy array
            gray = ImageOps.grayscale(image)
            gray_array = np.array(gray)

            # Count non-black pixels; a threshold of 10 treats near-black pixels as black
            non_black_pixels = np.sum(gray_array > 10)

            if non_black_pixels > max_non_black_area:
                max_non_black_area = non_black_pixels
                image_with_max_non_black_space = image
        except Exception as e:
            print(f"Warning: Unable to process image {img}: {e}")

    if image_with_max_non_black_space is not None:
        image_with_max_non_black_space.save(output_path)
        print(f"Saved image with most non-black space to {output_path}")
        if return_type == "path":
            return output_path
        return image_with_max_non_black_space

    return image_with_max_non_black_space


def parse_srt_to_text(output_dir, topic_name):
    """Convert an SRT subtitle file to plain text.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video.
    """
    topic_name = topic_name.replace(" ", "_").lower()
    srt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.srt")
    txt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.txt")

    subs = pysrt.open(srt_path)
    with open(txt_path, "w") as f:
        full_text = ""
        for sub in subs:
            sub.text = sub.text.replace("...", ".")
            full_text += sub.text + " "
        f.write(full_text.strip())


def parse_srt_and_extract_frames(output_dir, topic_name):
    """Extract frames from a video at subtitle timestamps and save them with the corresponding text.

    Args:
        output_dir (str): Directory containing the topic folders.
        topic_name (str): Name of the topic/video.
""" topic_name = topic_name.replace(" ", "_").lower() video_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.mp4") srt_path = os.path.join(output_dir, topic_name, f"{topic_name}_combined.srt") subs = pysrt.open(srt_path) # Create extract_images folder if it doesn't exist images_dir = os.path.join(output_dir, topic_name, "extract_images") if os.path.exists(images_dir): shutil.rmtree(images_dir) os.makedirs(images_dir, exist_ok=True) # Load the video file video = VideoFileClip(video_path) # Dictionary to store image-text pairs pairs = {} i = 0 while i < len(subs): sub = subs[i] text = sub.text sub_indexes = [sub.index] # Check if we need to concatenate with next subtitle while i < len(subs) - 1 and not text.strip().endswith('.'): i += 1 next_sub = subs[i] text += " " + next_sub.text sub_indexes.append(next_sub.index) # Get the end time of the last concatenated subtitle end_time = sub.end.to_time() # Convert end time to seconds end_time_seconds = end_time.hour * 3600 + end_time.minute * 60 + end_time.second + end_time.microsecond / 1e6 # Save the frame as an image in extract_images folder frame_path = os.path.join(images_dir, f"{sub.index}.jpg") video.save_frame(frame_path, t=end_time_seconds) # Save the subtitle text to a txt file text_path = os.path.join(images_dir, f"{sub.index}.txt") with open(text_path, 'w') as f: f.write(text) # Add pair to dictionary pairs[str(sub.index)] = { "image_path": f"{sub.index}.jpg", "text": text, "text_path": f"{sub.index}.txt", "srt_index": sub_indexes, } i += 1 # Save pairs to json file import json json_path = os.path.join(images_dir, "pairs.json") with open(json_path, 'w') as f: json.dump(pairs, f, indent=4) # Close the video file video.close() def extract_trasnscript(video_path): """Extract transcript from video audio using Google Speech Recognition. Args: video_path (str): Path to the video file. Returns: str: Transcribed text from the video audio. Raises: FileNotFoundError: If video file does not exist. """ if not os.path.exists(video_path): raise FileNotFoundError(f"Video file not found: {video_path}") clip = VideoFileClip(video_path) # write the video to a temporary audio file audio_path = os.path.join(os.path.dirname(video_path), "audio.wav") clip.audio.write_audiofile(audio_path) try: # extract the subtitles from the audio file recognizer = sr.Recognizer() with sr.AudioFile(audio_path) as source: audio = recognizer.record(source) return recognizer.recognize_google(audio) finally: # clean up the temporary audio file if os.path.exists(audio_path): os.remove(audio_path) if __name__ == "__main__": import argparse def process_all_topics(output_folder): """Process all topic folders in the output directory. Args: output_folder (str): Directory containing the topic folders. """ # Only get immediate subdirectories topics = [d for d in os.listdir(output_folder) if os.path.isdir(os.path.join(output_folder, d))] for topic in topics: print(f"\nProcessing topic: {topic}") try: parse_srt_to_text(output_folder, topic) parse_srt_and_extract_frames(output_folder, topic) except Exception as e: print(f"Error processing {topic}: {str(e)}") continue # Set up argument parser parser = argparse.ArgumentParser(description='Process video files and extract frames with subtitles') parser.add_argument('--output_dir', type=str, default="output", help='Directory containing the topic folders') args = parser.parse_args() # Process topics using provided output directory process_all_topics(args.output_dir)