Spaces:
Runtime error
Runtime error
import glob | |
import mimetypes | |
import os | |
import platform | |
import shutil | |
import ssl | |
import subprocess | |
import urllib | |
from pathlib import Path | |
from typing import List, Any | |
from tqdm import tqdm | |
import modules.globals | |
TEMP_FILE = "temp.mp4" | |
TEMP_DIRECTORY = "temp" | |
# monkey patch ssl for mac | |
if platform.system().lower() == "darwin": | |
ssl._create_default_https_context = ssl._create_unverified_context | |
def run_ffmpeg(args: List[str]) -> bool: | |
commands = [ | |
"ffmpeg", | |
"-hide_banner", | |
"-hwaccel", | |
"auto", | |
"-loglevel", | |
modules.globals.log_level, | |
] | |
commands.extend(args) | |
try: | |
subprocess.check_output(commands, stderr=subprocess.STDOUT) | |
return True | |
except Exception: | |
pass | |
return False | |
def detect_fps(target_path: str) -> float: | |
command = [ | |
"ffprobe", | |
"-v", | |
"error", | |
"-select_streams", | |
"v:0", | |
"-show_entries", | |
"stream=r_frame_rate", | |
"-of", | |
"default=noprint_wrappers=1:nokey=1", | |
target_path, | |
] | |
output = subprocess.check_output(command).decode().strip().split("/") | |
try: | |
numerator, denominator = map(int, output) | |
return numerator / denominator | |
except Exception: | |
pass | |
return 30.0 | |
def extract_frames(target_path: str) -> None: | |
temp_directory_path = get_temp_directory_path(target_path) | |
run_ffmpeg( | |
[ | |
"-i", | |
target_path, | |
"-pix_fmt", | |
"rgb24", | |
os.path.join(temp_directory_path, "%04d.png"), | |
] | |
) | |
def create_video(target_path: str, fps: float = 30.0) -> None: | |
temp_output_path = get_temp_output_path(target_path) | |
temp_directory_path = get_temp_directory_path(target_path) | |
run_ffmpeg( | |
[ | |
"-r", | |
str(fps), | |
"-i", | |
os.path.join(temp_directory_path, "%04d.png"), | |
"-c:v", | |
modules.globals.video_encoder, | |
"-crf", | |
str(modules.globals.video_quality), | |
"-pix_fmt", | |
"yuv420p", | |
"-vf", | |
"colorspace=bt709:iall=bt601-6-625:fast=1", | |
"-y", | |
temp_output_path, | |
] | |
) | |
def restore_audio(target_path: str, output_path: str) -> None: | |
temp_output_path = get_temp_output_path(target_path) | |
done = run_ffmpeg( | |
[ | |
"-i", | |
temp_output_path, | |
"-i", | |
target_path, | |
"-c:v", | |
"copy", | |
"-map", | |
"0:v:0", | |
"-map", | |
"1:a:0", | |
"-y", | |
output_path, | |
] | |
) | |
if not done: | |
move_temp(target_path, output_path) | |
def get_temp_frame_paths(target_path: str) -> List[str]: | |
temp_directory_path = get_temp_directory_path(target_path) | |
return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) | |
def get_temp_directory_path(target_path: str) -> str: | |
target_name, _ = os.path.splitext(os.path.basename(target_path)) | |
target_directory_path = os.path.dirname(target_path) | |
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) | |
def get_temp_output_path(target_path: str) -> str: | |
temp_directory_path = get_temp_directory_path(target_path) | |
return os.path.join(temp_directory_path, TEMP_FILE) | |
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: | |
if source_path and target_path: | |
source_name, _ = os.path.splitext(os.path.basename(source_path)) | |
target_name, target_extension = os.path.splitext(os.path.basename(target_path)) | |
if os.path.isdir(output_path): | |
return os.path.join( | |
output_path, source_name + "-" + target_name + target_extension | |
) | |
return output_path | |
def create_temp(target_path: str) -> None: | |
temp_directory_path = get_temp_directory_path(target_path) | |
Path(temp_directory_path).mkdir(parents=True, exist_ok=True) | |
def move_temp(target_path: str, output_path: str) -> None: | |
temp_output_path = get_temp_output_path(target_path) | |
if os.path.isfile(temp_output_path): | |
if os.path.isfile(output_path): | |
os.remove(output_path) | |
shutil.move(temp_output_path, output_path) | |
def clean_temp(target_path: str) -> None: | |
temp_directory_path = get_temp_directory_path(target_path) | |
parent_directory_path = os.path.dirname(temp_directory_path) | |
if not modules.globals.keep_frames and os.path.isdir(temp_directory_path): | |
shutil.rmtree(temp_directory_path) | |
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): | |
os.rmdir(parent_directory_path) | |
def has_image_extension(image_path: str) -> bool: | |
return image_path.lower().endswith(("png", "jpg", "jpeg")) | |
def is_image(image_path: str) -> bool: | |
if image_path and os.path.isfile(image_path): | |
mimetype, _ = mimetypes.guess_type(image_path) | |
return bool(mimetype and mimetype.startswith("image/")) | |
return False | |
def is_video(video_path: str) -> bool: | |
if video_path and os.path.isfile(video_path): | |
mimetype, _ = mimetypes.guess_type(video_path) | |
return bool(mimetype and mimetype.startswith("video/")) | |
return False | |
def conditional_download(download_directory_path: str, urls: List[str]) -> None: | |
if not os.path.exists(download_directory_path): | |
os.makedirs(download_directory_path) | |
for url in urls: | |
download_file_path = os.path.join( | |
download_directory_path, os.path.basename(url) | |
) | |
if not os.path.exists(download_file_path): | |
request = urllib.request.urlopen(url) # type: ignore[attr-defined] | |
total = int(request.headers.get("Content-Length", 0)) | |
with tqdm( | |
total=total, | |
desc="Downloading", | |
unit="B", | |
unit_scale=True, | |
unit_divisor=1024, | |
) as progress: | |
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined] | |
def resolve_relative_path(path: str) -> str: | |
return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) | |