import os

# Point every library cache at a writable location before those libraries
# are imported (they read these variables at import time).
os.environ.update({
    "HF_HOME": "/tmp/huggingface",
    "TRANSFORMERS_CACHE": "/tmp/huggingface/transformers",
    "MPLCONFIGDIR": "/tmp/matplotlib",
})

# Work around the schema handling problem in Gradio 5.x.
# The patch has to be installed before any Gradio import.
import sys


def patch_gradio_utils():
    """Wrap ``gradio_client.utils.get_type`` so it tolerates non-dict schemas.

    The wrapper answers ``"boolean"`` for a bare ``bool`` schema and ``"any"``
    for every other non-dict value; dict schemas are forwarded unchanged to
    the stock implementation. Any failure while installing the wrapper is
    reported on stdout and otherwise ignored (best effort).
    """
    try:
        from gradio_client import utils

        stock_get_type = utils.get_type

        def patched_get_type(schema):
            # Only real dict schemas are understood by the stock function.
            if isinstance(schema, dict):
                return stock_get_type(schema)
            return "boolean" if isinstance(schema, bool) else "any"

        utils.get_type = patched_get_type
        print("Successfully patched Gradio utils.get_type")
    except Exception as e:
        print(f"Could not patch Gradio utils: {e}")


patch_gradio_utils()

import gc
import torch
import cv2
import gradio as gr

print("📦 Gradio version:", gr.__version__)

import numpy as np
import matplotlib.cm as cm
import matplotlib  # New import for the updated colormap API
import subprocess

from video_depth_anything.video_depth import VideoDepthAnything
from utils.dc_utils import read_video_frames, save_video
from huggingface_hub import hf_hub_download

# Run on the GPU when one is available, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Constructor arguments for each supported encoder variant.
model_configs = {
    'vits': {
        'encoder': 'vits',
        'features': 64,
        'out_channels': [48, 96, 192, 384],
    },
    'vitl': {
        'encoder': 'vitl',
        'features': 256,
        'out_channels': [256, 512, 1024, 1024],
    },
}

# Encoder key -> name fragment used in the Hugging Face repository id.
encoder2name = {
    'vits': 'Small',
    'vitl': 'Large',
}

encoder = 'vitl'
model_name = encoder2name[encoder]

# Initialize the model.
video_depth_anything = VideoDepthAnything(**model_configs[encoder])
filepath = hf_hub_download(
    repo_id=f"depth-anything/Video-Depth-Anything-{model_name}",
    filename=f"video_depth_anything_{encoder}.pth",
    repo_type="model",
    cache_dir="/tmp/huggingface",  # Explicitly set the cache directory
)
# NOTE(review): torch.load unpickles the downloaded checkpoint. If the
# installed torch supports it, passing weights_only=True would restrict what
# the file can execute -- confirm the minimum torch version before changing.
video_depth_anything.load_state_dict(torch.load(filepath, map_location='cpu'))
video_depth_anything = video_depth_anything.to(DEVICE).eval()

title = "# Video Depth Anything + RGBD sbs output"
description = (
    "Official demo for **Video Depth Anything** + RGBD sbs output for viewing with Looking Glass Factory displays. "
    "Please refer to our [paper](https://arxiv.org/abs/2501.12375), "
    "[project page](https://videodepthanything.github.io/), "
    "and [github](https://github.com/DepthAnything/Video-Depth-Anything) for more details."
)


def _run_ffmpeg(cmd):
    """Run an ffmpeg command line and report whether it exited with status 0.

    A non-zero exit status, or the executable not being startable at all, is
    printed and signalled by returning False instead of raising, so callers
    can fall back to the files they already have.
    """
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        print(f"FFmpeg could not be started: {e}")
        return False
    if result.returncode != 0:
        # errors='replace': ffmpeg may echo file names that are not valid UTF-8.
        print(f"FFmpeg error: {result.stderr.decode('utf-8', errors='replace')}")
        return False
    return True


def _remove_quietly(*paths):
    """Delete the given files, ignoring ones that do not exist."""
    for path in paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            print(f"Warning: Could not remove temporary file {path}: {str(e)}")


def _render_depth_frame(depth_frame, d_min, depth_range, cmap, grayscale, convert_from_color, blur):
    """Turn one depth map into a 3-channel uint8 image.

    ``grayscale`` with ``convert_from_color`` colours the depth with ``cmap``
    and converts that image to gray; ``grayscale`` alone replicates the
    normalised depth over three channels; otherwise the colour image is used
    directly. A positive ``blur`` applies a Gaussian blur afterwards.
    """
    depth_norm = ((depth_frame - d_min) / depth_range * 255).astype(np.uint8)

    if grayscale and not convert_from_color:
        depth_vis = np.stack([depth_norm] * 3, axis=-1)
    else:
        depth_vis = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8)
        if grayscale:
            depth_gray = cv2.cvtColor(depth_vis, cv2.COLOR_RGB2GRAY)
            depth_vis = np.stack([depth_gray] * 3, axis=-1)

    if blur > 0:
        kernel_size = int(blur * 20) * 2 + 1  # Ensures an odd kernel size.
        depth_vis = cv2.GaussianBlur(depth_vis, (kernel_size, kernel_size), 0)
    return depth_vis


def _stitch_rgbd_frames(full_frames, depths, grayscale, convert_from_color, blur):
    """Build side-by-side frames: full-resolution RGB left, depth image right."""
    d_min, d_max = depths.min(), depths.max()
    depth_range = d_max - d_min
    if not depth_range > 0:
        # A completely flat depth volume would otherwise divide by zero and
        # push NaNs through the uint8 cast; render it as uniform zero instead.
        depth_range = 1.0

    # The colormap is loop-invariant, so look it up once.
    cmap = matplotlib.colormaps.get_cmap("inferno")

    stitched_frames = []
    # zip stops at the shorter sequence, i.e. min(len(full_frames), len(depths)).
    for rgb_full, depth_frame in zip(full_frames, depths):
        depth_vis = _render_depth_frame(
            depth_frame, d_min, depth_range, cmap, grayscale, convert_from_color, blur
        )
        # Resize the depth image to the full-resolution RGB frame before joining.
        H_full, W_full = rgb_full.shape[:2]
        depth_vis_resized = cv2.resize(depth_vis, (W_full, H_full))
        stitched_frames.append(cv2.hconcat([rgb_full, depth_vis_resized]))
    return np.array(stitched_frames)


def _mux_audio(video_path, audio_source, temp_path):
    """Add the first audio stream of ``audio_source`` (if any) to ``video_path``.

    ffmpeg writes to ``temp_path``; ``video_path`` is only replaced once that
    run succeeded, so a failure leaves the silent video untouched.
    Returns True when the video was replaced.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-i", audio_source,
        "-c:v", "copy",
        "-c:a", "aac",
        "-map", "0:v:0",
        "-map", "1:a:0?",  # trailing '?' makes the audio mapping optional
        "-shortest",
        temp_path,
    ]
    try:
        if _run_ffmpeg(cmd) and os.path.exists(temp_path):
            os.replace(temp_path, video_path)
            return True
        print(f"WARNING: Could not merge audio into {video_path}; keeping the video without audio")
        return False
    except OSError as e:
        print(f"WARNING: Could not merge audio into {video_path}: {e}")
        return False
    finally:
        _remove_quietly(temp_path)


def _loop_video(video_path, loop_factor):
    """Replace ``video_path`` with ``loop_factor`` back-to-back copies of itself.

    Uses ffmpeg's concat demuxer with stream copy. Whatever audio was muxed
    into the file beforehand is repeated together with the picture, so each
    repetition stays aligned. The result is written to a temporary file next
    to the video and swapped in only on success; temporary files are removed
    in every case. Returns True when the video was replaced.
    """
    root = os.path.splitext(video_path)[0]
    # Temp names derive from the output name so different outputs do not clash.
    list_path = root + '_concat_list.txt'
    looped_path = root + '_looped_tmp.mp4'

    # Inside the single-quoted concat entry a literal quote is written as '\''.
    quoted_path = os.path.abspath(video_path).replace("'", "'\\''")
    try:
        with open(list_path, 'w', encoding='utf-8') as f:
            f.writelines(f"file '{quoted_path}'\n" for _ in range(loop_factor))

        concat_cmd = [
            "ffmpeg", "-y",
            "-f", "concat",
            "-safe", "0",  # the list contains an absolute path
            "-i", list_path,
            "-c", "copy",
            looped_path,
        ]
        if not (_run_ffmpeg(concat_cmd) and os.path.exists(looped_path)):
            print(f"ERROR: Failed to create temporary file {looped_path}")
            return False

        os.replace(looped_path, video_path)
        print(f"Successfully replaced {video_path} with looped version")
        return True
    except OSError as e:
        print(f"Error during looping process: {str(e)}")
        return False
    finally:
        _remove_quietly(list_path, looped_path)


def infer_video_depth(
    input_video: str,
    max_len: int = -1,
    target_fps: int = -1,
    max_res: int = 1280,
    stitch: bool = True,
    grayscale: bool = True,
    convert_from_color: bool = True,
    blur: float = 0.3,
    loop_factor: int = 1,  # New parameter for video looping
    output_dir: str = './outputs',
    input_size: int = 518,
):
    """Run depth inference on a video and write the result videos.

    Always written to ``output_dir``: ``<name>_src.mp4`` (the frames that were
    fed to the model) and ``<name>_vis.mp4`` (the depth visualisation). With
    ``stitch`` enabled an additional ``<name[:20]>_RGBD.mp4`` is produced that
    shows the full-resolution RGB frame on the left and the depth image on the
    right, carries the input's audio when ffmpeg can add it, and is repeated
    ``loop_factor`` times when that factor is greater than 1.

    ffmpeg problems in the audio or looping steps are printed and the step is
    skipped; they never discard the videos that were already written.

    Args:
        input_video: Path of the video to process.
        max_len: Forwarded to ``read_video_frames`` (frame limit).
        target_fps: Forwarded to ``read_video_frames``.
        max_res: Forwarded to ``read_video_frames`` for the inference frames;
            the stitched output re-reads the video with ``max_res=-1``.
        stitch: Whether to build the side-by-side RGBD video.
        grayscale: Render the depth half in gray instead of the inferno colormap.
        convert_from_color: With ``grayscale``, derive the gray image from the
            colour-mapped depth instead of the raw normalised depth.
        blur: Gaussian blur strength for the depth half; 0 disables it.
        loop_factor: Number of repetitions of the RGBD video.
        output_dir: Directory for all outputs; created when missing.
        input_size: Forwarded to the model's ``infer_video_depth``.

    Returns:
        ``[depth_vis_path, stitched_video_path]``; the second entry is ``None``
        when ``stitch`` is false.
    """
    # UI sliders may deliver the value as a float; range() needs an int.
    loop_factor = int(loop_factor)

    # 1. Read input video frames for inference (downscaled to max_res).
    frames, target_fps = read_video_frames(input_video, max_len, target_fps, max_res)
    # 2. Perform depth inference using the model.
    depths, fps = video_depth_anything.infer_video_depth(
        frames, target_fps, input_size=input_size, device=DEVICE
    )

    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(input_video))[0]

    # The preprocessed RGB video is still written, it is just not shown in the UI.
    processed_video_path = os.path.join(output_dir, base_name + '_src.mp4')
    depth_vis_path = os.path.join(output_dir, base_name + '_vis.mp4')
    save_video(frames, processed_video_path, fps=fps)
    save_video(depths, depth_vis_path, fps=fps, is_depths=True)

    stitched_video_path = None
    if stitch:
        # Re-read the original video in full resolution (without downscaling).
        full_frames, _ = read_video_frames(input_video, max_len, target_fps, max_res=-1)
        stitched_frames = _stitch_rgbd_frames(
            full_frames, depths, grayscale, convert_from_color, blur
        )

        # Only the first 20 characters of the base name go into the file name.
        short_name = base_name[:20]
        stitched_video_path = os.path.join(output_dir, short_name + '_RGBD.mp4')
        save_video(stitched_frames, stitched_video_path, fps=fps)

        # Merge audio from the input video into the stitched video.
        _mux_audio(
            stitched_video_path,
            input_video,
            os.path.join(output_dir, short_name + '_RGBD_audio.mp4'),
        )

        # Looping applies to the RGBD video only.
        if loop_factor > 1:
            print(f"Looping video {stitched_video_path} with factor {loop_factor}")
            _loop_video(stitched_video_path, loop_factor)

    gc.collect()
    torch.cuda.empty_cache()

    # Only the depth visualization and stitched video are returned
    # (not the preprocessed video).
    return [depth_vis_path, stitched_video_path]


def construct_demo():
    """Build the Gradio Blocks UI and wire the Generate button to inference.

    Returns:
        The ``gr.Blocks`` instance (not yet queued or launched).
    """
    with gr.Blocks(analytics_enabled=False) as demo:
        gr.Markdown(title)
        gr.Markdown(description)
        gr.Markdown("### If you find this work useful, please help ⭐ the [Github Repo](https://github.com/DepthAnything/Video-Depth-Anything). Thanks for your attention!")

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                # Video input component for file upload.
                input_video = gr.Video(label="Input Video")
            with gr.Column(scale=2):
                with gr.Row(equal_height=True):
                    # The preprocessed video is intentionally not part of the UI.
                    depth_vis_video = gr.Video(
                        label="Generated Depth Video",
                        interactive=False,
                        autoplay=True,
                        show_share_button=True,
                        scale=5,
                    )
                    stitched_video = gr.Video(
                        label="Stitched RGBD Video",
                        interactive=False,
                        autoplay=True,
                        show_share_button=True,
                        scale=5,
                    )

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                with gr.Accordion("Advanced Settings", open=False):
                    max_len = gr.Slider(label="Max process length", minimum=-1, maximum=1000, value=-1, step=1)
                    target_fps = gr.Slider(label="Target FPS", minimum=-1, maximum=30, value=-1, step=1)
                    max_res = gr.Slider(label="Max side resolution", minimum=480, maximum=1920, value=1280, step=1)
                    stitch_option = gr.Checkbox(label="Stitch RGB & Depth Videos", value=True)
                    grayscale_option = gr.Checkbox(label="Output Depth as Grayscale", value=True)
                    convert_from_color_option = gr.Checkbox(label="Convert Grayscale from Color", value=True)
                    blur_slider = gr.Slider(minimum=0, maximum=1, step=0.01, label="Depth Blur (can reduce edge artifacts on display)", value=0.3)
                    # Number of repetitions of the RGBD output video.
                    loop_factor = gr.Slider(label="Loop Factor (repeats the RGBD output video)", minimum=1, maximum=20, value=1, step=1)
                generate_btn = gr.Button("Generate")
            with gr.Column(scale=2):
                pass

        # Examples block removed to improve loading time.

        # The input order must match the positional parameters of
        # infer_video_depth.
        generate_btn.click(
            fn=infer_video_depth,
            inputs=[
                input_video,
                max_len,
                target_fps,
                max_res,
                stitch_option,
                grayscale_option,
                convert_from_color_option,
                blur_slider,
                loop_factor,
            ],
            outputs=[depth_vis_video, stitched_video],
        )

    return demo


if __name__ == "__main__":
    demo = construct_demo()
    demo.queue()  # Enable asynchronous processing.
    demo.launch(share=True, show_api=False)