from openai import OpenAI
import gradio as gr
import json
from bot_actions import functions_dictionary, save_record
import os

CSS = """
.contain { display: flex; flex-direction: column; }
.svelte-vt1mxs div:first-child { flex-grow: 1; overflow: auto;}
#chatbot { flex-grow: 1; overflow: auto;}
footer {display: none !important;}
.app.svelte-182fdeq.svelte-182fdeq { max-width: 100vw !important; }
#main_container { height: 95vh; }
#markup_container { height: 100%; overflow:auto; }
"""

openAIToken = os.environ['openAIToken']
assistantId = os.environ['assistantId']
initial_message = os.environ['initialMessage']

client = OpenAI(api_key=openAIToken)


# Run every tool call the assistant requested and collect the outputs.
def handle_requires_action(data):
    actions_results = []
    for tool in data.required_action.submit_tool_outputs.tool_calls:
        function_name = tool.function.name
        function_args = json.loads(tool.function.arguments)
        print(function_name)
        print(function_args)
        try:
            result = functions_dictionary[function_name](**function_args)
            print("Function result:", result)
            actions_results.append({"tool_output": {"tool_call_id": tool.id, "output": result["message"]}})
        except Exception as e:
            print(e)
    # Submit all tool_outputs at the same time
    return actions_results


# Create a new conversation thread and remember its id in the session.
def create_thread_openai(sessionStorage):
    streaming_thread = client.beta.threads.create()
    sessionStorage["threadId"] = streaming_thread.id
    return sessionStorage


def add_message_to_openai(text, threadId):
    print("User message: ", text)
    return client.beta.threads.messages.create(
        thread_id=threadId,
        role="user",
        content=text
    )


# Split a '["a", "b"]#s#trailing message' string into (suggestions, message).
def transform_suggestions_into_list(string_of_suggestions):
    local_message = None
    parts = string_of_suggestions.split('#s#')
    list_of_suggestions = json.loads(parts[0])
    list_of_suggestions = [x for x in list_of_suggestions if x]
    if len(parts) > 1:
        local_message = parts[1]
    return list_of_suggestions, local_message


# Show one button per suggestion and hide the remaining of the 6 slots.
def create_suggestions_list(suggestions):
    update_show = [gr.update(visible=True, value=w) for w in suggestions]
    update_hide = [gr.update(visible=False, value="") for _ in range(6 - len(suggestions))]
    return update_show + update_hide


# Route one streamed chunk into suggestions, markup, SVG, or plain chat text.
def process_text_chunk(text, storage):
    print(text, end="", flush=True)
    local_message = None
    if "[" in text:
        storage["is_loading_suggestions"] = True
    if "#" in text and not storage["is_loading_suggestions"]:
        storage["is_loading_markup"] = True
    if "<" in text:
        storage["is_loading_suggestions"] = False
        storage["is_loading_markup"] = False
        storage["is_loading_svg"] = True
    if storage["is_loading_suggestions"] or storage["is_loading_markup"] or storage["is_loading_svg"]:
        accumulative_string = storage["accumulative_string"] + text
        if storage["is_loading_suggestions"]:
            if "#s#" in accumulative_string:
                storage["is_loading_suggestions"] = False
                list_of_suggestions, local_message = transform_suggestions_into_list(accumulative_string)
                storage["list_of_suggestions"] = list_of_suggestions
                accumulative_string = ""
            elif "]" in accumulative_string and "]#" not in accumulative_string and not accumulative_string.endswith("]"):
                # A "]" arrived without a "#s#" terminator: not a suggestions list after all.
                storage["is_loading_suggestions"] = False
                local_message = accumulative_string
                accumulative_string = ""
        elif storage["is_loading_markup"]:
            if "#p#" in accumulative_string:
                parts = accumulative_string.split("#p#")
                if len(parts) > 2:
                    # Both delimiters have arrived: the middle part is the markup.
                    accumulative_string = parts[0] + parts[2]
                    storage["markup_string"] = parts[1]
                    storage["is_loading_markup"] = False
                else:
                    # Only the opening delimiter so far: keep buffering the markup.
                    local_message = parts[0]
                    accumulative_string = "#p#" + parts[1]
                    storage["markup_string"] = parts[1]
            elif "#" in accumulative_string and "#p" not in accumulative_string and not accumulative_string.endswith("#"):
                # A lone "#" that is not part of "#p#": treat the buffer as plain text.
                storage["is_loading_markup"] = False
                local_message = accumulative_string
                accumulative_string = ""
        else:
            # SVG branch: assumes the stream embeds inline "<svg>...</svg>" markup
            # (the tag literals did not survive in the source and are restored here).
            if "</svg>" in accumulative_string:
                svg_ending = accumulative_string.split("</svg>")
                storage["svg"] = svg_ending[0] + "</svg>"
                accumulative_string = svg_ending[1]
                storage["is_loading_svg"] = False
        # Persist the buffer so the next chunk continues where this one stopped.
        storage["accumulative_string"] = accumulative_string
    else:
        # No special marker is being buffered: the chunk is plain chat text.
        local_message = text
    return local_message, storage
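# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the app: drives process_text_chunk with a
# few synthetic chunks to show the stream protocol handled above -- a JSON
# list terminated by "#s#" becomes button suggestions, "#p#...#p#" becomes
# markup, and "<svg>...</svg>" becomes an inline image. fresh_storage() is a
# hypothetical helper (the real session dict is built elsewhere), and the
# (message, storage) return shape is an assumption of this sketch.
# ---------------------------------------------------------------------------
def fresh_storage():
    return {
        "accumulative_string": "",
        "is_loading_suggestions": False,
        "is_loading_markup": False,
        "is_loading_svg": False,
        "list_of_suggestions": [],
        "markup_string": "",
        "svg": "",
    }


if __name__ == "__main__":
    demo_storage = fresh_storage()
    # Chunk boundaries are arbitrary; the buffer reassembles the protocol.
    for chunk in ['["Ask about pricing", "Boo', 'k a demo"]#s#', 'How can I help?']:
        message, demo_storage = process_text_chunk(chunk, demo_storage)
        if message:
            print("\ndisplay:", message)
    print("\nsuggestions:", demo_storage["list_of_suggestions"])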