|
import shutil |
|
import subprocess |
|
from pathlib import Path |
|
from typing import Literal |
|
|
|
import numpy as np |
|
|
|
VideoCodec = Literal["h264", "vp9", "gif"] |
|
|
|
|
|
def _check_ffmpeg_installed() -> None: |
|
"""Raise an error if ffmpeg is not available on the system PATH.""" |
|
if shutil.which("ffmpeg") is None: |
|
raise RuntimeError( |
|
"ffmpeg is required to write video but was not found on your system. " |
|
"Please install ffmpeg and ensure it is available on your PATH." |
|
) |
|
|
|
|
|
def _check_array_format(video: np.ndarray) -> None: |
|
"""Raise an error if the array is not in the expected format.""" |
|
if not (video.ndim == 4 and video.shape[-1] == 3): |
|
raise ValueError( |
|
f"Expected RGB input shaped (F, H, W, 3), got {video.shape}. " |
|
f"Input has {video.ndim} dimensions, expected 4." |
|
) |
|
if video.dtype != np.uint8: |
|
raise TypeError( |
|
f"Expected dtype=uint8, got {video.dtype}. " |
|
"Please convert your video data to uint8 format." |
|
) |
|
|
|
|
|
def _check_path(file_path: str | Path) -> None: |
|
"""Raise an error if the parent directory does not exist.""" |
|
file_path = Path(file_path) |
|
if not file_path.parent.exists(): |
|
try: |
|
file_path.parent.mkdir(parents=True, exist_ok=True) |
|
except OSError as e: |
|
raise ValueError( |
|
f"Failed to create parent directory {file_path.parent}: {e}" |
|
) |
|
|
|
|
|
def write_video( |
|
file_path: str | Path, video: np.ndarray, fps: float, codec: VideoCodec |
|
) -> None: |
|
"""RGB uint8 only, shape (F, H, W, 3).""" |
|
_check_ffmpeg_installed() |
|
_check_path(file_path) |
|
|
|
if codec not in {"h264", "vp9", "gif"}: |
|
raise ValueError("Unsupported codec. Use h264, vp9, or gif.") |
|
|
|
arr = np.asarray(video) |
|
_check_array_format(arr) |
|
|
|
frames = np.ascontiguousarray(arr) |
|
_, height, width, _ = frames.shape |
|
out_path = str(file_path) |
|
|
|
cmd = [ |
|
"ffmpeg", |
|
"-y", |
|
"-f", |
|
"rawvideo", |
|
"-s", |
|
f"{width}x{height}", |
|
"-pix_fmt", |
|
"rgb24", |
|
"-r", |
|
str(fps), |
|
"-i", |
|
"-", |
|
"-an", |
|
] |
|
|
|
if codec == "gif": |
|
video_filter = "split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse" |
|
cmd += [ |
|
"-vf", |
|
video_filter, |
|
"-loop", |
|
"0", |
|
] |
|
elif codec == "h264": |
|
cmd += [ |
|
"-vcodec", |
|
"libx264", |
|
"-pix_fmt", |
|
"yuv420p", |
|
"-movflags", |
|
"+faststart", |
|
] |
|
elif codec == "vp9": |
|
bpp = 0.08 |
|
bps = int(width * height * fps * bpp) |
|
if bps >= 1_000_000: |
|
bitrate = f"{round(bps / 1_000_000)}M" |
|
elif bps >= 1_000: |
|
bitrate = f"{round(bps / 1_000)}k" |
|
else: |
|
bitrate = str(max(bps, 1)) |
|
cmd += [ |
|
"-vcodec", |
|
"libvpx-vp9", |
|
"-b:v", |
|
bitrate, |
|
"-pix_fmt", |
|
"yuv420p", |
|
] |
|
cmd += [out_path] |
|
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE) |
|
try: |
|
for frame in frames: |
|
proc.stdin.write(frame.tobytes()) |
|
finally: |
|
if proc.stdin: |
|
proc.stdin.close() |
|
stderr = ( |
|
proc.stderr.read().decode("utf-8", errors="ignore") if proc.stderr else "" |
|
) |
|
ret = proc.wait() |
|
if ret != 0: |
|
raise RuntimeError(f"ffmpeg failed with code {ret}\n{stderr}") |
|
|