# import gradio as gr | |
# from absl import flags | |
# from absl import app | |
# from ml_collections import config_flags | |
# import os | |
# import spaces #[uncomment to use ZeroGPU] | |
# import torch | |
# import os | |
# import random | |
# import numpy as np | |
# import torch | |
# import torch.nn.functional as F | |
# from torchvision.utils import save_image | |
# from huggingface_hub import hf_hub_download | |
# from absl import logging | |
# import ml_collections | |
# from diffusion.flow_matching import ODEEulerFlowMatchingSolver | |
# import utils | |
# import libs.autoencoder | |
# from libs.clip import FrozenCLIPEmbedder | |
# from configs import t2i_512px_clip_dimr | |
# def unpreprocess(x: torch.Tensor) -> torch.Tensor: | |
# x = 0.5 * (x + 1.0) | |
# x.clamp_(0.0, 1.0) | |
# return x | |
# def cosine_similarity_torch(latent1: torch.Tensor, latent2: torch.Tensor) -> torch.Tensor: | |
# latent1_flat = latent1.view(-1) | |
# latent2_flat = latent2.view(-1) | |
# cosine_similarity = F.cosine_similarity( | |
# latent1_flat.unsqueeze(0), latent2_flat.unsqueeze(0), dim=1 | |
# ) | |
# return cosine_similarity | |
# def kl_divergence(latent1: torch.Tensor, latent2: torch.Tensor) -> torch.Tensor: | |
# latent1_prob = F.softmax(latent1, dim=-1) | |
# latent2_prob = F.softmax(latent2, dim=-1) | |
# latent1_log_prob = torch.log(latent1_prob) | |
# kl_div = F.kl_div(latent1_log_prob, latent2_prob, reduction="batchmean") | |
# return kl_div | |
# def batch_decode(_z: torch.Tensor, decode, batch_size: int = 10) -> torch.Tensor: | |
# num_samples = _z.size(0) | |
# decoded_batches = [] | |
# for i in range(0, num_samples, batch_size): | |
# batch = _z[i : i + batch_size] | |
# decoded_batch = decode(batch) | |
# decoded_batches.append(decoded_batch) | |
# return torch.cat(decoded_batches, dim=0) | |
# def get_caption(llm: str, text_model, prompt_dict: dict, batch_size: int): | |
# if batch_size == 3: | |
# # Only addition or only subtraction mode. | |
# assert len(prompt_dict) == 2, "Expected 2 prompts for batch_size 3." | |
# batch_prompts = list(prompt_dict.values()) + [" "] | |
# elif batch_size == 4: | |
# # Addition and subtraction mode. | |
# assert len(prompt_dict) == 3, "Expected 3 prompts for batch_size 4." | |
# batch_prompts = list(prompt_dict.values()) + [" "] | |
# elif batch_size >= 5: | |
# # Linear interpolation mode. | |
# assert len(prompt_dict) == 2, "Expected 2 prompts for linear interpolation." | |
# batch_prompts = [prompt_dict["prompt_1"]] + [" "] * (batch_size - 2) + [prompt_dict["prompt_2"]] | |
# else: | |
# raise ValueError(f"Unsupported batch_size: {batch_size}") | |
# if llm == "clip": | |
# latent, latent_and_others = text_model.encode(batch_prompts) | |
# context = latent_and_others["token_embedding"].detach() | |
# elif llm == "t5": | |
# latent, latent_and_others = text_model.get_text_embeddings(batch_prompts) | |
# context = (latent_and_others["token_embedding"] * 10.0).detach() | |
# else: | |
# raise NotImplementedError(f"Language model {llm} not supported.") | |
# token_mask = latent_and_others["token_mask"].detach() | |
# tokens = latent_and_others["tokens"].detach() | |
# captions = batch_prompts | |
# return context, token_mask, tokens, captions | |
# # Load configuration and initialize models. | |
# config_dict = t2i_512px_clip_dimr.get_config() | |
# config = ml_collections.ConfigDict(config_dict) | |
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# logging.info(f"Using device: {device}") | |
# # Freeze configuration. | |
# config = ml_collections.FrozenConfigDict(config) | |
# torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 | |
# MAX_SEED = np.iinfo(np.int32).max | |
# MAX_IMAGE_SIZE = 1024 # Currently not used. | |
# # Load the main diffusion model. | |
# repo_id = "QHL067/CrossFlow" | |
# filename = "pretrained_models/t2i_512px_clip_dimr.pth" | |
# checkpoint_path = hf_hub_download(repo_id=repo_id, filename=filename) | |
# nnet = utils.get_nnet(**config.nnet) | |
# nnet = nnet.to(device) | |
# state_dict = torch.load(checkpoint_path, map_location=device) | |
# nnet.load_state_dict(state_dict) | |
# nnet.eval() | |
# # Initialize text model. | |
# llm = "clip" | |
# clip = FrozenCLIPEmbedder() | |
# clip.eval() | |
# clip.to(device) | |
# # Load autoencoder. | |
# autoencoder = libs.autoencoder.get_model(**config.autoencoder) | |
# autoencoder.to(device) | |
# @torch.cuda.amp.autocast() | |
# def encode(_batch: torch.Tensor) -> torch.Tensor: | |
# """Encode a batch of images using the autoencoder.""" | |
# return autoencoder.encode(_batch) | |
# @torch.cuda.amp.autocast() | |
# def decode(_batch: torch.Tensor) -> torch.Tensor: | |
# """Decode a batch of latent vectors using the autoencoder.""" | |
# return autoencoder.decode(_batch) | |
# @spaces.GPU #[uncomment to use ZeroGPU] | |
# def infer( | |
# prompt1, | |
# prompt2, | |
# seed, | |
# randomize_seed, | |
# guidance_scale, | |
# num_inference_steps, | |
# num_of_interpolation, | |
# save_gpu_memory=True, | |
# progress=gr.Progress(track_tqdm=True), | |
# ): | |
# if randomize_seed: | |
# seed = random.randint(0, MAX_SEED) | |
# torch.manual_seed(seed) | |
# if device.type == "cuda": | |
# torch.cuda.manual_seed_all(seed) | |
# # Only support interpolation in this implementation. | |
# prompt_dict = {"prompt_1": prompt1, "prompt_2": prompt2} | |
# for key, value in prompt_dict.items(): | |
# assert value is not None, f"{key} must not be None." | |
# assert num_of_interpolation >= 5, "For linear interpolation, please sample at least five images." | |
# # Get text embeddings and tokens. | |
# _context, _token_mask, _token, _caption = get_caption( | |
# llm, clip, prompt_dict=prompt_dict, batch_size=num_of_interpolation | |
# ) | |
# with torch.no_grad(): | |
# _z_gaussian = torch.randn(num_of_interpolation, *config.z_shape, device=device) | |
# _z_x0, _mu, _log_var = nnet( | |
# _context, text_encoder=True, shape=_z_gaussian.shape, mask=_token_mask | |
# ) | |
# _z_init = _z_x0.reshape(_z_gaussian.shape) | |
# # Prepare the initial latent representations based on the number of interpolations. | |
# if num_of_interpolation == 3: | |
# # Addition or subtraction mode. | |
# if config.prompt_a is not None: | |
# assert config.prompt_s is None, "Only one of prompt_a or prompt_s should be provided." | |
# z_init_temp = _z_init[0] + _z_init[1] | |
# elif config.prompt_s is not None: | |
# assert config.prompt_a is None, "Only one of prompt_a or prompt_s should be provided." | |
# z_init_temp = _z_init[0] - _z_init[1] | |
# else: | |
# raise NotImplementedError("Either prompt_a or prompt_s must be provided for 3-sample mode.") | |
# mean = z_init_temp.mean() | |
# std = z_init_temp.std() | |
# _z_init[2] = (z_init_temp - mean) / std | |
# elif num_of_interpolation == 4: | |
# z_init_temp = _z_init[0] + _z_init[1] - _z_init[2] | |
# mean = z_init_temp.mean() | |
# std = z_init_temp.std() | |
# _z_init[3] = (z_init_temp - mean) / std | |
# elif num_of_interpolation >= 5: | |
# tensor_a = _z_init[0] | |
# tensor_b = _z_init[-1] | |
# num_interpolations = num_of_interpolation - 2 | |
# interpolations = [ | |
# tensor_a + (tensor_b - tensor_a) * (i / (num_interpolations + 1)) | |
# for i in range(1, num_interpolations + 1) | |
# ] | |
# _z_init = torch.stack([tensor_a] + interpolations + [tensor_b], dim=0) | |
# else: | |
# raise ValueError("Unsupported number of interpolations.") | |
# assert guidance_scale > 1, "Guidance scale must be greater than 1." | |
# has_null_indicator = hasattr(config.nnet.model_args, "cfg_indicator") | |
# ode_solver = ODEEulerFlowMatchingSolver( | |
# nnet, | |
# bdv_model_fn=None, | |
# step_size_type="step_in_dsigma", | |
# guidance_scale=guidance_scale, | |
# ) | |
# _z, _ = ode_solver.sample( | |
# x_T=_z_init, | |
# batch_size=num_of_interpolation, | |
# sample_steps=num_inference_steps, | |
# unconditional_guidance_scale=guidance_scale, | |
# has_null_indicator=has_null_indicator, | |
# ) | |
# if save_gpu_memory: | |
# image_unprocessed = batch_decode(_z, decode) | |
# else: | |
# image_unprocessed = decode(_z) | |
# samples = unpreprocess(image_unprocessed).contiguous()[0] | |
# # return samples, seed | |
# return seed | |
# # examples = [ | |
# # "Astronaut in a jungle, cold color palette, muted colors, detailed, 8k", | |
# # "An astronaut riding a green horse", | |
# # "A delicious ceviche cheesecake slice", | |
# # ] | |
# examples = [ | |
# ["A dog cooking dinner in the kitchen", "An orange cat wearing sunglasses on a ship"], | |
# ] | |
# css = """ | |
# #col-container { | |
# margin: 0 auto; | |
# max-width: 640px; | |
# } | |
# """ | |
# with gr.Blocks(css=css) as demo: | |
# with gr.Column(elem_id="col-container"): | |
# gr.Markdown(" # CrossFlow") | |
# gr.Markdown(" CrossFlow directly transforms text representations into images for text-to-image generation, enabling interpolation in the input text latent space.") | |
# with gr.Row(): | |
# prompt1 = gr.Text( | |
# label="Prompt_1", | |
# show_label=False, | |
# max_lines=1, | |
# placeholder="Enter your prompt for the first image", | |
# container=False, | |
# ) | |
# with gr.Row(): | |
# prompt2 = gr.Text( | |
# label="Prompt_2", | |
# show_label=False, | |
# max_lines=1, | |
# placeholder="Enter your prompt for the second image", | |
# container=False, | |
# ) | |
# with gr.Row(): | |
# run_button = gr.Button("Run", scale=0, variant="primary") | |
# result = gr.Image(label="Result", show_label=False) | |
# with gr.Accordion("Advanced Settings", open=False): | |
# seed = gr.Slider( | |
# label="Seed", | |
# minimum=0, | |
# maximum=MAX_SEED, | |
# step=1, | |
# value=0, | |
# ) | |
# randomize_seed = gr.Checkbox(label="Randomize seed", value=True) | |
# with gr.Row(): | |
# guidance_scale = gr.Slider( | |
# label="Guidance scale", | |
# minimum=0.0, | |
# maximum=10.0, | |
# step=0.1, | |
# value=7.0, # Replace with defaults that work for your model | |
# ) | |
# with gr.Row(): | |
# num_inference_steps = gr.Slider( | |
# label="Number of inference steps", | |
# minimum=1, | |
# maximum=50, | |
# step=1, | |
# value=50, # Replace with defaults that work for your model | |
# ) | |
# with gr.Row(): | |
# num_of_interpolation = gr.Slider( | |
# label="Number of images for interpolation", | |
# minimum=5, | |
# maximum=50, | |
# step=1, | |
# value=10, # Replace with defaults that work for your model | |
# ) | |
# gr.Examples(examples=examples, inputs=[prompt1, prompt2]) | |
# gr.on( | |
# triggers=[run_button.click, prompt1.submit, prompt2.submit], | |
# fn=infer, | |
# inputs=[ | |
# prompt1, | |
# prompt2, | |
# seed, | |
# randomize_seed, | |
# guidance_scale, | |
# num_inference_steps, | |
# num_of_interpolation, | |
# ], | |
# # outputs=[result, seed], | |
# outputs=[seed], | |
# ) | |
# if __name__ == "__main__": | |
# demo.launch() | |
import random

import gradio as gr
import numpy as np
import torch
from diffusers import DiffusionPipeline

# import spaces #[uncomment to use ZeroGPU]
# Module-level setup: pick the compute device/precision and load the pipeline
# once at import time so every `infer` call reuses the same model instance.
_cuda_available = torch.cuda.is_available()

device = "cuda" if _cuda_available else "cpu"
model_repo_id = "stabilityai/sdxl-turbo"  # Replace to the model you would like to use

# float16 when CUDA is available, float32 otherwise.
torch_dtype = torch.float16 if _cuda_available else torch.float32

pipe = DiffusionPipeline.from_pretrained(model_repo_id, torch_dtype=torch_dtype)
pipe = pipe.to(device)

# Upper bounds used by the seed and width/height sliders in the UI.
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 1024
# @spaces.GPU #[uncomment to use ZeroGPU]
def infer(
    prompt,
    negative_prompt,
    seed,
    randomize_seed,
    width,
    height,
    guidance_scale,
    num_inference_steps,
    progress=gr.Progress(track_tqdm=True),
):
    """Run the module-level ``pipe`` once and return the first image and the seed used.

    Args:
        prompt: Text prompt forwarded to the pipeline.
        negative_prompt: Negative prompt forwarded to the pipeline.
        seed: Seed for the torch generator; replaced when ``randomize_seed`` is set.
        randomize_seed: When truthy, draw a fresh seed in ``[0, MAX_SEED]``.
        width: Forwarded to the pipeline as ``width``.
        height: Forwarded to the pipeline as ``height``.
        guidance_scale: Forwarded to the pipeline as ``guidance_scale``.
        num_inference_steps: Forwarded to the pipeline as ``num_inference_steps``.
        progress: Not referenced in the body; presumably lets Gradio mirror
            tqdm progress in the UI -- confirm against the Gradio docs.

    Returns:
        A ``(image, seed)`` tuple: the first entry of the pipeline's ``images``
        and the seed that was actually used.
    """
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)

    generator = torch.Generator().manual_seed(seed)

    image = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        width=width,
        height=height,
        generator=generator,
    ).images[0]

    # Debug output. The original unconditional `image.shape` raises
    # AttributeError when the pipeline yields a PIL image (which exposes
    # `.size`, not `.shape`), so fall back instead of crashing the request.
    print('image.shape')
    print(getattr(image, "shape", getattr(image, "size", None)))

    return image, seed
# Sample prompts offered through the examples widget in the UI.
examples = [
    "Astronaut in a jungle, cold color palette, muted colors, detailed, 8k",
    "An astronaut riding a green horse",
    "A delicious ceviche cheesecake slice",
]

# Page stylesheet: centers the main column and caps its width.
css = """
#col-container {
    margin: 0 auto;
    max-width: 640px;
}
"""
# Build the Gradio UI and wire the controls to `infer`.
# NOTE(review): the source had lost all indentation; the nesting below is
# reconstructed from the order of the `with` statements -- confirm the layout.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown(" # Text-to-Image Gradio Template")

        # Prompt box and run button.
        with gr.Row():
            prompt = gr.Text(
                label="Prompt",
                show_label=False,
                max_lines=1,
                placeholder="Enter your prompt",
                container=False,
            )

            run_button = gr.Button("Run", scale=0, variant="primary")

        result = gr.Image(label="Result", show_label=False)

        with gr.Accordion("Advanced Settings", open=False):
            # Created with visible=False, but still passed to `infer` below.
            negative_prompt = gr.Text(
                label="Negative prompt",
                max_lines=1,
                placeholder="Enter a negative prompt",
                visible=False,
            )

            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
            )

            randomize_seed = gr.Checkbox(label="Randomize seed", value=True)

            with gr.Row():
                width = gr.Slider(
                    label="Width",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,  # Replace with defaults that work for your model
                )

                height = gr.Slider(
                    label="Height",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,  # Replace with defaults that work for your model
                )

            with gr.Row():
                guidance_scale = gr.Slider(
                    label="Guidance scale",
                    minimum=0.0,
                    maximum=10.0,
                    step=0.1,
                    value=0.0,  # Replace with defaults that work for your model
                )

                num_inference_steps = gr.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=50,
                    step=1,
                    value=2,  # Replace with defaults that work for your model
                )

        gr.Examples(examples=examples, inputs=[prompt])

    # Clicking the button or submitting the prompt runs `infer`; the input
    # order matches `infer`'s positional parameters, and its (image, seed)
    # return value is routed to the result image and the seed slider.
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            prompt,
            negative_prompt,
            seed,
            randomize_seed,
            width,
            height,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[result, seed],
    )
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()