Bils committed on
Commit 22d96d3 · verified · 1 Parent(s): 4588e7b

Update app.py

Files changed (1)
  1. app.py +436 -425
app.py CHANGED
@@ -1,463 +1,474 @@
- import os, sys, json, tempfile, subprocess, shutil, uuid, glob, traceback, datetime
- from pathlib import Path
- from typing import Tuple, List
-
- # ========= Crash trap & env =========
- import faulthandler
- faulthandler.enable()
- os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "false")
- os.environ.setdefault("GRADIO_NUM_PORTS", "1")
- os.environ.setdefault("HF_HUB_VERBOSE", "1")
- os.environ.setdefault("TRANSFORMERS_VERBOSITY", "info")
- os.environ.setdefault("PYTHONUNBUFFERED", "1")
-
- def _crash_trap(exctype, value, tb):
-     ts = datetime.datetime.utcnow().isoformat()
-     print(f"\n===== FATAL ({ts}Z) =====================================")
-     traceback.print_exception(exctype, value, tb)
-     print("=========================================================\n", flush=True)
- sys.excepthook = _crash_trap
-
- # ========= Minimal imports for startup =========
  import gradio as gr
- from spaces import GPU  # ensure checker can see decorator
  from loguru import logger

- # ---- ZeroGPU marker FIRST (so startup detector finds it) ----
- @GPU(duration=5)
- def _zgpu_marker(_: int = 0) -> int:
-     """No-op; only to advertise a GPU-decorated function at import-time."""
-     return _
-
- # ========= Paths & Configs =========
- ROOT = Path(__file__).parent.resolve()
- REPO_DIR = ROOT / "HunyuanVideo-Foley"
- WEIGHTS_DIR = ROOT / "weights"
- CACHE_DIR = ROOT / "cache"
- OUT_DIR = ROOT / "outputs"
- ASSETS = ROOT / "assets"
- for p in (ASSETS, WEIGHTS_DIR, CACHE_DIR, OUT_DIR):
-     p.mkdir(parents=True, exist_ok=True)
-
- APP_TITLE = os.environ.get("APP_TITLE", "Foley Studio · ZeroGPU")
- APP_TAGLINE = os.environ.get("APP_TAGLINE", "Generate scene-true foley for short clips (ZeroGPU-ready).")
- PRIMARY_COLOR = os.environ.get("PRIMARY_COLOR", "#6B5BFF")
-
- # ZeroGPU-friendly defaults
- MAX_SECS = int(os.environ.get("MAX_SECS", "15"))
- TARGET_H = int(os.environ.get("TARGET_H", "480"))
- SR = int(os.environ.get("TARGET_SR", "48000"))
- ZEROGPU_DURATION = int(os.environ.get("ZEROGPU_DURATION", "110"))
-
- # ========= Light utils (safe at import) =========
- def sh(cmd: str):
-     print(">>", cmd)
-     subprocess.run(cmd, shell=True, check=True)
-
- def ffprobe_duration(path: str) -> float:
-     try:
-         out = subprocess.check_output([
-             "ffprobe", "-v", "error", "-show_entries", "format=duration",
-             "-of", "default=noprint_wrappers=1:nokey=1", path
-         ]).decode().strip()
-         return float(out)
-     except Exception:
-         return 0.0

- def _clone_without_lfs():
-     if REPO_DIR.exists():
-         return
-     try:
-         sh(
-             "GIT_LFS_SKIP_SMUDGE=1 "
-             "git -c filter.lfs.smudge= -c filter.lfs.required=false "
-             f"clone --depth 1 https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
-         )
-         assets = REPO_DIR / "assets"
-         if assets.exists():
-             shutil.rmtree(assets, ignore_errors=True)
          return
-     except subprocess.CalledProcessError as e:
-         print("Shallow clone (LFS skipped) failed; trying sparse checkout…", e)
-
-     REPO_DIR.mkdir(parents=True, exist_ok=True)
-     sh(f"git -C {REPO_DIR} init")
-     sh(
-         f"git -C {REPO_DIR} -c filter.lfs.smudge= -c filter.lfs.required=false "
-         "remote add origin https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git"
      )
-     sh(f"git -C {REPO_DIR} config core.sparseCheckout true")
-     sparse_file = REPO_DIR / ".git" / "info" / "sparse-checkout"
-     sparse_file.parent.mkdir(parents=True, exist_ok=True)
-     sparse_file.write_text("\n".join([
-         "hunyuanvideo_foley/",
-         "configs/",
-         "gradio_app.py",
-         "requirements.txt",
-         "LICENSE",
-         "README.md",
-     ]) + "\n")
-     try:
-         sh(f"git -C {REPO_DIR} fetch --depth 1 origin main")
-         sh(f"git -C {REPO_DIR} checkout main")
-     except subprocess.CalledProcessError:
-         sh(f"git -C {REPO_DIR} fetch --depth 1 origin master")
-         sh(f"git -C {REPO_DIR} checkout master")
-
- def prepare_code_and_weights():
-     from huggingface_hub import snapshot_download
-     _clone_without_lfs()
-     if str(REPO_DIR) not in sys.path:
-         sys.path.insert(0, str(REPO_DIR))
      snapshot_download(
          repo_id="tencent/HunyuanVideo-Foley",
-         local_dir=str(WEIGHTS_DIR),
-         local_dir_use_symlinks=False,
-         repo_type="model",
          resume_download=True,
      )
-     os.environ["HIFI_FOLEY_MODEL_PATH"] = str(WEIGHTS_DIR)

- # Do lightweight prep (no model init) at import-time
- prepare_code_and_weights()

- # Prefer safetensors & fast transfer for later downloads
- os.environ["TRANSFORMERS_PREFER_SAFETENSORS"] = "1"
- os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

- # ========= Heavy deps & model utilities (deferred import) =========
- _model_dict = None
- _cfg = None
- _device = None
-
- def _lazy_heavy_imports():
-     global torch, torchaudio
-     import torch, torchaudio  # noqa
-     try:
-         import audiotools  # provided by 'descript-audiotools'
-     except Exception as e:
-         raise RuntimeError(
-             "Missing 'audiotools'. Add 'descript-audiotools>=0.7.2' to requirements.txt."
-         ) from e
-     try:
-         import omegaconf  # noqa
-         import yaml  # noqa
-         import easydict  # noqa
-     except Exception as e:
-         raise RuntimeError(
-             "Missing config deps. Add: omegaconf>=2.3.0, pyyaml, easydict."
-         ) from e
-
-     # Tencent internals
-     from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa
-     from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa
-     from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa
-     return torch, torchaudio
-
- def _ensure_clap_safetensors_only():
-     from huggingface_hub import snapshot_download
-     # Pre-cache only safetensors; block .bin selection
-     snapshot_download(
-         repo_id="laion/larger_clap_general",
-         allow_patterns=[
-             "*.safetensors", "config.json", "*.json", "*.txt",
-             "tokenizer*", "*merges*", "*vocab*"
-         ],
-         ignore_patterns=["*.bin"],
-         resume_download=True,
-         local_dir=None,
-         local_dir_use_symlinks=False,
-     )
-     # Purge any cached .bin for the model
-     cache_root = Path.home() / ".cache" / "huggingface" / "hub"
-     for pat in [cache_root / "models--laion--larger_clap_general" / "snapshots" / "*" / "*.bin"]:
-         for f in glob.glob(str(pat)):
-             try:
-                 Path(f).unlink()
-                 print(f">> Purged cached bin: {f}")
-             except Exception:
-                 pass
-
- def _setup_device(device_str: str = "auto", gpu_id: int = 0):
-     import torch
-     if device_str == "auto":
-         if torch.cuda.is_available():
-             d = torch.device(f"cuda:{gpu_id}")
-             logger.info(f"Using CUDA {d}")
-         elif torch.backends.mps.is_available():
-             d = torch.device("mps")
-             logger.info("Using MPS")
-         else:
-             d = torch.device("cpu")
-             logger.info("Using CPU")
-     else:
-         d = torch.device(device_str if device_str != "cuda" else f"cuda:{gpu_id}")
-         logger.info(f"Using specified device: {d}")
-     return d

  def auto_load_models() -> str:
-     """Load the full Tencent pipeline (lazy; call when needed)."""
      global _model_dict, _cfg, _device
-     if _model_dict is not None:
-         return "✅ Model already loaded"

-     # Imports & guards
-     torch, _ = _lazy_heavy_imports()

-     MODEL_PATH = os.environ.get("HIFI_FOLEY_MODEL_PATH", str(WEIGHTS_DIR))
-     CONFIG_PATH = str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")
-     if not os.path.exists(CONFIG_PATH):
-         return f"❌ Config file not found: {CONFIG_PATH}"

      _device = _setup_device("auto", 0)
      logger.info("Loading HunyuanVideo-Foley model...")
-     logger.info(f"MODEL_PATH: {MODEL_PATH}")
      logger.info(f"CONFIG_PATH: {CONFIG_PATH}")

-     # Force CLAP to safetensors path
-     _ensure_clap_safetensors_only()
-     os.environ["HF_HUB_OFFLINE"] = "1"
-     os.environ["TRANSFORMERS_OFFLINE"] = "1"
-
-     from hunyuanvideo_foley.utils.model_utils import load_model
-     _model_dict, _cfg = load_model(MODEL_PATH, CONFIG_PATH, _device)
-     logger.info("✅ Model loaded")
-     return "✅ Model loaded"
-
- # ========= Pre/Post-processing =========
- def preprocess_video(in_path: str) -> Tuple[str, float]:
-     dur = ffprobe_duration(in_path)
-     if dur == 0:
-         raise RuntimeError("Unable to read the video duration.")
-
-     temp_dir = Path(tempfile.mkdtemp(prefix="pre_"))
-     trimmed = temp_dir / "trim.mp4"
-     processed = temp_dir / "proc.mp4"
-     trim_args = ["-t", str(MAX_SECS)] if dur > MAX_SECS else []
-
-     sh(" ".join([
-         "ffmpeg", "-y", "-i", f"\"{in_path}\"", *trim_args,
-         "-an", "-vcodec", "libx264", "-preset", "veryfast", "-crf", "23",
-         "-movflags", "+faststart", f"\"{trimmed}\""
-     ]))
-     vf = f"scale=-2:{TARGET_H}:flags=bicubic"
-     sh(" ".join([
-         "ffmpeg", "-y", "-i", f"\"{trimmed}\"",
-         "-vf", f"\"{vf}\"", "-an",
-         "-vcodec", "libx264", "-profile:v", "baseline", "-level", "3.1",
-         "-pix_fmt", "yuv420p", "-preset", "veryfast", "-crf", "24",
-         "-movflags", "+faststart", f"\"{processed}\""
-     ]))
-     return str(processed), min(dur, float(MAX_SECS))
-
- def mux_audio_with_video(video_path: str, audio_path: str) -> str:
-     out_path = Path(tempfile.mkdtemp(prefix="mux_")) / "with_foley.mp4"
-     sh(" ".join([
-         "ffmpeg", "-y", "-i", f"\"{video_path}\"", "-i", f"\"{audio_path}\"",
-         "-map", "0:v:0", "-map", "1:a:0", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k",
-         "-shortest", f"\"{out_path}\""
-     ]))
-     return str(out_path)
-
- # ========= Inference (GPU-decorated) =========
- @GPU(duration=ZEROGPU_DURATION)
- def run_model(video_path: str, prompt_text: str,
-               guidance_scale: float = 4.5,
-               num_inference_steps: int = 50,
-               sample_nums: int = 1):
      """
-     ZeroGPU-safe native pipeline. Returns ([wav_paths], sample_rate).
      """
-     # Lazy load model the first time this runs
-     if _model_dict is None:
-         msg = auto_load_models()
-         logger.info(msg)

-     # heavy imports (after model load prepared)
-     import torchaudio
      from hunyuanvideo_foley.utils.feature_utils import feature_process
      from hunyuanvideo_foley.utils.model_utils import denoise_process

-     text_prompt = (prompt_text or "").strip()
-
      visual_feats, text_feats, audio_len_s = feature_process(
-         video_path, text_prompt, _model_dict, _cfg
      )
-     logger.info(f"Generating {sample_nums} sample(s)...")
-     audio_batch, sr = denoise_process(
-         visual_feats, text_feats, audio_len_s, _model_dict, _cfg,
-         guidance_scale=guidance_scale, num_inference_steps=num_inference_steps,
-         batch_size=sample_nums
      )

-     out_dir = OUT_DIR / f"job_{uuid.uuid4().hex[:8]}"
-     out_dir.mkdir(parents=True, exist_ok=True)
-     wav_paths = []
      for i in range(sample_nums):
-         wav_p = out_dir / f"generated_audio_{i+1}.wav"
-         torchaudio.save(str(wav_p), audio_batch[i], sr)
-         wav_paths.append(str(wav_p))
-     return wav_paths, sr
-
- # ========= UI Handlers =========
- def single_generate(video: str, prompt: str, want_mux: bool, project_name: str):
-     history = []
      try:
-         if not video:
-             return None, None, "⚠️ Please upload a video.", history
-         history.append(["Preprocess", "Downscaling & trimming"])
-         pre_path, final_dur = preprocess_video(video)
-
-         history.append(["Inference", "ZeroGPU native pipeline"])
-         wav_list, sr = run_model(pre_path, prompt or "", guidance_scale=4.5, num_inference_steps=50, sample_nums=1)
-         if not wav_list:
-             raise RuntimeError("No audio produced.")
-         wav = wav_list[0]
-
-         muxed = mux_audio_with_video(pre_path, wav) if want_mux else None
-         history.append(["Done", f"OK · ~{final_dur:.1f}s"])
-         return wav, muxed, f"✅ Completed (~{final_dur:.1f}s)", history
-     except Exception as e:
-         history.append(["Error", str(e)])
-         return None, None, f"❌ {type(e).__name__}: {e}", history
-
- def batch_lite_generate(files: List[str], prompt: str, want_mux: bool):
-     log = []
-     if not files:
-         return "⚠️ Please upload 1–3 videos.", log
-     if len(files) > 3:
-         files = files[:3]
-         log.append(["Info", "Limiting to first 3 videos."])
-
-     outputs = []
-     for i, f in enumerate(files, 1):
-         try:
-             log.append([f"Preprocess {i}", Path(f).name])
-             pre, final_dur = preprocess_video(f)
-             log.append([f"Run {i}", f"ZeroGPU ~{final_dur:.1f}s"])
-             wav_list, sr = run_model(pre, prompt or "", sample_nums=1)
-             if not wav_list:
-                 raise RuntimeError("No audio produced.")
-             wav = wav_list[0]
-             muxed = mux_audio_with_video(pre, wav) if want_mux else None
-             outputs.append((wav, muxed))
-             log.append([f"Done {i}", "OK"])
-         except Exception as e:
-             log.append([f"Error {i}", str(e)])
-
-     manifest = OUT_DIR / f"batchlite_{uuid.uuid4().hex[:6]}.json"
-     manifest.write_text(json.dumps(
-         [{"wav": w, "video": v} for (w, v) in outputs], ensure_ascii=False, indent=2
-     ))
-     return f"✅ Batch-lite finished · items: {len(outputs)}", log
-
- # ========= UI (refreshed design) =========
- THEME_CSS = f"""
- :root {{
-   --brand: {PRIMARY_COLOR};
-   --bg: #0f1120;
-   --panel: #181a2e;
-   --text: #edf0ff;
-   --muted: #b7bce3;
-   --card: #15172a;
- }}
- .gradio-container {{
-   font-family: Inter, ui-sans-serif, -apple-system, Segoe UI, Roboto, Cairo, Noto Sans, Arial;
-   background: var(--bg);
-   color: var(--text);
- }}
- #hero {{
-   background: linear-gradient(135deg, var(--brand) 0%, #2f2e8b 40%, #1b1a3a 100%);
-   border-radius: 18px;
-   padding: 18px 20px;
-   color: white;
-   box-shadow: 0 10px 30px rgba(0,0,0,.35);
- }}
- #hero h1 {{ margin: 0 0 6px 0; font-size: 20px; font-weight: 700; letter-spacing: .2px; }}
- #hero p {{ margin: 0; opacity: .95; }}
- .gr-tabitem, .gr-block.gr-group, .gr-panel {{
-   background: var(--panel);
-   border-radius: 16px !important;
-   box-shadow: 0 6px 18px rgba(0,0,0,.28);
-   border: 1px solid rgba(255,255,255,.04);
- }}
- .gr-button {{ border-radius: 12px !important; border: 1px solid rgba(255,255,255,.08) !important; }}
- .gradio-container .tabs .tab-nav button.selected {{
-   background: rgba(255,255,255,.06); border-radius: 12px; border: 1px solid rgba(255,255,255,.08);
- }}
- .badge {{ display:inline-block; padding:2px 8px; border-radius:999px; background: rgba(255,255,255,.12); color:#fff; font-size:12px }}
- """
-
- with gr.Blocks(css=THEME_CSS, title=APP_TITLE, analytics_enabled=False) as demo:
-     with gr.Row():
-         gr.HTML(f"""
-         <div id="hero">
-           <h1>{APP_TITLE}</h1>
-           <p>{APP_TAGLINE}</p>
-           <div style="margin-top:8px"><span class="badge">ZeroGPU</span> <span class="badge">Auto-trim ≤ {MAX_SECS}s</span> <span class="badge">Downscale {TARGET_H}p</span></div>
-         </div>
-         """)
-
-     with gr.Tabs():
-         with gr.Tab("🎬 Single Clip"):
-             with gr.Group():
-                 project_name = gr.Textbox(label="Project name (optional)", placeholder="Enter a short label for this clip")
-                 with gr.Row():
-                     v_single = gr.Video(label=f"Video (≤ ~{MAX_SECS}s recommended)")
-                     p_single = gr.Textbox(label="Sound prompt (optional)", placeholder="e.g., soft footsteps on wood, light rain, indoor reverb")
-                 with gr.Row():
-                     want_mux_single = gr.Checkbox(value=True, label="Mux foley into MP4 output")
-                     run_btn = gr.Button("Generate", variant="primary")
              with gr.Row():
-                 out_audio = gr.Audio(label=f"Generated Foley ({SR//1000} kHz WAV)", type="filepath")
-                 out_mux = gr.Video(label="Video + Foley (MP4)", visible=True)
-             status_md = gr.Markdown()
-             history_table = gr.Dataframe(headers=["Step", "Note"], datatype=["str","str"], interactive=False, wrap=True, label="Activity")
-
-             run_btn.click(
-                 single_generate,
-                 inputs=[v_single, p_single, want_mux_single, project_name],
-                 outputs=[out_audio, out_mux, status_md, history_table]
              )

-         with gr.Tab("📦 Batch-Lite (1–3 clips)"):
-             files = gr.Files(label="Upload 1–3 short videos", file_types=[".mp4",".mov"], file_count="multiple")
-             prompt_b = gr.Textbox(label="Global prompt (optional)")
-             want_mux_b = gr.Checkbox(value=True, label="Mux each output")
-             go_b = gr.Button("Run batch-lite")
-             batch_status = gr.Markdown()
-             batch_log = gr.Dataframe(headers=["Step","Note"], datatype=["str","str"], interactive=False, wrap=True, label="Batch Log")
-
-             go_b.click(batch_lite_generate, inputs=[files, prompt_b, want_mux_b], outputs=[batch_status, batch_log])
-
-         with gr.Tab("ℹ️ Tips"):
-             gr.Markdown(f"""
- **Usage guidelines**
- - Keep clips short (the tool trims to **≤ {MAX_SECS}s** automatically).
- - The video is downscaled to **{TARGET_H}p** to fit the ZeroGPU time window.
- - If you see a quota message, try again later (ZeroGPU limits GPU minutes per visitor).
-
- **Outputs**
- - WAV is **{SR//1000} kHz** stereo.
- - Enable **Mux** to get a ready MP4 with the generated foley track.
- """)
-
- # Health endpoint
- try:
-     from fastapi import FastAPI
-     fastapi_app = demo.app
-     @fastapi_app.get("/health")
-     def _health():
-         return {"ok": True, "model_loaded": _model_dict is not None, "device": str(_device) if _device else None}
- except Exception:
-     pass
-
- # Launch
- logger.remove()
- logger.add(lambda msg: print(msg, end=''), level="INFO")
- try:
-     demo.queue(max_size=24).launch(server_name="0.0.0.0")
- except Exception:
-     print("\n[BOOT][ERROR] Gradio launch failed:")
-     traceback.print_exc()
-     raise
+ import os
+ import io
+ import sys
+ import json
+ import shutil
+ import random
+ import tempfile
+ import base64
+ from datetime import datetime
+ from typing import List, Optional, Tuple, Dict
+
  import gradio as gr
+ import numpy as np
+ import torch
+ import torchaudio
  from loguru import logger
+ from huggingface_hub import snapshot_download
+
+ # --- Tencent repo imports (pulled at startup) ---
+ # These are available after we git clone the repo in prepare_once().
+ # Do not move these imports above the clone step in __main__.
+ # from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process
+ # from hunyuanvideo_foley.utils.feature_utils import feature_process
+ # from hunyuanvideo_foley.utils.media_utils import merge_audio_video
+
+ # HF Spaces GPU decorator
+ import spaces
+
+ # -------------------------
+ # Constants & configuration
+ # -------------------------
+ SPACE_TITLE = "🎵 ShortiFoley — HunyuanVideo-Foley"
+ SPACE_TAGLINE = "Text/Video → Audio Foley. Created by bilsimaging.com"
+ GALLERY_DIR = os.environ.get("OUTPUTS_DIR", "outputs")
+ WEIGHTS_DIR = os.environ.get("HIFI_FOLEY_MODEL_PATH", "/home/user/app/weights")
+ REPO_DIR = "/home/user/app/HunyuanVideo-Foley"
+ CONFIG_PATH = os.environ.get(
+     "HIFI_FOLEY_CONFIG",
+     f"{REPO_DIR}/configs/hunyuanvideo-foley-xxl.yaml"
+ )
+ # keep <=120s for ZeroGPU
+ GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))
+
+ os.makedirs(GALLERY_DIR, exist_ok=True)
+ os.makedirs(WEIGHTS_DIR, exist_ok=True)
+
+ # Globals populated after model load
+ _model_dict = None
+ _cfg = None
+ _device: Optional[torch.device] = None
+
+ # -------------
+ # Small helpers
+ # -------------
+ def _setup_device(pref: str = "auto", gpu_id: int = 0) -> torch.device:
+     """Pick CUDA if available, else MPS, else CPU."""
+     if pref == "auto":
+         if torch.cuda.is_available():
+             d = torch.device(f"cuda:{gpu_id}")
+         elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
+             d = torch.device("mps")
+         else:
+             d = torch.device("cpu")
+     else:
+         d = torch.device(pref)
+     logger.info(f"Using CUDA {d}" if d.type == "cuda" else f"Using {d}")
+     return d

+ def _save_video_result(video_file: str, audio_tensor: torch.Tensor, sr: int, idx: int) -> str:
+     """Save audio to wav, merge with original video, and save mp4 into gallery."""
+     from hunyuanvideo_foley.utils.media_utils import merge_audio_video
+
+     temp_dir = tempfile.mkdtemp()
+     audio_path = os.path.join(temp_dir, f"gen_{idx}.wav")
+
+     # torchaudio expects shape [channels, samples]
+     if audio_tensor.ndim == 1:
+         audio_tensor = audio_tensor.unsqueeze(0)
+     torchaudio.save(audio_path, audio_tensor.cpu(), sr)
+
+     timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
+     out_name = f"shortifoley_{timestamp}_{idx}.mp4"
+     out_path = os.path.join(GALLERY_DIR, out_name)
+     merge_audio_video(audio_path, video_file, out_path)
+     return out_path
+
+
+ def _list_gallery(limit: int = 100) -> List[str]:
+     files = []
+     for fn in sorted(os.listdir(GALLERY_DIR), reverse=True):
+         if fn.lower().endswith((".mp4", ".webm", ".mov", ".mkv")):
+             files.append(os.path.join(GALLERY_DIR, fn))
+         if len(files) >= limit:
+             break
+     return files
+
+
+ def _ensure_repo() -> None:
+     """Shallow clone the Tencent repo with LFS smudge disabled to avoid quota issues."""
+     if os.path.exists(REPO_DIR) and os.path.isdir(REPO_DIR):
          return
+     cmd = (
+         f"GIT_LFS_SKIP_SMUDGE=1 git -c filter.lfs.smudge= "
+         f"-c filter.lfs.required=false clone --depth 1 "
+         f"https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git {REPO_DIR}"
      )
+     logger.info(f">> {cmd}")
+     os.system(cmd)
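+     # NOTE: os.system() returns an exit status and never raises; check it if the clone must succeed.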
+
+
+ def _download_weights_if_needed() -> None:
+     """Pull big .pth files (and small assets) from HF model repo snapshot."""
+     # The official weights are hosted on the HF model page, so we snapshot into WEIGHTS_DIR
      snapshot_download(
          repo_id="tencent/HunyuanVideo-Foley",
+         local_dir=WEIGHTS_DIR,
          resume_download=True,
+         allow_patterns=[
+             "hunyuanvideo_foley.pth",
+             "synchformer_state_dict.pth",
+             "vae_128d_48k.pth",
+             "assets/*",
+             "config.yaml",  # not used directly here, but harmless
+         ],
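+         # Only files matching the patterns above are fetched; the rest of the repo is skipped.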
      )


+ def prepare_once() -> None:
+     _ensure_repo()
+     _download_weights_if_needed()


+ # -----------------------
+ # Model load & inference
+ # -----------------------
  def auto_load_models() -> str:
+     """
+     Load HunyuanVideo-Foley + encoders on the chosen device.
+     Uses safetensors where possible; falls back to HF/torch internal loaders.
+     """
      global _model_dict, _cfg, _device

+     if _model_dict is not None and _cfg is not None:
+         return "Model already loaded."

+     # Late imports (repo becomes available after clone).
+     sys.path.append(REPO_DIR)
+     from hunyuanvideo_foley.utils.model_utils import load_model

      _device = _setup_device("auto", 0)
      logger.info("Loading HunyuanVideo-Foley model...")
+     logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
      logger.info(f"CONFIG_PATH: {CONFIG_PATH}")

+     try:
+         _model_dict, _cfg = load_model(WEIGHTS_DIR, CONFIG_PATH, _device)
+         return "✅ Model loaded."
+     except Exception as e:
+         logger.error(e)
+         return f"❌ Failed to load model: {e}"
+
+
+ @spaces.GPU(duration=GPU_DURATION)
+ @torch.inference_mode()
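+ # torch.inference_mode() disables autograd tracking, trimming memory use and overhead at generation time.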
+ def infer_single_video(
+     video_file: str,
+     text_prompt: str,
+     guidance_scale: float = 4.5,
+     num_inference_steps: int = 50,
+     sample_nums: int = 1,
+ ) -> Tuple[List[str], str]:
      """
+     Generate Foley audio for an uploaded video (1–6 variants).
+     Args:
+         video_file: Path to a local video file on the Space.
+         text_prompt: Optional text prompt to steer the audio.
+         guidance_scale: CFG scale.
+         num_inference_steps: Denoising steps.
+         sample_nums: Number of audio variants to produce (1–6).
+     Returns:
+         (video_paths, status_message)
      """
+     if _model_dict is None or _cfg is None:
+         return [], "❌ Load the model first."
+
+     if not video_file:
+         return [], "❌ Please provide a video."

+     sys.path.append(REPO_DIR)
      from hunyuanvideo_foley.utils.feature_utils import feature_process
      from hunyuanvideo_foley.utils.model_utils import denoise_process

+     # preprocess
      visual_feats, text_feats, audio_len_s = feature_process(
+         video_file, (text_prompt or "").strip(), _model_dict, _cfg
      )
+
+     # generate batch
+     sample_nums = int(max(1, min(6, sample_nums)))
+     audio, sr = denoise_process(
+         visual_feats,
+         text_feats,
+         audio_len_s,
+         _model_dict,
+         _cfg,
+         guidance_scale=guidance_scale,
+         num_inference_steps=int(num_inference_steps),
+         batch_size=sample_nums,
      )

+     # save results
+     out_videos = []
      for i in range(sample_nums):
+         out_videos.append(_save_video_result(video_file, audio[i], sr, i + 1))
+
+     return out_videos, f"✅ Generated {len(out_videos)} result(s). Saved to {GALLERY_DIR}/"
+
+
+ # ---------------
+ # MCP-only API(s)
+ # ---------------
+ def _download_to_tmp(url: str) -> str:
+     """Download a remote file to a temp path. Lightweight helper for MCP."""
      try:
+         import requests  # optional dependency
+     except Exception:
+         raise RuntimeError("The server is missing 'requests'. Add it to requirements.txt to use URL inputs.")
+
+     r = requests.get(url, timeout=30)
+     r.raise_for_status()
+     suffix = ".mp4"
+     tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
+     tmp.write(r.content)
+     tmp.flush()
+     tmp.close()
+     return tmp.name
+
+
+ def _maybe_from_base64(data_url_or_b64: str) -> str:
+     """Accept data: URLs or raw base64 for MCP convenience; returns temp file path."""
+     b64 = data_url_or_b64
+     if data_url_or_b64.startswith("data:"):
+         # data:video/mp4;base64,XXXX
+         b64 = data_url_or_b64.split(",", 1)[-1]
+     raw = base64.b64decode(b64)
+     tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
+     tmp.write(raw)
+     tmp.flush()
+     tmp.close()
+     return tmp.name
+
+
+ def _normalize_video_input(video_url_or_b64: str) -> str:
+     """Return a local filename from url or base64. Raises on error."""
+     v = (video_url_or_b64 or "").strip()
+     if v.startswith("http://") or v.startswith("https://"):
+         return _download_to_tmp(v)
+     # assume base64
+     return _maybe_from_base64(v)
+
+
+ def _api_generate_from_local(
+     local_video_path: str,
+     text_prompt: str = "",
+     guidance_scale: float = 4.5,
+     num_inference_steps: int = 50,
+     sample_nums: int = 1,
+ ) -> Dict[str, List[str]]:
+     outs, msg = infer_single_video(
+         video_file=local_video_path,
+         text_prompt=text_prompt or "",
+         guidance_scale=float(guidance_scale),
+         num_inference_steps=int(num_inference_steps),
+         sample_nums=int(sample_nums),
+     )
+     return {"videos": outs, "message": msg}
+
+
+ # Expose a **pure API** endpoint that becomes an MCP tool but does not show a UI.
+ with gr.Blocks() as mcp_only_endpoints:
+     gr.Markdown("These endpoints are MCP/API only and have no visible UI.", show_label=False)
+
+     @gr.api  # becomes an MCP tool and a REST API endpoint automatically
+     def api_generate_from_url(
+         video_url_or_b64: str,
+         text_prompt: str = "",
+         guidance_scale: float = 4.5,
+         num_inference_steps: int = 50,
+         sample_nums: int = 1,
+     ) -> Dict[str, List[str]]:
+         """
+         Generate Foley from a remote video URL or base64-encoded video.
+         Args:
+             video_url_or_b64: http(s) URL or data/base64 string of a short video (mp4).
+             text_prompt: Optional audio description (English).
+             guidance_scale: CFG scale (1.0–10.0).
+             num_inference_steps: Denoising steps (10–100).
+             sample_nums: Number of variants to return (1–6).
+         Returns:
+             dict with { "videos": [paths], "message": str }
+         """
+         if _model_dict is None or _cfg is None:
+             raise RuntimeError("Model not loaded. Call /load_model tool or use the UI once.")
+
+         local_path = _normalize_video_input(video_url_or_b64)
+         return _api_generate_from_local(local_path, text_prompt, guidance_scale, num_inference_steps, sample_nums)
+
+     # Tiny status resource & prompt to help MCP clients
+     @gr.mcp.resource("shortifoley://status")
+     def shortifoley_status() -> str:
+         """Return a simple readiness string for MCP clients."""
+         ready = _model_dict is not None and _cfg is not None
+         dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
+         return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={GALLERY_DIR}"
+
+     @gr.mcp.prompt()
+     def foley_prompt(name: str = "default") -> str:
+         """A reusable prompt template for generating Foley."""
+         return (
+             "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
+             "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
+         )
+
+
+ # ------------------
+ # Gradio UI (Blocks)
+ # ------------------
+ def create_ui() -> gr.Blocks:
+     with gr.Blocks(
+         title="ShortiFoley — HunyuanVideo-Foley",
+         css="""
+         .main-header{ text-align:center; padding:1.5rem; border-radius:16px; background:linear-gradient(135deg,#667eea,#764ba2); color:white; }
+         .card{ background:white; border:1px solid #e1e5e9; border-radius:16px; padding:1rem; box-shadow:0 8px 32px rgba(0,0,0,.06); }
+         .generate-btn button{ font-weight:700; }
+         """
+     ) as demo:
+
+         gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")
+
+         with gr.Row():
+             with gr.Column(scale=1, elem_classes=["card"]):
+                 gr.Markdown("### 📹 Input")
+                 video_input = gr.Video(label="Upload Video", height=300)
+                 text_input = gr.Textbox(
+                     label="🎯 Audio Description (optional, English)",
+                     placeholder="e.g., Quick rubber-soled footsteps on tile; echoey hallway."
+                 )
                  with gr.Row():
+                     guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG Scale")
+                     steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
+                     samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
+
+                 generate = gr.Button("🎵 Generate Audio", variant="primary", elem_classes=["generate-btn"])
+
+             with gr.Column(scale=1, elem_classes=["card"]):
+                 gr.Markdown("### 🎥 Result(s)")
+                 v1 = gr.Video(label="Sample 1", height=260, visible=True)
+                 v2 = gr.Video(label="Sample 2", height=160, visible=False)
+                 v3 = gr.Video(label="Sample 3", height=160, visible=False)
+                 v4 = gr.Video(label="Sample 4", height=160, visible=False)
+                 v5 = gr.Video(label="Sample 5", height=160, visible=False)
+                 v6 = gr.Video(label="Sample 6", height=160, visible=False)
+                 status = gr.Textbox(label="Status", interactive=False)
+
+         with gr.Tab("📁 Gallery"):
+             gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
+             gallery = gr.Gallery(
+                 value=_list_gallery(),
+                 columns=3,
+                 preview=True,
+                 label="Saved Results"
+             )
+             refresh = gr.Button("🔄 Refresh Gallery")
+
+         # Event handlers
+         def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
+             outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
+             updates = []
+             # six video slots
+             for i in range(6):
+                 if i < len(outs):
+                     updates.append(gr.update(visible=True, value=outs[i]))
+                 else:
+                     updates.append(gr.update(visible=False, value=None))
+             # status
+             updates.append(msg)
+             # refresh gallery implicitly
+             gallery_items = _list_gallery()
+             return (*updates, gr.update(value=gallery_items))
+
+         generate.click(
+             fn=_process_and_update,
+             inputs=[video_input, text_input, guidance_scale, steps, samples],
+             outputs=[v1, v2, v3, v4, v5, v6, status, gallery],
+             api_name="infer",
+             api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files."
+         )
+
+         # Visibility toggling from samples slider
+         def _toggle_vis(n):
+             n = int(n)
+             return [
+                 gr.update(visible=True),
+                 gr.update(visible=n >= 2),
+                 gr.update(visible=n >= 3),
+                 gr.update(visible=n >= 4),
+                 gr.update(visible=n >= 5),
+                 gr.update(visible=n >= 6),
+             ]
+
+         samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])
+
+         refresh.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])
+
+     return demo
+
+
+ def set_seeds(s: int = 1):
+     random.seed(s)
+     np.random.seed(s)
+     torch.manual_seed(s)
+
+
+ # -------------
+ # App bootstrap
+ # -------------
+ if __name__ == "__main__":
+     # clean logger -> print to stdout
+     logger.remove()
+     logger.add(lambda m: print(m, end=""), level="INFO")
+
+     set_seeds(1)
+
+     logger.info("===== Application Startup =====\n")
+     prepare_once()
+
+     # Late import after repo present
+     sys.path.append(REPO_DIR)
+     from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
+     from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
+     from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
+
+     msg = auto_load_models()
+     if not msg.startswith("✅"):
+         logger.error(f"[BOOT][ERROR] auto_load_models() failed:\n{msg}")
+     else:
+         logger.info(msg)
+
+     ui = create_ui()
+
+     # Mount the MCP-only endpoints alongside the UI (optional but handy)
+     with ui:
+         mcp_only_endpoints.render()
+
+     # IMPORTANT: enable MCP server (tools/resources/prompts). This is all you need.
+     # See: https://www.gradio.app/guides/building-mcp-server-with-gradio
+     ui.launch(
+         server_name="0.0.0.0",
+         share=False,
+         show_error=True,
+         mcp_server=True,  # <— MCP enabled
+         # ssr_mode=True (default in 5.x)
+     )
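
Since the click handler is exposed with `api_name="infer"` and the app launches with `mcp_server=True`, the Space can be driven from code as well as from the UI. A minimal client sketch, assuming `gradio_client` is installed; the Space id is a placeholder, and the positional arguments mirror the `inputs=[...]` list wired to `generate.click`:

```python
# Hypothetical client for the /infer endpoint defined in the new app.py.
# Assumptions: `pip install gradio_client`; "user/shortifoley" is a placeholder Space id.
from gradio_client import Client, handle_file

client = Client("user/shortifoley")

result = client.predict(
    handle_file("clip.mp4"),      # video_input: short local mp4 (or a URL)
    "soft footsteps on gravel",   # text_input: optional audio description
    4.5,                          # guidance_scale (CFG)
    50,                           # steps (denoising)
    1,                            # samples (1-6 variants)
    api_name="/infer",
)
print(result)  # six video slots, the status string, and the refreshed gallery
```

MCP clients should be able to reach the same tools at Gradio's standard MCP endpoint on the running Space (`/gradio_api/mcp/sse`, per the guide linked above).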