# built-in import os import subprocess import logging import re import random from string import ascii_letters, digits, punctuation import requests import sys import warnings import time from concurrent.futures import ProcessPoolExecutor import threading import asyncio from queue import Queue as BlockingQueue # external import torch import gradio as gr from numpy import asarray as array from lxml.html import fromstring from diffusers.utils import export_to_gif, load_image from huggingface_hub import hf_hub_download from safetensors.torch import load_file, save_file from diffusers import DiffusionPipeline, AnimateDiffPipeline, MotionAdapter, EulerDiscreteScheduler, DDIMScheduler, StableDiffusionXLPipeline, UNet2DConditionModel, AutoencoderKL, UNet3DConditionModel from functools import partial # logging warnings.filterwarnings("ignore") root = logging.getLogger() root.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) formatter = logging.Formatter('\n >>> [%(levelname)s] %(asctime)s %(name)s: %(message)s\n') handler.setFormatter(formatter) root.addHandler(handler) handler2 = logging.StreamHandler(sys.stderr) handler2.setLevel(logging.DEBUG) formatter = logging.Formatter('\n >>> [%(levelname)s] %(asctime)s %(name)s: %(message)s\n') handler2.setFormatter(formatter) root.addHandler(handler2) # constant data dtype = torch.float16 device = "cuda" #repo = "ByteDance/AnimateDiff-Lightning" #ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors" base = "emilianJR/epiCRealism" #base = "SG161222/Realistic_Vision_V6.0_B1_noVAE" vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse").to(device, dtype=dtype) #unet = UNet2DConditionModel.from_config("emilianJR/epiCRealism",subfolder="unet").to(device, dtype).load_state_dict(load_file(hf_hub_download("emilianJR/epiCRealism", "unet/diffusion_pytorch_model.safetensors"), device=device), strict=False) adapter = MotionAdapter.from_pretrained("guoyww/animatediff-motion-adapter-v1-5-3", torch_dtype=dtype, device=device) # variable data last_motion="" result = [] # precision data fast=True fps=15 time=3 width=384 height=768 step=40 accu=10 # ui data css="".join([""" input, input::placeholder { text-align: center !important; } *, *::placeholder { font-family: Suez One !important; } h1,h2,h3,h4,h5,h6 { width: 100%; text-align: center; } footer { display: none !important; } #col-container { margin: 0 auto; max-width: 15cm; } .image-container { aspect-ratio: """,str(width),"/",str(height),""" !important; } .dropdown-arrow { display: none !important; } *:has(>.btn) { display: flex; justify-content: space-evenly; align-items: center; } .btn { display: flex; } """]) js=""" function custom(){ document.querySelector("div#prompt input").setAttribute("maxlength","38") document.querySelector("div#prompt2 input").setAttribute("maxlength","38") } """ # torch pipe pipe = AnimateDiffPipeline.from_pretrained(base, vae=vae, motion_adapter=adapter, torch_dtype=dtype).to(device) pipe.scheduler = DDIMScheduler( clip_sample=False, beta_start=0.00085, beta_end=0.012, beta_schedule="linear", timestep_spacing="trailing", steps_offset=1 ) #pipe.unet.load_state_dict(load_file(hf_hub_download(repo, ckpt), device=device), strict=False) pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models", weight_name="ip-adapter_sd15.bin") pipe.enable_vae_slicing() pipe.enable_free_init(method="butterworth", use_fast_sampling=fast) # Threading class TwoSidedQueue: def __init__(self, queue_in, queue_out): self._queue_in = queue_in self._queue_out = queue_out self._sides = { 'empty': queue_out, 'full': queue_out, 'get': queue_in, 'get_nowait': queue_in, 'join': queue_out, 'put': queue_out, 'put_nowait': queue_out, 'qsize': queue_out, 'task_done': queue_in, } def __getattr__(self, name): return getattr(self._sides.get(name, self._queue_in), name) class LaunchAsync: def __init__(self, coro, *args, **kwargs): self._coro = coro self._args = args self._kwargs = kwargs self._thread = None self._loop = None self._task = None self._queue_in = None self._queue_out = None self._size = 0 def size(self, size): self._size = size or 0 return self def put(self, data, *, timeout=None): """ `put` data in for the `coro` to `get` out. Will block if the maximum `size` was reached. Does nothing if the `coro` is dead. """ try: return asyncio.run_coroutine_threadsafe(self._queue_out.put(data), self._loop).result(timeout) except RuntimeError: if self._loop.is_running(): raise else: return None def get(self, *, timeout=None): """ `get` data out of the `coro` it `put` in. Will block if the queue is empty. Returns `None` if the `coro` is dead. """ try: return asyncio.run_coroutine_threadsafe(self._queue_in.get(), self._loop).result(timeout) except RuntimeError: if self._loop.is_running(): raise else: return None def dead(self): """ Return `true` if the other side is dead (the `coro` has exited, with or without error). """ return not self._loop.is_running() def __enter__(self): # asyncio.run is used as it's a battle-tested way to safely set up a new loop and tear # it down. However it does mean it's necessary to wait for the task to run before it's # possible to get said loop and task back. For this, the usual blocking queue is used. oneshot = BlockingQueue(1) self._thread = threading.Thread(target=asyncio.run, args=( self._run(self._coro, self._size, oneshot, self._args, self._kwargs),)) self._thread.start() self._loop, self._task, self._queue_in, self._queue_out = oneshot.get() return self def __exit__(self, exc_type, exc_value, exc_traceback): try: self._loop.call_soon_threadsafe(self._task.cancel) except RuntimeError: if self._loop.is_running(): raise finally: self._thread.join() @staticmethod async def _run(coro, size, oneshot, args, kwargs): # asyncio.Queue's are created here so that they pick up the right loop. queue_in, queue_out = asyncio.Queue(size), asyncio.Queue(size) oneshot.put((asyncio.get_event_loop(), asyncio.current_task(), queue_in, queue_out)) try: # `queue_in` and `queue_out` are intentionally swapped here. await coro(TwoSidedQueue(queue_out, queue_in), *args, **kwargs) except asyncio.CancelledError: pass class Command: def __init__(self, func, data=None): self.func = func self.data = data def parallel(*pairs): if len(pairs) == 0: return if len(pairs) == 1: pairs = pairs[0] async def async_main(queue): while True: command = await queue.get() await queue.put(command.func(*command.data)) with LaunchAsync(async_main) as queue: for pair in pairs: f = pair.pop() queue.put(Command(f, pair)) response = queue.get() return response # functionality def run(cmd): return str(subprocess.run(cmd, shell=True, capture_output=True, env=None).stdout) def xpath_finder(str,pattern): try: return ""+fromstring(str).xpath(pattern)[0].text_content().lower().strip() except: return "" def translate(text,lang): if text == None or lang == None: return "" text = re.sub(f'[{punctuation}]', '', re.sub('[\s+]', ' ', text)).lower().strip() lang = re.sub(f'[{punctuation}]', '', re.sub('[\s+]', ' ', lang)).lower().strip() if text == "" or lang == "": return "" if len(text) > 38: raise Exception("Translation Error: Too long text!") user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15' ] padded_chars = re.sub("[(^\-)(\-$)]","",text.replace("","-").replace("- -"," ")).strip() query_text = f'Please translate {padded_chars}, into {lang}' url = f'https://www.google.com/search?q={query_text}' content = str(requests.get( url = url, headers = { 'User-Agent': random.choice(user_agents) } ).content) translated = text src_lang = xpath_finder(content,'//*[@class="source-language"]') trgt_lang = xpath_finder(content,'//*[@class="target-language"]') src_text = xpath_finder(content,'//*[@id="tw-source-text"]/*') trgt_text = xpath_finder(content,'//*[@id="tw-target-text"]/*') if trgt_lang == lang: translated = trgt_text ret = re.sub(f'[{punctuation}]', '', re.sub('[\s+]', ' ', translated)).lower().strip() print(ret) return ret def generate_random_string(length): characters = str(ascii_letters + digits) return ''.join(random.choice(characters) for _ in range(length)) def calc(img,p1,p2,motion): global last_motion global pipe if last_motion != motion: pipe.unload_lora_weights() if inp[3] != "": pipe.load_lora_weights(motion, adapter_name="motion") pipe.fuse_lora() pipe.set_adapters("motion", [0.7]) last_motion = motion pipe.to(device,dtype) if p2=="": return pipe( prompt=p1, height=height, width=width, ip_adapter_image=array2image(img).convert("RGB").resize((width,height)), num_inference_steps=step, guidance_scale=accu, num_frames=(fps*time) ) return pipe( prompt=p1, negative_prompt=p2, height=height, width=width, ip_adapter_image=array2image(img).convert("RGB").resize((width,height)), num_inference_steps=step, guidance_scale=accu, num_frames=(fps*time) ) def handle(*inp): inp = list(inp) inp[1] = translate(inp[1],"english") inp[2] = translate(inp[2],"english") if inp[0] == None: return None if inp[2] != "": inp[2] = f"{inp[2]} where in the image" _do = ['photographed', 'realistic', 'dynamic poze', 'deep field', 'reasonable', "natural", 'rough', 'best quality', 'focused', "highly detailed"] if inp[1] != "": _do.append(f"a new {inp[1]} content in the image") inp[1] = ", ".join(_do) ln = len(result) inp[0] = array(inp[0]) inp[1] = array(inp[1]) inp[2] = array(inp[2]) inp[3] = array(inp[3]) parargs = [[calc,*inp] for i in range(ln)] out_pipe = parallel(parargs) for i in range(ln): name = generate_random_string(12)+".png" export_to_gif(out_pipe[i].frames[0],name,fps=fps) out_pipe[i] = name return out_pipe def ui(): global result with gr.Blocks(theme=gr.themes.Soft(),css=css,js=js) as demo: with gr.Column(elem_id="col-container"): gr.Markdown(f""" # MULTI-LANGUAGE GIF CREATOR """) with gr.Row(): img = gr.Image(label="STATIC PHOTO",show_label=True,container=True,type="pil") with gr.Row(): prompt = gr.Textbox( elem_id="prompt", placeholder="INCLUDE", container=False, max_lines=1 ) with gr.Row(): prompt2 = gr.Textbox( elem_id="prompt2", placeholder="EXCLUDE", container=False, max_lines=1 ) with gr.Row(): motion = gr.Dropdown( label='CAMERA', show_label=True, container=True, choices=[ ("(No Effect)", ""), ("Zoom in", "guoyww/animatediff-motion-lora-zoom-in"), ("Zoom out", "guoyww/animatediff-motion-lora-zoom-out"), ("Tilt up", "guoyww/animatediff-motion-lora-tilt-up"), ("Tilt down", "guoyww/animatediff-motion-lora-tilt-down"), ("Pan left", "guoyww/animatediff-motion-lora-pan-left"), ("Pan right", "guoyww/animatediff-motion-lora-pan-right"), ("Roll left", "guoyww/animatediff-motion-lora-rolling-anticlockwise"), ("Roll right", "guoyww/animatediff-motion-lora-rolling-clockwise"), ], value="", interactive=True ) with gr.Row(): run_button = gr.Button("START",elem_classes="btn",scale=0) with gr.Row(): result.append(gr.Image(interactive=False,elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False)) result.append(gr.Image(interactive=False,elem_classes="image-container", label="Result", show_label=False, type='filepath', show_share_button=False)) gr.on( triggers=[ run_button.click, prompt.submit, prompt2.submit ], fn=handle, inputs=[img,prompt,prompt2,motion], outputs=result ) demo.queue().launch() # entry if __name__ == "__main__": os.chdir(os.path.abspath(os.path.dirname(__file__))) ui() # end