# ShortiFoley app.py: HunyuanVideo-Foley generation on ZeroGPU
import os, sys, json, tempfile, subprocess, shutil, uuid, glob, traceback, datetime
from pathlib import Path
from typing import Tuple, List
# ================= Crash trap & verbose logs =================
import faulthandler
faulthandler.enable()
os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "false")
os.environ.setdefault("GRADIO_NUM_PORTS", "1")
os.environ.setdefault("HF_HUB_VERBOSE", "1")
os.environ.setdefault("TRANSFORMERS_VERBOSITY", "info")
os.environ.setdefault("PYTHONUNBUFFERED", "1")
def _crash_trap(exctype, value, tb):
    ts = datetime.datetime.utcnow().isoformat()
    print(f"\n===== FATAL ({ts}Z) =====================================")
    traceback.print_exception(exctype, value, tb)
    print("=========================================================\n", flush=True)
sys.excepthook = _crash_trap
# ============================================================
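# faulthandler dumps Python tracebacks on fatal signals (SIGSEGV/SIGABRT);
# the excepthook above covers uncaught Python exceptions in the main thread.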
import gradio as gr
import spaces
from huggingface_hub import snapshot_download
from loguru import logger
import torch, torchaudio
# ========= Paths & Config =========
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = ROOT / "weights"
CACHE_DIR = ROOT / "cache"
OUT_DIR = ROOT / "outputs"
ASSETS = ROOT / "assets"
ASSETS.mkdir(exist_ok=True)
APP_TITLE = os.environ.get("APP_TITLE", "Foley Studio · ZeroGPU")
APP_TAGLINE = os.environ.get("APP_TAGLINE", "Generate scene-true foley for short clips (ZeroGPU-ready).")
PRIMARY_COLOR = os.environ.get("PRIMARY_COLOR", "#6B5BFF")
# ZeroGPU-safe defaults (tweak in Space Secrets if needed)
MAX_SECS = int(os.environ.get("MAX_SECS", "15"))
TARGET_H = int(os.environ.get("TARGET_H", "480"))
SR = int(os.environ.get("TARGET_SR", "48000"))
ZEROGPU_DURATION = int(os.environ.get("ZEROGPU_DURATION", "110"))
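# These bounds keep one job inside a single GPU lease: ZEROGPU_DURATION is the
# per-call budget handed to @spaces.GPU below, and MAX_SECS/TARGET_H shrink the
# input so the default 50 denoise steps should fit comfortably within it.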
def sh(cmd: str):
    print(">>", cmd)
    subprocess.run(cmd, shell=True, check=True)
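# check=True makes failed commands raise subprocess.CalledProcessError,
# which the clone fallback logic below relies on.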
def ffprobe_duration(path: str) -> float:
    try:
        out = subprocess.check_output([
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", path
        ]).decode().strip()
        return float(out)
    except Exception:
        return 0.0
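# Example (hypothetical file): ffprobe_duration("clip.mp4") -> 12.48.
# Returns 0.0 when ffprobe is missing or the container reports no duration.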
def _clone_without_lfs():
    """
    Clone repo while skipping LFS smudge to avoid huge demo assets.
    Falls back to sparse checkout with only essential paths.
    """
    if REPO_DIR.exists():
        return
    try:
        sh(
            "GIT_LFS_SKIP_SMUDGE=1 "
            "git -c filter.lfs.smudge= -c filter.lfs.required=false "
            f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
        )
        assets = REPO_DIR / "assets"
        if assets.exists():
            shutil.rmtree(assets, ignore_errors=True)
        return
    except subprocess.CalledProcessError as e:
        print("Shallow clone with LFS skip failed, trying sparse checkout…", e)
    REPO_DIR.mkdir(parents=True, exist_ok=True)
    sh(f"git -C {REPO_DIR} init")
    sh(
        f"git -C {REPO_DIR} -c filter.lfs.smudge= -c filter.lfs.required=false "
        "remote add origin https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git"
    )
    sh(f"git -C {REPO_DIR} config core.sparseCheckout true")
    sparse_file = REPO_DIR / ".git" / "info" / "sparse-checkout"
    sparse_file.parent.mkdir(parents=True, exist_ok=True)
    sparse_file.write_text("\n".join([
        "hunyuanvideo_foley/",
        "configs/",
        "gradio_app.py",
        "requirements.txt",
        "LICENSE",
        "README.md",
    ]) + "\n")
    try:
        sh(f"git -C {REPO_DIR} fetch --depth 1 origin main")
        sh(f"git -C {REPO_DIR} checkout main")
    except subprocess.CalledProcessError:
        sh(f"git -C {REPO_DIR} fetch --depth 1 origin master")
        sh(f"git -C {REPO_DIR} checkout master")
def prepare_once():
    """Clone code (skip LFS), download weights, set env, prepare dirs."""
    _clone_without_lfs()
    if str(REPO_DIR) not in sys.path:
        sys.path.insert(0, str(REPO_DIR))
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        local_dir_use_symlinks=False,
        repo_type="model",
        resume_download=True,
    )
    os.environ["HIFI_FOLEY_MODEL_PATH"] = str(WEIGHTS_DIR)
    CACHE_DIR.mkdir(exist_ok=True)
    OUT_DIR.mkdir(exist_ok=True)
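# Run at import time so code, weights, and directories exist before the model load below.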
prepare_once()
# Prefer safetensors & fast transfer
os.environ["TRANSFORMERS_PREFER_SAFETENSORS"] = "1"
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
def ensure_clap_safetensors():
    """
    Pre-cache ONLY safetensors for laion/larger_clap_general so
    Transformers never selects a stale/corrupt *.bin.
    """
    snapshot_download(
        repo_id="laion/larger_clap_general",
        allow_patterns=[
            "*.safetensors", "config.json", "*.json", "*.txt",
            "tokenizer*", "*merges*", "*vocab*"
        ],
        ignore_patterns=["*.bin"],
        resume_download=True,
        local_dir=None,
        local_dir_use_symlinks=False,
    )
def _purge_clap_pt_bins():
    """Remove any cached .bin for laion/larger_clap_general."""
    cache_root = Path.home() / ".cache" / "huggingface" / "hub"
    for pat in [
        cache_root / "models--laion--larger_clap_general" / "snapshots" / "*" / "*.bin",
    ]:
        for f in glob.glob(str(pat)):
            try:
                Path(f).unlink()
                print(f">> Purged cached bin: {f}")
            except Exception:
                pass
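# Belt and braces: ensure_clap_safetensors() pre-fills the cache with
# safetensors-only files, and _purge_clap_pt_bins() deletes any stale *.bin a
# previous run may have left, so from_pretrained can only resolve safetensors.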
# ---- Dependency guards (early / clear errors) -------------------------------
try:
    import audiotools  # noqa: F401  (provided by PyPI package 'descript-audiotools')
except Exception as e:
    raise RuntimeError(
        "Missing module 'audiotools'. Install via PyPI package "
        "'descript-audiotools' (add 'descript-audiotools>=0.7.2' to requirements.txt)."
    ) from e
try:
    import omegaconf  # noqa: F401
    import yaml  # noqa: F401  (from pyyaml)
    import easydict  # noqa: F401
except Exception as e:
    raise RuntimeError(
        "Missing config deps. Add to requirements.txt: "
        "'omegaconf>=2.3.0', 'pyyaml', 'easydict'."
    ) from e
# Import Tencent internals after guards
from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process
from hunyuanvideo_foley.utils.feature_utils import feature_process
from hunyuanvideo_foley.utils.media_utils import merge_audio_video
# ========= Native Model Setup =========
MODEL_PATH = os.environ.get("HIFI_FOLEY_MODEL_PATH", str(WEIGHTS_DIR))
CONFIG_PATH = str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")
_model_dict = None
_cfg = None
_device = None
def _setup_device(device_str: str = "auto", gpu_id: int = 0) -> torch.device:
    if device_str == "auto":
        if torch.cuda.is_available():
            d = torch.device(f"cuda:{gpu_id}")
            logger.info(f"Using CUDA {d}")
        elif torch.backends.mps.is_available():
            d = torch.device("mps")
            logger.info("Using MPS")
        else:
            d = torch.device("cpu")
            logger.info("Using CPU")
    else:
        d = torch.device(device_str if device_str != "cuda" else f"cuda:{gpu_id}")
        logger.info(f"Using specified device: {d}")
    return d
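# Note: the device is chosen once at import time. Under ZeroGPU, actual GPU
# access is brokered by the spaces runtime inside @spaces.GPU-decorated calls.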
def auto_load_models() -> str:
    """Load model natively (weights already downloaded to MODEL_PATH)."""
    global _model_dict, _cfg, _device
    if not os.path.exists(MODEL_PATH):
        os.makedirs(MODEL_PATH, exist_ok=True)
    if not os.path.exists(CONFIG_PATH):
        return f"❌ Config file not found: {CONFIG_PATH}"
    _device = _setup_device("auto", 0)
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {MODEL_PATH}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    # Ensure CLAP uses safetensors; nuke any .bin first
    ensure_clap_safetensors()
    _purge_clap_pt_bins()
    # Lock HF Hub to offline so Transformers can't fetch a fresh .bin again
    os.environ["HF_HUB_OFFLINE"] = "1"
    os.environ["TRANSFORMERS_OFFLINE"] = "1"
    _model_dict, _cfg = load_model(MODEL_PATH, CONFIG_PATH, _device)
    logger.info("✅ Model loaded")
    return "✅ Model loaded"
# Init logger and load model once (with explicit crash surface)
logger.remove()
logger.add(lambda msg: print(msg, end=""), level="INFO")
try:
    msg = auto_load_models()
    logger.info(msg)
except Exception:
    print("\n[BOOT][ERROR] auto_load_models() failed:")
    traceback.print_exc()
    with gr.Blocks(title="Foley Studio · Boot Error") as demo:
        gr.Markdown("### ❌ Boot failure\n```\n" + "".join(traceback.format_exc()) + "\n```")
    demo.launch(server_name="0.0.0.0")
    raise
# ========= Preprocessing =========
def preprocess_video(in_path: str) -> Tuple[str, float]:
    """
    - Trim to <= MAX_SECS
    - Downscale to TARGET_H (keep aspect ratio), strip audio
    - Return processed mp4 path and final duration
    """
    dur = ffprobe_duration(in_path)
    if dur == 0:
        raise RuntimeError("Unable to read the video duration.")
    temp_dir = Path(tempfile.mkdtemp(prefix="pre_"))
    trimmed = temp_dir / "trim.mp4"
    processed = temp_dir / "proc.mp4"
    trim_args = ["-t", str(MAX_SECS)] if dur > MAX_SECS else []
    # Normalize & remove audio
    sh(" ".join([
        "ffmpeg", "-y", "-i", f"\"{in_path}\"",
        *trim_args,
        "-an",
        "-vcodec", "libx264", "-preset", "veryfast", "-crf", "23",
        "-movflags", "+faststart",
        f"\"{trimmed}\""
    ]))
    # Downscale to TARGET_H; width "-2" keeps it an even number (mod 2), as x264 requires
    vf = f"scale=-2:{TARGET_H}:flags=bicubic"
    sh(" ".join([
        "ffmpeg", "-y", "-i", f"\"{trimmed}\"",
        "-vf", f"\"{vf}\"",
        "-an",
        "-vcodec", "libx264", "-profile:v", "baseline", "-level", "3.1",
        "-pix_fmt", "yuv420p",
        "-preset", "veryfast", "-crf", "24",
        "-movflags", "+faststart",
        f"\"{processed}\""
    ]))
    final_dur = min(dur, float(MAX_SECS))
    return str(processed), final_dur
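# Illustrative outcome (hypothetical clip): a 28 s 1080p input comes back as a
# ~15 s, 854x480 (width rounded to mod-2), H.264 file with no audio track.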
# ========= Inference (ZeroGPU) =========
@spaces.GPU(duration=ZEROGPU_DURATION)
@torch.inference_mode()
def run_model(video_path: str, prompt_text: str,
              guidance_scale: float = 4.5,
              num_inference_steps: int = 50,
              sample_nums: int = 1) -> Tuple[List[str], int]:
    """
    Native inference (no shell). Returns ([wav_paths], sample_rate).
    """
    if _model_dict is None or _cfg is None:
        raise RuntimeError("Model not loaded yet.")
    text_prompt = (prompt_text or "").strip()
    # Extract features
    visual_feats, text_feats, audio_len_s = feature_process(
        video_path, text_prompt, _model_dict, _cfg
    )
    # Generate audio (B x C x T)
    logger.info(f"Generating {sample_nums} sample(s)...")
    audio_batch, sr = denoise_process(
        visual_feats, text_feats, audio_len_s, _model_dict, _cfg,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        batch_size=sample_nums
    )
    # Save each sample as WAV (torchaudio.save expects a CPU tensor, so move it
    # off the GPU first; .cpu() is a no-op if it is already there)
    out_dir = OUT_DIR / f"job_{uuid.uuid4().hex[:8]}"
    out_dir.mkdir(parents=True, exist_ok=True)
    wav_paths = []
    for i in range(sample_nums):
        wav_p = out_dir / f"generated_audio_{i+1}.wav"
        torchaudio.save(str(wav_p), audio_batch[i].cpu(), sr)
        wav_paths.append(str(wav_p))
    return wav_paths, sr
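# GPU time grows roughly linearly with num_inference_steps x sample_nums; if
# jobs brush against ZEROGPU_DURATION, lower the step count first.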
# ========= Optional: Mux Foley back to video =========
def mux_audio_with_video(video_path: str, audio_path: str) -> str:
    out_path = Path(tempfile.mkdtemp(prefix="mux_")) / "with_foley.mp4"
    sh(" ".join([
        "ffmpeg", "-y",
        "-i", f"\"{video_path}\"",
        "-i", f"\"{audio_path}\"",
        "-map", "0:v:0", "-map", "1:a:0",
        "-c:v", "copy", "-c:a", "aac", "-b:a", "192k",
        "-shortest",
        f"\"{out_path}\""
    ]))
    return str(out_path)
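# "-shortest" stops the mux at the shorter stream, so generated audio that runs
# slightly longer than the clip never pads the video.
# Quick local smoke test without the UI (hypothetical path):
#   pre, dur = preprocess_video("sample.mp4")
#   wavs, sr = run_model(pre, "rain on a tin roof", sample_nums=1)
#   print(mux_audio_with_video(pre, wavs[0]))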
# ========= UI Handlers =========
def single_generate(video: str, prompt: str, want_mux: bool, project_name: str):
    history = []
    try:
        if not video:
            return None, None, "⚠️ Please upload a video.", history
        history.append(["Preprocess", "Downscaling & trimming"])
        pre_path, final_dur = preprocess_video(video)
        history.append(["Inference", "ZeroGPU native pipeline"])
        wav_list, sr = run_model(
            pre_path, prompt or "", guidance_scale=4.5, num_inference_steps=50, sample_nums=1
        )
        if not wav_list:
            raise RuntimeError("No audio produced.")
        wav = wav_list[0]
        muxed = None
        if want_mux:
            history.append(["Mux", "Merging foley with video"])
            muxed = mux_audio_with_video(pre_path, wav)
        history.append(["Done", f"OK · ~{final_dur:.1f}s"])
        return wav, muxed, f"✅ Completed (~{final_dur:.1f}s)", history
    except Exception as e:
        history.append(["Error", str(e)])
        return None, None, f"❌ {type(e).__name__}: {e}", history
def batch_lite_generate(files: List[str], prompt: str, want_mux: bool):
    log = []
    if not files:
        return "⚠️ Please upload 1–3 videos.", log
    if len(files) > 3:
        files = files[:3]
        log.append(["Info", "Limiting to first 3 videos."])
    outputs = []
    for i, f in enumerate(files, 1):
        try:
            log.append([f"Preprocess {i}", Path(f).name])
            pre, final_dur = preprocess_video(f)
            log.append([f"Run {i}", f"ZeroGPU ~{final_dur:.1f}s"])
            wav_list, sr = run_model(pre, prompt or "", sample_nums=1)
            if not wav_list:
                raise RuntimeError("No audio produced.")
            wav = wav_list[0]
            muxed = mux_audio_with_video(pre, wav) if want_mux else None
            outputs.append((wav, muxed))
            log.append([f"Done {i}", "OK"])
        except Exception as e:
            log.append([f"Error {i}", str(e)])
    manifest = OUT_DIR / f"batchlite_{uuid.uuid4().hex[:6]}.json"
    manifest.write_text(json.dumps(
        [{"wav": w, "video": v} for (w, v) in outputs], ensure_ascii=False, indent=2
    ))
    return f"✅ Batch-lite finished · items: {len(outputs)}", log
# ========= UI (refreshed design) =========
THEME_CSS = f"""
:root {{
--brand: {PRIMARY_COLOR};
--bg: #0f1120;
--panel: #181a2e;
--text: #edf0ff;
--muted: #b7bce3;
--card: #15172a;
}}
.gradio-container {{
font-family: Inter, ui-sans-serif, -apple-system, Segoe UI, Roboto, Cairo, Noto Sans, Arial;
background: var(--bg);
color: var(--text);
}}
#hero {{
background: linear-gradient(135deg, var(--brand) 0%, #2f2e8b 40%, #1b1a3a 100%);
border-radius: 18px;
padding: 18px 20px;
color: white;
box-shadow: 0 10px 30px rgba(0,0,0,.35);
}}
#hero h1 {{
margin: 0 0 6px 0;
font-size: 20px;
font-weight: 700;
letter-spacing: .2px;
}}
#hero p {{
margin: 0;
opacity: .95;
}}
.gr-tabitem, .gr-block.gr-group, .gr-panel {{
background: var(--panel);
border-radius: 16px !important;
box-shadow: 0 6px 18px rgba(0,0,0,.28);
border: 1px solid rgba(255,255,255,.04);
}}
.gr-button {{
border-radius: 12px !important;
border: 1px solid rgba(255,255,255,.08) !important;
}}
.gradio-container .tabs .tab-nav button.selected {{
background: rgba(255,255,255,.06);
border-radius: 12px;
border: 1px solid rgba(255,255,255,.08);
}}
.badge {{
display:inline-block; padding:2px 8px; border-radius:999px;
background: rgba(255,255,255,.12); color:#fff; font-size:12px
}}
"""
with gr.Blocks(css=THEME_CSS, title=APP_TITLE, analytics_enabled=False) as demo:
    with gr.Row():
        gr.HTML(f"""
        <div id="hero">
          <h1>{APP_TITLE}</h1>
          <p>{APP_TAGLINE}</p>
          <div style="margin-top:8px"><span class="badge">ZeroGPU</span> <span class="badge">Auto-trim ≤ {MAX_SECS}s</span> <span class="badge">Downscale {TARGET_H}p</span></div>
        </div>
        """)
    with gr.Tabs():
        with gr.Tab("🎬 Single Clip"):
            with gr.Group():
                project_name = gr.Textbox(
                    label="Project name (optional)",
                    placeholder="Enter a short label for this clip"
                )
                with gr.Row():
                    v_single = gr.Video(label=f"Video (≤ ~{MAX_SECS}s recommended)")
                    p_single = gr.Textbox(
                        label="Sound prompt (optional)",
                        placeholder="e.g., soft footsteps on wood, light rain, indoor reverb"
                    )
                with gr.Row():
                    want_mux_single = gr.Checkbox(value=True, label="Mux foley into MP4 output")
                    run_btn = gr.Button("Generate", variant="primary")
                with gr.Row():
                    out_audio = gr.Audio(label=f"Generated Foley ({SR//1000} kHz WAV)", type="filepath")
                    out_mux = gr.Video(label="Video + Foley (MP4)", visible=True)
                status_md = gr.Markdown()
                history_table = gr.Dataframe(
                    headers=["Step", "Note"], datatype=["str", "str"],
                    interactive=False, wrap=True, label="Activity"
                )
            run_btn.click(
                single_generate,
                inputs=[v_single, p_single, want_mux_single, project_name],
                outputs=[out_audio, out_mux, status_md, history_table]
            )
        with gr.Tab("📦 Batch-Lite (1–3 clips)"):
            files = gr.Files(label="Upload 1–3 short videos", file_types=[".mp4", ".mov"], file_count="multiple")
            prompt_b = gr.Textbox(label="Global prompt (optional)")
            want_mux_b = gr.Checkbox(value=True, label="Mux each output")
            go_b = gr.Button("Run batch-lite")
            batch_status = gr.Markdown()
            batch_log = gr.Dataframe(
                headers=["Step", "Note"], datatype=["str", "str"],
                interactive=False, wrap=True, label="Batch Log"
            )
            go_b.click(
                batch_lite_generate,
                inputs=[files, prompt_b, want_mux_b],
                outputs=[batch_status, batch_log]
            )
        with gr.Tab("ℹ️ Tips"):
            gr.Markdown(f"""
            **Usage guidelines**
            - Keep clips short (the tool trims to **≤ {MAX_SECS}s** automatically).
            - The video is downscaled to **{TARGET_H}p** to fit the ZeroGPU time window.
            - If you see a quota message, try again later (ZeroGPU limits GPU minutes per visitor).

            **Outputs**
            - WAV is **{SR//1000} kHz** stereo.
            - Enable **Mux** to get a ready MP4 with the generated foley track.
            """)
# ---- Health endpoint & guarded launch ---------------------------------------
try:
    from fastapi import FastAPI  # noqa: F401
    fastapi_app = demo.app  # Gradio's underlying FastAPI app

    @fastapi_app.get("/health")
    def _health():
        return {"ok": True, "model_loaded": _model_dict is not None, "device": str(_device)}
except Exception:
    # demo.app may not exist before launch on every Gradio version; the health
    # probe is best-effort only.
    pass

try:
    demo.queue(max_size=24).launch(server_name="0.0.0.0")
except Exception:
    print("\n[BOOT][ERROR] Gradio launch failed:")
    traceback.print_exc()
    raise