# Hugging Face Space status banner: "Running on Zero" (ZeroGPU hardware).
import os, sys, json, tempfile, subprocess, shutil, uuid, glob, traceback, datetime | |
from pathlib import Path | |
from typing import Tuple, List | |
# ================= Crash trap & verbose logs ================= | |
import faulthandler | |
# Have the interpreter dump Python tracebacks on fatal errors.
faulthandler.enable()

# Diagnostic defaults; setdefault leaves any value already present untouched.
for _env_name, _env_default in (
    ("GRADIO_ANALYTICS_ENABLED", "false"),
    ("GRADIO_NUM_PORTS", "1"),
    ("HF_HUB_VERBOSE", "1"),
    ("TRANSFORMERS_VERBOSITY", "info"),
    ("PYTHONUNBUFFERED", "1"),
):
    os.environ.setdefault(_env_name, _env_default)
del _env_name, _env_default
def _crash_trap(exctype, value, tb):
    """Print a timestamped banner around the traceback of an uncaught exception.

    Installed below as ``sys.excepthook``. The banner lines go to stdout and
    the traceback itself is printed by ``traceback.print_exception``.

    Args:
        exctype: Exception class.
        value: Exception instance.
        tb: Traceback object (may be ``None``).
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the ISO text keeps its old shape; the "Z"
    # suffix is appended by hand in the banner.
    ts = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
    # Flush so the opening banner is emitted before the traceback even when
    # stdout is block-buffered.
    print(f"\n===== FATAL ({ts}Z) =====================================", flush=True)
    traceback.print_exception(exctype, value, tb)
    print("=========================================================\n", flush=True)


sys.excepthook = _crash_trap
# ============================================================ | |
import gradio as gr | |
import spaces | |
from huggingface_hub import snapshot_download | |
from loguru import logger | |
import torch, torchaudio | |
# ========= Paths & Config ========= | |
def _env_int(name: str, default: str) -> int:
    """Read environment variable *name* as an int, using *default* when unset."""
    return int(os.environ.get(name, default))


# Directory layout, anchored at the folder that contains this file.
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT.joinpath("HunyuanVideo-Foley")
WEIGHTS_DIR = ROOT.joinpath("weights")
CACHE_DIR = ROOT.joinpath("cache")
OUT_DIR = ROOT.joinpath("outputs")
ASSETS = ROOT.joinpath("assets")
ASSETS.mkdir(exist_ok=True)

# Branding, overridable through the environment.
APP_TITLE = os.environ.get("APP_TITLE", "Foley Studio · ZeroGPU")
APP_TAGLINE = os.environ.get("APP_TAGLINE", "Generate scene-true foley for short clips (ZeroGPU-ready).")
PRIMARY_COLOR = os.environ.get("PRIMARY_COLOR", "#6B5BFF")

# ZeroGPU-safe defaults (tweak in Space Secrets if needed)
MAX_SECS = _env_int("MAX_SECS", "15")
TARGET_H = _env_int("TARGET_H", "480")
SR = _env_int("TARGET_SR", "48000")
ZEROGPU_DURATION = _env_int("ZEROGPU_DURATION", "110")
def sh(cmd: "str | List[str]") -> None:
    """Echo a command, run it, and raise if it exits non-zero.

    Args:
        cmd: Either a shell command line (``str``; executed through the shell,
            exactly as before) or an argv sequence (executed directly without
            a shell, so arguments need no quoting).

    Raises:
        subprocess.CalledProcessError: If the command returns a non-zero status.
    """
    use_shell = isinstance(cmd, str)
    argv = cmd if use_shell else [str(part) for part in cmd]
    shown = cmd if use_shell else " ".join(argv)
    # Flush so the echo appears before the child's own output when stdout is
    # buffered (e.g. redirected to a log file or pipe).
    print(">>", shown, flush=True)
    subprocess.run(argv, shell=use_shell, check=True)
def ffprobe_duration(path: str) -> float:
    """Return the container duration of *path* in seconds, as printed by ffprobe.

    Any failure (ffprobe not runnable, non-zero exit, output that does not
    parse as a float) is reported as ``0.0`` rather than raised.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        raw = subprocess.check_output(probe_cmd)
        seconds = float(raw.decode().strip())
    except Exception:
        seconds = 0.0
    return seconds
def _clone_without_lfs():
    """
    Clone repo while skipping LFS smudge to avoid huge demo assets.
    Falls back to sparse checkout with only essential paths.

    Does nothing when REPO_DIR already exists. After a successful shallow
    clone the repo's ``assets`` directory is deleted. If the clone command
    fails, an empty repo is initialised in REPO_DIR, a sparse-checkout list is
    written, and ``main`` (then ``master``) is fetched and checked out.

    Raises:
        subprocess.CalledProcessError: If the sparse-checkout fallback fails too.
    """
    import shlex  # local: only needed to quote the path for the shell

    if REPO_DIR.exists():
        return

    repo_url = "https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git"
    # Every command below goes through a shell, so quote the checkout path;
    # unquoted, a path with spaces or metacharacters would be split/interpreted.
    repo = shlex.quote(str(REPO_DIR))

    try:
        sh(
            "GIT_LFS_SKIP_SMUDGE=1 "
            "git -c filter.lfs.smudge= -c filter.lfs.required=false "
            f"clone --depth 1 {repo_url} {repo}"
        )
        assets = REPO_DIR / "assets"
        if assets.exists():
            shutil.rmtree(assets, ignore_errors=True)
        return
    except subprocess.CalledProcessError as e:
        print("Shallow clone with LFS skipped failed, trying sparse checkout…", e)

    # Fallback: manual init + sparse checkout of the essential paths only.
    REPO_DIR.mkdir(parents=True, exist_ok=True)
    sh(f"git -C {repo} init")
    sh(
        f"git -C {repo} -c filter.lfs.smudge= -c filter.lfs.required=false "
        f"remote add origin {repo_url}"
    )
    sh(f"git -C {repo} config core.sparseCheckout true")

    sparse_file = REPO_DIR / ".git" / "info" / "sparse-checkout"
    sparse_file.parent.mkdir(parents=True, exist_ok=True)
    sparse_file.write_text("\n".join([
        "hunyuanvideo_foley/",
        "configs/",
        "gradio_app.py",
        "requirements.txt",
        "LICENSE",
        "README.md",
    ]) + "\n")

    # Try the 'main' branch first, then 'master'.
    try:
        sh(f"git -C {repo} fetch --depth 1 origin main")
        sh(f"git -C {repo} checkout main")
    except subprocess.CalledProcessError:
        sh(f"git -C {repo} fetch --depth 1 origin master")
        sh(f"git -C {repo} checkout master")
def prepare_once():
    """Bootstrap the workspace: clone code (skip LFS), download weights, set env, prepare dirs."""
    _clone_without_lfs()

    # Put the cloned repo first on the import path (once).
    repo_entry = str(REPO_DIR)
    if repo_entry not in sys.path:
        sys.path.insert(0, repo_entry)

    # Fetch the model snapshot into the local weights folder.
    weights_root = str(WEIGHTS_DIR)
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    download_kwargs = dict(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=weights_root,
        local_dir_use_symlinks=False,
        repo_type="model",
        resume_download=True,
    )
    snapshot_download(**download_kwargs)
    os.environ["HIFI_FOLEY_MODEL_PATH"] = weights_root

    for folder in (CACHE_DIR, OUT_DIR):
        folder.mkdir(exist_ok=True)
# One-time bootstrap at import time.
prepare_once()

# Prefer safetensors & fast transfer
# NOTE(review): both flags are set after huggingface_hub has been imported and
# after prepare_once() has already downloaded the weights — confirm the
# libraries still read them at this point.
os.environ.update({
    "TRANSFORMERS_PREFER_SAFETENSORS": "1",
    "HF_HUB_ENABLE_HF_TRANSFER": "1",
})
def ensure_clap_safetensors():
    """
    Warm the Hub cache for laion/larger_clap_general with safetensors weights
    plus config/tokenizer files only, explicitly excluding ``*.bin``, so that
    Transformers never selects a stale/corrupt *.bin.
    """
    wanted = [
        "*.safetensors", "config.json", "*.json", "*.txt",
        "tokenizer*", "*merges*", "*vocab*",
    ]
    unwanted = ["*.bin"]
    snapshot_download(
        repo_id="laion/larger_clap_general",
        allow_patterns=wanted,
        ignore_patterns=unwanted,
        resume_download=True,
        local_dir=None,
        local_dir_use_symlinks=False,
    )
def _purge_clap_pt_bins():
    """Remove any cached .bin for laion/larger_clap_general.

    The hub cache directory is resolved from the environment instead of being
    hard-coded: ``HF_HUB_CACHE`` (or legacy ``HUGGINGFACE_HUB_CACHE``), then
    ``$HF_HOME/hub``, then ``$XDG_CACHE_HOME/huggingface/hub``, and finally
    ``~/.cache/huggingface/hub``. Only ``*.bin`` files directly inside each
    snapshot folder are deleted. Deletion is best effort: a file that cannot
    be removed is reported and skipped.
    """
    env = os.environ
    explicit = env.get("HF_HUB_CACHE") or env.get("HUGGINGFACE_HUB_CACHE")
    if explicit:
        cache_root = Path(explicit).expanduser()
    elif env.get("HF_HOME"):
        cache_root = Path(env["HF_HOME"]).expanduser() / "hub"
    else:
        xdg = env.get("XDG_CACHE_HOME")
        base = Path(xdg).expanduser() if xdg else Path.home() / ".cache"
        cache_root = base / "huggingface" / "hub"

    snapshots = cache_root / "models--laion--larger_clap_general" / "snapshots"
    # Path.glob yields nothing when the snapshots folder does not exist.
    for bin_file in sorted(snapshots.glob("*/*.bin")):
        try:
            bin_file.unlink()
        except OSError as err:
            print(f">> Could not purge cached bin: {bin_file} ({err})")
        else:
            print(f">> Purged cached bin: {bin_file}")
# ---- Dependency guards (early / clear errors) ------------------------------- | |
# Fail fast, with an actionable message, when a runtime dependency cannot be imported.
try:
    import audiotools  # provided by PyPI package 'descript-audiotools'
except Exception as exc:
    _hint = (
        "Missing module 'audiotools'. Install via PyPI package "
        "'descript-audiotools' (add 'descript-audiotools>=0.7.2' to requirements.txt)."
    )
    raise RuntimeError(_hint) from exc

try:
    import omegaconf  # noqa: F401
    import yaml  # from pyyaml
    import easydict  # noqa: F401
except Exception as exc:
    _hint = (
        "Missing config deps. Add to requirements.txt: "
        "'omegaconf>=2.3.0', 'pyyaml', 'easydict'."
    )
    raise RuntimeError(_hint) from exc
# Import Tencent internals after guards | |
from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process | |
from hunyuanvideo_foley.utils.feature_utils import feature_process | |
from hunyuanvideo_foley.utils.media_utils import merge_audio_video | |
# ========= Native Model Setup ========= | |
# Checkpoint location: environment override first, bundled weights dir otherwise.
MODEL_PATH = os.environ.get("HIFI_FOLEY_MODEL_PATH", str(WEIGHTS_DIR))
CONFIG_PATH = str(REPO_DIR.joinpath("configs", "hunyuanvideo-foley-xxl.yaml"))

# Process-wide model state; auto_load_models() fills in all three.
_model_dict = _cfg = _device = None
def _setup_device(device_str: str = "auto", gpu_id: int = 0) -> torch.device:
    """Pick the torch device to run on and log the choice.

    Args:
        device_str: ``"auto"`` to probe CUDA, then MPS, then fall back to CPU;
            any other value is passed to ``torch.device`` (bare ``"cuda"`` is
            expanded to ``cuda:<gpu_id>``).
        gpu_id: CUDA device index used for the CUDA cases above.

    Returns:
        The selected ``torch.device``.
    """
    if device_str != "auto":
        # Explicit request: only a bare "cuda" gets the index appended.
        spec = f"cuda:{gpu_id}" if device_str == "cuda" else device_str
        chosen = torch.device(spec)
        logger.info(f"Using specified device: {chosen}")
        return chosen

    if torch.cuda.is_available():
        chosen = torch.device(f"cuda:{gpu_id}")
        logger.info(f"Using CUDA {chosen}")
        return chosen

    if torch.backends.mps.is_available():
        chosen = torch.device("mps")
        logger.info("Using MPS")
        return chosen

    chosen = torch.device("cpu")
    logger.info("Using CPU")
    return chosen
def auto_load_models() -> str:
    """Load the model into the module-level state and return a status string.

    Returns the "config not found" message (without loading anything) when
    CONFIG_PATH is missing; otherwise returns "✅ Model loaded". Exceptions
    from the download or load steps propagate to the caller.
    """
    global _model_dict, _cfg, _device

    if not os.path.exists(MODEL_PATH):
        os.makedirs(MODEL_PATH, exist_ok=True)
    if not os.path.exists(CONFIG_PATH):
        return f"❌ Config file not found: {CONFIG_PATH}"

    _device = _setup_device("auto", 0)
    for line in (
        "Loading HunyuanVideo-Foley model...",
        f"MODEL_PATH: {MODEL_PATH}",
        f"CONFIG_PATH: {CONFIG_PATH}",
    ):
        logger.info(line)

    # Ensure CLAP uses safetensors; nuke any .bin first
    ensure_clap_safetensors()
    _purge_clap_pt_bins()

    # Lock HF Hub to offline so Transformers can't fetch a fresh .bin again
    # NOTE(review): set after the hub libraries were imported — confirm they
    # still honour these variables at this point.
    for flag in ("HF_HUB_OFFLINE", "TRANSFORMERS_OFFLINE"):
        os.environ[flag] = "1"

    _model_dict, _cfg = load_model(MODEL_PATH, CONFIG_PATH, _device)
    done = "✅ Model loaded"
    logger.info(done)
    return done
# Init logger and load model once (with explicit crash surface) | |
# Route loguru through plain print at INFO level.
logger.remove()
logger.add(lambda msg: print(msg, end=''), level="INFO")

# Load the model exactly once; on failure serve a page showing the traceback,
# then re-raise so the failure is still fatal.
try:
    msg = auto_load_models()
    logger.info(msg)
except Exception:
    boot_trace = traceback.format_exc()
    print("\n[BOOT][ERROR] auto_load_models() failed:")
    traceback.print_exc()
    with gr.Blocks(title="Foley Studio · Boot Error") as demo:
        gr.Markdown("### ❌ Boot failure\n```\n" + boot_trace + "\n```")
    demo.launch(server_name="0.0.0.0")
    raise
# ========= Preprocessing ========= | |
def preprocess_video(in_path: str) -> Tuple[str, float]:
    """
    - Trim to <= MAX_SECS
    - Downscale to TARGET_H (keep AR), strip audio
    - Return processed mp4 path and final duration

    ffmpeg is invoked with an argv list rather than a shell string, so the
    (user-supplied) input path is never interpreted by a shell. The working
    directory is removed again if either ffmpeg pass fails, and the
    intermediate trimmed file is deleted once the final file exists.

    Args:
        in_path: Path of the uploaded video.

    Returns:
        ``(processed_path, duration_seconds)`` where the duration is
        ``min(probed duration, MAX_SECS)``.

    Raises:
        RuntimeError: If the duration cannot be probed.
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
        FileNotFoundError: If the ffmpeg executable is not available.
    """

    def _ffmpeg(args: List[str]) -> None:
        """Echo and run one ``ffmpeg -y`` invocation without a shell."""
        argv = ["ffmpeg", "-y", *args]
        print(">>", " ".join(argv), flush=True)
        subprocess.run(argv, check=True)

    dur = ffprobe_duration(in_path)
    if dur <= 0:  # ffprobe_duration reports failures as 0.0
        raise RuntimeError("Unable to read the video duration.")

    temp_dir = Path(tempfile.mkdtemp(prefix="pre_"))
    trimmed = temp_dir / "trim.mp4"
    processed = temp_dir / "proc.mp4"
    trim_args = ["-t", str(MAX_SECS)] if dur > MAX_SECS else []

    try:
        # Normalize & remove audio
        _ffmpeg([
            "-i", str(in_path),
            *trim_args,
            "-an",
            "-vcodec", "libx264", "-preset", "veryfast", "-crf", "23",
            "-movflags", "+faststart",
            str(trimmed),
        ])
        # Downscale to TARGET_H; ensure mod2 width
        vf = f"scale=-2:{TARGET_H}:flags=bicubic"
        _ffmpeg([
            "-i", str(trimmed),
            "-vf", vf,
            "-an",
            "-vcodec", "libx264", "-profile:v", "baseline", "-level", "3.1",
            "-pix_fmt", "yuv420p",
            "-preset", "veryfast", "-crf", "24",
            "-movflags", "+faststart",
            str(processed),
        ])
    except Exception:
        # Do not leave a half-written working directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    # The intermediate file is no longer needed once proc.mp4 exists.
    try:
        trimmed.unlink()
    except OSError:
        pass

    final_dur = min(dur, float(MAX_SECS))
    return str(processed), final_dur
# ========= Inference (ZeroGPU) ========= | |
def run_model(video_path: str, prompt_text: str,
              guidance_scale: float = 4.5,
              num_inference_steps: int = 50,
              sample_nums: int = 1) -> Tuple[List[str], int]:
    """
    Native inference (no shell). Returns ([wav_paths], sample_rate).

    Args:
        video_path: Preprocessed video to generate foley for.
        prompt_text: Optional text prompt; ``None`` is treated as empty and
            surrounding whitespace is stripped.
        guidance_scale: Forwarded to ``denoise_process``.
        num_inference_steps: Forwarded to ``denoise_process``.
        sample_nums: Requested batch size; at most this many WAVs are written.

    Raises:
        RuntimeError: If the model has not been loaded.

    NOTE(review): this file imports ``spaces`` and defines ZEROGPU_DURATION,
    but no ``@spaces.GPU`` decorator is visible on this function — confirm
    whether it is meant to be wrapped for ZeroGPU.
    """
    if _model_dict is None or _cfg is None:
        raise RuntimeError("Model not loaded yet.")

    text_prompt = (prompt_text or "").strip()

    # Extract features
    visual_feats, text_feats, audio_len_s = feature_process(
        video_path, text_prompt, _model_dict, _cfg
    )

    # Generate audio (B x C x T)
    logger.info(f"Generating {sample_nums} sample(s)...")
    audio_batch, sr = denoise_process(
        visual_feats, text_feats, audio_len_s, _model_dict, _cfg,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        batch_size=sample_nums
    )

    # Save each sample as WAV
    out_dir = OUT_DIR / f"job_{uuid.uuid4().hex[:8]}"
    out_dir.mkdir(parents=True, exist_ok=True)
    wav_paths = []
    # Never index past what the model actually returned.
    produced = min(sample_nums, len(audio_batch))
    for idx in range(produced):
        clip = audio_batch[idx]
        if isinstance(clip, torch.Tensor):
            # torchaudio.save is handed a host tensor; a no-op if already on CPU.
            clip = clip.detach().cpu()
        wav_p = out_dir / f"generated_audio_{idx + 1}.wav"
        torchaudio.save(str(wav_p), clip, sr)
        wav_paths.append(str(wav_p))
    return wav_paths, sr
# ========= Optional: Mux Foley back to video ========= | |
def mux_audio_with_video(video_path: str, audio_path: str) -> str:
    """Mux *audio_path* onto the video stream of *video_path* and return the new MP4 path.

    The video stream is copied, the audio is encoded as 192k AAC, and the
    output stops at the shorter input (``-shortest``). ffmpeg is invoked with
    an argv list (no shell), so the paths need no quoting. The fresh output
    directory is removed again if ffmpeg fails.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
        FileNotFoundError: If the ffmpeg executable is not available.
    """
    out_dir = Path(tempfile.mkdtemp(prefix="mux_"))
    out_path = out_dir / "with_foley.mp4"
    argv = [
        "ffmpeg", "-y",
        "-i", str(video_path),
        "-i", str(audio_path),
        "-map", "0:v:0", "-map", "1:a:0",
        "-c:v", "copy", "-c:a", "aac", "-b:a", "192k",
        "-shortest",
        str(out_path),
    ]
    print(">>", " ".join(argv), flush=True)
    try:
        subprocess.run(argv, check=True)
    except Exception:
        shutil.rmtree(out_dir, ignore_errors=True)
        raise
    return str(out_path)
# ========= UI Handlers ========= | |
def single_generate(video: str, prompt: str, want_mux: bool, project_name: str):
    """UI handler for the single-clip tab.

    Runs preprocess -> inference -> optional mux and returns
    ``(wav_path, muxed_path_or_None, status_markdown, history_rows)``.
    Any exception is converted into an error status plus an "Error" history
    row instead of propagating. ``project_name`` is accepted but not used.
    """
    history = []
    if not video:
        return None, None, "⚠️ Please upload a video.", history

    try:
        history.append(["Preprocess", "Downscaling & trimming"])
        pre_path, final_dur = preprocess_video(video)

        history.append(["Inference", "ZeroGPU native pipeline"])
        wav_list, sr = run_model(
            pre_path, prompt or "", guidance_scale=4.5, num_inference_steps=50, sample_nums=1
        )
        if not wav_list:
            raise RuntimeError("No audio produced.")
        wav = wav_list[0]

        if want_mux:
            history.append(["Mux", "Merging foley with video"])
            muxed = mux_audio_with_video(pre_path, wav)
        else:
            muxed = None

        history.append(["Done", f"OK · ~{final_dur:.1f}s"])
    except Exception as e:
        history.append(["Error", str(e)])
        return None, None, f"❌ {type(e).__name__}: {e}", history

    return wav, muxed, f"✅ Completed (~{final_dur:.1f}s)", history
def batch_lite_generate(files: List[str], prompt: str, want_mux: bool):
    """UI handler for the batch-lite tab: process up to three clips in sequence.

    Each clip goes through preprocess -> inference -> optional mux. A failure
    on one clip is logged and does not stop the others. A JSON manifest of the
    successful outputs is written under OUT_DIR and its path is added to the
    log so the results can be located.

    Args:
        files: Uploaded video paths; only the first three are used.
        prompt: Prompt applied to every clip (``None`` is treated as empty).
        want_mux: Whether to also produce an MP4 with the generated audio.

    Returns:
        ``(status_markdown, log_rows)``.
    """
    log = []
    if not files:
        return "⚠️ Please upload 1–3 videos.", log
    if len(files) > 3:
        files = files[:3]
        log.append(["Info", "Limiting to first 3 videos."])

    outputs = []
    for i, f in enumerate(files, 1):
        try:
            log.append([f"Preprocess {i}", Path(f).name])
            pre, final_dur = preprocess_video(f)
            log.append([f"Run {i}", f"ZeroGPU ~{final_dur:.1f}s"])
            wav_list, _ = run_model(pre, prompt or "", sample_nums=1)
            if not wav_list:
                raise RuntimeError("No audio produced.")
            wav = wav_list[0]
            muxed = mux_audio_with_video(pre, wav) if want_mux else None
            outputs.append((wav, muxed))
            log.append([f"Done {i}", "OK"])
        except Exception as e:
            log.append([f"Error {i}", str(e)])

    manifest = OUT_DIR / f"batchlite_{uuid.uuid4().hex[:6]}.json"
    # ensure_ascii=False can emit non-ASCII characters, so pin the encoding
    # instead of relying on the locale default.
    manifest.write_text(
        json.dumps(
            [{"wav": w, "video": v} for (w, v) in outputs],
            ensure_ascii=False,
            indent=2,
        ),
        encoding="utf-8",
    )
    log.append(["Manifest", str(manifest)])
    return f"✅ Batch-lite finished · items: {len(outputs)}", log
# ========= UI (refreshed design) ========= | |
# Stylesheet passed to gr.Blocks(css=...). It is an f-string: only --brand is
# interpolated (from PRIMARY_COLOR); doubled braces are literal CSS braces.
THEME_CSS = f"""
:root {{
--brand: {PRIMARY_COLOR};
--bg: #0f1120;
--panel: #181a2e;
--text: #edf0ff;
--muted: #b7bce3;
--card: #15172a;
}}
.gradio-container {{
font-family: Inter, ui-sans-serif, -apple-system, Segoe UI, Roboto, Cairo, Noto Sans, Arial;
background: var(--bg);
color: var(--text);
}}
#hero {{
background: linear-gradient(135deg, var(--brand) 0%, #2f2e8b 40%, #1b1a3a 100%);
border-radius: 18px;
padding: 18px 20px;
color: white;
box-shadow: 0 10px 30px rgba(0,0,0,.35);
}}
#hero h1 {{
margin: 0 0 6px 0;
font-size: 20px;
font-weight: 700;
letter-spacing: .2px;
}}
#hero p {{
margin: 0;
opacity: .95;
}}
.gr-tabitem, .gr-block.gr-group, .gr-panel {{
background: var(--panel);
border-radius: 16px !important;
box-shadow: 0 6px 18px rgba(0,0,0,.28);
border: 1px solid rgba(255,255,255,.04);
}}
.gr-button {{
border-radius: 12px !important;
border: 1px solid rgba(255,255,255,.08) !important;
}}
.gradio-container .tabs .tab-nav button.selected {{
background: rgba(255,255,255,.06);
border-radius: 12px;
border: 1px solid rgba(255,255,255,.08);
}}
.badge {{
display:inline-block; padding:2px 8px; border-radius:999px;
background: rgba(255,255,255,.12); color:#fff; font-size:12px
}}
"""
# Main UI: hero banner plus three tabs (single clip, batch-lite, tips).
# NOTE(review): the nesting of the layout containers below was reconstructed
# from the statement order — confirm which components belong inside gr.Group().
with gr.Blocks(css=THEME_CSS, title=APP_TITLE, analytics_enabled=False) as demo:
    # Hero banner with title, tagline and the configured limits.
    with gr.Row():
        gr.HTML(f"""
<div id="hero">
<h1>{APP_TITLE}</h1>
<p>{APP_TAGLINE}</p>
<div style="margin-top:8px"><span class="badge">ZeroGPU</span> <span class="badge">Auto-trim ≤ {MAX_SECS}s</span> <span class="badge">Downscale {TARGET_H}p</span></div>
</div>
""")
    with gr.Tabs():
        # --- Single clip: inputs feed single_generate, which fills the four outputs.
        with gr.Tab("🎬 Single Clip"):
            with gr.Group():
                project_name = gr.Textbox(
                    label="Project name (optional)",
                    placeholder="Enter a short label for this clip"
                )
                with gr.Row():
                    v_single = gr.Video(label=f"Video (≤ ~{MAX_SECS}s recommended)")
                    p_single = gr.Textbox(
                        label="Sound prompt (optional)",
                        placeholder="e.g., soft footsteps on wood, light rain, indoor reverb"
                    )
                with gr.Row():
                    want_mux_single = gr.Checkbox(value=True, label="Mux foley into MP4 output")
                    run_btn = gr.Button("Generate", variant="primary")
            with gr.Row():
                out_audio = gr.Audio(label=f"Generated Foley ({SR//1000} kHz WAV)", type="filepath")
                out_mux = gr.Video(label="Video + Foley (MP4)", visible=True)
            status_md = gr.Markdown()
            history_table = gr.Dataframe(
                headers=["Step", "Note"], datatype=["str","str"],
                interactive=False, wrap=True, label="Activity"
            )
            run_btn.click(
                single_generate,
                inputs=[v_single, p_single, want_mux_single, project_name],
                outputs=[out_audio, out_mux, status_md, history_table]
            )
        # --- Batch-lite: batch_lite_generate returns only a status line and a log table.
        with gr.Tab("📦 Batch-Lite (1–3 clips)"):
            files = gr.Files(label="Upload 1–3 short videos", file_types=[".mp4",".mov"], file_count="multiple")
            prompt_b = gr.Textbox(label="Global prompt (optional)")
            want_mux_b = gr.Checkbox(value=True, label="Mux each output")
            go_b = gr.Button("Run batch-lite")
            batch_status = gr.Markdown()
            batch_log = gr.Dataframe(
                headers=["Step","Note"], datatype=["str","str"],
                interactive=False, wrap=True, label="Batch Log"
            )
            go_b.click(
                batch_lite_generate,
                inputs=[files, prompt_b, want_mux_b],
                outputs=[batch_status, batch_log]
            )
        # --- Static usage notes rendered from the configured limits.
        with gr.Tab("ℹ️ Tips"):
            gr.Markdown(f"""
**Usage guidelines**
- Keep clips short (the tool trims to **≤ {MAX_SECS}s** automatically).
- The video is downscaled to **{TARGET_H}p** to fit the ZeroGPU time window.
- If you see a quota message, try again later (ZeroGPU limits GPU minutes per visitor).
**Outputs**
- WAV is **{SR//1000} kHz** stereo.
- Enable **Mux** to get a ready MP4 with the generated foley track.
""")
# ---- Health endpoint & guarded launch --------------------------------------- | |
def _health():
    """Liveness payload: whether the model is loaded and which device was selected."""
    return {"ok": True, "model_loaded": _model_dict is not None, "device": str(_device)}


try:
    # Start without blocking: launch() returns the FastAPI app that serves the
    # demo, which is the object a route has to be attached to.
    fastapi_app, _local_url, _share_url = demo.queue(max_size=24).launch(
        server_name="0.0.0.0", prevent_thread_lock=True
    )
except Exception:
    print("\n[BOOT][ERROR] Gradio launch failed:")
    traceback.print_exc()
    raise

try:
    # Previously the handler was defined but never registered as a route.
    fastapi_app.add_api_route("/health", _health, methods=["GET"])
except Exception:
    # Best effort: a missing health route must not take the UI down, but the
    # failure should be visible in the logs.
    print("\n[BOOT][WARN] Could not register /health endpoint:")
    traceback.print_exc()

# Keep the main thread alive, as a blocking launch() would.
demo.block_thread()