import os
import re
import gradio as gr
import requests
import pandas as pd
import logging
from collections import Counter
from io import StringIO
from youtube_transcript_api import YouTubeTranscriptApi
from smolagents import tool, Tool, CodeAgent, DuckDuckGoSearchTool, HfApiModel, VisitWebpageTool, SpeechToTextTool, FinalAnswerTool
from langchain_community.document_loaders import WikipediaLoader, PyPDFLoader, TextLoader
from dotenv import load_dotenv
import tempfile
import mimetypes

# --- Logging setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Load environment variables ---
load_dotenv()
HF_API_TOKEN = os.getenv("HF_API_TOKEN")

# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Utility Functions ---
def extract_youtube_id(url: str) -> str:
    """Extract a YouTube video ID from various URL formats."""
    patterns = [
        r'(?:https?:\/\/)?(?:www\.)?youtube\.com\/watch\?v=([^&]+)',
        r'(?:https?:\/\/)?youtu\.be\/([^?]+)',
        r'([a-zA-Z0-9_-]{11})'  # Catches just the ID if provided directly
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return ""

# --- Enhanced Tools ---
class WikiSearchTool(Tool):
    """Enhanced Wikipedia search with better formatting and error handling"""
    name = "wiki_search"
    description = "Search Wikipedia for a query. Returns up to 2 results with metadata."
    inputs = {"query": {"type": "string", "description": "Search term for Wikipedia"}}
    output_type = "string"

    def forward(self, query: str) -> str:
        try:
            logger.info(f"Searching Wikipedia for: {query}")
            docs = WikipediaLoader(query=query, load_max_docs=2).load()
            if not docs:
                logger.info(f"No Wikipedia articles found for: {query}")
                return "No Wikipedia articles found."
            formatted_results = []
            for i, doc in enumerate(docs):
                # Limit page content length to avoid overwhelming the model, but provide enough context
                summary = doc.page_content[:1000] + "..." if len(doc.page_content) > 1000 else doc.page_content
                formatted_results.append(
                    f"--- Wikipedia Result {i+1} ---\n"
                    f"Title: {doc.metadata.get('title', 'N/A')}\n"
                    f"URL: {doc.metadata.get('source', 'N/A')}\n"
                    f"Summary: {summary}\n"
                )
            return "\n\n".join(formatted_results)
        except Exception as e:
            logger.error(f"Wikipedia search error for '{query}': {e}")
            return f"Wikipedia search error: {str(e)}"

class FileAnalysisTool(Tool):
    """Universal file analyzer for text/PDF/Excel files"""
    name = "file_analysis"
    description = "Analyze text, PDF, and Excel files. Returns extracted content."
    inputs = {"file_path": {"type": "string", "description": "Path to the local file"}}
    output_type = "string"

    def forward(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        try:
            mime_type, _ = mimetypes.guess_type(file_path)
            logger.info(f"Analyzing file: {file_path} with MIME type: {mime_type}")
            if mime_type == "application/pdf":
                return self._process_pdf(file_path)
            elif mime_type in ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-excel"]:
                return self._process_excel(file_path)
            elif mime_type and ("text" in mime_type or "csv" in mime_type):
                return self._process_text(file_path)
            else:
                return f"Unsupported file type for analysis: {mime_type}. Only PDF, Excel, and text/CSV files are supported."
        except Exception as e:
            logger.error(f"File analysis error for '{file_path}': {e}")
            return f"File analysis error: {str(e)}"

    def _process_pdf(self, path: str) -> str:
        loader = PyPDFLoader(path)
        docs = loader.load()
        content = "\n\n".join([doc.page_content for doc in docs])
        # Truncate to avoid excessive token usage, and warn if truncated
        if len(content) > 8000:
            logger.warning(f"PDF content truncated from {len(content)} to 8000 characters for {path}")
            return content[:8000] + "\n... [Content truncated]"
        return content

    def _process_excel(self, path: str) -> str:
        df = pd.read_excel(path)
        # Provide a sample of the data and its basic info.
        # df.info() writes text, so it needs a text buffer (StringIO) rather than a bytes buffer.
        info = StringIO()
        df.info(buf=info)
        info_str = info.getvalue()
        return (f"Excel file loaded. First 10 rows:\n{df.head(10).to_markdown()}\n\n"
                f"DataFrame Info:\n{info_str}")

    def _process_text(self, path: str) -> str:
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read()
        if len(content) > 8000:
            logger.warning(f"Text file content truncated from {len(content)} to 8000 characters for {path}")
            return content[:8000] + "\n... [Content truncated]"
        return content
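
# Illustrative usage (not executed; "report.pdf" is a hypothetical local file path):
#   FileAnalysisTool().forward("report.pdf")  -> extracted text, truncated to ~8000 characters if longer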

class VideoTranscriptionTool(Tool):
    """Enhanced YouTube transcription with multilingual support and better output"""
    name = "transcript_video"
    description = "Fetch YouTube video transcripts with optional timestamps. Supports English, French, Spanish, German."
    inputs = {
        "url": {"type": "string", "description": "YouTube URL or ID"},
        "include_timestamps": {"type": "boolean", "description": "Include timestamps? (default: False)"}
    }
    output_type = "string"

    def forward(self, url: str, include_timestamps: bool = False) -> str:
        try:
            video_id = extract_youtube_id(url)
            if not video_id:
                return "Invalid YouTube URL or ID format. Please provide a valid YouTube URL or an 11-character video ID."
            logger.info(f"Attempting to transcribe video ID: {video_id}")
            transcript = YouTubeTranscriptApi.get_transcript(
                video_id,
                languages=['en', 'fr', 'es', 'de']  # Prioritize common languages
            )
            if not transcript:
                return f"No transcript found for video ID: {video_id} in supported languages (en, fr, es, de)."
            if include_timestamps:
                formatted_transcript = "\n".join(
                    f"[{int(seg['start']//60):02d}:{int(seg['start']%60):02d}] {seg['text']}"
                    for seg in transcript
                )
            else:
                formatted_transcript = " ".join(seg['text'] for seg in transcript)
            return formatted_transcript
        except Exception as e:
            logger.error(f"Transcription error for '{url}': {e}")
            return f"Transcription error: {str(e)}. This might be due to no available transcript or an unsupported video."

class DataAnalysisTool(Tool):
    """Perform data analysis using pandas on structured data (CSV/Excel)"""
    name = "data_analysis"
    description = "Analyze CSV/Excel data using pandas operations. Supported operations: 'describe', 'groupby:column:aggfunc' (e.g., 'groupby:Category:mean')."
    inputs = {
        "file_path": {"type": "string", "description": "Path to the local data file (CSV or Excel)"},
        "operation": {"type": "string", "description": "Pandas operation (e.g., 'describe', 'groupby:column_name:mean')"}
    }
    output_type = "string"

    def forward(self, file_path: str, operation: str) -> str:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        try:
            if file_path.endswith('.csv'):
                df = pd.read_csv(file_path)
            elif file_path.endswith('.xlsx') or file_path.endswith('.xls'):
                df = pd.read_excel(file_path)
            else:
                return "Unsupported file format for data analysis. Please provide a .csv or .xlsx file."
            logger.info(f"Performing data analysis operation '{operation}' on {file_path}")
            if operation == "describe":
                return "Descriptive Statistics:\n" + str(df.describe())
            elif operation.startswith("groupby:"):
                parts = operation.split(":")
                if len(parts) == 3:
                    _, col, agg = parts
                    if col not in df.columns:
                        return f"Column '{col}' not found in the DataFrame."
                    try:
                        result = df.groupby(col).agg(agg)
                        return f"Groupby operation '{agg}' on column '{col}':\n" + str(result)
                    except Exception as agg_e:
                        return f"Error performing aggregation '{agg}' on column '{col}': {str(agg_e)}"
                else:
                    return "Invalid 'groupby' operation format. Use 'groupby:column_name:agg_function'."
            else:
                return "Unsupported operation. Try: 'describe' or 'groupby:column_name:agg_function'."
        except Exception as e:
            logger.error(f"Data analysis error for '{file_path}' with operation '{operation}': {e}")
            return f"Data analysis error: {str(e)}. Please check file content and operation."

# --- Agent Initialization ---
class BasicAgent:
    def __init__(self):
        self.model = HfApiModel(
            temperature=0.0,  # Deterministic output for reproducible, factual answers
            token=HF_API_TOKEN,
            max_tokens=2000
        )
        self.tools = self._initialize_tools()
        self.agent = self._create_agent()

    def _initialize_tools(self) -> list:
        """Initialize all tools with enhanced capabilities"""
        return [
            DuckDuckGoSearchTool(),
            WikiSearchTool(),
            VisitWebpageTool(),
            SpeechToTextTool(),  # Might be less relevant for a text-based research agent, but kept if needed
            FinalAnswerTool(),
            VideoTranscriptionTool(),
            FileAnalysisTool(),
            DataAnalysisTool(),
            self._create_excel_download_tool(),
            self._create_keywords_tool()
        ]

    def _create_excel_download_tool(self):
        """Tool to download and parse Excel files from a specific URL"""
        @tool
        def download_and_parse_excel(task_id: str) -> dict:
            """
            Downloads an Excel file from a predefined URL using a task_id and parses its content.

            Args:
                task_id: The task identifier used to build the download URL.

            Returns:
                A dictionary with status, column names, shape, and a data sample (first 10 rows).
            """
            try:
                url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
                logger.info(f"Attempting to download Excel from: {url}")
                response = requests.get(url, timeout=60)  # Increased timeout for larger files
                response.raise_for_status()  # Raise an exception for HTTP errors (4xx or 5xx)
                with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
                    tmp.write(response.content)
                    temp_file_path = tmp.name
                df = pd.read_excel(temp_file_path)
                os.unlink(temp_file_path)  # Clean up the temporary file
                logger.info(f"Successfully downloaded and parsed Excel for task_id: {task_id}")
                return {
                    "task_id": task_id,
                    "data_sample": df.head(10).to_dict(orient="records"),  # Limited to 10 rows for conciseness
                    "status": "Success",
                    "columns": df.columns.tolist(),  # Column names for context
                    "shape": df.shape  # Shape for context
                }
            except requests.exceptions.RequestException as req_err:
                logger.error(f"Network or HTTP error downloading Excel for task_id '{task_id}': {req_err}")
                return {"status": f"Download error: {str(req_err)}"}
            except Exception as e:
                logger.error(f"Error parsing Excel for task_id '{task_id}': {e}")
                return {"status": f"Parsing error: {str(e)}"}
        return download_and_parse_excel

    def _create_keywords_tool(self):
        """Keywords extractor with TF-IDF-like scoring (basic frequency for now)"""
        @tool
        def extract_keywords(text: str, top_n: int = 5) -> list:
            """
            Extracts the most frequent keywords from a given text, excluding common stopwords.

            Args:
                text: The input text to extract keywords from.
                top_n: The number of top keywords to return.

            Returns:
                A list of the most frequent keywords.
            """
            if not text:
                return []
            # Common English stopwords to exclude
            stopwords = set([
                "a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it",
                "no", "not", "of", "on", "or", "such", "that", "the", "their", "then", "there", "these",
                "they", "this", "to", "was", "will", "with", "he", "she", "it's", "i", "we", "you", "my",
                "your", "our", "us", "him", "her", "his", "hers", "its", "them", "what", "when",
                "where", "why", "how", "which", "who", "whom", "can", "could", "would", "should", "may",
                "might", "must", "have", "has", "had", "do", "does", "did", "am", "were", "been",
                "being", "from", "up", "down", "out", "off", "over", "under", "again", "further",
                "once", "here", "all", "any", "both", "each", "few", "more", "most", "other", "some",
                "nor", "only", "own", "same", "so", "than", "too", "very", "s", "t", "just", "don", "now"
            ])
            words = re.findall(r'\b\w+\b', text.lower())  # Capture all word tokens
            filtered = [w for w in words if w not in stopwords and len(w) > 2]  # Drop stopwords and words shorter than 3 chars
            counter = Counter(filtered)
            return [word for word, _ in counter.most_common(top_n)]
        return extract_keywords

    def _create_agent(self) -> CodeAgent:
        """Create agent with improved system prompt"""
        system_prompt = """
You are an advanced, helpful, and highly analytical research assistant. Your goal is to provide accurate, comprehensive, and well-structured answers to user queries, leveraging all available tools efficiently.

**Follow this robust process:**

1. **Understand the User's Need:** Carefully analyze the user's question, including any attached files or specific requests (e.g., "summarize," "analyze data," "find facts").
2. **Formulate a Detailed Plan:** Before acting, create a clear, step-by-step plan. This plan should outline:
    * What information needs to be gathered.
    * Which tools are most appropriate for each step (e.g., `duckduckgo_search` for general web search, `wiki_search` for encyclopedic facts, `transcript_video` for YouTube, `file_analysis` or `data_analysis` for local files).
    * How you will combine information from different sources.
    * How you will verify or synthesize the findings.
3. **Execute the Plan Using Tools:** Call the necessary tools, providing clear and correct arguments. If a tool fails, try to understand why and adapt your plan (e.g., try a different search query or tool).
4. **Synthesize and Verify Information:** Once you have gathered sufficient information, synthesize it into a coherent answer. Do not just list facts; explain their significance and how they relate to the original question. If there are contradictions or uncertainties, mention them.
5. **Formulate the Final Answer:**
    * Present your answer clearly and concisely.
    * Always begin your ultimate response with "FINAL ANSWER:".
    * If the answer is a single number, provide only the number.
    * If the answer is a list, provide comma-separated values.
    * For complex answers, use structured formats like bullet points or JSON where appropriate to enhance readability.
    * **Crucially, always include sources or references (e.g., URLs, Wikipedia titles, file names) where you obtained the information.** This builds trust and allows for verification.
    * If you used the `file_analysis` or `data_analysis` tools on an uploaded file, explicitly state that you analyzed the provided file.

**Important Considerations:**
* **Prioritize:** If the query involves a specific file, start by analyzing that file if appropriate.
* **Limitations:** If you cannot answer a question with the available tools, state that clearly.
* **Conciseness:** Be as concise as possible while providing a complete and accurate answer.
"""
        agent = CodeAgent(
            model=self.model,
            tools=self.tools,
            add_base_tools=True
        )
        # Append the custom instructions to the default template rather than overwriting it,
        # so the tool descriptions and code-formatting guidance that CodeAgent relies on are preserved.
        agent.prompt_templates["system_prompt"] = agent.prompt_templates["system_prompt"] + system_prompt
        return agent

    def __call__(self, question: str) -> str:
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        answer = self.agent.run(question)
        print(f"Agent returning answer: {answer}")
        return answer
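
# Minimal local smoke test (illustrative only, not executed here; assumes HF_API_TOKEN is set
# and the question text is a hypothetical example):
#   agent = BasicAgent()
#   print(agent("What is the capital of France?"))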

def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for linking to the code

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When the app runs as a Hugging Face Space, this link points to your codebase (useful for others, so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError is a RequestException subclass, so it must be caught first to be reachable.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df

# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once you click the submit button, it can take quite some time (this is the time the agent needs to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to address the long-running submit button, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)