# --- app.py (The Maestro's Control Panel - Batch Production with a Production Log) ---
# By Carlex & Gemini
# --- Act 1: Assembling the Orchestra (Imports) ---
import gradio as gr
import torch
import spaces
import os
import yaml
from PIL import Image
import shutil
import gc
import traceback
import subprocess
import math
import google.generativeai as genai
import numpy as np
import imageio
import tempfile
from pathlib import Path
from huggingface_hub import hf_hub_download
import json
from facexlib.utils.face_restoration_helper import FaceRestoreHelper
import huggingface_hub
import argparse
import cv2
from dreamo.dreamo_pipeline import DreamOPipeline
from dreamo.utils import img2tensor, resize_numpy_image_area, tensor2img, resize_numpy_image_long
from tools import BEN2
# --- Original Musicians (Your implementation) ---
from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, seed_everething, calculate_padding
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem
# --- Act 2: Setting the Stage (Configuration) ---
config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"
with open(config_file_path, "r") as file:
PIPELINE_CONFIG_YAML = yaml.safe_load(file)
# --- Global Constants ---
LTX_REPO = "Lightricks/LTX-Video"
models_dir = "downloaded_models_gradio_cpu_init"
Path(models_dir).mkdir(parents=True, exist_ok=True)
WORKSPACE_DIR = "aduc_workspace"
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
# --- Loading the LTX Models on the CPU ---
print("Downloading and creating LTX pipelines on the CPU...")
distilled_model_actual_path = hf_hub_download(
    repo_id=LTX_REPO,
    filename=PIPELINE_CONFIG_YAML["checkpoint_path"],
    local_dir=models_dir,
    local_dir_use_symlinks=False,
)
pipeline_instance = create_ltx_video_pipeline(
    ckpt_path=distilled_model_actual_path,
    precision=PIPELINE_CONFIG_YAML["precision"],
    text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"],
    sampler=PIPELINE_CONFIG_YAML["sampler"],
    device="cpu",
)
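# The pipeline lives on the CPU between jobs and is moved to the GPU only for the
# duration of each render (see run_ltx_animation), so idle VRAM usage stays at zero.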
print("Modelos LTX prontos.")
# --- Act 3: The Musicians' Scores (Functions) ---
def get_storyboard_from_director_v2(num_fragments: int, prompt: str, initial_image_path: str, progress=gr.Progress()):
progress(0.5, desc="[Diretor Gemini] Criando o storyboard completo...")
if not initial_image_path: raise gr.Error("Por favor, forneça uma imagem de referência inicial.")
if not GEMINI_API_KEY: raise gr.Error("Chave da API Gemini (GEMINI_API_KEY) não configurada!")
genai.configure(api_key=GEMINI_API_KEY)
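    # 'prompts/director_storyboard_v2.txt' must contain the literal placeholders
    # {user_prompt} and {num_fragments} consumed by the .format() call below.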
    try:
        with open("prompts/director_storyboard_v2.txt", "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError:
        raise gr.Error("'prompts/director_storyboard_v2.txt' not found!")
director_prompt = template.format(user_prompt=prompt, num_fragments=int(num_fragments))
model = genai.GenerativeModel('gemini-2.0-flash')
img = Image.open(initial_image_path)
response = model.generate_content([director_prompt, img])
try:
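        # Gemini often wraps its JSON in Markdown code fences, so they are stripped
        # before parsing. The expected shape, given the .get("storyboard") below, is:
        #   {"storyboard": ["motion prompt for scene 1", "motion prompt for scene 2", ...]}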
cleaned_response = response.text.strip().replace("```json", "").replace("```", "")
storyboard_data = json.loads(cleaned_response)
storyboard_list = storyboard_data.get("storyboard", [])
        if not storyboard_list:
            raise gr.Error("The AI did not return a valid storyboard.")
return storyboard_list
except (json.JSONDecodeError, KeyError, TypeError) as e:
raise gr.Error(f"O Diretor retornou uma resposta inesperada. Erro: {e}\nResposta Bruta: {response.text}")
def run_ltx_animation(current_fragment_index, motion_prompt, input_frame_path, height, width, fps, seed, cfg, progress=gr.Progress()):
    progress(0, desc=f"[LTX Animator] Warming up for Scene {current_fragment_index}...")
    target_device = "cuda"
    output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4")
try:
pipeline_instance.to(target_device)
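        # LTX-Video expects num_frames ≡ 1 (mod 8) and spatial dims that are multiples
        # of 32. Example at 15 fps: a 3 s clip targets 45 frames, which rounds to
        # n_val = 6 and actual_num_frames = 6 * 8 + 1 = 49.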
        duration_fragment = 3.0
        target_frames_ideal = duration_fragment * fps
        n_val = round((float(round(target_frames_ideal)) - 1.0) / 8.0)
        actual_num_frames = max(9, min(int(n_val * 8 + 1), 257))
        num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1
        padded_h = ((int(height) - 1) // 32 + 1) * 32
        padded_w = ((int(width) - 1) // 32 + 1) * 32
        padding_vals = calculate_padding(int(height), int(width), padded_h, padded_w)
        timesteps = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("timesteps")
        kwargs = {
            "prompt": motion_prompt,
            "negative_prompt": "blurry, distorted",
            "height": padded_h,
            "width": padded_w,
            "num_frames": num_frames_padded,
            "frame_rate": int(fps),
            "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
            "output_type": "pt",
            "guidance_scale": float(cfg),
            "timesteps": timesteps,
            "vae_per_channel_normalize": True,
            "decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"],
            "decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"],
            "stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"],
            "image_cond_noise_scale": 0.15,
            "is_video": True,
            "mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"),
            "offload_to_cpu": False,
            "enhance_prompt": False,
        }
        # Condition generation on the reference frame at frame index 0 with strength 1.0.
        media_tensor = load_image_to_tensor_with_resize_and_crop(input_frame_path, int(height), int(width))
        media_tensor = torch.nn.functional.pad(media_tensor, padding_vals)
        kwargs["conditioning_items"] = [ConditioningItem(media_tensor.to(target_device), 0, 1.0)]
        result_tensor = pipeline_instance(**kwargs).images
        # Crop the spatial padding back off and drop the extra padded frames.
        pad_l, pad_r, pad_t, pad_b = padding_vals
        slice_h = -pad_b if pad_b > 0 else None
        slice_w = -pad_r if pad_r > 0 else None
        cropped_tensor = result_tensor[:, :, :actual_num_frames, pad_t:slice_h, pad_l:slice_w]
        video_np = (cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy() * 255).astype(np.uint8)
        with imageio.get_writer(output_path, fps=int(fps), codec='libx264', quality=8) as writer:
            for i, frame in enumerate(video_np):
                progress(i / len(video_np), desc=f"Rendering frame {i+1}/{len(video_np)}...")
                writer.append_data(frame)
return output_path
    finally:
        # Always return the pipeline to the CPU and release VRAM, even on failure.
        pipeline_instance.to("cpu")
        gc.collect()
        torch.cuda.empty_cache()
def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()):
progress(0.5, desc="Montando a obra-prima final..."); list_file_path, final_output_path = os.path.join(WORKSPACE_DIR, "concat_list.txt"), os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4")
with open(list_file_path, "w") as f:
        for path in fragment_paths:
            f.write(f"file '{os.path.abspath(path)}'\n")
command = f"ffmpeg -y -f concat -safe 0 -i {list_file_path} -c copy {final_output_path}"
try:
subprocess.run(command, shell=True, check=True, capture_output=True, text=True); return final_output_path
except subprocess.CalledProcessError as e:
raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e.stderr}")
def run_full_production(storyboard, ref_img_path, height, width, fps, seed, cfg):
    if not storyboard:
        raise gr.Error("No storyboard to produce.")
    if not ref_img_path:
        raise gr.Error("No reference image set.")
video_fragments, log_history = [], ""
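    # Yielding a dict of {component: gr.update(...)} streams partial UI updates;
    # components omitted from a given yield keep their current values.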
for i, motion_prompt in enumerate(storyboard):
log_message = f"Iniciando produção da Cena {i+1}/{len(storyboard)}..."
log_history += log_message + "\n"
yield {production_log_output: gr.update(value=log_history)}
fragment_path = run_ltx_animation(i + 1, motion_prompt, ref_img_path, height, width, fps, seed, cfg, gr.Progress())
video_fragments.append(fragment_path)
log_message = f"Cena {i+1} concluída e salva em {os.path.basename(fragment_path)}."
log_history += log_message + "\n"
        yield {
            production_log_output: gr.update(value=log_history),
            fragment_gallery_output: gr.update(value=video_fragments),
            fragment_list_state: video_fragments,
            final_fragments_display: gr.update(value=video_fragments),
        }
log_history += "\nProdução de todas as cenas concluída!"
yield {production_log_output: gr.update(value=log_history)}
# --- Act 4: The Performance (Gradio UI) ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# LTX Video - Storyboard em Vídeo (ADUC-SDR)\n*By Carlex & Gemini*")
storyboard_state = gr.State([])
reference_image_state = gr.State("")
fragment_list_state = gr.State([])
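    # gr.State holds per-session data (storyboard, reference image, fragment paths)
    # that flows between the three tabs without being rendered directly.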
    if os.path.exists(WORKSPACE_DIR):
        shutil.rmtree(WORKSPACE_DIR)
os.makedirs(WORKSPACE_DIR)
with gr.Tabs():
with gr.TabItem("ETAPA 1: O DIRETOR (Roteiro Visual)"):
with gr.Row():
with gr.Column():
                    num_fragments_input = gr.Slider(2, 10, 4, step=1, label="Number of Scenes (Fragments)")
                    prompt_input = gr.Textbox(label="Overall Idea (Prompt)")
                    image_input = gr.Image(type="filepath", label="Reference Image")
                    director_button = gr.Button("▶️ Generate Visual Script (Gemini)", variant="primary")
with gr.Column():
                    storyboard_output = gr.JSON(label="Generated Visual Script (Storyboard)")
with gr.TabItem("ETAPA 2: A PRODUÇÃO (Gerar Cenas em Vídeo)"):
with gr.Row():
with gr.Column():
                    storyboard_to_render = gr.JSON(label="Script to Be Produced")
                    animator_button = gr.Button("▶️ Produce ALL Scenes (LTX)", variant="primary")
                    production_log_output = gr.Textbox(label="Production Log", lines=5, interactive=False, placeholder="Waiting for production to start...")
with gr.Column():
                    fragment_gallery_output = gr.Gallery(label="Produced Scenes (Video Fragments)", object_fit="contain", height="auto")
with gr.Row():
                height_slider = gr.Slider(256, 1024, 512, step=32, label="Height")
                width_slider = gr.Slider(256, 1024, 512, step=32, label="Width")
with gr.Row():
fps_slider = gr.Slider(8, 24, 15, step=1, label="FPS")
seed_number = gr.Number(42, label="Seed")
cfg_slider = gr.Slider(1.0, 10.0, 2.5, step=0.1, label="CFG")
with gr.TabItem("ETAPA 3: PÓS-PRODUÇÃO"):
with gr.Row():
with gr.Column():
                    final_fragments_display = gr.JSON(label="Videos to Concatenate")
                    editor_button = gr.Button("▶️ Concatenate Everything (FFmpeg)", variant="primary")
with gr.Column():
                    final_video_output = gr.Video(label="The Final Masterpiece")
    # --- Act 5: Conducting (Wiring Up the Buttons) ---
def director_success(img_path, storyboard_json):
        if not img_path:
            raise gr.Error("The reference image is required.")
        storyboard_list = storyboard_json if isinstance(storyboard_json, list) else storyboard_json.get("storyboard", [])
        if not storyboard_list:
            raise gr.Error("The storyboard is empty.")
return storyboard_list, img_path, gr.update(value=storyboard_json)
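    # Chain: generate the storyboard first; only on success copy it (plus the
    # reference image) into session state and mirror it into the production tab.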
director_button.click(
fn=get_storyboard_from_director_v2,
inputs=[num_fragments_input, prompt_input, image_input],
outputs=[storyboard_output]
).success(
fn=director_success,
inputs=[image_input, storyboard_output],
outputs=[storyboard_state, reference_image_state, storyboard_to_render]
)
animator_button.click(
fn=run_full_production,
inputs=[storyboard_state, reference_image_state, height_slider, width_slider, fps_slider, seed_number, cfg_slider],
outputs=[production_log_output, fragment_gallery_output, fragment_list_state, final_fragments_display]
)
editor_button.click(
fn=concatenate_masterpiece,
inputs=[fragment_list_state],
outputs=[final_video_output]
)
if __name__ == "__main__":
demo.queue().launch(server_name="0.0.0.0", share=True)