ShortiFoley / app.py
Bils's picture
Update app.py
aa644be verified
# Created by bilsimaging.com
import os
os.environ.setdefault("HF_PREFER_SAFETENSORS", "1")
import sys
import json
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict
import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces
# -------------------------
# Constants & configuration
# -------------------------
# Directory containing this script; every default location below hangs off it.
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT.joinpath("HunyuanVideo-Foley")

# Weights directory and model config can each be redirected via an environment variable.
WEIGHTS_DIR = Path(os.getenv("HIFI_FOLEY_MODEL_PATH", str(ROOT.joinpath("weights"))))
CONFIG_PATH = Path(
    os.getenv(
        "HIFI_FOLEY_CONFIG",
        str(REPO_DIR.joinpath("configs", "hunyuanvideo-foley-xxl.yaml")),
    )
)

# Generated results land here (default: outputs/autosaved/); the directory is
# created eagerly when the module is imported.
OUTPUTS_DIR = Path(os.getenv("OUTPUTS_DIR", str(ROOT.joinpath("outputs", "autosaved"))))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

# User-facing branding strings.
SPACE_TITLE = "🎡 ShortiFoley β€” HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video β†’ Audio Foley Β· Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❀️ by bilsimaging.com"

# ZeroGPU limit (<=120s recommended); overridable through GPU_DURATION_SECS.
GPU_DURATION = int(os.getenv("GPU_DURATION_SECS", "110"))

# Lazily populated module state, assigned by auto_load_models().
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None
# ------------
# Small helpers
# ------------
def _setup_device(pref: str = "cpu", gpu_id: int = 0) -> torch.device:
    """
    Translate a device preference string into a ``torch.device``.

    IMPORTANT: this deliberately does NOT call torch.cuda.is_available(); on
    Stateless GPU Spaces CUDA must only be requested inside a @spaces.GPU function.

    Args:
        pref: Any value starting with "cuda" selects ``cuda:<gpu_id>``; "mps"
            selects MPS; everything else falls back to CPU.
        gpu_id: CUDA device index, used only for the CUDA case.

    Returns:
        The constructed device (also logged at INFO level).
    """
    if pref.startswith("cuda"):
        spec = f"cuda:{gpu_id}"
    elif pref == "mps":
        spec = "mps"
    else:
        spec = "cpu"

    chosen = torch.device(spec)
    logger.info(f"Using {chosen}")
    return chosen
def _ensure_repo() -> None:
    """Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout).

    Does nothing when ``REPO_DIR`` already exists. The clone is best-effort:
    a failure is logged and the function returns normally, so startup continues.

    git is invoked with an argument list (no shell), so a checkout path that
    contains spaces or shell metacharacters is passed through unchanged, and
    ``GIT_LFS_SKIP_SMUDGE`` is supplied via the child environment instead of
    POSIX-only ``VAR=1 cmd`` shell syntax.
    """
    if REPO_DIR.exists():
        return

    # Local imports: this block is the only user of these stdlib modules.
    import shlex
    import subprocess

    cmd = [
        "git",
        "-c", "filter.lfs.smudge=",
        "-c", "filter.lfs.required=false",
        "clone", "--depth", "1",
        "https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git",
        str(REPO_DIR),
    ]
    env = {**os.environ, "GIT_LFS_SKIP_SMUDGE": "1"}
    logger.info(f">> GIT_LFS_SKIP_SMUDGE=1 {shlex.join(cmd)}")

    try:
        result = subprocess.run(cmd, env=env, check=False)
    except OSError as e:
        # e.g. git is not installed / not on PATH
        logger.error(f"Could not run git to clone into {REPO_DIR}: {e}")
        return

    if result.returncode != 0:
        logger.error(f"git clone into {REPO_DIR} failed with exit code {result.returncode}")
def _download_weights_if_needed() -> None:
    """Fetch the required weight files from the Hugging Face hub into ``WEIGHTS_DIR``.

    The directory is created first; only files matching the allow-list below
    are requested from the ``tencent/HunyuanVideo-Foley`` repository.
    """
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)

    # Restrict the snapshot to the files named here.
    wanted_files = [
        "hunyuanvideo_foley.pth",
        "synchformer_state_dict.pth",
        "vae_128d_48k.pth",
        "assets/*",
        "config.yaml",
    ]
    target_dir = str(WEIGHTS_DIR)

    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=target_dir,
        resume_download=True,
        allow_patterns=wanted_files,
    )
def prepare_once() -> None:
    """Run the startup preparation steps in order: repo checkout, then weights download."""
    for step in (_ensure_repo, _download_weights_if_needed):
        step()
# -----------------------
# Model load & inference
# -----------------------
def _force_fp32_on_modules(obj):
    """Cast every ``torch.nn.Module`` reachable on *obj* to float32 (half/float mismatch guard).

    Best-effort: any exception raised while casting is logged as a warning and
    swallowed, so this function itself does not raise for ordinary errors.

    Args:
        obj: Container whose attributes may be modules (called with the loaded
            model dict elsewhere in this file).
    """
    try:
        import torch.nn as nn

        # Pass 1: every attribute that dir() reports and that can be read.
        for attr_name in dir(obj):
            try:
                candidate = getattr(obj, attr_name)
            except Exception:
                continue
            if isinstance(candidate, nn.Module):
                candidate.float()

        # Pass 2: well-known component names, looked up explicitly.
        # NOTE(review): presumably needed for containers that serve these via
        # __getattr__ without listing them in dir() — confirm.
        known_components = (
            "foley_model",
            "dac_model",
            "siglip2_model",
            "clap_model",
            "syncformer_model",
        )
        for component in known_components:
            if hasattr(obj, component):
                getattr(obj, component).float()
    except Exception as e:
        logger.warning(f"FP32 cast warning: {e}")
def auto_load_models(device_str: str = "cpu") -> str:
    """
    Load HunyuanVideo-Foley + encoders on the chosen device (no-op if already loaded).

    Use device_str='cuda' ONLY inside @spaces.GPU to avoid CUDA init in main process.

    Args:
        device_str: Device preference forwarded to ``_setup_device``.

    Returns:
        A status string. Callers in this file decide success by the leading
        marker of this string, so the literals below must stay unchanged.
        Every failure, including an unimportable repo checkout, is reported
        through the return value instead of being raised.
    """
    global _model_dict, _cfg, _device

    if _model_dict is not None and _cfg is not None:
        return "βœ… Model already loaded."

    # Make absolutely sure safetensors is preferred
    os.environ["HF_PREFER_SAFETENSORS"] = "1"
    torch.set_float32_matmul_precision("high")  # allow TF32 where possible

    # Make the cloned repo importable, without growing sys.path on every call.
    repo_path = str(REPO_DIR)
    if repo_path not in sys.path:
        sys.path.append(repo_path)

    try:
        from hunyuanvideo_foley.utils.model_utils import load_model
    except Exception as e:
        # Repo missing or broken: report it the same way as any other load failure.
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"

    _device = _setup_device(device_str, 0)
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")

    def _load_and_cast() -> None:
        """Load the model into the module globals, then force fp32."""
        global _model_dict, _cfg
        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
        # Force fp32 to fix: RuntimeError: Input type (Half) and bias (float) must match
        _force_fp32_on_modules(_model_dict)

    try:
        _load_and_cast()
        return "βœ… Model loaded."
    except OSError as e:
        # One retry is attempted for OSError only; any error on the retry is final.
        logger.error(str(e))
        logger.info("Retrying after enforcing safetensors preference...")
        os.environ["HF_PREFER_SAFETENSORS"] = "1"
        try:
            _load_and_cast()
            return "βœ… Model loaded (after safetensors retry)."
        except Exception as e2:
            logger.error(str(e2))
            return f"❌ Failed to load model: {e2}"
    except Exception as e:
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"
def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Mux *audio_path* onto *video_path*, writing the result to *out_path*.

    The project's own ``merge_audio_video`` helper is tried first. If importing
    or running it raises, a warning is logged and ffmpeg is invoked instead
    (video stream copied, audio encoded as AAC, output cut to the shorter input).

    Raises:
        subprocess.CalledProcessError: if the ffmpeg fallback exits non-zero.
    """
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video as project_merge

        project_merge(audio_path, video_path, out_path)
    except Exception as e:
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess

        inputs = ["-i", video_path, "-i", audio_path]
        codecs = ["-c:v", "copy", "-c:a", "aac"]
        ffmpeg_cmd = ["ffmpeg", "-y", *inputs, *codecs, "-shortest", out_path]
        subprocess.run(ffmpeg_cmd, check=True)
def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
                  prompt: str) -> str:
    """Mux generated audio onto the source video and autosave the result.

    Writes ``<base>.mp4`` and a ``<base>.json`` metadata sidecar (including a
    soft watermark note) into ``OUTPUTS_DIR``. The intermediate WAV lives in a
    temporary directory that is removed as soon as muxing has finished.

    Args:
        video_src: Path of the source video.
        audio_tensor: Generated audio; a 1-D tensor is promoted to ``[1, N]``.
        sr: Sample rate handed to ``torchaudio.save``.
        idx: Variant number, used in the output file names.
        prompt: Text prompt recorded in the sidecar (``None``/empty -> ``""``).

    Returns:
        Path of the written MP4, as a string.
    """
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)

    # One timezone-aware timestamp feeds both the file name and the metadata.
    now = datetime.datetime.now(datetime.timezone.utc)
    ts = now.strftime("%Y%m%d_%H%M%S_%f")
    base = f"shortifoley_{ts}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"

    # The WAV is only an intermediate; the context manager cleans it up even
    # when saving or muxing raises.
    with tempfile.TemporaryDirectory() as tmpdir:
        wav_path = Path(tmpdir) / f"gen_{idx}.wav"
        torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)
        _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Sidecar JSON
    meta = {
        "id": base,
        # Same "<naive ISO>Z" layout the file has always used.
        "created_utc": now.replace(tzinfo=None).isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark_note": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)",
    }
    # ensure_ascii=False emits raw non-ASCII text, so the encoding must be
    # pinned instead of depending on the process locale.
    (OUTPUTS_DIR / f"{base}.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return str(out_mp4)
def _list_gallery(limit: int = 100) -> List[str]:
    """Return paths of the most recently modified MP4s in ``OUTPUTS_DIR``.

    Args:
        limit: Maximum number of paths to return. Zero or a negative value
            yields an empty list.

    Returns:
        Up to *limit* path strings, newest (by modification time) first.
    """
    if limit <= 0:
        return []

    newest_first = sorted(
        OUTPUTS_DIR.glob("*.mp4"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    return [str(p) for p in newest_first[:limit]]
# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)
@torch.inference_mode()
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1–6 variants).

    Args:
        video_file: Path of the input video; an empty/None value is rejected.
        text_prompt: Optional sound description (None is treated as "").
        guidance_scale: Forwarded to ``denoise_process`` as a float.
        num_inference_steps: Forwarded to ``denoise_process`` as an int.
        sample_nums: Requested number of variants, clamped to the range 1..6.

    Returns: (list of output video paths, status message). On invalid input or
    a failed model load the list is empty and the message explains why.
    """
    # Validate the cheap precondition first so a missing input never triggers
    # the expensive model load.
    if not video_file:
        return [], "❌ Please provide a video."

    # Lazy-load on GPU ONLY here (prevents CUDA init in main process)
    if _model_dict is None or _cfg is None:
        msg = auto_load_models(device_str="cuda")
        if not str(msg).startswith("βœ…"):
            # The loader's message already carries its own failure marker.
            return [], str(msg)

    # Make the cloned repo importable, without growing sys.path on every call.
    repo_path = str(REPO_DIR)
    if repo_path not in sys.path:
        sys.path.append(repo_path)
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    n = int(max(1, min(6, sample_nums)))

    # Avoid autocast to float16 to fix Half/Float mismatch inside Synchformer conv3d
    with torch.autocast(device_type="cuda", enabled=False):
        # preprocess
        visual_feats, text_feats, audio_len_s = feature_process(
            video_file, (text_prompt or "").strip(), _model_dict, _cfg
        )
        # generate batch
        audio, sr = denoise_process(
            visual_feats,
            text_feats,
            audio_len_s,
            _model_dict,
            _cfg,
            guidance_scale=float(guidance_scale),
            num_inference_steps=int(num_inference_steps),
            batch_size=n,
        )

    # save results (variant numbers in file names are 1-based)
    outs = [
        _save_outputs(video_file, audio[i], sr, i + 1, text_prompt or "")
        for i in range(n)
    ]
    return outs, f"βœ… Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
# -------------
# Gradio UI (with MCP+API inside the same app)
# -------------
def _about_html() -> str:
    """Return the static HTML fragment rendered in the About tab."""
    # Plain literal: the markup contains no placeholders to interpolate.
    about_markup = """
<div style="line-height:1.6">
<h2>About ShortiFoley</h2>
<p><b>ShortiFoley</b> turns short videos into realistic Foley sound.<br/>
Powered by Tencent’s HunyuanVideo-Foley (SigLIP2 + CLAP), with autosave and an MCP server for automation
(<a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a> flows).</p>
<p><b>Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a></b></p>
<h3>Quick Steps</h3>
<ol>
<li>Upload a clip (ideally &lt; 120s).</li>
<li>Optionally describe the sound (English).</li>
<li>Pick variants (1–6), adjust CFG and steps.</li>
<li>Hit <b>Generate</b>. Results show on the right and save into the Gallery.</li>
</ol>
<h3>Tips for Best Quality</h3>
<ul>
<li>Use tight clips (5–30s) around the action.</li>
<li>Include material & action cues: β€œmetal clang”, β€œglass shatter”, β€œrubber on wet tile”.</li>
<li>Describe ambience: β€œroomy”, β€œechoey”, β€œdistant crowd”.</li>
<li>Generate 2–4 variants and pick the most natural.</li>
</ul>
<h3>MCP & API</h3>
<p>This Space exposes an <b>MCP server</b> and simple REST endpoints (see β€œAPI & MCP” tab).
Perfect for media-automation pipelines and tools like <b><a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a></b>.</p>
</div>
"""
    return about_markup
def create_ui() -> gr.Blocks:
    """Build the Gradio app: Run / Gallery / API & MCP / About tabs plus REST + MCP endpoints.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet launched).
    """
    # NOTE(review): the layout nesting below was reconstructed from the section
    # comments and component order — confirm against the rendered UI.
    css = """
.main-header{ text-align:center; padding:1.2rem; border-radius:18px; background:linear-gradient(135deg,#6366f1,#8b5cf6); color:white; box-shadow:0 12px 40px rgba(99,102,241,.35); margin-bottom:16px;}
.main-header h1{ margin:0; font-size:2.0rem; font-weight:800;}
.main-header p{ margin:.25rem 0 0; opacity:.95; font-weight:500;}
.card{ background:white; border:1px solid #e7e9ef; border-radius:16px; padding:14px; box-shadow:0 10px 28px rgba(0,0,0,.06);}
.generate-btn button{ font-weight:800; border-radius:12px; padding:10px 18px;}
.minor-btn button{ border-radius:10px;}
.muted{ color:#64748b; }
.footer-text{ color:#64748b; text-align:center; padding:12px 0; font-size:.95rem; }
"""
    with gr.Blocks(title="ShortiFoley β€” HunyuanVideo-Foley", css=css) as demo:
        gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")
        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    # LEFT: input
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### πŸ“Ή Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile; distant chatter; occasional splashes.",
                            lines=3
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
                        with gr.Row():
                            load_btn = gr.Button("βš™οΈ Load model (CPU)", variant="secondary", elem_classes=["minor-btn"])
                            generate = gr.Button("🎡 Generate", variant="primary", elem_classes=["generate-btn"])
                        status = gr.Textbox(label="Status", interactive=False)
                    # RIGHT: results
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### πŸŽ₯ Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        with gr.Row():
                            v2 = gr.Video(label="Sample 2", height=160, visible=False)
                            v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        with gr.Row():
                            v4 = gr.Video(label="Sample 4", height=160, visible=False)
                            v5 = gr.Video(label="Sample 5", height=160, visible=False)
                            v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        gr.Markdown("<span class='muted'>Autosaved to the Gallery tab.</span>")

                # Generate handler (single binding, exact outputs).
                # Produces one update per result slot (6 total) followed by the
                # status text: filled slots are shown with their video, unused
                # slots are cleared, and only slot 1 stays visible when empty.
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    vis = [
                        gr.update(visible=True, value=outs[i])
                        if outs and i < len(outs)
                        else gr.update(visible=(i == 0), value=None)
                        for i in range(6)
                    ]
                    return (*vis, msg)

                generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[v1, v2, v3, v4, v5, v6, status],
                    api_name="/infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files."
                )
                load_btn.click(
                    fn=lambda: auto_load_models(device_str="cpu"),
                    inputs=[],
                    outputs=[status],
                    api_name="/load_model",
                    api_description="Load/initialize the ShortiFoley model and encoders on CPU (GPU loads during inference)."
                )

                # Toggle visibility based on variants: slot k is shown when at
                # least k variants are requested; slot 1 is always shown.
                def _toggle_vis(n):
                    n = int(n)
                    return [gr.update(visible=True)] + [
                        gr.update(visible=n >= slot) for slot in range(2, 7)
                    ]

                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])

            with gr.Tab("πŸ“ Gallery"):
                gr.Markdown("Latest generated videos (autosaved to <code>outputs/autosaved/</code>).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results"
                )
                refresh = gr.Button("πŸ”„ Refresh Gallery")
                refresh.click(lambda: _list_gallery(), outputs=[gallery])

            with gr.Tab("API & MCP"):
                gr.Markdown("""
### REST examples
**POST** `/api_generate_from_url`
```json
{
"video_url_or_b64": "https://yourhost/sample.mp4",
"text_prompt": "metallic clink; hollow room reverb",
"guidance_scale": 4.5,
"num_inference_steps": 50,
"sample_nums": 2
}
```
**POST** `/load_model_tool`
Loads the model proactively (useful before batch runs).
**MCP resources & prompt**
- `shortifoley://status` β†’ quick health info
- `foley_prompt` β†’ reusable guidance for describing the sound
Works great for media-automation in tools like **n8n**: call `load_model_tool` once, then `api_generate_from_url` for each clip.
""")

            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Footer
        gr.HTML(
            """
<div class="footer-text">
πŸš€ Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a>
&middot; Powered by HunyuanVideo-Foley
</div>
"""
        )

        # ---- REST + MCP endpoints (inside Blocks) ----
        def _download_to_tmp(url: str) -> str:
            """Stream *url* into a new temporary ``.mp4`` file and return its path.

            Raises:
                RuntimeError: if the ``requests`` package is not installed.
                requests.HTTPError: if the server answers with an error status.
            """
            try:
                import requests
            except ImportError as e:
                raise RuntimeError(
                    "Missing dependency 'requests'. Add it to requirements.txt to use URL inputs."
                ) from e
            # Stream in chunks so a large video is never held fully in memory.
            with requests.get(url, timeout=30, stream=True) as r:
                r.raise_for_status()
                with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                    for chunk in r.iter_content(chunk_size=1 << 20):
                        tmp.write(chunk)
            return tmp.name

        def _maybe_from_base64(data_url_or_b64: str) -> str:
            """Decode base64 (bare or inside a ``data:`` URL) into a temporary ``.mp4`` file.

            Raises:
                ValueError: if the payload decodes to zero bytes.
            """
            b64 = data_url_or_b64
            if data_url_or_b64.startswith("data:"):
                # Keep only the payload after the "data:<mime>;base64," header.
                b64 = data_url_or_b64.split(",", 1)[-1]
            raw = base64.b64decode(b64)
            if not raw:
                raise ValueError("Decoded video payload is empty.")
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                tmp.write(raw)
            return tmp.name

        def _normalize_video_input(video_url_or_b64: str) -> str:
            """Turn an http(s) URL or base64/data-URL payload into a local temp file path.

            Raises:
                ValueError: if the input is empty or whitespace only.
            """
            v = (video_url_or_b64 or "").strip()
            if not v:
                raise ValueError(
                    "video_url_or_b64 is empty; pass an http(s) URL, a data: URL, or base64 data."
                )
            if v.startswith(("http://", "https://")):
                return _download_to_tmp(v)
            return _maybe_from_base64(v)

        # NOTE(review): the return annotation says Dict[str, List[str]] although
        # "message" is a plain str; left untouched because the annotation may
        # feed the published API schema — confirm before changing.
        @gr.api
        def api_generate_from_url(
            video_url_or_b64: str,
            text_prompt: str = "",
            guidance_scale: float = 4.5,
            num_inference_steps: int = 50,
            sample_nums: int = 1,
        ) -> Dict[str, List[str]]:
            """Generate Foley audio for a video given as a URL or base64 payload.

            Args:
                video_url_or_b64: http(s) URL, ``data:`` URL, or bare base64 of the video.
                text_prompt: Optional description of the desired sound.
                guidance_scale: CFG scale passed to inference.
                num_inference_steps: Number of denoising steps.
                sample_nums: Number of variants to generate.

            Returns:
                ``{"videos": [output paths], "message": status text}``.
            """
            if _model_dict is None or _cfg is None:
                msg = auto_load_models(device_str="cpu")  # safe in HTTP context
                if not str(msg).startswith("βœ…"):
                    raise RuntimeError(msg)
            local = _normalize_video_input(video_url_or_b64)
            try:
                outs, msg = infer_single_video(
                    local, text_prompt, guidance_scale, num_inference_steps, sample_nums
                )
            finally:
                # The temp copy of the input is only needed during inference;
                # results are written separately, so remove it to avoid
                # accumulating files across requests.
                try:
                    os.remove(local)
                except OSError as e:
                    logger.warning(f"Could not remove temporary input {local}: {e}")
            return {"videos": outs, "message": msg}

        @gr.api
        def load_model_tool() -> str:
            """Ensure model is loaded on server (convenient for MCP/REST)."""
            return auto_load_models(device_str="cpu")

        @gr.mcp.resource("shortifoley://status")
        def shortifoley_status() -> str:
            """Return a simple readiness string for MCP clients."""
            ready = _model_dict is not None and _cfg is not None
            # Only cuda/mps are reported by name; anything else (or no device yet) is "cpu".
            dev = _device.type if (_device is not None and _device.type in ("cuda", "mps")) else "cpu"
            return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

        @gr.mcp.prompt()
        def foley_prompt(name: str = "default") -> str:
            """Reusable guidance for describing sound ambience."""
            return (
                "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
                "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
            )

    return demo
def set_seeds(s: int = 1):
    """Seed Python's ``random``, NumPy's global RNG and PyTorch with the same value *s*."""
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(s)
# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    # Replace loguru's default sink with a plain print-based INFO sink.
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")

    set_seeds(1)
    logger.info("===== Application Startup =====\n")

    # Clone the upstream repo and fetch weights before building the UI.
    prepare_once()

    # Probe imports (early surfacing): a failure here is only logged as a
    # warning; the functions that need these modules import them again later.
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")

    ui = create_ui()

    # Enable MCP server so tools/resources/prompts are discoverable
    launch_options = {
        "server_name": "0.0.0.0",
        "share": False,
        "show_error": True,
        "mcp_server": True,  # Enable MCP server
    }
    ui.launch(**launch_options)