import re import os import torch import shutil import subprocess import folder_paths from torch import Tensor from collections.abc import Mapping audio_extensions = ['mp3', 'mp4', 'wav', 'ogg'] video_extensions = ['webm', 'mp4', 'mov'] def ffmpeg_suitability(path): try: version = subprocess.run([path, "-version"], check=True, capture_output=True).stdout.decode("utf-8") except: return 0 score = 0 #rough layout of the importance of various features simple_criterion = [("libvpx", 20),("264",10), ("265",3), ("svtav1",5),("libopus", 1)] for criterion in simple_criterion: if version.find(criterion[0]) >= 0: score += criterion[1] #obtain rough compile year from copyright information copyright_index = version.find('2000-2') if copyright_index >= 0: copyright_year = version[copyright_index+6:copyright_index+9] if copyright_year.isnumeric(): score += int(copyright_year) return score folder_paths.folder_names_and_paths["VHS_video_formats"] = ( [ os.path.join(os.path.dirname(os.path.abspath(__file__)), "video_formats"), ], [".json"] ) if "VHS_FORCE_FFMPEG_PATH" in os.environ: ffmpeg_path = os.environ.get("VHS_FORCE_FFMPEG_PATH") else: ffmpeg_paths = [] try: from imageio_ffmpeg import get_ffmpeg_exe imageio_ffmpeg_path = get_ffmpeg_exe() ffmpeg_paths.append(imageio_ffmpeg_path) except: if "VHS_USE_IMAGEIO_FFMPEG" in os.environ: raise print("Failed to import imageio_ffmpeg") if "VHS_USE_IMAGEIO_FFMPEG" in os.environ: ffmpeg_path = imageio_ffmpeg_path else: system_ffmpeg = shutil.which("ffmpeg") if system_ffmpeg is not None: ffmpeg_paths.append(system_ffmpeg) if os.path.isfile("ffmpeg"): ffmpeg_paths.append(os.path.abspath("ffmpeg")) if os.path.isfile("ffmpeg.exe"): ffmpeg_paths.append(os.path.abspath("ffmpeg.exe")) if len(ffmpeg_paths) == 0: print("No valid ffmpeg found.") ffmpeg_path = None elif len(ffmpeg_paths) == 1: #Evaluation of suitability isn't required, can take sole option #to reduce startup time ffmpeg_path = ffmpeg_paths[0] else: ffmpeg_path = max(ffmpeg_paths, key=ffmpeg_suitability) gifski_path = os.environ.get("VHS_GIFSKI", None) if gifski_path is None: gifski_path = os.environ.get("JOV_GIFSKI", None) if gifski_path is None: gifski_path = shutil.which("gifski") ytdl_path = os.environ.get("VHS_YTDL", None) or shutil.which('yt-dlp') \ or shutil.which('youtube-dl') def get_audio(file, start_time=0, duration=0): args = [ffmpeg_path, "-i", file] if start_time > 0: args += ["-ss", str(start_time)] if duration > 0: args += ["-t", str(duration)] try: #TODO: scan for sample rate and maintain res = subprocess.run(args + ["-f", "f32le", "-"], capture_output=True, check=True) audio = torch.frombuffer(bytearray(res.stdout), dtype=torch.float32) match = re.search(', (\\d+) Hz, (\\w+), ',res.stderr.decode('utf-8')) except subprocess.CalledProcessError as e: raise Exception(f"VHS failed to extract audio from {file}:\n" \ + e.stderr.decode("utf-8")) if match: ar = int(match.group(1)) #NOTE: Just throwing an error for other channel types right now #Will deal with issues if they come ac = {"mono": 1, "stereo": 2}[match.group(2)] else: ar = 44100 ac = 2 audio = audio.reshape((-1,ac)).transpose(0,1).unsqueeze(0) return {'waveform': audio, 'sample_rate': ar} class LazyAudioMap(Mapping): def __init__(self, file, start_time, duration): self.file = file self.start_time=start_time self.duration=duration self._dict=None def __getitem__(self, key): if self._dict is None: self._dict = get_audio(self.file, self.start_time, self.duration) return self._dict[key] def __iter__(self): if self._dict is None: self._dict = get_audio(self.file, self.start_time, self.duration) return iter(self._dict) def __len__(self): if self._dict is None: self._dict = get_audio(self.file, self.start_time, self.duration) return len(self._dict) def lazy_get_audio(file, start_time=0, duration=0): return LazyAudioMap(file, start_time, duration)