# Created by bilsimaging.com
import os
os.environ.setdefault("HF_PREFER_SAFETENSORS", "1")

import sys
import json
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces
# -------------------------
# Constants & configuration
# -------------------------
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))

# Always save into outputs/autosaved/
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs" / "autosaved")))
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

SPACE_TITLE = "🎵 ShortiFoley – HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video → Audio Foley · Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# ZeroGPU limit (<=120s recommended)
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))

# Globals
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None
# -------------
# Small helpers
# -------------
def _setup_device(pref: str = "cpu", gpu_id: int = 0) -> torch.device:
    """
    Safe device picker.
    IMPORTANT: Do NOT probe torch.cuda.is_available() here on Stateless GPU Spaces.
    Only request CUDA inside a @spaces.GPU function.
    """
    if pref.startswith("cuda"):
        d = torch.device(f"cuda:{gpu_id}")
    elif pref == "mps":
        d = torch.device("mps")
    else:
        d = torch.device("cpu")
    logger.info(f"Using {d}")
    return d
def _ensure_repo() -> None:
    """Shallow-clone the Tencent repo with LFS smudge disabled (avoids LFS quota checkout)."""
    if REPO_DIR.exists():
        return
    cmd = (
        "GIT_LFS_SKIP_SMUDGE=1 "
        "git -c filter.lfs.smudge= -c filter.lfs.required=false "
        f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
    )
    logger.info(f">> {cmd}")
    os.system(cmd)
def _download_weights_if_needed() -> None:
    """Snapshot only the needed files from the HF model hub."""
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        resume_download=True,
        allow_patterns=[
            "hunyuanvideo_foley.pth",
            "synchformer_state_dict.pth",
            "vae_128d_48k.pth",
            "assets/*",
            "config.yaml",
        ],
    )
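
# For reference, a roughly equivalent one-off download via the huggingface_hub CLI
# (illustrative only; the snapshot_download call above is what the app actually uses):
#   huggingface-cli download tencent/HunyuanVideo-Foley \
#       hunyuanvideo_foley.pth synchformer_state_dict.pth vae_128d_48k.pth \
#       --local-dir ./weights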
def prepare_once() -> None:
    _ensure_repo()
    _download_weights_if_needed()
# -----------------------
# Model load & inference
# -----------------------
def _force_fp32_on_modules(obj):
    """Ensure every torch.nn.Module inside obj is float32 to avoid half/float mismatches."""
    try:
        import torch.nn as nn
        for name in dir(obj):
            try:
                m = getattr(obj, name)
            except Exception:
                continue
            if isinstance(m, nn.Module):
                m.float()
        if hasattr(obj, "foley_model"): obj.foley_model.float()
        if hasattr(obj, "dac_model"): obj.dac_model.float()
        if hasattr(obj, "siglip2_model"): obj.siglip2_model.float()
        if hasattr(obj, "clap_model"): obj.clap_model.float()
        if hasattr(obj, "syncformer_model"): obj.syncformer_model.float()
    except Exception as e:
        logger.warning(f"FP32 cast warning: {e}")
def auto_load_models(device_str: str = "cpu") -> str:
    """
    Load HunyuanVideo-Foley + encoders on the chosen device.
    Use device_str='cuda' ONLY inside @spaces.GPU to avoid CUDA init in the main process.
    """
    global _model_dict, _cfg, _device
    if _model_dict is not None and _cfg is not None:
        return "✅ Model already loaded."

    # Make absolutely sure safetensors is preferred
    os.environ["HF_PREFER_SAFETENSORS"] = "1"
    torch.set_float32_matmul_precision("high")  # allow TF32 where possible

    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.model_utils import load_model

    _device = _setup_device(device_str, 0)
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    try:
        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
        # Force fp32 to fix: RuntimeError: Input type (Half) and bias (float) must match
        _force_fp32_on_modules(_model_dict)
        return "✅ Model loaded."
    except OSError as e:
        logger.error(str(e))
        logger.info("Retrying after enforcing safetensors preference...")
        os.environ["HF_PREFER_SAFETENSORS"] = "1"
        try:
            _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
            _force_fp32_on_modules(_model_dict)
            return "✅ Model loaded (after safetensors retry)."
        except Exception as e2:
            logger.error(str(e2))
            return f"❌ Failed to load model: {e2}"
    except Exception as e:
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"
def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Preferred: project's util; fallback to ffmpeg."""
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video
        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            out_path,
        ]
        subprocess.run(cmd, check=True)
def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
                  prompt: str) -> str:
    """Save WAV + MP4 in outputs/autosaved/, add metadata with a soft watermark note."""
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)
    tmpdir = Path(tempfile.mkdtemp())
    wav_path = tmpdir / f"gen_{idx}.wav"
    torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)

    ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
    base = f"shortifoley_{ts}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"
    _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Sidecar JSON
    meta = {
        "id": base,
        "created_utc": datetime.datetime.utcnow().isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark_note": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)",
    }
    (OUTPUTS_DIR / f"{base}.json").write_text(json.dumps(meta, ensure_ascii=False, indent=2))
    return str(out_mp4)
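
# Illustrative sidecar contents (the values below are made-up examples of what gets written):
#   {
#     "id": "shortifoley_20250101_120000_000000_1",
#     "created_utc": "2025-01-01T12:00:00.000000Z",
#     "source_video": "clip.mp4",
#     "output_video": "shortifoley_20250101_120000_000000_1.mp4",
#     "prompt": "metal clang; hollow room reverb",
#     "watermark_note": "Made with ❤️ by bilsimaging.com",
#     "tool": "ShortiFoley (HunyuanVideo-Foley)"
#   }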
def _list_gallery(limit: int = 100) -> List[str]:
    vids = []
    for p in sorted(OUTPUTS_DIR.glob("*.mp4"), key=lambda x: x.stat().st_mtime, reverse=True):
        vids.append(str(p))
        if len(vids) >= limit:
            break
    return vids
# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)  # request the GPU only for this call (ZeroGPU)
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1–6 variants).
    Returns: (list of output video paths, status message)
    """
    # Lazy-load on GPU ONLY here (prevents CUDA init in main process)
    if _model_dict is None or _cfg is None:
        msg = auto_load_models(device_str="cuda")
        if not str(msg).startswith("✅"):
            return [], f"❌ {msg}"

    if not video_file:
        return [], "❌ Please provide a video."

    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    # Avoid autocast to float16 to fix Half/Float mismatch inside Synchformer conv3d
    with torch.autocast(device_type="cuda", enabled=False):
        # preprocess
        visual_feats, text_feats, audio_len_s = feature_process(
            video_file, (text_prompt or "").strip(), _model_dict, _cfg
        )
        # generate batch
        n = int(max(1, min(6, sample_nums)))
        audio, sr = denoise_process(
            visual_feats,
            text_feats,
            audio_len_s,
            _model_dict,
            _cfg,
            guidance_scale=float(guidance_scale),
            num_inference_steps=int(num_inference_steps),
            batch_size=n,
        )

    # save results
    outs = []
    for i in range(n):
        outs.append(_save_outputs(video_file, audio[i], sr, i + 1, text_prompt or ""))
    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
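
# Illustrative local call (hypothetical paths/prompt; assumes weights are present and
# that a GPU can be attached when running on Spaces):
#   outs, msg = infer_single_video("samples/door_slam.mp4",
#                                  "heavy wooden door slam; short room reverb",
#                                  guidance_scale=4.5, num_inference_steps=50, sample_nums=2)
#   print(msg, outs)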
# -------------
# Gradio UI (with MCP+API inside the same app)
# -------------
def _about_html() -> str:
    return f"""
<div style="line-height:1.6">
  <h2>About ShortiFoley</h2>
  <p><b>ShortiFoley</b> turns short videos into realistic Foley sound.<br/>
  Powered by Tencent's HunyuanVideo-Foley (SigLIP2 + CLAP), with autosave and an MCP server for automation
  (<a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a> flows).</p>
  <p><b>Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a></b></p>

  <h3>Quick Steps</h3>
  <ol>
    <li>Upload a clip (ideally &lt; 120s).</li>
    <li>Optionally describe the sound (English).</li>
    <li>Pick variants (1–6), adjust CFG and steps.</li>
    <li>Hit <b>Generate</b>. Results show on the right and save into the Gallery.</li>
  </ol>

  <h3>Tips for Best Quality</h3>
  <ul>
    <li>Use tight clips (5–30s) around the action.</li>
    <li>Include material &amp; action cues: "metal clang", "glass shatter", "rubber on wet tile".</li>
    <li>Describe ambience: "roomy", "echoey", "distant crowd".</li>
    <li>Generate 2–4 variants and pick the most natural.</li>
  </ul>

  <h3>MCP &amp; API</h3>
  <p>This Space exposes an <b>MCP server</b> and simple REST endpoints (see the "API &amp; MCP" tab).
  Perfect for media-automation pipelines and tools like <b><a href="https://n8n.partnerlinks.io/bilsimaging" target="_blank" rel="noopener">n8n</a></b>.</p>
</div>
"""
def create_ui() -> gr.Blocks:
    css = """
    .main-header{ text-align:center; padding:1.2rem; border-radius:18px; background:linear-gradient(135deg,#6366f1,#8b5cf6); color:white; box-shadow:0 12px 40px rgba(99,102,241,.35); margin-bottom:16px;}
    .main-header h1{ margin:0; font-size:2.0rem; font-weight:800;}
    .main-header p{ margin:.25rem 0 0; opacity:.95; font-weight:500;}
    .card{ background:white; border:1px solid #e7e9ef; border-radius:16px; padding:14px; box-shadow:0 10px 28px rgba(0,0,0,.06);}
    .generate-btn button{ font-weight:800; border-radius:12px; padding:10px 18px;}
    .minor-btn button{ border-radius:10px;}
    .muted{ color:#64748b; }
    .footer-text{ color:#64748b; text-align:center; padding:12px 0; font-size:.95rem; }
    """
    with gr.Blocks(title="ShortiFoley – HunyuanVideo-Foley", css=css) as demo:
        gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")

        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    # LEFT: input
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 📹 Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile; distant chatter; occasional splashes.",
                            lines=3,
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
                        with gr.Row():
                            load_btn = gr.Button("⚙️ Load model (CPU)", variant="secondary", elem_classes=["minor-btn"])
                            generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])
                        status = gr.Textbox(label="Status", interactive=False)

                    # RIGHT: results
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 🎥 Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        with gr.Row():
                            v2 = gr.Video(label="Sample 2", height=160, visible=False)
                            v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        with gr.Row():
                            v4 = gr.Video(label="Sample 4", height=160, visible=False)
                            v5 = gr.Video(label="Sample 5", height=160, visible=False)
                            v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        gr.Markdown("<span class='muted'>Autosaved to the Gallery tab.</span>")
                # Generate handler (single binding, exact outputs)
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    vis = []
                    for i in range(6):
                        if outs and i < len(outs):
                            vis.append(gr.update(visible=True, value=outs[i]))
                        else:
                            # keep slot 1 visible as an empty placeholder; hide and clear the rest
                            vis.append(gr.update(visible=(i == 0), value=None))
                    return (*vis, msg)

                generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[v1, v2, v3, v4, v5, v6, status],
                    api_name="/infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files.",
                )
                load_btn.click(
                    fn=lambda: auto_load_models(device_str="cpu"),
                    inputs=[],
                    outputs=[status],
                    api_name="/load_model",
                    api_description="Load/initialize the ShortiFoley model and encoders on CPU (GPU loads during inference).",
                )

                # Toggle visibility based on variants
                def _toggle_vis(n):
                    n = int(n)
                    return [
                        gr.update(visible=True),
                        gr.update(visible=n >= 2),
                        gr.update(visible=n >= 3),
                        gr.update(visible=n >= 4),
                        gr.update(visible=n >= 5),
                        gr.update(visible=n >= 6),
                    ]

                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])
            with gr.Tab("📁 Gallery"):
                gr.Markdown("Latest generated videos (autosaved to <code>outputs/autosaved/</code>).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results",
                )
                refresh = gr.Button("🔄 Refresh Gallery")
                refresh.click(lambda: _list_gallery(), outputs=[gallery])
with gr.Tab("API & MCP"): | |
gr.Markdown(""" | |
### REST examples | |
**POST** `/api_generate_from_url` | |
```json | |
{ | |
"video_url_or_b64": "https://yourhost/sample.mp4", | |
"text_prompt": "metallic clink; hollow room reverb", | |
"guidance_scale": 4.5, | |
"num_inference_steps": 50, | |
"sample_nums": 2 | |
} | |
``` | |
**POST** `/load_model_tool` | |
Loads the model proactively (useful before batch runs). | |
**MCP resources & prompt** | |
- `shortifoley://status` β quick health info | |
- `foley_prompt` β reusable guidance for describing the sound | |
Works great for media-automation in tools like **n8n**: call `load_model_tool` once, then `api_generate_from_url` for each clip. | |
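
**Python client sketch** (illustrative; assumes the `gradio_client` package and this Space's id — adjust both to your deployment):
```python
from gradio_client import Client

client = Client("bilsimaging/ShortiFoley")        # hypothetical Space id
out = client.predict(
    "https://yourhost/sample.mp4",                # video_url_or_b64
    "metallic clink; hollow room reverb",         # text_prompt
    4.5, 50, 2,                                   # guidance_scale, steps, sample_nums
    api_name="/api_generate_from_url",
)
print(out)
```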
""") | |
            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Footer
        gr.HTML(
            """
            <div class="footer-text">
                🌐 Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a>
                · Powered by HunyuanVideo-Foley
            </div>
            """
        )
        # ---- REST + MCP endpoints (inside Blocks) ----
        def _download_to_tmp(url: str) -> str:
            try:
                import requests
            except Exception:
                raise RuntimeError("Missing dependency 'requests'. Add it to requirements.txt to use URL inputs.")
            r = requests.get(url, timeout=30)
            r.raise_for_status()
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tmp.write(r.content)
            tmp.flush()
            tmp.close()
            return tmp.name

        def _maybe_from_base64(data_url_or_b64: str) -> str:
            b64 = data_url_or_b64
            if data_url_or_b64.startswith("data:"):
                b64 = data_url_or_b64.split(",", 1)[-1]
            raw = base64.b64decode(b64)
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tmp.write(raw)
            tmp.flush()
            tmp.close()
            return tmp.name

        def _normalize_video_input(video_url_or_b64: str) -> str:
            v = (video_url_or_b64 or "").strip()
            if v.startswith("http://") or v.startswith("https://"):
                return _download_to_tmp(v)
            return _maybe_from_base64(v)
        def api_generate_from_url(
            video_url_or_b64: str,
            text_prompt: str = "",
            guidance_scale: float = 4.5,
            num_inference_steps: int = 50,
            sample_nums: int = 1,
        ) -> Dict[str, List[str]]:
            if _model_dict is None or _cfg is None:
                msg = auto_load_models(device_str="cpu")  # safe in HTTP context
                if not str(msg).startswith("✅"):
                    raise RuntimeError(msg)
            local = _normalize_video_input(video_url_or_b64)
            outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
            return {"videos": outs, "message": msg}

        def load_model_tool() -> str:
            """Ensure model is loaded on server (convenient for MCP/REST)."""
            return auto_load_models(device_str="cpu")

        def shortifoley_status() -> str:
            """Return a simple readiness string for MCP clients."""
            ready = _model_dict is not None and _cfg is not None
            dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
            return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

        def foley_prompt(name: str = "default") -> str:
            """Reusable guidance for describing sound ambience."""
            return (
                "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
                "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
            )
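
        # The helpers above are plain functions; nothing registers them automatically.
        # One way to surface them as the REST/MCP endpoints documented in the
        # "API & MCP" tab (a sketch, assuming Gradio 5.x where gr.api() is available):
        gr.api(api_generate_from_url, api_name="api_generate_from_url")
        gr.api(load_model_tool, api_name="load_model_tool")
        gr.api(shortifoley_status, api_name="shortifoley_status")
        gr.api(foley_prompt, api_name="foley_prompt")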
    return demo
def set_seeds(s: int = 1):
    random.seed(s)
    np.random.seed(s)
    torch.manual_seed(s)
# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")
    set_seeds(1)
    logger.info("===== Application Startup =====\n")

    prepare_once()

    # Probe imports (early surfacing)
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")

    ui = create_ui()
    # Enable MCP server so tools/resources/prompts are discoverable
    ui.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,  # Enable MCP server
    )