Spaces:
Running
Running
import os | |
import pprint | |
import re | |
from typing import List, Optional, Union | |
from qwen_agent import Agent, MultiAgentHub | |
from qwen_agent.agents.user_agent import PENDING_USER_INPUT | |
from qwen_agent.gui.gradio_utils import format_cover_html | |
from qwen_agent.gui.utils import convert_fncall_to_text, convert_history_to_chatbot, get_avatar_image | |
from qwen_agent.llm.schema import CONTENT, FILE, IMAGE, NAME, ROLE, USER, Message | |
from qwen_agent.log import logger | |
from qwen_agent.utils.utils import print_traceback | |
from patching import common_programming_language_extensions | |
class WebUI: | |
"""A Common chatbot application for agent.""" | |
def __init__(self, agent: Union[Agent, MultiAgentHub, List[Agent]], chatbot_config: Optional[dict] = None): | |
""" | |
Initialization the chatbot. | |
Args: | |
agent: The agent or a list of agents, | |
supports various types of agents such as Assistant, GroupChat, Router, etc. | |
chatbot_config: The chatbot configuration. | |
Set the configuration as {'user.name': '', 'user.avatar': '', 'agent.avatar': '', 'input.placeholder': '', 'prompt.suggestions': []}. | |
""" | |
chatbot_config = chatbot_config or {} | |
if isinstance(agent, MultiAgentHub): | |
self.agent_list = [agent for agent in agent.nonuser_agents] | |
self.agent_hub = agent | |
elif isinstance(agent, list): | |
self.agent_list = agent | |
self.agent_hub = None | |
else: | |
self.agent_list = [agent] | |
self.agent_hub = None | |
user_name = chatbot_config.get('user.name', 'user') | |
self.user_config = { | |
'name': user_name, | |
'avatar': chatbot_config.get( | |
'user.avatar', | |
get_avatar_image(user_name), | |
), | |
} | |
self.agent_config_list = [{ | |
'name': agent.name, | |
'avatar': chatbot_config.get( | |
'agent.avatar', | |
os.path.join(os.path.dirname(__file__), 'assets/logo.jpeg'), | |
), | |
'description': agent.description or "I'm a helpful assistant.", | |
} for agent in self.agent_list] | |
self.input_placeholder = chatbot_config.get('input.placeholder', '跟我聊聊吧~') | |
self.prompt_suggestions = chatbot_config.get('prompt.suggestions', []) | |
self.verbose = chatbot_config.get('verbose', False) | |
""" | |
Run the chatbot. | |
Args: | |
messages: The chat history. | |
""" | |
def run(self, | |
messages: List[Message] = None, | |
share: bool = False, | |
server_name: str = None, | |
server_port: int = None, | |
concurrency_limit: int = 10, | |
enable_mention: bool = False, | |
**kwargs): | |
self.run_kwargs = kwargs | |
from qwen_agent.gui.gradio import gr, mgr | |
# Создаем более современную цветовую схему | |
customTheme = gr.themes.Soft( | |
primary_hue=gr.themes.colors.blue, | |
secondary_hue=gr.themes.colors.gray, | |
radius_size=gr.themes.sizes.radius_md, | |
) | |
with gr.Blocks( | |
css=os.path.join(os.path.dirname(__file__), 'assets/appBot.css'), | |
theme=customTheme, | |
) as demo: | |
history = gr.State([]) | |
with gr.Container(): | |
with gr.Row(): | |
with gr.Column(scale=3): | |
# Улучшенный чатбот с расширенными опциями | |
chatbot = mgr.Chatbot( | |
value=convert_history_to_chatbot(messages=messages), | |
avatar_images=[ | |
self.user_config, | |
self.agent_config_list, | |
], | |
height=700, | |
avatar_image_width=60, | |
bubble_full_width=False, | |
layout='bubble', | |
show_copy_button=True, | |
show_share_button=True, | |
latex_delimiters=[ | |
{'left': '\\(', 'right': '\\)', 'display': False}, | |
{'left': '\\[', 'right': '\\]', 'display': True}, | |
] | |
) | |
# Улучшенный мультимодальный ввод | |
with gr.Row(): | |
input = mgr.MultimodalInput( | |
placeholder=self.input_placeholder, | |
upload_button_props=dict( | |
file_types=[ | |
".pdf", ".docx", ".pptx", ".txt", | |
".html", ".csv", ".tsv", ".xlsx", ".xls" | |
] + ["." + file_type for file_type in common_programming_language_extensions] | |
), | |
show_label=False | |
) | |
send_btn = gr.Button("Отправить", variant="primary", size="lg") | |
with gr.Column(scale=1, min_width=300): | |
# Боковая панель с информацией об агенте | |
with gr.Card(variant="filled"): | |
agent_info_block = self._create_agent_info_block() | |
# Селектор агентов (если их несколько) | |
if len(self.agent_list) > 1: | |
agent_selector = gr.Dropdown( | |
[(agent.name, i) for i, agent in enumerate(self.agent_list)], | |
label='Выберите агента', | |
value=0, | |
interactive=True, | |
) | |
# Блок с возможностями агента | |
agent_plugins_block = self._create_agent_plugins_block() | |
# Рекомендованные промпты | |
if self.prompt_suggestions: | |
gr.Examples( | |
label='Быстрый старт', | |
examples=self.prompt_suggestions, | |
inputs=[input], | |
) | |
# Привязка событий | |
send_btn.click( | |
fn=self.add_text, | |
inputs=[input, chatbot, history], | |
outputs=[input, chatbot, history] | |
).then( | |
self.agent_run, | |
[chatbot, history], | |
[chatbot, history] | |
) | |
# Обработка ввода по нажатию Enter | |
input.submit( | |
fn=self.add_text, | |
inputs=[input, chatbot, history], | |
outputs=[input, chatbot, history] | |
).then( | |
self.agent_run, | |
[chatbot, history], | |
[chatbot, history] | |
) | |
# Запуск демо | |
demo.queue(default_concurrency_limit=concurrency_limit).launch( | |
share=share, | |
server_name=server_name, | |
server_port=server_port | |
) | |
def change_agent(self, agent_selector): | |
yield agent_selector, self._create_agent_info_block(agent_selector), self._create_agent_plugins_block( | |
agent_selector) | |
def add_text(self, _input, _chatbot, _history): | |
from qwen_agent.gui.gradio import gr | |
if _input.text == "/clear": | |
_chatbot = [] | |
_history.clear() | |
yield gr.update(interactive=False, value=""), _chatbot, _history | |
return | |
_history.append({ | |
ROLE: USER, | |
CONTENT: [{ | |
'text': _input.text | |
}], | |
}) | |
if self.user_config[NAME]: | |
_history[-1][NAME] = self.user_config[NAME] | |
if _input.files: | |
for file in _input.files: | |
if file.mime_type.startswith('image/'): | |
_history[-1][CONTENT].append({IMAGE: 'file://' + file.path}) | |
else: | |
_history[-1][CONTENT].append({FILE: file.path}) | |
_chatbot.append([_input, None]) | |
yield gr.update(interactive=False, value=None), _chatbot, _history | |
def add_mention(self, _chatbot, _agent_selector): | |
if len(self.agent_list) == 1: | |
yield _chatbot, _agent_selector | |
query = _chatbot[-1][0].text | |
match = re.search(r'@\w+\b', query) | |
if match: | |
_agent_selector = self._get_agent_index_by_name(match.group()[1:]) | |
agent_name = self.agent_list[_agent_selector].name | |
if ('@' + agent_name) not in query and self.agent_hub is None: | |
_chatbot[-1][0].text = '@' + agent_name + ' ' + query | |
yield _chatbot, _agent_selector | |
def agent_run(self, _chatbot, _history, _agent_selector=None): | |
if not _history: | |
if _agent_selector is not None: | |
yield _chatbot, _history, _agent_selector | |
else: | |
yield _chatbot, _history | |
return | |
if self.verbose: | |
logger.info('agent_run input:\n' + pprint.pformat(_history, indent=2)) | |
num_input_bubbles = len(_chatbot) - 1 | |
num_output_bubbles = 1 | |
_chatbot[-1][1] = [None for _ in range(len(self.agent_list))] | |
agent_runner = self.agent_list[_agent_selector or 0] | |
if self.agent_hub: | |
agent_runner = self.agent_hub | |
responses = [] | |
for responses in agent_runner.run(_history, **self.run_kwargs): | |
# usage = responses.usage | |
# responses = [Message(ASSISTANT, responses.output.choices[0].message.content)] | |
if not responses: | |
continue | |
if responses[-1][CONTENT] == PENDING_USER_INPUT: | |
logger.info('Interrupted. Waiting for user input!') | |
break | |
display_responses = convert_fncall_to_text(responses) | |
# display_responses[-1][CONTENT] += "\n<summary>" + repr({"usage": usage}) + "</summary>" | |
if not display_responses: | |
continue | |
if display_responses[-1][CONTENT] is None: | |
continue | |
while len(display_responses) > num_output_bubbles: | |
# Create a new chat bubble | |
_chatbot.append([None, None]) | |
_chatbot[-1][1] = [None for _ in range(len(self.agent_list))] | |
num_output_bubbles += 1 | |
assert num_output_bubbles == len(display_responses) | |
assert num_input_bubbles + num_output_bubbles == len(_chatbot) | |
for i, rsp in enumerate(display_responses): | |
agent_index = self._get_agent_index_by_name(rsp[NAME]) | |
_chatbot[num_input_bubbles + i][1][agent_index] = rsp[CONTENT] | |
if len(self.agent_list) > 1: | |
_agent_selector = agent_index | |
if _agent_selector is not None: | |
yield _chatbot, _history, _agent_selector | |
else: | |
yield _chatbot, _history | |
if responses: | |
for res in responses: | |
res['content'] = re.sub(r"\n<summary>input tokens.*</summary>", "", res['content']) | |
_history.extend([res for res in responses if res[CONTENT] != PENDING_USER_INPUT]) | |
if _agent_selector is not None: | |
yield _chatbot, _history, _agent_selector | |
else: | |
yield _chatbot, _history | |
if self.verbose: | |
logger.info('agent_run response:\n' + pprint.pformat(responses, indent=2)) | |
def flushed(self): | |
from qwen_agent.gui.gradio import gr | |
return gr.update(interactive=True) | |
def _get_agent_index_by_name(self, agent_name): | |
if agent_name is None: | |
return 0 | |
try: | |
agent_name = agent_name.strip() | |
for i, agent in enumerate(self.agent_list): | |
if agent.name == agent_name: | |
return i | |
return 0 | |
except Exception: | |
print_traceback() | |
return 0 | |
def _create_agent_info_block(self, agent_index=0): | |
from qwen_agent.gui.gradio import gr | |
agent_config_interactive = self.agent_config_list[agent_index] | |
return gr.HTML( | |
format_cover_html( | |
bot_name=agent_config_interactive['name'], | |
bot_description=agent_config_interactive['description'], | |
bot_avatar=agent_config_interactive['avatar'], | |
)) | |
def _create_agent_plugins_block(self, agent_index=0): | |
from qwen_agent.gui.gradio import gr | |
agent_interactive = self.agent_list[agent_index] | |
if agent_interactive.function_map: | |
capabilities = [key for key in agent_interactive.function_map.keys()] | |
return gr.CheckboxGroup( | |
label='插件', | |
value=capabilities, | |
choices=capabilities, | |
interactive=False, | |
) | |
else: | |
return gr.CheckboxGroup( | |
label='插件', | |
value=[], | |
choices=[], | |
interactive=False, | |
) | |