import io
import os
import random

import numpy as np
import torch
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from diffusers import (
    DPMSolverSinglestepScheduler,
    EulerAncestralDiscreteScheduler,
    StableDiffusionPipeline,
    StableDiffusionXLPipeline,
)
from diffusers.pipelines import StableDiffusionInpaintPipeline, StableDiffusionXLInpaintPipeline
from huggingface_hub import hf_hub_download, snapshot_download
from PIL import Image

app = FastAPI()

MAX_SEED = np.iinfo(np.int32).max
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Directory for model storage
MODEL_DIR = "/data"
os.makedirs(MODEL_DIR, exist_ok=True)

# Hugging Face token, read from the HF_TOKEN environment variable
# (needed for gated or private repositories)
HF_TOKEN = os.getenv("HF_TOKEN")
def download_model(repo_id, filename=None, model_dir=MODEL_DIR, token=HF_TOKEN):
    """Download one weights file, or a full repo snapshot when no filename is given."""
    if filename:
        return hf_hub_download(repo_id=repo_id, filename=filename, local_dir=model_dir, token=token)
    # hf_hub_download requires a filename, so full repositories are fetched with
    # snapshot_download; each repo gets its own subdirectory to avoid collisions.
    return snapshot_download(repo_id=repo_id, local_dir=os.path.join(model_dir, repo_id.split("/")[-1]), token=token)
# Local paths for all supported models
paths = {
    "Fluently XL Final": download_model("fluently/Fluently-XL-Final", "FluentlyXL-Final.safetensors"),
    "Fluently Anime": download_model("fluently/Fluently-anime"),
    "Fluently Epic": download_model("fluently/Fluently-epic"),
    "Fluently XL v4": download_model("fluently/Fluently-XL-v4"),
    "Fluently XL v3 Lightning": download_model("fluently/Fluently-XL-v3-lightning"),
    "Fluently v4 inpaint": download_model("fluently/Fluently-v4-inpainting"),
    "Fluently XL v3 inpaint": download_model("fluently/Fluently-XL-v3-inpainting", "FluentlyXL-v3-inpainting.safetensors"),
}
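# Note: every model above is downloaded at import time, so the first start with an
# empty MODEL_DIR can take a long while; later starts reuse the local files.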
# Load a pipeline on demand; a fresh pipeline is built for every request
def load_model(model_name):
    if model_name == "Fluently XL Final":
        model = StableDiffusionXLPipeline.from_single_file(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        model.scheduler = EulerAncestralDiscreteScheduler.from_config(model.scheduler.config)
    elif model_name in ("Fluently Anime", "Fluently Epic"):
        model = StableDiffusionPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        model.scheduler = EulerAncestralDiscreteScheduler.from_config(model.scheduler.config)
    elif model_name == "Fluently XL v4":
        model = StableDiffusionXLPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        model.scheduler = EulerAncestralDiscreteScheduler.from_config(model.scheduler.config)
    elif model_name == "Fluently XL v3 Lightning":
        model = StableDiffusionXLPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        # Lightning checkpoint: trailing-timestep DPM solver for very low step counts
        model.scheduler = DPMSolverSinglestepScheduler.from_config(
            model.scheduler.config,
            use_karras_sigmas=False,
            timestep_spacing="trailing",
            lower_order_final=True,
        )
    elif model_name == "Fluently v4 inpaint":
        model = StableDiffusionInpaintPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
    elif model_name == "Fluently XL v3 inpaint":
        model = StableDiffusionXLInpaintPipeline.from_single_file(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
    else:
        raise ValueError(f"Model {model_name} not found")
    model.to(device)
    return model
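# Possible optimization (a sketch, not wired in): keep loaded pipelines cached on
# the CPU so each request does not re-read weights from disk. This assumes enough
# host RAM to hold every pipeline in `paths` at once.
#
#   _pipeline_cache = {}
#
#   def get_model(model_name):
#       if model_name not in _pipeline_cache:
#           _pipeline_cache[model_name] = load_model(model_name).to("cpu")
#       return _pipeline_cache[model_name].to(device)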
def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    return seed
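# e.g. randomize_seed_fn(42, False) returns 42 unchanged, while
# randomize_seed_fn(42, True) returns a fresh value in [0, MAX_SEED].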
@app.post("/generate")
async def generate(
    model: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: str = Form(""),
    use_negative_prompt: bool = Form(False),
    seed: int = Form(0),
    width: int = Form(1024),
    height: int = Form(1024),
    guidance_scale: float = Form(3),
    randomize_seed: bool = Form(False),
    inpaint_image: UploadFile = File(None),
    mask_image: UploadFile = File(None),
    blur_factor: float = Form(1.0),
    strength: float = Form(0.75),
):
    seed = int(randomize_seed_fn(seed, randomize_seed))
    # Seed a generator so the result is reproducible for a given seed value
    generator = torch.Generator(device=device).manual_seed(seed)
    if not use_negative_prompt:
        negative_prompt = ""
    inpaint_image_pil = Image.open(io.BytesIO(await inpaint_image.read())) if inpaint_image else None
    mask_image_pil = Image.open(io.BytesIO(await mask_image.read())) if mask_image else None
    model_pipeline = load_model(model)
    if model in ("Fluently XL Final", "Fluently XL v4"):
        images = model_pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=25,
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images
    elif model in ("Fluently Anime", "Fluently Epic"):
        images = model_pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=30,
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images
    elif model == "Fluently XL v3 Lightning":
        # Lightning model: fixed low guidance and very few steps
        images = model_pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=2,
            num_inference_steps=5,
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images
    elif model in ("Fluently v4 inpaint", "Fluently XL v3 inpaint"):
        if inpaint_image_pil is None or mask_image_pil is None:
            raise HTTPException(status_code=400, detail="Inpainting requires both inpaint_image and mask_image uploads")
        blurred_mask = model_pipeline.mask_processor.blur(mask_image_pil, blur_factor=blur_factor)
        images = model_pipeline(
            prompt=prompt,
            image=inpaint_image_pil,
            mask_image=blurred_mask,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=30,
            strength=strength,
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images
    # Move the pipeline off the GPU and free its memory for the next request
    model_pipeline.to("cpu")
    torch.cuda.empty_cache()

    # Encode the first image as PNG and stream it back to the client
    img = images[0]
    img_byte_arr = io.BytesIO()
    img.save(img_byte_arr, format="PNG")
    img_byte_arr.seek(0)
    return StreamingResponse(img_byte_arr, media_type="image/png")
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)
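# Example requests (assuming the server is running on localhost:7860):
#
#   # Text-to-image
#   curl -X POST http://localhost:7860/generate \
#       -F model="Fluently XL Final" \
#       -F prompt="a lighthouse at dusk" \
#       -F randomize_seed=true \
#       -o out.png
#
#   # Inpainting (both image and mask uploads are required)
#   curl -X POST http://localhost:7860/generate \
#       -F model="Fluently XL v3 inpaint" \
#       -F prompt="a red wooden door" \
#       -F inpaint_image=@input.png \
#       -F mask_image=@mask.png \
#       -o out.png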