File size: 18,181 Bytes
e7621f8
 
 
22d96d3
 
e7621f8
22d96d3
e7621f8
 
22d96d3
e7621f8
22d96d3
 
e7621f8
 
22d96d3
 
 
 
 
e7621f8
cc49c73
22d96d3
e7621f8
22d96d3
 
 
 
e7621f8
 
 
 
 
 
 
22d96d3
 
e7621f8
22d96d3
e7621f8
 
22d96d3
e7621f8
22d96d3
 
 
 
e7621f8
22d96d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc49c73
c2640c7
22d96d3
e7621f8
 
33f355d
22d96d3
e7621f8
 
 
33f355d
22d96d3
 
 
 
 
e7621f8
 
c2640c7
 
e7621f8
cc49c73
22d96d3
 
 
 
 
e7621f8
22d96d3
c2640c7
cc49c73
c2640c7
22d96d3
 
 
0fc14ac
cc49c73
22d96d3
 
 
cc49c73
22d96d3
 
 
cc49c73
4588e7b
22d96d3
 
cc49c73
e7621f8
22d96d3
cc49c73
 
 
22d96d3
cc49c73
f8f20d3
22d96d3
e7621f8
22d96d3
 
 
 
 
 
e7621f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22d96d3
 
 
 
 
 
 
 
 
c2640c7
22d96d3
e7621f8
c2640c7
22d96d3
e7621f8
22d96d3
 
 
4588e7b
e7621f8
4588e7b
 
cc49c73
22d96d3
cc49c73
22d96d3
cc49c73
22d96d3
 
e7621f8
22d96d3
 
 
 
 
 
e7621f8
22d96d3
e7621f8
cc49c73
 
22d96d3
e7621f8
 
 
22d96d3
e7621f8
22d96d3
 
 
e7621f8
22d96d3
 
e7621f8
c2640c7
e7621f8
22d96d3
e7621f8
22d96d3
 
 
e7621f8
22d96d3
 
 
 
 
 
 
e7621f8
22d96d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7621f8
22d96d3
 
 
 
 
 
 
 
 
e7621f8
22d96d3
 
e7621f8
 
 
 
22d96d3
e7621f8
 
 
 
22d96d3
 
 
 
 
 
e7621f8
22d96d3
 
 
e7621f8
22d96d3
 
 
 
 
 
e7621f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22d96d3
 
 
 
e7621f8
22d96d3
 
 
 
 
 
 
e7621f8
 
c2640c7
e7621f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22d96d3
e7621f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22d96d3
e7621f8
 
22d96d3
e7621f8
 
22d96d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7621f8
 
 
 
 
 
 
 
 
22d96d3
 
 
 
 
 
 
 
e7621f8
22d96d3
 
e7621f8
22d96d3
 
 
 
e7621f8
22d96d3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
# app.py — ShortiFoley (Video -> Foley)
# Created by bilsimaging.com

import os
import sys
import io
import json
import uuid
import time
import shutil
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict

import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces  # HF Spaces ZeroGPU & MCP integration

# -------------------------
# Constants & configuration
# -------------------------
# Directory containing this file; every default path below is derived from it.
ROOT = Path(__file__).parent.resolve()
# Checkout location of the upstream HunyuanVideo-Foley code (see _ensure_repo).
REPO_DIR = ROOT / "HunyuanVideo-Foley"
# Model weights directory; override with the HIFI_FOLEY_MODEL_PATH env var.
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
# Model config file; override with the HIFI_FOLEY_CONFIG env var.
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
# Where generated videos and their JSON sidecars are written; override with OUTPUTS_DIR.
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs")))
# Import-time side effect: the outputs directory is created as soon as the module loads.
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

# User-facing strings: page header, tagline, and the note stored in each output's metadata.
SPACE_TITLE = "🎵 ShortiFoley — HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video → Audio Foley. Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# Keep GPU <= 120s for ZeroGPU (default 110)
# Passed as `duration` to the spaces.GPU decorator; override with GPU_DURATION_SECS.
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))

# Globals
# Populated by auto_load_models(); all stay None until a load succeeds.
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None


# ------------
# Small helpers
# ------------
def _setup_device(pref: str = "auto", gpu_id: int = 0) -> torch.device:
    """Resolve the torch device to run on and log the choice.

    With ``pref="auto"`` the first available backend wins, in the order
    CUDA (at index ``gpu_id``), then MPS, then CPU. Any other ``pref`` is
    passed to ``torch.device`` unchanged.
    """
    if pref != "auto":
        chosen = torch.device(pref)
    elif torch.cuda.is_available():
        chosen = torch.device(f"cuda:{gpu_id}")
    elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        chosen = torch.device("mps")
    else:
        chosen = torch.device("cpu")

    if chosen.type == "cuda":
        message = f"Using CUDA {chosen}"
    else:
        message = f"Using {chosen}"
    logger.info(message)
    return chosen


def _ensure_repo() -> None:
    """Shallow-clone the Tencent repo into REPO_DIR unless it already exists.

    LFS smudging is disabled (via GIT_LFS_SKIP_SMUDGE and the ``-c`` filter
    overrides) so the checkout does not pull LFS objects.

    The clone is best-effort: a missing ``git`` binary or a non-zero exit
    status is logged as an error instead of raised, so start-up continues
    and later import probes report the missing repo.
    """
    if REPO_DIR.exists():
        return

    import subprocess  # local import, like the file's other subprocess use

    # Argument list (no shell): a REPO_DIR containing spaces or shell
    # metacharacters is passed through verbatim.
    cmd = [
        "git",
        "-c", "filter.lfs.smudge=",
        "-c", "filter.lfs.required=false",
        "clone", "--depth", "1",
        "https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git",
        str(REPO_DIR),
    ]
    # Set the variable through the environment rather than a `VAR=1 cmd`
    # shell prefix, which only works in POSIX shells.
    env = {**os.environ, "GIT_LFS_SKIP_SMUDGE": "1"}

    logger.info(f">> GIT_LFS_SKIP_SMUDGE=1 {' '.join(cmd)}")
    try:
        result = subprocess.run(cmd, env=env, check=False)
    except OSError as e:
        # e.g. git is not installed or not on PATH
        logger.error(f"Could not run git to clone into {REPO_DIR}: {e}")
        return

    if result.returncode != 0:
        logger.error(f"git clone into {REPO_DIR} failed with exit code {result.returncode}")


def _download_weights_if_needed() -> None:
    """Fetch the required weight files from the HF hub into WEIGHTS_DIR.

    Only files matching the patterns below are requested; WEIGHTS_DIR is
    created first if it does not exist.
    """
    wanted_patterns = [
        "hunyuanvideo_foley.pth",
        "synchformer_state_dict.pth",
        "vae_128d_48k.pth",
        "assets/*",
        "config.yaml",  # harmless
    ]

    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    target_dir = str(WEIGHTS_DIR)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=target_dir,
        resume_download=True,
        allow_patterns=wanted_patterns,
    )


def prepare_once() -> None:
    """Run the start-up preparation steps: code checkout first, then weights."""
    for step in (_ensure_repo, _download_weights_if_needed):
        step()


# -----------------------
# Model load & inference
# -----------------------
def auto_load_models() -> str:
    """
    Load HunyuanVideo-Foley + encoders on the chosen device.

    Populates the module globals ``_model_dict``, ``_cfg`` and ``_device``.
    If a model is already loaded, nothing is reloaded.

    Returns:
        A human-readable status string. Success starts with "✅"; every
        failure (including the repo package not being importable) is
        reported as a "❌ Failed to load model: ..." string, not raised,
        because callers decide what to do based on the message.
    """
    global _model_dict, _cfg, _device

    if _model_dict is not None and _cfg is not None:
        return "Model already loaded."

    # Make the cloned repo importable, without stacking duplicate entries
    # when this function is called repeatedly.
    repo_path = str(REPO_DIR)
    if repo_path not in sys.path:
        sys.path.append(repo_path)

    try:
        # Imported inside the try so a missing/broken checkout yields the
        # same "failed" status as any other load error.
        from hunyuanvideo_foley.utils.model_utils import load_model

        _device = _setup_device("auto", 0)
        logger.info("Loading HunyuanVideo-Foley model...")
        logger.info(f"MODEL_PATH:  {WEIGHTS_DIR}")
        logger.info(f"CONFIG_PATH: {CONFIG_PATH}")

        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
    except Exception as e:  # boundary: convert any load failure into a status string
        logger.error(e)
        return f"❌ Failed to load model: {e}"

    return "✅ Model loaded."


def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Mux ``audio_path`` onto ``video_path`` and write the result to ``out_path``.

    The project's own ``merge_audio_video`` helper is tried first. If importing
    or running it fails for any reason, a plain ffmpeg invocation is used.

    Raises:
        subprocess.CalledProcessError: if the ffmpeg fallback exits non-zero.
        OSError: if the ffmpeg binary cannot be started.
    """
    repo_path = str(REPO_DIR)
    if repo_path not in sys.path:  # avoid piling up duplicate entries per call
        sys.path.append(repo_path)

    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video
        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        # Deliberately broad: any helper failure should trigger the fallback.
        # Fallback: plain ffmpeg merge (assumes same duration or lets ffmpeg handle)
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            # Explicit mapping: picture from the source video, sound from the
            # generated track. Without it ffmpeg's automatic stream selection
            # may keep the source video's own audio instead.
            "-map", "0:v:0",
            "-map", "1:a:0",
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            out_path,
        ]
        subprocess.run(cmd, check=True)


def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
                  prompt: str) -> str:
    """Mux one generated audio track onto the source video and store it.

    The audio is written to a temporary WAV, merged with ``video_src`` into
    ``OUTPUTS_DIR/<id>.mp4``, and a ``<id>.json`` sidecar with metadata
    (including the watermark note) is written next to it. The temporary WAV
    is removed once the merge finishes, whether or not it succeeded.

    Args:
        video_src: Path of the source video.
        audio_tensor: Generated audio; a 1-D tensor is promoted to 2-D.
        sr: Sample rate passed to ``torchaudio.save``.
        idx: Variant number, used in the output file names.
        prompt: Text prompt recorded in the sidecar (falsy becomes "").

    Returns:
        Path of the written MP4 as a string.
    """
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)

    # One UTC timestamp for both the file id and the metadata, so they agree.
    # Kept naive so isoformat() + "Z" yields the same layout as before.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    ts = now.strftime("%Y%m%d_%H%M%S_%f")
    base = f"shortifoley_{ts}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"

    # The WAV is only an intermediate; the context manager removes the temp
    # directory so repeated generations do not accumulate files.
    with tempfile.TemporaryDirectory() as tmpdir:
        wav_path = Path(tmpdir) / f"gen_{idx}.wav"
        torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)
        _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Save JSON sidecar
    meta = {
        "id": base,
        "created_utc": now.isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)"
    }
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be pinned rather than left to the locale default.
    (OUTPUTS_DIR / f"{base}.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    return str(out_mp4)


def _list_gallery(limit: int = 100) -> List[str]:
    """Return paths of the newest ``*.mp4`` files in OUTPUTS_DIR.

    Args:
        limit: Maximum number of paths to return. Zero or a negative value
            yields an empty list.

    Returns:
        File paths as strings, ordered by modification time, newest first.
    """
    if limit <= 0:
        return []
    newest_first = sorted(
        OUTPUTS_DIR.glob("*.mp4"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    return [str(p) for p in newest_first[:limit]]


# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)
@torch.inference_mode()
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Produce Foley audio variants for one video and save each as an MP4.

    The requested variant count is clamped to the range 1-6. If the model is
    not loaded or no video is given, nothing is generated and an error
    message is returned instead.

    Returns: (list of output video paths, status message)
    """
    model_ready = _model_dict is not None and _cfg is not None
    if not model_ready:
        return [], "❌ Load the model first (open the app once)."
    if not video_file:
        return [], "❌ Please provide a video."

    sys.path.append(str(REPO_DIR))
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    prompt = text_prompt or ""

    # Feature extraction from the video and the (stripped) prompt.
    visual_feats, text_feats, audio_len_s = feature_process(
        video_file, prompt.strip(), _model_dict, _cfg
    )

    # One denoising pass produces the whole batch of variants.
    variant_count = int(max(1, min(6, sample_nums)))
    audio, sr = denoise_process(
        visual_feats,
        text_feats,
        audio_len_s,
        _model_dict,
        _cfg,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(num_inference_steps),
        batch_size=variant_count,
    )

    # Variants are numbered from 1 in the saved file names.
    outs = [
        _save_outputs(video_file, audio[k], sr, k + 1, prompt)
        for k in range(variant_count)
    ]

    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"


# ---------------
# MCP-only APIs
# ---------------
def _download_to_tmp(url: str) -> str:
    """Download ``url`` into a new temporary ``.mp4`` file and return its path.

    The body is streamed to disk in chunks instead of being held in memory.
    The caller owns the returned file and is responsible for deleting it.

    Raises:
        RuntimeError: if the ``requests`` package is not installed.
        requests.HTTPError: if the server answers with an error status.
    """
    try:
        import requests
    except ImportError as e:
        raise RuntimeError(
            "Missing dependency 'requests'. Add it to requirements.txt to use URL inputs."
        ) from e

    # NOTE(review): `url` is caller-supplied and fetched as-is; if this server
    # can reach internal hosts, consider restricting allowed destinations.
    with requests.get(url, timeout=30, stream=True) as r:
        r.raise_for_status()
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        try:
            with tmp:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    tmp.write(chunk)
        except Exception:
            # Do not leave a truncated download behind.
            try:
                os.unlink(tmp.name)
            except OSError:
                pass
            raise
    return tmp.name


def _maybe_from_base64(data_url_or_b64: str) -> str:
    """Decode a ``data:`` URL or raw base64 string into a temporary ``.mp4`` file.

    For a ``data:`` URL, everything after the first comma is treated as the
    base64 payload. The caller owns the returned file.

    Returns:
        Path of the temporary file holding the decoded bytes.

    Raises:
        ValueError: if the payload decodes to nothing, or is not valid base64
            (``binascii.Error`` is a ``ValueError`` subclass).
    """
    payload = data_url_or_b64
    if payload.startswith("data:"):
        payload = payload.split(",", 1)[-1]

    raw = base64.b64decode(payload)
    if not raw:
        # An empty file would only fail later, deep inside video decoding.
        raise ValueError("No video data found: expected a data: URL or base64-encoded video.")

    # `with` closes the handle even if the write fails; delete=False keeps
    # the file on disk for the caller.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
        tmp.write(raw)
    return tmp.name


def _normalize_video_input(video_url_or_b64: str) -> str:
    """Turn a video reference into a local temp-file path.

    Input starting with ``http://`` or ``https://`` (after stripping
    whitespace) is downloaded; anything else is treated as base64 / a
    ``data:`` URL. ``None`` is handled like an empty string.
    """
    cleaned = (video_url_or_b64 or "").strip()
    is_remote = cleaned.startswith(("http://", "https://"))
    if is_remote:
        return _download_to_tmp(cleaned)
    return _maybe_from_base64(cleaned)


# Container for endpoints that are meant for API/MCP clients rather than the UI.
# NOTE(review): the docstrings of the functions below are left untouched on
# purpose — presumably Gradio exposes them as tool descriptions; confirm before
# rewording them.
with gr.Blocks() as mcp_only_endpoints:
    gr.Markdown("These endpoints are MCP/API only and have no visible UI.", show_label=False)

    @gr.api
    def api_generate_from_url(
        video_url_or_b64: str,
        text_prompt: str = "",
        guidance_scale: float = 4.5,
        num_inference_steps: int = 50,
        sample_nums: int = 1,
    ) -> Dict[str, List[str]]:
        """
        Generate Foley from a remote video URL or base64-encoded video.
        Returns: {"videos": [paths], "message": str}
        """
        if _model_dict is None or _cfg is None:
            raise RuntimeError("Model not loaded. Open the UI once or call /load_model tool.")
        # `local` is a temp file created for this request only.
        local = _normalize_video_input(video_url_or_b64)
        try:
            outs, msg = infer_single_video(local, text_prompt, guidance_scale, num_inference_steps, sample_nums)
        finally:
            # The results live in OUTPUTS_DIR, so the temp input is no longer
            # needed; remove it so repeated API calls do not fill the temp dir.
            try:
                os.remove(local)
            except OSError as e:
                logger.warning(f"Could not remove temp input {local}: {e}")
        return {"videos": outs, "message": msg}

    @gr.api
    def load_model_tool() -> str:
        """Ensure model is loaded on server (MCP convenience)."""
        return auto_load_models()

    @gr.mcp.resource("shortifoley://status")
    def shortifoley_status() -> str:
        """Return a simple readiness string for MCP clients."""
        ready = _model_dict is not None and _cfg is not None
        # Any device other than cuda/mps (or no device yet) is reported as "cpu".
        dev = "cuda" if (_device and _device.type == "cuda") else ("mps" if (_device and _device.type == "mps") else "cpu")
        return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

    @gr.mcp.prompt()
    def foley_prompt(name: str = "default") -> str:
        """Reusable guidance for describing sound ambience."""
        # `name` is accepted but does not influence the returned text.
        return (
            "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
            "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
        )


# -------------
# Gradio UI
# -------------
def _about_html() -> str:
    """Build the HTML shown in the About tab, with the watermark note filled in."""
    about_markup = f"""
    <div style="line-height:1.6">
      <h2>About ShortiFoley</h2>
      <p><b>ShortiFoley</b> automatically generates realistic Foley soundtracks for short videos using
      Tencent’s HunyuanVideo-Foley with CLAP & SigLIP2 encoders. It includes autosave and an MCP server so
      you can call it from agents or workflows (e.g., n8n).</p>
      <p><b>Created by <a href="https://bilsimaging.com" target="_blank">bilsimaging.com</a></b></p>

      <h3>How to use</h3>
      <ol>
        <li>Upload a video (ideally &lt; 120 seconds).</li>
        <li>Optionally enter a text description of the sound (English).</li>
        <li>Adjust CFG scale, steps, and number of variants.</li>
        <li>Click <b>Generate</b>. Results appear on the right and are stored in the Gallery.</li>
      </ol>

      <h3>Tips</h3>
      <ul>
        <li>Trim clips to the key action (5–30s) for faster, crisper results.</li>
        <li>Include material cues (“wood”, “metal”, “concrete”), action cues (“splash”, “glass shatter”), and ambience (“roomy”, “echoey”).</li>
        <li>Generate multiple variants and pick the most natural.</li>
      </ul>

      <h3>MCP / Automation</h3>
      <p>This app runs as an <b>MCP server</b>. Open the footer “View API → MCP” to copy a ready config. You can also use the REST endpoints listed there. Perfect for n8n integrations.</p>

      <h3>Watermark</h3>
      <p>Each output’s metadata includes: <i>{WATERMARK_NOTE}</i>. If you want a <b>visible video overlay</b>, I can add an ffmpeg overlay step on request.</p>
    </div>
    """
    return about_markup


def create_ui() -> gr.Blocks:
    """Build the Gradio interface: Run, Gallery and About tabs.

    The Run tab collects a video, an optional prompt and the generation
    settings, and shows up to six result players plus a status box. After
    each generation finishes, the Gallery tab is refreshed from OUTPUTS_DIR.

    Returns:
        The assembled (not yet launched) ``gr.Blocks`` app.
    """
    with gr.Blocks(
        title="ShortiFoley — HunyuanVideo-Foley",
        css="""
        .main-header{ text-align:center; padding:1.2rem; border-radius:16px; background:linear-gradient(135deg,#667eea,#764ba2); color:white; }
        .card{ background:white; border:1px solid #e1e5e9; border-radius:16px; padding:1rem; box-shadow:0 8px 32px rgba(0,0,0,.06); }
        .generate-btn button{ font-weight:700; }
        """
    ) as demo:

        gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")

        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 📹 Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile, distant chatter.",
                            lines=3
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG Scale")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
                        generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])

                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 🎥 Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        v2 = gr.Video(label="Sample 2", height=160, visible=False)
                        v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        v4 = gr.Video(label="Sample 4", height=160, visible=False)
                        v5 = gr.Video(label="Sample 5", height=160, visible=False)
                        v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        status = gr.Textbox(label="Status", interactive=False)

                # Generate handler: returns exactly one value per output
                # component (six video updates + the status message).
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    vis_updates = []
                    for i in range(6):
                        if i < len(outs):
                            vis_updates.append(gr.update(visible=True, value=outs[i]))
                        else:
                            vis_updates.append(gr.update(visible=False, value=None))
                    return (*vis_updates, msg)

                generate_event = generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[v1, v2, v3, v4, v5, v6, status],
                    api_name="/infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files."
                )

                # Toggle visibility when # of samples changes
                def _toggle_vis(n):
                    n = int(n)
                    return [
                        gr.update(visible=True),
                        gr.update(visible=n >= 2),
                        gr.update(visible=n >= 3),
                        gr.update(visible=n >= 4),
                        gr.update(visible=n >= 5),
                        gr.update(visible=n >= 6),
                    ]
                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])

            with gr.Tab("📁 Gallery"):
                gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results"
                )
                refresh = gr.Button("🔄 Refresh Gallery")
                refresh.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])

            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Refresh the gallery once generation has finished. Chaining with
        # .then() (instead of a second, independent click listener) makes the
        # refresh run after the new files exist rather than alongside the job.
        generate_event.then(lambda: gr.update(value=_list_gallery()), outputs=[gallery])

    return demo


def set_seeds(s: int = 1):
    """Seed Python's ``random``, NumPy and PyTorch with the same value ``s``."""
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(s)


# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    # Route loguru output through print at INFO level.
    logger.remove()
    logger.add(lambda m: print(m, end=""), level="INFO")
    set_seeds(1)

    logger.info("===== Application Startup =====\n")
    prepare_once()

    # Ensure import paths after repo is present (skip if already registered)
    if str(REPO_DIR) not in sys.path:
        sys.path.append(str(REPO_DIR))
    try:
        # Probe key modules early (better error surfacing)
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as e:
        logger.warning(f"Repo imports not ready yet: {e}")

    # A failed load is logged but does not stop the UI from starting.
    msg = auto_load_models()
    if not msg.startswith("✅"):
        logger.error(f"[BOOT][ERROR] auto_load_models() failed:\n{msg}")
    else:
        logger.info(msg)

    ui = create_ui()
    # Mount MCP-only endpoints alongside the UI. `ui.blocks` is a mapping, so
    # it has no append(); rendering the nested Blocks inside the app's context
    # is how its components and registered functions are merged into `ui`.
    # NOTE(review): this also shows the endpoints' Markdown note on the page.
    with ui:
        mcp_only_endpoints.render()

    # Enable MCP server so tools/resources/prompts are discoverable
    ui.launch(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,   # MCP on
    )