#!/usr/bin/env python
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
#import spaces
#import subprocess
import os
#subprocess.run(['sh', './torch.sh'])

# Environment configuration. It is set here, before torch and the Hugging Face
# libraries are imported further down.
#
# Assign through os.environ rather than os.putenv(): putenv() changes the
# process environment but does NOT update os.environ, so any library that
# reads its settings via os.environ / os.getenv would not see the value.
os.environ['PYTORCH_NVML_BASED_CUDA_CHECK'] = '1'
os.environ['TORCH_LINALG_PREFER_CUSOLVER'] = '1'
alloc_conf_parts = [
    'expandable_segments:True',
    'pinned_use_background_threads:True',  # Specific to pinned memory.
]
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = ','.join(alloc_conf_parts)
os.environ["SAFETENSORS_FAST_GPU"] = "1"
# NOTE(review): this flag presumably needs the `hf_transfer` package to be
# installed in the Space -- confirm against requirements.txt.
os.environ['HF_HUB_ENABLE_HF_TRANSFER'] = '1'

import gradio as gr
import numpy as np
from PIL import Image

DESCRIPTIONXX = """
## ⚡⚡⚡⚡ REALVISXL V5.0 BF16 (Tester C) ⚡⚡⚡⚡
"""

examples = [
    "Many apples splashed with drops of water within a fancy bowl 4k, hdr --v 6.0 --style raw",
    "A profile photo of a dog, brown background, shot on Leica M6 --ar 128:85 --v 6.0 --style raw",
]

MODEL_OPTIONS = {
    "REALVISXL V5.0 BF16": "ford442/RealVisXL_V5.0_BF16",
}

MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "4096"))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "1"))

# Prompt templates offered by the "Quality Style" radio in the UI.
style_list = [
    {
        "name": "3840 x 2160",
        "prompt": "hyper-realistic 8K image of {prompt}. ultra-detailed, lifelike, high-resolution, sharp, vibrant colors, photorealistic",
        "negative_prompt": "cartoonish, low resolution, blurry, simplistic, abstract, deformed, ugly",
    },
    {
        "name": "2560 x 1440",
        "prompt": "hyper-realistic 4K image of {prompt}. ultra-detailed, lifelike, high-resolution, sharp, vibrant colors, photorealistic",
        "negative_prompt": "cartoonish, low resolution, blurry, simplistic, abstract, deformed, ugly",
    },
    {
        "name": "HD+",
        "prompt": "hyper-realistic 2K image of {prompt}. ultra-detailed, lifelike, high-resolution, sharp, vibrant colors, photorealistic",
        "negative_prompt": "cartoonish, low resolution, blurry, simplistic, abstract, deformed, ugly",
    },
    {
        "name": "Style Zero",
        "prompt": "{prompt}",
        "negative_prompt": "",
    },
]

# Style name -> (prompt template, negative prompt).
styles = {k["name"]: (k["prompt"], k["negative_prompt"]) for k in style_list}
DEFAULT_STYLE_NAME = "Style Zero"
STYLE_NAMES = list(styles.keys())
MAX_SEED = np.iinfo(np.int32).max

import os
import torch
import paramiko
import socket
import threading  # NEW IMPORT
import queue  # NEW IMPORT

# Start in the strictest numeric mode: no TF32 and no reduced-precision
# reductions.
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = False
torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = False
torch.backends.cudnn.allow_tf32 = False
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = False
# preferred_blas_library / preferred_linalg_library are functions. Assigning
# a string to them (as this file used to) only overwrites the function object
# and selects nothing, so they are called here instead.
torch.backends.cuda.preferred_blas_library("cublas")
torch.backends.cuda.preferred_linalg_library("cusolver")
torch.set_float32_matmul_precision("highest")

# Credentials and upload targets, all supplied through the environment.
HF_TOKEN = os.getenv("HF_TOKEN")
FTP_HOST = os.getenv("FTP_HOST")
FTP_USER = os.getenv("FTP_USER")
FTP_PASS = os.getenv("FTP_PASS")
FTP_DIR = os.getenv("FTP_DIR")
FTP_HOST_FALLBACK = os.getenv("FTP_HOST_FALLBACK")
FTP_DIR_FALLBACK = os.getenv("FTP_DIR_FALLBACK")


def scheduler_swap_callback(pipeline, step_index, timestep, callback_kwargs):
    """Relax torch numeric settings at 10% of the run and restore them at 90%.

    The signature is that of a diffusers ``callback_on_step_end`` hook. The
    hook is currently not registered: the corresponding pipeline option in the
    generation code is commented out.

    Args:
        pipeline: The running pipeline; only ``num_timesteps`` is read.
        step_index: Index of the step that just finished.
        timestep: Unused.
        callback_kwargs: Returned unchanged.

    Returns:
        ``callback_kwargs``, untouched.
    """
    if step_index == int(pipeline.num_timesteps * 0.1):
        print("-- swapping torch modes --")
        torch.set_float32_matmul_precision("high")
        torch.backends.cudnn.allow_tf32 = True
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.deterministic = True
        # Called, not assigned: assignment would replace the selector function
        # with a string and leave the backend unchanged.
        torch.backends.cuda.preferred_blas_library("cublaslt")
    if step_index == int(pipeline.num_timesteps * 0.9):
        torch.backends.cuda.preferred_blas_library("cublas")
        torch.backends.cudnn.allow_tf32 = False
        torch.backends.cuda.matmul.allow_tf32 = False
        torch.set_float32_matmul_precision("highest")
        torch.backends.cudnn.deterministic = False
        print("-- swapping torch modes --")
    return callback_kwargs


# --- WORKER FUNCTION FOR THREADING ---
def connect_worker(host, result_queue):
    """Open an authenticated SSH transport to ``host`` on port 22.

    On success the ``paramiko.Transport`` is put on ``result_queue``. On any
    failure a note is printed, the transport (if one was created) is closed
    and nothing is queued. Exceptions are never propagated to the caller.

    Args:
        host: Host name or address to connect to.
        result_queue: ``queue.Queue`` that receives the transport on success.
    """
    transport = None
    try:
        transport = paramiko.Transport((host, 22))
        # The handshake is limited to 5 seconds.
        transport.start_client(timeout=5)
        transport.auth_password(username=FTP_USER, password=FTP_PASS)
        print(f"✅ Connection to {host} succeeded first.")
        result_queue.put(transport)
    except (paramiko.SSHException, socket.timeout, EOFError) as e:
        # Expected failure modes: just report them.
        print(f"ℹ️ Connection to {host} failed or was too slow: {e}")
        if transport:
            transport.close()
    except Exception as e:
        # Anything else is unexpected, but must not escape the thread.
        print(f"❌ Unexpected error connecting to {host}: {e}")
        if transport:
            transport.close()


def upload_to_ftp(filename):
    """Upload ``filename`` over SFTP to whichever configured host connects first.

    ``FTP_HOST`` and, when set, ``FTP_HOST_FALLBACK`` are dialled in parallel.
    The first host to authenticate wins and the file is stored under that
    host's directory: ``FTP_DIR`` for the primary, ``FTP_DIR_FALLBACK`` for
    the fallback (falling back to ``FTP_DIR`` when it is unset).

    This is best-effort: every failure is printed and swallowed so an upload
    problem never aborts image generation.

    Args:
        filename: Local path of the file to upload. Only its base name is used
            on the remote side.
    """
    hosts = [host for host in (FTP_HOST, FTP_HOST_FALLBACK) if host]
    if not hosts:
        print("❌ Critical Error: No FTP host is configured.")
        return

    result_queue = queue.Queue()
    winner_chosen = threading.Event()

    def race(host):
        # connect_worker only reports the transport. Pair it with the host
        # that was dialled so the winner can be identified reliably:
        # Transport.getpeername() yields an IP address, which does not compare
        # equal to FTP_HOST when that is configured as a host name.
        private_queue = queue.Queue()
        connect_worker(host, private_queue)
        try:
            transport = private_queue.get_nowait()
        except queue.Empty:
            return
        if winner_chosen.is_set():
            # The race is already decided; do not leak this connection.
            transport.close()
        else:
            result_queue.put((host, transport))

    print(f"--> Racing connections to {hosts} for uploading {filename}...")
    for host in hosts:
        threading.Thread(target=race, args=(host,), daemon=True).start()

    try:
        winning_host, winning_transport = result_queue.get(timeout=7)
    except queue.Empty:
        winner_chosen.set()
        print("❌ Critical Error: Neither FTP host responded in time.")
        return
    winner_chosen.set()

    sftp = None
    try:
        if winning_host == FTP_HOST:
            destination_directory = FTP_DIR
        else:
            destination_directory = FTP_DIR_FALLBACK if FTP_DIR_FALLBACK else FTP_DIR
        print(f"--> Proceeding with upload to {winning_host} in directory {destination_directory}...")

        sftp = paramiko.SFTPClient.from_transport(winning_transport)
        destination_path = os.path.join(destination_directory, os.path.basename(filename))
        sftp.put(filename, destination_path)
        print(f"✅ Successfully uploaded {filename}.")
    except Exception as e:
        print(f"❌ An unexpected error occurred during SFTP operation: {e}")
    finally:
        # Always release the connection, including when put() fails.
        if sftp is not None:
            sftp.close()
        winning_transport.close()
        # Close a runner-up that was queued before the winner was flagged.
        while True:
            try:
                _, late_transport = result_queue.get_nowait()
            except queue.Empty:
                break
            late_transport.close()


def upload_to_ftp_old(filename):
    """Legacy single-host upload; no caller is visible in this file.

    Connects to ``FTP_HOST`` only and stores the file at ``FTP_DIR + filename``
    (plain string concatenation). Errors are printed and swallowed.
    """
    try:
        transport = paramiko.Transport((FTP_HOST, 22))
        destination_path=FTP_DIR+filename
        transport.connect(username = FTP_USER, password = FTP_PASS)
        sftp = paramiko.SFTPClient.from_transport(transport)
        sftp.put(filename, destination_path)
        sftp.close()
        transport.close()
        print(f"Uploaded {filename} to FTP server")
    except Exception as e:
        print(f"FTP upload error: {e}")


def uploadNote(prompt,num_inference_steps,guidance_scale,timestamp):
    """Write a text note describing a generation run and return its file name.

    Args:
        prompt: Prompt text to record.
        num_inference_steps: Step count to record.
        guidance_scale: Guidance scale to record.
        timestamp: String used in the file name and in the note.

    Returns:
        The name of the file written, ``rv_C_<timestamp>.txt``, created in the
        current working directory.
    """
    filename= f'rv_C_{timestamp}.txt'
    # Explicit UTF-8 so a prompt with non-ASCII characters cannot fail under a
    # non-UTF-8 locale.
    with open(filename, "w", encoding="utf-8") as f:
        f.write("Realvis 5.0 (Tester C) \n")
        f.write(f"Date/time: {timestamp} \n")
        f.write(f"Prompt: {prompt} \n")
        f.write(f"Steps: {num_inference_steps} \n")
        f.write(f"Guidance Scale: {guidance_scale} \n")
        f.write("SPACE SETUP: \n")
        f.write("Model VAE: sdxl-vae-bf16\n")
        f.write("To cuda and bfloat \n")
    return filename


import spaces
import torch.nn.functional as F
from sageattention import sageattn
import random
import uuid
import gradio as gr
import numpy as np
from PIL import Image
#from accelerate import Accelerator
#import diffusers
from diffusers import AutoencoderKL, StableDiffusionXLPipeline
from diffusers import EulerAncestralDiscreteScheduler
#from typing import Tuple
#import paramiko
import datetime
#import cyper
from image_gen_aux import UpscaleWithModel
#import torch
import time
import gc

MAX_SEED = np.iinfo(np.int32).max
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Upscaler applied to every generated image, loaded once at import time.
upscaler = UpscaleWithModel.from_pretrained("Kim2091/ClearRealityV1").to(torch.device("cuda:0"))


def load_and_prepare_model():
    """Build the SDXL pipeline with the standalone SDXL VAE, on cuda:0 in bfloat16.

    Returns:
        A ``StableDiffusionXLPipeline`` for ``ford442/RealVisXL_V5.0_BF16``
        whose VAE is replaced by ``stabilityai/sdxl-vae``.
    """
    vaeXL = AutoencoderKL.from_pretrained("stabilityai/sdxl-vae", safety_checker=None, use_safetensors=False)
    pipe = StableDiffusionXLPipeline.from_pretrained(
        'ford442/RealVisXL_V5.0_BF16',
        use_safetensors=True,
        add_watermarker=False,
        token = HF_TOKEN,
    )
    # Install the replacement VAE BEFORE configuring and moving the pipeline.
    # Previously it was assigned after pipe.to(...), so the VAE that is
    # actually used stayed on the CPU in its load dtype, and the settings
    # below were applied to the VAE that was then thrown away.
    pipe.vae = vaeXL
    pipe.vae.do_convert_rgb = True
    pipe.vae.set_default_attn_processor()
    print(f'init noise scale: {pipe.scheduler.init_noise_sigma}')
    pipe.watermark=None
    pipe.safety_checker=None
    pipe.to(torch.device('cuda:0'), torch.bfloat16)
    return pipe
# Preload and compile both models
pipe = load_and_prepare_model()

# Secondary negative prompt that is sent with every request.
neg_prompt_2 = " 'non-photorealistic':1.5, 'unrealistic skin','unattractive face':1.3, 'low quality':1.1, ('dull color scheme', 'dull colors', 'digital noise':1.2),'amateurish', 'poorly drawn face':1.3, 'poorly drawn', 'distorted face', 'low resolution', 'simplistic' "

# Reference to the stock attention kernel, captured before anything patches
# it, so SageAttention can be switched off again after it was switched on.
_ORIGINAL_SDPA = F.scaled_dot_product_attention


def _generate_and_upload(
    prompt,
    negative_prompt,
    use_negative_prompt,
    width,
    height,
    guidance_scale,
    num_inference_steps,
    sage,
    use_resolution_binning,
):
    """Generate one image, upload the note/image/upscale and return a gallery path.

    Shared implementation behind ``generate_30``, ``generate_60`` and
    ``generate_90``, which differ only in their GPU time budget.

    Returns:
        A one-element list holding the name of a symlink (in the current
        working directory) that points at the saved full-size image.
    """
    # Select the attention kernel for this call. The previous code "restored"
    # the kernel by assigning the attribute to itself, so once SageAttention
    # had been enabled it could never be turned off.
    F.scaled_dot_product_attention = sageattn if sage else _ORIGINAL_SDPA

    # The checkbox in the UI hides the negative prompt; honour it here too.
    if not use_negative_prompt:
        negative_prompt = ""

    seed = random.randint(0, MAX_SEED)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    generator = torch.Generator(device='cpu').manual_seed(seed)

    options = {
        "prompt": [prompt],
        "negative_prompt": [negative_prompt],
        "negative_prompt_2": [neg_prompt_2],
        "width": width,
        "height": height,
        "guidance_scale": guidance_scale,
        "num_inference_steps": num_inference_steps,
        "generator": generator,
        "output_type": "pil",
        # "callback_on_step_end": scheduler_swap_callback,
    }
    if use_resolution_binning:
        options["use_resolution_binning"] = True

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = uploadNote(prompt,num_inference_steps,guidance_scale,timestamp)
    upload_to_ftp(filename)

    rv_image = pipe(**options).images[0]
    sd_image_path = f"rv_C_{timestamp}.png"
    rv_image.save(sd_image_path,optimize=False,compress_level=0)
    upload_to_ftp(sd_image_path)

    # Upscale at reduced matmul precision, then put the previous setting back
    # so the next generation does not silently inherit "medium".
    previous_precision = torch.get_float32_matmul_precision()
    torch.set_float32_matmul_precision("medium")
    try:
        with torch.no_grad():
            upscale = upscaler(rv_image, tiling=True, tile_width=256, tile_height=256)
    finally:
        torch.set_float32_matmul_precision(previous_precision)

    downscale1 = upscale.resize((upscale.width // 4, upscale.height // 4), Image.LANCZOS)
    downscale_path = f"rv50_upscale_{timestamp}.png"
    downscale1.save(downscale_path,optimize=False,compress_level=0)
    upload_to_ftp(downscale_path)

    # Hand the gallery a uniquely named link to the full-size image.
    unique_name = str(uuid.uuid4()) + ".png"
    os.symlink(sd_image_path, unique_name)
    return [unique_name]


@spaces.GPU(duration=40)
def generate_30(
    prompt: str,
    negative_prompt: str = "",
    use_negative_prompt: bool = False,
    style_selection: str = "",
    width: int = 768,
    height: int = 768,
    guidance_scale: float = 4,
    num_inference_steps: int = 125,
    sage: bool = False,
    use_resolution_binning: bool = True,
    progress=gr.Progress(track_tqdm=True)
):
    """Generate an image inside a ``spaces.GPU(duration=40)`` allocation.

    Args:
        prompt: Text prompt.
        negative_prompt: Negative prompt; used only when
            ``use_negative_prompt`` is true.
        use_negative_prompt: Whether ``negative_prompt`` is applied.
        style_selection: Accepted for UI compatibility, currently unused.
            NOTE(review): the ``styles`` templates are never applied -- confirm
            whether that is intended.
        width: Requested image width.
        height: Requested image height.
        guidance_scale: Guidance scale passed to the pipeline.
        num_inference_steps: Number of denoising steps.
        sage: Use SageAttention as the attention kernel.
        use_resolution_binning: Forwarded to the pipeline as an option.
        progress: Gradio progress tracker.

    Returns:
        A one-element list with the path of the generated image.
    """
    return _generate_and_upload(
        prompt, negative_prompt, use_negative_prompt, width, height,
        guidance_scale, num_inference_steps, sage, use_resolution_binning,
    )


@spaces.GPU(duration=70)
def generate_60(
    prompt: str,
    negative_prompt: str = "",
    use_negative_prompt: bool = False,
    style_selection: str = "",
    width: int = 768,
    height: int = 768,
    guidance_scale: float = 4,
    num_inference_steps: int = 125,
    sage: bool = False,
    use_resolution_binning: bool = True,
    progress=gr.Progress(track_tqdm=True)
):
    """Same as ``generate_30`` but inside a ``spaces.GPU(duration=70)`` allocation."""
    return _generate_and_upload(
        prompt, negative_prompt, use_negative_prompt, width, height,
        guidance_scale, num_inference_steps, sage, use_resolution_binning,
    )


@spaces.GPU(duration=100)
def generate_90(
    prompt: str,
    negative_prompt: str = "",
    use_negative_prompt: bool = False,
    style_selection: str = "",
    width: int = 768,
    height: int = 768,
    guidance_scale: float = 4,
    num_inference_steps: int = 125,
    sage: bool = False,
    use_resolution_binning: bool = True,
    progress=gr.Progress(track_tqdm=True)
):
    """Same as ``generate_30`` but inside a ``spaces.GPU(duration=100)`` allocation."""
    return _generate_and_upload(
        prompt, negative_prompt, use_negative_prompt, width, height,
        guidance_scale, num_inference_steps, sage, use_resolution_binning,
    )


def load_predefined_images1():
    """Return the asset paths shown in the showcase gallery."""
    predefined_images1 = [
        "assets/7.png",
        "assets/8.png",
        "assets/9.png",
        "assets/1.png",
        "assets/2.png",
        "assets/3.png",
        "assets/4.png",
        "assets/5.png",
        "assets/6.png",
    ]
    return predefined_images1


css = '''
#col-container {
    margin: 0 auto;
    max-width: 640px;
}
h1{text-align:center}
footer {
    visibility: hidden
}
body {
    background-color: green;
}
'''

# NOTE(review): the nesting of the layout blocks below was reconstructed from
# the statement order; confirm it against the deployed UI.
with gr.Blocks(theme=gr.themes.Origin(),css=css) as demo:
    gr.Markdown(DESCRIPTIONXX)
    with gr.Row():
        prompt = gr.Text(
            label="Prompt",
            show_label=False,
            max_lines=1,
            placeholder="Enter your prompt",
            container=False,
        )
        run_button_30 = gr.Button("Run 30 Seconds", scale=0)
        run_button_60 = gr.Button("Run 60 Seconds", scale=0)
        run_button_90 = gr.Button("Run 90 Seconds", scale=0)
    result = gr.Gallery(label="Result", columns=1, show_label=False)
    with gr.Row():
        style_selection = gr.Radio(
            show_label=True,
            container=True,
            interactive=True,
            choices=STYLE_NAMES,
            value=DEFAULT_STYLE_NAME,
            label="Quality Style",
        )
    with gr.Row():
        with gr.Column(scale=1):
            use_negative_prompt = gr.Checkbox(label="Use negative prompt", value=True)
            negative_prompt = gr.Text(
                label="Negative prompt",
                max_lines=5,
                lines=4,
                placeholder="Enter a negative prompt",
                value="('deformed', 'distorted', 'disfigured':1.3),'not photorealistic':1.5, 'poorly drawn', 'bad anatomy', 'wrong anatomy', 'extra limb', 'missing limb', 'floating limbs', 'poorly drawn hands', 'poorly drawn feet', 'poorly drawn face':1.3, 'out of frame', 'extra limbs', 'bad anatomy', 'bad art', 'beginner', 'distorted face','amateur'",
                visible=True,
            )
    with gr.Row():
        width = gr.Slider(
            label="Width",
            minimum=448,
            maximum=MAX_IMAGE_SIZE,
            step=64,
            value=768,
        )
        height = gr.Slider(
            label="Height",
            minimum=448,
            maximum=MAX_IMAGE_SIZE,
            step=64,
            value=768,
        )
    with gr.Row():
        guidance_scale = gr.Slider(
            label="Guidance Scale",
            minimum=0.1,
            maximum=30,
            step=0.1,
            value=3.8,
        )
        num_inference_steps = gr.Slider(
            label="Number of inference steps",
            minimum=10,
            maximum=1000,
            step=10,
            value=180,
        )
    options = [True, False]
    sage = gr.Radio(
        show_label=True,
        container=True,
        interactive=True,
        choices=options,
        value=False,
        label="Use SageAttention: ",
    )
    gr.Examples(
        examples=examples,
        inputs=prompt,
        cache_examples=False
    )
    # Show the negative-prompt box only while the checkbox is ticked.
    use_negative_prompt.change(
        fn=lambda x: gr.update(visible=x),
        inputs=use_negative_prompt,
        outputs=negative_prompt,
        api_name=False,
    )

    # All three buttons send the same inputs; only the handler, and with it
    # the GPU time budget, differs.
    generation_inputs = [
        prompt,
        negative_prompt,
        use_negative_prompt,
        style_selection,
        width,
        height,
        guidance_scale,
        num_inference_steps,
        sage,
    ]
    for run_button, handler in (
        (run_button_30, generate_30),
        (run_button_60, generate_60),
        (run_button_90, generate_90),
    ):
        gr.on(
            triggers=[run_button.click],
            fn=handler,
            inputs=generation_inputs,
            outputs=[result],
        )

    gr.Markdown("### REALVISXL V5.0")
    predefined_gallery = gr.Gallery(label="REALVISXL V5.0", columns=3, show_label=False, value=load_predefined_images1())
    gr.Markdown(
        """
⚡Models used in the playground [REALVISXL V5.0], [REALVISXL V5.0 LIGHTNING] for image generation. Stable Diffusion XL piped (SDXL) model HF. This is the demo space for generating images using the Stable Diffusion XL models, with multiple different variants available.
""")
    gr.Markdown(
        """
⚡This is the demo space for generating images using Stable Diffusion XL with quality styles, different models, and types. Try the sample prompts to generate higher quality images. Try the sample prompts for generating higher quality images. Try prompts.
""")
    gr.Markdown(
        """
⚠️ Users are accountable for the content they generate and are responsible for ensuring it meets appropriate ethical standards.
""")


def text_generation(input_text, seed):
    """Placeholder handler for the text tab: ignores its inputs and returns a fixed string."""
    full_prompt = "Text Generator Application by ecarbo"
    return full_prompt


title = "Text Generator Demo GPT-Neo"
description = "Text Generator Application by ecarbo"

if __name__ == "__main__":
    # Queue requests for the image tab; the launch happens on the combined app.
    demo_interface = demo.queue(max_size=50)
    text_gen_interface = gr.Interface(
        fn=text_generation,
        inputs=[
            gr.Textbox(lines=1, label="Expand the following prompt to be more detailed and descriptive for image generation: "),
            gr.Number(value=10, label="Enter seed number")
        ],
        outputs=gr.Textbox(label="Text Generated"),
        title=title,
        description=description,
    )
    combined_interface = gr.TabbedInterface([demo_interface, text_gen_interface], ["Image Generation", "Text Generation"])
    combined_interface.launch(show_api=False)