# https://huggingface.co/deinferno/LCM_Dreamshaper_v7-openvino

import inspect
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple, Union, Dict, Any, Callable, OrderedDict

import numpy as np
import openvino
import torch
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput
from optimum.intel.openvino.modeling_diffusion import (
    OVStableDiffusionPipeline,
    OVModelUnet,
    OVModelVaeDecoder,
    OVModelTextEncoder,
    OVModelVaeEncoder,
    VaeImageProcessor,
)
from optimum.utils import (
    DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER,
    DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER,
    DIFFUSION_MODEL_UNET_SUBFOLDER,
    DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER,
    DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER,
)
from diffusers import logging

logger = logging.get_logger(__name__)  # pylint: disable=invalid-name
class LCMOVModelUnet(OVModelUnet):
    def __call__(
        self,
        sample: np.ndarray,
        timestep: np.ndarray,
        encoder_hidden_states: np.ndarray,
        timestep_cond: Optional[np.ndarray] = None,
        text_embeds: Optional[np.ndarray] = None,
        time_ids: Optional[np.ndarray] = None,
    ):
        self._compile()

        inputs = {
            "sample": sample,
            "timestep": timestep,
            "encoder_hidden_states": encoder_hidden_states,
        }
        if timestep_cond is not None:
            inputs["timestep_cond"] = timestep_cond
        if text_embeds is not None:
            inputs["text_embeds"] = text_embeds
        if time_ids is not None:
            inputs["time_ids"] = time_ids

        outputs = self.request(inputs, shared_memory=True)
        return list(outputs.values())
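
# Compared to the stock Stable Diffusion UNet wrapper, the LCM UNet exposes an extra
# optional "timestep_cond" input; the pipeline below feeds it the guidance-scale
# embedding from `get_guidance_scale_embedding`, which stands in for classifier-free
# guidance (see the NOTE in `__call__`).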
class OVLatentConsistencyModelPipeline(OVStableDiffusionPipeline):
    def __init__(
        self,
        vae_decoder: openvino.runtime.Model,
        text_encoder: openvino.runtime.Model,
        unet: openvino.runtime.Model,
        config: Dict[str, Any],
        tokenizer: "CLIPTokenizer",
        scheduler: Union["DDIMScheduler", "PNDMScheduler", "LMSDiscreteScheduler"],
        feature_extractor: Optional["CLIPFeatureExtractor"] = None,
        vae_encoder: Optional[openvino.runtime.Model] = None,
        text_encoder_2: Optional[openvino.runtime.Model] = None,
        tokenizer_2: Optional["CLIPTokenizer"] = None,
        device: str = "CPU",
        dynamic_shapes: bool = True,
        compile: bool = True,
        ov_config: Optional[Dict[str, str]] = None,
        model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None,
        **kwargs,
    ):
        self._internal_dict = config
        self._device = device.upper()
        self.is_dynamic = dynamic_shapes
        self.ov_config = ov_config if ov_config is not None else {}
        self._model_save_dir = (
            Path(model_save_dir.name)
            if isinstance(model_save_dir, TemporaryDirectory)
            else model_save_dir
        )
        self.vae_decoder = OVModelVaeDecoder(vae_decoder, self)
        self.unet = LCMOVModelUnet(unet, self)
        self.text_encoder = (
            OVModelTextEncoder(text_encoder, self) if text_encoder is not None else None
        )
        self.text_encoder_2 = (
            OVModelTextEncoder(
                text_encoder_2,
                self,
                model_name=DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER,
            )
            if text_encoder_2 is not None
            else None
        )
        self.vae_encoder = (
            OVModelVaeEncoder(vae_encoder, self) if vae_encoder is not None else None
        )

        if "block_out_channels" in self.vae_decoder.config:
            self.vae_scale_factor = 2 ** (
                len(self.vae_decoder.config["block_out_channels"]) - 1
            )
        else:
            self.vae_scale_factor = 8

        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

        self.tokenizer = tokenizer
        self.tokenizer_2 = tokenizer_2
        self.scheduler = scheduler
        self.feature_extractor = feature_extractor
        self.safety_checker = None
        self.preprocessors = []

        if self.is_dynamic:
            self.reshape(batch_size=-1, height=-1, width=-1, num_images_per_prompt=-1)

        if compile:
            self.compile()

        sub_models = {
            DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER: self.text_encoder,
            DIFFUSION_MODEL_UNET_SUBFOLDER: self.unet,
            DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER: self.vae_decoder,
            DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER: self.vae_encoder,
            DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER: self.text_encoder_2,
        }
        for name in sub_models.keys():
            self._internal_dict[name] = (
                ("optimum", sub_models[name].__class__.__name__)
                if sub_models[name] is not None
                else (None, None)
            )

        self._internal_dict.pop("vae", None)
    def _reshape_unet(
        self,
        model: openvino.runtime.Model,
        batch_size: int = -1,
        height: int = -1,
        width: int = -1,
        num_images_per_prompt: int = -1,
        tokenizer_max_length: int = -1,
    ):
        if batch_size == -1 or num_images_per_prompt == -1:
            batch_size = -1
        else:
            batch_size = batch_size * num_images_per_prompt

        height = height // self.vae_scale_factor if height > 0 else height
        width = width // self.vae_scale_factor if width > 0 else width
        shapes = {}
        for inputs in model.inputs:
            shapes[inputs] = inputs.get_partial_shape()
            if inputs.get_any_name() == "timestep":
                shapes[inputs][0] = 1
            elif inputs.get_any_name() == "sample":
                in_channels = self.unet.config.get("in_channels", None)
                if in_channels is None:
                    # Fall back to the model's own partial shape; only a Dimension
                    # (not a config int) carries `is_dynamic`.
                    in_channels = shapes[inputs][1]
                    if in_channels.is_dynamic:
                        logger.warning(
                            "Could not identify `in_channels` from the unet configuration, to statically reshape the unet please provide a configuration."
                        )
                        self.is_dynamic = True
                shapes[inputs] = [batch_size, in_channels, height, width]
            elif inputs.get_any_name() == "timestep_cond":
                shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]]
            elif inputs.get_any_name() == "text_embeds":
                shapes[inputs] = [
                    batch_size,
                    self.text_encoder_2.config["projection_dim"],
                ]
            elif inputs.get_any_name() == "time_ids":
                shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]]
            else:
                shapes[inputs][0] = batch_size
                shapes[inputs][1] = tokenizer_max_length
        model.reshape(shapes)
        return model
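
    # Example (sketch): pipeline.reshape(batch_size=1, height=512, width=512,
    # num_images_per_prompt=1) statically reshapes every sub-model; here the UNet
    # "sample" input becomes [1, in_channels, 64, 64] for a 512x512 image
    # (512 // vae_scale_factor == 64 when vae_scale_factor == 8).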
    def get_guidance_scale_embedding(self, w, embedding_dim=512, dtype=np.float32):
        """
        See https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298

        Args:
            w (`np.ndarray`):
                Guidance scale values, one per sample, used to generate the embedding vectors.
            embedding_dim (`int`):
                Dimension of the embeddings to generate.
            dtype:
                Data type of the generated embeddings.

        Returns:
            `np.ndarray`: Embedding vectors with shape `(len(w), embedding_dim)`.
        """
        assert len(w.shape) == 1
        w = w * 1000.0

        half_dim = embedding_dim // 2
        emb = np.log(np.array(10000.0)) / (half_dim - 1)
        emb = np.exp(np.arange(half_dim, dtype=dtype) * -emb)
        emb = w.astype(dtype)[:, None] * emb[None, :]
        emb = np.concatenate([np.sin(emb), np.cos(emb)], axis=1)
        if embedding_dim % 2 == 1:  # zero pad the last dimension only
            emb = np.pad(emb, [(0, 0), (0, 1)])
        assert emb.shape == (w.shape[0], embedding_dim)
        return emb
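
    # Example (sketch): for w = np.array([7.0]) and embedding_dim = 256, this returns
    # a (1, 256) array whose first 128 entries are sin(7000 * f_k) and last 128 are
    # cos(7000 * f_k), with frequencies f_k = exp(-k * ln(10000) / 127), k = 0..127.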
    # Adapted from https://github.com/huggingface/optimum/blob/15b8d1eed4d83c5004d3b60f6b6f13744b358f01/optimum/pipelines/diffusers/pipeline_stable_diffusion.py#L201
    def __call__(
        self,
        prompt: Optional[Union[str, List[str]]] = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_inference_steps: int = 4,
        original_inference_steps: Optional[int] = None,
        guidance_scale: float = 7.5,
        num_images_per_prompt: int = 1,
        eta: float = 0.0,
        generator: Optional[np.random.RandomState] = None,
        latents: Optional[np.ndarray] = None,
        prompt_embeds: Optional[np.ndarray] = None,
        output_type: str = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, np.ndarray], None]] = None,
        callback_steps: int = 1,
        guidance_rescale: float = 0.0,
    ):
        r"""
        Function invoked when calling the pipeline for generation.

        Args:
            prompt (`Optional[Union[str, List[str]]]`, defaults to `None`):
                The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`
                instead.
            height (`Optional[int]`, defaults to `None`):
                The height in pixels of the generated image.
            width (`Optional[int]`, defaults to `None`):
                The width in pixels of the generated image.
            num_inference_steps (`int`, defaults to 4):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            original_inference_steps (`Optional[int]`, defaults to `None`):
                The original number of inference steps used to generate a linearly-spaced timestep schedule, from
                which we will draw `num_inference_steps` evenly spaced timesteps as our final timestep schedule,
                following the Skipping-Step method in the paper (see Section 4.3). If not set, this will default to
                the scheduler's `original_inference_steps` attribute.
            guidance_scale (`float`, defaults to 7.5):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
                `guidance_scale` is defined as `w` of equation 2. of [Imagen
                Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
                1`. A higher guidance scale encourages generating images that are closely linked to the text
                `prompt`, usually at the expense of lower image quality.
            num_images_per_prompt (`int`, defaults to 1):
                The number of images to generate per prompt.
            eta (`float`, defaults to 0.0):
                Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
                [`schedulers.DDIMScheduler`], will be ignored for others.
            generator (`Optional[np.random.RandomState]`, defaults to `None`):
                A np.random.RandomState to make generation deterministic.
            latents (`Optional[np.ndarray]`, defaults to `None`):
                Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a
                latents tensor will be generated by sampling using the supplied random `generator`.
            prompt_embeds (`Optional[np.ndarray]`, defaults to `None`):
                Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If
                not provided, text embeddings will be generated from the `prompt` input argument.
            output_type (`str`, defaults to `"pil"`):
                The output format of the generated image. Choose between
                [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
            return_dict (`bool`, defaults to `True`):
                Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
                plain tuple.
            callback (`Optional[Callable]`, defaults to `None`):
                A function that will be called every `callback_steps` steps during inference. The function will be
                called with the following arguments: `callback(step: int, timestep: int, latents: np.ndarray)`.
            callback_steps (`int`, defaults to 1):
                The frequency at which the `callback` function will be called. If not specified, the callback will be
                called at every step.
            guidance_rescale (`float`, defaults to 0.0):
                Guidance rescale factor proposed by [Common Diffusion Noise Schedules and Sample Steps are
                Flawed](https://arxiv.org/pdf/2305.08891.pdf). `guidance_scale` is defined as `φ` in equation 16. of
                [Common Diffusion Noise Schedules and Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf).
                Guidance rescale factor should fix overexposure when using zero terminal SNR.

        Returns:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is `True`, otherwise a
            `tuple`. When returning a tuple, the first element is a list with the generated images, and the second
            element is a list of `bool`s denoting whether the corresponding generated image likely represents
            "not-safe-for-work" (nsfw) content, according to the `safety_checker`.
        """
        height = (
            height or self.unet.config.get("sample_size", 64) * self.vae_scale_factor
        )
        width = width or self.unet.config.get("sample_size", 64) * self.vae_scale_factor

        # check inputs. Raise error if not correct
        self.check_inputs(
            prompt, height, width, callback_steps, None, prompt_embeds, None
        )

        # define call parameters
        if isinstance(prompt, str):
            batch_size = 1
        elif isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        if generator is None:
            generator = np.random

        # Create torch.Generator instance with same state as np.random.RandomState
        torch_generator = torch.Generator().manual_seed(
            int(generator.get_state()[1][0])
        )

        # do_classifier_free_guidance = guidance_scale > 1.0

        # NOTE: when a LCM is distilled from an LDM via latent consistency distillation (Algorithm 1) with guided
        # distillation, the forward pass of the LCM learns to approximate sampling from the LDM using CFG with the
        # unconditional prompt "" (the empty string). Due to this, LCMs currently do not support negative prompts.
        prompt_embeds = self._encode_prompt(
            prompt,
            num_images_per_prompt,
            False,
            negative_prompt=None,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=None,
        )

        # set timesteps
        self.scheduler.set_timesteps(
            num_inference_steps,
            "cpu",
            original_inference_steps=original_inference_steps,
        )
        timesteps = self.scheduler.timesteps

        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            self.unet.config.get("in_channels", 4),
            height,
            width,
            prompt_embeds.dtype,
            generator,
            latents,
        )

        # Get Guidance Scale Embedding
        w = np.tile(guidance_scale - 1, batch_size * num_images_per_prompt)
        w_embedding = self.get_guidance_scale_embedding(
            w, embedding_dim=self.unet.config.get("time_cond_proj_dim", 256)
        )
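
        # Example (sketch): with guidance_scale=8.0 and a single prompt, w == [7.0]
        # and w_embedding has shape (1, time_cond_proj_dim); it is fed to the UNet
        # via `timestep_cond` instead of running classifier-free guidance.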
        # Adapted from diffusers to extend it for other runtimes than ORT
        timestep_dtype = self.unet.input_dtype.get("timestep", np.float32)

        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
        # and should be between [0, 1]
        accepts_eta = "eta" in set(
            inspect.signature(self.scheduler.step).parameters.keys()
        )
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        accepts_generator = "generator" in set(
            inspect.signature(self.scheduler.step).parameters.keys()
        )
        if accepts_generator:
            extra_step_kwargs["generator"] = torch_generator
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        for i, t in enumerate(self.progress_bar(timesteps)):
            # predict the noise residual
            timestep = np.array([t], dtype=timestep_dtype)
            noise_pred = self.unet(
                sample=latents,
                timestep=timestep,
                timestep_cond=w_embedding,
                encoder_hidden_states=prompt_embeds,
            )[0]

            # compute the previous noisy sample x_t -> x_t-1
            latents, denoised = self.scheduler.step(
                torch.from_numpy(noise_pred),
                t,
                torch.from_numpy(latents),
                **extra_step_kwargs,
                return_dict=False,
            )
            latents, denoised = latents.numpy(), denoised.numpy()

            # call the callback, if provided
            if i == len(timesteps) - 1 or (
                (i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0
            ):
                if callback is not None and i % callback_steps == 0:
                    callback(i, t, latents)

        if output_type == "latent":
            image = latents
            has_nsfw_concept = None
        else:
            denoised /= self.vae_decoder.config.get("scaling_factor", 0.18215)
            # Decoding with a half-precision VAE decoder seems to give strange results
            # when the batch size is > 1, so decode the latents one sample at a time.
            image = np.concatenate(
                [
                    self.vae_decoder(latent_sample=denoised[i : i + 1])[0]
                    for i in range(latents.shape[0])
                ]
            )
            image, has_nsfw_concept = self.run_safety_checker(image)

        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]

        image = self.image_processor.postprocess(
            image, output_type=output_type, do_denormalize=do_denormalize
        )

        if not return_dict:
            return (image, has_nsfw_concept)

        return StableDiffusionPipelineOutput(
            images=image, nsfw_content_detected=has_nsfw_concept
        )
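
# Usage sketch (an assumption, not part of the pipeline): load the exported OpenVINO
# weights from the repo referenced at the top of this file and run a 4-step LCM
# generation. `from_pretrained` is inherited from OVStableDiffusionPipeline.
if __name__ == "__main__":
    pipeline = OVLatentConsistencyModelPipeline.from_pretrained(
        "deinferno/LCM_Dreamshaper_v7-openvino", compile=True
    )
    images = pipeline(
        prompt="a photo of an astronaut riding a horse on mars",
        num_inference_steps=4,
        guidance_scale=8.0,
        height=512,
        width=512,
    ).images
    images[0].save("lcm_result.png")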