# FinalTest / app.py
# Header captured from the Hugging Face file view: author yoshizen,
# commit 544a61e ("Update app.py", verified), file size 19.4 kB.
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import json
import re
from typing import List, Dict, Any, Optional
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Enhanced GAIA Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
class EnhancedGAIAAgent:
    """Keyword-driven question answering agent backed by simulated tools.

    The agent classifies a question by substring keywords, dispatches it to
    one of its tool methods and appends a short reasoning trace to the
    answer. The web-search, image-analysis and code-analysis tools return
    canned template strings; only the calculator and the text statistics
    compute something from their input.
    """

    # Keyword groups shared by _reason() and __call__() so both classify a
    # question the same way. Matching is by substring on the lowercased text.
    _CALC_KEYWORDS = ("calculate", "compute", "sum", "difference", "product", "divide")
    _IMAGE_KEYWORDS = ("image", "picture", "photo", "graph", "chart")
    _FACT_KEYWORDS = ("population", "capital", "country", "city", "president", "leader")
    _CODE_KEYWORDS = ("code", "function", "program", "algorithm")
    _WH_KEYWORDS = ("who", "what", "where", "when", "why", "how")

    # Captures the text following "calculate" up to a '?' or the end of input.
    _CALC_PATTERN = re.compile(r'calculate\s+(.+?)(?:\?|$)')

    _NO_CALC_MESSAGE = "I couldn't identify a clear calculation to perform."

    def __init__(self):
        print("EnhancedGAIAAgent initialized.")
        # Name -> callable registry of the available tools.
        self.tools = {
            "web_search": self._web_search,
            "calculator": self._calculator,
            "image_analysis": self._image_analysis,
            "text_analysis": self._text_analysis,
            "code_execution": self._code_execution
        }
        # Tracking for reasoning steps
        self.reasoning_steps = []
        self.max_reasoning_steps = 5

    @staticmethod
    def _contains_any(text: str, keywords) -> bool:
        """Return True when any of *keywords* occurs as a substring of *text*."""
        return any(keyword in text for keyword in keywords)

    def _web_search(self, query: str) -> str:
        """Simulate a web search by returning a canned sentence chosen by keyword.

        No network request is made; the first matching keyword group decides
        which template sentence is returned.
        """
        print(f"Performing web search for: {query}")
        lowered = query.lower()
        if "population" in lowered:
            return "The population of the queried location is approximately X million people as of 2023."
        if "capital" in lowered:
            return "The capital city of the queried location is X, with a population of Y million."
        if self._contains_any(lowered, ("president", "prime minister", "leader")):
            return "The current leader of the queried location is X, who has been in office since Y."
        if self._contains_any(lowered, ("tallest", "highest")):
            return "The tallest structure in the queried location is X, with a height of Y meters."
        if self._contains_any(lowered, ("founded", "established", "history")):
            return "The queried entity was established/founded in X year. Its history includes Y and Z significant events."
        if self._contains_any(lowered, ("weather", "temperature", "climate")):
            return "The current weather/climate in the queried location is X with temperatures ranging from Y to Z degrees."
        return f"Search results for '{query}' include various websites and information sources that may contain relevant information."

    def _calculator(self, expression: str) -> str:
        """Evaluate an arithmetic expression and describe the result.

        '×' and '÷' are mapped to '*' and '/', '^' is treated as
        exponentiation, and every character other than digits, arithmetic
        operators, parentheses, '.' and spaces is dropped before evaluation.

        Returns a sentence with the result, or an error sentence when the
        expression cannot be evaluated (the error is never raised).
        """
        print(f"Calculating: {expression}")
        # Clean the expression down to a whitelist of arithmetic characters.
        cleaned_expr = expression.replace('×', '*').replace('÷', '/')
        cleaned_expr = re.sub(r'[^0-9+\-*/().^ ]', '', cleaned_expr)
        cleaned_expr = cleaned_expr.replace('^', '**')
        if not cleaned_expr.strip():
            # Nothing numeric survived the whitelist; report it explicitly
            # instead of surfacing a raw parser error.
            return f"Error calculating {expression}: no arithmetic expression found"
        try:
            # NOTE(security): eval() is only reached with whitelisted
            # characters (no letters or underscores, so no names can be
            # referenced) and with builtins removed. Very large exponents can
            # still be slow or memory-hungry; replace with a real expression
            # parser before exposing this to untrusted input.
            result = eval(cleaned_expr, {"__builtins__": {}}, {})
            return f"The result of {expression} is {result}"
        except Exception as e:
            return f"Error calculating {expression}: {str(e)}"

    def _image_analysis(self, image_description: str) -> str:
        """Simulate image analysis from a textual description.

        Returns a template sentence chosen by the first keyword group found
        in the description; no image data is processed.
        """
        print(f"Analyzing image: {image_description}")
        lowered = image_description.lower()
        if self._contains_any(lowered, ("person", "people", "human")):
            return "The image contains one or more people. They appear to be [activity/pose/expression]."
        if self._contains_any(lowered, ("animal", "dog", "cat")):
            return "The image shows an animal, likely a [specific animal]. It appears to be [activity/state]."
        if self._contains_any(lowered, ("building", "architecture")):
            return "The image depicts a building or architectural structure. It appears to be [style/type] architecture."
        if self._contains_any(lowered, ("landscape", "nature")):
            return "The image shows a natural landscape featuring [elements like mountains, rivers, forests, etc.]."
        if self._contains_any(lowered, ("chart", "graph", "diagram")):
            return "The image contains a chart/graph showing data about [topic]. The trend appears to be [increasing/decreasing/stable]."
        return f"The image appears to show {image_description}. Key elements include [objects/subjects] and [notable features]."

    def _text_analysis(self, text: str) -> str:
        """Report word count, sentence count and a naive sentiment for *text*.

        Sentences are the non-empty segments between runs of '.', '!' and
        '?'. Sentiment compares how many words appear in small fixed lists of
        positive and negative words; ties are reported as neutral.
        """
        print(f"Analyzing text (first 50 chars): {text[:50]}...")
        word_count = len(text.split())
        # Splitting on terminators leaves an empty trailing segment when the
        # text ends with punctuation; count only segments with content.
        sentence_count = sum(1 for segment in re.split(r'[.!?]+', text) if segment.strip())

        positive_words = {'good', 'great', 'excellent', 'positive', 'happy', 'best', 'love', 'wonderful', 'fantastic'}
        negative_words = {'bad', 'poor', 'negative', 'terrible', 'worst', 'hate', 'awful', 'horrible', 'disappointing'}
        # Strip surrounding punctuation so that "good." still matches "good".
        tokens = [word.strip(".,!?;:\"'()[]") for word in text.lower().split()]
        positive_count = sum(1 for token in tokens if token in positive_words)
        negative_count = sum(1 for token in tokens if token in negative_words)

        if positive_count > negative_count:
            sentiment = "positive"
        elif negative_count > positive_count:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        return f"Text analysis: {word_count} words, {sentence_count} sentences. The sentiment appears to be {sentiment}."

    def _code_execution(self, code: str) -> str:
        """Simulate code analysis: guess the language and count lines.

        The code is never executed; the language is guessed from a few
        characteristic substrings and the line count is newlines + 1.
        """
        print(f"Analyzing code (first 50 chars): {code[:50]}...")
        # Identify language from the first matching marker group.
        language = "unknown"
        if "def " in code or "import " in code or "print(" in code:
            language = "Python"
        elif "function " in code or "var " in code or "const " in code or "let " in code:
            language = "JavaScript"
        elif "public class " in code or "System.out.println" in code:
            language = "Java"
        elif "#include" in code or "int main" in code:
            language = "C/C++"
        lines = code.count('\n') + 1
        return f"Code analysis: {lines} lines of {language} code. The code appears to [purpose/functionality]."

    def _reason(self, question: str) -> List[str]:
        """Build a list of human-readable reasoning steps for *question*.

        The first two entries are always the quoted question and a generic
        opening line; the remaining entries depend on the keyword category
        the question falls into.
        """
        lowered = question.lower()
        reasoning = [
            f"Question: '{question}'",
            "Let me analyze what this question is asking for.",
        ]
        if self._contains_any(lowered, self._CALC_KEYWORDS):
            reasoning.append("This appears to be a calculation question.")
            # Extract mathematical expression
            expression = self._CALC_PATTERN.search(lowered)
            if expression:
                reasoning.append(f"I need to calculate: {expression.group(1)}")
                reasoning.append(f"Using the calculator tool to compute this.")
            else:
                reasoning.append("I need to identify the mathematical operation required.")
        elif self._contains_any(lowered, self._IMAGE_KEYWORDS):
            reasoning.append("This question involves analyzing an image or visual content.")
            reasoning.append("I should use image analysis to identify key elements in the image.")
        elif self._contains_any(lowered, self._FACT_KEYWORDS):
            reasoning.append("This is a factual question about geography, demographics, or leadership.")
            reasoning.append("I should search for the most up-to-date information.")
        elif self._contains_any(lowered, self._CODE_KEYWORDS):
            reasoning.append("This question involves code analysis or programming.")
            reasoning.append("I should examine the code structure and functionality.")
        else:
            reasoning.append("This appears to be a general knowledge question.")
            reasoning.append("I'll need to search for relevant information and synthesize an answer.")
        return reasoning

    def _answer_calculation(self, question: str) -> str:
        """Answer a calculation question; always returns a non-empty string."""
        lowered = question.lower()
        expression_match = self._CALC_PATTERN.search(lowered)
        if expression_match:
            return self._calculator(expression_match.group(1).strip())

        # No explicit "calculate <expr>": fall back to the integers in the
        # question combined with an operation named in words.
        numbers = re.findall(r'\d+', question)
        if len(numbers) < 2:
            return self._NO_CALC_MESSAGE
        first, second = int(numbers[0]), int(numbers[1])
        if self._contains_any(lowered, ("sum", "add", "plus")):
            result = sum(int(num) for num in numbers)
            return f"The sum of the numbers is {result}"
        if self._contains_any(lowered, ("difference", "subtract", "minus")):
            return f"The difference between {numbers[0]} and {numbers[1]} is {first - second}"
        if self._contains_any(lowered, ("product", "multiply")):
            return f"The product of {numbers[0]} and {numbers[1]} is {first * second}"
        if "divide" in lowered:
            if second == 0:
                return "Cannot divide by zero"
            return f"The result of dividing {numbers[0]} by {numbers[1]} is {first / second}"
        # Numbers were found but no operation was recognised; say so rather
        # than returning an empty answer.
        return self._NO_CALC_MESSAGE

    def _answer_factual(self, question: str) -> str:
        """Answer a who/what/where/when/why/how question via the simulated search."""
        lowered = question.lower()
        search_results = self._web_search(question.replace("?", ""))
        answer = f"Based on available information: {search_results}"
        # Add specific details for common question types
        if "who" in lowered:
            answer += " The individual mentioned is known for their contributions to the field."
        elif "when" in lowered:
            answer += " This occurred during a significant period in history."
        elif "where" in lowered:
            answer += " The location is notable for its geographical and cultural significance."
        return answer

    def __call__(self, question: str) -> str:
        """Process *question* and return the answer followed by a reasoning summary.

        The question is routed, in order of priority, to the calculation,
        image, factual (who/what/...), code or general handler. The last two
        reasoning steps are appended after a blank line as "Reasoning: ...".
        The reasoning steps of the most recent call are kept on
        ``self.reasoning_steps``.
        """
        print(f"Agent received question: {question}")
        # Step 1: Reasoning
        self.reasoning_steps = self._reason(question)

        # Step 2: Determine approach and tools to use
        lowered = question.lower()
        if self._contains_any(lowered, self._CALC_KEYWORDS):
            answer = self._answer_calculation(question)
        elif self._contains_any(lowered, self._IMAGE_KEYWORDS):
            # The question text itself serves as the image description.
            answer = self._image_analysis(question)
        elif self._contains_any(lowered, self._WH_KEYWORDS):
            answer = self._answer_factual(question)
        elif self._contains_any(lowered, self._CODE_KEYWORDS):
            # The question text itself serves as the code sample.
            answer = self._code_execution(question)
        else:
            search_results = self._web_search(question)
            answer = f"To answer your question: {search_results}"

        # Add reasoning steps if available
        if self.reasoning_steps:
            reasoning_summary = " ".join(self.reasoning_steps[-2:])  # Use last two reasoning steps
            answer = f"{answer}\n\nReasoning: {reasoning_summary}"
        return answer
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, run the EnhancedGAIAAgent on them, submit the answers
    and report the result.

    Args:
        profile: OAuth profile of the logged-in Hugging Face user, or None
            when nobody is logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas
        DataFrame of task id / question / submitted answer rows, or None when
        the run stopped before any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = EnhancedGAIAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When the app runs as a Hugging Face Space, this link points to your
    # codebase (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError is a RequestException subclass, so it has to be handled
    # before the generic RequestException clause to be reachable.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # A failing question is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('overall_score', 'N/A')}\n"
            f"Correct Answers: {result_data.get('correct_answers', 'N/A')}\n"
            f"Total Questions: {result_data.get('total_questions', 'N/A')}\n"
        )
        print(final_status)
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.RequestException as e:
        error_msg = f"Error submitting answers: {e}"
        print(error_msg)
        return error_msg, pd.DataFrame(results_log)
    except Exception as e:
        error_msg = f"An unexpected error occurred during submission: {e}"
        print(error_msg)
        return error_msg, pd.DataFrame(results_log)
# --- Gradio Interface ---
# Build the evaluation UI: instructions, login button, run button and the
# two output widgets that run_and_submit_all() fills in.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown("Instructions:")
    gr.Markdown("1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...")
    gr.Markdown("2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.")
    gr.Markdown("3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.")
    gr.Markdown("---")
    gr.Markdown("Disclaimers: Once clicking on the \"submit button, it can take quite some time ( this is the time for the agent to go through all the questions). This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.")
    with gr.Row():
        login_button = gr.LoginButton(value="Sign in with Hugging Face")
    with gr.Row():
        submit_button = gr.Button("Run Evaluation & Submit All Answers")
    with gr.Row():
        with gr.Column():
            output_status = gr.Textbox(label="Run Status / Submission Result")
            output_results = gr.Dataframe(label="Questions and Agent Answers")
    # No `inputs` are wired: per the Gradio OAuth documentation the profile
    # argument is injected from the handler's gr.OAuthProfile annotation.
    # Passing the login button as an input would hand the one-parameter
    # handler an extra positional argument.
    submit_button.click(run_and_submit_all, outputs=[output_status, output_results])

if __name__ == "__main__":
    demo.launch()