ShortiFoley / app.py
Bils's picture
Update app.py
6e08050 verified
raw
history blame
21.5 kB
# Created by bilsimaging.com
import os
os.environ.setdefault("HF_PREFER_SAFETENSORS", "1")
import sys
import json
import uuid
import time
import shutil
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict
import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces
# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs" / "autosaved")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
SPACE_TITLE = "🎵 Shorti Foley Sound— HunyuanVideo-Foley"
SPACE_TAGLINE = "Bring your videos to life with AI-powered Foley Sound"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"
# ZeroGPU limit
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "120"))
# Globals
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None
# ------------
# Small helpers
# ------------
def _setup_device(pref: str = "cpu", gpu_id: int = 0) -> torch.device:
"""
Pick device safely.
IMPORTANT: Do NOT query torch.cuda.is_available() in main/non-GPU processes
on Stateless GPU Spaces. Only set CUDA when called from a @spaces.GPU context.
"""
if pref.startswith("cuda"):
d = torch.device(f"cuda:{gpu_id}")
elif pref == "mps":
d = torch.device("mps")
else:
d = torch.device("cpu")
logger.info(f"Using {d}")
return d
def _ensure_repo() -> None:
"""Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout)."""
if REPO_DIR.exists():
return
cmd = (
"GIT_LFS_SKIP_SMUDGE=1 "
"git -c filter.lfs.smudge= -c filter.lfs.required=false "
f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
)
logger.info(f">> {cmd}")
os.system(cmd)
def _download_weights_if_needed() -> None:
"""Snapshot only needed files from HF weights/model hub."""
WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
snapshot_download(
repo_id="tencent/HunyuanVideo-Foley",
local_dir=str(WEIGHTS_DIR),
resume_download=True,
allow_patterns=[
"hunyuanvideo_foley.pth",
"synchformer_state_dict.pth",
"vae_128d_48k.pth",
"assets/*",
"config.yaml",
],
)
def prepare_once() -> None:
_ensure_repo()
_download_weights_if_needed()
# -----------------------
# Model load & inference
# -----------------------
def auto_load_models(device_str: str = "cpu") -> str:
"""
Load HunyuanVideo-Foley + encoders on the chosen device.
Use device_str="cuda" ONLY inside @spaces.GPU function to avoid CUDA init in main process.
"""
global _model_dict, _cfg, _device
if _model_dict is not None and _cfg is not None:
return "✅ Model already loaded."
# Make absolutely sure safetensors is preferred
os.environ["HF_PREFER_SAFETENSORS"] = "1"
sys.path.append(str(REPO_DIR))
from hunyuanvideo_foley.utils.model_utils import load_model
_device = _setup_device(device_str, 0)
logger.info("Loading HunyuanVideo-Foley model...")
logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
try:
_model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
return "✅ Model loaded."
except OSError as e:
logger.error(str(e))
logger.info("Retrying after enforcing safetensors preference...")
os.environ["HF_PREFER_SAFETENSORS"] = "1"
try:
_model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
return "✅ Model loaded (after safetensors retry)."
except Exception as e2:
logger.error(str(e2))
return f"❌ Failed to load model: {e2}"
except Exception as e:
logger.error(str(e))
return f"❌ Failed to load model: {e}"
def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
"""Preferred: project’s util; fallback to ffmpeg."""
sys.path.append(str(REPO_DIR))
try:
from hunyuanvideo_foley.utils.media_utils import merge_audio_video
merge_audio_video(audio_path, video_path, out_path)
except Exception as e:
logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
import subprocess
cmd = [
"ffmpeg", "-y",
"-i", video_path,
"-i", audio_path,
"-c:v", "copy",
"-c:a", "aac",
"-shortest",
out_path
]
subprocess.run(cmd, check=True)
def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
prompt: str) -> str:
"""Save WAV + MP4 in autosaved/, add metadata with a soft watermark note."""
# torchaudio expects [C, N]
if audio_tensor.ndim == 1:
audio_tensor = audio_tensor.unsqueeze(0)
tmpdir = Path(tempfile.mkdtemp())
wav_path = tmpdir / f"gen_{idx}.wav"
torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)
ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
base = f"shortifoley_{ts}_{idx}"
out_mp4 = OUTPUTS_DIR / f"{base}.mp4"
_merge_audio_video(str(wav_path), video_src, str(out_mp4))
# Sidecar JSON
meta = {
"id": base,
"created_utc": datetime.datetime.utcnow().isoformat() + "Z",
"source_video": Path(video_src).name,
"output_video": Path(out_mp4).name,
"prompt": prompt or "",
"watermark_note": WATERMARK_NOTE,
"tool": "ShortiFoley (HunyuanVideo-Foley)"
}
(OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2))
return str(out_mp4)
def _list_gallery(limit: int = 100) -> List[str]:
vids = []
for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True):
vids.append(str(p))
if len(vids) >= limit:
break
return vids
# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)
@torch.inference_mode()
def infer_single_video(
video_file: str,
text_prompt: str,
guidance_scale: float = 4.5,
num_inference_steps: int = 50,
sample_nums: int = 1,
) -> Tuple[List[str], str]:
"""
Generate Foley audio for an uploaded video (1–6 variants).
Returns: (list of output video paths, status message)
"""
# Lazy-load on GPU
if _model_dict is None or _cfg is None:
msg = auto_load_models(device_str="cuda")
if not str(msg).startswith("✅"):
return [], f"❌ {msg}"
if not video_file:
return [], "❌ Please provide a video."
sys.path.append(str(REPO_DIR))
from hunyuanvideo_foley.utils.feature_utils import feature_process
from hunyuanvideo_foley.utils.model_utils import denoise_process
# preprocess
visual_feats, text_feats, audio_len_s = feature_process(
video_file, (text_prompt or "").strip(), _model_dict, _cfg
)
# generate batch
n = int(max(1, min(6, sample_nums)))
audio, sr = denoise_process(
visual_feats,
text_feats,
audio_len_s,
_model_dict,
_cfg,
guidance_scale=float(guidance_scale),
num_inference_steps=int(num_inference_steps),
batch_size=n,
)
# save results
outs = []
for i in range(n):
outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or ""))
return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
# -------------
# Gradio UI
# -------------
def _about_html() -> str:
return f"""
<div style="line-height:1.6">
<h2>About ShortiFoley</h2>
<p><b>ShortiFoley</b> turns short videos into realistic Foley sound.<br/>
Powered by Tencent’s HunyuanVideo-Foley (SigLIP2 + CLAP), with autosave and an MCP server for automation
(<a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a> flows).</p>
<p><b>Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a></b></p>
<h3>Quick Steps</h3>
<ol>
<li>Upload a clip (ideally &lt; 120s).</li>
<li>Optionally describe the sound (English).</li>
<li>Pick variants (1–6), adjust CFG and steps.</li>
<li>Hit <b>Generate</b>. Results show on the right and save into the Gallery.</li>
</ol>
<h3>Tips for Best Quality</h3>
<ul>
<li>Use tight clips (5–30s) around the action.</li>
<li>Include material & action cues: “metal clang”, “glass shatter”, “rubber on wet tile”.</li>
<li>Describe ambience: “roomy”, “echoey”, “distant crowd”.</li>
<li>Generate 2–4 variants and pick the most natural.</li>
</ul>
<h3>MCP & API</h3>
<p>This Space exposes an <b>MCP server</b> and simple REST endpoints (see “API & MCP” tab).
Perfect for media-automation pipelines and tools like <b><a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a></b>.</p>
</div>
"""
def create_ui() -> gr.Blocks:
css = """
.main-header{ text-align:center; padding:1.2rem; border-radius:18px; background:linear-gradient(135deg,#6366f1,#8b5cf6); color:white; box-shadow:0 12px 40px rgba(99,102,241,.35); margin-bottom:16px;}
.main-header h1{ margin:0; font-size:2.0rem; font-weight:800;}
.main-header p{ margin:.25rem 0 0; opacity:.95; font-weight:500;}
.card{ background:white; border:1px solid #e7e9ef; border-radius:16px; padding:14px; box-shadow:0 10px 28px rgba(0,0,0,.06);}
.generate-btn button{ font-weight:800; border-radius:12px; padding:10px 18px;}
.minor-btn button{ border-radius:10px;}
.muted{ color:#64748b; }
.footer-text{ color:#64748b; text-align:center; padding:12px 0; font-size:.95rem; }
"""
with gr.Blocks(title="ShortiFoley — HunyuanVideo-Foley", css=css) as demo:
gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")
with gr.Tabs():
with gr.Tab("Run"):
with gr.Row():
# LEFT: input
with gr.Column(scale=1, elem_classes=["card"]):
gr.Markdown("### 📹 Input")
video_input = gr.Video(label="Upload Video", height=300)
text_input = gr.Textbox(
label="🎯 Audio Description (optional, English)",
placeholder="e.g., Rubber soles on wet tile; distant chatter; occasional splashes.",
lines=3
)
with gr.Row():
guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG")
steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
with gr.Row():
load_btn = gr.Button("⚙️ Load model (CPU)", variant="secondary", elem_classes=["minor-btn"])
generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])
status = gr.Textbox(label="Status", interactive=False)
# RIGHT: results
with gr.Column(scale=1, elem_classes=["card"]):
gr.Markdown("### 🎥 Result(s)")
v1 = gr.Video(label="Sample 1", height=260, visible=True)
with gr.Row():
v2 = gr.Video(label="Sample 2", height=160, visible=False)
v3 = gr.Video(label="Sample 3", height=160, visible=False)
with gr.Row():
v4 = gr.Video(label="Sample 4", height=160, visible=False)
v5 = gr.Video(label="Sample 5", height=160, visible=False)
v6 = gr.Video(label="Sample 6", height=160, visible=False)
gr.Markdown("<span class='muted'>Autosaved to the Gallery tab.</span>")
# Generate handler
def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
vis = []
for i in range(6):
if outs and i < len(outs):
vis.append(gr.update(visible=True, value=outs[i]))
else:
vis.append(gr.update(visible=(i == 0), value=None if i > 0 else None))
# Also refresh the gallery in this same event
new_gallery = _list_gallery()
return (*vis, msg, new_gallery)
generate.click(
fn=_process_and_update,
inputs=[video_input, text_input, guidance_scale, steps, samples],
outputs=[v1, v2, v3, v4, v5, v6, status], # updated below to include gallery via .then-like merge
api_name="/infer",
api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files."
)
# Workaround: extend outputs to include gallery refresh using a wrapper
def _process_and_update_with_gallery(video_file, text_prompt, cfg, nsteps, nsamples):
outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
vis = []
for i in range(6):
if outs and i < len(outs):
vis.append(gr.update(visible=True, value=outs[i]))
else:
vis.append(gr.update(visible=(i == 0), value=None if i > 0 else None))
new_gallery = _list_gallery()
return (*vis, msg, new_gallery)
# Re-bind with gallery as extra output
generate.click(
fn=_process_and_update_with_gallery,
inputs=[video_input, text_input, guidance_scale, steps, samples],
outputs=[v1, v2, v3, v4, v5, v6, status,], # gallery will be refreshed on Gallery tab itself
)
load_btn.click(
fn=lambda: auto_load_models(device_str="cpu"),
inputs=[],
outputs=[status],
api_name="/load_model",
api_description="Load/initialize the ShortiFoley model and encoders on CPU (GPU loads during inference)."
)
# Toggle visibility based on variants
def _toggle_vis(n):
n = int(n)
return [
gr.update(visible=True),
gr.update(visible=n >= 2),
gr.update(visible=n >= 3),
gr.update(visible=n >= 4),
gr.update(visible=n >= 5),
gr.update(visible=n >= 6),
]
samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])
with gr.Tab("📁 Gallery"):
gr.Markdown("Latest generated videos (autosaved to `outputs/autosaved/`).")
gallery = gr.Gallery(
value=_list_gallery(),
columns=3,
preview=True,
label="Saved Results"
)
refresh = gr.Button("🔄 Refresh Gallery")
refresh.click(lambda: _list_gallery(), outputs=[gallery])
with gr.Tab("API & MCP"):
gr.Markdown("""
### REST examples
**POST** `/api_generate_from_url`
```json
{
"video_url_or_b64": "https://yourhost/sample.mp4",
"text_prompt": "metallic clink; hollow room reverb",
"guidance_scale": 4.5,
"num_inference_steps": 50,
"sample_nums": 2
}
```
**POST** `/load_model_tool`
Loads the model proactively (useful before batch runs).
**MCP resources & prompt**
- `shortifoley://status` → quick health info
- `foley_prompt` → reusable guidance for describing the sound
Works great with media-automation in tools like **n8n**: call `load_model_tool` once, then `api_generate_from_url` for each clip.
""")
with gr.Tab("ℹ️ About"):
gr.HTML(_about_html())
# Footer
gr.HTML(
"""
<div class="footer-text">
🚀 Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a>
&middot; Powered by HunyuanVideo-Foley
</div>
"""
)
# ---- REST + MCP endpoints ----
def _download_to_tmp(url: str) -> str:
try:
import requests
except Exception:
raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.")
r = requests.get(url, timeout=30)
r.raise_for_status()
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
tmp.write(r.content)
tmp.flush()
tmp.close()
return tmp.name
def _maybe_from_base64(data_url_or_b64: str) -> str:
b64 = data_url_or_b64
if data_url_or_b64.startswith("data:"):
b64 = data_url_or_b64.split(",", 1)[-1]
raw = base64.b64decode(b64)
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
tmp.write(raw)
tmp.flush()
tmp.close()
return tmp.name
def _normalize_video_input(video_url_or_b64: str) -> str:
v = (video_url_or_b64 or "").strip()
if v.startswith("http://") or v.startswith("https://"):
return _download_to_tmp(v)
return _maybe_from_base64(v)
@gr.api
def api_generate_from_url(
video_url_or_b64: str,
text_prompt: str = "",
guidance_scale: float = 4.5,
num_inference_steps: int = 50,
sample_nums: int = 1,
) -> Dict[str, List[str]]:
if _model_dict is None or _cfg is None:
msg = auto_load_models(device_str="cpu")
if not str(msg).startswith("✅"):
raise RuntimeError(msg)
local = _normalize_video_input(video_url_or_b64)
outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
return {"videos": outs, "message": msg}
@gr.api
def load_model_tool() -> str:
"""Ensure model is loaded on server (convenient for MCP/REST)."""
return auto_load_models(device_str="cpu")
@gr.mcp.resource("shortifoley://status")
def shortifoley_status() -> str:
"""Return a simple readiness string for MCP clients."""
ready = _model_dict is not None and _cfg is not None
dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"
@gr.mcp.prompt()
def foley_prompt(name: str = "default") -> str:
"""Reusable guidance for describing sound ambience."""
return (
"Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
"Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
)
return demo
def set_seeds(s: int = 1):
random.seed(s)
np.random.seed(s)
torch.manual_seed(s)
# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
logger.remove()
logger.add(lambda m: print(m, end=""), level="INFO")
set_seeds(1)
logger.info("===== Application Startup =====\n")
prepare_once()
# Probe imports
sys.path.append(str(REPO_DIR))
try:
from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process # noqa: F401
from hunyuanvideo_foley.utils.feature_utils import feature_process # noqa: F401
from hunyuanvideo_foley.utils.media_utils import merge_audio_video # noqa: F401
except Exception as e:
logger.warning(f"Repo imports not ready yet: {e}")
ui = create_ui()
# Enable MCP server
ui.launch(
server_name="0.0.0.0",
share=False,
show_error=True,
mcp_server=True, # MCP
)