Spaces:
Runtime error
Runtime error
import os | |
import gradio as gr | |
import requests | |
import pandas as pd | |
import asyncio | |
import time | |
from pathlib import Path | |
# LlamaIndex and tool imports | |
from llama_index.core.agent.workflow import AgentWorkflow, ReActAgent | |
from llama_index.core.tools import FunctionTool | |
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec | |
from llama_index.llms.azure_openai import AzureOpenAI | |
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound | |
from bs4 import BeautifulSoup | |
import pdfplumber | |
import docx | |
import speech_recognition as sr | |
import base64 | |
import tempfile | |
import re | |
from io import BytesIO, StringIO | |
from dotenv import load_dotenv | |
load_dotenv() | |
# ------------------------------ | |
# 0. Define Azure OpenAI LLM | |
# ------------------------------ | |
api_key = os.getenv("AZURE_OPENAI_API_KEY") | |
azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") | |
azure_api_version = os.getenv("AZURE_OPENAI_API_VERSION") | |
azure_deployment_name = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME") | |
azure_model_name = os.getenv("AZURE_OPENAI_MODEL_NAME") | |
llm = AzureOpenAI( | |
engine=azure_deployment_name, | |
model=azure_model_name, | |
temperature=0.0, | |
azure_endpoint=azure_endpoint, | |
api_key=api_key, | |
api_version=azure_api_version, | |
) | |
# ------------------------------ | |
# 1. Helper Functions / Tools | |
# ------------------------------ | |
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
# File parsing tool | |
def parse_file(file_url: str, file_name: str) -> str: | |
try: | |
# Determine file type from file_name or URL | |
if len(file_name)>0: | |
file_type = Path(file_name).suffix.lower() | |
file_type = file_type.split("?")[0] | |
else: | |
file_type = None | |
# Remove query params | |
if file_type: | |
resp = requests.get(file_url, timeout=30) | |
resp.raise_for_status() | |
content = resp.content | |
# --- Excel Files --- | |
if file_type in [".xlsx", ".xls"]: | |
try: | |
df = pd.read_excel(BytesIO(content)) | |
return f"Excel Content:\n{df.head(5).to_string(index=False)}" # Only first 5 rows | |
except Exception as e: | |
return f"Excel parsing error: {str(e)}" | |
# --- CSV Files --- | |
elif file_type == ".csv": | |
try: | |
df = pd.read_csv(BytesIO(content)) | |
return f"CSV Content:\n{df.head(5).to_string(index=False)}" # Only first 5 rows | |
except Exception as e: | |
return f"CSV parsing error: {str(e)}" | |
# --- Text Files --- | |
elif file_type == ".txt": | |
text = content.decode(errors='ignore') | |
return f"Text Content:\n{text[:3500]}" | |
# --- PDF Files --- | |
elif file_type == ".pdf": | |
try: | |
with pdfplumber.open(BytesIO(content)) as pdf: | |
text = "\n".join(page.extract_text() or "" for page in pdf.pages[:3]) # First 3 pages | |
return f"PDF Content:\n{text[:3500]}" | |
except Exception as e: | |
return f"PDF parsing error: {str(e)}" | |
# --- DOCX Files --- | |
elif file_type == ".docx": | |
try: | |
d = docx.Document(BytesIO(content)) | |
text = "\n".join(p.text for p in d.paragraphs[:50]) # First 50 paragraphs | |
return f"DOCX Content:\n{text[:3500]}" | |
except Exception as e: | |
return f"DOCX parsing error: {str(e)}" | |
# --- MP3 Files --- | |
elif file_type == ".mp3": | |
return transcribe_audio(content) # Use helper function | |
# --- Python Files --- | |
elif file_type == ".py": | |
text = content.decode(errors='ignore') | |
return f"Python Code:\n{text[:3500]}" | |
# --- Unsupported Types --- | |
else: | |
return f"Unsupported file type: {file_type}" | |
else: | |
return "No file type provided or file URL is invalid." | |
except Exception as e: | |
print(f"[parse_file] ERROR: {e}") | |
return f"File parsing failed: {str(e)}" | |
# Audio transcription helper | |
def transcribe_audio(content: bytes) -> str: | |
try: | |
# Create temp files | |
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as mp3_tmp: | |
mp3_tmp.write(content) | |
mp3_path = mp3_tmp.name | |
wav_path = mp3_path.replace(".mp3", ".wav") | |
# Convert to WAV | |
try: | |
from pydub import AudioSegment | |
audio = AudioSegment.from_mp3(mp3_path) | |
audio.export(wav_path, format="wav") | |
audio_file = wav_path | |
except ImportError: | |
audio_file = mp3_path # Fallback to MP3 if pydub not available | |
# Transcribe audio | |
recognizer = sr.Recognizer() | |
with sr.AudioFile(audio_file) as source: | |
audio = recognizer.record(source) | |
transcript = recognizer.recognize_google(audio) | |
# Cleanup | |
for path in [mp3_path, wav_path]: | |
if os.path.exists(path): | |
os.remove(path) | |
return f"Audio Transcript:\n{transcript}" | |
except Exception as e: | |
print(f"Audio transcription error: {e}") | |
return "Could not transcribe audio" | |
# YouTube transcript tool | |
def get_youtube_transcript(url: str) -> str: | |
try: | |
video_id = url.split("v=")[-1].split("&")[0] # Clean video ID | |
transcript = YouTubeTranscriptApi.get_transcript(video_id) | |
return " ".join([e['text'] for e in transcript]) | |
except NoTranscriptFound: | |
return "No transcript available for this video" | |
except Exception as e: | |
return f"Error retrieving transcript: {str(e)}" | |
# ------------ DuckDuckGo Search and Extract ------------------------- | |
def scrape_text_from_url(url: str, max_chars=4000) -> str: | |
"""Fetch and clean main text from a webpage (basic version).""" | |
try: | |
resp = requests.get(url, timeout=10) | |
soup = BeautifulSoup(resp.text, 'html.parser') | |
# Get visible text only, skip scripts/styles | |
text = ' '.join(soup.stripped_strings) | |
return text[:max_chars] | |
except Exception as e: | |
return f"Could not scrape {url}: {e}" | |
def duckduckgo_search_and_scrape( | |
question: str, | |
max_results: int = 10, | |
min_chars: int = 400, # treat shorter pages as “unscrapable” | |
max_chars: int = 4000 # final truncate length | |
) -> str: | |
""" | |
DuckDuckGo → scrape → fallback. | |
1. Try up to max_results links; return the first page that gives | |
≥ min_chars of visible text. | |
2. If none succeed, compose an answer from the DDG result metadata. | |
""" | |
ddg_spec = DuckDuckGoSearchToolSpec() | |
results = ddg_spec.duckduckgo_full_search(question) or [] | |
if not isinstance(results, list): | |
return "No search results found." | |
cleaned_pages = [] | |
for entry in results[:max_results]: | |
href = entry.get("href", "") | |
if not href: | |
continue | |
# --- attempt to scrape ------------------------------------------------ | |
text = scrape_text_from_url(href, max_chars=max_chars) | |
if text.startswith("Could not scrape") or len(text) < min_chars: | |
continue # treat as failure – try next result | |
# success! | |
return ( | |
f"Here is content scraped from {href}:\n\n" | |
f"{text}\n\n" | |
"Based on this, please answer the original question." | |
) | |
# ---------------- fallback: build summary from DDG metadata -------------- | |
if not results: | |
return "No search results found." | |
summary_lines = [] | |
for idx, entry in enumerate(results[:max_results], start=1): | |
title = entry.get("title") or "Untitled result" | |
snippet = (entry.get("body") or "").replace("\n", " ")[:160] | |
href = entry.get("href") | |
summary_lines.append(f"{idx}. {title} – {snippet} ({href})") | |
return ( | |
"I could not successfully scrape any of the top pages. " | |
"Here are the top DuckDuckGo results:\n\n" | |
+ "\n".join(summary_lines) | |
+ "\n\nPlease answer the original question using this list." | |
) | |
# ------------ Image Processing Tool Functions ------------------------- | |
# MIME type mapping for images | |
MIME_MAP = { | |
'.jpg': 'jpeg', | |
'.jpeg': 'jpeg', | |
'.png': 'png', | |
'.bmp': 'bmp', | |
'.gif': 'gif', | |
'.webp': 'webp' | |
} | |
# 3. Image agent with enhanced capabilities | |
def process_image(file_url: str, question: str) -> str: | |
""" | |
Download the image, send it to Azure's vision API, and return the reply text. | |
""" | |
try: | |
print(f"Processing image via process_image function from URL: {file_url}") | |
resp = requests.get(file_url, timeout=30) | |
resp.raise_for_status() | |
raw = resp.content | |
# 2) Figure out the MIME type from headers (fallback to png) | |
mime = resp.headers.get("Content-Type", "image/png") | |
# 3) Build data URI | |
img_b64 = base64.b64encode(raw).decode() | |
data_uri = f"data:{mime};base64,{img_b64}" | |
print(f"Image downloaded and encoded successfully.") | |
from openai import AzureOpenAI | |
vision_client = AzureOpenAI( | |
api_key=api_key, | |
api_version=azure_api_version, | |
azure_endpoint=azure_endpoint, | |
) | |
messages = [ | |
{"role": "system", "content": ( | |
"You are a vision expert. Answer based *only* on the image content." | |
)}, | |
{"role": "user", "content": [ | |
{"type": "text", "text": question}, | |
{"type": "image_url", "image_url": {"url": data_uri}} | |
]}, | |
] | |
response = vision_client.chat.completions.create( | |
model=azure_model_name, | |
messages=messages, | |
temperature=0.0, | |
max_tokens=2000, | |
) | |
print(f"Vision API response received : {response.choices[0].message.content.strip()}") | |
return response.choices[0].message.content.strip() | |
except Exception as e: | |
return f"Vision API error: {e}" | |
# ─── formatter.py (or inline in your module) ───────────────────────── | |
from pydantic import BaseModel, ValidationError | |
from openai import AzureOpenAI | |
FALLBACK = "ANSWER_NOT_FOUND" # single source of truth, keep as plain text | |
SYSTEM_PROMPT = ( | |
"You are an answer-formatter. I will give you:\n" | |
" • the user question\n" | |
" • a raw multi-agent trace that may contain Thoughts, Actions, tool " | |
" outputs, and possibly a FINAL ANSWER.\n\n" | |
"Your job:\n" | |
"1. Extract the true answer if it is present anywhere in the trace.\n" | |
"2. Output exactly one line in this template:\n" | |
" FINAL ANSWER: <ANSWER>\n\n" | |
"If the trace contains no FINAL ANSWER **but the question itself already contains enough information**, deduce the answer on your own." | |
"Return a FINAL ANSWER line in the usual format.\n" | |
"Rules for <ANSWER>:\n" | |
"• Number → digits only, no commas, no currency/percent signs unless " | |
" explicitly asked for.\n" | |
"• String → as short as possible, no articles unless required.\n" | |
"• List → comma-separated values following the above rules; if no order " | |
" is specified, sort alphabetically.\n" | |
"• If rounding or units are requested in the question, apply before " | |
" formatting and include the unit with **no preceding space**.\n\n" | |
f"If you cannot find a valid answer, output:\n" | |
f" FINAL ANSWER: {FALLBACK}\n\n" | |
"Examples (follow exactly)\n" | |
"###\n" | |
"Q: Reverse this word: elppa\n" | |
"Trace: (no FINAL ANSWER)\n" | |
"A: FINAL ANSWER: apple\n" | |
"Q: What is 2+3?\n" | |
"Trace: Thought: need a calculator\n" | |
"A: FINAL ANSWER: 5\n" | |
"Q: How many planets? Trace: … FINAL ANSWER: 8\n" | |
"A: FINAL ANSWER: 8\n" | |
"###\n" | |
"Q: Give the colour. Trace: … blue.\n" | |
"A: FINAL ANSWER: blue\n" | |
"###\n" | |
"Q: Name the three vowels. Trace: … a, e, i, o, u.\n" | |
"A: FINAL ANSWER: a,e,i,o,u\n" | |
"###\n" | |
"Q: What’s the speed? (units requested) Trace: … 3.0 m/s.\n" | |
"A: FINAL ANSWER: 3.0m/s\n" | |
"###\n" | |
"Q: Any answer? Trace: … tool failure …\n" | |
f"A: FINAL ANSWER: {FALLBACK}" | |
) | |
class Result(BaseModel): | |
final_answer: str | |
def format_final_answer(question: str, | |
raw_trace: str, | |
*, | |
api_key: str, | |
api_version: str, | |
endpoint: str, | |
deployment: str, | |
temperature: float = 0.0) -> str: | |
""" | |
Second-pass LLM call that converts an unstructured agent trace into the | |
strict 'FINAL ANSWER: …' template. On any error returns the FALLBACK. | |
""" | |
try: | |
from openai import AzureOpenAI | |
client = AzureOpenAI( | |
api_key=api_key, | |
api_version=api_version, | |
azure_endpoint=endpoint, | |
) | |
messages = [ | |
{"role": "system", "content": SYSTEM_PROMPT}, | |
{"role": "user", "content": f"Question: {question}\nTrace: {raw_trace}"} | |
] | |
rsp = client.chat.completions.create( | |
model=deployment, | |
messages=messages, | |
temperature=temperature, | |
max_tokens=120, | |
) | |
out = rsp.choices[0].message.content.strip() | |
# Remove the label for downstream code (keep only the value) | |
if out.lower().startswith("final answer:"): | |
out = out.split(":", 1)[1].strip() | |
# basic schema check – non-empty string | |
Result(final_answer=out) | |
return out or FALLBACK | |
except (ValidationError, Exception): | |
return FALLBACK | |
# ------------------------------ | |
# 2. BasicAgent Class Definition | |
# ------------------------------ | |
REASONING_PROMPT = """ | |
You are the Router-&-Reasoning-Agent. | |
NEVER output filler like “Could you please provide more context”. | |
If the answer is not already in the question, DELEGATE: | |
• Any external fact → WebSearch-Agent | |
• YouTube link → YouTube-Agent | |
• File link (PDF…) → File-Agent | |
• Image link → Image-Agent | |
How to delegate | |
─────────────── | |
Call the special tool `handoff` **once** with JSON: | |
{"to_agent":"<agent_name>","reason":"<why>"} | |
When to answer directly | |
─────────────────────── | |
• The question already contains all information needed (e.g. reversed text, | |
Caesar cipher, mental arithmetic, pure logic). | |
• You are 100 % certain no external resource is required. | |
Output format | |
───────────── | |
• If you delegate → return the tool call only; the delegated agent will finish. | |
• If you answer yourself → one line: | |
FINAL ANSWER: <clean answer> | |
Follow the global rules (digits only, short strings, comma-lists, etc.). | |
Never | |
───── | |
• Never try to scrape the web or parse files yourself. | |
• Never add filler like “Thinking…” or “Awaiting response”. | |
• Never answer if the question clearly needs a specialised agent. | |
Example | |
─────── | |
Example (self-contained) | |
Q: .rewsna eht sa "tfel" … ← reversed | |
A: FINAL ANSWER: right | |
Example (delegation) | |
Q: Who wrote the novel Dune? | |
A: Action: handoff | |
Action Input: {"to_agent":"websearch_agent","reason":"needs web"} | |
""" | |
class BasicAgent: | |
def __init__(self): | |
"""Initialize the BasicAgent with all tools and agent workflow.""" | |
self.llm = llm | |
self.api_url = DEFAULT_API_URL | |
# Initialize tools | |
self._setup_tools() | |
# Initialize agents | |
self._setup_agents() | |
# Initialize agent workflow | |
self._setup_workflow() | |
# Define routing instruction | |
self.routing_instruction = ( | |
"You are a multi-agent AI system that routes questions **and** produces " | |
"the final answer.\n\n" | |
"– If the question already *contains* the needed information " | |
"(e.g. encoded, reversed, maths puzzle), **answer directly** – " | |
"no tools, no sub-agents.\n\n" | |
"You have four specialised agents:\n" | |
"• File-Agent – files (PDF, DOCX, …)\n" | |
"• YouTube-Agent – video transcripts\n" | |
"• WebSearch-Agent – fresh/general web info\n" | |
"• Image-Agent – vision questions\n\n" | |
"When you delegate, do **not** add commentary such as " | |
"'I will await the agent's response'.\n" | |
"When you answer yourself, end with:\n" | |
" FINAL ANSWER: <clean answer>\n\n" | |
"Example ➊ (self-contained)\n" | |
'Q: "opposite of north"..."\n' | |
"A: FINAL ANSWER: south\n\n" | |
"Example ➋ (delegation)\n" | |
"Q: Who wrote Dune?\n" | |
"A: Action: handoff\n" | |
'Action Input: {"to_agent":"websearch_agent","reason":"needs web"}\n' | |
) | |
def _setup_tools(self): | |
"""Initialize all the tools.""" | |
self.file_parser_tool = FunctionTool.from_defaults(parse_file) | |
self.youtube_transcript_tool = FunctionTool.from_defaults(get_youtube_transcript) | |
self.ddg_tool = FunctionTool.from_defaults( | |
fn=duckduckgo_search_and_scrape, | |
name="web_search", | |
description=( | |
"Performs a DuckDuckGo search, attempts to scrape each top result, " | |
"and falls back to result metadata if scraping fails." | |
) | |
) | |
self.image_processing_tool = FunctionTool.from_defaults( | |
fn=process_image, | |
name="image_processing", | |
description="Downloads the image at `file_url` and answers `question` based on its visual content." | |
) | |
def _setup_agents(self): | |
"""Initialize all the specialized agents.""" | |
self.reasoning_agent = ReActAgent( | |
name="reasoning_agent", | |
description="Router and on-board reasoning.", | |
system_prompt=REASONING_PROMPT, | |
tools=[], # no direct tools – only `handoff` is implicit | |
llm=self.llm, | |
) | |
# File Parsing ReActAgent | |
self.file_agent = ReActAgent( | |
name="file_agent", | |
description="Expert at reading and extracting info from files", | |
system_prompt="""You are File-Agent. | |
A router has already chosen you because the user’s question involves a | |
non-image file (PDF, DOCX, XLSX, CSV, TXT, MP3, …). | |
Rules | |
1. ALWAYS call the tool `parse_file(file_url, file_type?)` **once** to read | |
the file. | |
2. Use ONLY the file content to answer the user. | |
3. NEVER hand the task to another agent and NEVER mention you are using a tool. | |
4. When you are done, reply with one line in this exact format: | |
FINAL ANSWER: <clean answer text>""", | |
tools=[self.file_parser_tool], | |
llm=self.llm, | |
) | |
# YouTube ReActAgent | |
self.youtube_agent = ReActAgent( | |
name="youtube_agent", | |
description="Expert at extracting info from YouTube videos by transcript.", | |
system_prompt=""" | |
You are YouTube-Agent. | |
The router picked you because the question references a YouTube video. | |
Rules | |
1. ALWAYS call `get_youtube_transcript(url)` once. | |
2. Base your answer ONLY on the transcript you receive. | |
3. Do NOT search the web, do NOT invoke other tools. | |
4. End with: | |
FINAL ANSWER: <clean answer text> | |
""", | |
tools=[self.youtube_transcript_tool], | |
llm=self.llm, | |
) | |
# DuckDuckGo Web Search ReActAgent | |
self.search_agent = ReActAgent( | |
name="websearch_agent", | |
description="Web search expert.", | |
system_prompt=( | |
"You are WebSearch-Agent.\n" | |
"1. ALWAYS call the tool `web_search` exactly once.\n" | |
"2. Read the text the tool returns and craft a concise answer to the user.\n" | |
"3. Do NOT quote the entire extract; use only the facts needed.\n" | |
"4. Finish with:\n" | |
" FINAL ANSWER: <clean answer text>" | |
"...\n" | |
"Example\n" | |
"User: Who wrote the novel Dune?\n" | |
"Tool output: Here is content scraped from https://en.wikipedia.org/wiki/Dune_(novel): ... Frank Herbert ... Based on this, please answer the original question.\n" | |
"Assistant: FINAL ANSWER: Frank Herbert\n" | |
), | |
tools=[self.ddg_tool], | |
llm=self.llm, | |
) | |
# Image Agent | |
self.image_agent = ReActAgent( | |
name="image_agent", | |
description="Analyzes images and answers questions using the image_processing tool.", | |
system_prompt=( | |
""" | |
You are Image-Agent. | |
The router picked you because the question involves an image file. | |
Rules | |
1. ALWAYS call the tool `image_processing(file_url, question)` exactly once. | |
2. Use ONLY the image content to answer the user. | |
3. NEVER hand the task to another agent and NEVER mention you are using a tool. | |
4. When you are done, reply with one line in this exact format: | |
FINAL ANSWER: <clean answer text> | |
""" | |
), | |
tools=[self.image_processing_tool], | |
llm=self.llm, | |
) | |
def _setup_workflow(self): | |
"""Initialize the agent workflow.""" | |
self.agentflow = AgentWorkflow( | |
agents=[self.reasoning_agent, | |
self.file_agent, | |
self.youtube_agent, | |
self.search_agent, | |
self.image_agent], | |
root_agent=self.reasoning_agent.name # start with pure reasoning | |
) | |
# ─── BasicAgent._extract_final_answer ────────────────────────────────────────── | |
def _extract_final_answer(self, question: str, agent_resp) -> str: | |
raw_trace = "\n".join(block.text for block in agent_resp.response.blocks) | |
return format_final_answer( | |
question, | |
raw_trace, | |
api_key=api_key, | |
api_version=azure_api_version, | |
endpoint=azure_endpoint, | |
deployment=azure_model_name, | |
) | |
def __call__(self, question: str, task_id: str, file_name: str, file_type = None) -> str: | |
""" | |
Main method to process a question and return an answer. | |
This method will be called by the evaluation system. | |
Args: | |
question (str): The question to answer | |
task_id (str, optional): Task ID for file retrieval | |
file_name (str, optional): Name of the file associated with the question | |
file_type (str, optional): Type of the file (e.g., .pdf, .docx, etc.) | |
Returns: | |
str: The answer to the question | |
""" | |
try: | |
# Check if there's a file associated with this question | |
# The evaluation system should provide file info in the question or via task_id | |
enhanced_question = question | |
if len(file_name) > 0: | |
file_url = f"{DEFAULT_API_URL}/files/{task_id}" | |
print(f"Processing file: {file_name} with type {file_type} at URL {file_url}") | |
enhanced_question += f"\nThis question relates to the file at {file_url} (filename: {file_name} and file type: {file_type}). Please analyze its contents using the appropriate tool." | |
# Construct the full prompt with routing instructions | |
full_prompt = f"\n\nUser Question:\n{enhanced_question}" | |
# Run the agent workflow with proper async handling | |
agent_resp = self._run_async_workflow(full_prompt) | |
print(f"Agent response received:\n{question}\n---\n{agent_resp}") | |
# Extract & return | |
final_answer = self._extract_final_answer(question, agent_resp) | |
print("Final answer extracted:", final_answer) | |
print(f"Final answer extracted: {final_answer}") | |
print("------------------------------------------------------------------------------------------------") | |
print('****************************************************************************') | |
return final_answer | |
except Exception as e: | |
print(f"Error in BasicAgent.__call__: {e}") | |
return f"Error processing question: {str(e)}" | |
# ─── keep just ONE runner ──────────────────────────────────────────── | |
def _run_async_workflow(self, prompt: str): | |
""" | |
Call `agentflow.run()` until the response STOPs containing an | |
Action/Thought line. Works with older llama-index that has no | |
`.initialize() / .run_step()`. | |
""" | |
async def _step(msg): | |
return await self.agentflow.run(user_msg=msg) | |
async def _inner(): | |
rsp = await _step(prompt) # first turn | |
# If the last block is still a tool-call, keep asking “continue” | |
while rsp.response.blocks[-1].text.lstrip().lower().startswith(("action:", "thought:")): | |
rsp = await _step("continue") | |
return rsp | |
try: | |
loop = asyncio.get_running_loop() # running inside Gradio | |
except RuntimeError: # plain Python | |
return asyncio.run(_inner()) | |
else: | |
return asyncio.run_coroutine_threadsafe(_inner(), loop).result() | |
# ------------------------------ | |
# 3. Modified answer_questions_batch function (kept for reference) | |
# ------------------------------ | |
async def answer_questions_batch(questions_data): | |
""" | |
This function is kept for reference but is no longer used in the main flow. | |
The BasicAgent class now handles individual questions directly. | |
""" | |
answers = [] | |
agent = BasicAgent() | |
for question_data in questions_data: | |
question = question_data.get("question", "") | |
file_name = question_data.get("file_name", "") | |
task_id = question_data.get("task_id", "") | |
file_type = Path(file_name).suffix.lower().split("?")[0] if len(file_name)> 0 else None | |
try: | |
# Let the BasicAgent handle the question processing | |
answer = agent(question, task_id, file_name, file_type) | |
answers.append({ | |
"task_id": task_id, | |
"question": question, | |
"submitted_answer": answer | |
}) | |
except Exception as e: | |
print(f"Error processing question {task_id}: {e}") | |
answers.append({ | |
"task_id": task_id, | |
"question": question, | |
"submitted_answer": f"Error: {str(e)}" | |
}) | |
time.sleep(1) # Rate limiting | |
return answers | |
def run_and_submit_all(profile: gr.OAuthProfile | None): | |
""" | |
Fetches all questions, runs the BasicAgent on them, submits all answers, | |
and displays the results. | |
""" | |
# --- Determine HF Space Runtime URL and Repo URL --- | |
space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code | |
if profile: | |
username = f"{profile.username}" | |
print(f"User logged in: {username}") | |
else: | |
print("User not logged in.") | |
return "Please Login to Hugging Face with the button.", None | |
api_url = DEFAULT_API_URL | |
questions_url = f"{api_url}/questions" | |
submit_url = f"{api_url}/submit" | |
# 1. Instantiate Agent | |
try: | |
agent = BasicAgent() | |
print("BasicAgent instantiated successfully.") | |
except Exception as e: | |
print(f"Error instantiating agent: {e}") | |
return f"Error initializing agent: {e}", None | |
# In the case of an app running as a hugging Face space, this link points toward your codebase | |
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" | |
print(agent_code) | |
# 2. Fetch Questions | |
print(f"Fetching questions from: {questions_url}") | |
try: | |
response = requests.get(questions_url, timeout=15) | |
response.raise_for_status() | |
questions_data = response.json() | |
if not questions_data: | |
print("Fetched questions list is empty.") | |
return "Fetched questions list is empty or invalid format.", None | |
print(f"Fetched {len(questions_data)} questions.") | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching questions: {e}") | |
return f"Error fetching questions: {e}", None | |
except requests.exceptions.JSONDecodeError as e: | |
print(f"Error decoding JSON response from questions endpoint: {e}") | |
print(f"Response text: {response.text[:500]}") | |
return f"Error decoding server response for questions: {e}", None | |
except Exception as e: | |
print(f"An unexpected error occurred fetching questions: {e}") | |
return f"An unexpected error occurred fetching questions: {e}", None | |
# 3. Run your Agent | |
results_log = [] | |
answers_payload = [] | |
print(f"Running agent on {len(questions_data)} questions...") | |
for item in questions_data: | |
task_id = item.get("task_id") | |
question_text = item.get("question") | |
file_name = item.get("file_name", "") | |
if not task_id or question_text is None: | |
print(f"Skipping item with missing task_id or question: {item}") | |
continue | |
try: | |
# Prepare enhanced question with file information if present | |
enhanced_question = question_text | |
if len(file_name) > 0: | |
file_type = Path(file_name).suffix.lower().split("?")[0] | |
file_url = f"{api_url}/files/{task_id}" | |
enhanced_question += f"\nThis question relates to the file at {file_url} (filename: {file_name} and file type: {file_type}). Please analyze its contents using the appropriate tool." | |
else: | |
file_type = None | |
# Call the agent | |
submitted_answer = agent(enhanced_question, task_id, file_name, file_type) | |
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) | |
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) | |
except Exception as e: | |
print(f"Error running agent on task {task_id}: {e}") | |
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) | |
if not answers_payload: | |
print("Agent did not produce any answers to submit.") | |
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) | |
# 4. Prepare Submission | |
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} | |
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." | |
print(status_update) | |
# 5. Submit | |
print(f"Submitting {len(answers_payload)} answers to: {submit_url}") | |
try: | |
response = requests.post(submit_url, json=submission_data, timeout=60) | |
response.raise_for_status() | |
result_data = response.json() | |
final_status = ( | |
f"Submission Successful!\n" | |
f"User: {result_data.get('username')}\n" | |
f"Overall Score: {result_data.get('score', 'N/A')}% " | |
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" | |
f"Message: {result_data.get('message', 'No message received.')}" | |
) | |
print("Submission successful.") | |
results_df = pd.DataFrame(results_log) | |
return final_status, results_df | |
except requests.exceptions.HTTPError as e: | |
error_detail = f"Server responded with status {e.response.status_code}." | |
try: | |
error_json = e.response.json() | |
error_detail += f" Detail: {error_json.get('detail', e.response.text)}" | |
except requests.exceptions.JSONDecodeError: | |
error_detail += f" Response: {e.response.text[:500]}" | |
status_message = f"Submission Failed: {error_detail}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
except requests.exceptions.Timeout: | |
status_message = "Submission Failed: The request timed out." | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
except requests.exceptions.RequestException as e: | |
status_message = f"Submission Failed: Network error - {e}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
except Exception as e: | |
status_message = f"An unexpected error occurred during submission: {e}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
# --- Build Gradio Interface using Blocks --- | |
with gr.Blocks() as demo: | |
gr.Markdown("# Basic Agent Evaluation Runner") | |
gr.Markdown( | |
""" | |
**Instructions:** | |
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ... | |
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. | |
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. | |
--- | |
**Disclaimers:** | |
Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions). | |
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async. | |
""" | |
) | |
gr.LoginButton() | |
run_button = gr.Button("Run Evaluation & Submit All Answers") | |
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) | |
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) | |
run_button.click( | |
fn=run_and_submit_all, | |
outputs=[status_output, results_table] | |
) | |
if __name__ == "__main__": | |
print("\n" + "-"*30 + " App Starting " + "-"*30) | |
# Check for SPACE_HOST and SPACE_ID at startup for information | |
space_host_startup = os.getenv("SPACE_HOST") | |
space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup | |
if space_host_startup: | |
print(f"✅ SPACE_HOST found: {space_host_startup}") | |
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") | |
else: | |
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") | |
if space_id_startup: # Print repo URLs if SPACE_ID is found | |
print(f"✅ SPACE_ID found: {space_id_startup}") | |
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") | |
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") | |
else: | |
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") | |
print("-"*(60 + len(" App Starting ")) + "\n") | |
print("Launching Gradio Interface for Basic Agent Evaluation...") | |
demo.launch(debug=True, share=False) |