from openai import OpenAI import gradio as gr import json from bot_actions import functions_dictionary, save_record import os CSS =""" .contain { display: flex; flex-direction: column; } .gradio-container { height: 100vh !important; } .svelte-vt1mxs div:first-child { flex-grow: 1; overflow: auto;} #chatbot { flex-grow: 1; overflow: auto;} .svelte-vt1mxs div:nth-child(2) > div:first-child {height: 300px; overflow: auto !important;} footer {visibility: hidden} .app.svelte-182fdeq.svelte-182fdeq { max-width: 100vw !important; } """ openAIToken = os.environ['openAIToken'] assistantId = os.environ['assistantId'] initial_message = os.environ['initialMessage'] client = OpenAI(api_key=openAIToken) def handle_requires_action(data): actions_results = [] for tool in data.required_action.submit_tool_outputs.tool_calls: function_name = tool.function.name function_args = json.loads(tool.function.arguments) print(function_name) print(function_args) try: result = functions_dictionary[tool.function.name](**function_args) print("Function result:", result) actions_results.append({"tool_output" : {"tool_call_id": tool.id, "output": result["message"]}}) except Exception as e: print(e) # Submit all tool_outputs at the same time return actions_results def create_thread_openai(sessionStorage): streaming_thread = client.beta.threads.create() sessionStorage["threadId"] = streaming_thread.id return sessionStorage def add_message_to_openai(text, threadId): print("User message: ", text) return client.beta.threads.messages.create( thread_id=threadId, role="user", content=text ) def transform_suggestions_into_list(string_of_suggestions): local_message = None parts = string_of_suggestions.split('#s#') list_of_suggestions = json.loads(parts[0]) list_of_suggestions = [ x for x in list_of_suggestions if x] if len(parts) > 1: local_message = parts[1] return list_of_suggestions, local_message def create_suggestions_list(suggestions): update_show = [gr.update(visible=True, value=w) for w in suggestions] update_hide = [gr.update(visible=False, value="") for _ in range(6-len(suggestions))] return update_show + update_hide def process_text_chunk(text, storage): print(text, end="", flush=True) local_message = None if "[" in text: storage["is_loading_suggestions"] = True if "#" in text and storage["is_loading_suggestions"] != True: storage["is_loading_markup"] = True if storage["is_loading_suggestions"] == True or storage["is_loading_markup"] == True: accumulative_string = storage["accumulative_string"] + text if storage["is_loading_suggestions"] == True: if "#s#" in accumulative_string: storage["is_loading_suggestions"] = False list_of_suggestions, local_message = transform_suggestions_into_list(accumulative_string) storage["list_of_suggestions"] = list_of_suggestions accumulative_string = "" elif "]" in accumulative_string and "]#" not in accumulative_string and not accumulative_string.endswith("]"): storage["is_loading_suggestions"] = False local_message = accumulative_string accumulative_string = "" else: if "#p#" in accumulative_string: parts = accumulative_string.split("#p#") local_message = parts[0] accumulative_string = "#p#" + parts[1] storage["markup_string"] = accumulative_string[3:] elif "#" in accumulative_string and "#p" not in accumulative_string and not accumulative_string.endswith("#"): storage["markup_string"] = accumulative_string[4:] storage["is_loading_markup"] = False local_message = accumulative_string accumulative_string = "" storage["accumulative_string"] = accumulative_string else: local_message = text return local_message, storage def handle_events(threadId, chat_history, storage): storage.update({ "list_of_suggestions" : [], "is_loading_suggestions" : False, "is_loading_markup" : False, "accumulative_string" : "", "markup_string": "" }) try: with client.beta.threads.runs.stream( thread_id=threadId, assistant_id=assistantId ) as stream: for event in stream: if event.event == "thread.message.delta" and event.data.delta.content: text = event.data.delta.content[0].text.value local_message, storage = process_text_chunk(text, storage) if local_message is not None: chat_history[-1][1] += local_message yield [ chat_history, storage, storage["markup_string"]] if event.event == 'thread.run.requires_action': result = handle_requires_action(event.data) tool_outputs = [x["tool_output"] for x in result] with client.beta.threads.runs.submit_tool_outputs_stream( thread_id=stream.current_run.thread_id, run_id=event.data.id, tool_outputs=tool_outputs, ) as action_stream: for text in action_stream.text_deltas: local_message, storage = process_text_chunk(text, storage) if local_message is not None: chat_history[-1][1] += local_message yield [chat_history, storage, storage["markup_string"]] action_stream.close() stream.until_done() print("") return [chat_history, storage, storage["markup_string"]] except Exception as e: print(e) chat_history[-1][1] = "Error occured during processing your message. Please try again" yield [chat_history, storage, storage["markup_string"]] def initiate_chatting(chat_history, storage): threadId = storage["threadId"] chat_history = [[None, ""]] add_message_to_openai(initial_message, threadId) for response in handle_events(threadId, chat_history, storage): yield response def respond_on_user_msg(chat_history, storage): message = chat_history[-1][0] threadId = storage["threadId"] print("Responding for threadId: ", threadId) chat_history[-1][1] = "" add_message_to_openai(message, threadId) for response in handle_events(threadId, chat_history, storage): yield response def create_application(): with gr.Blocks(css=CSS, fill_height=True) as demo: storage = gr.State({"list_of_suggestions": [], "is_loading_suggestions": False, "is_loading_markup": False, "accumulative_string": "", "markup_string": ""}) with gr.Row(): with gr.Column(scale=4): chatbot = gr.Chatbot(label="Facility managment bot", line_breaks=False, height=300, show_label=False, show_share_button=False, elem_id="chatbot") with gr.Column(scale=1): markdown = gr.Markdown(label="Bullet-list", value="# Facility information") btn_list = [] with gr.Row(): for i in range(6): btn = gr.Button(visible=False, size="sm") btn_list.append(btn) msg = gr.Textbox(label="Answer", interactive=False) def user(user_message, history): return "", history + [[user_message, None]] def update_suggestions(storage): list_of_suggestions = storage['list_of_suggestions'] or [] btn_list = create_suggestions_list(list_of_suggestions) return btn_list def hide_suggestions(): return [gr.update(visible=False, value="") for _ in range(6)] def disable_msg(): message_box = gr.Textbox(value=None, interactive=False) return message_box def enable_msg(): message_box = gr.Textbox(value=None, interactive=True) return message_box add_user_message_flow = [user, [msg,chatbot], [msg,chatbot]] chat_response_flow = [respond_on_user_msg, [chatbot, storage], [chatbot, storage, markdown]] update_suggestions_flow = [update_suggestions, storage, btn_list] hide_suggestions_flow = [hide_suggestions, None, btn_list] disable_msg_flow = [disable_msg, None, msg] enable_msg_flow = [enable_msg, None, msg] msg.submit(*add_user_message_flow ).then(*hide_suggestions_flow ).then(*disable_msg_flow ).then(*chat_response_flow ).then(*update_suggestions_flow ).then(*enable_msg_flow) for sug_btn in btn_list: add_suggestion_message_flow = [user, [sug_btn, chatbot], [msg, chatbot]] sug_btn.click(*add_suggestion_message_flow ).then(*hide_suggestions_flow ).then(*disable_msg_flow ).then(*chat_response_flow ).then(*update_suggestions_flow ).then(*enable_msg_flow) demo.load(create_thread_openai, inputs=storage, outputs=storage ).then(initiate_chatting, inputs=[chatbot, storage], outputs=[chatbot, storage, markdown] ).then(*update_suggestions_flow ).then(*enable_msg_flow) return demo if __name__ == "__main__": demo = create_application() demo.launch()