# Scraped page header (not part of the program): Tonic's picture · Update app.py ·
# commit 5207986 · raw · history blame · 12.3 kB
import copy
import os
import re
import secrets
import shutil
import tempfile
from argparse import ArgumentParser, Namespace
from pathlib import Path

import gradio as gr
from modelscope import AutoModelForCausalLM, AutoTokenizer, GenerationConfig, snapshot_download
# Expose only GPUs 0 and 1 to this process.
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'

# Default model id and revision used when no command-line override is given.
DEFAULT_CKPT_PATH = 'qwen/Qwen-VL-Chat'
REVISION = 'v1.0.4'

# Matches one <box>...</box> span (non-greedy, may cross newlines).
BOX_TAG_PATTERN = r"<box>([\s\S]*?)</box>"

# Characters that add_text() treats as trailing punctuation.
# The previous literal was closed by its first inner double quote, so only
# "!?。" ended up in the string and everything after it was parsed as a
# comment. A raw triple-quoted literal keeps the whole set in the string.
# NOTE(review): the ASCII symbols may originally have been full-width forms
# flattened by a text normalisation step - confirm against the upstream demo.
PUNCTUATION = r'''!?。"#$%&'()*+,-/:;<=>@[\]^_`{|}~⦅⦆「」、、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏.'''

# Directory for saved images: $GRADIO_TEMP_DIR if set, else <system tmp>/gradio.
uploaded_file_dir = os.environ.get("GRADIO_TEMP_DIR") or str(Path(tempfile.gettempdir()) / "gradio")
def _get_args() -> Namespace:
    """Parse the demo's command-line options.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``, carrying
        ``checkpoint_path``, ``revision``, ``cpu_only``, ``share``,
        ``inbrowser``, ``server_port`` and ``server_name``.
    """
    parser = ArgumentParser()
    parser.add_argument("-c", "--checkpoint-path", type=str, default=DEFAULT_CKPT_PATH,
                        help="Checkpoint name or path, default to %(default)r")
    parser.add_argument("--revision", type=str, default=REVISION)
    parser.add_argument("--cpu-only", action="store_true", help="Run demo with CPU only")
    parser.add_argument("--share", action="store_true", default=False,
                        help="Create a publicly shareable link for the interface.")
    parser.add_argument("--inbrowser", action="store_true", default=False,
                        help="Automatically launch the interface in a new tab on the default browser.")
    parser.add_argument("--server-port", type=int, default=8000,
                        help="Demo server port.")
    parser.add_argument("--server-name", type=str, default="127.0.0.1",
                        help="Demo server name.")
    # parse_args() yields a Namespace, not the parser itself.
    return parser.parse_args()
def handle_image_submission(_chatbot, task_history, file, tokenizer, model) -> tuple:
    """Store an uploaded image, add it to both histories and run the model.

    Args:
        _chatbot: Chat history shown in the UI; appended to in place.
        task_history: Parallel history list; appended to in place.
        file: Uploaded file object (its ``name`` is read by ``save_image``),
            or ``None`` when nothing was uploaded.
        tokenizer: Tokenizer forwarded to ``predict``.
        model: Model forwarded to ``predict``.

    Returns:
        ``(_chatbot, task_history)`` in every case.
    """
    print("handle_image_submission called")
    if file is None:
        print("No file uploaded")
        return _chatbot, task_history

    print("File received:", file)
    file_path = save_image(file, uploaded_file_dir)
    print("File saved at:", file_path)

    history_item = ((file_path,), None)
    _chatbot.append(history_item)
    task_history.append(history_item)

    # predict() returns only the chatbot list; pair it with task_history so
    # this branch has the same two-value shape as the early return above.
    return predict(_chatbot, task_history, tokenizer, model), task_history
def _load_model_tokenizer(args) -> tuple:
    """Fetch the checkpoint and build the model and tokenizer from it.

    Args:
        args: Parsed options; ``checkpoint_path``, ``revision`` and
            ``cpu_only`` are read.

    Returns:
        ``(model, tokenizer)``; the model has had ``.eval()`` called and its
        ``generation_config`` replaced by the one loaded from the checkpoint.
    """
    model_dir = snapshot_download(args.checkpoint_path, revision=args.revision)

    tokenizer = AutoTokenizer.from_pretrained(
        model_dir,
        trust_remote_code=True,
        resume_download=True,
    )

    # "cpu" pins everything to the CPU; "auto" leaves placement to the loader.
    device_map = "cpu" if args.cpu_only else "auto"

    model = AutoModelForCausalLM.from_pretrained(
        model_dir,
        device_map=device_map,
        trust_remote_code=True,
        bf16=True,
        resume_download=True,
    ).eval()

    model.generation_config = GenerationConfig.from_pretrained(
        model_dir,
        trust_remote_code=True,
        resume_download=True,
    )
    return model, tokenizer
def _parse_text(text: str) -> str:
lines = text.split("\n")
lines = [line for line in lines if line != ""]
count = 0
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split("`")
if count % 2 == 1:
lines[i] = f'<pre><code class="language-{items[-1]}">'
else:
lines[i] = f"<br></code></pre>"
else:
if i > 0:
if count % 2 == 1:
line = line.replace("`", r"\`")
line = line.replace("<", "&lt;")
line = line.replace(">", "&gt;")
line = line.replace(" ", "&nbsp;")
line = line.replace("*", "&ast;")
line = line.replace("_", "&lowbar;")
line = line.replace("-", "&#45;")
line = line.replace(".", "&#46;")
line = line.replace("!", "&#33;")
line = line.replace("(", "&#40;")
line = line.replace(")", "&#41;")
line = line.replace("$", "&#36;")
lines[i] = "<br>" + line
text = "".join(lines)
return text
def save_image(image_file, upload_dir: str) -> str:
    """Copy an uploaded file into ``upload_dir`` under a random name.

    Args:
        image_file: Object whose ``name`` attribute is the path of the file
            to copy.
        upload_dir: Destination directory; created if missing.

    Returns:
        Path of the copy as a string. The name is 20 random hex characters
        followed by the source file's suffix.
    """
    print("save_image called with:", image_file)
    target_dir = Path(upload_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    source = Path(image_file.name)
    file_path = target_dir / (secrets.token_hex(10) + source.suffix)
    print("Saving to:", file_path)

    # Read the source fully before creating the destination file.
    payload = source.read_bytes()
    file_path.write_bytes(payload)
    return str(file_path)
def add_file(history, task_history, file):
    """Save an uploaded file and append it to copies of both histories.

    Note: a later definition of ``add_file`` in this module rebinds the name,
    so this version is only in effect until that definition runs.

    Args:
        history: Chat history list; not mutated.
        task_history: Task history list; not mutated.
        file: Uploaded file object, or ``None`` when nothing was uploaded.

    Returns:
        ``(history, task_history)``, each extended by ``((path,), None)``
        when a file was given, otherwise the inputs unchanged.
    """
    if file is None:
        return history, task_history
    # save_image() requires the destination directory; it was missing here.
    file_path = save_image(file, uploaded_file_dir)
    history = history + [((file_path,), None)]
    task_history = task_history + [((file_path,), None)]
    return history, task_history
def predict(_chatbot, task_history, tokenizer, model) -> list:
    """Answer the most recent chatbot entry and store the reply in place.

    Args:
        _chatbot: List of ``(query, response)`` pairs. The last pair is the
            one answered; a tuple query is treated as ``(image_path,)``,
            anything else as text.
        task_history: Accepted for signature compatibility; not read here.
        tokenizer: Object providing ``from_list_format``, tokenisation via
            call, ``decode`` and ``draw_bbox_on_latest_picture``.
        model: Object providing ``device`` and ``generate``.

    Returns:
        ``_chatbot`` with the last pair's response filled in (unchanged when
        it is empty).
    """
    print("predict called")
    if not _chatbot:
        return _chatbot

    chat_query = _chatbot[-1][0]
    print("Chat query:", chat_query)

    if isinstance(chat_query, tuple):
        query = [{'image': chat_query[0]}]
    else:
        query = [{'text': _parse_text(chat_query)}]
    print("Query for model:", query)

    inputs = tokenizer.from_list_format(query)
    tokenized_inputs = tokenizer(inputs, return_tensors='pt')
    tokenized_inputs = tokenized_inputs.to(model.device)
    pred = model.generate(**tokenized_inputs)
    response = tokenizer.decode(pred.cpu()[0], skip_special_tokens=False)
    print("Model response:", response)

    if 'image' in query[0]:
        image = tokenizer.draw_bbox_on_latest_picture(response)
        if image is not None:
            # The rendered picture is an in-memory object, so it is written
            # with its own save() instead of save_image(), which copies from
            # the path in a file object's `.name`.
            # NOTE(review): assumes the returned object exposes save(path),
            # as save_uploaded_image() in this file does - confirm.
            out_dir = Path(uploaded_file_dir)
            out_dir.mkdir(parents=True, exist_ok=True)
            image_path = str(out_dir / f"{secrets.token_hex(10)}.jpg")
            image.save(image_path)
            _chatbot[-1] = (chat_query, (image_path,))
        else:
            _chatbot[-1] = (chat_query, "No image to display.")
    else:
        _chatbot[-1] = (chat_query, response)
    return _chatbot
def save_uploaded_image(image_file, upload_dir):
    """Write an image object to a fresh random sub-directory of ``upload_dir``.

    Args:
        image_file: Object with a ``save(path)`` method, or ``None``.
        upload_dir: Base directory under which the sub-directory is created.

    Returns:
        Path of the written ``.jpg`` file as a string, or ``None`` when
        ``image_file`` is ``None``.
    """
    # The body previously referred to an undefined name `image` and ignored
    # `upload_dir`; both parameters are now actually used.
    if image_file is None:
        return None
    temp_dir = Path(upload_dir) / secrets.token_hex(20)
    temp_dir.mkdir(exist_ok=True, parents=True)
    filename = temp_dir / f"tmp{secrets.token_hex(5)}.jpg"
    image_file.save(str(filename))
    return str(filename)
def regenerate(_chatbot, task_history, tokenizer=None, model=None) -> list:
    """Discard the last answer and ask the model again.

    Args:
        _chatbot: Chat history list; its last entry is rewritten in place.
        task_history: Task history list; its last response is reset to
            ``None`` in place.
        tokenizer: Tokenizer forwarded to ``predict``. When omitted, a
            module-level ``tokenizer`` is used if one exists.
        model: Model forwarded to ``predict``. When omitted, a module-level
            ``model`` is used if one exists.

    Returns:
        ``_chatbot`` unchanged when there is nothing to regenerate (empty
        task history, or last task entry has no response yet); otherwise the
        result of ``predict``.

    Raises:
        NameError: If a regeneration is needed but no tokenizer/model can be
            resolved. Raised before any history is modified.
    """
    if not task_history:
        return _chatbot
    item = task_history[-1]
    if item[1] is None:
        return _chatbot

    # These names used to be read as bare globals, which this module never
    # defines. Resolve them up front so a failure cannot leave the histories
    # half-updated.
    if tokenizer is None:
        tokenizer = globals().get("tokenizer")
    if model is None:
        model = globals().get("model")
    if tokenizer is None or model is None:
        raise NameError(
            "regenerate() needs a tokenizer and model: pass them explicitly "
            "or define module-level 'tokenizer' and 'model'"
        )

    task_history[-1] = (item[0], None)
    chatbot_item = _chatbot.pop(-1)
    if chatbot_item[0] is None:
        # Last row held only a response: clear the response of the row before it.
        _chatbot[-1] = (_chatbot[-1][0], None)
    else:
        _chatbot.append((chatbot_item[0], None))
    return predict(_chatbot, task_history, tokenizer, model)
def add_text(history, task_history, text) -> tuple:
    """Append a user message to copies of both histories.

    The chat history receives the text rendered by ``_parse_text``. The task
    history receives the raw text, minus its final character when that
    character is in ``PUNCTUATION`` and the one before it is not.

    Args:
        history: Chat history list; not mutated.
        task_history: Task history list; not mutated.
        text: The message typed by the user.

    Returns:
        ``(history, task_history, "")``; the empty string clears the input box.
    """
    drop_last = (
        len(text) >= 2
        and text[-1] in PUNCTUATION
        and text[-2] not in PUNCTUATION
    )
    task_text = text[:-1] if drop_last else text

    new_history = history + [(_parse_text(text), None)]
    new_task_history = task_history + [(task_text, None)]
    return new_history, new_task_history, ""
def add_file(history, task_history, file):
    """Append an uploaded file's path to copies of both histories.

    Args:
        history: Chat history list; not mutated.
        task_history: Task history list; not mutated.
        file: Object whose ``name`` is the file path, or ``None``.

    Returns:
        ``(history, task_history)``, each extended by ``((file.name,), None)``
        when a file was given, otherwise the inputs unchanged.
    """
    if file is not None:
        history = history + [((file.name,), None)]
        task_history = task_history + [((file.name,), None)]
    return history, task_history
def reset_user_input():
    """Return a Gradio update that sets a component's value to the empty string."""
    return gr.update(value="")
def process_response(response: str) -> str:
    """Remove ``<ref>``/``</ref>`` markers and ``<box>...</box>`` spans from a reply.

    Args:
        response: Raw model output.

    Returns:
        The text with both kinds of markup removed.
    """
    for marker in ("<ref>", "</ref>"):
        response = response.replace(marker, "")
    return re.sub(BOX_TAG_PATTERN, "", response)
def process_history_for_model(task_history) -> list:
    """Convert the task history into ``(query_dict, response)`` pairs.

    A tuple query becomes ``{'image': first_element}``; any other query
    becomes ``{'text': query}``. A falsy response is replaced by ``""``.

    Args:
        task_history: Iterable of ``(query, response)`` pairs.

    Returns:
        A new list of converted pairs, in the same order.
    """
    def _as_query(raw):
        # Tuples wrap an image path; everything else is plain text.
        if isinstance(raw, tuple):
            return {'image': raw[0]}
        return {'text': raw}

    return [(_as_query(raw), reply or "") for raw, reply in task_history]
def reset_state(task_history) -> list:
    """Empty ``task_history`` in place and return a fresh empty list.

    Args:
        task_history: History list to clear.

    Returns:
        A new empty list (not the cleared ``task_history`` object).
    """
    task_history.clear()
    emptied = []
    return emptied
def _launch_demo(args, model, tokenizer):
    """Build the Gradio interface, wire its events and start the server.

    Args:
        args: Parsed command-line options; accepted for signature
            compatibility and not read here.
        model: Loaded model, passed to the prediction callbacks.
        tokenizer: Loaded tokenizer, passed to the prediction callbacks.
    """
    # Event listeners only pass the values of their `inputs` components, so
    # the model and tokenizer are bound through closures. The former
    # `_state=[tokenizer, model]` keyword is not an event-listener parameter.
    def _predict(chat, history):
        return predict(chat, history, tokenizer, model)

    def _submit_image(chat, history, file):
        return handle_image_submission(chat, history, file, tokenizer, model)

    def _clear(history):
        # reset_state() empties the task history in place and returns [];
        # use that for the visible chat and hand the state a fresh list too.
        return reset_state(history), []

    with gr.Blocks() as demo:
        gr.Markdown("""# Welcome to Tonic's Qwen-VL-Chat Bot""")
        gr.Markdown(
            """ Qwen-VL-Chat is a multimodal input model.
本WebUI基于Qwen-VL-Chat打造,实现聊天机器人功能 但我必须修复它这么多也许我也得到一些荣誉
You can use this Space to test out the current model [qwen/Qwen-VL-Chat](https://huggingface.co/qwen/Qwen-VL-Chat) You can also use 🧑🏻‍🚀qwen/Qwen-VL-Chat🚀 by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co/spaces/Tonic1/VLChat?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a></h3>
Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community on 👻Discord: [Discord](https://discord.gg/nXx5wbX9) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha)
""")
        with gr.Row():
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(label='Qwen-VL-Chat')
            with gr.Column(scale=1):
                with gr.Row():
                    query = gr.Textbox(lines=2, label='Input', placeholder="Type your message here...")
                    submit_btn = gr.Button("🚀 Submit")
                with gr.Row():
                    file_upload = gr.UploadButton("📁 Upload Image", file_types=["image"])
                    submit_file_btn = gr.Button("Submit Image")
                    regen_btn = gr.Button("🤔️ Regenerate")
                    empty_bin = gr.Button("🧹 Clear History")

        task_history = gr.State([])

        submit_btn.click(
            fn=_predict,
            inputs=[chatbot, task_history],
            outputs=[chatbot],
        )
        submit_file_btn.click(
            fn=_submit_image,
            inputs=[chatbot, task_history, file_upload],
            outputs=[chatbot, task_history],
        )
        # regenerate() is registered with just the two history inputs.
        regen_btn.click(
            fn=regenerate,
            inputs=[chatbot, task_history],
            outputs=[chatbot],
        )
        # Clear the visible conversation as well as the stored task history.
        empty_bin.click(
            fn=_clear,
            inputs=[task_history],
            outputs=[chatbot, task_history],
        )
        query.submit(
            fn=add_text,
            inputs=[chatbot, task_history, query],
            outputs=[chatbot, task_history, query],
        )

        gr.Markdown("""
Note: This demo is governed by the original license of Qwen-VL.
We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content,
including hate speech, violence, pornography, deception, etc.
(注:本演示受Qwen-VL的许可协议限制。我们强烈建议,用户不应传播及不应允许他人传播以下内容,
包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息。)""")

    demo.queue().launch()
def main():
    """Parse options, load the model and tokenizer, then start the demo."""
    args = _get_args()
    # _load_model_tokenizer() returns (model, tokenizer), which is exactly the
    # positional order _launch_demo() expects after `args`.
    _launch_demo(args, *_load_model_tokenizer(args))
# Start the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()