# CrossFlow / app.py
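#
# The active code at the bottom of this file is the stock Diffusers
# text-to-image Gradio template running stabilityai/sdxl-turbo. The original
# CrossFlow interpolation demo is preserved below, commented out.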
# import gradio as gr
# from absl import flags
# from absl import app
# from ml_collections import config_flags
# import os
# import spaces  # [uncomment to use ZeroGPU]
# import torch
# import random
# import numpy as np
# import torch.nn.functional as F
# from torchvision.utils import save_image
# from huggingface_hub import hf_hub_download
# from absl import logging
# import ml_collections
# from diffusion.flow_matching import ODEEulerFlowMatchingSolver
# import utils
# import libs.autoencoder
# from libs.clip import FrozenCLIPEmbedder
# from configs import t2i_512px_clip_dimr
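# NOTE: diffusion.flow_matching, utils, libs.*, and configs are modules local
# to the CrossFlow codebase; the commented demo can only run from inside it.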
# def unpreprocess(x: torch.Tensor) -> torch.Tensor:
#     """Map images from [-1, 1] back to [0, 1]."""
#     x = 0.5 * (x + 1.0)
#     x.clamp_(0.0, 1.0)
#     return x
# def cosine_similarity_torch(latent1: torch.Tensor, latent2: torch.Tensor) -> torch.Tensor:
#     """Cosine similarity between two latents, flattened to vectors."""
#     latent1_flat = latent1.view(-1)
#     latent2_flat = latent2.view(-1)
#     cosine_similarity = F.cosine_similarity(
#         latent1_flat.unsqueeze(0), latent2_flat.unsqueeze(0), dim=1
#     )
#     return cosine_similarity
# def kl_divergence(latent1: torch.Tensor, latent2: torch.Tensor) -> torch.Tensor:
#     """KL divergence between softmax distributions of two latents."""
#     latent1_prob = F.softmax(latent1, dim=-1)
#     latent2_prob = F.softmax(latent2, dim=-1)
#     latent1_log_prob = torch.log(latent1_prob)
#     kl_div = F.kl_div(latent1_log_prob, latent2_prob, reduction="batchmean")
#     return kl_div
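# NOTE: cosine_similarity_torch and kl_divergence are latent-space diagnostics;
# they are not called anywhere in this demo.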
# def batch_decode(_z: torch.Tensor, decode, batch_size: int = 10) -> torch.Tensor:
#     """Decode latents in chunks to limit peak GPU memory."""
#     num_samples = _z.size(0)
#     decoded_batches = []
#     for i in range(0, num_samples, batch_size):
#         batch = _z[i : i + batch_size]
#         decoded_batch = decode(batch)
#         decoded_batches.append(decoded_batch)
#     return torch.cat(decoded_batches, dim=0)
# def get_caption(llm: str, text_model, prompt_dict: dict, batch_size: int):
#     if batch_size == 3:
#         # Only addition or only subtraction mode.
#         assert len(prompt_dict) == 2, "Expected 2 prompts for batch_size 3."
#         batch_prompts = list(prompt_dict.values()) + [" "]
#     elif batch_size == 4:
#         # Addition and subtraction mode.
#         assert len(prompt_dict) == 3, "Expected 3 prompts for batch_size 4."
#         batch_prompts = list(prompt_dict.values()) + [" "]
#     elif batch_size >= 5:
#         # Linear interpolation mode: only the endpoints carry real prompts.
#         assert len(prompt_dict) == 2, "Expected 2 prompts for linear interpolation."
#         batch_prompts = [prompt_dict["prompt_1"]] + [" "] * (batch_size - 2) + [prompt_dict["prompt_2"]]
#     else:
#         raise ValueError(f"Unsupported batch_size: {batch_size}")
#     if llm == "clip":
#         latent, latent_and_others = text_model.encode(batch_prompts)
#         context = latent_and_others["token_embedding"].detach()
#     elif llm == "t5":
#         latent, latent_and_others = text_model.get_text_embeddings(batch_prompts)
#         context = (latent_and_others["token_embedding"] * 10.0).detach()
#     else:
#         raise NotImplementedError(f"Language model {llm} not supported.")
#     token_mask = latent_and_others["token_mask"].detach()
#     tokens = latent_and_others["tokens"].detach()
#     captions = batch_prompts
#     return context, token_mask, tokens, captions
# # Load configuration and initialize models.
# config_dict = t2i_512px_clip_dimr.get_config()
# config = ml_collections.ConfigDict(config_dict)
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# logging.info(f"Using device: {device}")
# # Freeze configuration.
# config = ml_collections.FrozenConfigDict(config)
# torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
# MAX_SEED = np.iinfo(np.int32).max
# MAX_IMAGE_SIZE = 1024 # Currently not used.
# # Load the main diffusion model.
# repo_id = "QHL067/CrossFlow"
# filename = "pretrained_models/t2i_512px_clip_dimr.pth"
# checkpoint_path = hf_hub_download(repo_id=repo_id, filename=filename)
# nnet = utils.get_nnet(**config.nnet)
# nnet = nnet.to(device)
# state_dict = torch.load(checkpoint_path, map_location=device)
# nnet.load_state_dict(state_dict)
# nnet.eval()
# # Initialize text model.
# llm = "clip"
# clip = FrozenCLIPEmbedder()
# clip.eval()
# clip.to(device)
# # Load autoencoder.
# autoencoder = libs.autoencoder.get_model(**config.autoencoder)
# autoencoder.to(device)
# @torch.cuda.amp.autocast()
# def encode(_batch: torch.Tensor) -> torch.Tensor:
#     """Encode a batch of images using the autoencoder."""
#     return autoencoder.encode(_batch)
# @torch.cuda.amp.autocast()
# def decode(_batch: torch.Tensor) -> torch.Tensor:
#     """Decode a batch of latent vectors using the autoencoder."""
#     return autoencoder.decode(_batch)
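# NOTE: encode() is unused in this demo; only decode() is exercised (via
# batch_decode) when turning sampled latents back into images.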
# @spaces.GPU  # [uncomment to use ZeroGPU]
# def infer(
#     prompt1,
#     prompt2,
#     seed,
#     randomize_seed,
#     guidance_scale,
#     num_inference_steps,
#     num_of_interpolation,
#     save_gpu_memory=True,
#     progress=gr.Progress(track_tqdm=True),
# ):
#     if randomize_seed:
#         seed = random.randint(0, MAX_SEED)
#     torch.manual_seed(seed)
#     if device.type == "cuda":
#         torch.cuda.manual_seed_all(seed)
#     # Only interpolation is supported in this implementation.
#     prompt_dict = {"prompt_1": prompt1, "prompt_2": prompt2}
#     for key, value in prompt_dict.items():
#         assert value is not None, f"{key} must not be None."
#     assert num_of_interpolation >= 5, "For linear interpolation, please sample at least five images."
#     # Get text embeddings and tokens.
#     _context, _token_mask, _token, _caption = get_caption(
#         llm, clip, prompt_dict=prompt_dict, batch_size=num_of_interpolation
#     )
#     with torch.no_grad():
#         _z_gaussian = torch.randn(num_of_interpolation, *config.z_shape, device=device)
#         _z_x0, _mu, _log_var = nnet(
#             _context, text_encoder=True, shape=_z_gaussian.shape, mask=_token_mask
#         )
#         _z_init = _z_x0.reshape(_z_gaussian.shape)
#         # Prepare the initial latents based on the number of interpolations.
#         if num_of_interpolation == 3:
#             # Addition or subtraction mode.
#             if config.prompt_a is not None:
#                 assert config.prompt_s is None, "Only one of prompt_a or prompt_s should be provided."
#                 z_init_temp = _z_init[0] + _z_init[1]
#             elif config.prompt_s is not None:
#                 assert config.prompt_a is None, "Only one of prompt_a or prompt_s should be provided."
#                 z_init_temp = _z_init[0] - _z_init[1]
#             else:
#                 raise NotImplementedError("Either prompt_a or prompt_s must be provided for 3-sample mode.")
#             mean = z_init_temp.mean()
#             std = z_init_temp.std()
#             _z_init[2] = (z_init_temp - mean) / std
#         elif num_of_interpolation == 4:
#             z_init_temp = _z_init[0] + _z_init[1] - _z_init[2]
#             mean = z_init_temp.mean()
#             std = z_init_temp.std()
#             _z_init[3] = (z_init_temp - mean) / std
#         elif num_of_interpolation >= 5:
#             # Evenly spaced linear interpolation between the two endpoint latents.
#             tensor_a = _z_init[0]
#             tensor_b = _z_init[-1]
#             num_interpolations = num_of_interpolation - 2
#             interpolations = [
#                 tensor_a + (tensor_b - tensor_a) * (i / (num_interpolations + 1))
#                 for i in range(1, num_interpolations + 1)
#             ]
#             _z_init = torch.stack([tensor_a] + interpolations + [tensor_b], dim=0)
#         else:
#             raise ValueError("Unsupported number of interpolations.")
#         assert guidance_scale > 1, "Guidance scale must be greater than 1."
#         has_null_indicator = hasattr(config.nnet.model_args, "cfg_indicator")
#         ode_solver = ODEEulerFlowMatchingSolver(
#             nnet,
#             bdv_model_fn=None,
#             step_size_type="step_in_dsigma",
#             guidance_scale=guidance_scale,
#         )
#         _z, _ = ode_solver.sample(
#             x_T=_z_init,
#             batch_size=num_of_interpolation,
#             sample_steps=num_inference_steps,
#             unconditional_guidance_scale=guidance_scale,
#             has_null_indicator=has_null_indicator,
#         )
#         if save_gpu_memory:
#             image_unprocessed = batch_decode(_z, decode)
#         else:
#             image_unprocessed = decode(_z)
#     samples = unpreprocess(image_unprocessed).contiguous()[0]
#     return samples, seed
# examples = [
#     ["A dog cooking dinner in the kitchen", "An orange cat wearing sunglasses on a ship"],
# ]
# css = """
# #col-container {
#     margin: 0 auto;
#     max-width: 640px;
# }
# """
# with gr.Blocks(css=css) as demo:
#     with gr.Column(elem_id="col-container"):
#         gr.Markdown(" # CrossFlow")
#         gr.Markdown(" CrossFlow directly transforms text representations into images for text-to-image generation, enabling interpolation in the input text latent space.")
#         with gr.Row():
#             prompt1 = gr.Text(
#                 label="Prompt_1",
#                 show_label=False,
#                 max_lines=1,
#                 placeholder="Enter your prompt for the first image",
#                 container=False,
#             )
#         with gr.Row():
#             prompt2 = gr.Text(
#                 label="Prompt_2",
#                 show_label=False,
#                 max_lines=1,
#                 placeholder="Enter your prompt for the second image",
#                 container=False,
#             )
#         with gr.Row():
#             run_button = gr.Button("Run", scale=0, variant="primary")
#         result = gr.Image(label="Result", show_label=False)
#         with gr.Accordion("Advanced Settings", open=False):
#             seed = gr.Slider(
#                 label="Seed",
#                 minimum=0,
#                 maximum=MAX_SEED,
#                 step=1,
#                 value=0,
#             )
#             randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
#             with gr.Row():
#                 guidance_scale = gr.Slider(
#                     label="Guidance scale",
#                     minimum=0.0,
#                     maximum=10.0,
#                     step=0.1,
#                     value=7.0,  # Replace with defaults that work for your model
#                 )
#             with gr.Row():
#                 num_inference_steps = gr.Slider(
#                     label="Number of inference steps",
#                     minimum=1,
#                     maximum=50,
#                     step=1,
#                     value=50,  # Replace with defaults that work for your model
#                 )
#             with gr.Row():
#                 num_of_interpolation = gr.Slider(
#                     label="Number of images for interpolation",
#                     minimum=5,
#                     maximum=50,
#                     step=1,
#                     value=10,  # Replace with defaults that work for your model
#                 )
#         gr.Examples(examples=examples, inputs=[prompt1, prompt2])
#     gr.on(
#         triggers=[run_button.click, prompt1.submit, prompt2.submit],
#         fn=infer,
#         inputs=[
#             prompt1,
#             prompt2,
#             seed,
#             randomize_seed,
#             guidance_scale,
#             num_inference_steps,
#             num_of_interpolation,
#         ],
#         outputs=[result, seed],
#     )
# if __name__ == "__main__":
#     demo.launch()
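

# ---------------------------------------------------------------------------
# Active demo below: the standard Diffusers text-to-image Gradio template,
# running stabilityai/sdxl-turbo.
# ---------------------------------------------------------------------------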
import gradio as gr
import numpy as np
import random
# import spaces #[uncomment to use ZeroGPU]
from diffusers import DiffusionPipeline
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
model_repo_id = "stabilityai/sdxl-turbo"  # Replace with the model you would like to use
if torch.cuda.is_available():
    torch_dtype = torch.float16
else:
    torch_dtype = torch.float32
pipe = DiffusionPipeline.from_pretrained(model_repo_id, torch_dtype=torch_dtype)
pipe = pipe.to(device)
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 1024
# @spaces.GPU #[uncomment to use ZeroGPU]
def infer(
    prompt,
    negative_prompt,
    seed,
    randomize_seed,
    width,
    height,
    guidance_scale,
    num_inference_steps,
    progress=gr.Progress(track_tqdm=True),
):
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    generator = torch.Generator().manual_seed(seed)
    # pipe(...) returns PIL images; take the first (and only) one.
    image = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        width=width,
        height=height,
        generator=generator,
    ).images[0]
    return image, seed
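
# Example of calling infer() directly for a local smoke test (illustrative
# values; sdxl-turbo is designed for low guidance and very few steps):
#
#     image, used_seed = infer(
#         "An astronaut riding a green horse", "", seed=0, randomize_seed=False,
#         width=512, height=512, guidance_scale=0.0, num_inference_steps=2,
#     )
#     image.save("sample.png")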
examples = [
    "Astronaut in a jungle, cold color palette, muted colors, detailed, 8k",
    "An astronaut riding a green horse",
    "A delicious ceviche cheesecake slice",
]
css = """
#col-container {
    margin: 0 auto;
    max-width: 640px;
}
"""
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown(" # Text-to-Image Gradio Template")
        with gr.Row():
            prompt = gr.Text(
                label="Prompt",
                show_label=False,
                max_lines=1,
                placeholder="Enter your prompt",
                container=False,
            )
            run_button = gr.Button("Run", scale=0, variant="primary")
        result = gr.Image(label="Result", show_label=False)
        with gr.Accordion("Advanced Settings", open=False):
            negative_prompt = gr.Text(
                label="Negative prompt",
                max_lines=1,
                placeholder="Enter a negative prompt",
                visible=False,
            )
            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
            )
            randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
            with gr.Row():
                width = gr.Slider(
                    label="Width",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,  # Replace with defaults that work for your model
                )
                height = gr.Slider(
                    label="Height",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,  # Replace with defaults that work for your model
                )
            with gr.Row():
                guidance_scale = gr.Slider(
                    label="Guidance scale",
                    minimum=0.0,
                    maximum=10.0,
                    step=0.1,
                    value=0.0,  # Replace with defaults that work for your model
                )
                num_inference_steps = gr.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=50,
                    step=1,
                    value=2,  # Replace with defaults that work for your model
                )
        gr.Examples(examples=examples, inputs=[prompt])
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            prompt,
            negative_prompt,
            seed,
            randomize_seed,
            width,
            height,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[result, seed],
    )
if __name__ == "__main__":
    demo.launch()
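    # launch() serves the UI on a local URL (default http://127.0.0.1:7860);
    # pass share=True for a temporary public link when running outside Spaces.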