import os
import gradio as gr
import requests
import pandas as pd
from dotenv import load_dotenv
from typing import Any, Dict, List, Optional, TypedDict, Union

from youtube_transcript_api import YouTubeTranscriptApi
from langgraph.graph import END, StateGraph
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Load environment variables
load_dotenv()
def init_state(question: str):
    # (Superseded by the fuller state dict built in BasicAgent.__call__ below.)
    return {
        "question": question,
        "history": [HumanMessage(content=question)],
        "context": {}  # shared scratch space that tools and nodes can read/write
    }
# ====== Tool Definitions ======
def duckduckgo_search(query: str) -> str:
    """Search the web using DuckDuckGo. Returns the top 3 results."""
    from duckduckgo_search import DDGS
    with DDGS() as ddgs:
        return "\n\n".join(
            f"Title: {res['title']}\nURL: {res['href']}\nSnippet: {res['body']}"
            for res in ddgs.text(query, max_results=3)
        )
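# Quick usage sketch (hypothetical query; needs the duckduckgo_search package
# and network access):
#
#   print(duckduckgo_search("latest stable Python release"))
#   # -> up to three "Title/URL/Snippet" blocks separated by blank lines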
def wikipedia_search(query: str) -> str:
    """Get Wikipedia summaries. Returns the first 3 sentences."""
    import wikipedia
    try:
        return wikipedia.summary(query, sentences=3)
    except wikipedia.DisambiguationError as e:
        return f"Disambiguation options: {', '.join(e.options[:3])}"
    except wikipedia.PageError:
        return "Page not found"
def arxiv_search(query: str) -> str:
    """Search academic papers on arXiv. Returns the top 3 results."""
    import arxiv
    search = arxiv.Search(
        query=query,
        max_results=3,
        sort_by=arxiv.SortCriterion.Relevance
    )
    # arxiv >= 2.0 fetches results through a Client
    results = arxiv.Client().results(search)
    return "\n\n".join(
        f"Title: {r.title}\nAuthors: {', '.join(a.name for a in r.authors)}\n"
        f"Published: {r.published.strftime('%Y-%m-%d')}\nSummary: {r.summary[:250]}..."
        for r in results
    )
def document_qa(input_str: str) -> str:
    """Answer questions from documents. Input format: 'document_text||question'"""
    from transformers import pipeline
    if '||' not in input_str:
        return "Invalid format. Use: 'document_text||question'"
    context, question = input_str.split('||', 1)
    qa_model = pipeline('question-answering', model='deepset/roberta-base-squad2')
    return qa_model(question=question, context=context)['answer']
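# The expected single-string input format, illustrated (hypothetical text;
# the extractive QA model should return a short span such as "1889"):
#
#   document_qa("The Eiffel Tower was completed in 1889.||When was it completed?")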
def python_execution(code: str) -> str:
    """Execute Python code and return its output."""
    import textwrap
    try:
        # Run the snippet in an isolated namespace, wrapped in a function so a
        # top-level `return` in the snippet becomes the result.
        env = {}
        indented = textwrap.indent(code, "    ")
        exec(f"def __exec_fn__():\n{indented}\nresult = __exec_fn__()", env)
        return str(env.get('result', 'No output'))
    except Exception as e:
        return f"Error: {str(e)}"
from langchain_core.tools import BaseTool
class VideoTranscriptionTool(BaseTool):
    name: str = "transcript_video"
    description: str = "Fetch text transcript from YouTube videos using URL or ID. Optionally include timestamps."

    def _run(self, url: str, include_timestamps: Optional[bool] = False) -> str:
        # Extract the video ID from a full URL, a short URL, or a bare ID
        video_id = None
        if "youtube.com/watch?v=" in url:
            video_id = url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            video_id = url.split("youtu.be/")[1].split("?")[0]
        elif len(url.strip()) == 11 and not ("http://" in url or "https://" in url):
            video_id = url.strip()

        if not video_id:
            return f"Invalid or unsupported YouTube URL/ID: {url}"

        try:
            transcription = YouTubeTranscriptApi.get_transcript(video_id)
            if include_timestamps:
                formatted = []
                for part in transcription:
                    timestamp = f"{int(part['start']//60)}:{int(part['start']%60):02d}"
                    formatted.append(f"[{timestamp}] {part['text']}")
                return "\n".join(formatted)
            else:
                return " ".join([part['text'] for part in transcription])
        except Exception as e:
            return f"Error fetching transcript: {str(e)}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("Async not supported for this tool.")
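# A minimal sketch of exercising the tool directly (hypothetical video ID;
# needs youtube-transcript-api and network access). Not called by the app.
def _demo_transcription_tool() -> None:
    tool = VideoTranscriptionTool()
    print(tool.run("dQw4w9WgXcQ"))  # a bare 11-character video ID also works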
import time
import json
from langchain_google_genai import ChatGoogleGenerativeAI
from google.api_core.exceptions import ResourceExhausted

# The tool implementations above (duckduckgo_search, wikipedia_search,
# arxiv_search, document_qa, python_execution, VideoTranscriptionTool) are
# wrapped as LangChain Tool objects in BasicAgent.__init__ below.
# --- Agent State Definition ---
class AgentState(TypedDict):
    question: str
    history: List[Union[HumanMessage, AIMessage, Dict[str, Any]]]  # allows tool-call request dicts alongside messages
    context: Dict[str, Any]
    reasoning: str
    iterations: int
    final_answer: Union[str, float, int, None]
    current_task: str      # keeps each reasoning step focused
    current_thoughts: str  # carries the latest reasoning forward
    tools: List[Any]       # LangChain Tool objects, injected by BasicAgent
# --- Utility Functions ---
def parse_agent_response(response_content: str) -> tuple[str, str, str]:
    """
    Parses the LLM's JSON output into (reasoning, action, action_input).
    """
    try:
        response_json = json.loads(response_content)
        reasoning = response_json.get("Reasoning", "").strip()
        action = response_json.get("Action", "").strip()
        action_input = response_json.get("Action Input", "")
        # Action Input may legitimately be a non-string (e.g., a number); normalize.
        action_input = action_input.strip() if isinstance(action_input, str) else str(action_input)
        return reasoning, action, action_input
    except json.JSONDecodeError:
        # Fallback for when the LLM doesn't return clean JSON
        print(f"WARNING: LLM response not perfectly JSON: {response_content[:200]}...")
        # Attempt heuristic parsing as a last resort
        reasoning_match = response_content.split("Reasoning:", 1)
        reasoning = reasoning_match[1].split("Action:", 1)[0].strip() if len(reasoning_match) > 1 else ""
        action_part_match = response_content.split("Action:", 1)
        action_part = action_part_match[1].strip() if len(action_part_match) > 1 else ""
        action_input_match = action_part.split("Action Input:", 1)
        action = action_input_match[0].strip()
        action_input = action_input_match[1].strip() if len(action_input_match) > 1 else ""
        return reasoning, action, action_input
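# For reference, the parser on a well-formed response (hypothetical content):
#
#   parse_agent_response('{"Reasoning": "Need a web search.", "Action": "duckduckgo_search", "Action Input": "current UK prime minister"}')
#   # -> ("Need a web search.", "duckduckgo_search", "current UK prime minister")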
# --- Graph Nodes ---
def should_continue(state: AgentState) -> str:
    """
    Determines if the agent should continue reasoning, use a tool, or end.
    Only the most recent history entry matters: reasoning_node always appends
    either a FINAL ANSWER message or an action_request dict as its last step.
    """
    history = state.get("history", [])
    last = history[-1] if history else None

    # Final answer in the last AIMessage -> stop
    if isinstance(last, AIMessage) and "FINAL ANSWER:" in last.content:
        print("DEBUG: should_continue -> END (Final Answer detected)")
        return "end"

    # Pending action request -> execute the tool
    if isinstance(last, dict) and last.get("type") == "action_request":
        print("DEBUG: should_continue -> ACTION (Action request pending)")
        return "action"

    # Otherwise (fresh question, or tool output to digest) -> reason
    print("DEBUG: should_continue -> REASON (Default to reasoning)")
    return "reason"
def reasoning_node(state: AgentState) -> AgentState:
    """
    Node where the agent analyzes the question, determines next steps,
    and selects tools.
    """
    print(f"DEBUG: Entering reasoning_node. Iteration: {state['iterations']}")
    print(f"DEBUG: Current history length: {len(state.get('history', []))}")

    # Load API key
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
    if not GOOGLE_API_KEY:
        raise ValueError("GOOGLE_API_KEY not set in environment variables.")

    # Ensure history is well-formed for the LLM prompt
    if "history" not in state or not isinstance(state["history"], list):
        state["history"] = []

    # Initialize/update state fields
    state.setdefault("context", {})
    state.setdefault("reasoning", "")
    state.setdefault("iterations", 0)
    state.setdefault("current_task", "Understand the question and plan the next step.")
    state.setdefault("current_thoughts", "")

    # Create the Gemini model wrapper
    llm = ChatGoogleGenerativeAI(
        model="gemini-1.5-flash",  # fast model, well suited to agentic loops
        temperature=0.1,           # low temperature for more deterministic reasoning
        google_api_key=GOOGLE_API_KEY
    )

    # Dynamically generate tool descriptions for the prompt
    tool_descriptions = "\n".join([
        f"- **{t.name}**: {t.description}" for t in state.get("tools", [])
    ])

    # Craft an explicit system prompt. Literal braces in the JSON example are
    # doubled ({{ }}) so the prompt template doesn't treat them as variables.
    system_prompt = (
        "You are an expert problem solver, designed to provide concise and accurate answers. "
        "Your process involves analyzing the question, intelligently selecting and using tools, "
        "and synthesizing information.\n\n"
        "**Available Tools:**\n"
        f"{tool_descriptions}\n\n"
        "**Tool Usage Guidelines:**\n"
        "- Use **duckduckgo_search** for current events, general facts, or quick lookups.\n"
        "- Use **wikipedia_search** for encyclopedic information, historical context, or detailed topics.\n"
        "- Use **arxiv_search** for scientific papers, research, or cutting-edge technical information.\n"
        "- Use **document_qa** when the question explicitly refers to a specific document file (e.g., 'Analyze this PDF').\n"
        "- Use **python_execution** for complex calculations, data manipulation, or logical operations that cannot be done with simple reasoning. Always provide the full Python code.\n"
        "- Use **transcript_video** for any question involving video or audio content.\n\n"
        "**Current Context:**\n{context}\n\n"
        "**Previous Reasoning Steps:**\n{reasoning}\n\n"
        "**Current Task:** {current_task}\n"
        "**Current Thoughts:** {current_thoughts}\n\n"
        "**Your Response MUST be a valid JSON object with the following keys:**\n"
        "```json\n"
        "{{\n"
        "  \"Reasoning\": \"Your detailed analysis of the question and why you chose a specific action.\",\n"
        "  \"Action\": \"[Tool name OR 'Final Answer']\",\n"
        "  \"Action Input\": \"[Input for the selected tool OR the final response]\"\n"
        "}}\n"
        "```\n"
        "Ensure 'Action Input' is appropriate for the chosen 'Action'. If 'Action' is 'Final Answer', provide the complete, concise answer."
    )

    # Only real chat messages go to the model; action_request dicts are
    # internal bookkeeping and would break prompt construction.
    message_history = [m for m in state["history"] if isinstance(m, (HumanMessage, AIMessage, SystemMessage))]

    # The ("system", ...) tuple makes the system prompt a template, so
    # {context}, {reasoning}, {current_task}, and {current_thoughts} are filled in.
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        *message_history  # full conversational context
    ])
    chain = prompt | llm

    # === Retry logic for rate limits and malformed output ===
    def call_with_retry(inputs, retries=3, delay=60):
        for attempt in range(retries):
            try:
                response = chain.invoke(inputs)
                # Parse immediately to catch bad JSON before returning
                parse_agent_response(response.content)
                return response
            except ResourceExhausted:
                print(f"[Retry {attempt+1}/{retries}] Gemini rate limit hit. Waiting {delay}s...")
                time.sleep(delay)
            except json.JSONDecodeError:
                print(f"[Retry {attempt+1}/{retries}] LLM returned invalid JSON. Retrying...")
                print(f"Invalid JSON content: {response.content[:200]}...")
                time.sleep(5)  # shorter delay for parsing errors
            except Exception as e:
                print(f"[Retry {attempt+1}/{retries}] Unexpected error during LLM call: {e}. Retrying...")
                time.sleep(delay)
        raise RuntimeError("Failed after multiple retries due to Gemini quota limits or invalid JSON.")

    # Call the model with retry protection (the question itself is already in history)
    response = call_with_retry({
        "context": state["context"],
        "reasoning": state["reasoning"],
        "current_task": state["current_task"],
        "current_thoughts": state["current_thoughts"]
    })

    # Parse output using the robust JSON parser
    content = response.content
    reasoning, action, action_input = parse_agent_response(content)
    print(f"DEBUG: LLM Response Content: {content[:200]}...")
    print(f"DEBUG: Parsed Action: {action}, Action Input: {action_input[:100]}...")

    # Update state
    state["history"].append(AIMessage(content=content))  # store the raw LLM response
    state["reasoning"] += f"\nStep {state['iterations'] + 1}: {reasoning}"
    state["iterations"] += 1
    state["current_thoughts"] = reasoning  # carry reasoning into the next iteration

    if "final answer" in action.lower():
        state["history"].append(AIMessage(content=f"FINAL ANSWER: {action_input}"))
        state["final_answer"] = action_input  # set the final answer directly in state
    else:
        # Store the action request in history for tool_node
        state["history"].append({
            "type": "action_request",
            "tool": action,
            "input": action_input
        })

    print(f"DEBUG: Exiting reasoning_node. New history length: {len(state['history'])}")
    return state
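# One reasoning step, illustrated (hypothetical LLM output): given
# '{"Reasoning": "...", "Action": "wikipedia_search", "Action Input": "Ada Lovelace"}',
# the node appends the raw AIMessage plus
# {"type": "action_request", "tool": "wikipedia_search", "input": "Ada Lovelace"},
# so should_continue routes to the tool node next.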
def tool_node(state: AgentState) -> AgentState:
    # Find the most recent pending action request
    tool_call_dict = None
    for msg in reversed(state["history"]):
        if isinstance(msg, dict) and msg.get("type") == "action_request":
            tool_call_dict = msg
            break

    if not tool_call_dict:
        print("WARNING: No action_request found in history, skipping tool execution.")
        return state

    tool_name = tool_call_dict.get("tool")
    tool_input = tool_call_dict.get("input")
    print(f"DEBUG: tool_node received action_request: tool='{tool_name}', input='{str(tool_input)[:100]}...'")

    if not tool_name or tool_input is None:  # tool_input may be an empty string for some tools, but never None
        print(f"ERROR: Invalid tool call in action_request. Tool name: '{tool_name}', Input: '{tool_input}'")
        # Feed the error back to the LLM as tool output instead of raising,
        # so the reasoning node can recover with a valid action.
        state["history"].append(AIMessage(content=f"[Tool Error] Invalid tool call: Tool name '{tool_name}' or input was empty. LLM needs to provide a valid action."))
        return state

    # Look up and invoke the tool from the state's tool list
    available_tools = state.get("tools", [])
    tool_fn = next((t for t in available_tools if t.name == tool_name), None)

    if tool_fn is None:
        # Unrecognized tool -- feed the error back to the LLM
        tool_output = f"[Tool Error] Tool '{tool_name}' not found or not available. Please choose from: {', '.join([t.name for t in available_tools])}"
        print(f"ERROR: {tool_output}")
    else:
        try:
            print(f"DEBUG: Invoking tool '{tool_name}' with input: '{str(tool_input)[:100]}...'")
            tool_output = tool_fn.run(tool_input)  # LangChain Tool objects expose .run()
            if not tool_output:  # handle empty tool output
                tool_output = f"[{tool_name} output] No specific result found for '{tool_input}'. The tool might have returned an empty response."
        except Exception as e:
            tool_output = f"[Tool Error] An error occurred while running '{tool_name}': {str(e)}"
            print(f"ERROR: {tool_output}")

    # Record the tool output in history as an AIMessage so the next
    # reasoning step can read it.
    state["history"].append(AIMessage(content=f"[{tool_name} output]\n{tool_output}"))
    print(f"DEBUG: Exiting tool_node. Tool output added to history. New history length: {len(state['history'])}")
    return state
# ====== Agent Graph ======
def create_agent_workflow(tools: List[Any]):
    workflow = StateGraph(AgentState)

    # Define nodes
    workflow.add_node("reason", reasoning_node)
    workflow.add_node("action", tool_node)

    # Set entry point
    workflow.set_entry_point("reason")

    # Define edges
    workflow.add_conditional_edges(
        "reason",
        should_continue,
        {
            "action": "action",  # go to the action node if a tool is requested
            "reason": "reason",  # loop back to reason if more thinking is needed
            "end": END           # end once a final answer is detected
        }
    )
    workflow.add_edge("action", "reason")  # always return to reasoning after a tool action

    # Compile the graph. The tools themselves travel inside the state (set in
    # BasicAgent.__call__); a cleaner design would close over them or inject
    # them at node construction, but passing them via state works here.
    app = workflow.compile()
    return app
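# A minimal sketch of driving the compiled graph without BasicAgent
# (hypothetical; assumes `my_tools` is a list of LangChain Tool objects like
# the one built in BasicAgent.__init__ below):
#
#   app = create_agent_workflow(my_tools)
#   final_state = app.invoke({
#       "question": "...",
#       "history": [HumanMessage(content="...")],
#       "context": {}, "reasoning": "", "iterations": 0,
#       "final_answer": None, "current_task": "", "current_thoughts": "",
#       "tools": my_tools,
#   })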
# ====== Agent Interface ======
class BasicAgent:
    def __init__(self):
        # Tools must expose .name and .description, so plain functions are
        # wrapped as LangChain Tool objects.
        from langchain.tools import Tool
        self.tools = [
            Tool(name="duckduckgo_search", func=duckduckgo_search, description="Performs a DuckDuckGo search for current events or general facts."),
            Tool(name="wikipedia_search", func=wikipedia_search, description="Searches Wikipedia for encyclopedic information."),
            Tool(name="arxiv_search", func=arxiv_search, description="Searches ArXiv for scientific preprints and papers."),
            Tool(name="document_qa", func=document_qa, description="Answers questions about a document. Input format: 'document_text||question'."),
            Tool(name="python_execution", func=python_execution, description="Executes Python code in a sandboxed environment for complex calculations or data manipulation."),
            VideoTranscriptionTool(),  # already a BaseTool; registered under its own name, "transcript_video"
        ]
        self.workflow = create_agent_workflow(self.tools)

    def __call__(self, question: str) -> str:
        print(f"\n--- Agent received question: {question[:50]}{'...' if len(question) > 50 else ''} ---")

        # Initialize state with the proper structure and pass in the tools
        state = {
            "question": question,
            "context": {},
            "reasoning": "",
            "iterations": 0,
            "history": [HumanMessage(content=question)],
            "final_answer": None,
            "current_task": "Understand the question and plan the next step.",
            "current_thoughts": "",
            "tools": self.tools  # make the tools available to the graph nodes
        }

        # Invoke the workflow
        final_state = self.workflow.invoke(state)

        # Extract the final answer from state
        if final_state.get("final_answer"):
            answer = final_state["final_answer"]
            print(f"--- Agent returning FINAL ANSWER: {answer} ---")
            return answer

        # Fallback: scan history if final_answer wasn't set in state
        for msg in reversed(final_state["history"]):
            if isinstance(msg, AIMessage) and "FINAL ANSWER:" in msg.content:
                answer = msg.content.split("FINAL ANSWER:")[1].strip()
                print(f"--- Agent returning FINAL ANSWER (from history): {answer} ---")
                return answer

        print(f"--- ERROR: No FINAL ANSWER found in agent history for question: {question} ---")
        raise ValueError("No FINAL ANSWER found in agent history.")
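# A minimal local smoke test (hypothetical question; never called by the app):
def _demo_basic_agent() -> None:
    agent = BasicAgent()
    print(agent("In what year was the Eiffel Tower completed?"))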
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # SPACE_ID is used to build the link to the code

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When the app runs as a Hugging Face Space, this link points to your codebase (useful for others, so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once you click the submit button, it can take quite some time (this is the time the agent needs to work through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to avoid the long wait on the submit button, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__": | |
print("\n" + "-"*30 + " App Starting " + "-"*30) | |
# Check for SPACE_HOST and SPACE_ID at startup for information | |
space_host_startup = os.getenv("SPACE_HOST") | |
space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup | |
if space_host_startup: | |
print(f"✅ SPACE_HOST found: {space_host_startup}") | |
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") | |
else: | |
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") | |
if space_id_startup: # Print repo URLs if SPACE_ID is found | |
print(f"✅ SPACE_ID found: {space_id_startup}") | |
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") | |
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") | |
else: | |
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") | |
print("-"*(60 + len(" App Starting ")) + "\n") | |
print("Launching Gradio Interface for Basic Agent Evaluation...") | |
demo.launch(debug=True, share=False) |