# ba / app.py
# andriydovgal's picture
# Update app.py
# 83645f7 verified
from openai import OpenAI
import gradio as gr
import json
from bot_actions import functions_dictionary
import os
# Custom stylesheet passed to gr.Blocks(css=CSS) in create_chat_tab.
# "#chatbot" and "#main_container" correspond to elem_id values assigned in
# create_chat_tab; "#markup_container" is not assigned to any component in the
# code visible here.
# NOTE(review): the ".svelte-…" selectors look tied to class names generated by
# a specific Gradio build — confirm they still match after a Gradio upgrade.
CSS ="""
.contain { display: flex; flex-direction: column; }
.svelte-vt1mxs div:first-child { flex-grow: 1; overflow: auto;}
#chatbot { flex-grow: 1; overflow: auto;}
footer {display: none !important;}
.app.svelte-182fdeq.svelte-182fdeq {
max-width: 100vw !important;
}
#main_container {
height: 95vh;
}
#markup_container {
height: 100%;
overflow:auto;
}
"""
# Runtime configuration comes from environment variables. Indexing os.environ
# directly means a missing variable raises KeyError as soon as the module loads.
openAIToken = os.environ['openAIToken']
# Passed as assistant_id when handle_events starts a streamed run.
assistantId = os.environ['assistantId']
# Sent by initiate_chatting as the first message of every new conversation.
initial_message = os.environ['initialMessage']
# Single OpenAI client shared by every function in this module.
client = OpenAI(api_key=openAIToken)
def handle_requires_action(data):
    """Run every tool call requested by a run and collect one output per call.

    Args:
        data: Payload of a ``thread.run.requires_action`` event. Each entry of
            ``data.required_action.submit_tool_outputs.tool_calls`` is read for
            ``id``, ``function.name`` and ``function.arguments`` (a JSON
            string of keyword arguments).

    Returns:
        A list of ``{"tool_output": {"tool_call_id": ..., "output": ...}}``
        dicts, exactly one per requested tool call and in request order. On
        success ``output`` is the ``"message"`` entry of the value returned by
        the matching callable in ``functions_dictionary``; on any failure
        (malformed arguments, unknown function name, exception inside the
        function, missing ``"message"`` key) it is a short error string.
    """
    actions_results = []
    for tool in data.required_action.submit_tool_outputs.tool_calls:
        function_name = tool.function.name
        print(function_name)
        try:
            # An empty argument string is treated as "no keyword arguments".
            function_args = json.loads(tool.function.arguments or "{}")
            print(function_args)
            result = functions_dictionary[function_name](**function_args)
            print("Function result:", result)
            output = result["message"]
        except Exception as e:
            # Tool functions are arbitrary project code, so this boundary
            # catches broadly. The failure is logged and reported back as the
            # tool's output instead of being dropped, so that every tool call
            # still gets an answer when the outputs are submitted.
            print(e)
            output = f"Error: tool '{function_name}' could not be executed."
        actions_results.append(
            {"tool_output": {"tool_call_id": tool.id, "output": output}}
        )
    # The caller submits all tool outputs at the same time.
    return actions_results
def create_thread_openai(sessionStorage):
    """Create a new OpenAI thread and remember its id in the session storage.

    Args:
        sessionStorage: Mutable mapping holding per-session state; its
            ``"threadId"`` entry is overwritten with the new thread's id.

    Returns:
        The same ``sessionStorage`` object, after the update.
    """
    new_thread = client.beta.threads.create()
    sessionStorage.update({"threadId": new_thread.id})
    return sessionStorage
def add_message_to_openai(text, threadId):
    """Append *text* as a user message to the given OpenAI thread.

    Args:
        text: Message content to post.
        threadId: Id of the thread that receives the message.

    Returns:
        Whatever ``client.beta.threads.messages.create`` returns for the new
        message.
    """
    print("User message: ", text)
    message_fields = {
        "thread_id": threadId,
        "role": "user",
        "content": text,
    }
    created_message = client.beta.threads.messages.create(**message_fields)
    return created_message
def process_text_chunk(text, storage):
    """Echo a streamed text chunk and return the piece to append to the chat.

    Args:
        text: The text fragment just received from the stream.
        storage: Session state dict; only its ``"accumulative_string"`` entry
            is read (a missing entry is treated as an empty string).

    Returns:
        A ``(message, storage)`` tuple where ``message`` is the stored
        ``"accumulative_string"`` followed by ``text``. ``storage`` is returned
        unchanged: the concatenation is not written back, so as long as the
        stored prefix stays empty the message equals ``text``.
    """
    # Mirror the chunk on stdout as it arrives, without a trailing newline.
    print(text, end="", flush=True)
    return storage.get("accumulative_string", "") + text, storage
def _relay_stream_events(stream, threadId, chat_history, storage):
    """Forward text from an assistant event stream into the last chat turn.

    Iterates *stream*, appends every text fragment to ``chat_history[-1][1]``
    and yields ``[chat_history, storage]`` after each append. When the run
    asks for tool calls, they are executed via ``handle_requires_action``, the
    outputs are submitted, and the resulting stream is relayed the same way,
    so a follow-up round of tool calls is handled too.
    """
    for event in stream:
        if event.event == "thread.message.delta":
            # A delta may carry several content parts and not all of them are
            # text; parts without a text value are skipped.
            for content_delta in event.data.delta.content or []:
                text_delta = getattr(content_delta, "text", None)
                text = getattr(text_delta, "value", None)
                if not text:
                    continue
                local_message, storage = process_text_chunk(text, storage)
                if local_message is not None:
                    chat_history[-1][1] += local_message
                    yield [chat_history, storage]
        elif event.event == "thread.run.requires_action":
            result = handle_requires_action(event.data)
            tool_outputs = [x["tool_output"] for x in result]
            with client.beta.threads.runs.submit_tool_outputs_stream(
                thread_id=threadId,
                run_id=event.data.id,
                tool_outputs=tool_outputs,
            ) as action_stream:
                yield from _relay_stream_events(
                    action_stream, threadId, chat_history, storage
                )


def handle_events(threadId, chat_history, storage):
    """Stream the assistant's reply for a thread into the chat history.

    This is a generator intended to be consumed by the UI: every yielded value
    is ``[chat_history, storage]`` with the reply text accumulated so far in
    ``chat_history[-1][1]``.

    Args:
        threadId: Id of the OpenAI thread to run the assistant on.
        chat_history: List of ``[user, assistant]`` pairs; the assistant slot
            of the last pair is appended to in place and must already be a
            string.
        storage: Session state dict; its ``"accumulative_string"`` and
            ``"markup_string"`` entries are reset to ``""`` before the run.

    Yields:
        ``[chat_history, storage]`` after each received text fragment. If
        anything raises while streaming, the exception is printed, the last
        assistant slot is replaced by a generic error message and one final
        ``[chat_history, storage]`` is yielded.
    """
    storage.update({
        "accumulative_string": "",
        "markup_string": "",
    })
    try:
        with client.beta.threads.runs.stream(
            thread_id=threadId,
            assistant_id=assistantId,
        ) as stream:
            yield from _relay_stream_events(
                stream, threadId, chat_history, storage
            )
        # Terminate the line of chunks echoed by process_text_chunk.
        print("")
    except Exception as e:
        # Top-level boundary for the UI: never let a streaming failure escape,
        # show a generic message to the user instead.
        print(e)
        chat_history[-1][1] = "Error occured during processing your message. Please try again"
        yield [chat_history, storage]
def check_moderation_flag(message):
    """Ask the OpenAI moderation endpoint whether *message* is flagged.

    Args:
        message: Text to be checked.

    Returns:
        The ``flagged`` attribute of the first moderation result.
    """
    response = client.moderations.create(
        model="omni-moderation-latest",
        input=message,
    )
    print("Moderation respones: ", response)
    first_result = response.results[0]
    return first_result.flagged
def process_user_input(text, thread_id, chat_history, storage):
    """Moderate *text* and, if acceptable, stream the assistant's answer.

    Generator yielding ``[chat_history, storage]`` updates. A flagged message
    is never sent to the thread: the last assistant slot of ``chat_history``
    is set to a refusal notice and a single update is yielded. Otherwise the
    message is added to the thread and every update produced by
    ``handle_events`` is passed through.
    """
    print("User input: ", text)
    is_flagged = check_moderation_flag(text)
    print("Check is flagged:", is_flagged)

    if not is_flagged:
        add_message_to_openai(text, thread_id)
        yield from handle_events(thread_id, chat_history, storage)
        return

    chat_history[-1][1] = "Your request contains some inappropriate information. We cannot proceed with it."
    yield [chat_history, storage]
def initiate_chatting(chat_history, storage):
    """Open the conversation by sending the configured initial message.

    The incoming ``chat_history`` is ignored; a fresh history with a single
    turn (no user text, empty assistant text) is used instead. Yields every
    ``[chat_history, storage]`` update produced by ``process_user_input``.
    """
    thread_id = storage["threadId"]
    opening_history = [[None, ""]]
    yield from process_user_input(
        initial_message, thread_id, opening_history, storage
    )
def respond_on_user_msg(chat_history, storage):
    """Answer the most recent user message in ``chat_history``.

    Takes the user text from the last turn, resets that turn's assistant slot
    to an empty string and yields every ``[chat_history, storage]`` update
    produced by ``process_user_input`` for the thread stored under
    ``storage["threadId"]``.
    """
    latest_turn = chat_history[-1]
    user_text = latest_turn[0]
    thread_id = storage["threadId"]
    print("Responding for threadId: ", thread_id)
    latest_turn[1] = ""
    yield from process_user_input(user_text, thread_id, chat_history, storage)
def create_chat_tab():
    """Build the chat UI and return it as a ``gr.Blocks`` instance.

    The components are created first and placed into the layout later via
    ``.render()``. Each "flow" below is a ``[fn, inputs, outputs]`` list that
    is unpacked into an event listener call.
    """
    msg = gr.Textbox(label="Answer")
    # Per-session state; "threadId" is added to it by create_thread_openai.
    storage = gr.State({"accumulative_string": ""})
    chatbot = gr.Chatbot(label="Board of Advisors Assistant", line_breaks=False, height=300, show_label=False, show_share_button=False, elem_id="chatbot")
    def user(user_message, history):
        # Clear the textbox and open a new turn whose answer is still pending.
        return "", history + [[user_message, None]]
    def disable_msg():
        # Returned as the new state of `msg`: input locked.
        message_box = gr.Textbox(value=None, interactive=False)
        return message_box
    def enable_msg():
        # Returned as the new state of `msg`: input unlocked.
        message_box = gr.Textbox(value=None, interactive=True)
        return message_box
    add_user_message_flow = [user, [msg,chatbot], [msg,chatbot]]
    chat_response_flow = [respond_on_user_msg, [chatbot, storage], [chatbot, storage]]
    disable_msg_flow = [disable_msg, None, msg]
    enable_msg_flow = [enable_msg, None, msg]
    with gr.Blocks(css=CSS, fill_height=True) as chat_view:
        storage.render()
        with gr.Row(elem_id="main_container"):
            with gr.Column(scale=4):
                chatbot.render()
                # Clickable example prompts that are loaded into the textbox.
                examples = gr.Examples(examples=[
                    "I need someone that can help me with real estate in Texas",
                    "I'm looking for help with payment system for my business",
                    "I need help to develop my leadership skills"],
                    inputs=msg,
                )
                msg.render()
        # NOTE(review): this prints the gr.Request class object itself, not a
        # request; it looks like a debugging leftover.
        print(gr.Request)
        # Submitting text: add the user turn, lock the input, stream the
        # answer, then unlock the input.
        msg.submit(*add_user_message_flow
            ).then(*disable_msg_flow
            ).then(*chat_response_flow
            ).then(*enable_msg_flow)
        # Picking an example goes through the same sequence as a submit.
        examples.load_input_event.then(*add_user_message_flow
            ).then(*disable_msg_flow
            ).then(*chat_response_flow
            ).then(*enable_msg_flow)
        # On page load: lock the input, create a thread for this session, let
        # the assistant open the conversation, then unlock the input.
        chat_view.load(*disable_msg_flow
            ).then(create_thread_openai, inputs=storage, outputs=storage
            ).then(initiate_chatting, inputs=[chatbot, storage], outputs=[chatbot, storage]
            ).then(*enable_msg_flow)
    return chat_view
# When executed as a script, build the chat UI and start serving it.
if __name__ == "__main__":
    chat_view = create_chat_tab()
    chat_view.launch()