File size: 12,244 Bytes
44a7f82
d5176a0
44f5806
44a7f82
4bf2150
 
 
 
 
 
 
 
 
d5176a0
 
 
 
 
 
 
 
 
44a7f82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4bf2150
44a7f82
d5176a0
 
44a7f82
 
4bf2150
44a7f82
 
 
 
0f7ac82
 
4bf2150
d5176a0
44a7f82
 
 
d5176a0
44f5806
176805b
d5176a0
 
 
44a7f82
d5176a0
 
44a7f82
 
 
d5176a0
 
 
 
 
44a7f82
d5176a0
 
 
 
 
 
 
 
 
44a7f82
d5176a0
 
 
 
4bf2150
176805b
d5176a0
 
 
 
 
 
44a7f82
d5176a0
 
 
 
 
 
 
4bf2150
d5176a0
 
 
 
 
 
 
 
 
 
 
 
44a7f82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d5176a0
 
 
 
 
 
44a7f82
d5176a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44a7f82
d5176a0
44a7f82
d5176a0
 
 
 
 
 
 
 
 
 
 
 
 
44a7f82
d5176a0
 
 
 
44a7f82
d5176a0
 
 
44a7f82
d5176a0
 
 
 
 
 
 
 
 
 
 
 
b980fa1
d5176a0
 
 
44a7f82
d5176a0
 
 
 
 
 
b980fa1
4bf2150
 
a8d2e01
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# --- app.py (O Painel de Controle do Maestro - Produção em Lote com Diário de Bordo) ---
# By Carlex & Gemini

# --- Ato 1: A Convocação da Orquestra (Importações) ---
import gradio as gr
import torch
import spaces
import os
import yaml
from PIL import Image
import shutil
import gc
import traceback
import subprocess
import math
import google.generativeai as genai
import numpy as np
import imageio
import tempfile
from pathlib import Path
from huggingface_hub import hf_hub_download
import json
from facexlib.utils.face_restoration_helper import FaceRestoreHelper
import huggingface_hub
import spaces
import argparse

import spaces
import argparse


import cv2

from facexlib.utils.face_restoration_helper import FaceRestoreHelper
import huggingface_hub



from dreamo.dreamo_pipeline import DreamOPipeline
from dreamo.utils import img2tensor, resize_numpy_image_area, tensor2img, resize_numpy_image_long
from tools import BEN2


# --- Músicos Originais (Sua implementação) ---
from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, seed_everething, calculate_padding
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem

# --- Ato 2: A Preparação do Palco (Configurações) ---
# Load the LTX pipeline settings (checkpoint name, precision, sampler, ...)
# from the YAML file shipped with the project.
config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"
with open(config_file_path, "r") as file:
    PIPELINE_CONFIG_YAML = yaml.safe_load(file)

# --- Global constants ---
LTX_REPO = "Lightricks/LTX-Video"
models_dir = "downloaded_models_gradio_cpu_init"
os.makedirs(models_dir, exist_ok=True)  # download target must exist up front
WORKSPACE_DIR = "aduc_workspace"
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")  # None when the variable is unset

# --- Load the LTX models on the CPU ---
# The pipeline is built on "cpu" here; run_ltx_animation moves it to the GPU
# for each render and back again afterwards.
print("Baixando e criando pipelines LTX na CPU...")
distilled_model_actual_path = hf_hub_download(
    repo_id=LTX_REPO,
    filename=PIPELINE_CONFIG_YAML["checkpoint_path"],
    local_dir=models_dir,
    local_dir_use_symlinks=False,
)
pipeline_instance = create_ltx_video_pipeline(
    ckpt_path=distilled_model_actual_path,
    precision=PIPELINE_CONFIG_YAML["precision"],
    text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"],
    sampler=PIPELINE_CONFIG_YAML["sampler"],
    device="cpu",
)
print("Modelos LTX prontos.")


# --- Ato 3: As Partituras dos Músicos (Funções) ---

def get_storyboard_from_director_v2(num_fragments: int, prompt: str, initial_image_path: str, progress=gr.Progress()):
    """Ask Gemini to write a storyboard from a prompt and a reference image.

    The prompt template is read from ``prompts/director_storyboard_v2.txt`` and
    filled with ``user_prompt`` and ``num_fragments``. The model reply is
    expected to be JSON (optionally wrapped in a Markdown code fence), either an
    object with a ``"storyboard"`` key or a bare list.

    Args:
        num_fragments: Number of scenes to request; coerced with ``int()``.
        prompt: The user's general idea for the video.
        initial_image_path: Path of the reference image sent along with the prompt.
        progress: Gradio progress tracker.

    Returns:
        The non-empty storyboard list parsed from the model reply.

    Raises:
        gr.Error: If the image or API key is missing, the template file does
            not exist, or the reply cannot be parsed into a non-empty storyboard.
    """
    progress(0.5, desc="[Diretor Gemini] Criando o storyboard completo...")
    if not initial_image_path:
        raise gr.Error("Por favor, forneça uma imagem de referência inicial.")
    if not GEMINI_API_KEY:
        raise gr.Error("Chave da API Gemini (GEMINI_API_KEY) não configurada!")
    genai.configure(api_key=GEMINI_API_KEY)

    try:
        with open("prompts/director_storyboard_v2.txt", "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError:
        raise gr.Error("'prompts/director_storyboard_v2.txt' não encontrado!")
    director_prompt = template.format(user_prompt=prompt, num_fragments=int(num_fragments))

    model = genai.GenerativeModel('gemini-2.0-flash')
    # Keep the image open only for the duration of the request so the file
    # handle is released afterwards.
    with Image.open(initial_image_path) as img:
        response = model.generate_content([director_prompt, img])

    # Reading response.text may itself fail (presumably ValueError when the
    # reply carries no text part - confirm against the google-generativeai
    # docs), so fetch it once inside the try and reuse it in the error message.
    raw_text = "<indisponível>"
    try:
        raw_text = response.text
        cleaned_response = raw_text.strip().replace("```json", "").replace("```", "")
        storyboard_data = json.loads(cleaned_response)
        # Accept both {"storyboard": [...]} and a bare [...] reply, matching
        # what director_success tolerates further down the chain.
        if isinstance(storyboard_data, list):
            storyboard_list = storyboard_data
        elif isinstance(storyboard_data, dict):
            storyboard_list = storyboard_data.get("storyboard", [])
        else:
            raise TypeError(f"tipo JSON inesperado: {type(storyboard_data).__name__}")
    except (json.JSONDecodeError, ValueError, KeyError, TypeError, AttributeError) as e:
        raise gr.Error(f"O Diretor retornou uma resposta inesperada. Erro: {e}\nResposta Bruta: {raw_text}") from e

    if not storyboard_list:
        raise gr.Error("A IA não retornou um storyboard válido.")
    return storyboard_list

def run_ltx_animation(current_fragment_index, motion_prompt, input_frame_path, height, width, fps, seed, cfg, progress=gr.Progress()):
    """Render one video fragment with the LTX pipeline and save it as MP4.

    The module-level ``pipeline_instance`` is moved to CUDA for the render and
    always moved back to the CPU afterwards, even if rendering fails.

    Args:
        current_fragment_index: Scene number; used in the output file name and
            added to ``seed`` so each scene gets a different generator seed.
        motion_prompt: Text prompt passed to the pipeline.
        input_frame_path: Image used as the conditioning item at frame 0.
        height: Requested output height in pixels (padded up to a multiple of 32).
        width: Requested output width in pixels (padded up to a multiple of 32).
        fps: Frame rate; also determines the frame count for a 3-second clip.
        seed: Base random seed.
        cfg: Guidance scale.
        progress: Gradio progress tracker.

    Returns:
        Path of the written ``fragment_<index>.mp4`` inside ``WORKSPACE_DIR``.
    """
    progress(0, desc=f"[Animador LTX] Aquecendo para a Cena {current_fragment_index}...")
    target_device = "cuda"
    output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4")
    try:
        pipeline_instance.to(target_device)

        # Each fragment lasts 3 seconds. The frame count is snapped to the
        # 8*n + 1 form and kept within [9, 257].
        duration_fragment = 3.0
        target_frames_ideal = duration_fragment * fps
        n_val = round((float(round(target_frames_ideal)) - 1.0) / 8.0)
        actual_num_frames = max(9, min(int(n_val * 8 + 1), 257))
        num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1

        # Spatial size is rounded up to the next multiple of 32; the padding
        # added here is cropped away again after generation.
        padded_h = ((int(height) - 1) // 32 + 1) * 32
        padded_w = ((int(width) - 1) // 32 + 1) * 32
        padding_vals = calculate_padding(int(height), int(width), padded_h, padded_w)

        timesteps = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("timesteps")
        kwargs = {
            "prompt": motion_prompt,
            "negative_prompt": "blurry, distorted",
            "height": padded_h,
            "width": padded_w,
            "num_frames": num_frames_padded,
            "frame_rate": int(fps),
            "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
            "output_type": "pt",
            "guidance_scale": float(cfg),
            "timesteps": timesteps,
            "vae_per_channel_normalize": True,
            "decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"],
            "decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"],
            "stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"],
            "image_cond_noise_scale": 0.15,
            "is_video": True,
            "mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"),
            "offload_to_cpu": False,
            "enhance_prompt": False,
        }

        # Condition the generation on the reference image at frame 0.
        media_tensor = load_image_to_tensor_with_resize_and_crop(input_frame_path, int(height), int(width))
        media_tensor = torch.nn.functional.pad(media_tensor, padding_vals)
        kwargs["conditioning_items"] = [ConditioningItem(media_tensor.to(target_device), 0, 1.0)]

        result_tensor = pipeline_instance(**kwargs).images

        # Drop the extra frames and the spatial padding. The slicing treats the
        # result as 5-D with frames on axis 2 and height/width on axes 3/4
        # (presumably batch and channels on axes 0/1 - TODO confirm).
        pad_l, pad_r, pad_t, pad_b = padding_vals
        slice_h = -pad_b if pad_b > 0 else None
        slice_w = -pad_r if pad_r > 0 else None
        cropped_tensor = result_tensor[:, :, :actual_num_frames, pad_t:slice_h, pad_l:slice_w]

        video_float = cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy()
        # Clamp before the uint8 cast: values slightly outside [0, 1] would
        # otherwise wrap around and show up as speckled pixels.
        video_np = (np.clip(video_float, 0.0, 1.0) * 255).astype(np.uint8)

        with imageio.get_writer(output_path, fps=int(fps), codec='libx264', quality=8) as writer:
            total_frames = len(video_np)
            for i, frame in enumerate(video_np):
                progress(i / total_frames, desc=f"Renderizando frame {i+1}/{total_frames}...")
                writer.append_data(frame)
        return output_path
    finally:
        # Always hand the GPU back, whether the render succeeded or not.
        pipeline_instance.to("cpu")
        gc.collect()
        torch.cuda.empty_cache()

def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()):
    """Join the rendered fragments into one MP4 with FFmpeg's concat demuxer.

    A list file naming every fragment is written to ``WORKSPACE_DIR`` and
    passed to ``ffmpeg -f concat`` with stream copy (no re-encoding).

    Args:
        fragment_paths: Paths of the video fragments, in playback order.
        progress: Gradio progress tracker.

    Returns:
        Path of the concatenated video, ``obra_prima_final.mp4`` inside
        ``WORKSPACE_DIR``.

    Raises:
        gr.Error: If there are no fragments, or FFmpeg cannot be run or fails.
    """
    if not fragment_paths:
        # Fail early with a clear message instead of letting FFmpeg choke on
        # an empty list file.
        raise gr.Error("Nenhum fragmento de vídeo para concatenar.")

    progress(0.5, desc="Montando a obra-prima final...")
    list_file_path = os.path.join(WORKSPACE_DIR, "concat_list.txt")
    final_output_path = os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4")

    with open(list_file_path, "w", encoding="utf-8") as f:
        for path in fragment_paths:
            # Inside a single-quoted concat entry, a literal quote must be
            # written as '\'' or the entry is cut short.
            escaped_path = os.path.abspath(path).replace("'", "'\\''")
            f.write(f"file '{escaped_path}'\n")

    # Argument list instead of a shell string: paths are never interpreted by
    # a shell, so spaces or metacharacters cannot break or hijack the command.
    command = [
        "ffmpeg", "-y",
        "-f", "concat",
        "-safe", "0",
        "-i", list_file_path,
        "-c", "copy",
        final_output_path,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e.stderr}") from e
    except OSError as e:
        # Without a shell, a missing ffmpeg binary raises OSError rather than
        # returning a non-zero exit code; report it the same way.
        raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e}") from e
    return final_output_path

def run_full_production(storyboard, ref_img_path, height, width, fps, seed, cfg):
    """Render every storyboard scene in order, streaming UI updates as it goes.

    This is a generator: each yielded dict maps Gradio components to their new
    values, so the production log, gallery and fragment list refresh after
    every scene.

    Args:
        storyboard: Sequence of motion prompts, one per scene.
        ref_img_path: Reference image passed to every scene render.
        height, width, fps, seed, cfg: Forwarded unchanged to ``run_ltx_animation``.

    Raises:
        gr.Error: If the storyboard or the reference image is missing.
    """
    if not storyboard:
        raise gr.Error("Nenhum roteiro para produzir.")
    if not ref_img_path:
        raise gr.Error("Nenhuma imagem de referência definida.")

    rendered_paths = []
    log_lines = []

    for scene_number, motion_prompt in enumerate(storyboard, start=1):
        # Announce the scene before the (slow) render starts.
        log_lines.append(f"Iniciando produção da Cena {scene_number}/{len(storyboard)}...\n")
        yield {production_log_output: gr.update(value="".join(log_lines))}

        clip_path = run_ltx_animation(
            scene_number, motion_prompt, ref_img_path,
            height, width, fps, seed, cfg, gr.Progress(),
        )
        rendered_paths.append(clip_path)

        # Report completion and publish the fragments rendered so far.
        log_lines.append(f"Cena {scene_number} concluída e salva em {os.path.basename(clip_path)}.\n")
        yield {
            production_log_output: gr.update(value="".join(log_lines)),
            fragment_gallery_output: gr.update(value=rendered_paths),
            fragment_list_state: rendered_paths,
            final_fragments_display: gr.update(value=rendered_paths),
        }

    log_lines.append("\nProdução de todas as cenas concluída!")
    yield {production_log_output: gr.update(value="".join(log_lines))}

# --- Ato 4: A Apresentação (UI do Gradio) ---
# Build the three-tab Gradio interface: storyboard (Gemini), production (LTX)
# and post-production (FFmpeg), then wire the buttons to the functions above.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# LTX Video - Storyboard em Vídeo (ADUC-SDR)\n*By Carlex & Gemini*")
    
    # State holders passed between steps: the storyboard list, the reference
    # image path, and the paths of the fragments rendered so far.
    storyboard_state = gr.State([])
    reference_image_state = gr.State("")
    fragment_list_state = gr.State([])

    # Start from an empty workspace: any existing directory (and the fragments
    # in it) is deleted when this block runs.
    if os.path.exists(WORKSPACE_DIR): shutil.rmtree(WORKSPACE_DIR)
    os.makedirs(WORKSPACE_DIR)

    with gr.Tabs():
        # Step 1: collect the idea and reference image, show the storyboard.
        with gr.TabItem("ETAPA 1: O DIRETOR (Roteiro Visual)"):
            with gr.Row():
                with gr.Column():
                    num_fragments_input = gr.Slider(2, 10, 4, step=1, label="Número de Cenas (Fragmentos)")
                    prompt_input = gr.Textbox(label="Ideia Geral (Prompt)")
                    image_input = gr.Image(type="filepath", label="Imagem de Referência")
                    director_button = gr.Button("▶️ Gerar Roteiro Visual (Gemini)", variant="primary")
                with gr.Column():
                    storyboard_output = gr.JSON(label="Roteiro Visual Gerado (Storyboard)")

        # Step 2: render every scene; log and gallery update as scenes finish.
        with gr.TabItem("ETAPA 2: A PRODUÇÃO (Gerar Cenas em Vídeo)"):
            with gr.Row():
                with gr.Column():
                    storyboard_to_render = gr.JSON(label="Roteiro a ser Produzido")
                    animator_button = gr.Button("▶️ Produzir TODAS as Cenas (LTX)", variant="primary")
                    production_log_output = gr.Textbox(label="Diário de Bordo da Produção", lines=5, interactive=False, placeholder="Aguardando início da produção...")
                with gr.Column():
                    fragment_gallery_output = gr.Gallery(label="Cenas Produzidas (Fragmentos de Vídeo)", object_fit="contain", height="auto")
            # Render settings forwarded to run_full_production.
            with gr.Row():
                height_slider = gr.Slider(256, 1024, 512, step=32, label="Altura")
                width_slider = gr.Slider(256, 1024, 512, step=32, label="Largura")
            with gr.Row():
                fps_slider = gr.Slider(8, 24, 15, step=1, label="FPS")
                seed_number = gr.Number(42, label="Seed")
                cfg_slider = gr.Slider(1.0, 10.0, 2.5, step=0.1, label="CFG")

        # Step 3: concatenate the rendered fragments into the final video.
        with gr.TabItem("ETAPA 3: PÓS-PRODUÇÃO"):
            with gr.Row():
                with gr.Column():
                    final_fragments_display = gr.JSON(label="Vídeos a Concatenar")
                    editor_button = gr.Button("▶️ Concatenar Tudo (FFmpeg)", variant="primary")
                with gr.Column():
                    final_video_output = gr.Video(label="A Obra-Prima Final")

    # --- Act 5: Conducting (button wiring logic) ---

    def director_success(img_path, storyboard_json):
        """Copy the generated storyboard and image path into the shared state.

        Accepts the storyboard either as a bare list or as a dict with a
        "storyboard" key. Returns the storyboard list, the image path, and an
        update that mirrors the storyboard into the production tab. Raises
        gr.Error when the image path or the storyboard is empty.
        """
        if not img_path: raise gr.Error("A imagem de referência é necessária.")
        storyboard_list = storyboard_json if isinstance(storyboard_json, list) else storyboard_json.get("storyboard", [])
        if not storyboard_list: raise gr.Error("O storyboard está vazio.")
        return storyboard_list, img_path, gr.update(value=storyboard_json)

    # Generate the storyboard, then chain director_success via .success() to
    # store it in state and show it in the production tab.
    director_button.click(
        fn=get_storyboard_from_director_v2,
        inputs=[num_fragments_input, prompt_input, image_input],
        outputs=[storyboard_output]
    ).success(
        fn=director_success,
        inputs=[image_input, storyboard_output],
        outputs=[storyboard_state, reference_image_state, storyboard_to_render]
    )
    
    # run_full_production is a generator that yields dicts keyed by component,
    # so every component it may update is listed in outputs.
    animator_button.click(
        fn=run_full_production,
        inputs=[storyboard_state, reference_image_state, height_slider, width_slider, fps_slider, seed_number, cfg_slider],
        outputs=[production_log_output, fragment_gallery_output, fragment_list_state, final_fragments_display]
    )

    # Concatenate whatever fragment paths are currently held in state.
    editor_button.click(
        fn=concatenate_masterpiece,
        inputs=[fragment_list_state],
        outputs=[final_video_output]
    )

if __name__ == "__main__":
    # Script entry point: enable the request queue, then serve on all network
    # interfaces with share=True.
    queued_demo = demo.queue()
    queued_demo.launch(server_name="0.0.0.0", share=True)