# app.py — ShortiFoley (Video -> Foley)
# Created by bilsimaging.com
"""Gradio + MCP front end that generates Foley audio for short videos.

The module clones the HunyuanVideo-Foley repository, downloads its weights,
loads the model once, and exposes a web UI plus MCP/API endpoints that write
the generated video (with the new audio track) and a JSON sidecar to
``OUTPUTS_DIR``.
"""

import os
import sys
import io
import json
import uuid
import time
import shutil
import subprocess
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces  # HF Spaces ZeroGPU & MCP integration

# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
REPO_URL = "https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(
    os.environ.get(
        "HIFI_FOLEY_CONFIG",
        str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml"),
    )
)
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

SPACE_TITLE = "🎵 ShortiFoley — HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video → Audio Foley. Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# Keep GPU <= 120s for ZeroGPU (default 110)
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))

# Upper bound on result slots shown in the UI and on variants per request.
MAX_VARIANTS = 6

# Chunk size used when streaming a remote video to disk.
_DOWNLOAD_CHUNK_BYTES = 1 << 20

# Globals (populated by auto_load_models)
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None


# -------------
# Small helpers
# -------------
def _ensure_repo_on_path() -> None:
    """Put the cloned repo on ``sys.path`` exactly once."""
    repo = str(REPO_DIR)
    if repo not in sys.path:
        sys.path.append(repo)


def _remove_quietly(path: str) -> None:
    """Best-effort removal of a temporary file; failures are only logged."""
    try:
        os.remove(path)
    except OSError as err:
        logger.debug(f"Could not remove temp file {path}: {err}")


def _setup_device(pref: str = "auto", gpu_id: int = 0) -> torch.device:
    """Pick CUDA if available, else MPS, else CPU.

    Args:
        pref: ``"auto"`` to probe the hardware, otherwise a device string that
            is passed straight to ``torch.device``.
        gpu_id: CUDA device index used when ``pref`` is ``"auto"``.

    Returns:
        The selected ``torch.device``.
    """
    if pref != "auto":
        d = torch.device(pref)
    elif torch.cuda.is_available():
        d = torch.device(f"cuda:{gpu_id}")
    elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        d = torch.device("mps")
    else:
        d = torch.device("cpu")
    logger.info(f"Using CUDA {d}" if d.type == "cuda" else f"Using {d}")
    return d


def _ensure_repo() -> None:
    """Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout).

    Raises:
        RuntimeError: if ``git`` is missing or the clone fails. A partial
            checkout is removed so that the next start retries the clone
            instead of being fooled by the existing directory.
    """
    if REPO_DIR.exists():
        return
    # Argument list (no shell) so a REPO_DIR containing spaces still works.
    cmd = [
        "git",
        "-c", "filter.lfs.smudge=",
        "-c", "filter.lfs.required=false",
        "clone", "--depth", "1",
        REPO_URL,
        str(REPO_DIR),
    ]
    env = {**os.environ, "GIT_LFS_SKIP_SMUDGE": "1"}
    logger.info(f">> GIT_LFS_SKIP_SMUDGE=1 {' '.join(cmd)}")
    try:
        subprocess.run(cmd, env=env, check=True)
    except (OSError, subprocess.CalledProcessError) as err:
        shutil.rmtree(REPO_DIR, ignore_errors=True)
        raise RuntimeError(f"Failed to clone {REPO_URL}: {err}") from err


def _download_weights_if_needed() -> None:
    """Snapshot only needed files from HF weights/model hub."""
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    # `resume_download` is intentionally not passed: it is deprecated in
    # huggingface_hub, which resumes partial downloads on its own.
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        allow_patterns=[
            "hunyuanvideo_foley.pth",
            "synchformer_state_dict.pth",
            "vae_128d_48k.pth",
            "assets/*",
            "config.yaml",  # harmless
        ],
    )


def prepare_once() -> None:
    """Fetch everything the app needs on disk: source repo, then weights."""
    _ensure_repo()
    _download_weights_if_needed()


# -----------------------
# Model load & inference
# -----------------------
def auto_load_models() -> str:
    """Load HunyuanVideo-Foley + encoders on the chosen device.

    Returns:
        A human-readable status string. It starts with "✅" on a fresh
        successful load and with "❌" on failure; when the model is already
        in memory it is "Model already loaded.".
    """
    global _model_dict, _cfg, _device

    if _model_dict is not None and _cfg is not None:
        return "Model already loaded."

    _ensure_repo_on_path()
    _device = _setup_device("auto", 0)

    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    try:
        # Imported inside the try so a missing/broken repo is reported through
        # the returned status string instead of crashing the caller.
        from hunyuanvideo_foley.utils.model_utils import load_model

        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
    except Exception as e:
        logger.exception("Failed to load model")
        return f"❌ Failed to load model: {e}"
    return "✅ Model loaded."


def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Use project's helper (preferred) with a fallback to ffmpeg via subprocess.

    Raises:
        subprocess.CalledProcessError: if the ffmpeg fallback also fails.
    """
    _ensure_repo_on_path()
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video

        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        # Fallback: plain ffmpeg merge (assumes same duration or lets ffmpeg handle)
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            out_path,
        ]
        subprocess.run(cmd, check=True)


def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int, prompt: str) -> str:
    """Save the merged MP4 in outputs/ plus a JSON sidecar (watermark is metadata only).

    Args:
        video_src: Path of the source video the audio is merged into.
        audio_tensor: Generated audio; a 1-D tensor is promoted to ``[1, N]``.
        sr: Sample rate passed to ``torchaudio.save``.
        idx: 1-based variant index, used in the file names.
        prompt: Text prompt recorded in the sidecar.

    Returns:
        Path of the written ``.mp4`` as a string.
    """
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)

    # One timestamp for both the file name and the sidecar so they agree.
    now = datetime.datetime.now(datetime.timezone.utc)
    base = f"shortifoley_{now.strftime('%Y%m%d_%H%M%S_%f')}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"

    # The intermediate WAV is only needed for the merge; drop it afterwards.
    with tempfile.TemporaryDirectory() as tmpdir:
        wav_path = Path(tmpdir) / f"gen_{idx}.wav"
        torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)
        _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Save JSON sidecar
    meta = {
        "id": base,
        "created_utc": now.replace(tzinfo=None).isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)",
    }
    # Explicit UTF-8: the payload contains non-ASCII text (ensure_ascii=False).
    (OUTPUTS_DIR / f"{base}.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    return str(out_mp4)


def _list_gallery(limit: int = 100) -> List[str]:
    """Return up to ``limit`` saved ``.mp4`` paths from outputs/, newest first."""

    def _mtime(p: Path) -> float:
        # A file may disappear between glob() and stat(); sort it last.
        try:
            return p.stat().st_mtime
        except OSError:
            return 0.0

    newest_first = sorted(OUTPUTS_DIR.glob("*.mp4"), key=_mtime, reverse=True)
    return [str(p) for p in newest_first[: max(limit, 0)]]


# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)
@torch.inference_mode()
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1–6 variants).
    Returns: (list of output video paths, status message)
    """
    if _model_dict is None or _cfg is None:
        return [], "❌ Load the model first (open the app once)."
    if not video_file:
        return [], "❌ Please provide a video."

    _ensure_repo_on_path()
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    # preprocess
    visual_feats, text_feats, audio_len_s = feature_process(
        video_file, (text_prompt or "").strip(), _model_dict, _cfg
    )

    # generate batch (variant count clamped to the number of UI slots)
    n = int(max(1, min(MAX_VARIANTS, sample_nums)))
    audio, sr = denoise_process(
        visual_feats,
        text_feats,
        audio_len_s,
        _model_dict,
        _cfg,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(num_inference_steps),
        batch_size=n,
    )

    # save results
    outs = [
        _save_outputs(video_file, audio[i], sr, i + 1, text_prompt or "")
        for i in range(n)
    ]
    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"


# ---------------
# MCP-only APIs
# ---------------
def _download_to_tmp(url: str) -> str:
    """Download a remote file to a temp ``.mp4`` and return its path.

    The caller owns the returned file and is responsible for deleting it.

    Raises:
        RuntimeError: if the ``requests`` package is not installed.
        requests.RequestException: on network or HTTP errors.
    """
    try:
        import requests
    except ImportError as err:
        raise RuntimeError(
            "Missing dependency 'requests'. Add it to requirements.txt to use URL inputs."
        ) from err

    # NOTE(security): `url` is caller-supplied and fetched server-side; if this
    # app can reach private networks, restrict the allowed hosts.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    try:
        # Stream to disk instead of holding the whole video in memory.
        with tmp, requests.get(url, timeout=30, stream=True) as resp:
            resp.raise_for_status()
            for chunk in resp.iter_content(chunk_size=_DOWNLOAD_CHUNK_BYTES):
                tmp.write(chunk)
    except Exception:
        _remove_quietly(tmp.name)
        raise
    return tmp.name


def _maybe_from_base64(data_url_or_b64: str) -> str:
    """Accept data: URLs or raw base64; returns temp file path.

    The caller owns the returned file and is responsible for deleting it.

    Raises:
        ValueError: if the payload is empty or is not valid base64.
    """
    payload = data_url_or_b64
    if payload.startswith("data:"):
        payload = payload.split(",", 1)[-1]
    # Line-wrapped base64 is common; drop whitespace before strict validation.
    payload = "".join(payload.split())
    if not payload:
        raise ValueError("Empty base64 video payload.")
    try:
        raw = base64.b64decode(payload, validate=True)
    except ValueError as err:  # binascii.Error is a ValueError subclass
        raise ValueError(
            "Input is neither an http(s) URL nor valid base64 video data."
        ) from err

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
        tmp.write(raw)
    return tmp.name


def _normalize_video_input(video_url_or_b64: str) -> str:
    """Turn an http(s) URL or base64 / data-URL string into a local temp file path.

    Raises:
        ValueError: if the input is empty or not decodable.
    """
    v = (video_url_or_b64 or "").strip()
    if not v:
        raise ValueError("No video provided: pass an http(s) URL or base64 data.")
    if v.startswith(("http://", "https://")):
        return _download_to_tmp(v)
    return _maybe_from_base64(v)


with gr.Blocks() as mcp_only_endpoints:
    gr.Markdown("These endpoints are MCP/API only and have no visible UI.", show_label=False)

    @gr.api
    def api_generate_from_url(
        video_url_or_b64: str,
        text_prompt: str = "",
        guidance_scale: float = 4.5,
        num_inference_steps: int = 50,
        sample_nums: int = 1,
    ) -> Dict[str, List[str]]:
        """
        Generate Foley from a remote video URL or base64-encoded video.
        Returns: {"videos": [paths], "message": str}
        """
        if _model_dict is None or _cfg is None:
            raise RuntimeError("Model not loaded. Open the UI once or call /load_model tool.")
        local = _normalize_video_input(video_url_or_b64)
        try:
            outs, msg = infer_single_video(
                local, text_prompt, guidance_scale, num_inference_steps, sample_nums
            )
        finally:
            # The merged outputs live in OUTPUTS_DIR; the fetched source is scratch.
            _remove_quietly(local)
        return {"videos": outs, "message": msg}

    @gr.api
    def load_model_tool() -> str:
        """Ensure model is loaded on server (MCP convenience)."""
        return auto_load_models()

    @gr.mcp.resource("shortifoley://status")
    def shortifoley_status() -> str:
        """Return a simple readiness string for MCP clients."""
        ready = _model_dict is not None and _cfg is not None
        # Anything other than cuda/mps (including "not set yet") reports as cpu.
        dev = _device.type if (_device and _device.type in ("cuda", "mps")) else "cpu"
        return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

    @gr.mcp.prompt()
    def foley_prompt(name: str = "default") -> str:
        """Reusable guidance for describing sound ambience."""
        return (
            "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
            "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
        )


# -------------
# Gradio UI
# -------------
# NOTE(review): the "Tips" section below has a heading but no body in the
# source this was restored from — fill it in or remove the heading.
def _about_html() -> str:
    """Return the static HTML shown in the "About" tab."""
    return f"""
<div>
  <h2>About ShortiFoley</h2>
  <p>ShortiFoley automatically generates realistic Foley soundtracks for short videos using Tencent’s HunyuanVideo-Foley with CLAP &amp; SigLIP2 encoders. It includes autosave and an MCP server so you can call it from agents or workflows (e.g., n8n).</p>
  <p>Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a></p>

  <h3>How to use</h3>
  <ol>
    <li>Upload a video (ideally &lt; 120 seconds).</li>
    <li>Optionally enter a text description of the sound (English).</li>
    <li>Adjust CFG scale, steps, and number of variants.</li>
    <li>Click Generate. Results appear on the right and are stored in the Gallery.</li>
  </ol>

  <h3>Tips</h3>

  <h3>MCP / Automation</h3>
  <p>This app runs as an MCP server. Open the footer “View API → MCP” to copy a ready config. You can also use the REST endpoints listed there. Perfect for n8n integrations.</p>

  <h3>Watermark</h3>
  <p>Each output’s metadata includes: {WATERMARK_NOTE}. If you want a visible video overlay, I can add an ffmpeg overlay step on request.</p>
</div>
"""


def create_ui() -> gr.Blocks:
    """Build the Blocks app: a "Run" tab, a "Gallery" tab and an "About" tab."""
    with gr.Blocks(
        title="ShortiFoley — HunyuanVideo-Foley",
        css="""
        .main-header{
            text-align:center;
            padding:1.2rem;
            border-radius:16px;
            background:linear-gradient(135deg,#667eea,#764ba2);
            color:white;
        }
        .card{
            background:white;
            border:1px solid #e1e5e9;
            border-radius:16px;
            padding:1rem;
            box-shadow:0 8px 32px rgba(0,0,0,.06);
        }
        .generate-btn button{ font-weight:700; }
        """,
    ) as demo:
        gr.HTML(
            f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>"
        )

        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 📹 Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile, distant chatter.",
                            lines=3,
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG Scale")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, MAX_VARIANTS, value=1, step=1, label="Variants")
                        generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])

                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 🎥 Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        v2 = gr.Video(label="Sample 2", height=160, visible=False)
                        v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        v4 = gr.Video(label="Sample 4", height=160, visible=False)
                        v5 = gr.Video(label="Sample 5", height=160, visible=False)
                        v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        status = gr.Textbox(label="Status", interactive=False)

                result_slots = [v1, v2, v3, v4, v5, v6]

                # Generate handler: one update per result slot, then the status text.
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    vis_updates = [
                        gr.update(visible=True, value=outs[i])
                        if i < len(outs)
                        else gr.update(visible=False, value=None)
                        for i in range(len(result_slots))
                    ]
                    # Must return exactly len(outputs) values: 6 videos + status.
                    return (*vis_updates, msg)

                generate_event = generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[*result_slots, status],
                    api_name="/infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files.",
                )

                # Toggle visibility when # of samples changes
                def _toggle_vis(n):
                    n = int(n)
                    # Slot 1 is always visible; slot k is shown when n >= k.
                    return [gr.update(visible=(k == 1 or n >= k)) for k in range(1, len(result_slots) + 1)]

                samples.change(_toggle_vis, inputs=[samples], outputs=result_slots)

            with gr.Tab("📁 Gallery"):
                gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results",
                )

                def _refresh_gallery():
                    return gr.update(value=_list_gallery())

                refresh = gr.Button("🔄 Refresh Gallery")
                refresh.click(_refresh_gallery, outputs=[gallery])

            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Refresh the gallery only AFTER generation finished, so the new files
        # are already on disk when the listing is taken.
        generate_event.then(_refresh_gallery, outputs=[gallery])

    return demo


def set_seeds(s: int = 1):
    """Seed Python's ``random``, NumPy and torch RNGs with ``s``."""
    random.seed(s)
    np.random.seed(s)
    torch.manual_seed(s)


# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")

    set_seeds(1)
    logger.info("===== Application Startup =====\n")
    prepare_once()

    # Ensure import paths after repo is present
    _ensure_repo_on_path()
    try:
        # Probe key modules early (better error surfacing)
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")

    msg = auto_load_models()
    if not msg.startswith("✅"):
        logger.error(f"[BOOT][ERROR] auto_load_models() failed:\n{msg}")
    else:
        logger.info(msg)

    ui = create_ui()

    # Mount MCP-only endpoints alongside the UI. `Blocks.blocks` is a dict, so
    # the endpoints are merged by rendering them inside the UI's context.
    with ui:
        mcp_only_endpoints.render()

    # Enable MCP server so tools/resources/prompts are discoverable
    ui.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,  # MCP on
        allowed_paths=[str(OUTPUTS_DIR)],  # outputs may live outside the CWD
    )