Lab2_Extension / app.py
eaglelandsonce's picture
Update app.py
fed29da verified
import os
import gradio as gr
from typing import List, Dict, Tuple, Optional
# Azure AI Agents SDK (API key auth)
from azure.core.credentials import AzureKeyCredential
from azure.ai.agents import AgentsClient
from azure.ai.agents.models import (
FilePurpose,
CodeInterpreterTool,
ListSortOrder,
MessageRole,
)
# ----------------- Core Agent Helpers -----------------
def init_agent(
endpoint: str,
api_key: str,
model_deployment: str,
data_file_path: Optional[str],
) -> dict:
"""
Initialize an Azure AI Agent with an optional data file for the Code Interpreter.
Returns a session dict containing client, agent_id, thread_id, etc.
"""
if not endpoint or not api_key or not model_deployment:
raise ValueError("Please provide endpoint, key, and model deployment name.")
client = AgentsClient(
endpoint=endpoint.strip(),
credential=AzureKeyCredential(api_key.strip()),
)
# Optionally upload file and bind it to a Code Interpreter tool
code_interpreter = None
if data_file_path:
uploaded = client.files.upload_and_poll(
file_path=data_file_path,
purpose=FilePurpose.AGENTS
)
code_interpreter = CodeInterpreterTool(file_ids=[uploaded.id])
# Create the agent (attach tools only if present)
agent = client.create_agent(
model=model_deployment.strip(),
name="data-agent",
instructions=(
"You are an AI agent that analyzes the uploaded data when present. "
"Use Python via the Code Interpreter to compute statistical metrics "
"or produce text-based charts when asked. If no file is provided, "
"proceed with normal reasoning."
),
tools=(code_interpreter.definitions if code_interpreter else None),
tool_resources=(code_interpreter.resources if code_interpreter else None),
)
# Create a thread for the conversation
thread = client.threads.create()
# Session we keep in Gradio state
return {
"endpoint": endpoint.strip(),
"api_key": api_key.strip(),
"model": model_deployment.strip(),
"client": client,
"agent_id": agent.id,
"thread_id": thread.id,
"has_file": bool(data_file_path),
"uploaded_path": data_file_path,
}
def send_to_agent(user_msg: str, session: dict) -> Tuple[str, str]:
"""
Send a message to the existing agent thread and return:
- agent_reply (str)
- history_str (str) readable, chronological log
"""
if not session or "client" not in session:
raise ValueError("Agent is not initialized. Click 'Connect & Prepare' first.")
client: AgentsClient = session["client"]
agent_id = session["agent_id"]
thread_id = session["thread_id"]
# Add user message
client.messages.create(
thread_id=thread_id,
role="user",
content=user_msg,
)
# Run and wait for completion
run = client.runs.create_and_process(thread_id=thread_id, agent_id=agent_id)
if getattr(run, "status", None) == "failed":
last_error = getattr(run, "last_error", "Unknown error")
return f"Run failed: {last_error}", ""
# Get last agent message text
last_msg = client.messages.get_last_message_text_by_role(
thread_id=thread_id,
role=MessageRole.AGENT,
)
agent_reply = last_msg.text.value if last_msg else "(No reply text found.)"
# Build readable history (chronological)
history_lines = []
messages = client.messages.list(thread_id=thread_id, order=ListSortOrder.ASCENDING)
for m in messages:
if m.text_messages:
last_text = m.text_messages[-1].text.value
history_lines.append(f"{m.role}: {last_text}")
history_str = "\n\n".join(history_lines)
return agent_reply, history_str
def teardown(session: dict) -> str:
"""
Delete the agent to reduce costs. (Threads are retained by service.)
"""
if not session:
return "Nothing to clean up."
messages = []
try:
client: AgentsClient = session.get("client")
agent_id = session.get("agent_id")
if client and agent_id:
client.delete_agent(agent_id)
messages.append("Deleted agent.")
except Exception as e:
messages.append(f"Cleanup warning: {e}")
return " ".join(messages) if messages else "Cleanup complete."
# ----------------- Gradio App -----------------
with gr.Blocks(title="Azure AI Agent (Endpoint+Key) — Gradio") as demo:
gr.Markdown(
"## Azure AI Agent (Code Interpreter Ready)\n"
"Enter your **Project Endpoint** and **Key**, set your **Model Deployment** (e.g., `gpt-4o`), "
"optionally upload a data file (TXT/CSV), then chat.\n"
"Click **Connect & Prepare Agent** once, then send prompts."
)
with gr.Row():
endpoint = gr.Textbox(label="Project Endpoint", placeholder="https://<your-project-endpoint>")
api_key = gr.Textbox(label="Project Key", placeholder="paste your key", type="password")
with gr.Row():
model = gr.Textbox(label="Model Deployment Name", value="gpt-4o")
data_file = gr.File(
label="Optional data file (txt/csv) for Code Interpreter",
file_types=[".txt", ".csv"],
type="filepath" # returns a filesystem path string
)
session_state = gr.State(value=None)
connect_btn = gr.Button("🔌 Connect & Prepare Agent", variant="primary")
connect_status = gr.Markdown("")
# Use messages-format chatbot
with gr.Row():
chatbot = gr.Chatbot(
label="Conversation",
height=420,
type="messages", # openai-style dicts: {"role": "...", "content": "..."}
)
user_input = gr.Textbox(label="Your message", placeholder="Ask a question or request a chart…")
with gr.Row():
send_btn = gr.Button("Send ▶")
cleanup_btn = gr.Button("Delete Agent & Cleanup 🧹")
history = gr.Textbox(label="Conversation Log (chronological)", lines=12)
# --------- Callbacks ---------
def on_connect(ep, key, mdl, fpath):
try:
sess = init_agent(ep, key, mdl, fpath)
return sess, "✅ Connected. Agent and thread are ready."
except Exception as e:
return None, f"❌ Connection error: {e}"
connect_btn.click(
fn=on_connect,
inputs=[endpoint, api_key, model, data_file],
outputs=[session_state, connect_status],
)
def on_send(msg: str, session: dict, chat_msgs: List[Dict[str, str]]):
"""
chat_msgs is a list of dicts with 'role' and 'content' (messages format).
We append the user's message and the assistant's reply in that same format.
"""
if not msg:
return gr.update(), gr.update(), gr.update(value="Please enter a message.")
try:
agent_reply, log = send_to_agent(msg, session)
# Build updated chat message list
chat_msgs = (chat_msgs or []) + [
{"role": "user", "content": msg},
{"role": "assistant", "content": agent_reply},
]
return chat_msgs, "", gr.update(value=log) # clear user input after send
except Exception as e:
# Keep chat as-is, show error in history box
return chat_msgs, msg, gr.update(value=f"❌ Error: {e}")
send_btn.click(
fn=on_send,
inputs=[user_input, session_state, chatbot],
outputs=[chatbot, user_input, history],
)
def on_cleanup(session):
try:
msg = teardown(session)
return None, f"🧹 {msg}"
except Exception as e:
return session, f"⚠️ Cleanup error: {e}"
cleanup_btn.click(
fn=on_cleanup,
inputs=[session_state],
outputs=[session_state, connect_status],
)
if __name__ == "__main__":
# If deploying to spaces/containers you can set server_name/port via env if needed
demo.launch()