ShortiFoley / app.py
Bils's picture
Update app.py
89c8e20 verified
raw
history blame
16.3 kB
import os, sys, json, tempfile, subprocess, shutil, uuid
from pathlib import Path
from typing import Optional, Tuple, List
import gradio as gr
import spaces
from huggingface_hub import snapshot_download
from loguru import logger
import torch, torchaudio
# ========= Paths & Config =========
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = ROOT / "weights"
CACHE_DIR = ROOT / "cache"
OUT_DIR = ROOT / "outputs"
ASSETS = ROOT / "assets"
ASSETS.mkdir(exist_ok=True)
APP_TITLE = os.environ.get("APP_TITLE", "Foley Studio · ZeroGPU")
APP_TAGLINE = os.environ.get("APP_TAGLINE", "Generate scene-true foley for short clips (ZeroGPU-ready).")
PRIMARY_COLOR = os.environ.get("PRIMARY_COLOR", "#6B5BFF") # UI accent only
# ZeroGPU-safe defaults (tweak in Space Secrets if needed)
MAX_SECS = int(os.environ.get("MAX_SECS", "15")) # keep clips short for ZeroGPU window
TARGET_H = int(os.environ.get("TARGET_H", "480")) # downscale target height
SR = int(os.environ.get("TARGET_SR", "48000")) # WAV sample rate
ZEROGPU_DURATION = int(os.environ.get("ZEROGPU_DURATION", "110")) # must be <= platform limit
def sh(cmd: str):
print(">>", cmd)
subprocess.run(cmd, shell=True, check=True)
def ffprobe_duration(path: str) -> float:
try:
out = subprocess.check_output([
"ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", path
]).decode().strip()
return float(out)
except Exception:
return 0.0
def _clone_without_lfs():
"""
Clone repo while skipping LFS smudge to avoid demo video downloads.
Falls back to sparse checkout with only essential paths.
"""
if REPO_DIR.exists():
return
# Attempt 1: shallow clone with LFS disabled
try:
sh(
"GIT_LFS_SKIP_SMUDGE=1 "
"git -c filter.lfs.smudge= -c filter.lfs.required=false "
f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
)
assets = REPO_DIR / "assets"
if assets.exists():
shutil.rmtree(assets, ignore_errors=True)
return
except subprocess.CalledProcessError as e:
print("Shallow clone with LFS skipped failed, trying sparse checkout…", e)
# Attempt 2: sparse checkout minimal files
REPO_DIR.mkdir(parents=True, exist_ok=True)
sh(f"git -C {REPO_DIR} init")
sh(
f"git -C {REPO_DIR} -c filter.lfs.smudge= -c filter.lfs.required=false "
"remote add origin https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git"
)
sh(f"git -C {REPO_DIR} config core.sparseCheckout true")
sparse_file = REPO_DIR / ".git" / "info" / "sparse-checkout"
sparse_file.parent.mkdir(parents=True, exist_ok=True)
sparse_file.write_text("\n".join([
"hunyuanvideo_foley/",
"configs/",
"gradio_app.py",
"requirements.txt",
"LICENSE",
"README.md",
]) + "\n")
# Try main, fallback to master
try:
sh(f"git -C {REPO_DIR} fetch --depth 1 origin main")
sh(f"git -C {REPO_DIR} checkout main")
except subprocess.CalledProcessError:
sh(f"git -C {REPO_DIR} fetch --depth 1 origin master")
sh(f"git -C {REPO_DIR} checkout master")
def prepare_once():
"""Clone code (skip LFS), download weights, set env, prepare dirs."""
_clone_without_lfs()
# Ensure we can import their package
if str(REPO_DIR) not in sys.path:
sys.path.insert(0, str(REPO_DIR))
WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
snapshot_download(
repo_id="tencent/HunyuanVideo-Foley",
local_dir=str(WEIGHTS_DIR),
local_dir_use_symlinks=False,
repo_type="model",
resume_download=True,
)
os.environ["HIFI_FOLEY_MODEL_PATH"] = str(WEIGHTS_DIR)
CACHE_DIR.mkdir(exist_ok=True)
OUT_DIR.mkdir(exist_ok=True)
prepare_once()
# ---- Friendly dependency check (correct package name) ------------------------
try:
import audiotools # provided by the PyPI package 'descript-audiotools'
except Exception as e:
raise RuntimeError(
"Missing module 'audiotools'. Install it via the PyPI package "
"'descript-audiotools' (e.g., add 'descript-audiotools>=0.7.2' "
"to requirements.txt) and restart the Space."
) from e
# Now safe to import their internals
from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process
from hunyuanvideo_foley.utils.feature_utils import feature_process
from hunyuanvideo_foley.utils.media_utils import merge_audio_video
# ========= Native Model Setup =========
MODEL_PATH = os.environ.get("HIFI_FOLEY_MODEL_PATH", str(WEIGHTS_DIR))
CONFIG_PATH = str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")
_model_dict = None
_cfg = None
_device = None
def _setup_device(device_str: str = "auto", gpu_id: int = 0) -> torch.device:
if device_str == "auto":
if torch.cuda.is_available():
d = torch.device(f"cuda:{gpu_id}")
logger.info(f"Using CUDA {d}")
elif torch.backends.mps.is_available():
d = torch.device("mps")
logger.info("Using MPS")
else:
d = torch.device("cpu")
logger.info("Using CPU")
else:
d = torch.device(device_str if device_str != "cuda" else f"cuda:{gpu_id}")
logger.info(f"Using specified device: {d}")
return d
def auto_load_models() -> str:
"""Download weights if needed + load model natively."""
global _model_dict, _cfg, _device
if not os.path.exists(MODEL_PATH):
os.makedirs(MODEL_PATH, exist_ok=True)
if not os.path.exists(CONFIG_PATH):
return f"❌ Config file not found: {CONFIG_PATH}"
_device = _setup_device("auto", 0)
logger.info("Loading HunyuanVideo-Foley model...")
logger.info(f"MODEL_PATH: {MODEL_PATH}")
logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
_model_dict, _cfg = load_model(MODEL_PATH, CONFIG_PATH, _device)
logger.info("✅ Model loaded")
return "✅ Model loaded"
# Init logger and load model once
logger.remove()
logger.add(lambda msg: print(msg, end=''), level="INFO")
logger.info(auto_load_models())
# ========= Preprocessing =========
def preprocess_video(in_path: str) -> Tuple[str, float]:
"""
- Validate/trim to <= MAX_SECS.
- Downscale to TARGET_H (keep AR), strip original audio.
- Return processed mp4 path and final duration.
"""
dur = ffprobe_duration(in_path)
if dur == 0:
raise RuntimeError("Unable to read the video duration.")
temp_dir = Path(tempfile.mkdtemp(prefix="pre_"))
trimmed = temp_dir / "trim.mp4"
processed = temp_dir / "proc.mp4"
trim_args = ["-t", str(MAX_SECS)] if dur > MAX_SECS else []
# Normalize container & remove audio
sh(" ".join([
"ffmpeg", "-y", "-i", f"\"{in_path}\"",
*trim_args,
"-an",
"-vcodec", "libx264", "-preset", "veryfast", "-crf", "23",
"-movflags", "+faststart",
f"\"{trimmed}\""
]))
# Downscale to TARGET_H; ensure mod2 width, baseline profile
vf = f"scale=-2:{TARGET_H}:flags=bicubic"
sh(" ".join([
"ffmpeg", "-y", "-i", f"\"{trimmed}\"",
"-vf", f"\"{vf}\"",
"-an",
"-vcodec", "libx264", "-profile:v", "baseline", "-level", "3.1",
"-pix_fmt", "yuv420p",
"-preset", "veryfast", "-crf", "24",
"-movflags", "+faststart",
f"\"{processed}\""
]))
final_dur = min(dur, float(MAX_SECS))
return str(processed), final_dur
# ========= Inference (ZeroGPU) =========
@spaces.GPU(duration=ZEROGPU_DURATION) # tune via env if needed
@torch.inference_mode()
def run_model(video_path: str, prompt_text: str,
guidance_scale: float = 4.5,
num_inference_steps: int = 50,
sample_nums: int = 1) -> Tuple[List[str], int]:
"""
Native inference (no shell). Returns ([wav_paths], sample_rate).
"""
if _model_dict is None or _cfg is None:
raise RuntimeError("Model not loaded yet.")
text_prompt = (prompt_text or "").strip()
# Extract features
visual_feats, text_feats, audio_len_s = feature_process(
video_path, text_prompt, _model_dict, _cfg
)
# Generate audio (B x C x T)
logger.info(f"Generating {sample_nums} sample(s)...")
audio_batch, sr = denoise_process(
visual_feats, text_feats, audio_len_s, _model_dict, _cfg,
guidance_scale=guidance_scale,
num_inference_steps=num_inference_steps,
batch_size=sample_nums
)
# Save each sample as WAV
out_dir = OUT_DIR / f"job_{uuid.uuid4().hex[:8]}"
out_dir.mkdir(parents=True, exist_ok=True)
wav_paths = []
for i in range(sample_nums):
wav_p = out_dir / f"generated_audio_{i+1}.wav"
torchaudio.save(str(wav_p), audio_batch[i], sr)
wav_paths.append(str(wav_p))
return wav_paths, sr
# ========= Optional: Mux Foley back to video =========
def mux_audio_with_video(video_path: str, audio_path: str) -> str:
out_path = Path(tempfile.mkdtemp(prefix="mux_")) / "with_foley.mp4"
sh(" ".join([
"ffmpeg", "-y",
"-i", f"\"{video_path}\"",
"-i", f"\"{audio_path}\"",
"-map", "0:v:0", "-map", "1:a:0",
"-c:v", "copy", "-c:a", "aac", "-b:a", "192k",
"-shortest",
f"\"{out_path}\""
]))
return str(out_path)
# ========= UI Handlers =========
def single_generate(video: str, prompt: str, want_mux: bool, project_name: str):
history = []
try:
if not video:
return None, None, "⚠️ Please upload a video.", history
history.append(["Preprocess", "Downscaling & trimming"])
pre_path, final_dur = preprocess_video(video)
history.append(["Inference", "ZeroGPU native pipeline"])
wav_list, sr = run_model(
pre_path, prompt or "", guidance_scale=4.5, num_inference_steps=50, sample_nums=1
)
if not wav_list:
raise RuntimeError("No audio produced.")
wav = wav_list[0]
muxed = None
if want_mux:
history.append(["Mux", "Merging foley with video"])
muxed = mux_audio_with_video(pre_path, wav)
history.append(["Done", f"OK · ~{final_dur:.1f}s"])
return wav, muxed, f"✅ Completed (~{final_dur:.1f}s)", history
except Exception as e:
history.append(["Error", str(e)])
return None, None, f"❌ {type(e).__name__}: {e}", history
def batch_lite_generate(files: List[str], prompt: str, want_mux: bool):
log = []
if not files:
return "⚠️ Please upload 1–3 videos.", log
if len(files) > 3:
files = files[:3]
log.append(["Info", "Limiting to first 3 videos."])
outputs = []
for i, f in enumerate(files, 1):
try:
log.append([f"Preprocess {i}", Path(f).name])
pre, final_dur = preprocess_video(f)
log.append([f"Run {i}", f"ZeroGPU ~{final_dur:.1f}s"])
wav_list, sr = run_model(pre, prompt or "", sample_nums=1)
if not wav_list:
raise RuntimeError("No audio produced.")
wav = wav_list[0]
muxed = mux_audio_with_video(pre, wav) if want_mux else None
outputs.append((wav, muxed))
log.append([f"Done {i}", "OK"])
except Exception as e:
log.append([f"Error {i}", str(e)])
manifest = OUT_DIR / f"batchlite_{uuid.uuid4().hex[:6]}.json"
manifest.write_text(json.dumps(
[{"wav": w, "video": v} for (w, v) in outputs], ensure_ascii=False, indent=2
))
return f"✅ Batch-lite finished · items: {len(outputs)}", log
# ========= UI (refreshed design) =========
THEME_CSS = f"""
:root {{
--brand: {PRIMARY_COLOR};
--bg: #0f1120;
--panel: #181a2e;
--text: #edf0ff;
--muted: #b7bce3;
--card: #15172a;
}}
.gradio-container {{
font-family: Inter, ui-sans-serif, -apple-system, Segoe UI, Roboto, Cairo, Noto Sans, Arial;
background: var(--bg);
color: var(--text);
}}
#hero {{
background: linear-gradient(135deg, var(--brand) 0%, #2f2e8b 40%, #1b1a3a 100%);
border-radius: 18px;
padding: 18px 20px;
color: white;
box-shadow: 0 10px 30px rgba(0,0,0,.35);
}}
#hero h1 {{
margin: 0 0 6px 0;
font-size: 20px;
font-weight: 700;
letter-spacing: .2px;
}}
#hero p {{
margin: 0;
opacity: .95;
}}
.gr-tabitem, .gr-block.gr-group, .gr-panel {{
background: var(--panel);
border-radius: 16px !important;
box-shadow: 0 6px 18px rgba(0,0,0,.28);
border: 1px solid rgba(255,255,255,.04);
}}
.gr-button {{
border-radius: 12px !important;
border: 1px solid rgba(255,255,255,.08) !important;
}}
.gradio-container .tabs .tab-nav button.selected {{
background: rgba(255,255,255,.06);
border-radius: 12px;
border: 1px solid rgba(255,255,255,.08);
}}
.badge {{
display:inline-block; padding:2px 8px; border-radius:999px;
background: rgba(255,255,255,.12); color:#fff; font-size:12px
}}
"""
with gr.Blocks(css=THEME_CSS, title=APP_TITLE, analytics_enabled=False) as demo:
with gr.Row():
gr.HTML(f"""
<div id="hero">
<h1>{APP_TITLE}</h1>
<p>{APP_TAGLINE}</p>
<div style="margin-top:8px"><span class="badge">ZeroGPU</span> <span class="badge">Auto-trim ≤ {MAX_SECS}s</span> <span class="badge">Downscale {TARGET_H}p</span></div>
</div>
""")
with gr.Tabs():
with gr.Tab("🎬 Single Clip"):
with gr.Group():
project_name = gr.Textbox(
label="Project name (optional)",
placeholder="Enter a short label for this clip"
)
with gr.Row():
v_single = gr.Video(label=f"Video (≤ ~{MAX_SECS}s recommended)")
p_single = gr.Textbox(
label="Sound prompt (optional)",
placeholder="e.g., soft footsteps on wood, light rain, indoor reverb"
)
with gr.Row():
want_mux_single = gr.Checkbox(value=True, label="Mux foley into MP4 output")
run_btn = gr.Button("Generate", variant="primary")
with gr.Row():
out_audio = gr.Audio(label=f"Generated Foley ({SR//1000} kHz WAV)", type="filepath")
out_mux = gr.Video(label="Video + Foley (MP4)", visible=True)
status_md = gr.Markdown()
history_table = gr.Dataframe(
headers=["Step", "Note"], datatype=["str","str"],
interactive=False, wrap=True, label="Activity"
)
run_btn.click(
single_generate,
inputs=[v_single, p_single, want_mux_single, project_name],
outputs=[out_audio, out_mux, status_md, history_table]
)
with gr.Tab("📦 Batch-Lite (1–3 clips)"):
files = gr.Files(label="Upload 1–3 short videos", file_types=[".mp4",".mov"], file_count="multiple")
prompt_b = gr.Textbox(label="Global prompt (optional)")
want_mux_b = gr.Checkbox(value=True, label="Mux each output")
go_b = gr.Button("Run batch-lite")
batch_status = gr.Markdown()
batch_log = gr.Dataframe(
headers=["Step","Note"], datatype=["str","str"],
interactive=False, wrap=True, label="Batch Log"
)
go_b.click(
batch_lite_generate,
inputs=[files, prompt_b, want_mux_b],
outputs=[batch_status, batch_log]
)
with gr.Tab("ℹ️ Tips"):
gr.Markdown(f"""
**Usage guidelines**
- Keep clips short (the tool trims to **≤ {MAX_SECS}s** automatically).
- The video is downscaled to **{TARGET_H}p** to fit the ZeroGPU time window.
- If you see a quota message, try again later (ZeroGPU limits GPU minutes per visitor).
**Outputs**
- WAV is **{SR//1000} kHz** stereo.
- Enable **Mux** to get a ready MP4 with the generated foley track.
""")
demo.queue(max_size=24).launch()