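"""Gradio front end for a vehicle-inspection chatbot built on the OpenAI
Assistants API. Sketch of the flow: on page load a thread is created and the
assistant streams an opening message; user input (typed or via suggestion
buttons) is appended to the thread and the reply is streamed back, with any
tool calls dispatched to functions from bot_actions."""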
import json
import os

import gradio as gr
from openai import OpenAI

from bot_actions import functions_dictionary, save_record

CSS = """
.contain { display: flex; flex-direction: column; }
.gradio-container { height: 100vh !important; }
#chatbot { flex-grow: 1; overflow: auto; }
footer { visibility: hidden }
"""

openAIToken = os.environ['openAIToken']
assistantId = os.environ['assistantId']
initial_message = os.environ['initialMessage']

client = OpenAI(api_key=openAIToken)
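# When a run pauses with `requires_action`, the assistant is asking us to
# execute its tool calls. The helper below decodes each call's JSON
# arguments, dispatches to the matching entry in functions_dictionary
# (assumed here to map tool names to callables returning a dict with a
# "message" key), and collects the outputs to submit back to the run.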
def handle_requires_action(data):
    actions_results = []
    for tool in data.required_action.submit_tool_outputs.tool_calls:
        function_name = tool.function.name
        function_args = json.loads(tool.function.arguments)
        print(function_name)
        print(function_args)
        try:
            result = functions_dictionary[function_name](**function_args)
            print("Function result:", result)
            actions_results.append({"tool_output": {"tool_call_id": tool.id, "output": result["message"]}})
        except Exception as e:
            print(e)
            # Every tool call must receive an output, or submit_tool_outputs
            # rejects the whole batch and the run stalls.
            actions_results.append({"tool_output": {"tool_call_id": tool.id, "output": "Tool call failed."}})
    # Submit all tool_outputs at the same time
    return actions_results
def create_thread_openai(sessionStorage):
    streaming_thread = client.beta.threads.create()
    sessionStorage["threadId"] = streaming_thread.id
    return sessionStorage


def add_message_to_openai(text, threadId):
    print("User message: ", text)
    return client.beta.threads.messages.create(
        thread_id=threadId,
        role="user",
        content=text
    )
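# The assistant is apparently prompted to prefix each reply with a JSON array
# of quick-reply suggestions terminated by a "#s#" sentinel, e.g.
# `["Yes", "No"]#s#rest of the message...`. The two helpers below parse that
# convention; the prompt enforcing it lives in the assistant's instructions,
# not in this file.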
def transform_suggestions_into_list(string_of_suggestions):
    local_message = None
    parts = string_of_suggestions.split('#s#')
    list_of_suggestions = json.loads(parts[0])
    list_of_suggestions = [x for x in list_of_suggestions if x]
    if len(parts) > 1:
        local_message = parts[1]
    return list_of_suggestions, local_message


def create_suggestions_list(suggestions):
    update_show = [gr.update(visible=True, value=w) for w in suggestions]
    update_hide = [gr.update(visible=False, value="") for _ in range(6 - len(suggestions))]
    return update_show + update_hide
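# Streaming state machine for one text delta. A "[" switches into
# suggestion-buffering mode; buffered text is either parsed as the suggestion
# list once "#s#" arrives, or flushed back to the chat as plain text if it
# closes a bracket without the sentinel (i.e. it was ordinary prose that
# happened to contain "]"). Anything outside that mode streams straight to
# the chat window.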
def process_text_chunk(text, list_of_suggestions, string_of_suggestions, is_loading_suggestions):
    print(text, end="", flush=True)
    local_message = None
    if "[" in text:
        is_loading_suggestions = True
    if not is_loading_suggestions:
        local_message = text
    else:
        string_of_suggestions = string_of_suggestions + text
        if "#s#" in string_of_suggestions:
            is_loading_suggestions = False
            list_of_suggestions, local_message = transform_suggestions_into_list(string_of_suggestions)
            string_of_suggestions = ""
        elif "]" in string_of_suggestions and "]#" not in string_of_suggestions and not string_of_suggestions.endswith("]"):
            is_loading_suggestions = False
            local_message = string_of_suggestions
            string_of_suggestions = ""
    return local_message, list_of_suggestions, string_of_suggestions, is_loading_suggestions
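# Streams one assistant run over the thread. Message deltas are appended to
# the last chat turn and yielded so Gradio re-renders incrementally; a
# `thread.run.requires_action` event triggers the tool-call round trip via
# submit_tool_outputs_stream, whose deltas are processed the same way.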
def handle_events(threadId, chat_history, storage):
    list_of_suggestions = []
    string_of_suggestions = ""
    is_loading_suggestions = False
    try:
        with client.beta.threads.runs.stream(
            thread_id=threadId,
            assistant_id=assistantId
        ) as stream:
            for event in stream:
                if event.event == "thread.message.delta" and event.data.delta.content:
                    text = event.data.delta.content[0].text.value
                    local_message, list_of_suggestions, string_of_suggestions, is_loading_suggestions = process_text_chunk(text, list_of_suggestions, string_of_suggestions, is_loading_suggestions)
                    if local_message is not None:
                        chat_history[-1][1] += local_message
                        yield [chat_history, storage]
                if event.event == 'thread.run.requires_action':
                    result = handle_requires_action(event.data)
                    tool_outputs = [x["tool_output"] for x in result]
                    with client.beta.threads.runs.submit_tool_outputs_stream(
                        thread_id=stream.current_run.thread_id,
                        run_id=event.data.id,
                        tool_outputs=tool_outputs,
                    ) as action_stream:
                        for text in action_stream.text_deltas:
                            local_message, list_of_suggestions, string_of_suggestions, is_loading_suggestions = process_text_chunk(text, list_of_suggestions, string_of_suggestions, is_loading_suggestions)
                            if local_message is not None:
                                chat_history[-1][1] += local_message
                                yield [chat_history, storage]
            stream.until_done()
        print("")
        storage["list_of_suggestions"] = list_of_suggestions
        # A generator's return value is discarded by Gradio, so yield the
        # final state instead of returning it.
        yield [chat_history, storage]
    except Exception as e:
        print(e)
        chat_history[-1][1] = "An error occurred while processing your message. Please try again."
        yield [chat_history, storage]
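# Kicks off the conversation on page load by sending the configured opening
# prompt (the initialMessage secret) as a user message, so the assistant
# speaks first.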
def initiate_chatting(chat_history, storage):
    threadId = storage["threadId"]
    chat_history = [[None, ""]]
    add_message_to_openai(initial_message, threadId)
    yield from handle_events(threadId, chat_history, storage)


def respond_on_user_msg(chat_history, storage):
    message = chat_history[-1][0]
    threadId = storage["threadId"]
    print("Responding for threadId: ", threadId)
    chat_history[-1][1] = ""
    add_message_to_openai(message, threadId)
    yield from handle_events(threadId, chat_history, storage)
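# UI: a chatbot pane, up to six suggestion buttons, and a textbox that stays
# disabled while a response is streaming. Handlers are chained with .then()
# so each step (hide suggestions, lock input, stream reply, refresh
# suggestions, unlock input) runs in order.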
def create_application():
    with gr.Blocks(css=CSS, fill_height=True) as demo:
        storage = gr.State({})
        chatbot = gr.Chatbot(
            label="Vehicle inspection bot",
            line_breaks=False,
            show_label=False,
            show_share_button=False,
            elem_id="chatbot",
            height=300,
        )
        btn_list = []
        with gr.Row():
            for i in range(6):
                btn = gr.Button(visible=False, size="sm")
                btn_list.append(btn)
        msg = gr.Textbox(label="Answer", interactive=False)

        def user(user_message, history):
            return "", history + [[user_message, None]]

        def update_suggestions(storage):
            # .get() avoids a KeyError when a failed run never stored suggestions.
            list_of_suggestions = storage.get('list_of_suggestions') or []
            btn_list = create_suggestions_list(list_of_suggestions)
            return btn_list

        def hide_suggestions():
            return [gr.update(visible=False, value="") for _ in range(6)]

        def disable_msg():
            message_box = gr.Textbox(value=None, interactive=False)
            return message_box

        def enable_msg():
            message_box = gr.Textbox(value=None, interactive=True)
            return message_box
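        # Each *_flow is a (fn, inputs, outputs) triple, unpacked with * into
        # the Gradio event registrations below so the same steps can be reused
        # across the textbox, the suggestion buttons, and the load event.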
        add_user_message_flow = [user, [msg, chatbot], [msg, chatbot]]
        chat_response_flow = [respond_on_user_msg, [chatbot, storage], [chatbot, storage]]
        update_suggestions_flow = [update_suggestions, storage, btn_list]
        hide_suggestions_flow = [hide_suggestions, None, btn_list]
        disable_msg_flow = [disable_msg, None, msg]
        enable_msg_flow = [enable_msg, None, msg]

        msg.submit(*add_user_message_flow
                   ).then(*hide_suggestions_flow
                   ).then(*disable_msg_flow
                   ).then(*chat_response_flow
                   ).then(*update_suggestions_flow
                   ).then(*enable_msg_flow)
        for sug_btn in btn_list:
            add_suggestion_message_flow = [user, [sug_btn, chatbot], [msg, chatbot]]
            sug_btn.click(*add_suggestion_message_flow
                          ).then(*hide_suggestions_flow
                          ).then(*disable_msg_flow
                          ).then(*chat_response_flow
                          ).then(*update_suggestions_flow
                          ).then(*enable_msg_flow)
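        # On first page load: create a fresh thread, stream the assistant's
        # opening message, then surface its suggestions and unlock the textbox.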
        demo.load(create_thread_openai, inputs=storage, outputs=storage
                  ).then(initiate_chatting, inputs=[chatbot, storage], outputs=[chatbot, storage]
                  ).then(*update_suggestions_flow
                  ).then(*enable_msg_flow)
    return demo
if __name__ == "__main__":
    demo = create_application()
    demo.launch()