Kokoro-API-5 / app.py
Yaron Koresh
Update app.py
b8d8aa1 verified
raw
history blame
14.5 kB
# built-in
import os
import subprocess
import logging
import re
import random
from string import ascii_letters, digits, punctuation
import requests
import sys
import warnings
import time
from concurrent.futures import ProcessPoolExecutor
import threading
import asyncio
from queue import Queue as BlockingQueue
# external
import spaces
import torch
import gradio as gr
from numpy import asarray as array
from lxml.html import fromstring
from diffusers.utils import export_to_gif, load_image
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file, save_file
from diffusers import DiffusionPipeline, AnimateDiffPipeline, MotionAdapter, EulerDiscreteScheduler, DDIMScheduler, StableDiffusionXLPipeline, UNet2DConditionModel, AutoencoderKL, UNet3DConditionModel
from functools import partial
# logging
warnings.filterwarnings("ignore")

# The root logger lets every record from DEBUG upwards through.
root = logging.getLogger()
root.setLevel(logging.DEBUG)

# Two stream handlers with the same layout: `handler` writes to stdout and
# `handler2` to stderr, so each record is emitted on both streams.
handler = logging.StreamHandler(sys.stdout)
handler2 = logging.StreamHandler(sys.stderr)
for _stream_handler in (handler, handler2):
    # A separate Formatter instance per handler; `formatter` ends up bound to
    # the one attached to `handler2`.
    formatter = logging.Formatter('\n >>> [%(levelname)s] %(asctime)s %(name)s: %(message)s\n')
    _stream_handler.setLevel(logging.DEBUG)
    _stream_handler.setFormatter(formatter)
    root.addHandler(_stream_handler)
del _stream_handler
# constant data
# Tensor precision and target device used for every model loaded in this file.
dtype = torch.float16
device = "cuda"
#repo = "ByteDance/AnimateDiff-Lightning"
#ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors"
# Identifier of the base checkpoint handed to AnimateDiffPipeline.from_pretrained.
base = "emilianJR/epiCRealism"
#base = "SG161222/Realistic_Vision_V6.0_B1_noVAE"
# VAE loaded at import time and moved to `device` in `dtype`; passed to the pipeline as `vae`.
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse").to(device, dtype=dtype)
#unet = UNet2DConditionModel.from_config("emilianJR/epiCRealism",subfolder="unet").to(device, dtype).load_state_dict(load_file(hf_hub_download("emilianJR/epiCRealism", "unet/diffusion_pytorch_model.safetensors"), device=device), strict=False)
# Motion adapter loaded at import time; passed to the pipeline as `motion_adapter`.
adapter = MotionAdapter.from_pretrained("guoyww/animatediff-motion-adapter-v1-5-3", torch_dtype=dtype, device=device)
# variable data
# Motion LoRA identifier most recently requested; `calc` compares new requests against it.
last_motion=""
# Output image components; `ui` appends to this list and uses it as the event outputs.
result = []
# precision data
# Forwarded as `use_fast_sampling` to `pipe.enable_free_init`.
fast=True
# Frame rate used when exporting the animation; also a factor of `num_frames`.
fps=15
# Second factor of `num_frames` (num_frames = fps*time) — presumably the clip length in seconds.
# NOTE(review): this rebinds the name `time`, hiding the `time` module imported at the top of the file.
time=3
# Size passed to the pipeline (`width`/`height`) and used for the CSS aspect ratio.
width=896
height=896
# Passed to the pipeline as `num_inference_steps`.
step=40
# Passed to the pipeline as `guidance_scale`.
accu=10
# ui data
# Stylesheet handed to gr.Blocks. It is assembled from two literal halves
# around the computed "width/height" aspect ratio of the result images.
_css_before_ratio = """
input, input::placeholder {
text-align: center !important;
}
*, *::placeholder {
font-family: Suez One !important;
}
h1,h2,h3,h4,h5,h6 {
width: 100%;
text-align: center;
}
footer {
display: none !important;
}
#col-container {
margin: 0 auto;
max-width: 15cm;
}
.image-container {
aspect-ratio: """
_css_after_ratio = """ !important;
}
.dropdown-arrow {
display: none !important;
}
*:has(>.btn) {
display: flex;
justify-content: space-evenly;
align-items: center;
}
.btn {
display: flex;
}
"""
css = _css_before_ratio + str(width) + "/" + str(height) + _css_after_ratio

# Script handed to gr.Blocks as `js`: limits both prompt inputs to 38
# characters, the same bound `translate` enforces on its cleaned text.
js="""
function custom(){
document.querySelector("div#prompt input").setAttribute("maxlength","38")
document.querySelector("div#prompt2 input").setAttribute("maxlength","38")
}
"""
# torch pipe
# Build the AnimateDiff pipeline from the base checkpoint with the VAE and
# motion adapter loaded earlier, then move it to the target device.
pipe = AnimateDiffPipeline.from_pretrained(
    base,
    vae=vae,
    motion_adapter=adapter,
    torch_dtype=dtype,
).to(device)

# Swap in a DDIM scheduler configured with explicit settings.
_ddim_settings = {
    "clip_sample": False,
    "beta_start": 0.00085,
    "beta_end": 0.012,
    "beta_schedule": "linear",
    "timestep_spacing": "trailing",
    "steps_offset": 1,
}
pipe.scheduler = DDIMScheduler(**_ddim_settings)
#pipe.unet.load_state_dict(load_file(hf_hub_download(repo, ckpt), device=device), strict=False)

# IP-Adapter weights: `calc` passes the input photo as `ip_adapter_image`.
pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models", weight_name="ip-adapter_sd15.bin")
pipe.enable_vae_slicing()
pipe.enable_free_init(method="butterworth", use_fast_sampling=fast)
# Threading
class TwoSidedQueue:
    """Present two queues as one queue-like object.

    ``get``, ``get_nowait`` and ``task_done`` are served by ``queue_in``;
    ``put``, ``put_nowait``, ``empty``, ``full``, ``qsize`` and ``join`` are
    served by ``queue_out``. Any other attribute is looked up on ``queue_in``.
    """

    _INBOUND = ('get', 'get_nowait', 'task_done')
    _OUTBOUND = ('empty', 'full', 'join', 'put', 'put_nowait', 'qsize')

    def __init__(self, queue_in, queue_out):
        self._queue_in = queue_in
        self._queue_out = queue_out
        # Map each routed attribute name to the queue that serves it.
        routes = dict.fromkeys(self._OUTBOUND, queue_out)
        routes.update(dict.fromkeys(self._INBOUND, queue_in))
        self._sides = routes

    def __getattr__(self, name):
        # Only invoked for names not found on the instance or class, i.e. the
        # delegated queue API.
        owner = self._sides.get(name, self._queue_in)
        return getattr(owner, name)
class LaunchAsync:
    """Context manager that runs a coroutine function on its own thread.

    On entry a thread is started that runs ``asyncio.run`` on an internal
    wrapper; the wrapper creates two ``asyncio.Queue`` objects and awaits
    ``coro(queue, *args, **kwargs)``. The synchronous side then talks to the
    coroutine through `put` and `get`. On exit the wrapper task is cancelled
    and the thread is joined.
    """

    def __init__(self, coro, *args, **kwargs):
        self._coro = coro
        self._args = args
        self._kwargs = kwargs
        self._size = 0
        # The attributes below are filled in by `__enter__`.
        self._thread = None
        self._loop = None
        self._task = None
        self._queue_in = None
        self._queue_out = None

    def size(self, size):
        """Set the maximum size of both queues (a falsy value means 0) and return `self`."""
        self._size = size if size else 0
        return self

    def put(self, data, *, timeout=None):
        """
        Hand `data` to the `coro`, which receives it from its `get`.

        Blocks while the queue is full (up to `timeout` seconds when given).
        Returns `None` without doing anything if the `coro` is dead.
        """
        try:
            pending = asyncio.run_coroutine_threadsafe(self._queue_out.put(data), self._loop)
            return pending.result(timeout)
        except RuntimeError:
            # A RuntimeError is only expected once the loop has stopped.
            if not self._loop.is_running():
                return None
            raise

    def get(self, *, timeout=None):
        """
        Fetch the next item the `coro` has `put`.

        Blocks while the queue is empty (up to `timeout` seconds when given).
        Returns `None` if the `coro` is dead.
        """
        try:
            pending = asyncio.run_coroutine_threadsafe(self._queue_in.get(), self._loop)
            return pending.result(timeout)
        except RuntimeError:
            # A RuntimeError is only expected once the loop has stopped.
            if not self._loop.is_running():
                return None
            raise

    def dead(self):
        """
        Return `True` when the event loop is no longer running, i.e. the
        `coro` has exited (with or without error).
        """
        running = self._loop.is_running()
        return not running

    def __enter__(self):
        # `asyncio.run` creates and tears down the loop inside the new thread,
        # so the loop, task and queues only exist once that thread is running.
        # A one-slot blocking queue carries them back to this thread.
        handshake = BlockingQueue(1)
        runner = self._run(self._coro, self._size, handshake, self._args, self._kwargs)
        self._thread = threading.Thread(target=asyncio.run, args=(runner,))
        self._thread.start()
        started = handshake.get()
        self._loop, self._task, self._queue_in, self._queue_out = started
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        try:
            # Ask the loop thread to cancel the wrapper task.
            self._loop.call_soon_threadsafe(self._task.cancel)
        except RuntimeError:
            # Tolerated only when the loop has already stopped.
            if self._loop.is_running():
                raise
        finally:
            self._thread.join()

    @staticmethod
    async def _run(coro, size, oneshot, args, kwargs):
        # The asyncio queues are created inside the running loop.
        queue_in = asyncio.Queue(size)
        queue_out = asyncio.Queue(size)
        # Publish loop, task and queues to the thread waiting in `__enter__`.
        oneshot.put((asyncio.get_event_loop(), asyncio.current_task(), queue_in, queue_out))
        try:
            # The queues are deliberately swapped for the coroutine's view:
            # what this side writes is what the coroutine reads, and vice versa.
            await coro(TwoSidedQueue(queue_out, queue_in), *args, **kwargs)
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path triggered by `__exit__`.
            pass
class Command:
    """A callable bundled with the arguments it should be called with."""

    def __init__(self, func, data=None):
        # `data` is unpacked positionally by the consumer: `func(*data)`.
        self.func, self.data = func, data
def parallel(*pairs):
    """Run a batch of ``[func, arg1, arg2, ...]`` jobs on a helper thread.

    The jobs may be given as separate arguments or as one sequence of jobs
    (a single argument is always treated as the sequence of jobs). Each job
    is sent to a worker coroutine started through `LaunchAsync`, which calls
    ``func(*args)`` and sends the result back. Jobs are submitted and awaited
    one after another, so results come back in job order.

    Returns:
        ``None`` when called without arguments; otherwise a list with one
        result per job, in order. An entry is ``None`` if the worker had
        already died when its result was requested.

    Previously only the last result was returned (and an empty job list
    raised ``NameError``), although the caller indexes one result per job.
    """
    if len(pairs) == 0:
        return
    if len(pairs) == 1:
        pairs = pairs[0]

    async def async_main(queue):
        # Serve jobs until the task is cancelled on context exit.
        while True:
            command = await queue.get()
            await queue.put(command.func(*command.data))

    responses = []
    with LaunchAsync(async_main) as queue:
        for pair in pairs:
            # Unpack instead of `pop(0)` so the caller's list is left intact.
            func, *args = pair
            queue.put(Command(func, args))
            responses.append(queue.get())
    return responses
# functionality
def run(cmd):
    """Run `cmd` through the shell and return its standard output as text.

    Args:
        cmd: Command line to execute. It is interpreted by the shell, so it
            must never be built from untrusted input.

    Returns:
        The captured stdout decoded to ``str`` (undecodable bytes are
        replaced). Stderr is captured and discarded; the exit status is not
        checked.
    """
    completed = subprocess.run(cmd, shell=True, capture_output=True, env=None)
    # Decode the captured bytes: `str(bytes)` would return the "b'...'" repr
    # with escaped newlines instead of the actual output.
    return completed.stdout.decode(errors="replace")
def xpath_finder(str,pattern):
    """Return the lower-cased, stripped text of the first node matching `pattern`.

    Args:
        str: Markup to parse (the parameter name shadows the builtin and is
            kept for compatibility with existing callers).
        pattern: XPath expression evaluated against the parsed document.

    Returns:
        The text content of the first match, or ``""`` when parsing fails,
        nothing matches, or the match has no usable text.
    """
    try:
        matches = fromstring(str).xpath(pattern)
        return "" + matches[0].text_content().lower().strip()
    except Exception:
        # Best-effort lookup: any parsing or matching failure means "not
        # found". `Exception` (rather than a bare `except`) lets
        # KeyboardInterrupt and SystemExit propagate.
        return ""
# Matches a single whitespace character or a literal '+'.
_SPACE_OR_PLUS_RE = re.compile(r'[\s+]')
# Matches any ASCII punctuation character. Escaping matters: unescaped, the
# backslash in `punctuation` acts as an escape for ']' and is not matched itself.
_PUNCTUATION_RE = re.compile(f'[{re.escape(punctuation)}]')
# Seconds to wait for the search request before giving up.
_REQUEST_TIMEOUT = 10
# Browser identities, one of which is picked at random per request.
_USER_AGENTS = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
)


def _clean_phrase(value):
    """Turn whitespace/'+' into spaces, drop punctuation, lower-case and strip."""
    return _PUNCTUATION_RE.sub('', _SPACE_OR_PLUS_RE.sub(' ', value)).lower().strip()


def translate(text,lang):
    """Translate `text` into `lang` by scraping a Google search result page.

    Args:
        text: Phrase to translate; at most 38 characters after cleaning.
        lang: Target language name, e.g. ``"english"``.

    Returns:
        The cleaned translation when the page reports `lang` as the target
        language; otherwise the cleaned input text. ``""`` when either
        argument is ``None`` or empty after cleaning.

    Raises:
        Exception: If the cleaned text is longer than 38 characters.
        requests.RequestException: If the search request fails or times out.
    """
    if text is None or lang is None:
        return ""
    text = _clean_phrase(text)
    lang = _clean_phrase(lang)
    if text == "" or lang == "":
        return ""
    if len(text) > 38:
        raise Exception("Translation Error: Too long text!")
    # NOTE(review): the character class below removes every '-', not only the
    # leading/trailing ones, so this currently evaluates to `text` unchanged.
    # Kept as-is because the intended query format is unclear.
    padded_chars = re.sub(r"[(^\-)(\-$)]","",text.replace("","-").replace("- -"," ")).strip()
    query_text = f'Please translate {padded_chars}, into {lang}'
    response = requests.get(
        url='https://www.google.com/search',
        # Let requests encode the query string.
        params={'q': query_text},
        headers={'User-Agent': random.choice(_USER_AGENTS)},
        # Without a timeout a stalled connection would block the handler forever.
        timeout=_REQUEST_TIMEOUT,
    )
    # Parse the decoded page text; `str(response.content)` would parse the
    # "b'...'" repr of the bytes, with every non-ASCII byte escaped.
    content = response.text
    translated = text
    trgt_lang = xpath_finder(content,'//*[@class="target-language"]')
    if trgt_lang == lang:
        translated = xpath_finder(content,'//*[@id="tw-target-text"]/*')
    ret = _clean_phrase(translated)
    print(ret)
    return ret
def generate_random_string(length):
    """Return `length` characters drawn at random from ASCII letters and digits."""
    alphabet = ascii_letters + digits
    # One `random.choice` call per output character.
    return ''.join(map(random.choice, [alphabet] * length))
@spaces.GPU(duration=120)
def calc(img,p1,p2,motion):
    """Run the animation pipeline on one photo.

    Args:
        img: Input image; converted to RGB and resized to ``(width, height)``
            before being passed as ``ip_adapter_image``.
        p1: Prompt.
        p2: Negative prompt; omitted from the pipeline call when empty.
        motion: Identifier of the motion LoRA to apply, or ``""`` for none.

    Returns:
        Whatever the pipeline call returns.
    """
    global last_motion
    if last_motion != motion:
        # The requested motion changed: drop the LoRA weights loaded before
        # and load the new ones, if any.
        pipe.unload_lora_weights()
        # Fixed: this used to test `inp[3]`, a name that does not exist in
        # this function, so every motion change raised NameError.
        if motion != "":
            pipe.load_lora_weights(motion, adapter_name="motion")
            # NOTE(review): the LoRA is fused before `set_adapters` and never
            # unfused before the next `unload_lora_weights` — confirm this
            # order against the diffusers LoRA documentation.
            pipe.fuse_lora()
            pipe.set_adapters("motion", [0.7])
        last_motion = motion
    pipe.to(device,dtype)
    request = dict(
        prompt=p1,
        height=height,
        width=width,
        ip_adapter_image=img.convert("RGB").resize((width,height)),
        num_inference_steps=step,
        guidance_scale=accu,
        num_frames=(fps*time),
    )
    # Only pass a negative prompt when one was given.
    if p2 != "":
        request["negative_prompt"] = p2
    return pipe(**request)
def handle(*inp):
    """Gradio event handler: turn the form inputs into animated result files.

    Args:
        *inp: ``(image, include_text, exclude_text, motion)`` in the order of
            the event's `inputs`.

    Returns:
        A list with one entry per output component in `result`: the file
        name of each exported animation, or ``None`` for every output when no
        image was provided.
    """
    inp = list(inp)
    # Check for a missing image first so no translation request is made for
    # nothing, and answer with one value per output component rather than a
    # single bare None.
    if inp[0] is None:
        return [None] * len(result)
    inp[1] = translate(inp[1],"english")
    inp[2] = translate(inp[2],"english")
    if inp[2] != "":
        inp[2] = f"{inp[2]} where in the image"
    _do = ['photographed', 'realistic', 'dynamic poze', 'deep field', 'reasonable', "natural", 'rough', 'best quality', 'focused', "highly detailed"]
    if inp[1] != "":
        _do.append(f"a new {inp[1]} content in the image")
    inp[1] = ", ".join(_do)
    # One identical generation job per output component.
    ln = len(result)
    parargs = [[calc,*inp] for _ in range(ln)]
    # Expects `parallel` to give back one pipeline output per job.
    out_pipe = parallel(parargs)
    files = []
    for i in range(ln):
        # NOTE(review): the animation is written through `export_to_gif`
        # under a ".png" name — confirm the extension is intended.
        name = generate_random_string(12)+".png"
        export_to_gif(out_pipe[i].frames[0],name,fps=fps)
        files.append(name)
    return files
def ui():
    """Build the Gradio page, wire the generate event to `handle` and launch it.

    Appends the two result image components to the module-level `result`
    list, which is also used as the event's outputs.
    """
    global result
    # (label shown in the dropdown, motion LoRA identifier passed to `calc`)
    camera_choices = [
        ("(No Effect)", ""),
        ("Zoom in", "guoyww/animatediff-motion-lora-zoom-in"),
        ("Zoom out", "guoyww/animatediff-motion-lora-zoom-out"),
        ("Tilt up", "guoyww/animatediff-motion-lora-tilt-up"),
        ("Tilt down", "guoyww/animatediff-motion-lora-tilt-down"),
        ("Pan left", "guoyww/animatediff-motion-lora-pan-left"),
        ("Pan right", "guoyww/animatediff-motion-lora-pan-right"),
        ("Roll left", "guoyww/animatediff-motion-lora-rolling-anticlockwise"),
        ("Roll right", "guoyww/animatediff-motion-lora-rolling-clockwise"),
    ]
    with gr.Blocks(theme=gr.themes.Soft(),css=css,js=js) as demo:
        with gr.Column(elem_id="col-container"):
            gr.Markdown("\n# MULTI-LANGUAGE GIF CREATOR\n")
            with gr.Row():
                img = gr.Image(label="STATIC PHOTO",show_label=True,container=True,type="pil")
            with gr.Row():
                prompt = gr.Textbox(
                    elem_id="prompt",
                    placeholder="INCLUDE",
                    container=False,
                    max_lines=1,
                )
            with gr.Row():
                prompt2 = gr.Textbox(
                    elem_id="prompt2",
                    placeholder="EXCLUDE",
                    container=False,
                    max_lines=1,
                )
            with gr.Row():
                motion = gr.Dropdown(
                    label='CAMERA',
                    show_label=True,
                    container=True,
                    choices=camera_choices,
                    value="",
                    interactive=True,
                )
            with gr.Row():
                run_button = gr.Button("START",elem_classes="btn",scale=0)
            with gr.Row():
                # Two identical read-only result slots.
                for _ in range(2):
                    result.append(gr.Image(interactive=False,elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False))
        # The button click and Enter in either textbox all start `handle`.
        gr.on(
            triggers=[
                run_button.click,
                prompt.submit,
                prompt2.submit,
            ],
            fn=handle,
            inputs=[img,prompt,prompt2,motion],
            outputs=result,
        )
    demo.queue().launch()
# entry
if __name__ == "__main__":
    # Make the script's own directory the working directory, then start the UI.
    script_dir = os.path.dirname(__file__)
    os.chdir(os.path.abspath(script_dir))
    ui()
# end