# ShortiFoley
# Created by bilsimaging.com
import os
os.environ.setdefault("HF_PREFER_SAFETENSORS", "1")

import sys
import json
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces

# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

SPACE_TITLE = "🎵 ShortiFoley — HunyuanVideo-Foley"
SPACE_TAGLINE = "Bring your videos to life with AI-powered Foley"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# ZeroGPU limit (<=120)
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))

# Globals (NO CUDA INIT HERE)
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None

# ------------
# Small helpers
# ------------
def _ensure_repo() -> None:
    """Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout)."""
    if REPO_DIR.exists():
        return
    cmd = (
        "GIT_LFS_SKIP_SMUDGE=1 "
        "git -c filter.lfs.smudge= -c filter.lfs.required=false "
        f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
    )
    logger.info(f">> {cmd}")
    os.system(cmd)


def _download_weights_if_needed() -> None:
    """Snapshot only needed files from HF weights/model hub."""
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        resume_download=True,
        allow_patterns=[
            "hunyuanvideo_foley.pth",
            "synchformer_state_dict.pth",
            "vae_128d_48k.pth",
            "assets/*",
            "config.yaml",
        ],
    )


def prepare_once() -> None:
    _ensure_repo()
    _download_weights_if_needed()

# -----------------------
# Model load & inference
# -----------------------
def auto_load_models(device: Optional[torch.device] = None) -> str:
    """
    Load HunyuanVideo-Foley + encoders on the given device.
    MUST be called only inside a @spaces.GPU context with device=cuda:0.
    """
    global _model_dict, _cfg, _device
    if _model_dict is not None and _cfg is not None:
        return "✅ Model already loaded."

    # DO NOT probe CUDA here unless device is passed from GPU context
    if device is None:
        return "❌ Load the model inside a GPU task first (use the Load button or run Generate)."

    os.environ["HF_PREFER_SAFETENSORS"] = "1"  # enforce again for safety
    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.model_utils import load_model

    _device = device
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    logger.info(f"TARGET_DEVICE: {_device}")

    try:
        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
        return "✅ Model loaded."
    except OSError as e:
        logger.error(str(e))
        logger.info("Retrying after enforcing safetensors preference...")
        os.environ["HF_PREFER_SAFETENSORS"] = "1"
        try:
            _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
            return "✅ Model loaded (after safetensors retry)."
        except Exception as e2:
            logger.error(str(e2))
            return f"❌ Failed to load model: {e2}"
    except Exception as e:
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"


def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Preferred: project’s util; fallback to ffmpeg."""
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video
        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            out_path,
        ]
        subprocess.run(cmd, check=True)


def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int, prompt: str) -> str:
    """Save WAV + MP4 in outputs/, add metadata with a soft watermark note."""
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)

    tmpdir = Path(tempfile.mkdtemp())
    wav_path = tmpdir / f"gen_{idx}.wav"
    torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)

    ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
    base = f"shortifoley_{ts}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"
    _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Sidecar JSON
    meta = {
        "id": base,
        "created_utc": datetime.datetime.utcnow().isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark_note": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)",
    }
    (OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2))
    return str(out_mp4)


def _list_gallery(limit: int = 100) -> List[str]:
    vids: List[str] = []
    for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True):
        vids.append(str(p))
        if len(vids) >= limit:
            break
    return vids

# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)
@torch.inference_mode()
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1–6 variants).
    Returns: (list of output video paths, status message)
    """
    # Safe: inside GPU context, we can use CUDA
    device = torch.device("cuda:0")

    # Lazy-load if needed on GPU
    if _model_dict is None or _cfg is None:
        msg = auto_load_models(device)
        if not str(msg).startswith("✅"):
            return [], f"❌ {msg}"

    if not video_file:
        return [], "❌ Please provide a video."

    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    # preprocess
    visual_feats, text_feats, audio_len_s = feature_process(
        video_file, (text_prompt or "").strip(), _model_dict, _cfg
    )

    # generate batch
    n = int(max(1, min(6, sample_nums)))
    audio, sr = denoise_process(
        visual_feats,
        text_feats,
        audio_len_s,
        _model_dict,
        _cfg,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(num_inference_steps),
        batch_size=n,
    )

    # save results
    outs = []
    for i in range(n):
        outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or ""))
    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"


@spaces.GPU(duration=GPU_DURATION)
def gpu_load_models() -> str:
    device = torch.device("cuda:0")
    return auto_load_models(device)
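
# Optional local smoke test (a sketch, commented out; not part of the app flow).
# Assumes a short clip at ./sample.mp4 and a CUDA machine — e.g. when running
# outside Spaces, where the @spaces.GPU decorator is typically a no-op:
#
#   prepare_once()
#   print(gpu_load_models())
#   outs, msg = infer_single_video("sample.mp4", "soft footsteps on gravel", 4.5, 25, 1)
#   print(msg, outs)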

# -------------
# Gradio UI (with MCP + REST endpoints)
# -------------
def _about_html() -> str:
    return f"""
<h3>About ShortiFoley</h3>
<p>ShortiFoley turns short videos (and an optional text hint) into realistic Foley sound.
Powered by Tencent’s HunyuanVideo-Foley (SigLIP2 + CLAP), with autosave and an MCP server
for automation (e.g., n8n).</p>
<p>Created by bilsimaging.com</p>

<h3>Quick Steps</h3>
<ol>
  <li>Upload a clip (ideally &lt; 120s).</li>
  <li>Optionally describe the sound (English).</li>
  <li>Pick variants (1–6), adjust CFG and steps.</li>
  <li>Hit Generate. Results show on the right and save into the Gallery.</li>
</ol>

<h3>Tips for Best Quality</h3>

<h3>MCP & API</h3>
<p>This Space exposes an MCP server and simple REST endpoints (see the “API & MCP” tab).
Perfect for pipelines and tools like n8n.</p>
""" def create_ui() -> gr.Blocks: css = """ .main-header{ text-align:center; padding:1.2rem; border-radius:18px; background:linear-gradient(135deg,#6366f1,#8b5cf6); color:white; box-shadow:0 12px 40px rgba(99,102,241,.35); margin-bottom:16px;} .main-header h1{ margin:0; font-size:2.0rem; font-weight:800;} .main-header p{ margin:.25rem 0 0; opacity:.95; font-weight:500;} .card{ background:white; border:1px solid #e7e9ef; border-radius:16px; padding:14px; box-shadow:0 10px 28px rgba(0,0,0,.06);} .generate-btn button{ font-weight:800; border-radius:12px; padding:10px 18px;} .minor-btn button{ border-radius:10px;} .muted{ color:#64748b; } .footer-text{ margin-top:16px; text-align:center; color:#475569; font-size:.95rem;} """ with gr.Blocks(title="ShortiFoley — HunyuanVideo-Foley", css=css) as demo: gr.HTML(f"

<div class="main-header">
  <h1>{SPACE_TITLE}</h1>
  <p>{SPACE_TAGLINE}</p>
</div>

") with gr.Tabs(): with gr.Tab("Run"): with gr.Row(): # LEFT: input with gr.Column(scale=1, elem_classes=["card"]): gr.Markdown("### 📹 Input") video_input = gr.Video(label="Upload Video", height=300) text_input = gr.Textbox( label="🎯 Audio Description (optional, English)", placeholder="e.g., Rubber soles on wet tile; distant chatter; occasional splashes.", lines=3 ) with gr.Row(): guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG") steps = gr.Slider(10, 100, value=50, step=5, label="Steps") samples = gr.Slider(1, 6, value=1, step=1, label="Variants") with gr.Row(): load_btn = gr.Button("⚙️ Load model", variant="secondary", elem_classes=["minor-btn"]) generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"]) status = gr.Textbox(label="Status", interactive=False) # RIGHT: results with gr.Column(scale=1, elem_classes=["card"]): gr.Markdown("### 🎥 Result(s)") v1 = gr.Video(label="Sample 1", height=260, visible=True) with gr.Row(): v2 = gr.Video(label="Sample 2", height=160, visible=False) v3 = gr.Video(label="Sample 3", height=160, visible=False) with gr.Row(): v4 = gr.Video(label="Sample 4", height=160, visible=False) v5 = gr.Video(label="Sample 5", height=160, visible=False) v6 = gr.Video(label="Sample 6", height=160, visible=False) gr.Markdown("Autosaved to the Gallery tab.") # Generate handler def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples): outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples) vis = [] for i in range(6): if i < len(outs): vis.append(gr.update(visible=True, value=outs[i])) else: vis.append(gr.update(visible=False, value=None)) return (*vis, msg) gen_evt = generate.click( fn=_process_and_update, inputs=[video_input, text_input, guidance_scale, steps, samples], outputs=[v1, v2, v3, v4, v5, v6, status], api_name="/infer", api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files." ) # Load model (GPU-safe) load_btn.click( fn=gpu_load_models, inputs=[], outputs=[status], api_name="/load_model", api_description="Load/initialize the ShortiFoley model and encoders (runs on GPU)." 
                )

                # Toggle visibility based on variants
                def _toggle_vis(n):
                    n = int(n)
                    return [
                        gr.update(visible=True),
                        gr.update(visible=n >= 2),
                        gr.update(visible=n >= 3),
                        gr.update(visible=n >= 4),
                        gr.update(visible=n >= 5),
                        gr.update(visible=n >= 6),
                    ]

                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])

            with gr.Tab("📁 Gallery"):
                gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results",
                )
                refresh = gr.Button("🔄 Refresh Gallery")

                def _refresh_gallery():
                    return gr.update(value=_list_gallery())

                # Refresh via button
                refresh.click(_refresh_gallery, outputs=[gallery])
                # Also refresh after generation finishes
                gen_evt.then(_refresh_gallery, inputs=None, outputs=[gallery])

            with gr.Tab("API & MCP"):
                gr.Markdown(
                    "### REST examples\n\n"
                    "**POST** `api_generate_from_url`\n"
                    "```json\n"
                    "{\n"
                    '  "video_url_or_b64": "https://yourhost/sample.mp4",\n'
                    '  "text_prompt": "metallic clink; hollow room reverb",\n'
                    '  "guidance_scale": 4.5,\n'
                    '  "num_inference_steps": 50,\n'
                    '  "sample_nums": 2\n'
                    "}\n"
                    "```\n\n"
                    "**POST** `load_model_tool` — loads the model proactively.\n\n"
                    "### MCP resources & prompt\n"
                    "- `shortifoley://status` → quick health info\n"
                    "- `foley_prompt` → reusable guidance for describing the sound\n\n"
                    "Works with n8n: call `load_model_tool` once, then `api_generate_from_url` per clip."
                )

            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Footer
        gr.HTML(""" """)

        # ---- REST + MCP endpoints (inside Blocks) ----
        def _download_to_tmp(url: str) -> str:
            try:
                import requests
            except Exception:
                raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.")
            r = requests.get(url, timeout=30)
            r.raise_for_status()
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tmp.write(r.content)
            tmp.flush()
            tmp.close()
            return tmp.name

        def _maybe_from_base64(data_url_or_b64: str) -> str:
            b64 = data_url_or_b64
            if data_url_or_b64.startswith("data:"):
                b64 = data_url_or_b64.split(",", 1)[-1]
            raw = base64.b64decode(b64)
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tmp.write(raw)
            tmp.flush()
            tmp.close()
            return tmp.name

        def _normalize_video_input(video_url_or_b64: str) -> str:
            v = (video_url_or_b64 or "").strip()
            if v.startswith("http://") or v.startswith("https://"):
                return _download_to_tmp(v)
            return _maybe_from_base64(v)

        @gr.api
        def api_generate_from_url(
            video_url_or_b64: str,
            text_prompt: str = "",
            guidance_scale: float = 4.5,
            num_inference_steps: int = 50,
            sample_nums: int = 1,
        ) -> Dict[str, List[str]]:
            # Ensure model is ready (GPU-safe path)
            if _model_dict is None or _cfg is None:
                _ = gpu_load_models()
            local = _normalize_video_input(video_url_or_b64)
            outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
            return {"videos": outs, "message": msg}

        @gr.api
        def load_model_tool() -> str:
            """Ensure model is loaded on server (convenient for MCP/REST)."""
            return gpu_load_models()

        @gr.mcp.resource("shortifoley://status")
        def shortifoley_status() -> str:
            """Return a simple readiness string for MCP clients."""
            ready = _model_dict is not None and _cfg is not None
            dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
            return f"ShortiFoley status: {'ready' if ready else 'idle'} | device={dev} | outputs={OUTPUTS_DIR}"

        @gr.mcp.prompt()
        def foley_prompt(name: str = "default") -> str:
            """Reusable guidance for describing sound ambience."""
            return (
                "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
                "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
            )

        # IMPORTANT: Do NOT auto-load models here to avoid CUDA init in main process
        demo.load(lambda: "Ready. Click 'Load model' or 'Generate' to start.", inputs=None, outputs=None)

    return demo


def set_seeds(s: int = 1):
    random.seed(s)
    np.random.seed(s)
    torch.manual_seed(s)

# -------------
# App bootstrap (CPU only)
# -------------
if __name__ == "__main__":
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")
    set_seeds(1)
    logger.info("===== Application Startup =====\n")

    prepare_once()

    # Probe imports (early surfacing) — CPU-safe
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")

    ui = create_ui()
    # Enable MCP server so tools/resources/prompts are discoverable
    ui.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,  # MCP
    )
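
# ---------------------------------------------------------------------------
# Example client call (a sketch, commented out). Assumes the app is running
# locally or as a Space, that `gradio_client` is installed, and that the
# endpoint names match the api_name values registered above:
#
#   from gradio_client import Client, handle_file
#   client = Client("http://localhost:7860")      # or your Space URL
#   client.predict(api_name="/load_model")        # warm the model once
#   result = client.predict(
#       handle_file("sample.mp4"),                # video
#       "metallic clink; hollow room reverb",     # prompt
#       4.5, 50, 1,                               # CFG, steps, variants
#       api_name="/infer",
#   )
# ---------------------------------------------------------------------------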