# Kokoro-API-5 / code.py
# Yaron Koresh
# Update code.py
# 92f372e verified
# raw
# history blame
# 13.5 kB
# built-in
import logging
import multiprocessing as mp
import os
import random
import re
import string
import subprocess
import sys
import warnings
# external
#import spaces
import gradio as gr
import numpy as np
import requests
import torch
from lxml.html import fromstring
#from transformers import pipeline
#from diffusers.pipelines.flux import FluxPipeline
from diffusers.utils import export_to_gif, load_image
from diffusers.models.modeling_utils import ModelMixin
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file, save_file
from diffusers import DiffusionPipeline, AnimateDiffPipeline, MotionAdapter, EulerDiscreteScheduler, DDIMScheduler, StableDiffusionXLPipeline, UNet2DConditionModel, AutoencoderKL, UNet3DConditionModel
#import jax
#import jax.numpy as jnp
from numba import njit as cpu1, jit as cpu2, cuda
from numba.cuda import jit as gpu
# optimization:
# @gpu(cache=True)
# @cpu1(cache=True,nopython=True,parallel=True)
# @cpu2(cache=True,nopython=True,parallel=True)
# @cpu1(cache=True)
# @cpu2(cache=True)
# logging
# Silence library warnings and route every log record (DEBUG and above) to
# both stdout and stderr using the same banner-style layout.
warnings.filterwarnings("ignore")

root = logging.getLogger()
root.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stdout)
handler2 = logging.StreamHandler(sys.stderr)
for _stream_handler in (handler, handler2):
    _stream_handler.setLevel(logging.DEBUG)
    # A separate Formatter per handler; `formatter` ends up bound to the
    # one attached to the stderr handler.
    formatter = logging.Formatter('\n >>> [%(levelname)s] %(asctime)s %(name)s: %(message)s\n')
    _stream_handler.setFormatter(formatter)
    root.addHandler(_stream_handler)
del _stream_handler
# data
# Motion LoRA id most recently applied by Piper(); None until the first request.
last_motion=None
# Precision used for the VAE, the motion adapter and the pipeline.
# NOTE(review): `torch` is referenced here but not imported in the visible import block.
dtype = torch.float16
# Output image components; ui() appends to this list and wires it as the event outputs.
result=[]
device = "cuda"
#repo = "ByteDance/AnimateDiff-Lightning"
#ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors"
# Base checkpoint handed to AnimateDiffPipeline.from_pretrained in pre().
base = "emilianJR/epiCRealism"
#base = "SG161222/Realistic_Vision_V6.0_B1_noVAE"
# Loaded eagerly at import time and moved to `device`.
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse").to(device, dtype=dtype)
#unet = UNet2DConditionModel.from_config("emilianJR/epiCRealism",subfolder="unet").to(device, dtype).load_state_dict(load_file(hf_hub_download("emilianJR/epiCRealism", "unet/diffusion_pytorch_model.safetensors"), device=device), strict=False)
# NOTE(review): confirm that from_pretrained honours the `device` keyword here.
adapter = MotionAdapter.from_pretrained("guoyww/animatediff-motion-adapter-v1-5-3", torch_dtype=dtype, device=device)
# Passed as use_fast_sampling to pipe.enable_free_init() in pre().
fast=True
# fps * time is the number of frames requested from the pipeline; fps is also
# the playback rate given to export_to_gif().
fps=10
time=1
# Output size; the input image is resized to (width, height) and the CSS
# aspect ratio of the result images is derived from these values.
width=384
height=768
# Passed as num_inference_steps to the pipeline.
step=40
# Passed as guidance_scale to the pipeline.
accu=10
# Stylesheet handed to gr.Blocks. It is assembled from three pieces so the
# result images keep the width/height ratio configured above.
css = "".join((
    """
input, input::placeholder {
text-align: center !important;
}
*, *::placeholder {
font-family: Suez One !important;
}
h1,h2,h3,h4,h5,h6 {
width: 100%;
text-align: center;
}
footer {
display: none !important;
}
#col-container {
margin: 0 auto;
max-width: 15cm;
}
.image-container {
aspect-ratio: """,
    f"{width}/{height}",
    """ !important;
}
.dropdown-arrow {
display: none !important;
}
*:has(>.btn) {
display: flex;
justify-content: space-evenly;
align-items: center;
}
.btn {
display: flex;
}
""",
))
# JavaScript handed to gr.Blocks: caps both prompt inputs at 38 characters,
# the same limit translate() enforces on the text it receives.
js="""
function custom(){
document.querySelector("div#prompt input").setAttribute("maxlength","38")
document.querySelector("div#prompt2 input").setAttribute("maxlength","38")
}
"""
# functionality
def run(*args):
    """Run a shell command and return its completed process.

    This is ordinary host-side code, so it is a plain function: a Numba CUDA
    kernel can neither call ``subprocess`` nor return a value.

    Args:
        *args: ``args[0]`` is the command line, executed through the shell.

    Returns:
        The ``subprocess.CompletedProcess`` with captured stdout/stderr.

    Raises:
        SystemExit: if the command exits with a non-zero status. The exit
            code of the command is propagated so the failure stays visible
            to whoever launched this process.
    """
    cmd = args[0]
    result = subprocess.run(cmd, shell=True, capture_output=True, env=None)
    if result.returncode != 0:
        logging.error(
            "Command '%s' failed with exit status code '%s'. Exiting...",
            cmd,
            result.returncode,
        )
        # A bare sys.exit() would report success (status 0) to the parent.
        sys.exit(result.returncode)
    return result
# NOTE: everything below is ordinary host-side Python (HTTP requests, Gradio,
# diffusers). It is deliberately NOT decorated with numba.cuda.jit: CUDA
# kernels support only a restricted subset of Python and cannot return values,
# so the functions are plain callables invoked as f(...) rather than f[g, b](...).

# Desktop browser identities; one is picked at random per translation request.
_USER_AGENTS = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
)
# Each whitespace character (and '+') becomes a single space ...
_BLANKS = re.compile(r'[\s+]')
# ... and every ASCII punctuation character is dropped.
_PUNCTUATION = re.compile(f'[{re.escape(string.punctuation)}]')


def _clean_text(value):
    """Return *value* lower-cased, trimmed, with blanks unified and punctuation removed."""
    return _PUNCTUATION.sub('', _BLANKS.sub(' ', value)).lower().strip()


def translate(*args):
    """Translate a short text by scraping a Google search result page.

    Args:
        *args: ``(text, lang)`` - the text to translate and the name of the
            target language (e.g. ``"english"``).

    Returns:
        The cleaned translation when the page reports the requested target
        language; otherwise the cleaned input text. ``""`` when either
        argument is ``None`` or empty after cleaning.

    Raises:
        Exception: if the cleaned text is longer than 38 characters.
    """
    text, lang = args
    if text is None or lang is None:
        return ""
    text = _clean_text(text)
    lang = _clean_text(lang)
    if text == "" or lang == "":
        return ""
    if len(text) > 38:
        raise Exception("Translation Error: Too long text!")
    # The earlier hyphen-padding round trip stripped every hyphen it had
    # inserted, so the query always contained the cleaned text itself.
    query_text = f'Please translate {text}, into {lang}'
    translated = text
    try:
        resp = requests.get(
            'https://www.google.com/search',
            params={'q': query_text},  # let requests do the URL encoding
            headers={'User-Agent': random.choice(_USER_AGENTS)},
            timeout=10,  # never hang the UI on an unresponsive server
        )
    except requests.RequestException:
        # Translation is best effort: fall back to the untranslated text.
        print('Translation Warning: Failed To Translate!')
    else:
        html = fromstring(resp.content)
        try:
            trgt_lang = html.xpath('//*[@class="target-language"]')[0].text_content().lower().strip()
            trgt_text = html.xpath('//*[@id="tw-target-text"]/*')[0].text_content().lower().strip()
        except IndexError:
            # The translation widget is missing from the returned page.
            print('Translation Warning: Failed To Translate!')
        else:
            if trgt_lang == lang:
                translated = trgt_text
    ret = _clean_text(translated)
    print(ret)
    return ret


def generate_random_string(*args):
    """Return a random string of ASCII letters and digits.

    Args:
        *args: ``args[0]`` is the requested length.
    """
    length = args[0]
    characters = string.ascii_letters + string.digits
    return ''.join(random.choices(characters, k=length))


def Piper(*args):
    """Run the global AnimateDiff pipeline for one request.

    Args:
        *args: ``(image, positive, negative, motion)`` - the PIL image used
            as IP-Adapter input, the positive and negative prompts, and the
            motion LoRA repository id (``""`` for none).

    Returns:
        Whatever the pipeline call returns; infer() reads ``.frames[0]``.
    """
    image, positive, negative, motion = args
    global last_motion
    if last_motion != motion:
        # Swap the motion LoRA only when the selection actually changed.
        # NOTE(review): the LoRA is fused before set_adapters() and unloaded
        # later without unfuse_lora() - confirm this ordering against the
        # diffusers LoRA documentation.
        pipe.unload_lora_weights()
        if motion != "":
            pipe.load_lora_weights(motion, adapter_name="motion")
            pipe.fuse_lora()
            pipe.set_adapters(["motion"], [0.7])
        last_motion = motion
    pipe.to(device, dtype)
    call_kwargs = dict(
        prompt=positive,
        height=height,
        width=width,
        ip_adapter_image=image.convert("RGB").resize((width, height)),
        num_inference_steps=step,
        guidance_scale=accu,
        num_frames=(fps * time),
    )
    # The negative prompt is only passed when the user supplied one.
    if negative != "":
        call_kwargs["negative_prompt"] = negative
    return pipe(**call_kwargs)


def infer(*args):
    """Generate one animation and return the path of the written file.

    Args:
        *args: ``args[0]`` is a dict with keys ``"p"`` (positive prompt),
            ``"n"`` (negative prompt), ``"m"`` (motion LoRA id) and ``"i"``
            (input image or ``None``).

    Returns:
        The output file name, or ``None`` when no image was provided.
    """
    pm = args[0]
    print("infer: started")
    if pm["i"] is None:
        return None
    p1 = pm["p"]
    neg = pm["n"]
    # NOTE(review): the frames are written with export_to_gif() although the
    # file name ends in ".png" - confirm this is intentional.
    name = generate_random_string(12) + ".png"
    if neg != "":
        neg = f"{neg} where in the image"
    _do = ['photographed', 'realistic', 'dynamic poze', 'deep field', 'reasonable', "natural", 'rough', 'best quality', 'focused', "highly detailed"]
    if p1 != "":
        _do.append(f"a new {p1} content in the image")
    posi = ", ".join(_do)
    out = Piper(pm["i"], posi, neg, pm["m"])
    export_to_gif(out.frames[0], name, fps=fps)
    return name


def handle(*args):
    """Gradio event handler: translate the prompts and render the results.

    Args:
        *args: ``(image, motion, include_prompt, exclude_prompt, *outputs)``
            where ``outputs`` are the current values of the result images.

    Returns:
        A list with one generated file path (or ``None``) per output
        component, so the number of returned values matches the event's
        outputs.
    """
    i, m, p1, p2, *outputs = args
    p1_en = translate(p1, "english")
    p2_en = translate(p2, "english")
    pm = {"p": p1_en, "n": p2_en, "m": m, "i": i}
    #with Pool(f'{ ln }:ppn=2', queue='productionQ', timelimit='5:00:00', workdir='.') as pool:
    #return pool.map(infer,arr)
    return [infer(pm) for _ in outputs]


def ui():
    """Build the Gradio interface, wire the events and launch the app."""
    with gr.Blocks(theme=gr.themes.Soft(), css=css, js=js) as demo:
        with gr.Column(elem_id="col-container"):
            gr.Markdown("# MULTI-LANGUAGE IMAGE GENERATOR")
            with gr.Row():
                img = gr.Image(label="STATIC PHOTO", show_label=True, container=True, type="pil")
            with gr.Row():
                prompt = gr.Textbox(
                    elem_id="prompt",
                    placeholder="INCLUDE",
                    container=False,
                    max_lines=1
                )
            with gr.Row():
                prompt2 = gr.Textbox(
                    elem_id="prompt2",
                    placeholder="EXCLUDE",
                    container=False,
                    max_lines=1
                )
            with gr.Row():
                motion = gr.Dropdown(
                    label='CAMERA',
                    show_label=True,
                    container=True,
                    choices=[
                        ("(No Effect)", ""),
                        ("Zoom in", "guoyww/animatediff-motion-lora-zoom-in"),
                        ("Zoom out", "guoyww/animatediff-motion-lora-zoom-out"),
                        ("Tilt up", "guoyww/animatediff-motion-lora-tilt-up"),
                        ("Tilt down", "guoyww/animatediff-motion-lora-tilt-down"),
                        ("Pan left", "guoyww/animatediff-motion-lora-pan-left"),
                        ("Pan right", "guoyww/animatediff-motion-lora-pan-right"),
                        ("Roll left", "guoyww/animatediff-motion-lora-rolling-anticlockwise"),
                        ("Roll right", "guoyww/animatediff-motion-lora-rolling-clockwise"),
                    ],
                    value="",
                    interactive=True
                )
            with gr.Row():
                run_button = gr.Button("START", elem_classes="btn", scale=0)
            with gr.Row():
                # Two result slots; handle() returns one file per slot.
                result.append(gr.Image(interactive=False, elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False))
                result.append(gr.Image(interactive=False, elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False))
        gr.on(
            triggers=[run_button.click, prompt.submit, prompt2.submit],
            fn=handle, inputs=[img, motion, prompt, prompt2, *result], outputs=result
        )
    demo.queue().launch()


def pre():
    """Create and configure the global AnimateDiff pipeline used by Piper()."""
    # Piper() reads `pipe` from module scope, so it must be bound globally.
    global pipe
    pipe = AnimateDiffPipeline.from_pretrained(base, vae=vae, motion_adapter=adapter, torch_dtype=dtype).to(device)
    pipe.scheduler = DDIMScheduler(
        clip_sample=False,
        beta_start=0.00085,
        beta_end=0.012,
        beta_schedule="linear",
        timestep_spacing="trailing",
        steps_offset=1
    )
    # The AnimateDiff-Lightning UNet weights are no longer loaded here: the
    # `repo` and `ckpt` names that call relied on are commented out in the
    # data section, so it could only fail with a NameError.
    pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models", weight_name="ip-adapter_sd15.bin")
    pipe.enable_vae_slicing()
    pipe.enable_free_init(method="butterworth", use_fast_sampling=fast)


def entry():
    """Switch to the script directory, prepare the pipeline and start the UI."""
    os.chdir(os.path.abspath(os.path.dirname(__file__)))
    mp.set_start_method("spawn", force=True)
    pre()
    ui()


# entry
entry()
# end