# Hugging Face Space by oleksandrburlakov — commit 9bf3fc7 ("Remove height restriction")
from openai import OpenAI
import gradio as gr
import json
from bot_actions import functions_dictionary, save_record
import os
# Custom CSS: let the chat column fill the viewport, hide the Gradio footer,
# and make the side markup panel scroll independently of the chat.
CSS ="""
.contain { display: flex; flex-direction: column; }
.svelte-vt1mxs div:first-child { flex-grow: 1; overflow: auto;}
#chatbot { flex-grow: 1; overflow: auto;}
footer {display: none !important;}
.app.svelte-182fdeq.svelte-182fdeq {
max-width: 100vw !important;
}
#main_container {
height: 95vh;
}
#markup_container {
height: 100%;
overflow:auto;
}
"""
# Deployment configuration; os.environ[...] raises KeyError if any of these
# variables is missing, so a misconfigured deployment fails fast at import.
openAIToken = os.environ['openAIToken']
assistantId = os.environ['assistantId']
initial_message = os.environ['initialMessage']
# Shared OpenAI client used by every thread/run helper below.
client = OpenAI(api_key=openAIToken)
def handle_requires_action(data):
    """Execute the tool calls requested by an assistant run.

    Looks up each requested function in ``functions_dictionary``, invokes it
    with the JSON-decoded arguments, and collects outputs in the shape that
    ``submit_tool_outputs`` expects (``{"tool_call_id", "output"}``).

    A failing tool call still produces an output entry (carrying an error
    text) — previously a failure appended nothing, leaving the run waiting
    forever for the missing tool output.
    """
    actions_results = []
    for tool in data.required_action.submit_tool_outputs.tool_calls:
        function_name = tool.function.name
        function_args = json.loads(tool.function.arguments)
        print(function_name)
        print(function_args)
        try:
            result = functions_dictionary[function_name](**function_args)
            print("Function result:", result)
            output = result["message"]
        except Exception as e:
            # Report the failure back to the model instead of dropping the
            # tool output entirely (every tool_call_id must get an output).
            print(e)
            output = "Error: tool call failed"
        actions_results.append({"tool_output": {"tool_call_id": tool.id, "output": output}})
    # Caller submits all tool_outputs at the same time.
    return actions_results
def create_thread_openai(sessionStorage):
    """Create a fresh OpenAI assistant thread and record its id in the session state."""
    new_thread = client.beta.threads.create()
    sessionStorage["threadId"] = new_thread.id
    return sessionStorage
def add_message_to_openai(text, threadId):
    """Append *text* as a user-role message to the given assistant thread."""
    print("User message: ", text)
    created = client.beta.threads.messages.create(
        thread_id=threadId,
        role="user",
        content=text,
    )
    return created
def transform_suggestions_into_list(string_of_suggestions):
    """Split a '<json-list>#s#<text>' chunk into (suggestions, trailing_message).

    The portion before the first '#s#' marker is a JSON array of suggestion
    strings (falsy entries are dropped). The portion right after the marker,
    if any, is the beginning of the assistant's plain-text message; when the
    marker is absent the trailing message is None.
    """
    pieces = string_of_suggestions.split('#s#')
    suggestions = [entry for entry in json.loads(pieces[0]) if entry]
    trailing_message = pieces[1] if len(pieces) > 1 else None
    return suggestions, trailing_message
def create_suggestions_list(suggestions):
    """Map suggestion strings onto the fixed row of six buttons.

    Buttons backed by a suggestion become visible with that label; the
    remaining slots are hidden and cleared.
    """
    updates = []
    for label in suggestions:
        updates.append(gr.update(visible=True, value=label))
    while len(updates) < 6:
        updates.append(gr.update(visible=False, value=""))
    return updates
def process_text_chunk(text, storage):
    """Incrementally parse one streamed text chunk from the assistant.

    The assistant's reply interleaves three specially-delimited payloads
    with plain chat text:
      * a JSON suggestions list terminated by the "#s#" marker,
      * a markup section wrapped in "#p#" ... "#p#" markers,
      * an inline <svg> ... </svg> drawing.
    Marker characters seen in *text* flip the corresponding ``is_loading_*``
    flags in *storage*; while any flag is set, chunks are buffered in
    ``storage["accumulative_string"]`` until the closing marker arrives, at
    which point the payload is stored back into *storage* (keys
    ``list_of_suggestions`` / ``markup_string`` / ``svg``).

    Returns ``(local_message, storage)`` where *local_message* is plain text
    to append to the chat, or None while a payload is still buffering.
    """
    print(text, end="", flush=True)
    local_message = None
    # Opening markers: "[" starts the suggestions JSON; "#" (when not inside
    # suggestions) starts a markup section; "<" starts an SVG and cancels
    # the other two modes.
    if "[" in text:
        storage["is_loading_suggestions"] = True
    if "#" in text and storage["is_loading_suggestions"] != True:
        storage["is_loading_markup"] = True
    if "<" in text:
        storage["is_loading_suggestions"] = False
        storage["is_loading_markup"] = False
        storage["is_loading_svg"] = True
    if storage["is_loading_suggestions"] == True or storage["is_loading_markup"] == True or storage["is_loading_svg"] == True:
        # Some payload is in progress: buffer this chunk with what we have.
        accumulative_string = storage["accumulative_string"] + text
        if storage["is_loading_suggestions"] == True:
            if "#s#" in accumulative_string:
                # Complete suggestions payload: JSON list, "#s#", then chat text.
                storage["is_loading_suggestions"] = False
                list_of_suggestions, local_message = transform_suggestions_into_list(accumulative_string)
                storage["list_of_suggestions"] = list_of_suggestions
                accumulative_string = ""
            elif "]" in accumulative_string and "]#" not in accumulative_string and not accumulative_string.endswith("]"):
                # "]" arrived without a following "#s#": false alarm — the
                # buffered text was ordinary chat content, so flush it.
                storage["is_loading_suggestions"] = False
                local_message = accumulative_string
                accumulative_string = ""
        elif storage["is_loading_markup"]:
            if "#p#" in accumulative_string:
                parts = accumulative_string.split("#p#")
                if len(parts) > 2:
                    # Both "#p#" delimiters seen: the middle part is the markup;
                    # text before/after the markers stays buffered as chat text.
                    accumulative_string = parts[0] + parts[2]
                    storage["markup_string"] = parts[1]
                    storage["is_loading_markup"] = False
                else:
                    # Only the opening "#p#" so far: emit the text before it and
                    # keep streaming the partial markup to the side panel.
                    local_message = parts[0]
                    accumulative_string = "#p#" + parts[1]
                    storage["markup_string"] = parts[1]
            elif "#" in accumulative_string and "#p" not in accumulative_string and not accumulative_string.endswith("#"):
                # "#" not followed by "p": false alarm — flush as plain text.
                storage["is_loading_markup"] = False
                local_message = accumulative_string
                accumulative_string = ""
        else:
            # SVG mode (is_loading_svg).
            if "<" in accumulative_string and "<s" not in accumulative_string and not accumulative_string.endswith("<"):
                # "<" not starting "<svg": false alarm — flush as plain text.
                storage["is_loading_svg"] = False
                local_message = accumulative_string
                accumulative_string = ""
            elif "<svg" in accumulative_string:
                parts = accumulative_string.split("<svg")
                if "#p#" in parts[0]:
                    # Text before the SVG may still contain a stray "#p#"
                    # marker; only the part before it is chat text.
                    info_parts = parts[0].split('#p#')
                    local_message = info_parts[0]
                else:
                    local_message = parts[0]
                if "</svg>" in parts[1]:
                    # Full SVG received; anything after "</svg>" stays buffered.
                    svg_ending = ("<svg" + parts[1]).split('</svg>')
                    storage["svg"] = svg_ending[0] + '</svg>'
                    accumulative_string = svg_ending[1]
                    storage["is_loading_svg"] = False
                else:
                    # Partial SVG: keep buffering and show what we have so far.
                    accumulative_string = "<svg" + parts[1]
                    storage["svg"] = accumulative_string
        storage["accumulative_string"] = accumulative_string
    else:
        # No special payload in progress: pass the chunk straight to the chat.
        local_message = text
    return local_message, storage
def handle_events(threadId, chat_history, storage):
    """Stream one assistant run and yield incremental UI updates.

    Resets the per-turn parser state in *storage*, starts a streamed run on
    the thread, and feeds every text delta through ``process_text_chunk``;
    whenever plain chat text is produced it is appended to the last chat
    message and ``[chat_history, storage, markup, svg]`` is yielded for
    Gradio to render. ``thread.run.requires_action`` events are resolved via
    ``handle_requires_action`` and the follow-up stream is processed the
    same way. On any exception the user sees a generic error message.
    """
    # Fresh stream-parser state for this turn.
    storage.update({
        "list_of_suggestions" : [],
        "is_loading_suggestions" : False,
        "is_loading_markup" : False,
        "is_loading_svg": False,
        "accumulative_string" : "",
        "markup_string": "",
        "svg": ""
    })
    try:
        with client.beta.threads.runs.stream(
            thread_id=threadId,
            assistant_id=assistantId
        ) as stream:
            for event in stream:
                if event.event == "thread.message.delta" and event.data.delta.content:
                    text = event.data.delta.content[0].text.value
                    local_message, storage = process_text_chunk(text, storage)
                    if local_message is not None:
                        chat_history[-1][1] += local_message
                        yield [chat_history, storage, storage["markup_string"], storage["svg"]]
                if event.event == 'thread.run.requires_action':
                    # Run the requested tools and stream the model's follow-up.
                    result = handle_requires_action(event.data)
                    tool_outputs = [x["tool_output"] for x in result]
                    with client.beta.threads.runs.submit_tool_outputs_stream(
                        thread_id=stream.current_run.thread_id,
                        run_id=event.data.id,
                        tool_outputs=tool_outputs,
                    ) as action_stream:
                        for text in action_stream.text_deltas:
                            local_message, storage = process_text_chunk(text, storage)
                            if local_message is not None:
                                chat_history[-1][1] += local_message
                                yield [chat_history, storage, storage["markup_string"], storage["svg"]]
                        action_stream.close()
            stream.until_done()
        print("")
        return [chat_history, storage, storage["markup_string"], storage["svg"]]
    except Exception as e:
        # Surface a friendly message instead of crashing the UI.
        # (Fixed typo: "occured" -> "occurred".)
        print(e)
        chat_history[-1][1] = "Error occurred during processing your message. Please try again"
        yield [chat_history, storage, storage["markup_string"], storage["svg"]]
def initiate_chatting(chat_history, storage):
    """Open the conversation: send the configured opening prompt and stream the reply."""
    thread_id = storage["threadId"]
    add_message_to_openai(initial_message, thread_id)
    # Start from a single empty assistant turn; the stream fills it in.
    yield from handle_events(thread_id, [[None, ""]], storage)
def respond_on_user_msg(chat_history, storage):
    """Stream the assistant's answer to the newest user message in *chat_history*."""
    thread_id = storage["threadId"]
    print("Responding for threadId: ", thread_id)
    latest_user_message = chat_history[-1][0]
    # Clear the assistant slot of the last turn; streaming appends into it.
    chat_history[-1][1] = ""
    add_message_to_openai(latest_user_message, thread_id)
    yield from handle_events(thread_id, chat_history, storage)
def create_application():
    """Build the Gradio Blocks UI: chat column, suggestion buttons, and side panel."""
    with gr.Blocks(css=CSS, fill_height=True) as demo:
        # Per-session state shared across callbacks (thread id + stream-parser flags).
        storage = gr.State({"list_of_suggestions": [], "is_loading_suggestions": False, "is_loading_markup": False, "accumulative_string": "", "markup_string": ""})
        btn_list = []
        with gr.Row(elem_id="main_container"):
            with gr.Column(scale=4):
                chatbot = gr.Chatbot(label="Facility managment bot", line_breaks=False, height=300, show_label=False, show_share_button=False, elem_id="chatbot")
                with gr.Row():
                    # Up to six quick-reply buttons, hidden until suggestions arrive.
                    for i in range(6):
                        btn = gr.Button(visible=False, size="sm")
                        btn_list.append(btn)
                msg = gr.Textbox(label="Answer", interactive=False)
            with gr.Column(scale=1, elem_id="markup_container"):
                # Side panel: streamed markup summary plus an SVG drawing area.
                markdown = gr.Markdown(label="Bullet-list", value="# Facility information")
                with gr.Row(variant="compact"):
                    svg_container = gr.HTML(label="SVG Container", value="""""")
        def user(user_message, history):
            # Push the user's text into the history and clear the textbox.
            return "", history + [[user_message, None]]
        def update_suggestions(storage):
            # Reflect the latest parsed suggestions onto the six buttons.
            list_of_suggestions = storage['list_of_suggestions'] or []
            btn_list = create_suggestions_list(list_of_suggestions)
            return btn_list
        def hide_suggestions():
            # Hide all six suggestion buttons while a reply is streaming.
            return [gr.update(visible=False, value="") for _ in range(6)]
        def disable_msg():
            # Lock the textbox while the assistant is responding.
            message_box = gr.Textbox(value=None, interactive=False)
            return message_box
        def enable_msg():
            # Re-enable the textbox once the reply is complete.
            message_box = gr.Textbox(value=None, interactive=True)
            return message_box
        # Reusable (fn, inputs, outputs) triples for the event chains below.
        add_user_message_flow = [user, [msg,chatbot], [msg,chatbot]]
        chat_response_flow = [respond_on_user_msg, [chatbot, storage], [chatbot, storage, markdown, svg_container]]
        update_suggestions_flow = [update_suggestions, storage, btn_list]
        hide_suggestions_flow = [hide_suggestions, None, btn_list]
        disable_msg_flow = [disable_msg, None, msg]
        enable_msg_flow = [enable_msg, None, msg]
        # Textbox submit: record message -> hide suggestions -> disable input
        # -> stream reply -> show new suggestions -> re-enable input.
        msg.submit(*add_user_message_flow
        ).then(*hide_suggestions_flow
        ).then(*disable_msg_flow
        ).then(*chat_response_flow
        ).then(*update_suggestions_flow
        ).then(*enable_msg_flow)
        # Each suggestion button follows the same chain, seeded with its label.
        for sug_btn in btn_list:
            add_suggestion_message_flow = [user, [sug_btn, chatbot], [msg, chatbot]]
            sug_btn.click(*add_suggestion_message_flow
            ).then(*hide_suggestions_flow
            ).then(*disable_msg_flow
            ).then(*chat_response_flow
            ).then(*update_suggestions_flow
            ).then(*enable_msg_flow)
        # On page load: create the OpenAI thread, then stream the opening message.
        demo.load(create_thread_openai, inputs=storage, outputs=storage
        ).then(initiate_chatting, inputs=[chatbot, storage], outputs=[chatbot, storage, markdown, svg_container]
        ).then(*update_suggestions_flow
        ).then(*enable_msg_flow)
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and serve it.
    create_application().launch()