# --- app.py (ADUC-SDR v4.0 - Compilation and State Fix) ---
# By Carlex & Gemini & DreamO
# --- Act 1: Assembling the Orchestra (Imports) ---
import gradio as gr
import torch
import os
import yaml
from PIL import Image
import shutil
import gc
import subprocess
import math
import google.generativeai as genai
import numpy as np
import imageio
from pathlib import Path
import huggingface_hub
import json
from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, ConditioningItem, calculate_padding
from dreamo_helpers import dreamo_generator_singleton
# --- Act 2: Setting the Stage (Configuration) ---
config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"
with open(config_file_path, "r") as file:
PIPELINE_CONFIG_YAML = yaml.safe_load(file)
LTX_REPO = "Lightricks/LTX-Video"
models_dir = "downloaded_models_gradio_cpu_init"
Path(models_dir).mkdir(parents=True, exist_ok=True)
WORKSPACE_DIR = "aduc_workspace"
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
VIDEO_FPS = 30
VIDEO_DURATION_SECONDS = 3
VIDEO_TOTAL_FRAMES = VIDEO_DURATION_SECONDS * VIDEO_FPS
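# Each fragment is a fixed-length clip of VIDEO_DURATION_SECONDS * VIDEO_FPS frames; at generation
# time the count is snapped to the 8*n + 1 form the LTX pipeline works with (see run_ltx_animation).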
print("Baixando e criando pipelines LTX na CPU...")
distilled_model_actual_path = huggingface_hub.hf_hub_download(repo_id=LTX_REPO, filename=PIPELINE_CONFIG_YAML["checkpoint_path"], local_dir=models_dir, local_dir_use_symlinks=False)
pipeline_instance_original = create_ltx_video_pipeline(ckpt_path=distilled_model_actual_path, precision=PIPELINE_CONFIG_YAML["precision"], text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"], sampler=PIPELINE_CONFIG_YAML["sampler"], device='cpu')
print("Modelos LTX prontos (na CPU).")
# <<< CORREÇÃO: A variável global `pipeline_instance` será o objeto que usamos. >>>
pipeline_instance = pipeline_instance_original
if torch.cuda.is_available():
print("Compilando o modelo LTX para otimização de desempenho (torch.compile)...")
try:
# Reassign the global with the compiled version
pipeline_instance = torch.compile(pipeline_instance_original, mode="reduce-overhead", fullgraph=True)
print("Modelo compilado com sucesso.")
except Exception as e:
print(f"Falha ao compilar o modelo, usando a versão não compilada. Erro: {e}")
pipeline_instance = pipeline_instance_original
# --- Act 3: The Musicians' Scores (Functions) ---
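# "Sequential Photographer" agent: given the user's overall idea, the prompts of the scenes generated
# so far, and the previous keyframe image, asks Gemini for a JSON object whose "next_scene_prompt"
# field describes the next scene to paint.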
def get_next_scene_prompt(user_prompt: str, prompt_history_str: str, previous_image_path: str):
genai.configure(api_key=GEMINI_API_KEY)
script_dir = os.path.dirname(os.path.abspath(__file__))
prompt_file_path = os.path.join(script_dir, "prompts", "photographer_sequential_prompt.txt")
with open(prompt_file_path, "r", encoding="utf-8") as f: template = f.read()
model_prompt = template.format(user_prompt=user_prompt, prompt_history=prompt_history_str)
img = Image.open(previous_image_path)
model = genai.GenerativeModel('gemini-2.0-flash')
response = model.generate_content([model_prompt, img])
try:
cleaned_response = response.text.strip().replace("```json", "").replace("```", "")
data = json.loads(cleaned_response)
return data.get("next_scene_prompt")
except Exception as e:
raise gr.Error(f"Fotógrafo Sequencial falhou: {e}. Resposta: {response.text}")
def get_motion_prompt_for_pair(user_prompt: str, start_image_path: str, end_image_path: str):
genai.configure(api_key=GEMINI_API_KEY)
script_dir = os.path.dirname(os.path.abspath(__file__))
prompt_file_path = os.path.join(script_dir, "prompts", "director_sequential_prompt.txt")
with open(prompt_file_path, "r", encoding="utf-8") as f: template = f.read()
model_prompt = template.format(user_prompt=user_prompt)
img1 = Image.open(start_image_path)
img2 = Image.open(end_image_path)
model = genai.GenerativeModel('gemini-2.0-flash')
response = model.generate_content([model_prompt, img1, img2])
try:
cleaned_response = response.text.strip().replace("```json", "").replace("```", "")
data = json.loads(cleaned_response)
return data.get("motion_prompt")
except Exception as e:
raise gr.Error(f"Diretor Sequencial falhou: {e}. Resposta: {response.text}")
def run_ltx_animation(current_fragment_index, motion_prompt, conditioning_items_data, width, height, seed, cfg, progress=gr.Progress()):
progress(0, desc=f"[Animador LTX] Gerando Cena {current_fragment_index}...");
output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4");
target_device = 'cuda' if torch.cuda.is_available() else 'cpu'
result_tensor = None
try:
pipeline_instance.to(target_device)
conditioning_items = []
for (path, start_frame, strength) in conditioning_items_data:
tensor = load_image_to_tensor_with_resize_and_crop(path, height, width)
conditioning_items.append(ConditioningItem(tensor.to(target_device), start_frame, strength))
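# LTX works with frame counts of the form 8*n + 1, so round the requested clip length to the nearest valid count.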
n_val = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
actual_num_frames = int(n_val * 8 + 1)
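# Pad height and width up to the next multiple of 32 (the spatial granularity the pipeline expects)
# and apply the same padding to every conditioning image.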
padded_h, padded_w = ((height - 1) // 32 + 1) * 32, ((width - 1) // 32 + 1) * 32
padding_vals = calculate_padding(height, width, padded_h, padded_w)
for cond_item in conditioning_items: cond_item.media_item = torch.nn.functional.pad(cond_item.media_item, padding_vals)
first_pass_config = PIPELINE_CONFIG_YAML.get("first_pass", {})
kwargs = {
"prompt": motion_prompt, "negative_prompt": "blurry, distorted, bad quality, artifacts",
"height": padded_h, "width": padded_w, "num_frames": actual_num_frames, "frame_rate": VIDEO_FPS,
"generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
"output_type": "pt", "guidance_scale": float(cfg), "timesteps": first_pass_config.get("timesteps"),
"stg_scale": first_pass_config.get("stg_scale"), "rescaling_scale": first_pass_config.get("rescaling_scale"),
"skip_block_list": first_pass_config.get("skip_block_list"), "conditioning_items": conditioning_items,
"decode_timestep": PIPELINE_CONFIG_YAML.get("decode_timestep"), "decode_noise_scale": PIPELINE_CONFIG_YAML.get("decode_noise_scale"),
"stochastic_sampling": PIPELINE_CONFIG_YAML.get("stochastic_sampling"), "image_cond_noise_scale": 0.15,
"is_video": True, "vae_per_channel_normalize": True,
"mixed_precision": (PIPELINE_CONFIG_YAML.get("precision") == "mixed_precision"), "offload_to_cpu": False, "enhance_prompt": False
}
result_tensor = pipeline_instance(**kwargs).images
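# Undo the spatial padding and trim the clip back to the target frame count before encoding.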
pad_l, pad_r, pad_t, pad_b = map(int, padding_vals)
slice_h = -pad_b if pad_b > 0 else None
slice_w = -pad_r if pad_r > 0 else None
cropped_tensor = result_tensor[:, :, :VIDEO_TOTAL_FRAMES, pad_t:slice_h, pad_l:slice_w]
video_np = (cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy() * 255).astype(np.uint8)
with imageio.get_writer(output_path, fps=VIDEO_FPS, codec='libx264', quality=8) as writer:
for i, frame in enumerate(video_np):
progress(i / len(video_np), desc=f"Rendering frame {i+1}/{len(video_np)}...")
writer.append_data(frame)
return output_path
finally:
pipeline_instance.to('cpu')
if result_tensor is not None: del result_tensor
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
print("Memória do Animador LTX liberada.")
def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()):
if not fragment_paths: return None
progress(0.5, desc="Montando a obra-prima final...");
list_file_path = os.path.join(WORKSPACE_DIR, "concat_list.txt")
final_output_path = os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4")
with open(list_file_path, "w") as f:
for path in fragment_paths: f.write(f"file '{os.path.abspath(path)}'\n")
command = f"ffmpeg -y -f concat -safe 0 -i {list_file_path} -c copy {final_output_path}"
try:
subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
return final_output_path
except subprocess.CalledProcessError as e:
raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e.stderr}")
def editor_magic(video_path: str, fragment_index: int):
print(f"--- [ADUC-SDR] Editor (FFmpeg) trabalhando no Fragmento {fragment_index}... ---")
output_image_path = os.path.join(WORKSPACE_DIR, f"last_frame_frag_{fragment_index}.jpg")
if not video_path or not os.path.exists(video_path):
raise gr.Error(f"Erro Interno: O vídeo do fragmento {fragment_index} não foi encontrado para extrair o frame.")
try:
command_probe = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames -of default=noprint_wrappers=1:nokey=1 \"{video_path}\""
result_probe = subprocess.run(command_probe, shell=True, check=True, capture_output=True, text=True)
total_frames = int(result_probe.stdout.strip())
last_frame_index = total_frames - 1
if last_frame_index < 0:
raise gr.Error("FFprobe retornou um número de frames inválido.")
command_extract = f"ffmpeg -y -i \"{video_path}\" -vf \"select='eq(n,{last_frame_index})'\" -vsync vfr -frames:v 1 \"{output_image_path}\""
subprocess.run(command_extract, shell=True, check=True, capture_output=True, text=True)
print(f"Último frame ({last_frame_index}) extraído com sucesso para: {output_image_path}")
return output_image_path
except (subprocess.CalledProcessError, ValueError) as e:
error_message = f"FFmpeg/FFprobe falhou ao extrair último frame: {e}"
if hasattr(e, 'stderr'):
error_message += f"\nDetalhes: {e.stderr}"
raise gr.Error(error_message)
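# Main orchestration loop (a Gradio generator). For each fragment it: asks Gemini for the next scene
# prompt, renders a new keyframe with DreamO, asks Gemini for a motion prompt, animates the keyframe
# pair with LTX, and extracts the last frame for continuity; afterwards it concatenates all fragments.
# Every `yield` streams (log, final video, start keyframe, end keyframe) updates back to the UI.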
def run_sequential_production(num_fragments, user_prompt, ref_image_path, seed, cfg, progress=gr.Progress()):
if not ref_image_path: raise gr.Error("Please provide a reference image.")
video_fragments = []
log_history = "Iniciando Produção Sequencial com Memória Contextual...\n"
prompt_history = []
image_anterior_path = ref_image_path
for i in range(int(num_fragments)):
progress(i / num_fragments, desc=f"Generating fragment {i+1}/{num_fragments}")
log_history += f"\n--- FRAGMENT {i+1} ---\n"
yield log_history, None, image_anterior_path, None
log_history += "Fotógrafo (Gemini) criando prompt da próxima cena (com memória)...\n"
yield log_history, None, image_anterior_path, None
prompt_history_str = "\n".join([f"- Cena {idx+1}: {p}" for idx, p in enumerate(prompt_history)])
if not prompt_history_str:
prompt_history_str = "Esta é a primeira cena."
prompt_proxima_cena = get_next_scene_prompt(user_prompt, prompt_history_str, image_anterior_path)
prompt_history.append(prompt_proxima_cena)
log_history += f"Pintor (DreamO) renderizando a próxima cena: '{prompt_proxima_cena}'...\n"
yield log_history, None, image_anterior_path, None
image_atual_path = os.path.join(WORKSPACE_DIR, f"keyframe_{i+1}.png")
with Image.open(image_anterior_path) as img: width, height = img.size
width, height = (width // 32) * 32, (height // 32) * 32
dreamo_generator_singleton.to_gpu()
try:
image_atual = dreamo_generator_singleton.generate_image_with_gpu_management(
ref_image1_np=np.array(Image.open(image_anterior_path).convert("RGB")), ref_task1="style",
ref_image2_np=np.array(Image.open(image_anterior_path).convert("RGB")), ref_task2="ip",
prompt=prompt_proxima_cena, width=width, height=height
)
image_atual.save(image_atual_path)
log_history += "Nova imagem de keyframe gerada.\n"
yield log_history, None, image_anterior_path, image_atual_path
finally:
dreamo_generator_singleton.to_cpu()
log_history += "Diretor de Cena (Gemini) criando prompt de movimento...\n"
yield log_history, None, image_anterior_path, image_atual_path
prompt_movimento = get_motion_prompt_for_pair(user_prompt, image_anterior_path, image_atual_path)
log_history += f"Animador (LTX) gerando vídeo: '{prompt_movimento}'...\n"
yield log_history, None, image_anterior_path, image_atual_path
n_val = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
actual_num_frames = int(n_val * 8 + 1)
end_frame_index = actual_num_frames - 1
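# Condition the animation on both keyframes: the previous image pinned to frame 0 and the new image
# pinned to the last frame, each at full strength.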
conditioning_items_data = [(image_anterior_path, 0, 1.0), (image_atual_path, end_frame_index, 1.0)]
fragment_path = run_ltx_animation(i + 1, prompt_movimento, conditioning_items_data, width, height, seed, cfg, progress)
video_fragments.append(fragment_path)
log_history += "Editor (FFmpeg) extraindo último frame para continuidade...\n"
yield log_history, None, image_anterior_path, image_atual_path
image_anterior_path = editor_magic(fragment_path, i + 1)
log_history += "\nConcatenando vídeo final...\n"
yield log_history, None, None, None
final_video_path = concatenate_masterpiece(video_fragments, progress)
log_history += "\nProdução Concluída! Vídeo final pronto."
yield log_history, final_video_path, None, None
# --- Act 4: The Interface to the World (Gradio UI) ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# LTX Video - ADUC-SDR v4.0 (Compilação Corrigida)\n*By Carlex & Gemini & DreamO*")
if os.path.exists(WORKSPACE_DIR): shutil.rmtree(WORKSPACE_DIR)
os.makedirs(WORKSPACE_DIR)
with gr.Row():
with gr.Column(scale=1):
num_fragments_input = gr.Slider(1, 10, 4, step=1, label="Number of Fragments to Generate")
prompt_input = gr.Textbox(label="Overall Idea (Prompt)")
image_input = gr.Image(type="filepath", label="Initial Reference Image")
seed_number = gr.Number(42, label="Seed")
cfg_slider = gr.Slider(1.0, 10.0, 2.5, step=0.1, label="CFG")
run_button = gr.Button("▶️ Generate Full Video", variant="primary")
with gr.Column(scale=2):
with gr.Row():
start_keyframe_display = gr.Image(label="Animation Start Keyframe", interactive=False)
end_keyframe_display = gr.Image(label="Animation End Keyframe", interactive=False)
log_output = gr.Textbox(label="Production Log", lines=10, interactive=False)
video_output = gr.Video(label="Final Video")
run_button.click(
fn=run_sequential_production,
inputs=[num_fragments_input, prompt_input, image_input, seed_number, cfg_slider],
outputs=[log_output, video_output, start_keyframe_display, end_keyframe_display]
)
if __name__ == "__main__":
demo.queue().launch(server_name="0.0.0.0", share=True)