import os
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
import torch
from diffusers import StableDiffusionPipeline, StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler, DPMSolverSinglestepScheduler
from diffusers.pipelines import StableDiffusionInpaintPipeline, StableDiffusionXLInpaintPipeline
from huggingface_hub import hf_hub_download, snapshot_download
import numpy as np
import random
from PIL import Image
import io
app = FastAPI()
MAX_SEED = np.iinfo(np.int32).max
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Set directory for model storage
MODEL_DIR = "/data"
# Ensure model directory exists
os.makedirs(MODEL_DIR, exist_ok=True)
# Download models to local directory
HF_TOKEN = os.getenv("HF_TOKEN")  # Read the token from the environment (needed for gated or private repos)
def download_model(repo_id, filename=None, model_dir=MODEL_DIR, token=HF_TOKEN):
    if filename:
        # Single-file checkpoints (.safetensors) can share the flat model directory.
        return hf_hub_download(repo_id=repo_id, filename=filename, local_dir=model_dir, token=token)
    # hf_hub_download requires a filename, so full repositories are fetched with
    # snapshot_download; each repo gets its own subdirectory so files don't collide.
    return snapshot_download(repo_id=repo_id, local_dir=os.path.join(model_dir, repo_id.split("/")[-1]), token=token)
# Paths for models
paths = {
    "Fluently XL Final": download_model("fluently/Fluently-XL-Final", "FluentlyXL-Final.safetensors"),
    "Fluently Anime": download_model("fluently/Fluently-anime"),
    "Fluently Epic": download_model("fluently/Fluently-epic"),
    "Fluently XL v4": download_model("fluently/Fluently-XL-v4"),
    "Fluently XL v3 Lightning": download_model("fluently/Fluently-XL-v3-lightning"),
    "Fluently v4 inpaint": download_model("fluently/Fluently-v4-inpainting"),
    "Fluently XL v3 inpaint": download_model("fluently/Fluently-XL-v3-inpainting", "FluentlyXL-v3-inpainting.safetensors"),
}
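# Entries downloaded with an explicit filename resolve to a single .safetensors
# path and are loaded below via from_single_file; the rest resolve to local
# snapshot directories and go through from_pretrained.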
# Function to load model dynamically
def load_model(model_name):
    """Build the requested pipeline from its downloaded weights and move it to the device."""
    if model_name == "Fluently XL Final":
        # Single-file SDXL checkpoint.
        model = StableDiffusionXLPipeline.from_single_file(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        model.scheduler = EulerAncestralDiscreteScheduler.from_config(model.scheduler.config)
    elif model_name in ("Fluently Anime", "Fluently Epic"):
        # SD 1.5-style repositories with identical loading logic.
        model = StableDiffusionPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        model.scheduler = EulerAncestralDiscreteScheduler.from_config(model.scheduler.config)
    elif model_name == "Fluently XL v4":
        model = StableDiffusionXLPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        model.scheduler = EulerAncestralDiscreteScheduler.from_config(model.scheduler.config)
    elif model_name == "Fluently XL v3 Lightning":
        model = StableDiffusionXLPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        # Lightning checkpoints are distilled for very few steps; they pair with a
        # single-step DPM solver using trailing timestep spacing.
        model.scheduler = DPMSolverSinglestepScheduler.from_config(
            model.scheduler.config,
            use_karras_sigmas=False,
            timestep_spacing="trailing",
            lower_order_final=True,
        )
    elif model_name == "Fluently v4 inpaint":
        model = StableDiffusionInpaintPipeline.from_pretrained(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
    elif model_name == "Fluently XL v3 inpaint":
        model = StableDiffusionXLInpaintPipeline.from_single_file(
            paths[model_name],
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
    else:
        raise ValueError(f"Model {model_name} not found")
    model.to(device)
    return model
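# Possible optimization (a sketch, not part of the flow above): memoize loaded
# pipelines so repeat requests for the same model skip the rebuild. Assumes
# enough CPU RAM to keep every pipeline resident; the caller must still move
# the cached pipeline back onto the device before use.
#
#   from functools import lru_cache
#
#   @lru_cache(maxsize=None)
#   def load_model_cached(model_name):
#       return load_model(model_name)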
def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    return seed
@app.post("/generate")
async def generate(
    model: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: str = Form(""),
    use_negative_prompt: bool = Form(False),
    seed: int = Form(0),
    width: int = Form(1024),
    height: int = Form(1024),
    guidance_scale: float = Form(3),
    randomize_seed: bool = Form(False),
    inpaint_image: UploadFile = File(None),
    mask_image: UploadFile = File(None),
    blur_factor: float = Form(1.0),
    strength: float = Form(0.75),
):
    seed = int(randomize_seed_fn(seed, randomize_seed))
    # Seed a torch generator so the requested (or randomized) seed actually
    # controls sampling in the pipeline calls below.
    generator = torch.Generator(device=device).manual_seed(seed)
    if not use_negative_prompt:
        negative_prompt = ""
    inpaint_image_pil = Image.open(io.BytesIO(await inpaint_image.read())) if inpaint_image else None
    mask_image_pil = Image.open(io.BytesIO(await mask_image.read())) if mask_image else None
    model_pipeline = load_model(model)
    # Per-model step counts for plain text-to-image generation.
    steps_by_model = {
        "Fluently XL Final": 25,
        "Fluently Anime": 30,
        "Fluently Epic": 30,
        "Fluently XL v4": 25,
        "Fluently XL v3 Lightning": 5,
    }
    if model in steps_by_model:
        images = model_pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            # The Lightning model is distilled for low guidance and few steps.
            guidance_scale=2 if model == "Fluently XL v3 Lightning" else guidance_scale,
            num_inference_steps=steps_by_model[model],
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images
    elif model in ("Fluently v4 inpaint", "Fluently XL v3 inpaint"):
        # Inpainting requires both an input image and a mask upload. Blurring the
        # mask softens its edge so the inpainted region blends into its surroundings.
        blurred_mask = model_pipeline.mask_processor.blur(mask_image_pil, blur_factor=blur_factor)
        images = model_pipeline(
            prompt=prompt,
            image=inpaint_image_pil,
            mask_image=blurred_mask,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=30,
            strength=strength,
            num_images_per_prompt=1,
            generator=generator,
            output_type="pil",
        ).images
    # Move the pipeline off the GPU and release cached memory before responding.
    model_pipeline.to("cpu")
    torch.cuda.empty_cache()
    # Stream the single generated image back as a PNG.
    img = images[0]
    img_byte_arr = io.BytesIO()
    img.save(img_byte_arr, format="PNG")
    img_byte_arr.seek(0)
    return StreamingResponse(img_byte_arr, media_type="image/png")
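# Example client call (a sketch; assumes the server is reachable on
# localhost:7860 and "out.png" is an arbitrary output filename):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/generate",
#       data={"model": "Fluently XL v4", "prompt": "a lighthouse at dusk",
#             "randomize_seed": "true"},
#   )
#   with open("out.png", "wb") as fh:
#       fh.write(resp.content)
#
# For the inpainting models, also pass files={"inpaint_image": ..., "mask_image": ...}.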
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)