Spaces:
Sleeping
Sleeping
import asyncio
import os

import gradio as gr
from openai import AsyncAssistantEventHandler
from openai import AsyncOpenAI
# The OpenAI API key is read from the OPENAI_API_KEY environment variable
# (on Hugging Face Spaces: add it as a Space secret). Never commit a key to
# source control; any key that was ever published in this file must be
# considered compromised and revoked in the OpenAI dashboard.
client = AsyncOpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)
# Identifier of the OpenAI Assistant that runs are created against.
assistantID = "asst_7xyER9PDcv13UJ22U2zz4x1z"

# HTML snippets passed to gr.Interface as title, description and article.
# The user-facing text is Luxembourgish and is emitted verbatim at runtime.
mytitle = "<h1 align=center>Wat war lass am Land 🇱🇺 an op der Welt 🌎 ?</h1>"
mydescription="""
<h3 align='center'>Wat fir een Thema interesséiert Iech : 🐶 🏃🏻♂️ 🌗 🍇 🌈 🍽️ 🏆 🚘 ✈️ 🩺 </h3>
<table width=100%>
<tr>
<th width=50% bgcolor="Moccasin">Stell deng Froen op Lëtzebuergesch, oder an enger anerer Sprooch :</th>
<th bgcolor="Khaki">Äntwert vum OpenAI File-Search Assistent : </th>
</tr>
</table>
"""
myarticle ="""
<h3>Hannergrënn :</h3>
<p>Dës HuggingFace Space Demo gouf vum <a href="https://github.com/mbarnig">Marco Barnig</a> realiséiert.
Als kënstlech Intelligenz gëtt, mëttels API, den <a href="https://platform.openai.com/docs/models">OpenAI Modell</a>
gpt-4o-mini-2024-07-18 benotzt, deen als Kontext bis 128.000 Tokens ka benotzen, eng Äntwert op eng Fro vu maximal 16.384
Tokens ka ginn a bis zu 200.000 Tokens pro Minutt (TPM) ka beaarbechten. Fir dës Demo ginn nëmmen 6 News-JSON-Datei mat
enger Gréisst vun je 30 MB benotzt. Et ass méiglech bis zu 10.000 Dateien op en OpenAI Assistent opzelueden.
D'Äntwerte vun de Beispiller sinn am Cache gespäichert a ginn duerfir ouni Delai ugewise.</p>
"""
# Three-line text box used as the single input component of the interface.
myinput = gr.Textbox(lines=3, label="Wat wëllt Der wëssen ?")

# Sample questions passed to gr.Interface via its `examples` argument.
myexamples = [
    "Wat war lass am Juni 2023 ?",
    "Wat ass gewosst iwwert de SREL ?",
    "Wat fir eng Katastroph war 2022 zu Lëtzebuerg ?",
    "Koumen an de leschte Jore gréisser Kriminalfäll viru Geriicht ?"
]
class EventHandler(AsyncAssistantEventHandler):
    """Accumulate the output of a streamed Assistants run in ``response_text``.

    The streaming helper invokes the ``on_*`` callbacks while the run is
    consumed; the caller reads ``response_text`` between events to show the
    partial answer.
    """

    def __init__(self) -> None:
        super().__init__()
        # Text accumulated so far (assistant text plus tool-call markers).
        self.response_text = ""
        # Id of the tool call whose deltas are currently being appended;
        # used to emit the "[Tool Call Delta]" marker once per tool call.
        self.current_tool_call = None

    async def on_text_created(self, text) -> None:
        """Handle the start of a new text content block.

        Nothing is appended here: ``text`` is the SDK's text object, not a
        string, so ``str(text)`` would leak its repr into the answer. The
        actual content arrives through ``on_text_delta``.
        """

    async def on_text_delta(self, delta, snapshot):
        """Append the newly streamed text fragment, if the delta carries one."""
        # delta.value may be absent; str(None) would append a literal "None".
        if delta.value:
            self.response_text += delta.value

    async def on_text_done(self, text):
        """Handle the end of a text content block (no action needed)."""

    async def on_tool_call_created(self, tool_call):
        """Append a marker naming the type of the tool call that started."""
        self.response_text += f"\n[Tool Call]: {tool_call.type}\n"

    async def on_tool_call_delta(self, delta, snapshot):
        """Append tool-call progress: a marker per call, then code and logs."""
        if snapshot.id != self.current_tool_call:
            self.current_tool_call = snapshot.id
            self.response_text += f"\n[Tool Call Delta]: {delta.type}\n"

        if delta.type != 'code_interpreter':
            return
        code_interpreter = delta.code_interpreter
        if code_interpreter is None:
            # A delta may omit the code-interpreter payload entirely.
            return

        if code_interpreter.input:
            self.response_text += str(code_interpreter.input)
        if code_interpreter.outputs:
            self.response_text += "\n\n[Output]:\n"
            for output in code_interpreter.outputs:
                # Only textual log output is rendered; other kinds are skipped.
                if output.type == "logs":
                    self.response_text += f"\n{output.logs}"

    async def on_tool_call_done(self, text):
        """Handle the end of a tool call (no action needed).

        The argument is whatever the streaming helper passes for a finished
        tool call; the parameter name ``text`` is kept from the original.
        """
# Initialize session variables
# Module-level dict: "thread_id" stays None until initialize_thread() stores
# the id of the created thread. Being module-level, it is one dict for the
# whole process rather than one per visitor.
session_data = {"assistant_id": assistantID, "thread_id": None}
async def initialize_thread():
    """Create a new Assistants thread and remember its id in ``session_data``."""
    new_thread = await client.beta.threads.create()
    # Later requests look the thread up under this key.
    session_data["thread_id"] = new_thread.id
async def generate_response(user_input):
    """Send ``user_input`` to the assistant and stream the growing answer.

    Posts the user's message to the thread stored in ``session_data``, starts
    a streamed run, and yields the text accumulated by the event handler each
    time it changes.

    Args:
        user_input: The question typed by the user.

    Yields:
        The full response text accumulated so far (not just the new fragment).
    """
    assistant_id = session_data["assistant_id"]
    thread_id = session_data["thread_id"]

    # Add the user's message to the thread (the returned object is not needed).
    await client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=user_input,
    )

    # Create and stream a run.
    # NOTE(review): `instructions` is a per-run value; check the API reference
    # for whether it replaces the assistant's own configured instructions.
    event_handler = EventHandler()
    last_sent = None
    async with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        instructions="Please assist the user with their query.",
        event_handler=event_handler,
    ) as stream:
        # The stream already delivers events as they arrive, so no artificial
        # delay is added. Many events carry no new text; only forward the
        # text when it actually changed.
        async for _ in stream:
            current = event_handler.response_text
            if current != last_sent:
                last_sent = current
                yield current

    # Guarantee at least one yield and that the final state is delivered.
    if event_handler.response_text != last_sent:
        yield event_handler.response_text
# Gradio interface function (generator)
async def gradio_chat_interface(user_input):
    """Gradio callback: stream the assistant's answer to ``user_input``.

    Creates the thread on first use, then re-yields every partial response
    produced by ``generate_response``.

    Args:
        user_input: Text entered in the input box.

    Yields:
        The response text accumulated so far.
    """
    # No event-loop setup is needed here: this coroutine can only execute
    # inside an already running loop.
    # Initialize the thread if not already done
    if session_data["thread_id"] is None:
        await initialize_thread()

    # Generate and yield responses
    async for response in generate_response(user_input):
        yield response
# Set up Gradio interface with streaming
# The callback is an async generator, so each yielded value updates the
# markdown output. Flagging is turned off and the sample questions are
# offered as examples.
interface = gr.Interface(
    fn=gradio_chat_interface,
    inputs=myinput,
    outputs="markdown",
    title=mytitle,
    description=mydescription,
    article=myarticle,
    live=False,
    allow_flagging="never",
    examples=myexamples
)
# Launch the Gradio app
# Runs at import time; there is no `if __name__ == "__main__"` guard.
interface.launch()