# Image-engine / main.py
# saicharan1234's picture
# Update main.py
# 84ec534 verified
# raw
# history blame
# 7.81 kB
import os
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
import torch
from diffusers import StableDiffusionPipeline, StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler, DPMSolverSinglestepScheduler
from diffusers.pipelines import StableDiffusionInpaintPipeline, StableDiffusionXLInpaintPipeline
from huggingface_hub import hf_hub_download
import numpy as np
import random
from PIL import Image
import io
# ASGI application object; the route handler in this file is registered on it.
app = FastAPI()

# Upper bound for randomly drawn seeds: the largest signed 32-bit integer.
MAX_SEED = np.iinfo(np.int32).max

# Use the first CUDA device when CUDA is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Directory that receives the downloaded model files; created if missing.
MODEL_DIR = "/data"
os.makedirs(MODEL_DIR, exist_ok=True)

# Hugging Face access token, taken from the HF_TOKEN environment variable
# (None when the variable is not set).
HF_TOKEN = os.environ.get("HF_TOKEN")
def download_model(repo_id, filename=None, model_dir=MODEL_DIR, token=HF_TOKEN):
    """Download a model from the Hugging Face Hub and return its local path.

    Args:
        repo_id: Hub repository id, e.g. ``"fluently/Fluently-XL-Final"``.
        filename: Name of a single file to fetch from the repository. When
            omitted (or empty), the whole repository is downloaded instead.
        model_dir: Base directory that receives the downloaded files.
        token: Hugging Face access token, or ``None`` for anonymous access.

    Returns:
        The path of the downloaded file when ``filename`` is given, otherwise
        the path of the folder that holds the repository snapshot.
    """
    if filename:
        return hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            local_dir=model_dir,
            token=token,
        )

    # hf_hub_download() cannot be called without a filename, so fetching a
    # whole repository has to go through snapshot_download() instead.
    from huggingface_hub import snapshot_download

    # Each repository gets its own sub-folder so that files with the same
    # name in different repositories cannot overwrite each other.
    repo_dir = os.path.join(model_dir, repo_id.replace("/", "--"))
    return snapshot_download(repo_id=repo_id, local_dir=repo_dir, token=token)
# Local path of every supported model, keyed by its display name.
# Building this mapping downloads each model when the module is imported.
# A weights file of None is download_model's own default for "no file name".
paths = {
    display_name: download_model(repo_id, weights_file)
    for display_name, repo_id, weights_file in (
        ("Fluently XL Final", "fluently/Fluently-XL-Final", "FluentlyXL-Final.safetensors"),
        ("Fluently Anime", "fluently/Fluently-anime", None),
        ("Fluently Epic", "fluently/Fluently-epic", None),
        ("Fluently XL v4", "fluently/Fluently-XL-v4", None),
        ("Fluently XL v3 Lightning", "fluently/Fluently-XL-v3-lightning", None),
        ("Fluently v4 inpaint", "fluently/Fluently-v4-inpainting", None),
        ("Fluently XL v3 inpaint", "fluently/Fluently-XL-v3-inpainting", "FluentlyXL-v3-inpainting.safetensors"),
    )
}
def load_model(model_name):
    """Build the diffusers pipeline for ``model_name`` and move it to ``device``.

    Every pipeline is created from ``paths[model_name]`` with
    ``torch_dtype=torch.float16`` and ``use_safetensors=True``. The
    text-to-image models get their scheduler replaced after loading; the two
    inpainting models keep the scheduler they were loaded with.

    Args:
        model_name: Display name of the model (a key of ``paths``).

    Returns:
        The loaded pipeline, after ``.to(device)`` has been called on it.

    Raises:
        ValueError: If ``model_name`` is not one of the known models.
    """

    def euler_ancestral(pipe):
        # Same scheduler configuration, Euler-ancestral sampler.
        return EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)

    def dpm_single_step(pipe):
        # Scheduler settings used only by the Lightning model.
        return DPMSolverSinglestepScheduler.from_config(
            pipe.scheduler.config,
            use_karras_sigmas=False,
            timestep_spacing="trailing",
            lower_order_final=True,
        )

    # (display name, deferred lookup of the loader, scheduler factory or None).
    # The loader lookups are wrapped in lambdas so that nothing is resolved
    # until the requested model is known to exist.
    recipes = (
        ("Fluently XL Final", lambda: StableDiffusionXLPipeline.from_single_file, euler_ancestral),
        ("Fluently Anime", lambda: StableDiffusionPipeline.from_pretrained, euler_ancestral),
        ("Fluently Epic", lambda: StableDiffusionPipeline.from_pretrained, euler_ancestral),
        ("Fluently XL v4", lambda: StableDiffusionXLPipeline.from_pretrained, euler_ancestral),
        ("Fluently XL v3 Lightning", lambda: StableDiffusionXLPipeline.from_pretrained, dpm_single_step),
        ("Fluently v4 inpaint", lambda: StableDiffusionInpaintPipeline.from_pretrained, None),
        ("Fluently XL v3 inpaint", lambda: StableDiffusionXLInpaintPipeline.from_single_file, None),
    )

    for known_name, resolve_loader, make_scheduler in recipes:
        if model_name == known_name:
            break
    else:
        raise ValueError(f"Model {model_name} not found")

    pipeline = resolve_loader()(
        paths[model_name],
        torch_dtype=torch.float16,
        use_safetensors=True,
    )
    if make_scheduler is not None:
        pipeline.scheduler = make_scheduler(pipeline)

    pipeline.to(device)
    return pipeline
def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:
    """Return the seed to use for a generation request.

    Args:
        seed: Seed supplied by the caller.
        randomize_seed: When truthy, ``seed`` is ignored and a new one is
            drawn uniformly from ``0`` to ``MAX_SEED`` (both inclusive).

    Returns:
        The freshly drawn seed, or ``seed`` unchanged.
    """
    return random.randint(0, MAX_SEED) if randomize_seed else seed
@app.post("/generate")
async def generate(
    model: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: str = Form(""),
    use_negative_prompt: bool = Form(False),
    seed: int = Form(0),
    width: int = Form(1024),
    height: int = Form(1024),
    guidance_scale: float = Form(3),
    randomize_seed: bool = Form(False),
    inpaint_image: UploadFile = File(None),
    mask_image: UploadFile = File(None),
    blur_factor: float = Form(1.0),
    strength: float = Form(0.75)
):
    """Generate one image with the requested model and return it as a PNG.

    Args:
        model: Display name of the model to run.
        prompt: Text prompt.
        negative_prompt: Negative prompt; ignored unless
            ``use_negative_prompt`` is true.
        use_negative_prompt: Whether ``negative_prompt`` is applied.
        seed: Seed for the random generator handed to the pipeline.
        width: Output width passed to the pipeline.
        height: Output height passed to the pipeline.
        guidance_scale: Guidance scale passed to the pipeline. The Lightning
            model always runs with a fixed value of 2 instead.
        randomize_seed: Draw a random seed instead of using ``seed``.
        inpaint_image: Image to repaint; required by the inpainting models.
        mask_image: Mask for the repaint; required by the inpainting models.
        blur_factor: Blur applied to the mask before inpainting.
        strength: Inpainting strength passed to the pipeline.

    Returns:
        A streaming ``image/png`` response. The seed that was actually used
        is reported in the ``X-Seed`` response header.

    Raises:
        HTTPException: Status 400 when the model name is unknown, or when an
            inpainting model is requested without a readable image and mask.
    """
    # Imported locally so the handler does not rely on the module-level
    # import list being extended.
    from fastapi import HTTPException

    # Number of inference steps each model is run with.
    steps_by_model = {
        "Fluently XL Final": 25,
        "Fluently Anime": 30,
        "Fluently Epic": 30,
        "Fluently XL v4": 25,
        "Fluently XL v3 Lightning": 5,
        "Fluently v4 inpaint": 30,
        "Fluently XL v3 inpaint": 30,
    }
    inpaint_models = ("Fluently v4 inpaint", "Fluently XL v3 inpaint")

    # Reject bad requests before any model is loaded.
    if model not in steps_by_model:
        raise HTTPException(status_code=400, detail=f"Model {model} not found")
    is_inpaint = model in inpaint_models
    if is_inpaint and not (inpaint_image and mask_image):
        raise HTTPException(
            status_code=400,
            detail=f"Model {model} requires both inpaint_image and mask_image",
        )

    seed = int(randomize_seed_fn(seed, randomize_seed))
    if not use_negative_prompt:
        negative_prompt = ""

    call_kwargs = {
        "prompt": prompt,
        "negative_prompt": negative_prompt,
        "width": width,
        "height": height,
        # The Lightning model ignores the requested guidance scale.
        "guidance_scale": 2 if model == "Fluently XL v3 Lightning" else guidance_scale,
        "num_inference_steps": steps_by_model[model],
        "num_images_per_prompt": 1,
        "output_type": "pil",
        # Without a seeded generator the seed parameters would have no
        # effect on the generated image.
        "generator": torch.Generator().manual_seed(seed),
    }

    inpaint_image_pil = None
    mask_image_pil = None
    if is_inpaint:
        try:
            inpaint_image_pil = Image.open(io.BytesIO(await inpaint_image.read()))
            mask_image_pil = Image.open(io.BytesIO(await mask_image.read()))
        except OSError as exc:
            # Image.open raises an OSError subclass for data it cannot identify.
            raise HTTPException(
                status_code=400,
                detail="inpaint_image and mask_image must be valid image files",
            ) from exc

    model_pipeline = load_model(model)
    try:
        if is_inpaint:
            blurred_mask = model_pipeline.mask_processor.blur(
                mask_image_pil, blur_factor=blur_factor
            )
            call_kwargs.update(
                image=inpaint_image_pil,
                mask_image=blurred_mask,
                strength=strength,
            )
        images = model_pipeline(**call_kwargs).images
    finally:
        # Unload the model from the device even when inference fails.
        model_pipeline.to("cpu")
        torch.cuda.empty_cache()

    img_byte_arr = io.BytesIO()
    images[0].save(img_byte_arr, format='PNG')
    img_byte_arr.seek(0)
    return StreamingResponse(
        img_byte_arr,
        media_type="image/png",
        headers={"X-Seed": str(seed)},
    )
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)