Spaces:
Running
on
Zero
Running
on
Zero
# app.py
from __future__ import annotations | |
import gradio as gr | |
import os | |
import shutil | |
import datetime | |
from typing import List, Optional | |
# ──────────────────────────────────────────────────────────────────────────────
# Import project-specific helpers — unchanged from initial version
# ──────────────────────────────────────────────────────────────────────────────
from scripts.generate_scripts import generate_script, generate_title, generate_description | |
from scripts.generate_voice import generate_voice | |
from scripts.get_footage import get_video_montage_from_folder | |
from scripts.edit_video import edit_video | |
from scripts.generate_subtitles import ( | |
transcribe_audio_to_subs, | |
chunk_text_by_words, | |
add_subtitles_to_video, | |
) | |
import torch | |
from transformers import ( | |
AutoTokenizer, | |
AutoModelForCausalLM, | |
BitsAndBytesConfig, | |
) | |
# ──────────────────────────────────────────────────────────────────────────────
# Constants & utilities
# ──────────────────────────────────────────────────────────────────────────────
# Narration speed used to turn a target duration into a word budget
# (2.3 words/second is about 138 words per minute).
WORDS_PER_SECOND = 2.3

# Asset directories used by the callbacks in this file.
ASSETS_DIRS = tuple(
    f"./assets/{subdir}"
    for subdir in ("audio", "backgrounds", "output", "video_music")
)
# ────────────────────────────────────────────────────────
# CONFIGURATION
# ────────────────────────────────────────────────────────
# Model identifier; the MODEL_ID environment variable overrides the default.
MODEL_ID = os.getenv("MODEL_ID", "Qwen/Qwen3-4B")
# Weights are requested in half precision.
DTYPE = torch.float16

print(f"π Loading {MODEL_ID} (dtype = {DTYPE}) β¦")

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype=DTYPE,
    trust_remote_code=True,
)

# Move the whole model to a single device: the GPU when one is visible,
# otherwise the CPU.
if torch.cuda.is_available():
    model.to("cuda")
else:
    model.to("cpu")

# Record the device the parameters actually ended up on.
DEVICE = next(model.parameters()).device
print(f"β Model ready on {DEVICE}.")

# Make sure every asset directory exists before any callback runs.
for d in ASSETS_DIRS:
    os.makedirs(d, exist_ok=True)
def safe_copy(src: str, dst: str) -> str:
    """Copy ``src`` to ``dst`` unless both already refer to the same file.

    Args:
        src: Path of the file to copy.
        dst: Destination path.

    Returns:
        ``dst`` when a copy was made, ``src`` when the two paths refer to
        the same file and nothing was copied.
    """
    # Cheap textual check first: identical absolute paths need no copy.
    if os.path.abspath(src) == os.path.abspath(dst):
        return src
    try:
        shutil.copy(src, dst)
    except shutil.SameFileError:
        # Different spellings of the same file (symlink, hard link):
        # treat exactly like the identical-path case instead of crashing.
        return src
    return dst
# Wrapper util to timestamp generated files so different runs don't overwrite each other
def timestamped_filename(prefix: str, ext: str) -> str:
    """Build a timestamped path inside ``./assets/output``.

    The timestamp includes microseconds so that two calls within the same
    second (e.g. two users clicking at once) do not yield the same path.

    Args:
        prefix: Leading part of the file name.
        ext: File extension without the leading dot.

    Returns:
        A path of the form ``./assets/output/<prefix>_<YYYYmmdd_HHMMSS_ffffff>.<ext>``.
    """
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    return os.path.join("./assets/output", f"{prefix}_{ts}.{ext}")
# ──────────────────────────────────────────────────────────────────────────────
# Independent functional endpoints (Gradio callbacks)
# ──────────────────────────────────────────────────────────────────────────────
def cb_generate_script(
    context: str,
    instruction: str,
    target_duration: int,
    script_mode: str,
    custom_script: Optional[str],
):
    """Produce a script plus a title and description for it.

    When ``script_mode`` is ``"Use my script"`` the stripped ``custom_script``
    is used as-is; any other mode builds a prompt from ``context``,
    ``instruction`` and the target duration and asks the model for a script.
    Title and description are generated from the resulting script either way.

    Returns:
        ``(script, title, description, script)``; the script is repeated so
        the same callback can also feed a state component.

    Raises:
        gr.Error: If "Use my script" is selected but no script text is given.
    """
    # Word budget derived from the requested duration and narration speed.
    word_budget = int(target_duration * WORDS_PER_SECOND)

    if script_mode != "Use my script":
        prompt_parts = (
            f"You are a video creation expert. Here is the context: {context.strip()}\n",
            f"Instruction: {instruction.strip()}\n",
            f"π΄ Strict target duration: {target_duration}s β β {word_budget} words (must be respected).",
        )
        script = generate_script(model, tokenizer, "".join(prompt_parts))
    else:
        user_text = custom_script.strip() if custom_script else ""
        if not user_text:
            raise gr.Error("β You selected 'Use my script' but the script field is empty!")
        script = user_text

    title = generate_title(model, tokenizer, script)
    description = generate_description(model, tokenizer, script)
    # The trailing duplicate of the script updates the shared state.
    return script, title, description, script
def cb_generate_voice(script: str):
    """Synthesize narration for ``script`` into a timestamped ``.mp3`` path.

    Returns:
        The output path twice: once for the audio component, once for state.

    Raises:
        gr.Error: If ``script`` is empty or whitespace only.
    """
    has_text = bool(script) and bool(script.strip())
    if not has_text:
        raise gr.Error("β Script text is empty β generate or paste a script first.")

    output_mp3 = timestamped_filename("voice", "mp3")
    generate_voice(script, output_mp3)
    # Second value updates the shared voice state.
    return output_mp3, output_mp3
def accumulate_files(new: List[str], state: Optional[List[str]]):
    """Merge newly uploaded MP4 paths into the accumulated list.

    An entry of ``new`` is kept only if it is a string, names an existing
    file, ends with ``.mp4`` (case-insensitive) and is not already present.

    Args:
        new: Candidate paths; ``None`` or empty is treated as no uploads.
        state: Previously accumulated paths; ``None`` is treated as empty.

    Returns:
        A new list with the previous entries followed by the accepted ones,
        in order. The ``state`` argument itself is never modified.
    """
    # Work on a copy so the caller's list is not mutated behind its back.
    merged = list(state or [])
    # Set for O(1) duplicate checks instead of scanning the list each time.
    seen = {p for p in merged if isinstance(p, str)}
    for path in new or []:
        if not isinstance(path, str) or path in seen:
            continue
        if path.lower().endswith(".mp4") and os.path.isfile(path):
            merged.append(path)
            seen.add(path)
    return merged
def cb_create_montage(
    accumulated_videos: List[str],
    voice_path: str,
    lum: float,
    contrast: float,
    gamma: float,
    show_bar: bool,
):
    """Build the background-video montage for the narration audio.

    Existing ``.mp4`` files in ``./assets/backgrounds`` are removed, the
    given videos are copied there as ``video_000.mp4``, ``video_001.mp4``, ...
    and ``get_video_montage_from_folder`` is run on that folder with
    ``voice_path`` as audio and ``./assets/video_music`` as output directory.

    Args:
        accumulated_videos: Paths of the background videos to use.
        voice_path: Path of the narration audio file.
        lum: Brightness setting forwarded to the montage helper.
        contrast: Contrast setting forwarded to the montage helper.
        gamma: Gamma setting forwarded to the montage helper.
        show_bar: Forwarded as ``show_progress_bar``.

    Returns:
        The helper's return value twice (preview component and state) —
        presumably the path of the rendered montage; confirm against
        ``get_video_montage_from_folder``.

    Raises:
        gr.Error: If no videos were provided or the narration file is missing.
    """
    if not accumulated_videos:
        raise gr.Error("β Please upload at least one background video (.mp4) before generating the montage.")
    if not voice_path or not os.path.isfile(voice_path):
        raise gr.Error("β A narration audio file (.mp3) is required β generate or upload one first.")

    backgrounds_dir = "./assets/backgrounds"

    # Clean previous backgrounds, then copy the new ones in a stable order.
    for name in os.listdir(backgrounds_dir):
        if name.lower().endswith(".mp4"):
            os.remove(os.path.join(backgrounds_dir, name))
    for idx, video in enumerate(accumulated_videos):
        safe_copy(video, os.path.join(backgrounds_dir, f"video_{idx:03d}.mp4"))

    # The helper saves the file itself; its return value is what gets reported.
    montage_path = get_video_montage_from_folder(
        folder_path=backgrounds_dir,
        audio_path=voice_path,
        output_dir="./assets/video_music",
        lum=lum,
        contrast=contrast,
        gamma=gamma,
        show_progress_bar=show_bar,
    )
    return montage_path, montage_path
def cb_mix_audio(
    montage_path: str,
    voice_path: str,
    music_file: Optional[str] = None,
):
    """Mix the montage video with the narration and optional background music.

    ``music_file`` is only passed on when it names an existing file;
    otherwise ``None`` is handed to ``edit_video``.

    Returns:
        The timestamped output path twice (video component and state).

    Raises:
        gr.Error: If the montage or the narration file is missing.
    """
    montage_ok = bool(montage_path) and os.path.isfile(montage_path)
    if not montage_ok:
        raise gr.Error("β Please generate a montage video first.")

    narration_ok = bool(voice_path) and os.path.isfile(voice_path)
    if not narration_ok:
        raise gr.Error("β Narration audio missing β generate or upload it.")

    if music_file and os.path.isfile(music_file):
        music_path = music_file
    else:
        music_path = None

    mixed_path = timestamped_filename("final_no_subs", "mp4")
    edit_video(montage_path, voice_path, music_path, mixed_path)
    return mixed_path, mixed_path
def cb_add_subtitles(final_no_subs: str, voice_path: str):
    """Burn subtitles derived from the narration into the mixed video.

    The narration is transcribed, the transcript is chunked with
    ``max_words=3``, and the chunks are overlaid on ``final_no_subs``.

    Returns:
        The timestamped path of the subtitled video.

    Raises:
        gr.Error: If the mixed video or the narration file is missing.
    """
    video_ok = bool(final_no_subs) and os.path.isfile(final_no_subs)
    if not video_ok:
        raise gr.Error("β Mixed video not found β run the 'Mix Audio/Video' step first.")

    audio_ok = bool(voice_path) and os.path.isfile(voice_path)
    if not audio_ok:
        raise gr.Error("β Narration audio missing.")

    subtitle_chunks = chunk_text_by_words(
        transcribe_audio_to_subs(voice_path),
        max_words=3,
    )
    output_path = timestamped_filename("final_with_subs", "mp4")
    add_subtitles_to_video(final_no_subs, subtitle_chunks, output_path)
    return output_path
# ──────────────────────────────────────────────────────────────────────────────
# Gradio UI — one tab per function
# ──────────────────────────────────────────────────────────────────────────────
# NOTE(review): the original indentation was lost; the nesting of components
# inside Row/Accordion below is reconstructed — confirm against the live layout.
demo = gr.Blocks(theme="gradio/soft")

with demo:
    gr.Markdown("# π¬ Modular AI Video Toolkit")
    gr.Markdown(
        "Each tab exposes **one single processing step** so you can mix & match them as you like. π‘"
    )

    # Shared state across tabs: each step stores its output here so later
    # tabs can pick it up without a manual re-upload.
    script_state = gr.State("")
    voice_state = gr.State("")
    montage_state = gr.State("")
    final_no_subs_state = gr.State("")

    def _use_state(file, state):
        """Prefer an explicitly uploaded file, else fall back to the stored state."""
        return file if file else state

    # ---------------------------- Script generation ----------------------------
    with gr.Tab("1οΈβ£ Generate Script"):
        with gr.Row():
            context_in = gr.Textbox(label="π§ Context", lines=4)
            instruction_in = gr.Textbox(label="π― Instruction", lines=4)
        # value/step are passed by keyword: given positionally as `1, 60` they
        # would be read as value=1, step=60 (or rejected where step is
        # keyword-only), not the intended 60 s default with 1 s steps.
        duration_slider = gr.Slider(5, 120, value=60, step=1, label="β±οΈ Target duration (s)")
        script_mode = gr.Radio(
            [
                "Generate script with AI",
                "Use my script",
            ],
            value="Generate script with AI",
            label="Script mode",
        )
        custom_script_in = gr.Textbox(label="βοΈ My script", lines=8, interactive=False)

        def _toggle(mode):
            """Make the custom-script box editable only in 'Use my script' mode."""
            return gr.update(interactive=(mode == "Use my script"))

        script_mode.change(_toggle, inputs=script_mode, outputs=custom_script_in)

        gen_script_btn = gr.Button("π Create Script", variant="primary")
        script_out = gr.Textbox(label="Script", lines=8, interactive=False)
        title_out = gr.Textbox(label="Title", lines=1, interactive=False)
        desc_out = gr.Textbox(label="Description", lines=3, interactive=False)
        gen_script_btn.click(
            cb_generate_script,
            [context_in, instruction_in, duration_slider, script_mode, custom_script_in],
            [script_out, title_out, desc_out, script_state],
        )

    # ---------------------------- Voice generation ----------------------------
    with gr.Tab("2οΈβ£ Generate Voice"):
        script_in_voice = gr.Textbox(label="Script (paste or use from previous step)", lines=8)
        gen_voice_btn = gr.Button("π Synthesize Voice", variant="primary")
        voice_audio = gr.Audio(label="Generated voice", interactive=False)
        gen_voice_btn.click(
            cb_generate_voice,
            inputs=[script_in_voice],
            outputs=[voice_audio, voice_state],
        )
        # Auto-populate the script textbox whenever the script state updates.
        script_state.change(lambda s: s, script_state, script_in_voice, queue=False)

    # ---------------------------- Montage creation ----------------------------
    with gr.Tab("3οΈβ£ Create Montage"):
        videos_dropzone = gr.Files(label="ποΈ Background videos (MP4)", file_types=[".mp4"], type="filepath")
        videos_state = gr.State([])
        # Each upload is merged into the accumulated list rather than replacing it.
        videos_dropzone.upload(accumulate_files, [videos_dropzone, videos_state], videos_state, queue=False)
        videos_display = gr.Textbox(label="Selected videos", interactive=False)
        videos_state.change(
            lambda s: "\n".join(os.path.basename(f) for f in s),
            videos_state,
            videos_display,
            queue=False,
        )
        with gr.Accordion("π¨ Visual settings", open=False):
            lum_slider = gr.Slider(0, 20, 6, step=0.5, label="Brightness (0β20)")
            contrast_slider = gr.Slider(0.5, 2.0, 1.0, step=0.05, label="Contrast (0.5β2.0)")
            gamma_slider = gr.Slider(0.5, 2.0, 1.0, step=0.05, label="Gamma (0.5β2.0)")
            show_bar = gr.Checkbox(label="Show progress bar", value=True)
        create_montage_btn = gr.Button("ποΈ Build Montage", variant="primary")
        montage_video = gr.Video(label="Montage Preview")
        create_montage_btn.click(
            cb_create_montage,
            [videos_state, voice_state, lum_slider, contrast_slider, gamma_slider, show_bar],
            [montage_video, montage_state],
        )

    # ---------------------------- Mixing (voice + music) ----------------------------
    with gr.Tab("4οΈβ£ Mix Audio / Video"):
        voice_in = gr.File(label="Narration MP3 (optional β leave empty to use state)", file_types=[".mp3"], type="filepath")
        montage_in = gr.File(label="Montage MP4 (optional β leave empty to use state)", file_types=[".mp4"], type="filepath")
        music_in = gr.File(label="Background music (MP3 β optional)", file_types=[".mp3"], type="filepath")
        mix_btn = gr.Button("ποΈ Mix", variant="primary")
        final_no_subs_vid = gr.Video(label="Mixed video (no subtitles)")
        mix_btn.click(
            lambda montage, voice, music, montage_state_val, voice_state_val: cb_mix_audio(
                _use_state(montage, montage_state_val),
                _use_state(voice, voice_state_val),
                music,
            ),
            [montage_in, voice_in, music_in, montage_state, voice_state],
            [final_no_subs_vid, final_no_subs_state],
        )

    # ---------------------------- Subtitles ----------------------------
    with gr.Tab("5οΈβ£ Add Subtitles"):
        video_in_sub = gr.File(label="Video MP4 (optional β defaults to last mixed video)", type="filepath", file_types=[".mp4"])
        voice_in_sub = gr.File(label="Narration MP3 (optional β defaults to last generated voice)", type="filepath", file_types=[".mp3"])
        add_subs_btn = gr.Button("π€ Add Subtitles", variant="primary")
        final_subs_video = gr.Video(label="Final video with subtitles")
        add_subs_btn.click(
            lambda v_in, a_in, v_state, a_state: cb_add_subtitles(
                _use_state(v_in, v_state),
                _use_state(a_in, a_state),
            ),
            [video_in_sub, voice_in_sub, final_no_subs_state, voice_state],
            final_subs_video,
        )

# Startup
demo.launch()