# app.py - ShortiFoley (Video -> Foley)
# Created by bilsimaging.com
import os
import sys
import io
import json
import uuid
import time
import shutil
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces  # HF Spaces ZeroGPU & MCP integration
# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

SPACE_TITLE = "🎵 ShortiFoley - HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video → Audio Foley. Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# Keep GPU <= 120s for ZeroGPU (default 110)
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))

# Globals
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None
# -------------
# Small helpers
# -------------
def _setup_device(pref: str = "auto", gpu_id: int = 0) -> torch.device:
    """Pick CUDA if available, else MPS, else CPU."""
    if pref == "auto":
        if torch.cuda.is_available():
            d = torch.device(f"cuda:{gpu_id}")
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            d = torch.device("mps")
        else:
            d = torch.device("cpu")
    else:
        d = torch.device(pref)
    logger.info(f"Using device: {d}")
    return d
def _ensure_repo() -> None:
    """Shallow-clone the Tencent repo with LFS smudge disabled (avoids LFS quota checkout)."""
    if REPO_DIR.exists():
        return
    cmd = (
        "GIT_LFS_SKIP_SMUDGE=1 "
        "git -c filter.lfs.smudge= -c filter.lfs.required=false "
        f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
    )
    logger.info(f">> {cmd}")
    os.system(cmd)
def _download_weights_if_needed() -> None:
    """Snapshot only the needed files from the HF model hub."""
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        resume_download=True,
        allow_patterns=[
            "hunyuanvideo_foley.pth",
            "synchformer_state_dict.pth",
            "vae_128d_48k.pth",
            "assets/*",
            "config.yaml",  # harmless extra
        ],
    )
def prepare_once() -> None:
    _ensure_repo()
    _download_weights_if_needed()
# -----------------------
# Model load & inference
# -----------------------
def auto_load_models() -> str:
    """Load HunyuanVideo-Foley + encoders on the chosen device."""
    global _model_dict, _cfg, _device
    if _model_dict is not None and _cfg is not None:
        return "Model already loaded."
    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.model_utils import load_model
    _device = _setup_device("auto", 0)
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    try:
        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
        return "✅ Model loaded."
    except Exception as e:
        logger.error(e)
        return f"❌ Failed to load model: {e}"
def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Use the project's helper (preferred) with a fallback to ffmpeg via subprocess."""
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video
        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        # Fallback: plain ffmpeg merge (-shortest trims to the shorter stream)
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            out_path,
        ]
        subprocess.run(cmd, check=True)
def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
                  prompt: str) -> str:
    """Save WAV + MP4 in outputs/, add metadata and a small watermark note (metadata only)."""
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)
    tmpdir = Path(tempfile.mkdtemp())
    wav_path = tmpdir / f"gen_{idx}.wav"
    torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)
    ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
    base = f"shortifoley_{ts}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"
    _merge_audio_video(str(wav_path), video_src, str(out_mp4))
    # Save JSON sidecar
    meta = {
        "id": base,
        "created_utc": datetime.datetime.utcnow().isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)",
    }
    (OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2))
    return str(out_mp4)
def _list_gallery(limit: int = 100) -> List[str]:
    vids = []
    for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True):
        vids.append(str(p))
        if len(vids) >= limit:
            break
    return vids
# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)  # ZeroGPU: request a GPU slot only for this call
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1–6 variants).
    Returns: (list of output video paths, status message)
    """
    if _model_dict is None or _cfg is None:
        return [], "❌ Load the model first (open the app once)."
    if not video_file:
        return [], "❌ Please provide a video."
    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process
    # Preprocess: extract visual/text features and the target audio length
    visual_feats, text_feats, audio_len_s = feature_process(
        video_file, (text_prompt or "").strip(), _model_dict, _cfg
    )
    # Generate a batch of 1-6 samples
    n = int(max(1, min(6, sample_nums)))
    audio, sr = denoise_process(
        visual_feats,
        text_feats,
        audio_len_s,
        _model_dict,
        _cfg,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(num_inference_steps),
        batch_size=n,
    )
    # Save results
    outs = []
    for i in range(n):
        outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or ""))
    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
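# For reference: a minimal sketch of calling the "/infer" endpoint from Python
# with gradio_client (the Space id below is a placeholder, not the real one):
#
#   from gradio_client import Client, handle_file
#   client = Client("<owner>/<space>")
#   result = client.predict(
#       handle_file("clip.mp4"),      # video
#       "Footsteps on wet gravel",    # text prompt
#       4.5, 50, 1,                   # CFG scale, steps, variants
#       api_name="/infer",
#   )  # -> tuple of up to 6 video paths (None for hidden slots) plus a status string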
# ---------------
# MCP-only APIs
# ---------------
def _download_to_tmp(url: str) -> str:
    """Download a remote file to a temp path."""
    try:
        import requests
    except Exception:
        raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.")
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    tmp.write(r.content)
    tmp.flush()
    tmp.close()
    return tmp.name

def _maybe_from_base64(data_url_or_b64: str) -> str:
    """Accept data: URLs or raw base64; return a temp file path."""
    b64 = data_url_or_b64
    if data_url_or_b64.startswith("data:"):
        b64 = data_url_or_b64.split(",", 1)[-1]
    raw = base64.b64decode(b64)
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    tmp.write(raw)
    tmp.flush()
    tmp.close()
    return tmp.name

def _normalize_video_input(video_url_or_b64: str) -> str:
    v = (video_url_or_b64 or "").strip()
    if v.startswith(("http://", "https://")):
        return _download_to_tmp(v)
    return _maybe_from_base64(v)
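# Illustrative inputs (placeholder values) that _normalize_video_input accepts:
#   _normalize_video_input("https://example.com/clip.mp4")    # remote URL
#   _normalize_video_input("data:video/mp4;base64,AAAA...")   # data: URL
#   _normalize_video_input("AAAAIGZ0eXBpc29t...")             # raw base64
# Each returns the path of a temporary .mp4 written to local disk.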
# The functions below are exposed as API/MCP-only endpoints with no visible UI.
# They are registered inside create_ui() via gr.api so they share the app's
# Blocks context and show up as MCP tools.
def api_generate_from_url(
    video_url_or_b64: str,
    text_prompt: str = "",
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Dict[str, object]:
    """
    Generate Foley from a remote video URL or base64-encoded video.
    Returns: {"videos": [paths], "message": str}
    """
    if _model_dict is None or _cfg is None:
        raise RuntimeError("Model not loaded. Open the UI once or call the /load_model tool.")
    local = _normalize_video_input(video_url_or_b64)
    outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
    return {"videos": outs, "message": msg}

def load_model_tool() -> str:
    """Ensure the model is loaded on the server (MCP convenience)."""
    return auto_load_models()

def shortifoley_status() -> str:
    """Return a simple readiness string for MCP clients."""
    ready = _model_dict is not None and _cfg is not None
    dev = _device.type if _device is not None else "cpu"
    return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

def foley_prompt(name: str = "default") -> str:
    """Reusable guidance for describing sound ambience."""
    return (
        "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
        "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
    )
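# A typical MCP client configuration for this Space might look like the
# following (a sketch; the host is a placeholder for your deployed Space, and
# /gradio_api/mcp/sse is Gradio's MCP SSE endpoint):
#
#   {
#     "mcpServers": {
#       "shortifoley": {
#         "url": "https://<your-space>.hf.space/gradio_api/mcp/sse"
#       }
#     }
#   }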
# -------------
# Gradio UI
# -------------
def _about_html() -> str:
    return f"""
<div style="line-height:1.6">
  <h2>About ShortiFoley</h2>
  <p><b>ShortiFoley</b> automatically generates realistic Foley soundtracks for short videos using
  Tencent's HunyuanVideo-Foley with CLAP & SigLIP2 encoders. It includes autosave and an MCP server so
  you can call it from agents or workflows (e.g., n8n).</p>
  <p><b>Created by <a href="https://bilsimaging.com" target="_blank">bilsimaging.com</a></b></p>
  <h3>How to use</h3>
  <ol>
    <li>Upload a video (ideally under 120 seconds).</li>
    <li>Optionally enter a text description of the sound (English).</li>
    <li>Adjust CFG scale, steps, and number of variants.</li>
    <li>Click <b>Generate</b>. Results appear on the right and are stored in the Gallery.</li>
  </ol>
  <h3>Tips</h3>
  <ul>
    <li>Trim clips to the key action (5–30s) for faster, crisper results.</li>
    <li>Include material cues ("wood", "metal", "concrete"), action cues ("splash", "glass shatter"), and ambience ("roomy", "echoey").</li>
    <li>Generate multiple variants and pick the most natural.</li>
  </ul>
  <h3>MCP / Automation</h3>
  <p>This app runs as an <b>MCP server</b>. Open the footer "View API → MCP" to copy a ready-made config. You can also use the REST endpoints listed there. Perfect for n8n integrations.</p>
  <h3>Watermark</h3>
  <p>Each output's metadata includes: <i>{WATERMARK_NOTE}</i>. If you want a <b>visible video overlay</b>, an ffmpeg overlay step can be added on request.</p>
</div>
"""
def create_ui() -> gr.Blocks:
    with gr.Blocks(
        title="ShortiFoley - HunyuanVideo-Foley",
        css="""
        .main-header{ text-align:center; padding:1.2rem; border-radius:16px; background:linear-gradient(135deg,#667eea,#764ba2); color:white; }
        .card{ background:white; border:1px solid #e1e5e9; border-radius:16px; padding:1rem; box-shadow:0 8px 32px rgba(0,0,0,.06); }
        .generate-btn button{ font-weight:700; }
        """
    ) as demo:
        gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")
        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 📹 Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile, distant chatter.",
                            lines=3,
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG Scale")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
                        generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 🎥 Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        v2 = gr.Video(label="Sample 2", height=160, visible=False)
                        v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        v4 = gr.Video(label="Sample 4", height=160, visible=False)
                        v5 = gr.Video(label="Sample 5", height=160, visible=False)
                        v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        status = gr.Textbox(label="Status", interactive=False)

                # Generate handler: one update per video slot plus the status text
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    vis_updates = []
                    for i in range(6):
                        if i < len(outs):
                            vis_updates.append(gr.update(visible=True, value=outs[i]))
                        else:
                            vis_updates.append(gr.update(visible=False, value=None))
                    return (*vis_updates, msg)

                generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[v1, v2, v3, v4, v5, v6, status],
                    api_name="infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files.",
                )

                # Toggle visibility when the number of samples changes
                def _toggle_vis(n):
                    n = int(n)
                    return [
                        gr.update(visible=True),
                        gr.update(visible=n >= 2),
                        gr.update(visible=n >= 3),
                        gr.update(visible=n >= 4),
                        gr.update(visible=n >= 5),
                        gr.update(visible=n >= 6),
                    ]

                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])

            with gr.Tab("🖼 Gallery"):
                gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results",
                )
                refresh = gr.Button("🔄 Refresh Gallery")
                refresh.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])

            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Also refresh the gallery after each generation; chained here, after the
        # Tabs block, so `gallery` exists when the handler is registered.
        generate.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery], api_name=False)

        # API/MCP-only endpoints (no visible UI). gr.api registers a typed
        # function as an API route and MCP tool; requires a recent Gradio 5.x.
        gr.api(api_generate_from_url, api_name="generate_from_url")
        gr.api(load_model_tool, api_name="load_model")
        gr.api(shortifoley_status, api_name="shortifoley_status")
        gr.api(foley_prompt, api_name="foley_prompt")

    return demo
def set_seeds(s: int = 1):
    random.seed(s)
    np.random.seed(s)
    torch.manual_seed(s)
# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")
    set_seeds(1)
    logger.info("===== Application Startup =====\n")
    prepare_once()
    # Ensure import paths after the repo is present
    sys.path.append(str(REPO_DIR))
    try:
        # Probe key modules early (better error surfacing)
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")
    msg = auto_load_models()
    if not msg.startswith("✅"):
        logger.error(f"[BOOT][ERROR] auto_load_models() failed:\n{msg}")
    else:
        logger.info(msg)
    ui = create_ui()
    # Enable the MCP server so tools/resources/prompts are discoverable
    # (the API/MCP-only endpoints are registered inside create_ui via gr.api)
    ui.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,  # MCP on
    )
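# REST usage sketch (Gradio 5's two-step call API; endpoint names follow the
# api_name values registered above, and the host is a placeholder):
#
#   curl -X POST https://<your-space>.hf.space/gradio_api/call/shortifoley_status \
#        -H "Content-Type: application/json" -d '{"data": []}'
#   # -> {"event_id": "..."}; then fetch the streamed result:
#   curl https://<your-space>.hf.space/gradio_api/call/shortifoley_status/<event_id>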