# NOTE: the hosting page header ("Spaces: Sleeping") was captured here; it is not part of the program.
import os | |
import gradio as gr | |
from typing import List, Dict, Tuple, Optional | |
# Azure AI Agents SDK (API key auth) | |
from azure.core.credentials import AzureKeyCredential | |
from azure.ai.agents import AgentsClient | |
from azure.ai.agents.models import ( | |
FilePurpose, | |
CodeInterpreterTool, | |
ListSortOrder, | |
MessageRole, | |
) | |
# ----------------- Core Agent Helpers -----------------
def init_agent(
    endpoint: str,
    api_key: str,
    model_deployment: str,
    data_file_path: Optional[str],
) -> dict:
    """
    Initialize an Azure AI Agent with an optional data file for the Code Interpreter.

    Args:
        endpoint: Project endpoint URL; surrounding whitespace is ignored.
        api_key: Project key; surrounding whitespace is ignored.
        model_deployment: Model deployment name; surrounding whitespace is ignored.
        data_file_path: Path of a file to upload and attach to a Code Interpreter
            tool, or a falsy value to create the agent without tools.

    Returns:
        A session dict with the keys "endpoint", "api_key", "model", "client",
        "agent_id", "thread_id", "has_file" and "uploaded_path".

    Raises:
        ValueError: If endpoint, key or model deployment is missing, empty, or
            contains only whitespace.
    """
    # Normalize BEFORE validating: a whitespace-only value is truthy, so checking
    # the raw input would let it through and fail later inside the SDK call.
    endpoint = (endpoint or "").strip()
    api_key = (api_key or "").strip()
    model_deployment = (model_deployment or "").strip()
    if not (endpoint and api_key and model_deployment):
        raise ValueError("Please provide endpoint, key, and model deployment name.")

    client = AgentsClient(
        endpoint=endpoint,
        credential=AzureKeyCredential(api_key),
    )

    # Optionally upload file and bind it to a Code Interpreter tool
    code_interpreter = None
    if data_file_path:
        uploaded = client.files.upload_and_poll(
            file_path=data_file_path,
            purpose=FilePurpose.AGENTS,
        )
        code_interpreter = CodeInterpreterTool(file_ids=[uploaded.id])

    # Create the agent (attach tools only if present)
    agent = client.create_agent(
        model=model_deployment,
        name="data-agent",
        instructions=(
            "You are an AI agent that analyzes the uploaded data when present. "
            "Use Python via the Code Interpreter to compute statistical metrics "
            "or produce text-based charts when asked. If no file is provided, "
            "proceed with normal reasoning."
        ),
        tools=(code_interpreter.definitions if code_interpreter else None),
        tool_resources=(code_interpreter.resources if code_interpreter else None),
    )

    # Create a thread for the conversation
    thread = client.threads.create()

    # Session we keep in Gradio state
    return {
        "endpoint": endpoint,
        "api_key": api_key,
        "model": model_deployment,
        "client": client,
        "agent_id": agent.id,
        "thread_id": thread.id,
        "has_file": bool(data_file_path),
        "uploaded_path": data_file_path,
    }
def send_to_agent(user_msg: str, session: dict) -> Tuple[str, str]:
    """
    Post one user message to the session's thread, run the agent, and report back.

    Returns a 2-tuple:
    - the text of the agent's latest reply (or a placeholder when none is found)
    - a readable log of the thread, oldest message first, one "role: text" entry
      per message that has text, separated by blank lines

    When the run's status is "failed", the first element is a "Run failed: ..."
    message and the log is an empty string.

    Raises ValueError when `session` is empty or has no "client" entry.
    """
    if not (session and "client" in session):
        raise ValueError("Agent is not initialized. Click 'Connect & Prepare' first.")

    agent_client = session["client"]
    agent_id, thread_id = session["agent_id"], session["thread_id"]

    # Append the user's turn to the thread.
    agent_client.messages.create(thread_id=thread_id, role="user", content=user_msg)

    # Run the agent on the thread and block until the run finishes.
    run = agent_client.runs.create_and_process(thread_id=thread_id, agent_id=agent_id)
    if getattr(run, "status", None) == "failed":
        return f"Run failed: {getattr(run, 'last_error', 'Unknown error')}", ""

    # Text of the most recent agent-authored message, if there is one.
    latest = agent_client.messages.get_last_message_text_by_role(
        thread_id=thread_id,
        role=MessageRole.AGENT,
    )
    if latest:
        agent_reply = latest.text.value
    else:
        agent_reply = "(No reply text found.)"

    # Chronological log: for each message with text, keep its last text part.
    thread_messages = agent_client.messages.list(
        thread_id=thread_id,
        order=ListSortOrder.ASCENDING,
    )
    history_str = "\n\n".join(
        f"{entry.role}: {entry.text_messages[-1].text.value}"
        for entry in thread_messages
        if entry.text_messages
    )
    return agent_reply, history_str
def teardown(session: dict) -> str: | |
""" | |
Delete the agent to reduce costs. (Threads are retained by service.) | |
""" | |
if not session: | |
return "Nothing to clean up." | |
messages = [] | |
try: | |
client: AgentsClient = session.get("client") | |
agent_id = session.get("agent_id") | |
if client and agent_id: | |
client.delete_agent(agent_id) | |
messages.append("Deleted agent.") | |
except Exception as e: | |
messages.append(f"Cleanup warning: {e}") | |
return " ".join(messages) if messages else "Cleanup complete." | |
# ----------------- Gradio App -----------------
with gr.Blocks(title="Azure AI Agent (Endpoint+Key) — Gradio") as demo:
    # Intro text shown at the top of the page.
    gr.Markdown(
        "## Azure AI Agent (Code Interpreter Ready)\n"
        "Enter your **Project Endpoint** and **Key**, set your **Model Deployment** (e.g., `gpt-4o`), "
        "optionally upload a data file (TXT/CSV), then chat.\n"
        "Click **Connect & Prepare Agent** once, then send prompts."
    )

    # Connection settings: endpoint and key.
    with gr.Row():
        endpoint = gr.Textbox(
            label="Project Endpoint",
            placeholder="https://<your-project-endpoint>",
        )
        api_key = gr.Textbox(
            label="Project Key",
            placeholder="paste your key",
            type="password",
        )

    # Model deployment name and the optional data file.
    with gr.Row():
        model = gr.Textbox(label="Model Deployment Name", value="gpt-4o")
        data_file = gr.File(
            label="Optional data file (txt/csv) for Code Interpreter",
            file_types=[".txt", ".csv"],
            type="filepath",  # the callback receives a filesystem path string
        )

    # Holds the session dict produced by init_agent (None until connected).
    session_state = gr.State(value=None)
    connect_btn = gr.Button("🔌 Connect & Prepare Agent", variant="primary")
    connect_status = gr.Markdown("")

    # Chat transcript in messages format: {"role": "...", "content": "..."} dicts.
    with gr.Row():
        chatbot = gr.Chatbot(label="Conversation", height=420, type="messages")
    user_input = gr.Textbox(
        label="Your message",
        placeholder="Ask a question or request a chart…",
    )
    with gr.Row():
        send_btn = gr.Button("Send ▶")
        cleanup_btn = gr.Button("Delete Agent & Cleanup 🧹")
    history = gr.Textbox(label="Conversation Log (chronological)", lines=12)

    # --------- Callbacks ---------
    def on_connect(ep, key, mdl, fpath):
        """Create the agent session; return (session, status text) for the UI."""
        try:
            new_session = init_agent(ep, key, mdl, fpath)
        except Exception as err:
            return None, f"❌ Connection error: {err}"
        return new_session, "✅ Connected. Agent and thread are ready."

    connect_btn.click(
        fn=on_connect,
        inputs=[endpoint, api_key, model, data_file],
        outputs=[session_state, connect_status],
    )

    def on_send(msg: str, session: dict, chat_msgs: List[Dict[str, str]]):
        """
        Forward the user's message to the agent and refresh the chat widgets.

        `chat_msgs` is the chatbot value in messages format (dicts with 'role'
        and 'content'). On success the user's message and the assistant's reply
        are appended, the input box is cleared and the log box gets the thread
        history. On failure the chat and input are kept and the log box shows
        the error.
        """
        if not msg:
            return gr.update(), gr.update(), gr.update(value="Please enter a message.")
        try:
            agent_reply, log = send_to_agent(msg, session)
            new_turn = [
                {"role": "user", "content": msg},
                {"role": "assistant", "content": agent_reply},
            ]
            chat_msgs = (chat_msgs or []) + new_turn
            return chat_msgs, "", gr.update(value=log)  # "" clears the input box
        except Exception as err:
            # Leave the conversation untouched; surface the error in the log box.
            return chat_msgs, msg, gr.update(value=f"❌ Error: {err}")

    send_btn.click(
        fn=on_send,
        inputs=[user_input, session_state, chatbot],
        outputs=[chatbot, user_input, history],
    )

    def on_cleanup(session):
        """Delete the agent; clear the stored session unless teardown itself raised."""
        try:
            summary = teardown(session)
        except Exception as err:
            return session, f"⚠️ Cleanup error: {err}"
        return None, f"🧹 {summary}"

    cleanup_btn.click(
        fn=on_cleanup,
        inputs=[session_state],
        outputs=[session_state, connect_status],
    )
if __name__ == "__main__":
    # Start the Gradio app only when this file is executed as a script.
    # launch() is called without arguments here; if deploying to spaces/containers
    # you can set server_name/port via env if needed.
    demo.launch()