from __future__ import annotations from huggingface_hub import HfApi, snapshot_download from concurrent.futures import ThreadPoolExecutor import asyncio import ast import os import random import time import gradio as gr import numpy as np import PIL.Image import torch from diffusers import StableDiffusionPipeline import uuid from diffusers import DiffusionPipeline from tqdm import tqdm from safetensors.torch import load_file import cv2 MAX_SEED = np.iinfo(np.int32).max CACHE_EXAMPLES = torch.cuda.is_available() and os.getenv("CACHE_EXAMPLES") == "1" MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "768")) USE_TORCH_COMPILE = os.getenv("USE_TORCH_COMPILE") == "1" DTYPE = torch.float32 api = HfApi() executor = ThreadPoolExecutor() model_cache = {} model_id = "Lykon/dreamshaper-xl-v2-turbo" custom_pipe = DiffusionPipeline.from_pretrained( model_id, custom_pipeline="latent_consistency_txt2img", custom_revision="main", safety_checker=None, feature_extractor=None ) custom_pipe.to(torch_device="cpu", torch_dtype=DTYPE) pipe = custom_pipe def randomize_seed_fn(seed: int, randomize_seed: bool) -> int: return random.randint(0, MAX_SEED) if randomize_seed else seed def save_image(img, profile: gr.OAuthProfile | None, metadata: dict): unique_name = str(uuid.uuid4()) + '.png' img.save(unique_name) gr_user_history.save_image(label=metadata["prompt"], image=img, profile=profile, metadata=metadata) return unique_name def save_images(image_array, profile: gr.OAuthProfile | None, metadata: dict): with ThreadPoolExecutor() as executor: return list(executor.map(save_image, image_array, [profile]*len(image_array), [metadata]*len(image_array))) def generate(prompt: str, seed: int = 0, width: int = 512, height: int = 512, guidance_scale: float = 8.0, num_inference_steps: int = 4, num_images: int = 1, randomize_seed: bool = False, progress=gr.Progress(track_tqdm=True), profile: gr.OAuthProfile | None = None) -> tuple[list[str], int]: seed = randomize_seed_fn(seed, randomize_seed) torch.manual_seed(seed) start_time = time.time() outputs = pipe(prompt=prompt, negative_prompt="", height=height, width=width, guidance_scale=guidance_scale, num_inference_steps=num_inference_steps, num_images_per_prompt=num_images, output_type="pil", lcm_origin_steps=50).images print(f"Generation took {time.time() - start_time:.2f} seconds") paths = save_images(outputs, profile, metadata={"prompt": prompt, "seed": seed, "width": width, "height": height, "guidance_scale": guidance_scale, "num_inference_steps": num_inference_steps}) return paths, seed def validate_and_list_models(hfuser): try: models = api.list_models(author=hfuser) return [model.modelId for model in models if model.pipeline_tag == "text-to-image"] except Exception: return [] def parse_user_model_dict(user_model_dict_str): try: data = ast.literal_eval(user_model_dict_str) if isinstance(data, dict) and all(isinstance(v, list) for v in data.values()): return data return {} except Exception: return {} def load_model(model_id): if model_id in model_cache: return f"{model_id} loaded from cache" try: path = snapshot_download(repo_id=model_id, cache_dir="model_cache", token=os.getenv("HF_TOKEN")) model_cache[model_id] = path return f"{model_id} loaded successfully" except Exception as e: return f"{model_id} failed to load: {str(e)}" def run_models(models, parallel): if parallel: futures = [executor.submit(load_model, m) for m in models] return [f.result() for f in futures] return [load_model(m) for m in models] with gr.Blocks() as demo: with gr.Row(): gr.HTML("""
this is currently running the Lykon/dreamshaper-xl-v2-turbo model