File size: 10,786 Bytes
c2640c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import os, json, tempfile, subprocess, shutil, time, uuid
from pathlib import Path
from typing import Optional, Tuple, List

import gradio as gr
import spaces
from huggingface_hub import snapshot_download

# ========= Paths & Repo =========
ROOT = Path(__file__).parent.resolve()
REPO_DIR = ROOT / "HunyuanVideo-Foley"
WEIGHTS_DIR = ROOT / "weights"
CACHE_DIR = ROOT / "cache"
OUT_DIR = ROOT / "outputs"
ASSETS = ROOT / "assets"
ASSETS.mkdir(exist_ok=True)

BILS_BRAND = os.environ.get("BILS_BRAND", "Bilsimaging · Foley Studio")
PRIMARY_COLOR = os.environ.get("PRIMARY_COLOR", "#6B5BFF")  # purple-ish

MAX_SECS = int(os.environ.get("MAX_SECS", "22"))  # ZeroGPU-friendly
TARGET_H = int(os.environ.get("TARGET_H", "480"))  # downscale target height
SR = int(os.environ.get("TARGET_SR", "48000"))     # target audio sample rate

def sh(cmd: str):
    print(">>", cmd)
    subprocess.run(cmd, shell=True, check=True)

def ffprobe_duration(path: str) -> float:
    try:
        out = subprocess.check_output([
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", path
        ]).decode().strip()
        return float(out)
    except Exception:
        return 0.0

def prepare_once():
    """Clone repo + download weights on cold start."""
    REPO_DIR.exists() or sh("git clone https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git")
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        local_dir_use_symlinks=False,
        repo_type="model",
    )
    os.environ["HIFI_FOLEY_MODEL_PATH"] = str(WEIGHTS_DIR)
    CACHE_DIR.mkdir(exist_ok=True)
    OUT_DIR.mkdir(exist_ok=True)

prepare_once()

# ========= Preprocessing =========
def preprocess_video(in_path: str) -> Tuple[str, float]:
    """
    - Validates duration (<= MAX_SECS). If longer, auto-trims to MAX_SECS.
    - Downscales to TARGET_H height (keeping AR), H.264 baseline, AAC passthrough.
    - Returns path to processed mp4 and final duration.
    """
    dur = ffprobe_duration(in_path)
    temp_dir = Path(tempfile.mkdtemp(prefix="pre_"))
    trimmed = temp_dir / "trim.mp4"
    processed = temp_dir / "proc.mp4"

    # If longer than budget, trim to MAX_SECS (from start).
    if dur == 0:
        raise RuntimeError("Unable to read the video duration.")
    trim_filter = []
    if dur > MAX_SECS:
        trim_filter = ["-t", str(MAX_SECS)]

    # First, ensure we have a small, uniform container (mp4)
    sh(" ".join([
        "ffmpeg", "-y", "-i", f"\"{in_path}\"",
        *trim_filter,
        "-an",                               # remove original audio (we're generating new foley)
        "-vcodec", "libx264", "-preset", "veryfast", "-crf", "23",
        "-movflags", "+faststart",
        f"\"{trimmed}\""
    ]))

    # Downscale to TARGET_H keeping AR; re-encode efficiently
    # Use mod2 dimensions for compatibility
    vf = f"scale=-2:{TARGET_H}:flags=bicubic"
    sh(" ".join([
        "ffmpeg", "-y", "-i", f"\"{trimmed}\"",
        "-vf", f"\"{vf}\"",
        "-an",
        "-vcodec", "libx264", "-profile:v", "baseline", "-level", "3.1",
        "-pix_fmt", "yuv420p",
        "-preset", "veryfast", "-crf", "24",
        "-movflags", "+faststart",
        f"\"{processed}\""
    ]))

    final_dur = min(dur, float(MAX_SECS))
    return str(processed), final_dur

# ========= Inference (ZeroGPU) =========
@spaces.GPU(duration=240)  # ~4 minutes per call window
def run_model(video_path: str, prompt_text: str) -> str:
    """
    Run Tencent's infer.py on ZeroGPU. Returns path to WAV.
    """
    job_id = uuid.uuid4().hex[:8]
    work_out = OUT_DIR / f"job_{job_id}"
    work_out.mkdir(parents=True, exist_ok=True)

    cmd = [
        "python", f"{REPO_DIR}/infer.py",
        "--model_path", str(WEIGHTS_DIR),
        "--config_path", f"{REPO_DIR}/configs/hunyuanvideo-foley-xxl.yaml",
        "--single_video", video_path,
        "--single_prompt", json.dumps(prompt_text or ""),
        "--output_dir", str(work_out),
        "--device", "cuda"
    ]
    sh(" ".join(cmd))

    # Find produced wav
    wav = None
    for p in work_out.rglob("*.wav"):
        wav = p
        break
    if not wav:
        raise RuntimeError("No audio produced by the model.")

    # Normalize / resample to SR (safeguard)
    fixed = work_out / "foley_48k.wav"
    sh(" ".join([
        "ffmpeg", "-y", "-i", f"\"{str(wav)}\"",
        "-ar", str(SR), "-ac", "2",
        f"\"{str(fixed)}\""
    ]))
    return str(fixed)

# ========= Post: optional mux back to the video =========
def mux_audio_with_video(video_path: str, audio_path: str) -> str:
    out_path = Path(tempfile.mkdtemp(prefix="mux_")) / "with_foley.mp4"
    # Copy video, add foley audio as AAC
    sh(" ".join([
        "ffmpeg", "-y",
        "-i", f"\"{video_path}\"",
        "-i", f"\"{audio_path}\"",
        "-map", "0:v:0", "-map", "1:a:0",
        "-c:v", "copy", "-c:a", "aac", "-b:a", "192k",
        "-shortest",
        f"\"{out_path}\""
    ]))
    return str(out_path)

# ========= Gradio UI Logic =========
def single_generate(video: str, prompt: str, want_mux: bool, project_name: str) -> Tuple[Optional[str], Optional[str], str, list]:
    """
    Returns: (wav_path, muxed_video_path_or_None, status_markdown, history_list)
    """
    history = []
    try:
        if not video:
            return None, None, "⚠️ Please upload a video.", history
        # Preprocess
        history.append(["Preprocess", "Downscaling / trimming…"])
        pre_path, final_dur = preprocess_video(video)
        # Run model (ZeroGPU)
        history.append(["Inference", "Generating foley on GPU…"])
        wav = run_model(pre_path, prompt or "")
        # Optional Mux
        muxed = None
        if want_mux:
            history.append(["Mux", "Combining foley with video…"])
            muxed = mux_audio_with_video(pre_path, wav)
        history.append(["Done", f"OK · Duration ~{final_dur:.1f}s"])
        return wav, muxed, f"✅ Finished (≈ {final_dur:.1f}s)", history
    except Exception as e:
        history.append(["Error", str(e)])
        return None, None, f"❌ {type(e).__name__}: {e}", history

def batch_lite_generate(files: List[str], prompt: str, want_mux: bool) -> Tuple[str, list]:
    """
    Run a tiny queue sequentially; ZeroGPU handles each call in series.
    We enforce 3 items max to stay quota-friendly.
    """
    log = []
    if not files:
        return "⚠️ Please upload 1–3 videos.", log
    if len(files) > 3:
        files = files[:3]
        log.append(["Info", "Limiting to first 3 videos."])

    outputs = []
    for i, f in enumerate(files, 1):
        try:
            log.append([f"Preprocess {i}", Path(f).name])
            pre, final_dur = preprocess_video(f)
            log.append([f"Run {i}", f"GPU infer ~{final_dur:.1f}s"])
            wav = run_model(pre, prompt or "")
            muxed = mux_audio_with_video(pre, wav) if want_mux else None
            outputs.append((wav, muxed))
            log.append([f"Done {i}", "OK"])
        except Exception as e:
            log.append([f"Error {i}", str(e)])
    # Write a small manifest to outputs
    manifest = OUT_DIR / f"batchlite_{uuid.uuid4().hex[:6]}.json"
    manifest.write_text(json.dumps(
        [{"wav": w, "video": v} for (w, v) in outputs], ensure_ascii=False, indent=2
    ))
    return f"✅ Batch-lite finished · items: {len(outputs)}", log

# ========= UI =========
THEME_CSS = f"""
:root {{
  --brand: {PRIMARY_COLOR};
}}
.gradio-container {{
  font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Cairo, Noto Sans, Arial, "Apple Color Emoji", "Segoe UI Emoji";
}}
#brandbar {{
  background: linear-gradient(90deg, var(--brand), #222);
  color: white; padding: 12px 16px; border-radius: 12px;
}}
#brandbar strong {{ letter-spacing: .3px; }}
footer, #footer {{}}
"""

with gr.Blocks(
    css=THEME_CSS,
    title="Foley Studio · ZeroGPU"
) as demo:
    with gr.Row():
        gr.HTML(f'<div id="brandbar"><strong>{BILS_BRAND}</strong> — HunyuanVideo-Foley on ZeroGPU</div>')

    with gr.Tabs():
        with gr.Tab("🎬 Single Clip"):
            with gr.Group():
                project_name = gr.Textbox(label="Project name (optional)", placeholder="e.g., JawharaFM Teaser 09-2025")
                with gr.Row():
                    v_single = gr.Video(label="Video (≤ ~20s recommended)")
                    p_single = gr.Textbox(label="Sound prompt (optional)", placeholder="e.g., soft footsteps, indoor reverb, light rain outside")
                with gr.Row():
                    want_mux_single = gr.Checkbox(value=True, label="Mux foley back into video (MP4)")
                run_btn = gr.Button("Generate", variant="primary")
                with gr.Row():
                    out_audio = gr.Audio(label="Generated Foley (48 kHz WAV)", type="filepath")
                    out_mux = gr.Video(label="Video + Foley (MP4)", visible=True)
                status_md = gr.Markdown()
                history_table = gr.Dataframe(headers=["Step", "Note"], datatype=["str","str"], interactive=False, wrap=True, label="Activity")

            run_btn.click(
                single_generate,
                inputs=[v_single, p_single, want_mux_single, project_name],
                outputs=[out_audio, out_mux, status_md, history_table]
            )

        with gr.Tab("📦 Batch-Lite (1–3 clips)"):
            files = gr.Files(label="Upload 1–3 short videos", file_types=[".mp4",".mov"], file_count="multiple")
            prompt_b = gr.Textbox(label="Global prompt (optional)")
            want_mux_b = gr.Checkbox(value=True, label="Mux each output")
            go_b = gr.Button("Run batch-lite")
            batch_status = gr.Markdown()
            batch_log = gr.Dataframe(headers=["Step","Note"], datatype=["str","str"], interactive=False, wrap=True, label="Batch Log")

            go_b.click(
                batch_lite_generate,
                inputs=[files, prompt_b, want_mux_b],
                outputs=[batch_status, batch_log]
            )

        with gr.Tab("⚙️ Settings / Tips"):
            gr.Markdown(f"""
**ZeroGPU Budget Tips**
- Keep clips **≤ {MAX_SECS}s** (tool trims automatically if longer).
- Video is downscaled to **{TARGET_H}p** to speed up inference.
- If you hit a quota message, try again later; ZeroGPU limits GPU minutes per visitor.

**Branding**
- Change brand name / color via environment variables:
  - `BILS_BRAND` → header text
  - `PRIMARY_COLOR` → UI accent hex

**Outputs**
- WAV is 48 kHz stereo. Toggle **Mux** to get a ready MP4 with the foley track.
""")

    demo.queue(max_size=24).launch()