|
import os |
|
import gradio as gr |
|
import requests |
|
import inspect |
|
import pandas as pd |
|
import json |
|
import re |
|
from typing import List, Dict, Any, Optional |
|
|
|
|
|
|
|
# Base URL of the scoring service; run_and_submit_all appends "/questions"
# and "/submit" to it to build the two endpoints it talks to.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
|
|
|
|
|
|
class EnhancedGAIAAgent:
    """Keyword-routed question answering agent backed by simulated tools.

    Calling an instance with a question string classifies the question by
    substring keywords, dispatches it to one of the private tool methods and
    returns the tool output followed by a short reasoning summary.  The
    search, image and code tools return canned template text; only the
    calculator and the text statistics compute anything from their input.
    """

    # Keyword groups used for routing.  They are matched as plain substrings
    # of the lower-cased question, so e.g. "sum" also matches "summary".
    _CALC_KEYWORDS = ("calculate", "compute", "sum", "difference", "product", "divide")
    _IMAGE_KEYWORDS = ("image", "picture", "photo", "graph", "chart")
    _FACT_KEYWORDS = ("population", "capital", "country", "city", "president", "leader")
    _QUESTION_WORDS = ("who", "what", "where", "when", "why", "how")
    _CODE_KEYWORDS = ("code", "function", "program", "algorithm")

    # Captures everything after the word "calculate" up to a "?" or end of text.
    _CALC_EXPRESSION_RE = re.compile(r'calculate\s+(.+?)(?:\?|$)')

    _NO_CALCULATION_MESSAGE = "I couldn't identify a clear calculation to perform."

    _POSITIVE_WORDS = frozenset(
        ['good', 'great', 'excellent', 'positive', 'happy', 'best', 'love', 'wonderful', 'fantastic']
    )
    _NEGATIVE_WORDS = frozenset(
        ['bad', 'poor', 'negative', 'terrible', 'worst', 'hate', 'awful', 'horrible', 'disappointing']
    )

    def __init__(self):
        """Register the tool methods and reset the reasoning state."""
        print("EnhancedGAIAAgent initialized.")
        self.tools = {
            "web_search": self._web_search,
            "calculator": self._calculator,
            "image_analysis": self._image_analysis,
            "text_analysis": self._text_analysis,
            "code_execution": self._code_execution,
        }
        self.reasoning_steps = []
        self.max_reasoning_steps = 5

    @staticmethod
    def _mentions(lowered_text: str, keywords) -> bool:
        """Return True when any of *keywords* occurs as a substring of *lowered_text*."""
        return any(keyword in lowered_text for keyword in keywords)

    def _web_search(self, query: str) -> str:
        """Simulate a web search by returning a canned sentence chosen by keyword."""
        print(f"Performing web search for: {query}")
        lowered = query.lower()

        if "population" in lowered:
            return "The population of the queried location is approximately X million people as of 2023."
        elif "capital" in lowered:
            return "The capital city of the queried location is X, with a population of Y million."
        elif self._mentions(lowered, ("president", "prime minister", "leader")):
            return "The current leader of the queried location is X, who has been in office since Y."
        elif self._mentions(lowered, ("tallest", "highest")):
            return "The tallest structure in the queried location is X, with a height of Y meters."
        elif self._mentions(lowered, ("founded", "established", "history")):
            return "The queried entity was established/founded in X year. Its history includes Y and Z significant events."
        elif self._mentions(lowered, ("weather", "temperature", "climate")):
            return "The current weather/climate in the queried location is X with temperatures ranging from Y to Z degrees."
        else:
            return f"Search results for '{query}' include various websites and information sources that may contain relevant information."

    def _calculator(self, expression: str) -> str:
        """Evaluate an arithmetic expression and describe the result.

        ``×``, ``÷`` and ``^`` are translated to ``*``, ``/`` and ``**``; every
        character other than digits, ``+ - * / ( ) . ^`` and spaces is dropped
        before evaluation.  Any evaluation failure is reported in the returned
        string instead of being raised.
        """
        print(f"Calculating: {expression}")

        cleaned_expr = expression.replace('×', '*').replace('÷', '/')
        cleaned_expr = re.sub(r'[^0-9+\-*/().^ ]', '', cleaned_expr)
        cleaned_expr = cleaned_expr.replace('^', '**')

        try:
            # NOTE(security): eval() on text derived from external questions.
            # The whitelist above leaves only digits, operators, parentheses,
            # dots and spaces, and builtins are emptied as a second barrier,
            # but a large power such as 9**9**9 can still burn CPU/memory.
            # Consider an ast-based evaluator with bounded operands.
            result = eval(cleaned_expr, {"__builtins__": {}}, {})
            # Formatting stays inside the try: converting a huge int to text
            # can itself raise ValueError.
            return f"The result of {expression} is {result}"
        except Exception as e:
            return f"Error calculating {expression}: {str(e)}"

    def _image_analysis(self, image_description: str) -> str:
        """Simulate image analysis by returning a template chosen by keyword."""
        print(f"Analyzing image: {image_description}")
        lowered = image_description.lower()

        if self._mentions(lowered, ("person", "people", "human")):
            return "The image contains one or more people. They appear to be [activity/pose/expression]."
        elif self._mentions(lowered, ("animal", "dog", "cat")):
            return "The image shows an animal, likely a [specific animal]. It appears to be [activity/state]."
        elif self._mentions(lowered, ("building", "architecture")):
            return "The image depicts a building or architectural structure. It appears to be [style/type] architecture."
        elif self._mentions(lowered, ("landscape", "nature")):
            return "The image shows a natural landscape featuring [elements like mountains, rivers, forests, etc.]."
        elif self._mentions(lowered, ("chart", "graph", "diagram")):
            return "The image contains a chart/graph showing data about [topic]. The trend appears to be [increasing/decreasing/stable]."
        else:
            return f"The image appears to show {image_description}. Key elements include [objects/subjects] and [notable features]."

    def _text_analysis(self, text: str) -> str:
        """Report word count, sentence count and a lexicon-based sentiment.

        Sentences are the non-blank segments between runs of ``.``, ``!`` and
        ``?``.  Sentiment compares how many words belong to the positive and
        negative word lists; ties are reported as neutral.
        """
        print(f"Analyzing text (first 50 chars): {text[:50]}...")

        word_count = len(text.split())
        # Ignore blank segments so a trailing terminator ("Hi.") does not add
        # a phantom sentence and empty text counts as zero sentences.
        sentence_count = sum(
            1 for segment in re.split(r'[.!?]+', text) if segment.strip()
        )

        # Tokenise on letters so attached punctuation ("good.", "bad!") does
        # not hide a sentiment word.
        tokens = re.findall(r"[a-z']+", text.lower())
        positive_count = sum(1 for token in tokens if token in self._POSITIVE_WORDS)
        negative_count = sum(1 for token in tokens if token in self._NEGATIVE_WORDS)

        if positive_count > negative_count:
            sentiment = "positive"
        elif negative_count > positive_count:
            sentiment = "negative"
        else:
            sentiment = "neutral"

        return f"Text analysis: {word_count} words, {sentence_count} sentences. The sentiment appears to be {sentiment}."

    def _code_execution(self, code: str) -> str:
        """Guess the language of *code* from marker substrings and count its lines.

        Nothing is executed; the returned sentence is a template.
        """
        print(f"Analyzing code (first 50 chars): {code[:50]}...")

        language = "unknown"
        if "def " in code or "import " in code or "print(" in code:
            language = "Python"
        elif "function " in code or "var " in code or "const " in code or "let " in code:
            language = "JavaScript"
        elif "public class " in code or "System.out.println" in code:
            language = "Java"
        elif "#include" in code or "int main" in code:
            language = "C/C++"

        lines = code.count('\n') + 1

        return f"Code analysis: {lines} lines of {language} code. The code appears to [purpose/functionality]."

    def _reason(self, question: str) -> List[str]:
        """Return the list of reasoning sentences describing how the question is classified."""
        lowered = question.lower()
        reasoning = [
            f"Question: '{question}'",
            "Let me analyze what this question is asking for.",
        ]

        if self._mentions(lowered, self._CALC_KEYWORDS):
            reasoning.append("This appears to be a calculation question.")
            expression = self._CALC_EXPRESSION_RE.search(lowered)
            if expression:
                reasoning.append(f"I need to calculate: {expression.group(1)}")
                reasoning.append("Using the calculator tool to compute this.")
            else:
                reasoning.append("I need to identify the mathematical operation required.")
        elif self._mentions(lowered, self._IMAGE_KEYWORDS):
            reasoning.append("This question involves analyzing an image or visual content.")
            reasoning.append("I should use image analysis to identify key elements in the image.")
        elif self._mentions(lowered, self._FACT_KEYWORDS):
            reasoning.append("This is a factual question about geography, demographics, or leadership.")
            reasoning.append("I should search for the most up-to-date information.")
        elif self._mentions(lowered, self._CODE_KEYWORDS):
            reasoning.append("This question involves code analysis or programming.")
            reasoning.append("I should examine the code structure and functionality.")
        else:
            reasoning.append("This appears to be a general knowledge question.")
            reasoning.append("I'll need to search for relevant information and synthesize an answer.")

        return reasoning

    def _answer_calculation(self, question: str) -> str:
        """Answer a calculation question; always returns a non-empty message.

        An explicit "calculate <expr>" goes to the calculator.  Otherwise the
        integers found in the question are combined according to the
        operation word present; when no operation or fewer than two numbers
        are found the fallback message is returned.
        """
        lowered = question.lower()

        expression_match = self._CALC_EXPRESSION_RE.search(lowered)
        if expression_match:
            return self._calculator(expression_match.group(1).strip())

        numbers = re.findall(r'\d+', question)
        if len(numbers) < 2:
            return self._NO_CALCULATION_MESSAGE

        first, second = int(numbers[0]), int(numbers[1])
        if self._mentions(lowered, ("sum", "add", "plus")):
            result = sum(int(num) for num in numbers)
            return f"The sum of the numbers is {result}"
        if self._mentions(lowered, ("difference", "subtract", "minus")):
            return f"The difference between {numbers[0]} and {numbers[1]} is {first - second}"
        if self._mentions(lowered, ("product", "multiply")):
            return f"The product of {numbers[0]} and {numbers[1]} is {first * second}"
        if "divide" in lowered:
            if second == 0:
                return "Cannot divide by zero"
            return f"The result of dividing {numbers[0]} by {numbers[1]} is {first / second}"

        return self._NO_CALCULATION_MESSAGE

    def __call__(self, question: str) -> str:
        """Process *question* and return the answer text.

        The reasoning steps are stored on ``self.reasoning_steps`` and the
        last two of them are appended to the answer after a blank line.
        """
        print(f"Agent received question: {question}")

        self.reasoning_steps = self._reason(question)
        lowered = question.lower()

        if self._mentions(lowered, self._CALC_KEYWORDS):
            answer = self._answer_calculation(question)
        elif self._mentions(lowered, self._IMAGE_KEYWORDS):
            # The question text itself stands in for the image description.
            answer = self._image_analysis(question)
        elif self._mentions(lowered, self._QUESTION_WORDS):
            search_results = self._web_search(question.replace("?", ""))
            answer = f"Based on available information: {search_results}"
            if "who" in lowered:
                answer += " The individual mentioned is known for their contributions to the field."
            elif "when" in lowered:
                answer += " This occurred during a significant period in history."
            elif "where" in lowered:
                answer += " The location is notable for its geographical and cultural significance."
        elif self._mentions(lowered, self._CODE_KEYWORDS):
            # The question text itself stands in for the code sample.
            answer = self._code_execution(question)
        else:
            search_results = self._web_search(question)
            answer = f"To answer your question: {search_results}"

        if self.reasoning_steps:
            reasoning_summary = " ".join(self.reasoning_steps[-2:])
            answer = f"{answer}\n\nReasoning: {reasoning_summary}"

        return answer
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, run EnhancedGAIAAgent on them, submit the answers and report the result.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        A ``(status_message, results)`` tuple.  ``results`` is a pandas
        DataFrame with one row per processed question, or None when the run
        stops before any question has been processed.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    username = str(profile.username)
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = EnhancedGAIAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Link to the code of the space; when SPACE_ID is not set in the
    # environment the URL contains the literal text "None".
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    # JSONDecodeError derives from RequestException, so it has to be caught
    # first or this handler can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # The loop below needs a non-empty list of dicts.
    if not isinstance(questions_data, list) or not questions_data:
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        try:
            submitted_answer = agent(question_text)
        except Exception as e:
            # A failing question is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
        else:
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('overall_score', 'N/A')}\n"
            f"Correct Answers: {result_data.get('correct_answers', 'N/A')}\n"
            f"Total Questions: {result_data.get('total_questions', 'N/A')}\n"
        )
        print(final_status)
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.RequestException as e:
        error_msg = f"Error submitting answers: {e}"
        print(error_msg)
        return error_msg, pd.DataFrame(results_log)
    except Exception as e:
        error_msg = f"An unexpected error occurred during submission: {e}"
        print(error_msg)
        return error_msg, pd.DataFrame(results_log)
|
|
|
|
|
# Gradio UI: a login button, a button that starts the evaluation run, and two
# outputs showing the run status and the per-question answers.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")

    gr.Markdown("Instructions:")
    gr.Markdown("1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...")
    gr.Markdown("2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.")
    gr.Markdown("3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.")

    gr.Markdown("---")

    gr.Markdown("Disclaimers: Once clicking on the \"submit button, it can take quite some time ( this is the time for the agent to go through all the questions). This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.")

    with gr.Row():
        login_button = gr.LoginButton(value="Sign in with Hugging Face")

    with gr.Row():
        submit_button = gr.Button("Run Evaluation & Submit All Answers")

    with gr.Row():
        with gr.Column():
            output_status = gr.Textbox(label="Run Status / Submission Result")
            output_results = gr.Dataframe(label="Questions and Agent Answers")

    # No `inputs`: Gradio injects the OAuth profile from the callback's
    # `gr.OAuthProfile | None` type hint.  Also passing the login button as
    # an input would hand the one-parameter callback a second positional
    # argument.
    submit_button.click(run_and_submit_all, outputs=[output_status, output_results])


if __name__ == "__main__":
    demo.launch()
|
|