|
|
|
|
|
|
|
|
|
import gradio as gr |
|
import torch |
|
import os |
|
import yaml |
|
from PIL import Image |
|
import shutil |
|
import gc |
|
import subprocess |
|
import math |
|
import google.generativeai as genai |
|
import numpy as np |
|
import imageio |
|
from pathlib import Path |
|
import huggingface_hub |
|
import json |
|
|
|
from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, ConditioningItem, calculate_padding |
|
from dreamo_helpers import dreamo_generator_singleton |
|
|
|
|
|
# --- Static configuration ---------------------------------------------------
# Pipeline settings (checkpoint file name, precision, sampler, first-pass
# parameters, ...) are read from this YAML file.
config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"
# Explicit UTF-8 keeps parsing independent of the platform's default encoding
# and matches how the prompt templates are read elsewhere in this file.
with open(config_file_path, "r", encoding="utf-8") as file:
    PIPELINE_CONFIG_YAML = yaml.safe_load(file)

# Hugging Face repository the checkpoint is downloaded from.
LTX_REPO = "Lightricks/LTX-Video"
# Local directory for downloaded model files; created on import if missing.
models_dir = "downloaded_models_gradio_cpu_init"
Path(models_dir).mkdir(parents=True, exist_ok=True)
# Directory that receives keyframes, video fragments and the final video.
WORKSPACE_DIR = "aduc_workspace"
# May be None when the environment variable is not set.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

# Every fragment is rendered at this frame rate and nominal duration.
VIDEO_FPS = 30
VIDEO_DURATION_SECONDS = 3
VIDEO_TOTAL_FRAMES = VIDEO_DURATION_SECONDS * VIDEO_FPS
|
|
|
# Fetch the checkpoint named in the YAML config and build the LTX pipeline on
# the CPU.
print("Baixando e criando pipelines LTX na CPU...")
distilled_model_actual_path = huggingface_hub.hf_hub_download(
    repo_id=LTX_REPO,
    filename=PIPELINE_CONFIG_YAML["checkpoint_path"],
    local_dir=models_dir,
    local_dir_use_symlinks=False,
)
pipeline_instance_original = create_ltx_video_pipeline(
    ckpt_path=distilled_model_actual_path,
    precision=PIPELINE_CONFIG_YAML["precision"],
    text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"],
    sampler=PIPELINE_CONFIG_YAML["sampler"],
    device='cpu',
)
print("Modelos LTX prontos (na CPU).")
|
|
|
|
|
# Default to the uncompiled pipeline; it is replaced below only when
# compilation yields an object the rest of the file can actually use.
pipeline_instance = pipeline_instance_original

if torch.cuda.is_available():
    print("Compilando o modelo LTX para otimização de desempenho (torch.compile)...")
    try:
        compiled_pipeline = torch.compile(pipeline_instance_original, mode="reduce-overhead", fullgraph=True)
        # run_ltx_animation moves the pipeline between devices via `.to(...)`,
        # so a compiled object without that method must not be adopted.
        # NOTE(review): torch.compile may return a plain function wrapper when
        # its argument is not an nn.Module — confirm what
        # create_ltx_video_pipeline returns.
        if not callable(getattr(compiled_pipeline, "to", None)):
            raise TypeError("o objeto compilado não expõe o método .to()")
        pipeline_instance = compiled_pipeline
        print("Modelo compilado com sucesso.")
    except Exception as e:
        # Compilation is an optional optimization: report and keep going with
        # the uncompiled pipeline.
        print(f"Falha ao compilar o modelo, usando a versão não compilada. Erro: {e}")
        pipeline_instance = pipeline_instance_original
|
|
|
|
|
|
|
|
|
def get_next_scene_prompt(user_prompt: str, prompt_history_str: str, previous_image_path: str):
    """Ask Gemini for the prompt that describes the next scene.

    Fills the ``prompts/photographer_sequential_prompt.txt`` template (located
    next to this script) with the user's idea and the prompt history, sends it
    together with the previous keyframe image to ``gemini-2.0-flash`` and
    parses the JSON reply.

    Args:
        user_prompt: The user's overall idea, substituted as ``user_prompt``.
        prompt_history_str: Text listing the earlier scene prompts,
            substituted as ``prompt_history``.
        previous_image_path: Path of the image sent along with the prompt.

    Returns:
        The value of the ``next_scene_prompt`` field of the JSON reply.

    Raises:
        gr.Error: If the reply cannot be read, is not valid JSON, or has no
            usable ``next_scene_prompt`` field.
    """
    genai.configure(api_key=GEMINI_API_KEY)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    prompt_file_path = os.path.join(script_dir, "prompts", "photographer_sequential_prompt.txt")
    with open(prompt_file_path, "r", encoding="utf-8") as f:
        template = f.read()

    model_prompt = template.format(user_prompt=user_prompt, prompt_history=prompt_history_str)
    model = genai.GenerativeModel('gemini-2.0-flash')
    # Keep the image file open only for the duration of the request so the
    # handle is released deterministically.
    with Image.open(previous_image_path) as img:
        response = model.generate_content([model_prompt, img])

    # Read the reply text exactly once, so the error handler below never has
    # to touch `response.text` again.
    # NOTE(review): accessing `response.text` may itself raise for replies
    # without text content — confirm against the SDK.
    raw_text = ""
    try:
        raw_text = response.text
        # Strip Markdown code fences the model may wrap around the JSON.
        cleaned_response = raw_text.strip().replace("```json", "").replace("```", "")
        data = json.loads(cleaned_response)
        next_prompt = data.get("next_scene_prompt")
        if not next_prompt:
            # A missing prompt would otherwise flow into image generation as None.
            raise ValueError("campo 'next_scene_prompt' ausente ou vazio")
        return next_prompt
    except Exception as e:
        raise gr.Error(f"Fotógrafo Sequencial falhou: {e}. Resposta: {raw_text}") from e
|
|
|
def get_motion_prompt_for_pair(user_prompt: str, start_image_path: str, end_image_path: str):
    """Ask Gemini for the motion prompt that links two keyframes.

    Fills the ``prompts/director_sequential_prompt.txt`` template (located
    next to this script) with the user's idea, sends it together with the
    start and end images to ``gemini-2.0-flash`` and parses the JSON reply.

    Args:
        user_prompt: The user's overall idea, substituted as ``user_prompt``.
        start_image_path: Path of the first image sent with the prompt.
        end_image_path: Path of the second image sent with the prompt.

    Returns:
        The value of the ``motion_prompt`` field of the JSON reply.

    Raises:
        gr.Error: If the reply cannot be read, is not valid JSON, or has no
            usable ``motion_prompt`` field.
    """
    genai.configure(api_key=GEMINI_API_KEY)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    prompt_file_path = os.path.join(script_dir, "prompts", "director_sequential_prompt.txt")
    with open(prompt_file_path, "r", encoding="utf-8") as f:
        template = f.read()

    model_prompt = template.format(user_prompt=user_prompt)
    model = genai.GenerativeModel('gemini-2.0-flash')
    # Both image files stay open only while the request is being made.
    with Image.open(start_image_path) as img1, Image.open(end_image_path) as img2:
        response = model.generate_content([model_prompt, img1, img2])

    # Read the reply text exactly once, so the error handler below never has
    # to touch `response.text` again.
    # NOTE(review): accessing `response.text` may itself raise for replies
    # without text content — confirm against the SDK.
    raw_text = ""
    try:
        raw_text = response.text
        # Strip Markdown code fences the model may wrap around the JSON.
        cleaned_response = raw_text.strip().replace("```json", "").replace("```", "")
        data = json.loads(cleaned_response)
        motion_prompt = data.get("motion_prompt")
        if not motion_prompt:
            # A missing prompt would otherwise reach the video pipeline as None.
            raise ValueError("campo 'motion_prompt' ausente ou vazio")
        return motion_prompt
    except Exception as e:
        raise gr.Error(f"Diretor Sequencial falhou: {e}. Resposta: {raw_text}") from e
|
|
|
def run_ltx_animation(current_fragment_index, motion_prompt, conditioning_items_data, width, height, seed, cfg, progress=gr.Progress()):
    """Render one video fragment with the LTX pipeline and save it as MP4.

    The pipeline is moved to the GPU (when available) for the duration of the
    call and moved back to the CPU afterwards, whether or not generation
    succeeded.

    Args:
        current_fragment_index: Fragment number; used in the output file name
            and added to ``seed``.
        motion_prompt: Text prompt passed to the pipeline.
        conditioning_items_data: Iterable of ``(image_path, start_frame,
            strength)`` tuples; each image becomes a conditioning item.
        width: Target frame width before padding.
        height: Target frame height before padding.
        seed: Base random seed.
        cfg: Guidance scale passed to the pipeline.
        progress: Gradio progress callback.

    Returns:
        Path of the written ``fragment_<index>.mp4`` inside ``WORKSPACE_DIR``.
    """
    progress(0, desc=f"[Animador LTX] Gerando Cena {current_fragment_index}...")
    output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4")
    target_device = 'cuda' if torch.cuda.is_available() else 'cpu'

    result_tensor = None
    cropped_tensor = None
    try:
        # Device moves go through the uncompiled pipeline object:
        # `pipeline_instance` may be the output of torch.compile, which is not
        # guaranteed to expose `.to()`. When no compilation happened both
        # names refer to the same object.
        pipeline_instance_original.to(target_device)

        conditioning_items = []
        for (path, start_frame, strength) in conditioning_items_data:
            tensor = load_image_to_tensor_with_resize_and_crop(path, height, width)
            conditioning_items.append(ConditioningItem(tensor.to(target_device), start_frame, strength))

        # Frame count is rounded to the nearest value of the form 8*n + 1.
        n_val = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
        actual_num_frames = int(n_val * 8 + 1)
        # Spatial size is rounded up to the next multiple of 32; the
        # conditioning images are padded to match.
        padded_h = ((height - 1) // 32 + 1) * 32
        padded_w = ((width - 1) // 32 + 1) * 32
        padding_vals = calculate_padding(height, width, padded_h, padded_w)
        for cond_item in conditioning_items:
            cond_item.media_item = torch.nn.functional.pad(cond_item.media_item, padding_vals)

        first_pass_config = PIPELINE_CONFIG_YAML.get("first_pass", {})

        kwargs = {
            "prompt": motion_prompt,
            "negative_prompt": "blurry, distorted, bad quality, artifacts",
            "height": padded_h,
            "width": padded_w,
            "num_frames": actual_num_frames,
            "frame_rate": VIDEO_FPS,
            # Offsetting the seed by the fragment index gives each fragment its own seed.
            "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
            "output_type": "pt",
            "guidance_scale": float(cfg),
            "timesteps": first_pass_config.get("timesteps"),
            "stg_scale": first_pass_config.get("stg_scale"),
            "rescaling_scale": first_pass_config.get("rescaling_scale"),
            "skip_block_list": first_pass_config.get("skip_block_list"),
            "conditioning_items": conditioning_items,
            "decode_timestep": PIPELINE_CONFIG_YAML.get("decode_timestep"),
            "decode_noise_scale": PIPELINE_CONFIG_YAML.get("decode_noise_scale"),
            "stochastic_sampling": PIPELINE_CONFIG_YAML.get("stochastic_sampling"),
            "image_cond_noise_scale": 0.15,
            "is_video": True,
            "vae_per_channel_normalize": True,
            "mixed_precision": (PIPELINE_CONFIG_YAML.get("precision") == "mixed_precision"),
            "offload_to_cpu": False,
            "enhance_prompt": False,
        }

        result_tensor = pipeline_instance(**kwargs).images

        # Remove the padding added above and cap the number of frames.
        pad_l, pad_r, pad_t, pad_b = map(int, padding_vals)
        slice_h = -pad_b if pad_b > 0 else None
        slice_w = -pad_r if pad_r > 0 else None
        cropped_tensor = result_tensor[:, :, :VIDEO_TOTAL_FRAMES, pad_t:slice_h, pad_l:slice_w]

        # Move the channel axis last for the video writer, then clamp before
        # the uint8 cast: values outside [0, 1] would otherwise wrap around
        # and show up as speckle artifacts.
        frames = cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy()
        video_np = (np.clip(frames, 0.0, 1.0) * 255).astype(np.uint8)

        with imageio.get_writer(output_path, fps=VIDEO_FPS, codec='libx264', quality=8) as writer:
            for i, frame in enumerate(video_np):
                progress(i / len(video_np), desc=f"Renderizando frame {i+1}/{len(video_np)}...")
                writer.append_data(frame)
        return output_path
    finally:
        pipeline_instance_original.to('cpu')

        # Drop both references: the cropped tensor is a slice of the result,
        # so keeping either alive would keep the underlying storage alive
        # while the cache is emptied.
        result_tensor = cropped_tensor = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        print("Memória do Animador LTX liberada.")
|
|
|
def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()):
    """Join the video fragments into one file using FFmpeg's concat demuxer.

    Writes ``concat_list.txt`` into ``WORKSPACE_DIR`` with one ``file`` entry
    per fragment (absolute paths) and stream-copies them into
    ``obra_prima_final.mp4``.

    Args:
        fragment_paths: Paths of the fragments, in playback order.
        progress: Gradio progress callback.

    Returns:
        Path of the concatenated video, or ``None`` when ``fragment_paths``
        is empty.

    Raises:
        gr.Error: If FFmpeg cannot be started or exits with an error.
    """
    if not fragment_paths:
        return None

    progress(0.5, desc="Montando a obra-prima final...")
    list_file_path = os.path.join(WORKSPACE_DIR, "concat_list.txt")
    final_output_path = os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4")

    with open(list_file_path, "w", encoding="utf-8") as f:
        for path in fragment_paths:
            # Inside a single-quoted entry a literal quote must be written as
            # '\'' (close quote, escaped quote, reopen quote).
            escaped_path = os.path.abspath(path).replace("'", "'\\''")
            f.write(f"file '{escaped_path}'\n")

    # Argument list instead of a shell string: paths containing spaces or
    # shell metacharacters are passed to FFmpeg untouched.
    command = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", list_file_path,
        "-c", "copy",
        final_output_path,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e.stderr}") from e
    except OSError as e:
        # Without a shell, a missing ffmpeg binary surfaces as OSError.
        raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e}") from e
    return final_output_path
|
|
|
def editor_magic(video_path: str, fragment_index: int):
    """Extract the last frame of a fragment as a JPEG for scene continuity.

    Counts the frames of the first video stream with ``ffprobe``, then uses
    ``ffmpeg`` to select the frame with the highest index and writes it to
    ``last_frame_frag_<index>.jpg`` inside ``WORKSPACE_DIR``.

    Args:
        video_path: Path of the fragment video.
        fragment_index: Fragment number; used in log messages and in the
            output file name.

    Returns:
        Path of the extracted JPEG image.

    Raises:
        gr.Error: If the video is missing, the frame count is invalid, or
            ffprobe/ffmpeg cannot be started or fail.
    """
    print(f"--- [ADUC-SDR] Editor (FFmpeg) trabalhando no Fragmento {fragment_index}... ---")
    output_image_path = os.path.join(WORKSPACE_DIR, f"last_frame_frag_{fragment_index}.jpg")

    if not video_path or not os.path.exists(video_path):
        raise gr.Error(f"Erro Interno: O vídeo do fragmento {fragment_index} não foi encontrado para extrair o frame.")

    # Argument lists instead of shell strings: the paths reach the tools
    # verbatim, whatever characters they contain.
    probe_command = [
        "ffprobe", "-v", "error",
        "-count_frames",
        "-select_streams", "v:0",
        "-show_entries", "stream=nb_read_frames",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        result_probe = subprocess.run(probe_command, check=True, capture_output=True, text=True)
        # int() raises ValueError when ffprobe prints something non-numeric.
        total_frames = int(result_probe.stdout.strip())
        last_frame_index = total_frames - 1

        if last_frame_index < 0:
            raise gr.Error("FFprobe retornou um número de frames inválido.")

        extract_command = [
            "ffmpeg", "-y",
            "-i", video_path,
            # The single quotes are part of the filter expression itself.
            "-vf", f"select='eq(n,{last_frame_index})'",
            "-vsync", "vfr",
            "-frames:v", "1",
            output_image_path,
        ]
        subprocess.run(extract_command, check=True, capture_output=True, text=True)

        print(f"Último frame ({last_frame_index}) extraído com sucesso para: {output_image_path}")
        return output_image_path
    except (subprocess.CalledProcessError, ValueError, OSError) as e:
        # OSError covers a missing ffprobe/ffmpeg binary now that no shell is
        # involved.
        error_message = f"FFmpeg/FFprobe falhou ao extrair último frame: {e}"
        if hasattr(e, 'stderr'):
            error_message += f"\nDetalhes: {e.stderr}"
        raise gr.Error(error_message) from e
|
|
|
def run_sequential_production(num_fragments, user_prompt, ref_image_path, seed, cfg, progress=gr.Progress()):
    """Produce the full video fragment by fragment, streaming status to the UI.

    For every fragment: Gemini proposes the next scene prompt, DreamO renders
    the next keyframe from the previous image, Gemini proposes a motion
    prompt for the keyframe pair, LTX animates between the two keyframes, and
    the last frame of the resulting video becomes the starting image of the
    next fragment. Finally all fragments are concatenated.

    Args:
        num_fragments: Number of fragments to generate (converted with ``int``).
        user_prompt: The user's overall idea.
        ref_image_path: Path of the initial reference image.
        seed: Base random seed passed to the animation step.
        cfg: Guidance scale passed to the animation step.
        progress: Gradio progress callback.

    Yields:
        Tuples ``(log_text, final_video_path, start_keyframe_path,
        end_keyframe_path)``; entries that are not available yet are ``None``.

    Raises:
        gr.Error: If no reference image is given or an input image is smaller
            than 32 pixels in either dimension.
    """
    if not ref_image_path:
        raise gr.Error("Por favor, forneça uma imagem de referência.")

    # Coerce once so the loop bound, the progress fraction and the progress
    # label all use the same integer.
    total_fragments = int(num_fragments)

    video_fragments = []
    log_history = "Iniciando Produção Sequencial com Memória Contextual...\n"

    prompt_history = []
    image_anterior_path = ref_image_path

    for i in range(total_fragments):
        progress(i / total_fragments, desc=f"Gerando Fragmento {i+1}/{total_fragments}")
        log_history += f"\n--- FRAGMENTO {i+1} ---\n"
        yield log_history, None, image_anterior_path, None

        log_history += "Fotógrafo (Gemini) criando prompt da próxima cena (com memória)...\n"
        yield log_history, None, image_anterior_path, None

        prompt_history_str = "\n".join([f"- Cena {idx+1}: {p}" for idx, p in enumerate(prompt_history)])
        if not prompt_history_str:
            prompt_history_str = "Esta é a primeira cena."

        prompt_proxima_cena = get_next_scene_prompt(user_prompt, prompt_history_str, image_anterior_path)
        prompt_history.append(prompt_proxima_cena)

        log_history += f"Pintor (DreamO) renderizando a próxima cena: '{prompt_proxima_cena}'...\n"
        yield log_history, None, image_anterior_path, None

        image_atual_path = os.path.join(WORKSPACE_DIR, f"keyframe_{i+1}.png")

        # Open the previous image once and close the file right away; the RGB
        # copy serves both the size computation and the DreamO references.
        with Image.open(image_anterior_path) as img:
            reference_rgb = img.convert("RGB")
        width, height = reference_rgb.size
        # Round the working size down to a multiple of 32.
        width, height = (width // 32) * 32, (height // 32) * 32
        if width == 0 or height == 0:
            # Rounding down turned a side shorter than 32 px into zero.
            raise gr.Error(
                f"A imagem de entrada ({reference_rgb.size[0]}x{reference_rgb.size[1]}) é pequena demais; "
                "são necessários pelo menos 32x32 pixels."
            )

        dreamo_generator_singleton.to_gpu()
        try:
            # Two separate arrays, as before, so the references never share memory.
            image_atual = dreamo_generator_singleton.generate_image_with_gpu_management(
                ref_image1_np=np.array(reference_rgb), ref_task1="style",
                ref_image2_np=np.array(reference_rgb), ref_task2="ip",
                prompt=prompt_proxima_cena, width=width, height=height
            )
            image_atual.save(image_atual_path)
            log_history += "Nova imagem de keyframe gerada.\n"
            yield log_history, None, image_anterior_path, image_atual_path
        finally:
            dreamo_generator_singleton.to_cpu()

        log_history += "Diretor de Cena (Gemini) criando prompt de movimento...\n"
        yield log_history, None, image_anterior_path, image_atual_path
        prompt_movimento = get_motion_prompt_for_pair(user_prompt, image_anterior_path, image_atual_path)

        log_history += f"Animador (LTX) gerando vídeo: '{prompt_movimento}'...\n"
        yield log_history, None, image_anterior_path, image_atual_path

        # Same 8*n + 1 frame-count rounding as the animation step, so the end
        # keyframe is pinned to the last generated frame.
        n_val = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
        actual_num_frames = int(n_val * 8 + 1)
        end_frame_index = actual_num_frames - 1
        conditioning_items_data = [(image_anterior_path, 0, 1.0), (image_atual_path, end_frame_index, 1.0)]

        fragment_path = run_ltx_animation(i + 1, prompt_movimento, conditioning_items_data, width, height, seed, cfg, progress)
        video_fragments.append(fragment_path)

        log_history += "Editor (FFmpeg) extraindo último frame para continuidade...\n"
        yield log_history, None, image_anterior_path, image_atual_path
        # The last rendered frame becomes the starting image of the next fragment.
        image_anterior_path = editor_magic(fragment_path, i + 1)

    log_history += "\nConcatenando vídeo final...\n"
    yield log_history, None, None, None
    final_video_path = concatenate_masterpiece(video_fragments, progress)

    log_history += "\nProdução Concluída! Vídeo final pronto."
    yield log_history, final_video_path, None, None
|
|
|
|
|
# Gradio interface: inputs in the left column, keyframe previews, production
# log and final video in the right column.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# LTX Video - ADUC-SDR v4.0 (Compilação Corrigida)\n*By Carlex & Gemini & DreamO*")

    # Every start of the app begins with an empty workspace directory.
    if os.path.exists(WORKSPACE_DIR):
        shutil.rmtree(WORKSPACE_DIR)
    os.makedirs(WORKSPACE_DIR)

    with gr.Row():
        with gr.Column(scale=1):
            num_fragments_input = gr.Slider(
                minimum=1,
                maximum=10,
                value=4,
                step=1,
                label="Número de Fragmentos a Gerar",
            )
            prompt_input = gr.Textbox(label="Ideia Geral (Prompt)")
            image_input = gr.Image(type="filepath", label="Imagem de Referência Inicial")
            seed_number = gr.Number(value=42, label="Seed")
            cfg_slider = gr.Slider(
                minimum=1.0,
                maximum=10.0,
                value=2.5,
                step=0.1,
                label="CFG",
            )
            run_button = gr.Button("▶️ Gerar Vídeo Completo", variant="primary")
        with gr.Column(scale=2):
            with gr.Row():
                start_keyframe_display = gr.Image(label="Keyframe Inicial da Animação", interactive=False)
                end_keyframe_display = gr.Image(label="Keyframe Final da Animação", interactive=False)
            log_output = gr.Textbox(label="Diário de Bordo da Produção", lines=10, interactive=False)
            video_output = gr.Video(label="Vídeo Final")

    # The output order matches the tuples yielded by run_sequential_production.
    run_button.click(
        fn=run_sequential_production,
        inputs=[
            num_fragments_input,
            prompt_input,
            image_input,
            seed_number,
            cfg_slider,
        ],
        outputs=[
            log_output,
            video_output,
            start_keyframe_display,
            end_keyframe_display,
        ],
    )
|
|
|
if __name__ == "__main__":
    # Enable the request queue, listen on all interfaces and request a public
    # share link.
    demo.queue().launch(
        share=True,
        server_name="0.0.0.0",
    )