ShortiFoley / app.py
Bils's picture
Update app.py
e7621f8 verified
raw
history blame
18.2 kB
# app.py β€” ShortiFoley (Video -> Foley)
# Created by bilsimaging.com
import os
import sys
import io
import json
import uuid
import time
import shutil
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict
import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces # HF Spaces ZeroGPU & MCP integration
# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
SPACE_TITLE = "🎡 ShortiFoley β€” HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video β†’ Audio Foley. Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❀️ by bilsimaging.com"
# Keep GPU <= 120s for ZeroGPU (default 110)
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))
# Globals
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None
# ------------
# Small helpers
# ------------
def _setup_device(pref: str = "auto", gpu_id: int = 0) -> torch.device:
"""Pick CUDA if available, else MPS, else CPU."""
if pref == "auto":
if torch.cuda.is_available():
d = torch.device(f"cuda:{gpu_id}")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
d = torch.device("mps")
else:
d = torch.device("cpu")
else:
d = torch.device(pref)
logger.info(f"Using CUDA {d}" if d.type == "cuda" else f"Using {d}")
return d
def _ensure_repo() -> None:
"""Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout)."""
if REPO_DIR.exists():
return
cmd = (
"GIT_LFS_SKIP_SMUDGE=1 "
"git -c filter.lfs.smudge= -c filter.lfs.required=false "
f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
)
logger.info(f">> {cmd}")
os.system(cmd)
def _download_weights_if_needed() -> None:
"""Snapshot only needed files from HF weights/model hub."""
WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
snapshot_download(
repo_id="tencent/HunyuanVideo-Foley",
local_dir=str(WEIGHTS_DIR),
resume_download=True,
allow_patterns=[
"hunyuanvideo_foley.pth",
"synchformer_state_dict.pth",
"vae_128d_48k.pth",
"assets/*",
"config.yaml", # harmless
],
)
def prepare_once() -> None:
_ensure_repo()
_download_weights_if_needed()
# -----------------------
# Model load & inference
# -----------------------
def auto_load_models() -> str:
"""
Load HunyuanVideo-Foley + encoders on the chosen device.
"""
global _model_dict, _cfg, _device
if _model_dict is not None and _cfg is not None:
return "Model already loaded."
sys.path.append(str(REPO_DIR))
from hunyuanvideo_foley.utils.model_utils import load_model
_device = _setup_device("auto", 0)
logger.info("Loading HunyuanVideo-Foley model...")
logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
try:
_model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
return "βœ… Model loaded."
except Exception as e:
logger.error(e)
return f"❌ Failed to load model: {e}"
def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
"""Use project's helper (preferred) with a fallback to ffmpeg via subprocess."""
sys.path.append(str(REPO_DIR))
try:
from hunyuanvideo_foley.utils.media_utils import merge_audio_video
merge_audio_video(audio_path, video_path, out_path)
except Exception as e:
# Fallback: plain ffmpeg merge (assumes same duration or lets ffmpeg handle)
logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
import subprocess
cmd = [
"ffmpeg", "-y",
"-i", video_path,
"-i", audio_path,
"-c:v", "copy",
"-c:a", "aac",
"-shortest",
out_path
]
subprocess.run(cmd, check=True)
def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
prompt: str) -> str:
"""Save WAV + MP4 in outputs/, add metadata and a small watermark note (metadata only)."""
# torchaudio expects [C, N]
if audio_tensor.ndim == 1:
audio_tensor = audio_tensor.unsqueeze(0)
tmpdir = Path(tempfile.mkdtemp())
wav_path = tmpdir / f"gen_{idx}.wav"
torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)
ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
base = f"shortifoley_{ts}_{idx}"
out_mp4 = OUTPUTS_DIR / f"{base}.mp4"
_merge_audio_video(str(wav_path), video_src, str(out_mp4))
# Save JSON sidecar
meta = {
"id": base,
"created_utc": datetime.datetime.utcnow().isoformat() + "Z",
"source_video": Path(video_src).name,
"output_video": Path(out_mp4).name,
"prompt": prompt or "",
"watermark": WATERMARK_NOTE,
"tool": "ShortiFoley (HunyuanVideo-Foley)"
}
(OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2))
return str(out_mp4)
def _list_gallery(limit: int = 100) -> List[str]:
vids = []
for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True):
vids.append(str(p))
if len(vids) >= limit:
break
return vids
# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)
@torch.inference_mode()
def infer_single_video(
video_file: str,
text_prompt: str,
guidance_scale: float = 4.5,
num_inference_steps: int = 50,
sample_nums: int = 1,
) -> Tuple[List[str], str]:
"""
Generate Foley audio for an uploaded video (1–6 variants).
Returns: (list of output video paths, status message)
"""
if _model_dict is None or _cfg is None:
return [], "❌ Load the model first (open the app once)."
if not video_file:
return [], "❌ Please provide a video."
sys.path.append(str(REPO_DIR))
from hunyuanvideo_foley.utils.feature_utils import feature_process
from hunyuanvideo_foley.utils.model_utils import denoise_process
# preprocess
visual_feats, text_feats, audio_len_s = feature_process(
video_file, (text_prompt or "").strip(), _model_dict, _cfg
)
# generate batch
n = int(max(1, min(6, sample_nums)))
audio, sr = denoise_process(
visual_feats,
text_feats,
audio_len_s,
_model_dict,
_cfg,
guidance_scale=float(guidance_scale),
num_inference_steps=int(num_inference_steps),
batch_size=n,
)
# save results
outs = []
for i in range(n):
outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or ""))
return outs, f"βœ… Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
# ---------------
# MCP-only APIs
# ---------------
def _download_to_tmp(url: str) -> str:
"""Download a remote file to temp."""
try:
import requests
except Exception:
raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.")
r = requests.get(url, timeout=30)
r.raise_for_status()
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
tmp.write(r.content)
tmp.flush()
tmp.close()
return tmp.name
def _maybe_from_base64(data_url_or_b64: str) -> str:
"""Accept data: URLs or raw base64; returns temp file path."""
b64 = data_url_or_b64
if data_url_or_b64.startswith("data:"):
b64 = data_url_or_b64.split(",", 1)[-1]
raw = base64.b64decode(b64)
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
tmp.write(raw)
tmp.flush()
tmp.close()
return tmp.name
def _normalize_video_input(video_url_or_b64: str) -> str:
v = (video_url_or_b64 or "").strip()
if v.startswith("http://") or v.startswith("https://"):
return _download_to_tmp(v)
return _maybe_from_base64(v)
with gr.Blocks() as mcp_only_endpoints:
gr.Markdown("These endpoints are MCP/API only and have no visible UI.", show_label=False)
@gr.api
def api_generate_from_url(
video_url_or_b64: str,
text_prompt: str = "",
guidance_scale: float = 4.5,
num_inference_steps: int = 50,
sample_nums: int = 1,
) -> Dict[str, List[str]]:
"""
Generate Foley from a remote video URL or base64-encoded video.
Returns: {"videos": [paths], "message": str}
"""
if _model_dict is None or _cfg is None:
raise RuntimeError("Model not loaded. Open the UI once or call /load_model tool.")
local = _normalize_video_input(video_url_or_b64)
outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
return {"videos": outs, "message": msg}
@gr.api
def load_model_tool() -> str:
"""Ensure model is loaded on server (MCP convenience)."""
return auto_load_models()
@gr.mcp.resource("shortifoley://status")
def shortifoley_status() -> str:
"""Return a simple readiness string for MCP clients."""
ready = _model_dict is not None and _cfg is not None
dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"
@gr.mcp.prompt()
def foley_prompt(name: str = "default") -> str:
"""Reusable guidance for describing sound ambience."""
return (
"Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
"Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
)
# -------------
# Gradio UI
# -------------
def _about_html() -> str:
return f"""
<div style="line-height:1.6">
<h2>About ShortiFoley</h2>
<p><b>ShortiFoley</b> automatically generates realistic Foley soundtracks for short videos using
Tencent’s HunyuanVideo-Foley with CLAP & SigLIP2 encoders. It includes autosave and an MCP server so
you can call it from agents or workflows (e.g., n8n).</p>
<p><b>Created by <a href="https://bilsimaging.com" target="_blank">bilsimaging.com</a></b></p>
<h3>How to use</h3>
<ol>
<li>Upload a video (ideally &lt; 120 seconds).</li>
<li>Optionally enter a text description of the sound (English).</li>
<li>Adjust CFG scale, steps, and number of variants.</li>
<li>Click <b>Generate</b>. Results appear on the right and are stored in the Gallery.</li>
</ol>
<h3>Tips</h3>
<ul>
<li>Trim clips to the key action (5–30s) for faster, crisper results.</li>
<li>Include material cues (β€œwood”, β€œmetal”, β€œconcrete”), action cues (β€œsplash”, β€œglass shatter”), and ambience (β€œroomy”, β€œechoey”).</li>
<li>Generate multiple variants and pick the most natural.</li>
</ul>
<h3>MCP / Automation</h3>
<p>This app runs as an <b>MCP server</b>. Open the footer β€œView API β†’ MCP” to copy a ready config. You can also use the REST endpoints listed there. Perfect for n8n integrations.</p>
<h3>Watermark</h3>
<p>Each output’s metadata includes: <i>{WATERMARK_NOTE}</i>. If you want a <b>visible video overlay</b>, I can add an ffmpeg overlay step on request.</p>
</div>
"""
def create_ui() -> gr.Blocks:
with gr.Blocks(
title="ShortiFoley β€” HunyuanVideo-Foley",
css="""
.main-header{ text-align:center; padding:1.2rem; border-radius:16px; background:linear-gradient(135deg,#667eea,#764ba2); color:white; }
.card{ background:white; border:1px solid #e1e5e9; border-radius:16px; padding:1rem; box-shadow:0 8px 32px rgba(0,0,0,.06); }
.generate-btn button{ font-weight:700; }
"""
) as demo:
gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")
with gr.Tabs():
with gr.Tab("Run"):
with gr.Row():
with gr.Column(scale=1, elem_classes=["card"]):
gr.Markdown("### πŸ“Ή Input")
video_input = gr.Video(label="Upload Video", height=300)
text_input = gr.Textbox(
label="🎯 Audio Description (optional, English)",
placeholder="e.g., Rubber soles on wet tile, distant chatter.",
lines=3
)
with gr.Row():
guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG Scale")
steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
generate = gr.Button("🎡 Generate", variant="primary", elem_classes=["generate-btn"])
with gr.Column(scale=1, elem_classes=["card"]):
gr.Markdown("### πŸŽ₯ Result(s)")
v1 = gr.Video(label="Sample 1", height=260, visible=True)
v2 = gr.Video(label="Sample 2", height=160, visible=False)
v3 = gr.Video(label="Sample 3", height=160, visible=False)
v4 = gr.Video(label="Sample 4", height=160, visible=False)
v5 = gr.Video(label="Sample 5", height=160, visible=False)
v6 = gr.Video(label="Sample 6", height=160, visible=False)
status = gr.Textbox(label="Status", interactive=False)
# Generate handler
def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
vis_updates = []
for i in range(6):
if i < len(outs):
vis_updates.append(gr.update(visible=True, value=outs[i]))
else:
vis_updates.append(gr.update(visible=False, value=None))
gal_items = _list_gallery()
return (*vis_updates, msg, gr.update(value=gal_items))
generate.click(
fn=_process_and_update,
inputs=[video_input, text_input, guidance_scale, steps, samples],
outputs=[v1, v2, v3, v4, v5, v6, status, ],
api_name="/infer",
api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files."
)
# Toggle visibility when # of samples changes
def _toggle_vis(n):
n = int(n)
return [
gr.update(visible=True),
gr.update(visible=n >= 2),
gr.update(visible=n >= 3),
gr.update(visible=n >= 4),
gr.update(visible=n >= 5),
gr.update(visible=n >= 6),
]
samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])
with gr.Tab("πŸ“ Gallery"):
gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
gallery = gr.Gallery(
value=_list_gallery(),
columns=3,
preview=True,
label="Saved Results"
)
refresh = gr.Button("πŸ”„ Refresh Gallery")
refresh.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])
with gr.Tab("ℹ️ About"):
gr.HTML(_about_html())
# Also expose gallery update after generate
generate.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])
return demo
def set_seeds(s: int = 1):
random.seed(s)
np.random.seed(s)
torch.manual_seed(s)
# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
logger.remove()
logger.add(lambda m: print(m, end=""), level="INFO")
set_seeds(1)
logger.info("===== Application Startup =====\n")
prepare_once()
# Ensure import paths after repo is present
sys.path.append(str(REPO_DIR))
try:
# Probe key modules early (better error surfacing)
from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process # noqa: F401
from hunyuanvideo_foley.utils.feature_utils import feature_process # noqa: F401
from hunyuanvideo_foley.utils.media_utils import merge_audio_video # noqa: F401
except Exception as e:
logger.warning(f"Repo imports not ready yet: {e}")
msg = auto_load_models()
if not msg.startswith("βœ…"):
logger.error(f"[BOOT][ERROR] auto_load_models() failed:\n{msg}")
else:
logger.info(msg)
ui = create_ui()
# Mount MCP-only endpoints alongside the UI
ui.blocks.append(mcp_only_endpoints)
# Enable MCP server so tools/resources/prompts are discoverable
ui.launch(
server_name="0.0.0.0",
share=False,
show_error=True,
mcp_server=True, # MCP on
)