|
import os |
|
|
|
# Redirect library cache/config directories to /tmp. This runs before the
# heavy imports further down so those libraries see the values on import.
os.environ.update(
    {
        "HF_HOME": "/tmp/huggingface",
        "TRANSFORMERS_CACHE": "/tmp/huggingface/transformers",
        "MPLCONFIGDIR": "/tmp/matplotlib",
    }
)
|
|
|
|
|
|
|
import sys |
|
def patch_gradio_utils():
    """Wrap ``gradio_client.utils.get_type`` so non-dict schemas are tolerated.

    After patching, a boolean schema maps to ``"boolean"``, any other non-dict
    schema maps to ``"any"``, and dict schemas are delegated to the original
    implementation. The patch is best-effort: any failure (for example
    ``gradio_client`` not being importable) is printed and otherwise ignored.
    """
    try:
        from gradio_client import utils

        wrapped_get_type = utils.get_type

        def patched_get_type(schema):
            # Only dict schemas are handed to the wrapped implementation.
            # NOTE(review): presumably the original raises on bool/other
            # schema values -- confirm against the installed gradio_client.
            if isinstance(schema, dict):
                return wrapped_get_type(schema)
            return "boolean" if isinstance(schema, bool) else "any"

        utils.get_type = patched_get_type
        print("Successfully patched Gradio utils.get_type")
    except Exception as e:
        print(f"Could not patch Gradio utils: {e}")


# Apply the patch immediately, before gradio itself is imported below.
patch_gradio_utils()
|
|
|
import gc |
|
import torch |
|
import cv2 |
|
import gradio as gr |
|
# Log the installed Gradio version at startup.
print(f"📦 Gradio version: {gr.__version__}")
|
import numpy as np |
|
import matplotlib.cm as cm |
|
import matplotlib |
|
import subprocess |
|
|
|
from video_depth_anything.video_depth import VideoDepthAnything |
|
from utils.dc_utils import read_video_frames, save_video |
|
from huggingface_hub import hf_hub_download |
|
|
|
|
|
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Constructor keyword arguments for each supported encoder variant.
model_configs = {
    "vits": dict(encoder="vits", features=64, out_channels=[48, 96, 192, 384]),
    "vitl": dict(encoder="vitl", features=256, out_channels=[256, 512, 1024, 1024]),
}

# Encoder key -> size suffix used in the Hugging Face repository name.
encoder2name = {"vits": "Small", "vitl": "Large"}

# Variant served by this demo.
encoder = "vitl"
model_name = encoder2name[encoder]
|
|
|
|
|
# Instantiate the network for the selected encoder variant.
video_depth_anything = VideoDepthAnything(**model_configs[encoder])

# Download (or reuse from the cache directory) the matching checkpoint file.
filepath = hf_hub_download(
    repo_id="depth-anything/Video-Depth-Anything-" + model_name,
    filename="video_depth_anything_" + encoder + ".pth",
    repo_type="model",
    cache_dir="/tmp/huggingface",
)

# Load the weights on the CPU first, then move the model to the target device
# and switch it to inference mode.
# NOTE(review): torch.load unpickles the downloaded file; consider passing
# weights_only=True if the installed torch version supports it.
video_depth_anything.load_state_dict(torch.load(filepath, map_location="cpu"))
video_depth_anything = video_depth_anything.to(DEVICE)
video_depth_anything = video_depth_anything.eval()
|
|
|
# Markdown shown at the top of the demo page.
title = "# Video Depth Anything + RGBD sbs output"
description = (
    "Official demo for **Video Depth Anything** + RGBD sbs output for viewing with Looking Glass Factory displays.\n"
    "Please refer to our [paper](https://arxiv.org/abs/2501.12375), "
    "[project page](https://videodepthanything.github.io/), "
    "and [github](https://github.com/DepthAnything/Video-Depth-Anything) for more details."
)
|
|
|
def _run_ffmpeg(args):
    """Run ``ffmpeg -y`` with *args* and return ``(succeeded, stderr_text)``.

    A non-zero exit status, or ffmpeg not being launchable at all, is reported
    through the return value instead of an exception: every ffmpeg step in
    this module is optional post-processing that must not abort a request.
    """
    try:
        completed = subprocess.run(
            ["ffmpeg", "-y", *args],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError as exc:
        print(f"FFmpeg could not be started: {exc}")
        return False, ""
    stderr_text = completed.stderr.decode("utf-8", errors="replace")
    if completed.returncode != 0:
        print(f"FFmpeg error: {stderr_text}")
    return completed.returncode == 0, stderr_text


def _is_nonempty_file(path):
    """Return True when *path* exists and has a size greater than zero."""
    return os.path.exists(path) and os.path.getsize(path) > 0


def _has_audio_stream(video_path):
    """Return True when ffmpeg lists an audio stream for *video_path*."""
    _, stderr_text = _run_ffmpeg(["-i", video_path, "-c", "copy", "-f", "null", "-"])
    # Match only stream description lines so a file name that merely contains
    # the word "Audio" is not mistaken for an audio track.
    return any(
        "Stream #" in line and "Audio:" in line
        for line in stderr_text.splitlines()
    )


def _write_concat_list(list_path, media_path, repeat):
    """Write an ffmpeg concat-demuxer list naming *media_path* *repeat* times."""
    # Inside a single-quoted concat entry a literal quote is spelled '\''.
    quoted = os.path.abspath(media_path).replace("'", "'\\''")
    with open(list_path, "w", encoding="utf-8") as list_file:
        list_file.writelines(f"file '{quoted}'\n" for _ in range(repeat))


def _depth_to_display(depth_u8, cmap, grayscale, convert_from_color, blur):
    """Turn one uint8 depth map into a 3-channel uint8 image for stitching."""
    if grayscale and not convert_from_color:
        # Plain grayscale: replicate the normalized depth into three channels.
        vis = np.stack([depth_u8] * 3, axis=-1)
    else:
        colored = (cmap(depth_u8 / 255.0)[..., :3] * 255).astype(np.uint8)
        if grayscale:
            # Grayscale derived from the luminance of the color-mapped depth.
            gray = cv2.cvtColor(colored, cv2.COLOR_RGB2GRAY)
            vis = np.stack([gray] * 3, axis=-1)
        else:
            vis = colored

    if blur > 0:
        # Gaussian kernels must have an odd side length.
        kernel_size = int(blur * 20) * 2 + 1
        vis = cv2.GaussianBlur(vis, (kernel_size, kernel_size), 0)
    return vis


def _stitch_rgbd_frames(full_frames, depths, grayscale, convert_from_color, blur):
    """Build side-by-side frames: RGB on the left, depth image on the right."""
    d_min, d_max = depths.min(), depths.max()
    d_range = d_max - d_min
    # Looked up once; the colormap is the same for every frame.
    cmap = matplotlib.colormaps.get_cmap("inferno")

    stitched_frames = []
    # zip stops at the shorter sequence, so a frame-count mismatch between the
    # two reads cannot raise an IndexError.
    for rgb_full, depth_frame in zip(full_frames, depths):
        if d_range > 0:
            depth_u8 = ((depth_frame - d_min) / d_range * 255).astype(np.uint8)
        else:
            # Constant depth over the whole clip: avoid dividing by zero.
            depth_u8 = np.zeros(depth_frame.shape, dtype=np.uint8)

        depth_vis = _depth_to_display(depth_u8, cmap, grayscale, convert_from_color, blur)

        # Scale the depth image to the full-resolution RGB frame size.
        height, width = rgb_full.shape[:2]
        depth_vis_resized = cv2.resize(depth_vis, (width, height))
        stitched_frames.append(cv2.hconcat([rgb_full, depth_vis_resized]))
    return np.array(stitched_frames)


def _mux_source_audio(video_path, source_path):
    """Copy the first audio track of *source_path* into *video_path*, if any.

    Returns True when *video_path* was replaced by the muxed file. On failure
    the silent video is left in place.
    """
    muxed_path = os.path.splitext(video_path)[0] + "_audio.mp4"
    succeeded, _ = _run_ffmpeg([
        "-i", video_path,
        "-i", source_path,
        "-c:v", "copy",
        "-c:a", "aac",
        "-map", "0:v:0",
        "-map", "1:a:0?",  # trailing "?" makes the audio mapping optional
        "-shortest",
        muxed_path,
    ])
    try:
        if succeeded and _is_nonempty_file(muxed_path):
            os.replace(muxed_path, video_path)
            return True
        print(f"WARNING: Could not add audio to {video_path}; keeping silent video")
        if os.path.exists(muxed_path):
            os.remove(muxed_path)
    except OSError as exc:
        print(f"WARNING: Could not finalize audio mux for {video_path}: {exc}")
    return False


def _build_looped_audio(source_path, repeat, work_dir):
    """Extract the audio of *source_path* and repeat it *repeat* times.

    Returns the path of the looped audio file inside *work_dir*, or None when
    there is no audio track or any ffmpeg step fails.
    """
    if not _has_audio_stream(source_path):
        return None
    print("Audio stream detected in input video")

    audio_path = os.path.join(work_dir, "extracted_audio.aac")
    _run_ffmpeg(["-i", source_path, "-vn", "-acodec", "copy", audio_path])
    if not _is_nonempty_file(audio_path):
        print(f"WARNING: Failed to extract audio or no audio track in {source_path}")
        return None

    audio_list_path = os.path.join(work_dir, "concat_audio_list.txt")
    _write_concat_list(audio_list_path, audio_path, repeat)

    looped_audio_path = os.path.join(work_dir, "looped_audio.aac")
    _run_ffmpeg([
        "-f", "concat",
        "-safe", "0",
        "-i", audio_list_path,
        "-c", "copy",
        looped_audio_path,
    ])
    if not _is_nonempty_file(looped_audio_path):
        print("WARNING: Failed to create looped audio")
        return None
    return looped_audio_path


def _loop_video_in_place(video_path, source_path, repeat, output_dir):
    """Replace *video_path* with itself repeated *repeat* times.

    Intermediate files live in a private temporary directory under
    *output_dir*, so they are always removed and cannot collide with another
    request. *video_path* is only overwritten, via an atomic rename, once the
    looped file has been written successfully. Returns True on success.
    """
    import tempfile

    print(f"Looping video {video_path} with factor {repeat}")
    with tempfile.TemporaryDirectory(dir=output_dir) as work_dir:
        video_list_path = os.path.join(work_dir, "concat_stitched_list.txt")
        _write_concat_list(video_list_path, video_path, repeat)

        looped_video_path = os.path.join(work_dir, "rgbd_looped.mp4")
        succeeded, _ = _run_ffmpeg([
            "-f", "concat",
            "-safe", "0",
            "-i", video_list_path,
            "-c", "copy",
            looped_video_path,
        ])
        if not succeeded or not _is_nonempty_file(looped_video_path):
            print(f"ERROR: Failed to create temporary file {looped_video_path}")
            return False

        looped_audio_path = _build_looped_audio(source_path, repeat, work_dir)
        if looped_audio_path:
            mux_args = [
                "-i", looped_video_path,
                "-i", looped_audio_path,
                "-c:v", "copy",
                "-c:a", "aac",
                "-map", "0:v:0",
                "-map", "1:a:0",
                "-shortest",
            ]
        else:
            mux_args = ["-i", looped_video_path, "-c", "copy"]

        final_path = os.path.join(work_dir, "rgbd_final.mp4")
        succeeded, _ = _run_ffmpeg([*mux_args, final_path])
        if succeeded and _is_nonempty_file(final_path):
            os.replace(final_path, video_path)
            print(f"Successfully replaced {video_path} with looped version")
            return True

        print(f"ERROR: Failed to replace {video_path} with looped version")
        return False


def infer_video_depth(
    input_video: str,
    max_len: int = -1,
    target_fps: int = -1,
    max_res: int = 1280,
    stitch: bool = True,
    grayscale: bool = True,
    convert_from_color: bool = True,
    blur: float = 0.3,
    loop_factor: int = 1,
    output_dir: str = './outputs',
    input_size: int = 518,
):
    """Estimate depth for a video and write the result videos to *output_dir*.

    Writes ``<name>_src.mp4`` (the frames that were processed) and
    ``<name>_vis.mp4`` (the depth video). When *stitch* is true it also writes
    ``<name[:20]>_RGBD.mp4``: full-resolution RGB and depth side by side, with
    the source audio added when ffmpeg can do so, optionally repeated
    *loop_factor* times.

    Args:
        input_video: Path of the source video file.
        max_len: Forwarded to ``read_video_frames``.
            NOTE(review): presumably a frame-count limit, -1 meaning no limit.
        target_fps: Forwarded to ``read_video_frames``; the value that call
            returns is used from then on.
        max_res: Forwarded to ``read_video_frames`` for the frames fed to the
            model. The stitched output re-reads the video with ``max_res=-1``.
        stitch: Whether to produce the side-by-side RGBD video.
        grayscale: Render the stitched depth half in grayscale instead of the
            "inferno" colormap.
        convert_from_color: With *grayscale*, derive the gray values from the
            color-mapped depth instead of using the normalized depth directly.
        blur: Gaussian blur strength for the stitched depth half; 0 disables it.
        loop_factor: Number of times the stitched video is repeated; values
            of 1 or less leave it unchanged.
        output_dir: Directory for all outputs; created when missing.
        input_size: Forwarded to the model's ``infer_video_depth``.

    Returns:
        ``[depth_vis_path, stitched_video_path]``; the second entry is
        ``None`` when *stitch* is false.
    """
    frames, target_fps = read_video_frames(input_video, max_len, target_fps, max_res)
    depths, fps = video_depth_anything.infer_video_depth(
        frames, target_fps, input_size=input_size, device=DEVICE
    )

    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(input_video))[0]
    processed_video_path = os.path.join(output_dir, base_name + '_src.mp4')
    depth_vis_path = os.path.join(output_dir, base_name + '_vis.mp4')
    save_video(frames, processed_video_path, fps=fps)
    save_video(depths, depth_vis_path, fps=fps, is_depths=True)

    stitched_video_path = None
    if stitch:
        # Re-read the video without a resolution cap for the RGB half.
        full_frames, _ = read_video_frames(input_video, max_len, target_fps, max_res=-1)
        stitched_frames = _stitch_rgbd_frames(
            full_frames, depths, grayscale, convert_from_color, blur
        )

        # The base name is truncated to 20 characters for the output file.
        stitched_video_path = os.path.join(output_dir, base_name[:20] + '_RGBD.mp4')
        save_video(stitched_frames, stitched_video_path, fps=fps)
        _mux_source_audio(stitched_video_path, input_video)

        # UI sliders may deliver floats; range() needs an int.
        repeat = int(loop_factor)
        if repeat > 1:
            try:
                _loop_video_in_place(stitched_video_path, input_video, repeat, output_dir)
            except Exception as e:
                # Looping is best-effort: keep the unlooped video on failure.
                print(f"Error during looping process: {str(e)}")
                import traceback
                traceback.print_exc()

    # Release memory on every path, including the looping one.
    gc.collect()
    torch.cuda.empty_cache()

    return [depth_vis_path, stitched_video_path]
|
|
|
def construct_demo():
    """Assemble the Gradio Blocks interface and return it without launching.

    Layout: a header of three Markdown blocks, a row with the input video and
    the two output videos, and a row holding the "Advanced Settings" accordion
    and the Generate button. The button is wired to ``infer_video_depth``.
    """
    with gr.Blocks(analytics_enabled=False) as demo:
        header_blocks = (
            title,
            description,
            "### If you find this work useful, please help ⭐ the [Github Repo](https://github.com/DepthAnything/Video-Depth-Anything). Thanks for your attention!",
        )
        for markdown_text in header_blocks:
            gr.Markdown(markdown_text)

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                source_video = gr.Video(label="Input Video")
            with gr.Column(scale=2), gr.Row(equal_height=True):
                # Both result players share the same display options.
                player_options = dict(
                    interactive=False,
                    autoplay=True,
                    show_share_button=True,
                    scale=5,
                )
                depth_player = gr.Video(label="Generated Depth Video", **player_options)
                rgbd_player = gr.Video(label="Stitched RGBD Video", **player_options)

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                with gr.Accordion("Advanced Settings", open=False):
                    max_len_slider = gr.Slider(
                        label="Max process length", minimum=-1, maximum=1000, value=-1, step=1
                    )
                    target_fps_slider = gr.Slider(
                        label="Target FPS", minimum=-1, maximum=30, value=-1, step=1
                    )
                    max_res_slider = gr.Slider(
                        label="Max side resolution", minimum=480, maximum=1920, value=1280, step=1
                    )
                    stitch_checkbox = gr.Checkbox(label="Stitch RGB & Depth Videos", value=True)
                    grayscale_checkbox = gr.Checkbox(label="Output Depth as Grayscale", value=True)
                    from_color_checkbox = gr.Checkbox(label="Convert Grayscale from Color", value=True)
                    blur_amount = gr.Slider(
                        minimum=0,
                        maximum=1,
                        step=0.01,
                        label="Depth Blur (can reduce edge artifacts on display)",
                        value=0.3,
                    )
                    loop_slider = gr.Slider(
                        label="Loop Factor (repeats the RGBD output video)",
                        minimum=1,
                        maximum=20,
                        value=1,
                        step=1,
                    )
                run_button = gr.Button("Generate")
            with gr.Column(scale=2):
                # Intentionally empty column.
                pass

        # Order matches the positional parameters of infer_video_depth.
        handler_inputs = [
            source_video,
            max_len_slider,
            target_fps_slider,
            max_res_slider,
            stitch_checkbox,
            grayscale_checkbox,
            from_color_checkbox,
            blur_amount,
            loop_slider,
        ]
        run_button.click(
            fn=infer_video_depth,
            inputs=handler_inputs,
            outputs=[depth_player, rgbd_player],
        )

    return demo
|
|
|
if __name__ == "__main__":
    # Build the UI, enable request queuing, then serve with a public share
    # link and the API page hidden.
    launch_options = {"share": True, "show_api": False}
    demo = construct_demo()
    demo.queue()
    demo.launch(**launch_options)