from openai import OpenAI import gradio as gr import json from bot_actions import functions_dictionary import os CSS =""" .contain { display: flex; flex-direction: column; } .svelte-vt1mxs div:first-child { flex-grow: 1; overflow: auto;} #chatbot { flex-grow: 1; overflow: auto;} footer {display: none !important;} .app.svelte-182fdeq.svelte-182fdeq { max-width: 100vw !important; } #main_container { height: 95vh; } #markup_container { height: 100%; overflow:auto; } """ openAIToken = os.environ['openAIToken'] assistantId = os.environ['assistantId'] initial_message = os.environ['initialMessage'] client = OpenAI(api_key=openAIToken) def handle_requires_action(data): actions_results = [] for tool in data.required_action.submit_tool_outputs.tool_calls: function_name = tool.function.name function_args = json.loads(tool.function.arguments) print(function_name) print(function_args) try: result = functions_dictionary[tool.function.name](**function_args) print("Function result:", result) actions_results.append({"tool_output" : {"tool_call_id": tool.id, "output": result["message"]}}) except Exception as e: print(e) # Submit all tool_outputs at the same time return actions_results def create_thread_openai(sessionStorage): streaming_thread = client.beta.threads.create() sessionStorage["threadId"] = streaming_thread.id return sessionStorage def add_message_to_openai(text, threadId): print("User message: ", text) return client.beta.threads.messages.create( thread_id=threadId, role="user", content=text ) def process_text_chunk(text, storage): print(text, end="", flush=True) local_message = None accumulative_string = storage["accumulative_string"] + text local_message = accumulative_string return local_message, storage def handle_events(threadId, chat_history, storage): storage.update({ "accumulative_string" : "", "markup_string": "", }) try: with client.beta.threads.runs.stream( thread_id=threadId, assistant_id=assistantId ) as stream: for event in stream: if event.event == "thread.message.delta" and event.data.delta.content: text = event.data.delta.content[0].text.value local_message, storage = process_text_chunk(text, storage) if local_message is not None: chat_history[-1][1] += local_message yield [chat_history, storage] if event.event == 'thread.run.requires_action': result = handle_requires_action(event.data) tool_outputs = [x["tool_output"] for x in result] with client.beta.threads.runs.submit_tool_outputs_stream( thread_id=stream.current_run.thread_id, run_id=event.data.id, tool_outputs=tool_outputs, ) as action_stream: for text in action_stream.text_deltas: local_message, storage = process_text_chunk(text, storage) if local_message is not None: chat_history[-1][1] += local_message yield [chat_history, storage] action_stream.close() stream.until_done() print("") return [chat_history, storage] except Exception as e: print(e) chat_history[-1][1] = "Error occured during processing your message. Please try again" yield [chat_history, storage] def check_moderation_flag(message): moderation_response = client.moderations.create(input=message, model="omni-moderation-latest") print("Moderation respones: ", moderation_response) flagged = moderation_response.results[0].flagged return flagged def process_user_input(text, thread_id, chat_history, storage): print("User input: ", text) is_flagged = check_moderation_flag(text) print("Check is flagged:", is_flagged) if is_flagged: chat_history[-1][1] = "Your request contains some inappropriate information. We cannot proceed with it." yield [chat_history, storage] else: add_message_to_openai(text, thread_id) for response in handle_events(thread_id, chat_history, storage): yield response def initiate_chatting(chat_history, storage): threadId = storage["threadId"] chat_history = [[None, ""]] for response in process_user_input(initial_message, threadId, chat_history, storage): yield response def respond_on_user_msg(chat_history, storage): message = chat_history[-1][0] threadId = storage["threadId"] print("Responding for threadId: ", threadId) chat_history[-1][1] = "" for response in process_user_input(message, threadId, chat_history, storage): yield response def create_chat_tab(): msg = gr.Textbox(label="Answer") storage = gr.State({"accumulative_string": ""}) chatbot = gr.Chatbot(label="Board of Advisors Assistant", line_breaks=False, height=300, show_label=False, show_share_button=False, elem_id="chatbot") def user(user_message, history): return "", history + [[user_message, None]] def disable_msg(): message_box = gr.Textbox(value=None, interactive=False) return message_box def enable_msg(): message_box = gr.Textbox(value=None, interactive=True) return message_box add_user_message_flow = [user, [msg,chatbot], [msg,chatbot]] chat_response_flow = [respond_on_user_msg, [chatbot, storage], [chatbot, storage]] disable_msg_flow = [disable_msg, None, msg] enable_msg_flow = [enable_msg, None, msg] with gr.Blocks(css=CSS, fill_height=True) as chat_view: storage.render() with gr.Row(elem_id="main_container"): with gr.Column(scale=4): chatbot.render() examples = gr.Examples(examples=[ "I need someone that can help me with real estate in Texas", "I'm looking for help with payment system for my business", "I need help to develop my leadership skills"], inputs=msg, ) msg.render() print(gr.Request) msg.submit(*add_user_message_flow ).then(*disable_msg_flow ).then(*chat_response_flow ).then(*enable_msg_flow) examples.load_input_event.then(*add_user_message_flow ).then(*disable_msg_flow ).then(*chat_response_flow ).then(*enable_msg_flow) chat_view.load(*disable_msg_flow ).then(create_thread_openai, inputs=storage, outputs=storage ).then(initiate_chatting, inputs=[chatbot, storage], outputs=[chatbot, storage] ).then(*enable_msg_flow) return chat_view if __name__ == "__main__": chat_view = create_chat_tab() chat_view.launch()