"""Gradio chat front-end for an OpenAI Assistant with quick-reply suggestions.

The assistant's streamed reply may start with a JSON list of suggestion
strings followed by the ``#s#`` marker and then the visible message text.
The list is parsed out of the stream and rendered as up to
``MAX_SUGGESTIONS`` buttons under the chat window.
"""

import json
import os

import gradio as gr
from openai import OpenAI

from bot_actions import functions_dictionary, save_record

CSS = """
.contain { display: flex; flex-direction: column; }
.gradio-container { height: 100vh !important; }
#chatbot { flex-grow: 1; overflow: auto;}
footer {visibility: hidden}
"""

# Number of suggestion buttons rendered in the UI; every callback that
# updates the buttons must return exactly this many updates.
MAX_SUGGESTIONS = 6

# Marker separating the JSON suggestions list from the message text.
SUGGESTIONS_MARKER = "#s#"

openAIToken = os.environ['openAIToken']
assistantId = os.environ['assistantId']
initial_message = os.environ['initialMessage']

client = OpenAI(api_key=openAIToken)


def handle_requires_action(data):
    """Execute every tool call requested by a run and collect the outputs.

    Args:
        data: The run object carried by a ``thread.run.requires_action``
            event; its ``required_action.submit_tool_outputs.tool_calls``
            are executed through ``functions_dictionary``.

    Returns:
        A list with one ``{"tool_output": {"tool_call_id", "output"}}`` dict
        per requested tool call, ready to be submitted together.
    """
    actions_results = []
    for tool in data.required_action.submit_tool_outputs.tool_calls:
        function_name = tool.function.name
        print(function_name)
        # Broad catch on purpose: the callables are arbitrary project code and
        # every tool call must still receive an output, otherwise the later
        # submission is incomplete. Failures are reported to the assistant.
        try:
            function_args = json.loads(tool.function.arguments)
            print(function_args)
            result = functions_dictionary[function_name](**function_args)
            print("Function result:", result)
            output = result["message"]
        except Exception as e:
            print(e)
            output = f"Error while executing {function_name}: {e}"
        actions_results.append(
            {"tool_output": {"tool_call_id": tool.id, "output": output}}
        )
    # All tool outputs are submitted at the same time by the caller.
    return actions_results


def create_thread_openai(sessionStorage):
    """Create a new assistant thread and remember its id in the session dict.

    Args:
        sessionStorage: Per-session dict; ``"threadId"`` is written into it.

    Returns:
        The same dict, so it can be used as a Gradio state output.
    """
    streaming_thread = client.beta.threads.create()
    sessionStorage["threadId"] = streaming_thread.id
    return sessionStorage


def add_message_to_openai(text, threadId):
    """Append a user message to the given thread and return the API result."""
    print("User message: ", text)
    return client.beta.threads.messages.create(
        thread_id=threadId,
        role="user",
        content=text,
    )


def transform_suggestions_into_list(string_of_suggestions):
    """Split a buffered ``<json list>#s#<message>`` string into its parts.

    Args:
        string_of_suggestions: Buffered text whose part before the first
            ``#s#`` marker is a JSON list.

    Returns:
        ``(suggestions, message)`` where ``suggestions`` is the parsed list
        without falsy entries and ``message`` is the text between the first
        and second marker, or ``None`` when no marker is present.

    Raises:
        json.JSONDecodeError: If the part before the marker is not valid JSON.
    """
    local_message = None
    parts = string_of_suggestions.split(SUGGESTIONS_MARKER)
    list_of_suggestions = [x for x in json.loads(parts[0]) if x]
    if len(parts) > 1:
        local_message = parts[1]
    return list_of_suggestions, local_message


def create_suggestions_list(suggestions):
    """Build exactly ``MAX_SUGGESTIONS`` button updates for the suggestions.

    The first buttons are made visible with the suggestion texts and the
    remaining ones are hidden. Suggestions beyond ``MAX_SUGGESTIONS`` are
    dropped so the number of updates always matches the number of buttons.
    """
    shown = suggestions[:MAX_SUGGESTIONS]
    update_show = [gr.update(visible=True, value=w) for w in shown]
    update_hide = [
        gr.update(visible=False, value="")
        for _ in range(MAX_SUGGESTIONS - len(shown))
    ]
    return update_show + update_hide


def process_text_chunk(text, list_of_suggestions, string_of_suggestions, is_loading_suggestions):
    """Feed one streamed text delta through the suggestions parser.

    A ``[`` in a chunk starts buffering. Buffering ends either when the
    ``#s#`` marker shows up (the buffer is parsed into suggestions plus
    message text) or when a ``]`` is followed by something that cannot be the
    start of the marker (the buffer was ordinary text and is released as is).

    Args:
        text: The new text delta.
        list_of_suggestions: Suggestions parsed so far.
        string_of_suggestions: Text buffered while a list may be in progress.
        is_loading_suggestions: Whether buffering is currently active.

    Returns:
        ``(local_message, list_of_suggestions, string_of_suggestions,
        is_loading_suggestions)``; ``local_message`` is the text to append to
        the chat now, or ``None`` while the chunk is being buffered.
    """
    print(text, end="", flush=True)
    local_message = None
    if "[" in text:
        is_loading_suggestions = True

    if not is_loading_suggestions:
        local_message = text
    else:
        string_of_suggestions = string_of_suggestions + text
        if SUGGESTIONS_MARKER in string_of_suggestions:
            is_loading_suggestions = False
            list_of_suggestions, local_message = transform_suggestions_into_list(
                string_of_suggestions
            )
            string_of_suggestions = ""
        elif (
            "]" in string_of_suggestions
            and "]#" not in string_of_suggestions
            # A trailing "]" may still be followed by "#s#" in the next chunk.
            and not string_of_suggestions.endswith("]")
        ):
            is_loading_suggestions = False
            local_message = string_of_suggestions
            string_of_suggestions = ""
    return local_message, list_of_suggestions, string_of_suggestions, is_loading_suggestions


def _stream_text_deltas(threadId):
    """Yield the raw text deltas of a new run on the given thread.

    When the run requests tool calls, they are executed, their outputs are
    submitted, and the text of the follow-up stream is yielded as well.
    """
    with client.beta.threads.runs.stream(
        thread_id=threadId,
        assistant_id=assistantId,
    ) as stream:
        for event in stream:
            if event.event == "thread.message.delta" and event.data.delta.content:
                yield event.data.delta.content[0].text.value
            elif event.event == 'thread.run.requires_action':
                result = handle_requires_action(event.data)
                tool_outputs = [x["tool_output"] for x in result]
                # The context manager closes the follow-up stream on exit.
                with client.beta.threads.runs.submit_tool_outputs_stream(
                    thread_id=stream.current_run.thread_id,
                    run_id=event.data.id,
                    tool_outputs=tool_outputs,
                ) as action_stream:
                    yield from action_stream.text_deltas
        stream.until_done()


def handle_events(threadId, chat_history, storage):
    """Run the assistant on a thread and stream its reply into the chat.

    Args:
        threadId: Id of the thread to run.
        chat_history: Chat history whose last entry's bot text is appended to
            in place as text arrives.
        storage: Session dict; ``"list_of_suggestions"`` is written on
            success.

    Yields:
        ``[chat_history, storage]`` after every visible update, and once more
        at the end so the final state is always delivered. On any error the
        last bot message is replaced with an error text and yielded.
    """
    list_of_suggestions = []
    string_of_suggestions = ""
    is_loading_suggestions = False
    try:
        for text in _stream_text_deltas(threadId):
            (
                local_message,
                list_of_suggestions,
                string_of_suggestions,
                is_loading_suggestions,
            ) = process_text_chunk(
                text, list_of_suggestions, string_of_suggestions, is_loading_suggestions
            )
            if local_message is not None:
                chat_history[-1][1] += local_message
                yield [chat_history, storage]

        # Text still buffered when the stream ends never turned out to be a
        # suggestions list; show it instead of silently dropping it.
        if string_of_suggestions:
            chat_history[-1][1] += string_of_suggestions

        print("")
        storage["list_of_suggestions"] = list_of_suggestions
        # A generator's ``return`` value is ignored by the consumers here, so
        # the final state has to be yielded.
        yield [chat_history, storage]
    except Exception as e:
        print(e)
        chat_history[-1][1] = "Error occured during processing your message. Please try again"
        yield [chat_history, storage]


def initiate_chatting(chat_history, storage):
    """Start the conversation by sending the configured initial message.

    The incoming chat history is replaced with a single empty bot message
    that the streamed reply is written into.
    """
    threadId = storage["threadId"]
    chat_history = [[None, ""]]
    add_message_to_openai(initial_message, threadId)
    yield from handle_events(threadId, chat_history, storage)


def respond_on_user_msg(chat_history, storage):
    """Send the latest user message to the assistant and stream the reply."""
    message = chat_history[-1][0]
    threadId = storage["threadId"]
    print("Responding for threadId: ", threadId)
    chat_history[-1][1] = ""
    add_message_to_openai(message, threadId)
    yield from handle_events(threadId, chat_history, storage)


def create_application():
    """Build the Gradio Blocks app: chat window, suggestion buttons, input.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(css=CSS, fill_height=True) as demo:
        storage = gr.State({})
        chatbot = gr.Chatbot(
            label="Vehicle inspection bot",
            line_breaks=False,
            show_label=False,
            show_share_button=False,
            elem_id="chatbot",
            height=300,
        )
        with gr.Row():
            btn_list = [
                gr.Button(visible=False, size="sm") for _ in range(MAX_SUGGESTIONS)
            ]
        msg = gr.Textbox(label="Answer", interactive=False)

        def user(user_message, history):
            """Clear the input box and append the user's message to the chat."""
            return "", history + [[user_message, None]]

        def update_suggestions(storage):
            """Show the suggestions stored by the last assistant run."""
            # ``.get``: the key is absent when the very first run failed.
            list_of_suggestions = storage.get('list_of_suggestions') or []
            return create_suggestions_list(list_of_suggestions)

        def hide_suggestions():
            """Hide and clear all suggestion buttons."""
            return [
                gr.update(visible=False, value="") for _ in range(MAX_SUGGESTIONS)
            ]

        def disable_msg():
            """Lock the input box while the assistant is responding."""
            return gr.Textbox(value=None, interactive=False)

        def enable_msg():
            """Unlock the input box once the assistant has finished."""
            return gr.Textbox(value=None, interactive=True)

        # Each flow is ``[fn, inputs, outputs]`` and is unpacked into an
        # event listener call.
        add_user_message_flow = [user, [msg, chatbot], [msg, chatbot]]
        chat_response_flow = [respond_on_user_msg, [chatbot, storage], [chatbot, storage]]
        update_suggestions_flow = [update_suggestions, storage, btn_list]
        hide_suggestions_flow = [hide_suggestions, None, btn_list]
        disable_msg_flow = [disable_msg, None, msg]
        enable_msg_flow = [enable_msg, None, msg]

        (
            msg.submit(*add_user_message_flow)
            .then(*hide_suggestions_flow)
            .then(*disable_msg_flow)
            .then(*chat_response_flow)
            .then(*update_suggestions_flow)
            .then(*enable_msg_flow)
        )

        # Clicking a suggestion behaves like submitting its label as text.
        for sug_btn in btn_list:
            add_suggestion_message_flow = [user, [sug_btn, chatbot], [msg, chatbot]]
            (
                sug_btn.click(*add_suggestion_message_flow)
                .then(*hide_suggestions_flow)
                .then(*disable_msg_flow)
                .then(*chat_response_flow)
                .then(*update_suggestions_flow)
                .then(*enable_msg_flow)
            )

        (
            demo.load(create_thread_openai, inputs=storage, outputs=storage)
            .then(initiate_chatting, inputs=[chatbot, storage], outputs=[chatbot, storage])
            .then(*update_suggestions_flow)
            .then(*enable_msg_flow)
        )
    return demo


if __name__ == "__main__":
    demo = create_application()
    demo.launch()