|
|
|
|
|
|
|
|
|
import gradio as gr |
|
import torch |
|
import spaces |
|
import os |
|
import yaml |
|
from PIL import Image |
|
import shutil |
|
import gc |
|
import traceback |
|
import subprocess |
|
import math |
|
import google.generativeai as genai |
|
import numpy as np |
|
import imageio |
|
import tempfile |
|
from pathlib import Path |
|
from huggingface_hub import hf_hub_download |
|
import json |
|
from facexlib.utils.face_restoration_helper import FaceRestoreHelper |
|
import huggingface_hub |
|
import spaces |
|
import argparse |
|
|
|
import spaces |
|
import argparse |
|
|
|
|
|
import cv2 |
|
|
|
from facexlib.utils.face_restoration_helper import FaceRestoreHelper |
|
import huggingface_hub |
|
|
|
|
|
|
|
from dreamo.dreamo_pipeline import DreamOPipeline |
|
from dreamo.utils import img2tensor, resize_numpy_image_area, tensor2img, resize_numpy_image_long |
|
from tools import BEN2 |
|
|
|
|
|
|
|
from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, seed_everething, calculate_padding |
|
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem |
|
|
|
|
|
# Load the LTX pipeline settings once at import time; later code reads keys
# such as "checkpoint_path", "precision" and "sampler" from this mapping.
config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"

# Decode explicitly as UTF-8 so parsing does not depend on the platform's
# default locale encoding.
with open(config_file_path, "r", encoding="utf-8") as file:
    PIPELINE_CONFIG_YAML = yaml.safe_load(file)
|
|
|
|
|
# Hugging Face repository id passed to hf_hub_download for the LTX checkpoint.
LTX_REPO = "Lightricks/LTX-Video"

# Local directory for downloaded model files; created eagerly at import time.
models_dir = "downloaded_models_gradio_cpu_init"
os.makedirs(models_dir, exist_ok=True)

# Directory where video fragments and the final concatenated video are written.
WORKSPACE_DIR = "aduc_workspace"

# Gemini API key taken from the environment; None when the variable is unset.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
|
|
|
|
|
# Download the checkpoint named in the YAML config and build the LTX pipeline
# on the CPU at import time; it is moved to the GPU only while rendering.
print("Baixando e criando pipelines LTX na CPU...")

distilled_model_actual_path = hf_hub_download(
    repo_id=LTX_REPO,
    filename=PIPELINE_CONFIG_YAML["checkpoint_path"],
    local_dir=models_dir,
    local_dir_use_symlinks=False,
)

pipeline_instance = create_ltx_video_pipeline(
    ckpt_path=distilled_model_actual_path,
    precision=PIPELINE_CONFIG_YAML["precision"],
    text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"],
    sampler=PIPELINE_CONFIG_YAML["sampler"],
    device="cpu",
)

print("Modelos LTX prontos.")
|
|
|
|
|
|
|
|
|
def get_storyboard_from_director_v2(num_fragments: int, prompt: str, initial_image_path: str, progress=gr.Progress()):
    """Ask Gemini to turn a prompt and a reference image into a storyboard.

    The prompt template is read from ``prompts/director_storyboard_v2.txt`` and
    filled with ``user_prompt`` and ``num_fragments``. The model reply is
    expected to be a JSON object (optionally wrapped in Markdown code fences)
    with a ``"storyboard"`` key.

    Args:
        num_fragments: Number of scenes requested; cast to ``int`` before use.
        prompt: The user's overall idea, inserted into the template.
        initial_image_path: Path of the reference image sent with the prompt.
        progress: Gradio progress tracker.

    Returns:
        The non-empty value stored under ``"storyboard"`` in the model's reply.

    Raises:
        gr.Error: If the image path or API key is missing, the template file
            does not exist, the reply cannot be parsed as the expected JSON
            object, or the storyboard is empty.
    """
    progress(0.5, desc="[Diretor Gemini] Criando o storyboard completo...")

    if not initial_image_path:
        raise gr.Error("Por favor, forneça uma imagem de referência inicial.")
    if not GEMINI_API_KEY:
        raise gr.Error("Chave da API Gemini (GEMINI_API_KEY) não configurada!")

    genai.configure(api_key=GEMINI_API_KEY)

    try:
        with open("prompts/director_storyboard_v2.txt", "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError as e:
        raise gr.Error("'prompts/director_storyboard_v2.txt' não encontrado!") from e

    director_prompt = template.format(user_prompt=prompt, num_fragments=int(num_fragments))
    model = genai.GenerativeModel('gemini-2.0-flash')

    # Use the image as a context manager so its file handle is released as
    # soon as the request has been sent, instead of leaking until GC.
    with Image.open(initial_image_path) as img:
        response = model.generate_content([director_prompt, img])

    # Read the reply text once so the error message below reports exactly the
    # text that failed to parse.
    raw_text = response.text

    try:
        # Drop Markdown code fences the model may wrap around the JSON.
        cleaned_response = raw_text.strip().replace("```json", "").replace("```", "")
        storyboard_data = json.loads(cleaned_response)
        # AttributeError covers a valid-JSON reply that is not an object
        # (for example a bare list), which has no ``.get``.
        storyboard_list = storyboard_data.get("storyboard", [])
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
        raise gr.Error(f"O Diretor retornou uma resposta inesperada. Erro: {e}\nResposta Bruta: {raw_text}") from e

    if not storyboard_list:
        raise gr.Error("A IA não retornou um storyboard válido.")
    return storyboard_list
|
|
|
def run_ltx_animation(current_fragment_index, motion_prompt, input_frame_path, height, width, fps, seed, cfg, progress=gr.Progress()):
    """Render one video fragment from a still image with the LTX pipeline.

    The module-level ``pipeline_instance`` is moved to CUDA for the duration of
    the call and is always moved back to the CPU afterwards, even on failure.

    Args:
        current_fragment_index: Scene number; used in the output file name and
            added to ``seed`` so each fragment uses a different generator seed.
        motion_prompt: Text prompt passed to the pipeline.
        input_frame_path: Path of the image loaded, resized/cropped, padded and
            passed to the pipeline as its single conditioning item.
        height: Requested frame height in pixels (cast to ``int``).
        width: Requested frame width in pixels (cast to ``int``).
        fps: Frame rate; each fragment targets 3 seconds of footage.
        seed: Base random seed (cast to ``int``).
        cfg: Guidance scale (cast to ``float``).
        progress: Gradio progress tracker.

    Returns:
        Path of the written ``fragment_<index>.mp4`` inside ``WORKSPACE_DIR``.
    """
    progress(0, desc=f"[Animador LTX] Aquecendo para a Cena {current_fragment_index}...")
    target_device = "cuda"
    output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4")

    try:
        pipeline_instance.to(target_device)

        height_px, width_px = int(height), int(width)

        # Target 3 seconds of footage, snapped to the nearest 8*n + 1 frame
        # count and clamped to the range [9, 257].
        target_frames_ideal = 3.0 * fps
        n_val = round((float(round(target_frames_ideal)) - 1.0) / 8.0)
        actual_num_frames = max(9, min(int(n_val * 8 + 1), 257))
        num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1

        # Round both spatial dimensions up to the next multiple of 32.
        padded_h = ((height_px - 1) // 32 + 1) * 32
        padded_w = ((width_px - 1) // 32 + 1) * 32
        padding_vals = calculate_padding(height_px, width_px, padded_h, padded_w)

        timesteps = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("timesteps")

        kwargs = {
            "prompt": motion_prompt,
            "negative_prompt": "blurry, distorted",
            "height": padded_h,
            "width": padded_w,
            "num_frames": num_frames_padded,
            "frame_rate": int(fps),
            "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
            "output_type": "pt",
            "guidance_scale": float(cfg),
            "timesteps": timesteps,
            "vae_per_channel_normalize": True,
            "decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"],
            "decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"],
            "stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"],
            "image_cond_noise_scale": 0.15,
            "is_video": True,
            "mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"),
            "offload_to_cpu": False,
            "enhance_prompt": False,
        }

        # Load the reference frame at the requested size, pad it to the
        # 32-aligned size and hand it to the pipeline as conditioning.
        media_tensor = load_image_to_tensor_with_resize_and_crop(input_frame_path, height_px, width_px)
        media_tensor = torch.nn.functional.pad(media_tensor, padding_vals)
        kwargs["conditioning_items"] = [ConditioningItem(media_tensor.to(target_device), 0, 1.0)]

        result_tensor = pipeline_instance(**kwargs).images

        # Crop away the padding again; a zero pad must become an open-ended
        # slice because ``-0`` would select nothing.
        pad_l, pad_r, pad_t, pad_b = padding_vals
        slice_h = -pad_b if pad_b > 0 else None
        slice_w = -pad_r if pad_r > 0 else None
        cropped_tensor = result_tensor[:, :, :actual_num_frames, pad_t:slice_h, pad_l:slice_w]

        # Clip before the uint8 cast: values slightly outside [0, 1] would
        # otherwise wrap around and show up as speckle artifacts.
        frames = cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy()
        video_np = (np.clip(frames, 0.0, 1.0) * 255).astype(np.uint8)

        total_frames = len(video_np)
        with imageio.get_writer(output_path, fps=int(fps), codec='libx264', quality=8) as writer:
            for i, frame in enumerate(video_np):
                progress(i / total_frames, desc=f"Renderizando frame {i+1}/{total_frames}...")
                writer.append_data(frame)

        return output_path
    finally:
        # Always release the GPU, whether rendering succeeded or not.
        pipeline_instance.to("cpu")
        gc.collect()
        torch.cuda.empty_cache()
|
|
|
def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()):
    """Join the rendered fragments into one video with ffmpeg's concat demuxer.

    Writes ``concat_list.txt`` into ``WORKSPACE_DIR`` and stream-copies the
    listed files into ``obra_prima_final.mp4`` in the same directory.

    Args:
        fragment_paths: Paths of the fragment videos, in playback order.
        progress: Gradio progress tracker.

    Returns:
        Path of the concatenated output video.

    Raises:
        gr.Error: If there are no fragments, ffmpeg cannot be started, or
            ffmpeg exits with a non-zero status.
    """
    progress(0.5, desc="Montando a obra-prima final...")

    if not fragment_paths:
        raise gr.Error("Nenhum fragmento de vídeo para concatenar.")

    list_file_path = os.path.join(WORKSPACE_DIR, "concat_list.txt")
    final_output_path = os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4")

    with open(list_file_path, "w", encoding="utf-8") as f:
        for path in fragment_paths:
            # Inside the single-quoted form used by the concat list, a literal
            # quote is written as: close quote, escaped quote, reopen quote.
            escaped_path = os.path.abspath(path).replace("'", "'\\''")
            f.write(f"file '{escaped_path}'\n")

    # Argument list without a shell: paths containing spaces or shell
    # metacharacters are passed to ffmpeg verbatim.
    command = [
        "ffmpeg", "-y",
        "-f", "concat",
        "-safe", "0",
        "-i", list_file_path,
        "-c", "copy",
        final_output_path,
    ]

    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except FileNotFoundError as e:
        # Without a shell, a missing ffmpeg binary surfaces here rather than
        # as a non-zero exit status.
        raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e}") from e
    except subprocess.CalledProcessError as e:
        raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e.stderr}") from e

    return final_output_path
|
|
|
def run_full_production(storyboard, ref_img_path, height, width, fps, seed, cfg):
    """Render every storyboard scene in order, streaming UI updates.

    This is a generator: it yields a dict of component updates before each
    scene starts, after each scene finishes, and once more at the end. Every
    scene is rendered with ``run_ltx_animation`` from the same reference image.

    Args:
        storyboard: Sequence of motion prompts, one per scene.
        ref_img_path: Path of the reference image used for every scene.
        height, width, fps, seed, cfg: Forwarded unchanged to
            ``run_ltx_animation``.

    Yields:
        Dicts mapping Gradio components to their new values.

    Raises:
        gr.Error: If the storyboard or the reference image path is empty.
    """
    if not storyboard:
        raise gr.Error("Nenhum roteiro para produzir.")
    if not ref_img_path:
        raise gr.Error("Nenhuma imagem de referência definida.")

    total_scenes = len(storyboard)
    rendered_clips = []
    log_parts = []

    for scene_number, scene_prompt in enumerate(storyboard, start=1):
        # Announce the scene before the (slow) render begins.
        log_parts.append(f"Iniciando produção da Cena {scene_number}/{total_scenes}...\n")
        yield {production_log_output: gr.update(value="".join(log_parts))}

        clip_path = run_ltx_animation(
            scene_number, scene_prompt, ref_img_path,
            height, width, fps, seed, cfg, gr.Progress(),
        )
        rendered_clips.append(clip_path)

        log_parts.append(f"Cena {scene_number} concluída e salva em {os.path.basename(clip_path)}.\n")
        yield {
            production_log_output: gr.update(value="".join(log_parts)),
            fragment_gallery_output: gr.update(value=rendered_clips),
            fragment_list_state: rendered_clips,
            final_fragments_display: gr.update(value=rendered_clips),
        }

    log_parts.append("\nProdução de todas as cenas concluída!")
    yield {production_log_output: gr.update(value="".join(log_parts))}
|
|
|
|
|
# Three-tab Gradio UI: storyboard generation, scene rendering, concatenation.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# LTX Video - Storyboard em Vídeo (ADUC-SDR)\n*By Carlex & Gemini*")

    # Per-session state passed between the three stages.
    storyboard_state = gr.State([])
    reference_image_state = gr.State("")
    fragment_list_state = gr.State([])

    # Start from an empty workspace every time the UI is built.
    if os.path.exists(WORKSPACE_DIR):
        shutil.rmtree(WORKSPACE_DIR)
    os.makedirs(WORKSPACE_DIR)

    with gr.Tabs():
        # Stage 1: inputs for the Gemini "director" and the resulting storyboard.
        with gr.TabItem("ETAPA 1: O DIRETOR (Roteiro Visual)"):
            with gr.Row():
                with gr.Column():
                    num_fragments_input = gr.Slider(
                        minimum=2, maximum=10, value=4, step=1,
                        label="Número de Cenas (Fragmentos)",
                    )
                    prompt_input = gr.Textbox(label="Ideia Geral (Prompt)")
                    image_input = gr.Image(type="filepath", label="Imagem de Referência")
                    director_button = gr.Button("▶️ Gerar Roteiro Visual (Gemini)", variant="primary")
                with gr.Column():
                    storyboard_output = gr.JSON(label="Roteiro Visual Gerado (Storyboard)")

        # Stage 2: render settings, production log and gallery of fragments.
        with gr.TabItem("ETAPA 2: A PRODUÇÃO (Gerar Cenas em Vídeo)"):
            with gr.Row():
                with gr.Column():
                    storyboard_to_render = gr.JSON(label="Roteiro a ser Produzido")
                    animator_button = gr.Button("▶️ Produzir TODAS as Cenas (LTX)", variant="primary")
                    production_log_output = gr.Textbox(
                        label="Diário de Bordo da Produção",
                        lines=5,
                        interactive=False,
                        placeholder="Aguardando início da produção...",
                    )
                with gr.Column():
                    fragment_gallery_output = gr.Gallery(
                        label="Cenas Produzidas (Fragmentos de Vídeo)",
                        object_fit="contain",
                        height="auto",
                    )
            with gr.Row():
                height_slider = gr.Slider(minimum=256, maximum=1024, value=512, step=32, label="Altura")
                width_slider = gr.Slider(minimum=256, maximum=1024, value=512, step=32, label="Largura")
            with gr.Row():
                fps_slider = gr.Slider(minimum=8, maximum=24, value=15, step=1, label="FPS")
                seed_number = gr.Number(value=42, label="Seed")
                cfg_slider = gr.Slider(minimum=1.0, maximum=10.0, value=2.5, step=0.1, label="CFG")

        # Stage 3: list of fragments to join and the final video player.
        with gr.TabItem("ETAPA 3: PÓS-PRODUÇÃO"):
            with gr.Row():
                with gr.Column():
                    final_fragments_display = gr.JSON(label="Vídeos a Concatenar")
                    editor_button = gr.Button("▶️ Concatenar Tudo (FFmpeg)", variant="primary")
                with gr.Column():
                    final_video_output = gr.Video(label="A Obra-Prima Final")

    def director_success(img_path, storyboard_json):
        """Copy a freshly generated storyboard into the state used by stage 2.

        Accepts either a bare list of scenes or a dict with a "storyboard"
        key. Returns the scene list, the reference image path and an update
        that mirrors the storyboard JSON into the stage-2 viewer. Raises
        gr.Error when the image path or the storyboard is empty.
        """
        if not img_path:
            raise gr.Error("A imagem de referência é necessária.")

        if isinstance(storyboard_json, list):
            storyboard_list = storyboard_json
        else:
            storyboard_list = storyboard_json.get("storyboard", [])

        if not storyboard_list:
            raise gr.Error("O storyboard está vazio.")

        return storyboard_list, img_path, gr.update(value=storyboard_json)

    # Stage 1 wiring: generate the storyboard, then (only on success) hand it
    # and the reference image over to stage 2.
    director_event = director_button.click(
        fn=get_storyboard_from_director_v2,
        inputs=[num_fragments_input, prompt_input, image_input],
        outputs=[storyboard_output],
    )
    director_event.success(
        fn=director_success,
        inputs=[image_input, storyboard_output],
        outputs=[storyboard_state, reference_image_state, storyboard_to_render],
    )

    # Stage 2 wiring: render all scenes, streaming log and gallery updates.
    animator_button.click(
        fn=run_full_production,
        inputs=[
            storyboard_state,
            reference_image_state,
            height_slider,
            width_slider,
            fps_slider,
            seed_number,
            cfg_slider,
        ],
        outputs=[
            production_log_output,
            fragment_gallery_output,
            fragment_list_state,
            final_fragments_display,
        ],
    )

    # Stage 3 wiring: concatenate the rendered fragments into the final video.
    editor_button.click(
        fn=concatenate_masterpiece,
        inputs=[fragment_list_state],
        outputs=[final_video_output],
    )
|
|
|
if __name__ == "__main__":
    # Enable request queuing first, then start the server bound to 0.0.0.0
    # with Gradio's share option turned on.
    queued_demo = demo.queue()
    queued_demo.launch(server_name="0.0.0.0", share=True)