Krokodilpirat's picture
Update app.py
7aec1dd verified
import os
# Set cache directories to writable locations right at the beginning
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface/transformers"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
# Patching the schema handling problem in Gradio 5.x
# This needs to be done before any Gradio imports
import sys
def patch_gradio_utils():
try:
from gradio_client import utils
original_get_type = utils.get_type
def patched_get_type(schema):
if isinstance(schema, bool):
return "boolean"
if not isinstance(schema, dict):
return "any"
return original_get_type(schema)
utils.get_type = patched_get_type
print("Successfully patched Gradio utils.get_type")
except Exception as e:
print(f"Could not patch Gradio utils: {e}")
patch_gradio_utils()
import gc
import torch
import cv2
import gradio as gr
print("📦 Gradio version:", gr.__version__)
import numpy as np
import matplotlib.cm as cm
import matplotlib # New import for the updated colormap API
import subprocess
from video_depth_anything.video_depth import VideoDepthAnything
from utils.dc_utils import read_video_frames, save_video
from huggingface_hub import hf_hub_download
# Use GPU if available; otherwise, use CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# Model configuration for different encoder variants.
model_configs = {
'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
}
encoder2name = {
'vits': 'Small',
'vitl': 'Large',
}
encoder = 'vitl'
model_name = encoder2name[encoder]
# Initialize the model.
video_depth_anything = VideoDepthAnything(**model_configs[encoder])
filepath = hf_hub_download(
repo_id=f"depth-anything/Video-Depth-Anything-{model_name}",
filename=f"video_depth_anything_{encoder}.pth",
repo_type="model",
cache_dir="/tmp/huggingface" # Explicitly set the cache directory
)
video_depth_anything.load_state_dict(torch.load(filepath, map_location='cpu'))
video_depth_anything = video_depth_anything.to(DEVICE).eval()
title = "# Video Depth Anything + RGBD sbs output"
description = """Official demo for **Video Depth Anything** + RGBD sbs output for viewing with Looking Glass Factory displays.
Please refer to our [paper](https://arxiv.org/abs/2501.12375), [project page](https://videodepthanything.github.io/), and [github](https://github.com/DepthAnything/Video-Depth-Anything) for more details."""
def infer_video_depth(
input_video: str,
max_len: int = -1,
target_fps: int = -1,
max_res: int = 1280,
stitch: bool = True,
grayscale: bool = True,
convert_from_color: bool = True,
blur: float = 0.3,
loop_factor: int = 1, # New parameter for video looping
output_dir: str = './outputs',
input_size: int = 518,
):
# 1. Read input video frames for inference (downscaled to max_res).
frames, target_fps = read_video_frames(input_video, max_len, target_fps, max_res)
# 2. Perform depth inference using the model.
depths, fps = video_depth_anything.infer_video_depth(frames, target_fps, input_size=input_size, device=DEVICE)
video_name = os.path.basename(input_video)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Save the preprocessed (RGB) video and the generated depth visualization.
# Still process the video, but we won't display it in the UI
processed_video_path = os.path.join(output_dir, os.path.splitext(video_name)[0] + '_src.mp4')
depth_vis_path = os.path.join(output_dir, os.path.splitext(video_name)[0] + '_vis.mp4')
save_video(frames, processed_video_path, fps=fps)
save_video(depths, depth_vis_path, fps=fps, is_depths=True)
stitched_video_path = None
if stitch:
# For stitching: read the original video in full resolution (without downscaling).
full_frames, _ = read_video_frames(input_video, max_len, target_fps, max_res=-1)
# For each frame, create a visual depth image from the inferenced depths.
d_min, d_max = depths.min(), depths.max()
stitched_frames = []
for i in range(min(len(full_frames), len(depths))):
rgb_full = full_frames[i] # Full-resolution RGB frame.
depth_frame = depths[i]
# Normalize the depth frame to the range [0, 255].
depth_norm = ((depth_frame - d_min) / (d_max - d_min) * 255).astype(np.uint8)
# Generate depth visualization:
if grayscale:
if convert_from_color:
# First, generate a color depth image using the inferno colormap,
# then convert that color image to grayscale.
cmap = matplotlib.colormaps.get_cmap("inferno")
depth_color = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8)
depth_gray = cv2.cvtColor(depth_color, cv2.COLOR_RGB2GRAY)
depth_vis = np.stack([depth_gray] * 3, axis=-1)
else:
# Directly generate a grayscale image from the normalized depth values.
depth_vis = np.stack([depth_norm] * 3, axis=-1)
else:
# Generate a color depth image using the inferno colormap.
cmap = matplotlib.colormaps.get_cmap("inferno")
depth_vis = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8)
# Apply Gaussian blur if requested.
if blur > 0:
kernel_size = int(blur * 20) * 2 + 1 # Ensures an odd kernel size.
depth_vis = cv2.GaussianBlur(depth_vis, (kernel_size, kernel_size), 0)
# Resize the depth visualization to match the full-resolution RGB frame.
H_full, W_full = rgb_full.shape[:2]
depth_vis_resized = cv2.resize(depth_vis, (W_full, H_full))
# Concatenate the full-resolution RGB frame (left) and the resized depth visualization (right).
stitched = cv2.hconcat([rgb_full, depth_vis_resized])
stitched_frames.append(stitched)
stitched_frames = np.array(stitched_frames)
# Use only the first 20 characters of the base name for the output filename and append '_RGBD.mp4'
base_name = os.path.splitext(video_name)[0]
short_name = base_name[:20]
stitched_video_path = os.path.join(output_dir, short_name + '_RGBD.mp4')
save_video(stitched_frames, stitched_video_path, fps=fps)
# Merge audio from the input video into the stitched video using ffmpeg.
temp_audio_path = stitched_video_path.replace('_RGBD.mp4', '_RGBD_audio.mp4')
cmd = [
"ffmpeg",
"-y",
"-i", stitched_video_path,
"-i", input_video,
"-c:v", "copy",
"-c:a", "aac",
"-map", "0:v:0",
"-map", "1:a:0?",
"-shortest",
temp_audio_path
]
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
os.replace(temp_audio_path, stitched_video_path)
# Apply looping only to the RGBD video when requested
if loop_factor > 1 and stitch and stitched_video_path:
# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)
# Save original path and absolute path
original_path = stitched_video_path
abs_original_path = os.path.abspath(original_path)
print(f"Looping video {original_path} with factor {loop_factor}")
# Check if the input video has an audio stream
has_audio = False
check_audio_cmd = [
"ffmpeg",
"-i", input_video,
"-c", "copy",
"-f", "null",
"-"
]
result = subprocess.run(check_audio_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stderr = result.stderr.decode('utf-8')
if "Audio" in stderr:
has_audio = True
print("Audio stream detected in input video")
# Temporary path in the output directory
temp_looped_path = os.path.join(output_dir, 'temp_rgbd_looped.mp4')
try:
# Create a temporary text file for the stitched videos
concat_stitched_file_path = os.path.join(output_dir, 'concat_stitched_list.txt')
with open(concat_stitched_file_path, 'w') as f:
for _ in range(loop_factor):
# Use absolute path
f.write(f"file '{abs_original_path}'\n")
print(f"Creating temporary file at: {temp_looped_path}")
print(f"Using absolute path for original: {abs_original_path}")
# Use ffmpeg to loop the video
concat_cmd = [
"ffmpeg",
"-y",
"-f", "concat",
"-safe", "0",
"-i", concat_stitched_file_path,
"-c", "copy",
temp_looped_path
]
process = subprocess.run(concat_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(f"FFmpeg concat command exit code: {process.returncode}")
if process.returncode != 0:
print(f"FFmpeg error: {process.stderr.decode('utf-8')}")
# Check if the temporary file was created
if not os.path.exists(temp_looped_path):
print(f"ERROR: Failed to create temporary file {temp_looped_path}")
print(f"Current directory contents: {os.listdir(output_dir)}")
# Fallback
return [depth_vis_path, stitched_video_path]
# If audio is present, we need to handle it separately
if has_audio:
# Extract the audio track from the original input video
audio_path = os.path.join(output_dir, 'extracted_audio.aac')
extract_audio_cmd = [
"ffmpeg",
"-y",
"-i", input_video, # Use original input video
"-vn", "-acodec", "copy",
audio_path
]
subprocess.run(extract_audio_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Check if audio was extracted
if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0:
print(f"WARNING: Failed to extract audio or no audio track in {input_video}")
has_audio = False
else:
# Create a text file for audio looping
concat_audio_file_path = os.path.join(output_dir, 'concat_audio_list.txt')
with open(concat_audio_file_path, 'w') as f:
for _ in range(loop_factor):
# Use absolute path
abs_audio_path = os.path.abspath(audio_path)
f.write(f"file '{abs_audio_path}'\n")
# Create the looped audio track
looped_audio_path = os.path.join(output_dir, 'looped_audio.aac')
audio_loop_cmd = [
"ffmpeg",
"-y",
"-f", "concat",
"-safe", "0",
"-i", concat_audio_file_path,
"-c", "copy",
looped_audio_path
]
subprocess.run(audio_loop_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Check if audio was looped
if not os.path.exists(looped_audio_path) or os.path.getsize(looped_audio_path) == 0:
print(f"WARNING: Failed to create looped audio")
has_audio = False
# Final step: Combine video and audio if needed, otherwise just copy video
if has_audio:
# Combine the looped video with the looped audio
final_cmd = [
"ffmpeg",
"-y",
"-i", temp_looped_path,
"-i", looped_audio_path,
"-c:v", "copy",
"-c:a", "aac",
"-map", "0:v:0",
"-map", "1:a:0",
"-shortest",
original_path # Use the original path as destination
]
subprocess.run(final_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
# If no audio exists, just copy the video
copy_cmd = [
"ffmpeg",
"-y",
"-i", temp_looped_path,
"-c", "copy",
original_path
]
subprocess.run(copy_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Check if the replacement was successful
if not os.path.exists(original_path):
print(f"ERROR: Failed to replace {original_path} with looped version")
else:
print(f"Successfully replaced {original_path} with looped version")
# Clean up temporary files
temp_files = [concat_stitched_file_path]
if has_audio:
temp_files.extend([concat_audio_file_path, audio_path, looped_audio_path])
if os.path.exists(temp_looped_path):
temp_files.append(temp_looped_path)
for file_path in temp_files:
if os.path.exists(file_path):
try:
os.remove(file_path)
except Exception as e:
print(f"Warning: Could not remove temporary file {file_path}: {str(e)}")
except Exception as e:
print(f"Error during looping process: {str(e)}")
import traceback
traceback.print_exc()
# In case of error, keep the original files
return [depth_vis_path, stitched_video_path]
gc.collect()
torch.cuda.empty_cache()
# Only return the depth visualization and stitched video (not the preprocessed video)
return [depth_vis_path, stitched_video_path]
def construct_demo():
with gr.Blocks(analytics_enabled=False) as demo:
gr.Markdown(title)
gr.Markdown(description)
gr.Markdown("### If you find this work useful, please help ⭐ the [Github Repo](https://github.com/DepthAnything/Video-Depth-Anything). Thanks for your attention!")
with gr.Row(equal_height=True):
with gr.Column(scale=1):
# Video input component for file upload.
input_video = gr.Video(label="Input Video")
with gr.Column(scale=2):
with gr.Row(equal_height=True):
# Removed the processed_video component from the UI
depth_vis_video = gr.Video(label="Generated Depth Video", interactive=False, autoplay=True, show_share_button=True, scale=5)
stitched_video = gr.Video(label="Stitched RGBD Video", interactive=False, autoplay=True, show_share_button=True, scale=5)
with gr.Row(equal_height=True):
with gr.Column(scale=1):
with gr.Accordion("Advanced Settings", open=False):
max_len = gr.Slider(label="Max process length", minimum=-1, maximum=1000, value=-1, step=1)
target_fps = gr.Slider(label="Target FPS", minimum=-1, maximum=30, value=-1, step=1)
max_res = gr.Slider(label="Max side resolution", minimum=480, maximum=1920, value=1280, step=1)
stitch_option = gr.Checkbox(label="Stitch RGB & Depth Videos", value=True)
grayscale_option = gr.Checkbox(label="Output Depth as Grayscale", value=True)
convert_from_color_option = gr.Checkbox(label="Convert Grayscale from Color", value=True)
blur_slider = gr.Slider(minimum=0, maximum=1, step=0.01, label="Depth Blur (can reduce edge artifacts on display)", value=0.3)
# Add the loop factor slider
loop_factor = gr.Slider(label="Loop Factor (repeats the RGBD output video)", minimum=1, maximum=20, value=1, step=1)
generate_btn = gr.Button("Generate")
with gr.Column(scale=2):
pass
# Removed Examples block to improve loading time
generate_btn.click(
fn=infer_video_depth,
inputs=[input_video, max_len, target_fps, max_res, stitch_option, grayscale_option, convert_from_color_option, blur_slider, loop_factor], # Added loop_factor
outputs=[depth_vis_video, stitched_video],
)
return demo
if __name__ == "__main__":
demo = construct_demo()
demo.queue() # Enable asynchronous processing.
demo.launch(share=True, show_api=False)