# Created by bilsimaging.com import os os.environ.setdefault("HF_PREFER_SAFETENSORS", "1") import sys import json import base64 import random import tempfile import datetime from pathlib import Path from typing import List, Optional, Tuple, Dict import numpy as np import torch import torchaudio import gradio as gr from loguru import logger from huggingface_hub import snapshot_download import spaces # ------------------------- # Constants & configuration # ------------------------- ROOT = Path(__file__).parent.resolve() REPO_DIR = ROOT / "HunyuanVideo-Foley" WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights"))) CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml"))) # Always save into outputs/autosaved/ OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs" / "autosaved"))) OUTPUTS_DIR.mkdir(parents=True, exist_ok=True) SPACE_TITLE = "🎵 ShortiFoley — HunyuanVideo-Foley" SPACE_TAGLINE = "Text/Video → Audio Foley · Created by bilsimaging.com" WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com" # ZeroGPU limit (<=120s recommended) GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110")) # Globals _model_dict = None _cfg = None _device: Optional[torch.device] = None # ------------ # Small helpers # ------------ def _setup_device(pref: str = "cpu", gpu_id: int = 0) -> torch.device: """ Safe device picker. IMPORTANT: Do NOT probe torch.cuda.is_available() here on Stateless GPU Spaces. Only request CUDA inside a @spaces.GPU function. """ if pref.startswith("cuda"): d = torch.device(f"cuda:{gpu_id}") elif pref == "mps": d = torch.device("mps") else: d = torch.device("cpu") logger.info(f"Using {d}") return d def _ensure_repo() -> None: """Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout).""" if REPO_DIR.exists(): return cmd = ( "GIT_LFS_SKIP_SMUDGE=1 " "git -c filter.lfs.smudge= -c filter.lfs.required=false " f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}" ) logger.info(f">> {cmd}") os.system(cmd) def _download_weights_if_needed() -> None: """Snapshot only needed files from HF weights/model hub.""" WEIGHTS_DIR.mkdir(parents=True, exist_ok=True) snapshot_download( repo_id="tencent/HunyuanVideo-Foley", local_dir=str(WEIGHTS_DIR), resume_download=True, allow_patterns=[ "hunyuanvideo_foley.pth", "synchformer_state_dict.pth", "vae_128d_48k.pth", "assets/*", "config.yaml", ], ) def prepare_once() -> None: _ensure_repo() _download_weights_if_needed() # ----------------------- # Model load & inference # ----------------------- def _force_fp32_on_modules(obj): """Ensure every torch.nn.Module inside obj is float32 to avoid half/float mismatches.""" try: import torch.nn as nn for name in dir(obj): try: m = getattr(obj, name) except Exception: continue if isinstance(m, nn.Module): m.float() if hasattr(obj, "foley_model"): obj.foley_model.float() if hasattr(obj, "dac_model"): obj.dac_model.float() if hasattr(obj, "siglip2_model"): obj.siglip2_model.float() if hasattr(obj, "clap_model"): obj.clap_model.float() if hasattr(obj, "syncformer_model"): obj.syncformer_model.float() except Exception as e: logger.warning(f"FP32 cast warning: {e}") def auto_load_models(device_str: str = "cpu") -> str: """ Load HunyuanVideo-Foley + encoders on the chosen device. Use device_str='cuda' ONLY inside @spaces.GPU to avoid CUDA init in main process. """ global _model_dict, _cfg, _device if _model_dict is not None and _cfg is not None: return "✅ Model already loaded." # Make absolutely sure safetensors is preferred os.environ["HF_PREFER_SAFETENSORS"] = "1" torch.set_float32_matmul_precision("high") # allow TF32 where possible sys.path.append(str(REPO_DIR)) from hunyuanvideo_foley.utils.model_utils import load_model _device = _setup_device(device_str, 0) logger.info("Loading HunyuanVideo-Foley model...") logger.info(f"MODEL_PATH: {WEIGHTS_DIR}") logger.info(f"CONFIG_PATH: {CONFIG_PATH}") try: _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device) # Force fp32 to fix: RuntimeError: Input type (Half) and bias (float) must match _force_fp32_on_modules(_model_dict) return "✅ Model loaded." except OSError as e: logger.error(str(e)) logger.info("Retrying after enforcing safetensors preference...") os.environ["HF_PREFER_SAFETENSORS"] = "1" try: _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device) _force_fp32_on_modules(_model_dict) return "✅ Model loaded (after safetensors retry)." except Exception as e2: logger.error(str(e2)) return f"❌ Failed to load model: {e2}" except Exception as e: logger.error(str(e)) return f"❌ Failed to load model: {e}" def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None: """Preferred: project's util; fallback to ffmpeg.""" sys.path.append(str(REPO_DIR)) try: from hunyuanvideo_foley.utils.media_utils import merge_audio_video merge_audio_video(audio_path, video_path, out_path) except Exception as e: logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}") import subprocess cmd = [ "ffmpeg", "-y", "-i", video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-shortest", out_path ] subprocess.run(cmd, check=True) def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int, prompt: str) -> str: """Save WAV + MP4 in outputs/autosaved/, add metadata with a soft watermark note.""" # torchaudio expects [C, N] if audio_tensor.ndim == 1: audio_tensor = audio_tensor.unsqueeze(0) tmpdir = Path(tempfile.mkdtemp()) wav_path = tmpdir / f"gen_{idx}.wav" torchaudio.save(str(wav_path), audio_tensor.cpu(), sr) ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f") base = f"shortifoley_{ts}_{idx}" out_mp4 = OUTPUTS_DIR / f"{base}.mp4" _merge_audio_video(str(wav_path), video_src, str(out_mp4)) # Sidecar JSON meta = { "id": base, "created_utc": datetime.datetime.utcnow().isoformat() + "Z", "source_video": Path(video_src).name, "output_video": Path(out_mp4).name, "prompt": prompt or "", "watermark_note": WATERMARK_NOTE, "tool": "ShortiFoley (HunyuanVideo-Foley)" } (OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2)) return str(out_mp4) def _list_gallery(limit: int = 100) -> List[str]: vids = [] for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True): vids.append(str(p)) if len(vids) >= limit: break return vids # ================ # Inference kernel # ================ @spaces.GPU(duration=GPU_DURATION) @torch.inference_mode() def infer_single_video( video_file: str, text_prompt: str, guidance_scale: float = 4.5, num_inference_steps: int = 50, sample_nums: int = 1, ) -> Tuple[List[str], str]: """ Generate Foley audio for an uploaded video (1–6 variants). Returns: (list of output video paths, status message) """ # Lazy-load on GPU ONLY here (prevents CUDA init in main process) if _model_dict is None or _cfg is None: msg = auto_load_models(device_str="cuda") if not str(msg).startswith("✅"): return [], f"❌ {msg}" if not video_file: return [], "❌ Please provide a video." sys.path.append(str(REPO_DIR)) from hunyuanvideo_foley.utils.feature_utils import feature_process from hunyuanvideo_foley.utils.model_utils import denoise_process # Avoid autocast to float16 to fix Half/Float mismatch inside Synchformer conv3d with torch.autocast(device_type="cuda", enabled=False): # preprocess visual_feats, text_feats, audio_len_s = feature_process( video_file, (text_prompt or "").strip(), _model_dict, _cfg ) # generate batch n = int(max(1, min(6, sample_nums))) audio, sr = denoise_process( visual_feats, text_feats, audio_len_s, _model_dict, _cfg, guidance_scale=float(guidance_scale), num_inference_steps=int(num_inference_steps), batch_size=n, ) # save results outs = [] for i in range(n): outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or "")) return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/" # ------------- # Gradio UI (with MCP+API inside the same app) # ------------- def _about_html() -> str: return f"""

About ShortiFoley

ShortiFoley turns short videos into realistic Foley sound.
Powered by Tencent’s HunyuanVideo-Foley (SigLIP2 + CLAP), with autosave and an MCP server for automation (n8n flows).

Created by bilsimaging.com

Quick Steps

  1. Upload a clip (ideally < 120s).
  2. Optionally describe the sound (English).
  3. Pick variants (1–6), adjust CFG and steps.
  4. Hit Generate. Results show on the right and save into the Gallery.

Tips for Best Quality

MCP & API

This Space exposes an MCP server and simple REST endpoints (see “API & MCP” tab). Perfect for media-automation pipelines and tools like n8n.

""" def create_ui() -> gr.Blocks: css = """ .main-header{ text-align:center; padding:1.2rem; border-radius:18px; background:linear-gradient(135deg,#6366f1,#8b5cf6); color:white; box-shadow:0 12px 40px rgba(99,102,241,.35); margin-bottom:16px;} .main-header h1{ margin:0; font-size:2.0rem; font-weight:800;} .main-header p{ margin:.25rem 0 0; opacity:.95; font-weight:500;} .card{ background:white; border:1px solid #e7e9ef; border-radius:16px; padding:14px; box-shadow:0 10px 28px rgba(0,0,0,.06);} .generate-btn button{ font-weight:800; border-radius:12px; padding:10px 18px;} .minor-btn button{ border-radius:10px;} .muted{ color:#64748b; } .footer-text{ color:#64748b; text-align:center; padding:12px 0; font-size:.95rem; } """ with gr.Blocks(title="ShortiFoley — HunyuanVideo-Foley", css=css) as demo: gr.HTML(f"

{SPACE_TITLE}

{SPACE_TAGLINE}

") with gr.Tabs(): with gr.Tab("Run"): with gr.Row(): # LEFT: input with gr.Column(scale=1, elem_classes=["card"]): gr.Markdown("### 📹 Input") video_input = gr.Video(label="Upload Video", height=300) text_input = gr.Textbox( label="🎯 Audio Description (optional, English)", placeholder="e.g., Rubber soles on wet tile; distant chatter; occasional splashes.", lines=3 ) with gr.Row(): guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG") steps = gr.Slider(10, 100, value=50, step=5, label="Steps") samples = gr.Slider(1, 6, value=1, step=1, label="Variants") with gr.Row(): load_btn = gr.Button("⚙️ Load model (CPU)", variant="secondary", elem_classes=["minor-btn"]) generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"]) status = gr.Textbox(label="Status", interactive=False) # RIGHT: results with gr.Column(scale=1, elem_classes=["card"]): gr.Markdown("### 🎥 Result(s)") v1 = gr.Video(label="Sample 1", height=260, visible=True) with gr.Row(): v2 = gr.Video(label="Sample 2", height=160, visible=False) v3 = gr.Video(label="Sample 3", height=160, visible=False) with gr.Row(): v4 = gr.Video(label="Sample 4", height=160, visible=False) v5 = gr.Video(label="Sample 5", height=160, visible=False) v6 = gr.Video(label="Sample 6", height=160, visible=False) gr.Markdown("Autosaved to the Gallery tab.") # Generate handler (single binding, exact outputs) def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples): outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples) vis = [] for i in range(6): if outs and i < len(outs): vis.append(gr.update(visible=True, value=outs[i])) else: vis.append(gr.update(visible=(i == 0), value=None if i > 0 else None)) return (*vis, msg) generate.click( fn=_process_and_update, inputs=[video_input, text_input, guidance_scale, steps, samples], outputs=[v1, v2, v3, v4, v5, v6, status], api_name="/infer", api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files." ) load_btn.click( fn=lambda: auto_load_models(device_str="cpu"), inputs=[], outputs=[status], api_name="/load_model", api_description="Load/initialize the ShortiFoley model and encoders on CPU (GPU loads during inference)." ) # Toggle visibility based on variants def _toggle_vis(n): n = int(n) return [ gr.update(visible=True), gr.update(visible=n >= 2), gr.update(visible=n >= 3), gr.update(visible=n >= 4), gr.update(visible=n >= 5), gr.update(visible=n >= 6), ] samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6]) with gr.Tab("📁 Gallery"): gr.Markdown("Latest generated videos (autosaved to outputs/autosaved/).") gallery = gr.Gallery( value=_list_gallery(), columns=3, preview=True, label="Saved Results" ) refresh = gr.Button("🔄 Refresh Gallery") refresh.click(lambda: _list_gallery(), outputs=[gallery]) with gr.Tab("API & MCP"): gr.Markdown(""" ### REST examples **POST** `/api_generate_from_url` ```json { "video_url_or_b64": "https://yourhost/sample.mp4", "text_prompt": "metallic clink; hollow room reverb", "guidance_scale": 4.5, "num_inference_steps": 50, "sample_nums": 2 } ``` **POST** `/load_model_tool` Loads the model proactively (useful before batch runs). **MCP resources & prompt** - `shortifoley://status` → quick health info - `foley_prompt` → reusable guidance for describing the sound Works great for media-automation in tools like **n8n**: call `load_model_tool` once, then `api_generate_from_url` for each clip. """) with gr.Tab("ℹ️ About"): gr.HTML(_about_html()) # Footer gr.HTML( """ """ ) # ---- REST + MCP endpoints (inside Blocks) ---- def _download_to_tmp(url: str) -> str: try: import requests except Exception: raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.") r = requests.get(url, timeout=30) r.raise_for_status() tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") tmp.write(r.content) tmp.flush() tmp.close() return tmp.name def _maybe_from_base64(data_url_or_b64: str) -> str: b64 = data_url_or_b64 if data_url_or_b64.startswith("data:"): b64 = data_url_or_b64.split(",", 1)[-1] raw = base64.b64decode(b64) tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") tmp.write(raw) tmp.flush() tmp.close() return tmp.name def _normalize_video_input(video_url_or_b64: str) -> str: v = (video_url_or_b64 or "").strip() if v.startswith("http://") or v.startswith("https://"): return _download_to_tmp(v) return _maybe_from_base64(v) @gr.api def api_generate_from_url( video_url_or_b64: str, text_prompt: str = "", guidance_scale: float = 4.5, num_inference_steps: int = 50, sample_nums: int = 1, ) -> Dict[str, List[str]]: if _model_dict is None or _cfg is None: msg = auto_load_models(device_str="cpu") # safe in HTTP context if not str(msg).startswith("✅"): raise RuntimeError(msg) local = _normalize_video_input(video_url_or_b64) outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums) return {"videos": outs, "message": msg} @gr.api def load_model_tool() -> str: """Ensure model is loaded on server (convenient for MCP/REST).""" return auto_load_models(device_str="cpu") @gr.mcp.resource("shortifoley://status") def shortifoley_status() -> str: """Return a simple readiness string for MCP clients.""" ready = _model_dict is not None and _cfg is not None dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu") return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}" @gr.mcp.prompt() def foley_prompt(name: str = "default") -> str: """Reusable guidance for describing sound ambience.""" return ( "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n" "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'" ) return demo def set_seeds(s: int = 1): random.seed(s) np.random.seed(s) torch.manual_seed(s) # ------------- # App bootstrap # ------------- if __name__ == "__main__": logger.remove() logger.add(lambda m: print(m, end=""), level="INFO") set_seeds(1) logger.info("===== Application Startup =====\n") prepare_once() # Probe imports (early surfacing) sys.path.append(str(REPO_DIR)) try: from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process # noqa: F401 from hunyuanvideo_foley.utils.feature_utils import feature_process # noqa: F401 from hunyuanvideo_foley.utils.media_utils import merge_audio_video # noqa: F401 except Exception as e: logger.warning(f"Repo imports not ready yet: {e}") ui = create_ui() # Enable MCP server so tools/resources/prompts are discoverable ui.launch( server_name="0.0.0.0", share=False, show_error=True, mcp_server=True, # Enable MCP server )