Spaces:
Running
Running
import os | |
import re | |
import spaces | |
import random | |
import string | |
import torch | |
import requests | |
import gradio as gr | |
import numpy as np | |
from lxml.html import fromstring | |
from transformers import pipeline | |
from torch import multiprocessing as mp | |
#from torch.multiprocessing import Pool | |
#from pathos.multiprocessing import ProcessPool as Pool | |
from pathos.threading import ThreadPool as Pool | |
from diffusers.pipelines.flux import FluxPipeline | |
from diffusers.utils import export_to_gif, load_image | |
from huggingface_hub import hf_hub_download | |
from safetensors.torch import load_file | |
from diffusers import DiffusionPipeline, AnimateDiffPipeline, MotionAdapter, EulerDiscreteScheduler, StableDiffusionXLPipeline, UNet2DConditionModel | |
import jax | |
import jax.numpy as jnp | |
def forest_schnell():
    """Build the FLUX.1-schnell diffusion pipeline and move it to CUDA.

    The Hugging Face access token is read from the ``hf_token`` environment
    variable.

    Returns:
        The object produced by ``DiffusionPipeline.from_pretrained(...)``
        after ``.to("cuda")`` has been applied.
    """
    access_token = os.getenv("hf_token")
    flux = DiffusionPipeline.from_pretrained(
        "black-forest-labs/FLUX.1-schnell",
        torch_dtype=torch.bfloat16,
        token=access_token,
    )
    return flux.to("cuda")
def _normalize_phrase(value):
    """Replace whitespace/'+' with spaces, drop ASCII punctuation, lower-case and strip."""
    # re.escape keeps every punctuation character literal inside the class;
    # unescaped, the "\]" pair is read as an escaped "]" and "\" is never removed.
    spaced = re.sub(r'[\s+]', ' ', value)
    return re.sub(f'[{re.escape(string.punctuation)}]', '', spaced).lower().strip()


def translate(text, lang):
    """Translate a short phrase into ``lang`` by scraping a Google search page.

    Both arguments are normalised first (whitespace and ``+`` become spaces,
    ASCII punctuation is removed, the result is lower-cased and stripped).
    The phrase is then sent to Google as a "Please translate ..." query and
    the translation widget of the returned HTML is read.

    Args:
        text: Phrase to translate; at most 38 characters after normalisation.
        lang: Name of the target language (for example ``"english"``); it is
            compared with the lower-cased target-language label of the page.

    Returns:
        The normalised translation when the page contains a translation whose
        target-language label equals ``lang``; otherwise the normalised input
        text.  An empty string when either argument is ``None`` or normalises
        to an empty string.

    Raises:
        Exception: If the normalised text is longer than 38 characters.
        requests.RequestException: If the HTTP request fails or times out.
    """
    if text is None or lang is None:
        return ""
    text = _normalize_phrase(text)
    lang = _normalize_phrase(lang)
    if not text or not lang:
        return ""
    if len(text) > 38:
        raise Exception("Translation Error: Too long text!")
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15'
    ]
    # NOTE(review): the character class below removes every "-", "(", ")",
    # "^" and "$", not just a leading/trailing hyphen, so the hyphens inserted
    # by replace("", "-") are all stripped again and padded_chars ends up equal
    # to text.  Kept as-is so the query sent to Google does not change; confirm
    # whether hyphen-separated letters were actually intended.
    padded_chars = re.sub(r"[(^\-)(\-$)]", "", text.replace("", "-").replace("- -", " ")).strip()
    query_text = f'Please translate {padded_chars}, into {lang}'
    resp = requests.get(
        'https://www.google.com/search',
        # Let requests URL-encode the query instead of splicing it into the URL.
        params={'q': query_text},
        headers={
            'User-Agent': random.choice(user_agents)
        },
        # Without a timeout a stalled connection would block the caller forever.
        timeout=10,
    )
    html = fromstring(resp.content)
    translated = text
    try:
        trgt_lang = html.xpath('//*[@class="target-language"]')[0].text_content().lower().strip()
        trgt_text = html.xpath('//*[@id="tw-target-text"]/*')[0].text_content().lower().strip()
    except Exception:
        # Best effort: any scraping failure (typically IndexError when the
        # widget is missing) falls back to the untranslated text.
        print('Translation Warning: Failed To Translate!')
    else:
        if trgt_lang == lang:
            translated = trgt_text
    ret = _normalize_phrase(translated)
    print(ret)
    return ret
def progress_callback(i, t, z):
    """Report ``i + 1`` out of ``step`` to the module-level ``progress`` callable.

    Args:
        i: Step index supplied by the caller; one is added before reporting.
        t: Unused.
        z: Unused.
    """
    finished = i + 1
    progress((finished, step))
def generate_random_string(length):
    """Return ``length`` characters picked at random from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def Piper(name, positive_prompt, motion):
    """Run the global pipeline for ``positive_prompt`` and export the frames to ``name``.

    Reads the module-level globals ``pipe``, ``step``, ``fps``, ``time`` and
    ``last_motion``.  When ``motion`` differs from the motion recorded in
    ``last_motion`` the currently loaded LoRA weights are unloaded and, unless
    ``motion`` is the empty string, the requested motion LoRA is loaded with
    an adapter weight of 0.7.

    Args:
        name: Output path handed to ``export_to_gif``.
        positive_prompt: Prompt passed to the pipeline.
        motion: Identifier passed to ``pipe.load_lora_weights``; ``""`` means
            no motion LoRA.

    Returns:
        ``name``, unchanged.
    """
    global last_motion
    print("starting piper")
    # Compare against the tracked global; the previous name (motion_loaded)
    # was never defined anywhere and raised NameError on every call.
    if last_motion != motion:
        pipe.unload_lora_weights()
        if motion != "":
            pipe.load_lora_weights(motion, adapter_name="motion")
            pipe.set_adapters(["motion"], [0.7])
        last_motion = motion
    # NOTE(review): fps * time frames are requested; confirm the loaded motion
    # adapter supports a sequence of that length.
    out = pipe(
        positive_prompt,
        height=512,
        width=512,
        num_inference_steps=step,
        guidance_scale=1,
        callback=progress_callback,
        callback_steps=1,
        num_frames=fps * time,
    )
    export_to_gif(out.frames[0], name, fps=fps)
    return name
# Stylesheet passed to gr.Blocks(css=...) in main(): centres inputs and text,
# forces the "Suez One" font and LTR direction, hides the footer and dropdown
# arrows, limits the main column width and keeps result images square.
css="""
input, input::placeholder {
text-align: center !important;
}
*, *::placeholder {
direction: ltr !important;
font-family: Suez One !important;
}
h1,h2,h3,h4,h5,h6,span,p,pre {
width: 100% !important;
text-align: center !important;
display: block !important;
}
footer {
display: none !important;
}
#col-container {
margin: 0 auto !important;
max-width: 15cm !important;
}
.image-container {
aspect-ratio: 512 / 512 !important;
}
.dropdown-arrow {
display: none !important;
}
*:has(.btn), .btn {
width: 100% !important;
margin: 0 auto !important;
}
"""
# JavaScript passed to gr.Blocks(js=...) in main(): sets maxlength=38 on the
# prompt inputs, the same limit translate() enforces on its text argument.
# NOTE(review): no element with id "prompt2" is created in this file, so the
# second querySelector presumably returns null — confirm and remove if unused.
js="""
function custom(){
document.querySelector("div#prompt input").setAttribute("maxlength","38")
document.querySelector("div#prompt2 input").setAttribute("maxlength","38")
}
"""
def infer(pm):
    """Build the full prompt for one job and render it through ``Piper``.

    Args:
        pm: Mapping with key ``"p"`` (the prompt text, possibly empty) and
            key ``"m"`` (the motion value forwarded to ``Piper``).

    Returns:
        Whatever ``Piper`` returns for the randomly named ``.png`` output.
    """
    print("infer: started")
    subject = pm["p"]
    out_name = generate_random_string(12) + ".png"
    # Fixed quality keywords; the user's text, when present, goes last.
    keywords = ('beautiful', 'playful', 'photographed', 'realistic', 'dynamic poze', 'deep field', 'reasonable coloring', 'rough texture', 'best quality', 'focused')
    if subject != "":
        prompt_parts = [*keywords, f'{subject}']
    else:
        prompt_parts = list(keywords)
    return Piper(out_name, " ".join(prompt_parts), pm["m"])
def run(m, p1, *result):
    """Translate the prompt and render one output per entry in ``result``.

    Args:
        m: Motion value forwarded to ``infer`` under key ``"m"``.
        p1: Prompt text; translated to English before use.
        *result: One positional value per requested output; only the count
            is used.

    Returns:
        A list with one ``infer`` result per entry in ``result`` (empty when
        no outputs were requested).
    """
    p1_en = translate(p1, "english")
    job = {"p": p1_en, "m": m}
    count = len(result)
    print("images: " + str(count))
    if count == 0:
        # Nothing to render; avoid asking for a pool with zero workers.
        return []
    pool = Pool(count)
    try:
        # Every worker receives the same job mapping.
        out = list(pool.imap(infer, [job] * count))
    finally:
        # Release the worker threads even when a job raises.
        pool.close()
        pool.join()
        pool.clear()
    return out
def main():
    """Load the models, build the Gradio interface and launch the app.

    Initialises the module-level globals used by the other functions
    (``result``, ``pipe``, ``device``, ``step``, ``dtype``, ``progress``,
    ``fps``, ``time``, ``last_motion``), downloads the SDXL-Lightning UNet and
    AnimateDiff-Lightning motion adapter checkpoints, assembles the pipeline,
    and wires the prompt box, motion dropdown and start button to ``run``.
    """
    global result
    global pipe
    global device
    global step
    global dtype
    global progress
    global fps
    global time
    global last_motion
    last_motion = None
    fps = 40
    time = 5
    device = "cuda"
    dtype = torch.float16
    result = []
    step = 2
    progress = gr.Progress()
    progress((0, step))

    base = "stabilityai/stable-diffusion-xl-base-1.0"
    repo = "ByteDance/SDXL-Lightning"
    ckpt = f"sdxl_lightning_{step}step_unet.safetensors"
    unet = UNet2DConditionModel.from_config(base, subfolder="unet").to(device, dtype)
    # The checkpoint is a .safetensors file, so it must be read with
    # safetensors' load_file; torch.load expects a pickle and cannot parse it.
    unet.load_state_dict(load_file(hf_hub_download(repo, ckpt), device=device), strict=False)

    repo = "ByteDance/AnimateDiff-Lightning"
    ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors"
    adapter = MotionAdapter().to(device, dtype)
    adapter.load_state_dict(load_file(hf_hub_download(repo, ckpt), device=device))

    # NOTE(review): AnimateDiffPipeline is given an SDXL base checkpoint and
    # UNet here; confirm this combination is supported by the pipeline.
    pipe = AnimateDiffPipeline.from_pretrained(base, unet=unet, motion_adapter=adapter, torch_dtype=dtype, variant="fp16").to(device, dtype)
    pipe.scheduler = EulerDiscreteScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing", beta_schedule="linear")
    mp.set_start_method("spawn", force=True)

    with gr.Blocks(theme=gr.themes.Soft(), css=css, js=js) as demo:
        with gr.Column(elem_id="col-container"):
            gr.Markdown("""
# MULTI-LANGUAGE IMAGE GENERATOR
""")
            with gr.Row():
                prompt = gr.Textbox(
                    elem_id="prompt",
                    placeholder="DESCRIPTION",
                    container=False,
                    max_lines=1
                )
            with gr.Row():
                motion = gr.Dropdown(
                    label='Motion',
                    choices=[
                        ("Default", ""),
                        ("Zoom in", "guoyww/animatediff-motion-lora-zoom-in"),
                        ("Zoom out", "guoyww/animatediff-motion-lora-zoom-out"),
                        ("Tilt up", "guoyww/animatediff-motion-lora-tilt-up"),
                        ("Tilt down", "guoyww/animatediff-motion-lora-tilt-down"),
                        ("Pan left", "guoyww/animatediff-motion-lora-pan-left"),
                        ("Pan right", "guoyww/animatediff-motion-lora-pan-right"),
                        ("Roll left", "guoyww/animatediff-motion-lora-rolling-anticlockwise"),
                        ("Roll right", "guoyww/animatediff-motion-lora-rolling-clockwise"),
                    ],
                    value="",
                    interactive=True
                )
            with gr.Row():
                run_button = gr.Button("START", elem_classes="btn", scale=0)
            with gr.Row():
                # Two identical output slots; run() renders one result per slot.
                for _ in range(2):
                    result.append(gr.Image(interactive=False, elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False))
        # The image components are also passed as inputs so that run() can
        # derive the number of outputs from its *result arguments.
        gr.on(
            triggers=[run_button.click, prompt.submit],
            fn=run, inputs=[motion, prompt, *result], outputs=result
        )
    demo.queue().launch()
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()