Spaces:
Running
Running
# Copyright (c) Kuaishou. | |
# | |
# This source code is licensed under the license found in the | |
# LICENSE file in the root directory of this source tree. | |
import os | |
import base64 | |
import numpy as np | |
os.system('pip install huggingface_hub gradio openai keye_vl_utils torch torchvision -U') | |
from argparse import ArgumentParser | |
from pathlib import Path | |
import copy | |
import gradio as gr | |
import os | |
import re | |
import tempfile | |
from openai import OpenAI | |
from PIL import Image | |
from io import BytesIO | |
from keye_vl_utils import fetch_video, process_vision_info | |
def _get_args(): | |
parser = ArgumentParser() | |
parser.add_argument("--share", action="store_true", default=False, | |
help="Create a publicly shareable link for the interface.") | |
parser.add_argument("--inbrowser", action="store_true", default=False, | |
help="Automatically launch the interface in a new tab on the default browser.") | |
parser.add_argument("--server-port", type=int, default=7860, | |
help="Demo server port.") | |
parser.add_argument("--server-name", type=str, default="127.0.0.1", | |
help="Demo server name.") | |
args = parser.parse_args() | |
return args | |
def _parse_text(text): | |
lines = text.split("\n") | |
lines = [line for line in lines if line != ""] | |
count = 0 | |
for i, line in enumerate(lines): | |
line = line.replace("<think>", "**ζθθΏη¨εΌε§**οΌ\n") | |
line = line.replace("</think>", "\n**ζθθΏη¨η»ζ**\n") | |
line = line.replace("<answer>", "**εηεΌε§**οΌ\n") | |
line = line.replace("</answer>", "\n**εηη»ζ**\n") | |
line = line.replace("<analysis>", "**εζεΌε§**οΌ\n") | |
line = line.replace("</analysis>", "\n**εζη»ζ**\n") | |
if "```" in line: | |
count += 1 | |
items = line.split("`") | |
if count % 2 == 1: | |
lines[i] = f'<pre><code class="language-{items[-1]}">' | |
else: | |
lines[i] = f"<br></code></pre>" | |
else: | |
if True or i > 0: | |
if count % 2 == 1: | |
line = line.replace("`", r"\`") | |
line = line.replace("<", "<") | |
line = line.replace(">", ">") | |
line = line.replace(" ", " ") | |
line = line.replace("*", "*") | |
line = line.replace("_", "_") | |
line = line.replace("-", "-") | |
line = line.replace(".", ".") | |
line = line.replace("!", "!") | |
line = line.replace("(", "(") | |
line = line.replace(")", ")") | |
line = line.replace("$", "$") | |
lines[i] = "<br>" + line | |
text = "".join(lines) | |
return text | |
def is_video_file(filename): | |
video_extensions = ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.mpeg'] | |
return any(filename.lower().endswith(ext) for ext in video_extensions) | |
def video_processor(video_path): | |
video_inputs, video_sample_fps = fetch_video({"video": video_path, "max_frames": 8, "max_pixels": 256*28*28}, return_video_sample_fps=True) | |
video_input = video_inputs.permute(0, 2, 3, 1).numpy().astype(np.uint8) | |
# encode image with base64 | |
base64_frames = [] | |
for frame in video_input: | |
img = Image.fromarray(frame) | |
output_buffer = BytesIO() | |
img.save(output_buffer, format="jpeg") | |
byte_data = output_buffer.getvalue() | |
base64_str = base64.b64encode(byte_data).decode("utf-8") | |
base64_frames.append(base64_str) | |
video_info = { | |
"type": "video_url", | |
"video_url": {"url": f"data:video/jpeg;base64,{','.join(base64_frames)}"} | |
} | |
return video_info | |
def _launch_demo(args): | |
EP_URL = os.getenv("ENDPOINT_URL") | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
client = OpenAI( | |
base_url=f"{EP_URL}/v1/", | |
api_key=HF_TOKEN | |
) | |
def predict(_chatbot, task_history): | |
chat_query = _chatbot[-1][0] | |
query = task_history[-1][0] | |
if len(chat_query) == 0: | |
_chatbot.pop() | |
task_history.pop() | |
return _chatbot | |
print("User: " + _parse_text(query)) | |
history_cp = copy.deepcopy(task_history) | |
full_response = "" | |
messages = [] | |
content = [] | |
for q, a in history_cp: | |
if isinstance(q, (tuple, list)): | |
if is_video_file(q[0]): | |
video_info = video_processor(q[0]) | |
content.append(video_info) | |
else: | |
# convert image to base64 | |
with open(q[0], 'rb') as img_file: | |
img_base64 = base64.b64encode(img_file.read()).decode('utf-8') | |
content.append({'type': 'image_url', 'image_url': {"url": f"data:image/jpeg;base64,{img_base64}"}}) | |
else: | |
content.append({'type': 'text', 'text': q}) | |
messages.append({'role': 'user', 'content': content}) | |
messages.append({'role': 'assistant', 'content': [{'type': 'text', 'text': a}]}) | |
content = [] | |
messages.pop() | |
responses = client.chat.completions.create( | |
model="Kwai-Keye/Keye-VL-8B-Preview", | |
messages=messages, | |
top_p=0.95, | |
temperature=0.6, | |
stream=True, | |
timeout=360 | |
) | |
response_text = "" | |
for response in responses: | |
response = response.choices[0].delta.content | |
response_text += response | |
_chatbot[-1] = (chat_query, _parse_text(response_text)) | |
yield _chatbot | |
response = response_text | |
_chatbot[-1] = (chat_query, _parse_text(response)) | |
full_response = _parse_text(response) | |
task_history[-1] = (query, full_response) | |
print("Kwai-Keye-VL-Chat: " + full_response) | |
yield _chatbot | |
def regenerate(_chatbot, task_history): | |
if not task_history: | |
return _chatbot | |
item = task_history[-1] | |
if item[1] is None: | |
return _chatbot | |
task_history[-1] = (item[0], None) | |
chatbot_item = _chatbot.pop(-1) | |
if chatbot_item[0] is None: | |
_chatbot[-1] = (_chatbot[-1][0], None) | |
else: | |
_chatbot.append((chatbot_item[0], None)) | |
_chatbot_gen = predict(_chatbot, task_history) | |
for _chatbot in _chatbot_gen: | |
yield _chatbot | |
def add_text(history, task_history, text): | |
task_text = text | |
history = history if history is not None else [] | |
task_history = task_history if task_history is not None else [] | |
history = history + [(_parse_text(text), None)] | |
task_history = task_history + [(task_text, None)] | |
return history, task_history, "" | |
def add_file(history, task_history, file): | |
history = history if history is not None else [] | |
task_history = task_history if task_history is not None else [] | |
history = history + [((file.name,), None)] | |
task_history = task_history + [((file.name,), None)] | |
return history, task_history | |
def reset_user_input(): | |
return gr.update(value="") | |
def reset_state(task_history): | |
task_history.clear() | |
return [] | |
with gr.Blocks() as demo: | |
gr.Markdown("""<center><font size=3> Kwai-Keye-VL Demo </center>""") | |
chatbot = gr.Chatbot(label='Kwai-Keye-VL', elem_classes="control-height", height=500) | |
query = gr.Textbox(lines=2, label='Input') | |
task_history = gr.State([]) | |
with gr.Row(): | |
addfile_btn = gr.UploadButton("π Upload (δΈδΌ ζδ»Ά)", file_types=["image", "video"]) | |
submit_btn = gr.Button("π Submit (ει)") | |
regen_btn = gr.Button("π€οΈ Regenerate (ιθ―)") | |
empty_bin = gr.Button("π§Ή Clear History (ζΈ ι€εε²)") | |
submit_btn.click(add_text, [chatbot, task_history, query], [chatbot, task_history]).then( | |
predict, [chatbot, task_history], [chatbot], show_progress=True | |
) | |
submit_btn.click(reset_user_input, [], [query]) | |
empty_bin.click(reset_state, [task_history], [chatbot], show_progress=True) | |
regen_btn.click(regenerate, [chatbot, task_history], [chatbot], show_progress=True) | |
addfile_btn.upload(add_file, [chatbot, task_history, addfile_btn], [chatbot, task_history], show_progress=True) | |
demo.queue(default_concurrency_limit=40).launch( | |
share=args.share, | |
) | |
def main(): | |
args = _get_args() | |
_launch_demo(args) | |
if __name__ == '__main__': | |
main() |