import os
import gradio as gr
import requests
import pandas as pd
import time
from pathlib import Path
from typing import Dict, Any, List, Optional, TypedDict, Annotated
import operator

# LangChain and LangGraph imports
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver

# Existing utility imports
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound
from bs4 import BeautifulSoup
import pdfplumber
import docx
import speech_recognition as sr
import base64
import tempfile
import re
from io import BytesIO, StringIO
from dotenv import load_dotenv

load_dotenv()

# ------------------------------
# Configuration
# ------------------------------
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

api_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
azure_api_version = os.getenv("AZURE_OPENAI_API_VERSION")
azure_deployment_name = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
azure_model_name = os.getenv("AZURE_OPENAI_MODEL_NAME")

# Initialize Azure OpenAI LLM
llm = AzureChatOpenAI(
    deployment_name=azure_deployment_name,
    model_name=azure_model_name,
    temperature=0.0,
    top_p=0.1,
    azure_endpoint=azure_endpoint,
    api_key=api_key,
    api_version=azure_api_version,
)

# ------------------------------
# State Definition
# ------------------------------
class AgentState(TypedDict):
    messages: Annotated[List[Any], operator.add]
    question: str
    task_id: str
    file_name: str
    file_type: Optional[str]
    file_url: Optional[str]
    final_answer: Optional[str]
    agent_used: Optional[str]
    reasoning: Optional[str]

# ------------------------------
# Tool Functions
# ------------------------------
def transcribe_audio(content: bytes) -> str:
    """Transcribe audio from bytes to text."""
    try:
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as mp3_tmp:
            mp3_tmp.write(content)
            mp3_path = mp3_tmp.name
        wav_path = mp3_path.replace(".mp3", ".wav")
        try:
            from pydub import AudioSegment
            audio = AudioSegment.from_mp3(mp3_path)
            audio.export(wav_path, format="wav")
            audio_file = wav_path
        except ImportError:
            audio_file = mp3_path
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_file) as source:
            audio = recognizer.record(source)
        transcript = recognizer.recognize_google(audio)
        for path in [mp3_path, wav_path]:
            if os.path.exists(path):
                os.remove(path)
        return f"Audio Transcript: {transcript}"
    except Exception as e:
        print(f"Audio transcription error: {e}")
        return "Could not transcribe audio"

# Decorated with @tool so the agents below can call it via .invoke({...}).
@tool
def parse_file_tool(file_url: str, file_name: str) -> str:
    """Parse various file types and extract content."""
    try:
        if len(file_name) > 0:
            file_type = Path(file_name).suffix.lower()
            file_type = file_type.split("?")[0]
        else:
            file_type = None
        if file_type:
            resp = requests.get(file_url, timeout=30)
            resp.raise_for_status()
            content = resp.content
            # Excel files
            if file_type in [".xlsx", ".xls"]:
                try:
                    df = pd.read_excel(BytesIO(content))
                    return f"Excel Content:\n{df.head(10).to_string(index=False)}"
                except Exception as e:
                    return f"Excel parsing error: {str(e)}"
            # CSV files
            elif file_type == ".csv":
                try:
                    df = pd.read_csv(BytesIO(content))
                    return f"CSV Content:\n{df.head(10).to_string(index=False)}"
                except Exception as e:
                    return f"CSV parsing error: {str(e)}"
            # Text files
            elif file_type == ".txt":
                text = content.decode(errors="ignore")
                return f"Text Content:\n{text[:5000]}"
            # PDF files
            elif file_type == ".pdf":
                try:
                    with pdfplumber.open(BytesIO(content)) as pdf:
                        text = "\n".join(page.extract_text() or "" for page in pdf.pages[:5])
                    return f"PDF Content:\n{text[:5000]}"
                except Exception as e:
                    return f"PDF parsing error: {str(e)}"
            # DOCX files
            elif file_type == ".docx":
                try:
                    d = docx.Document(BytesIO(content))
                    text = "\n".join(p.text for p in d.paragraphs[:100])
                    return f"DOCX Content:\n{text[:5000]}"
                except Exception as e:
                    return f"DOCX parsing error: {str(e)}"
            # MP3 files
            elif file_type == ".mp3":
                return transcribe_audio(content)
            # Python files
            elif file_type == ".py":
                text = content.decode(errors="ignore")
                return f"Python Code:\n{text[:5000]}"
            else:
                return f"Unsupported file type: {file_type}"
        else:
            return "No file type provided or file URL is invalid."
    except Exception as e:
        print(f"[parse_file_tool] ERROR: {e}")
        return f"File parsing failed: {str(e)}"

@tool
def youtube_transcript_tool(url: str) -> str:
    """Extract the transcript from a YouTube video."""
    try:
        # Handle both youtube.com/watch?v=<id> and youtu.be/<id> URLs.
        if "youtu.be/" in url:
            video_id = url.split("youtu.be/")[-1].split("?")[0]
        else:
            video_id = url.split("v=")[-1].split("&")[0]
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join([e["text"] for e in transcript])
    except NoTranscriptFound:
        return "No transcript available for this video"
    except Exception as e:
        return f"Error retrieving transcript: {str(e)}"

def scrape_text_from_url(url: str, max_chars: int = 4000) -> str:
    """Fetch and clean the main text from a webpage."""
    try:
        resp = requests.get(url, timeout=10)
        soup = BeautifulSoup(resp.text, "html.parser")
        text = " ".join(soup.stripped_strings)
        return text[:max_chars]
    except Exception as e:
        return f"Could not scrape {url}: {e}"

@tool
def web_search_tool(question: str) -> str:
    """Perform a web search using DuckDuckGo and scrape the results."""
    try:
        from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec

        ddg_spec = DuckDuckGoSearchToolSpec()
        results = ddg_spec.duckduckgo_full_search(question) or []
        if not isinstance(results, list):
            return "No search results found."
        max_results = 10
        min_chars = 400
        max_chars = 4000
        # Return the first result page that can be scraped and is long enough.
        for entry in results[:max_results]:
            href = entry.get("href", "")
            if not href:
                continue
            text = scrape_text_from_url(href, max_chars=max_chars)
            if text.startswith("Could not scrape") or len(text) < min_chars:
                continue
            return (
                f"Here is content scraped from {href}:\n\n"
                f"{text}\n\n"
                "Based on this, please answer the original question."
            )
        # Fallback to search result metadata
        if not results:
            return "No search results found."
        summary_lines = []
        for idx, entry in enumerate(results[:max_results], start=1):
            title = entry.get("title") or "Untitled result"
            snippet = (entry.get("body") or "").replace("\n", " ")[:160]
            href = entry.get("href")
            summary_lines.append(f"{idx}. {title} – {snippet} ({href})")
        return (
            "I could not successfully scrape any of the top pages. "
            "Here are the top DuckDuckGo results:\n\n"
            + "\n".join(summary_lines)
            + "\n\nPlease answer the original question using this list."
        )
    except Exception as e:
        return f"Web search failed: {str(e)}"

@tool
def image_processing_tool(file_url: str, question: str) -> str:
    """Process an image and answer questions about it using Azure OpenAI vision."""
    try:
        print(f"Processing image from URL: {file_url}")
        resp = requests.get(file_url, timeout=30)
        resp.raise_for_status()
        raw = resp.content
        mime = resp.headers.get("Content-Type", "image/png")
        img_b64 = base64.b64encode(raw).decode()
        data_uri = f"data:{mime};base64,{img_b64}"
        print("Image downloaded and encoded successfully.")

        from openai import AzureOpenAI

        vision_client = AzureOpenAI(
            api_key=api_key,
            api_version=azure_api_version,
            azure_endpoint=azure_endpoint,
        )
        messages = [
            {"role": "system", "content": "You are a vision expert. Answer based only on the image content."},
            {"role": "user", "content": [
                {"type": "text", "text": question},
                {"type": "image_url", "image_url": {"url": data_uri}},
            ]},
        ]
        response = vision_client.chat.completions.create(
            model=azure_deployment_name,  # for Azure OpenAI, `model` takes the deployment name
            messages=messages,
            temperature=0.0,
            max_tokens=2000,
        )
        print(f"Vision API response received: {response.choices[0].message.content.strip()}")
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"Vision API error: {e}"
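
# The functions decorated with @tool above are LangChain tools, so the agents below
# call them with a dict of arguments via .invoke(). A minimal sketch of that calling
# pattern (illustrative only; never called by the app, and the URL is a placeholder):
def _demo_tool_invocation() -> None:
    """Show the .invoke() calling convention used by the agents below."""
    result = youtube_transcript_tool.invoke({"url": "https://www.youtube.com/watch?v=VIDEO_ID"})
    print(result[:200])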

# ------------------------------
# Agent Functions
# ------------------------------
# Prompt helpers
SCORER_TEMPLATE = """You are a general AI assistant.
Answer the question and finish with:
FINAL ANSWER: <your answer>
Formatting rules:
• numbers: digits only, no commas/units unless requested
• strings: no articles/abbreviations, digits in plain text
• for lists: same rules per element, comma-separated, no spaces
"""

def make_prompt(extra_instruction: str = "") -> ChatPromptTemplate:
    return ChatPromptTemplate.from_messages([
        ("system", SCORER_TEMPLATE + "\n" + extra_instruction),
        ("human", "{human_input}"),
    ])

def extract_final_answer(text: str) -> str:
    """Extract the text after 'FINAL ANSWER:' (robust to quotes, stray whitespace, capitalisation)."""
    m = re.search(r"FINAL ANSWER:\s*(.+)", text, re.I | re.S)
    ans = m.group(1).strip() if m else text.strip()
    # Strip surrounding quotes/backticks
    return re.sub(r'^[\'"`\s]+|[\'"`\s]+$', "", ans)
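
# A tiny, self-contained check of the extraction behaviour (illustrative only; the
# sample reply below is made up and this function is never called by the app).
def _demo_extract_final_answer() -> None:
    sample = "The word reversed is olleh.\nFINAL ANSWER: `olleh`"
    assert extract_final_answer(sample) == "olleh"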

def router_agent(state: AgentState) -> AgentState:
    """Router agent that determines which specialized agent to use."""
    question = state["question"]
    file_name = state.get("file_name", "")

    # Check for files
    if file_name:
        file_type = Path(file_name).suffix.lower().split("?")[0]
        # Image files
        if file_type in [".jpg", ".jpeg", ".png", ".bmp", ".gif", ".webp"]:
            return {
                **state,
                "agent_used": "image_agent",
                "reasoning": f"Image file detected: {file_name}",
            }
        # Other files
        else:
            return {
                **state,
                "agent_used": "file_agent",
                "reasoning": f"File detected: {file_name} (type: {file_type})",
            }

    # Check for YouTube links
    if "youtube.com" in question.lower() or "youtu.be" in question.lower():
        return {
            **state,
            "agent_used": "youtube_agent",
            "reasoning": "YouTube link detected in question",
        }

    # Check whether the question is self-contained (no external data needed)
    self_contained_indicators = [
        "reverse", "backward", "opposite", "calculate", "math", "add", "subtract",
        "multiply", "divide", "cipher", "decode", "encode", "spell", "count",
    ]
    if any(indicator in question.lower() for indicator in self_contained_indicators):
        # Additional check: does it seem to need external information?
        external_indicators = ["who is", "when did", "where is", "what year", "latest", "current"]
        if not any(indicator in question.lower() for indicator in external_indicators):
            return {
                **state,
                "agent_used": "reasoning_agent",
                "reasoning": "Question appears self-contained, no external data needed",
            }

    # Default to web search
    return {
        **state,
        "agent_used": "web_search_agent",
        "reasoning": "Question requires external knowledge - using web search",
    }

def reasoning_agent(state: AgentState) -> AgentState:
    """Agent for self-contained reasoning tasks."""
    question = state["question"]
    extra_sys = """You are a reasoning expert. Answer questions that can be
solved with logic, mathematics, or text manipulation without external data."""
    prompt = make_prompt(extra_sys)
    human_block = question
    content = (prompt | llm).invoke({"human_input": human_block}).content
    final_answer = extract_final_answer(content)
    return {
        **state,
        "final_answer": final_answer,
        "messages": state["messages"] + [AIMessage(content=content)],
    }

def file_agent(state: AgentState) -> AgentState:
    """Agent for processing various file types."""
    question = state["question"]
    file_url = state.get("file_url")
    file_name = state.get("file_name", "")
    if not file_url:
        return {
            **state,
            "final_answer": "No file URL provided",
            "messages": state["messages"] + [AIMessage(content="No file URL provided")],
        }
    # Parse the file
    file_content = parse_file_tool.invoke({"file_url": file_url, "file_name": file_name})
    extra_sys = """You are a file analysis expert. Based on the file content provided,
answer the user's question accurately and concisely."""
    prompt = make_prompt(extra_sys)
    human_block = f"Question: {question}\n\nFile Content:\n{file_content}"
    content = (prompt | llm).invoke({"human_input": human_block}).content
    final_answer = extract_final_answer(content)
    return {
        **state,
        "final_answer": final_answer,
        "messages": state["messages"] + [AIMessage(content=content)],
    }

def youtube_agent(state: AgentState) -> AgentState:
    """Agent for processing YouTube video transcripts."""
    question = state["question"]
    # Extract the YouTube URL from the question
    youtube_pattern = r'(https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/)[\w-]+)'
    urls = re.findall(youtube_pattern, question)
    if not urls:
        return {
            **state,
            "final_answer": "No YouTube URL found in question",
            "messages": state["messages"] + [AIMessage(content="No YouTube URL found")],
        }
    # Get the transcript
    transcript = youtube_transcript_tool.invoke({"url": urls[0]})
    extra_sys = """You are a YouTube content expert. Based on the video transcript provided,
answer the user's question accurately and concisely."""
    prompt = make_prompt(extra_sys)
    human_block = f"Question: {question}\n\nTranscript: {transcript}"
    content = (prompt | llm).invoke({"human_input": human_block}).content
    final_answer = extract_final_answer(content)
    return {
        **state,
        "final_answer": final_answer,
        "messages": state["messages"] + [AIMessage(content=content)],
    }

def web_search_agent(state: AgentState) -> AgentState:
    """Agent for web search and information retrieval."""
    question = state["question"]
    # Perform the web search
    search_results = web_search_tool.invoke({"question": question})
    extra_sys = """You are a web search expert. Based on the search results provided,
answer the user's question accurately and concisely."""
    prompt = make_prompt(extra_sys)
    human_block = f"Question: {question}\n\nSearch Results: {search_results}"
    content = (prompt | llm).invoke({"human_input": human_block}).content
    final_answer = extract_final_answer(content)
    return {
        **state,
        "final_answer": final_answer,
        "messages": state["messages"] + [AIMessage(content=content)],
    }

def image_agent(state: AgentState) -> AgentState:
    """Agent for processing images."""
    question = state["question"]
    file_url = state.get("file_url")
    if not file_url:
        return {
            **state,
            "final_answer": "No image URL provided",
            "messages": state["messages"] + [AIMessage(content="No image URL provided")],
        }
    # Process the image
    image_analysis = image_processing_tool.invoke({"file_url": file_url, "question": question})
    extra_sys = """You are an image analysis expert. Based on the image analysis provided,
answer the user's question accurately and concisely."""
    prompt = make_prompt(extra_sys)
    human_block = f"Question: {question}\n\nImage Analysis: {image_analysis}"
    content = (prompt | llm).invoke({"human_input": human_block}).content
    final_answer = extract_final_answer(content)
    return {
        **state,
        "final_answer": final_answer,
        "messages": state["messages"] + [AIMessage(content=content)],
    }

# ------------------------------
# Conditional Logic
# ------------------------------
def route_to_agent(state: AgentState) -> str:
    """Route to the appropriate agent based on the router's decision."""
    agent_used = state.get("agent_used")
    if agent_used == "reasoning_agent":
        return "reasoning_agent"
    elif agent_used == "file_agent":
        return "file_agent"
    elif agent_used == "youtube_agent":
        return "youtube_agent"
    elif agent_used == "image_agent":
        return "image_agent"
    else:
        return "web_search_agent"

def should_end(state: AgentState) -> str:
    """Check if we have a final answer and should end."""
    if state.get("final_answer"):
        return END
    else:
        return "router"

# ------------------------------
# Graph Construction
# ------------------------------
def create_agent_graph():
    """Create and return the agent graph."""
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("router", router_agent)
    workflow.add_node("reasoning_agent", reasoning_agent)
    workflow.add_node("file_agent", file_agent)
    workflow.add_node("youtube_agent", youtube_agent)
    workflow.add_node("web_search_agent", web_search_agent)
    workflow.add_node("image_agent", image_agent)

    # Add edges
    workflow.add_edge(START, "router")
    workflow.add_conditional_edges("router", route_to_agent)

    # All agents go straight to END
    workflow.add_edge("reasoning_agent", END)
    workflow.add_edge("file_agent", END)
    workflow.add_edge("youtube_agent", END)
    workflow.add_edge("web_search_agent", END)
    workflow.add_edge("image_agent", END)

    # Compile the graph with an in-memory checkpointer
    memory = MemorySaver()
    graph = workflow.compile(checkpointer=memory)
    return graph
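
# A minimal sketch of invoking the compiled graph directly, outside the Gradio app
# (illustrative only; never called here, and it assumes the Azure credentials above
# are configured). MemorySaver requires a thread_id in the config, mirroring the
# usage in LangGraphAgent.__call__ below.
def _demo_graph_invocation() -> None:
    graph = create_agent_graph()
    initial_state = {
        "messages": [HumanMessage(content="Reverse the word 'hello'.")],
        "question": "Reverse the word 'hello'.",
        "task_id": "demo-task",
        "file_name": "",
        "file_type": None,
        "file_url": None,
        "final_answer": None,
        "agent_used": None,
        "reasoning": None,
    }
    config = {"configurable": {"thread_id": "demo-task"}}
    result = graph.invoke(initial_state, config=config)
    print(result["agent_used"], result["final_answer"])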

# ------------------------------
# Main Agent Class
# ------------------------------
class LangGraphAgent:
    def __init__(self):
        """Initialize the LangGraph agent."""
        self.graph = create_agent_graph()
        self.api_url = DEFAULT_API_URL

    def __call__(self, question: str, task_id: str, file_name: str, file_type: Optional[str] = None) -> str:
        """
        Process a question and return an answer.

        Args:
            question (str): The question to answer
            task_id (str): Task ID used for file retrieval
            file_name (str): Name of the file associated with the question, if any
            file_type (str): Type of the file (e.g., .pdf, .docx)

        Returns:
            str: The answer to the question
        """
        try:
            file_name = file_name or ""  # guard against None
            # Prepare the initial state
            initial_state = {
                "messages": [HumanMessage(content=question)],
                "question": question,
                "task_id": task_id,
                "file_name": file_name,
                "file_type": Path(file_name).suffix.lower().split("?")[0] if file_name else None,
                "file_url": f"{self.api_url}/files/{task_id}" if file_name else None,
                "final_answer": None,
                "agent_used": None,
                "reasoning": None,
            }
            print(f"Processing question: {question}")
            if file_name:
                print(f"File detected: {file_name} (type: {file_type})")

            # Run the graph
            config = {"configurable": {"thread_id": task_id}}
            result = self.graph.invoke(initial_state, config=config)

            final_answer = result.get("final_answer", "No answer generated")
            agent_used = result.get("agent_used", "unknown")
            reasoning = result.get("reasoning", "")
            print(f"Agent used: {agent_used}")
            print(f"Reasoning: {reasoning}")
            print(f"Final answer: {final_answer}")
            print("=" * 80)
            return final_answer
        except Exception as e:
            print(f"Error in LangGraphAgent.__call__: {e}")
            return f"Error processing question: {str(e)}"
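
# A quick local smoke test for the agent class (illustrative only; not called by the
# app, and it assumes the Azure OpenAI environment variables are set).
def _demo_agent_call() -> None:
    agent = LangGraphAgent()
    answer = agent("What is 2 + 2?", task_id="local-demo", file_name="")
    print(answer)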

# ------------------------------
# Gradio Interface Functions
# ------------------------------
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the LangGraphAgent on them, submits all answers,
    and displays the results.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent
    try:
        agent = LangGraphAgent()
        print("LangGraphAgent instantiated successfully.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the agent on each question
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        file_name = item.get("file_name") or ""  # guard against missing/None file names
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            file_type = Path(file_name).suffix.lower().split("?")[0] if file_name else None
            # Call the agent
            submitted_answer = agent(question_text, task_id, file_name, file_type)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare the submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df

# ------------------------------
# Gradio Interface
# ------------------------------
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once you click the "Run Evaluation & Submit All Answers" button, it can take quite some time (this is the time the agent needs to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to avoid the long-running submit button, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously; see the sketch after this interface block.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
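
# The disclaimer above suggests caching answers and submitting them in a separate
# step. A minimal sketch of that idea (illustrative only; not wired into the UI,
# and the module-level cache below is an assumption of this sketch):
_cached_answers: List[Dict[str, Any]] = []

def _run_only(questions: List[Dict[str, Any]], agent: LangGraphAgent) -> None:
    """Run the agent over the fetched question dicts and cache the payload without submitting."""
    _cached_answers.clear()
    for item in questions:
        _cached_answers.append({
            "task_id": item["task_id"],
            "submitted_answer": agent(item["question"], item["task_id"], item.get("file_name") or ""),
        })

def _submit_cached(username: str, agent_code: str) -> Dict[str, Any]:
    """Submit whatever is currently cached, in a single request."""
    payload = {"username": username, "agent_code": agent_code, "answers": _cached_answers}
    resp = requests.post(f"{DEFAULT_API_URL}/submit", json=payload, timeout=60)
    resp.raise_for_status()
    return resp.json()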

if __name__ == "__main__":
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)