import os
import re
import gradio as gr
import requests
import pandas as pd
from collections import Counter
from io import StringIO
from youtube_transcript_api import YouTubeTranscriptApi
from smolagents import tool, Tool, CodeAgent, DuckDuckGoSearchTool, HfApiModel, VisitWebpageTool, SpeechToTextTool, FinalAnswerTool
from langchain_community.document_loaders import WikipediaLoader, PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
# HuggingFaceEmbeddings now lives in the langchain_huggingface package
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import DocArrayInMemorySearch
from langchain_core.documents import Document
from dotenv import load_dotenv
import tempfile
import mimetypes
import logging
# For the agent-run timeout
import concurrent.futures
# --- Initialize logging ---
LOG_FILE_PATH = "agent_activity.log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=LOG_FILE_PATH,
    filemode='a'
)
logger = logging.getLogger(__name__)

# --- Load environment variables ---
load_dotenv()
HF_API_TOKEN = os.getenv("HF_API_TOKEN")
HF_EMBEDDING_MODEL_ID = os.getenv("HF_EMBEDDING_MODEL_ID", "sentence-transformers/all-MiniLM-L6-v2")
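# Illustrative .env layout read by load_dotenv() above (placeholder values, not real credentials):
#   HF_API_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxx
#   HF_EMBEDDING_MODEL_ID=sentence-transformers/all-MiniLM-L6-v2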
if not HF_API_TOKEN:
    logger.error("HF_API_TOKEN not found in environment variables! Please set it to use the HfApiModel.")

# --- Global Vector Store and Embeddings ---
try:
    embeddings = HuggingFaceEmbeddings(model_name=HF_EMBEDDING_MODEL_ID)
    logger.info(f"Initialized HuggingFaceEmbeddings with model: {HF_EMBEDDING_MODEL_ID}")
except Exception as e:
    logger.error(f"Failed to initialize HuggingFaceEmbeddings: {e}. Please ensure the model_id is correct and dependencies are installed.")
    embeddings = None

# Build the in-memory DocArray index around the embedding model.
# from_params constructs the underlying doc index itself, so no doc_index has to be passed in.
vectorstore = DocArrayInMemorySearch.from_params(embeddings) if embeddings else None
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
    is_separator_regex=False,
)
logger.info("Initialized in-memory DocArrayInMemorySearch vector store and RecursiveCharacterTextSplitter.")
# --- Utility Functions ---
def extract_youtube_id(url: str) -> str:
    """Extract the YouTube video ID from various URL formats."""
    patterns = [
        r'(?:https?:\/\/)?(?:www\.)?youtube\.com\/watch\?v=([^&]+)',
        r'(?:https?:\/\/)?youtu\.be\/([^?]+)',
        r'([a-zA-Z0-9_-]{11})'
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return ""
def add_document_to_vector_store(content: str, source: str, metadata: dict = None):
    """
    Adds content to the global vector store.
    Chunks the content and creates LangChain Documents.
    """
    if vectorstore is None:
        logger.warning("Vector store not initialized. Cannot add document.")
        return
    try:
        chunks = text_splitter.split_text(content)
        docs = []
        for i, chunk in enumerate(chunks):
            doc_metadata = {"source": source, "chunk_index": i}
            if metadata:
                doc_metadata.update(metadata)
            docs.append(Document(page_content=chunk, metadata=doc_metadata))
        # The vector store was built with the embedding model via from_params,
        # so add_documents embeds the chunks without the embedding being passed again.
        vectorstore.add_documents(docs)
        logger.info(f"Added {len(docs)} chunks from '{source}' to the vector store.")
    except Exception as e:
        logger.error(f"Error adding document from '{source}' to vector store: {e}")
# --- Enhanced Tools ---
class WikiSearchTool(Tool):
    """Enhanced Wikipedia search with better formatting and error handling."""
    name = "wiki_search"
    description = "Search Wikipedia for a query. Returns up to 2 results with metadata."
    inputs = {"query": {"type": "string", "description": "Search term for Wikipedia"}}
    output_type = "string"

    def forward(self, query: str) -> str:
        try:
            logger.info(f"Searching Wikipedia for: {query}")
            docs = WikipediaLoader(query=query, load_max_docs=2).load()
            if not docs:
                logger.info(f"No Wikipedia articles found for: {query}")
                return "No Wikipedia articles found."
            formatted_results = []
            for i, doc in enumerate(docs):
                summary = doc.page_content[:1000] + "..." if len(doc.page_content) > 1000 else doc.page_content
                add_document_to_vector_store(
                    content=doc.page_content,
                    source=doc.metadata.get('source', 'Wikipedia'),
                    metadata={"title": doc.metadata.get('title', 'N/A')}
                )
                formatted_results.append(
                    f"--- Wikipedia Result {i+1} ---\n"
                    f"Title: {doc.metadata.get('title', 'N/A')}\n"
                    f"URL: {doc.metadata.get('source', 'N/A')}\n"
                    f"Summary: {summary}\n"
                )
            return "\n\n".join(formatted_results)
        except Exception as e:
            logger.error(f"Wikipedia search error for '{query}': {e}")
            return f"Wikipedia search error: {str(e)}"
class FileAnalysisTool(Tool):
    """Universal file analyzer for text/PDF/Excel files. Content is added to the vector store."""
    name = "file_analysis"
    description = "Analyze text, PDF, and Excel files. Returns extracted content. Text and PDF content is also indexed for future retrieval."
    inputs = {"file_path": {"type": "string", "description": "Path to the local file"}}
    output_type = "string"

    def forward(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        try:
            mime_type, _ = mimetypes.guess_type(file_path)
            logger.info(f"Analyzing file: {file_path} with MIME type: {mime_type}")
            content = ""
            if mime_type == "application/pdf":
                content = self._process_pdf(file_path)
            elif mime_type in ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-excel"]:
                content = self._process_excel(file_path)
            elif mime_type and ("text" in mime_type or "csv" in mime_type):
                content = self._process_text(file_path)
            else:
                return f"Unsupported file type for analysis: {mime_type}. Only PDF, Excel, and text/CSV files are supported."
            # Index PDF and text-like content so it can be retrieved later; spreadsheets are only summarised.
            if mime_type == "application/pdf" or (mime_type and ("text" in mime_type or "csv" in mime_type)):
                add_document_to_vector_store(
                    content=content,
                    source=f"file:{os.path.basename(file_path)}",
                    metadata={"file_path": file_path, "mime_type": mime_type}
                )
            return content
        except Exception as e:
            logger.error(f"File analysis error for '{file_path}': {e}")
            return f"File analysis error: {str(e)}"
    def _process_pdf(self, path: str) -> str:
        loader = PyPDFLoader(path)
        docs = loader.load()
        content = "\n\n".join([doc.page_content for doc in docs])
        if len(content) > 8000:
            logger.warning(f"PDF content truncated from {len(content)} to 8000 characters for {path}")
            return content[:8000] + "\n... [Content truncated]"
        return content
    def _process_excel(self, path: str) -> str:
        df = pd.read_excel(path)
        # DataFrame.info writes text, so capture it with a StringIO buffer (a BytesIO buffer would fail here).
        info_buffer = StringIO()
        df.info(buf=info_buffer)
        info_str = info_buffer.getvalue()
        return (f"Excel file loaded. First 10 rows:\n{df.head(10).to_markdown()}\n\n"
                f"DataFrame Info:\n{info_str}")
    def _process_text(self, path: str) -> str:
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read()
        if len(content) > 8000:
            logger.warning(f"Text file content truncated from {len(content)} to 8000 characters for {path}")
            return content[:8000] + "\n... [Content truncated]"
        return content
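# Illustrative call (hypothetical local file):
#   FileAnalysisTool().forward("/tmp/report.pdf")
# extracts the PDF text (truncated to 8,000 characters), indexes it in the vector store,
# and returns the extracted content to the agent.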
class VideoTranscriptionTool(Tool):
    """Enhanced YouTube transcription with multilingual support and better output. Transcribed content is added to the vector store."""
    name = "transcript_video"
    description = "Fetch YouTube video transcripts with optional timestamps. Supports English, French, Spanish, German. Transcribed text is indexed for future retrieval."
    inputs = {
        "url": {"type": "string", "description": "YouTube URL or ID"},
        "include_timestamps": {"type": "boolean", "description": "Include timestamps? (default: False)"}
    }
    output_type = "string"

    def forward(self, url: str, include_timestamps: bool = False) -> str:
        try:
            video_id = extract_youtube_id(url)
            if not video_id:
                return "Invalid YouTube URL or ID format. Please provide a valid YouTube URL or an 11-character video ID."
            logger.info(f"Attempting to transcribe video ID: {video_id}")
            transcript_list = YouTubeTranscriptApi.get_transcript(
                video_id,
                languages=['en', 'fr', 'es', 'de']
            )
            if not transcript_list:
                return f"No transcript found for video ID: {video_id} in supported languages (en, fr, es, de)."
            full_transcript_text = " ".join(seg['text'] for seg in transcript_list)
            add_document_to_vector_store(
                content=full_transcript_text,
                source=f"youtube_video:{video_id}",
                metadata={"video_url": url}
            )
            if include_timestamps:
                formatted_transcript = "\n".join(
                    f"[{int(seg['start']//60):02d}:{int(seg['start']%60):02d}] {seg['text']}"
                    for seg in transcript_list
                )
            else:
                formatted_transcript = full_transcript_text
            return formatted_transcript
        except Exception as e:
            logger.error(f"Transcription error for '{url}': {e}")
            return f"Transcription error: {str(e)}. This might be due to no available transcript or an unsupported video."
class DataAnalysisTool(Tool):
    """Perform data analysis using pandas on structured data (CSV/Excel)."""
    name = "data_analysis"
    description = "Analyze CSV/Excel data using pandas operations. Supported operations: 'describe', 'groupby:column:aggfunc' (e.g., 'groupby:Category:mean'). Outputs are NOT added to the vector store."
    inputs = {
        "file_path": {"type": "string", "description": "Path to the local data file (CSV or Excel)"},
        "operation": {"type": "string", "description": "Pandas operation (e.g., 'describe', 'groupby:column_name:agg_function')"}
    }
    output_type = "string"

    def forward(self, file_path: str, operation: str) -> str:
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        try:
            if file_path.endswith('.csv'):
                df = pd.read_csv(file_path)
            elif file_path.endswith('.xlsx') or file_path.endswith('.xls'):
                df = pd.read_excel(file_path)
            else:
                return "Unsupported file format for data analysis. Please provide a .csv or .xlsx file."
            logger.info(f"Performing data analysis operation '{operation}' on {file_path}")
            if operation == "describe":
                return "Descriptive Statistics:\n" + str(df.describe())
            elif operation.startswith("groupby:"):
                parts = operation.split(":")
                if len(parts) == 3:
                    _, col, agg = parts
                    if col not in df.columns:
                        return f"Column '{col}' not found in the DataFrame."
                    try:
                        result = df.groupby(col).agg(agg)
                        return f"Groupby operation '{agg}' on column '{col}':\n" + str(result)
                    except Exception as agg_e:
                        return f"Error performing aggregation '{agg}' on column '{col}': {str(agg_e)}"
                else:
                    return "Invalid 'groupby' operation format. Use 'groupby:column_name:agg_function'."
            else:
                return "Unsupported operation. Try: 'describe' or 'groupby:column_name:agg_function'."
        except Exception as e:
            logger.error(f"Data analysis error for '{file_path}' with operation '{operation}': {e}")
            return f"Data analysis error: {str(e)}. Please check file content and operation."
class RetrievalTool(Tool):
    """
    Retrieves relevant information from the in-memory vector store based on a query.
    This tool allows the agent to access previously processed documents and transcripts.
    """
    name = "retrieve_from_vector_store"
    description = "Search for relevant information within previously processed documents and transcripts using a semantic query. Returns the top K relevant chunks."
    inputs = {
        "query": {"type": "string", "description": "The semantic query to search the vector store."},
        "k": {"type": "integer", "description": "Number of top results to retrieve (default: 3)", "default": 3}
    }
    output_type = "string"

    def forward(self, query: str, k: int = 3) -> str:
        if vectorstore is None or embeddings is None:
            return "Vector store is not initialized or embeddings are missing. No documents available for retrieval."
        try:
            logger.info(f"Retrieving {k} chunks from DocArrayInMemorySearch for query: {query}")
            # The store was created via from_params with the embedding model, so
            # similarity_search embeds the query itself; no extra embedding argument is needed.
            retrieved_docs = vectorstore.similarity_search(query, k=k)
            if not retrieved_docs:
                return "No relevant information found in the vector store for this query."
            formatted_results = []
            for i, doc in enumerate(retrieved_docs):
                source = doc.metadata.get('source', 'Unknown Source')
                title = doc.metadata.get('title', 'N/A')
                chunk_index = doc.metadata.get('chunk_index', 'N/A')
                formatted_results.append(
                    f"--- Retrieved Document Chunk {i+1} ---\n"
                    f"Source: {source} (Chunk: {chunk_index})\n"
                    f"Title: {title}\n"
                    f"Content: {doc.page_content}\n"
                )
            return "\n\n".join(formatted_results)
        except Exception as e:
            logger.error(f"Error retrieving from vector store for query '{query}': {e}")
            return f"Error retrieving from vector store: {str(e)}"
class ChessAnalysisAPITool(Tool):
    """
    Analyzes a chess position provided in FEN format using a remote chess engine API (chess-api.com).
    """
    name = "analyze_chess_position_api"
    description = (
        "Analyze a chess position provided in FEN (Forsyth-Edwards Notation) format using an online engine. "
        "Returns the best move in algebraic notation for the current player, along with an evaluation. "
        "Note: This tool cannot interpret chess positions directly from images. "
        "The FEN string must be provided by the user."
    )
    inputs = {
        "fen_string": {"type": "string", "description": "The chess position in FEN format. Example: 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1'"},
        "depth": {"type": "integer", "description": "The analysis depth for the chess engine (higher means better, but slower; max ~18 for this API; default: 15)", "default": 15}
    }
    output_type = "string"

    def forward(self, fen_string: str, depth: int = 15) -> str:
        actual_depth = min(depth, 18)
        try:
            logger.info(f"Analyzing FEN: {fen_string} at depth {actual_depth} using chess-api.com.")
            response = requests.post(
                "https://chess-api.com/v1",
                json={"fen": fen_string, "depth": actual_depth},
                timeout=30  # avoid hanging the agent if the remote engine is slow
            )
            response.raise_for_status()
            data = response.json()
            if data.get("type") == "bestmove":
                move_san = data.get("san", data.get("move"))
                evaluation = data.get("eval")
                mate_in_moves = data.get("mate")
                result = f"Best move: **{move_san}** (UCI: {data.get('move')})\n"
                if mate_in_moves is not None:
                    player_to_move = "White" if data.get("turn") == 'w' else "Black"
                    result += f"Forced mate for {player_to_move} in {abs(mate_in_moves)} moves.\n"
                elif evaluation is not None:
                    eval_str = ""
                    if evaluation >= 1000:
                        eval_str = "Decisive advantage for White"
                    elif evaluation <= -1000:
                        eval_str = "Decisive advantage for Black"
                    elif evaluation > 0:
                        eval_str = f"White is up by {evaluation} centipawns"
                    elif evaluation < 0:
                        eval_str = f"Black is up by {abs(evaluation)} centipawns"
                    else:
                        eval_str = "Even position"
                    result += f"Evaluation: {eval_str} (Depth: {data.get('depth')})\n"
                result += "(Source: chess-api.com - Stockfish 17 NNUE)"
                return result
            else:
                return f"Chess API response: {data.get('text', 'No best move found or error.')}"
        except requests.exceptions.RequestException as e:
            logger.error(f"Error communicating with remote chess analysis API for FEN '{fen_string}': {e}")
            return f"Error contacting remote chess analysis API: {str(e)}. Please try again later."
        except Exception as e:
            logger.error(f"An unexpected error occurred during remote chess analysis for FEN '{fen_string}': {e}")
            return f"An unexpected error occurred during chess analysis: {str(e)}"
# --- Agent Initialization ---
class BasicAgent:
    def __init__(self):
        self.model = HfApiModel(
            temperature=0.0,
            token=os.environ.get("HF_API_TOKEN"),
            max_tokens=2000
        )
        self.tools = self._initialize_tools()
        self.agent = self._create_agent()

    def _initialize_tools(self) -> list:
        """Initialize all tools with enhanced capabilities."""
        base_tools = [
            DuckDuckGoSearchTool(),
            WikiSearchTool(),
            VisitWebpageTool(),
            SpeechToTextTool(),
            FinalAnswerTool(),
            VideoTranscriptionTool(),
            FileAnalysisTool(),
            DataAnalysisTool(),
            self._create_excel_download_tool(),
            self._create_keywords_tool(),
            ChessAnalysisAPITool(),
        ]
        if vectorstore and embeddings:
            logger.info("Adding RetrievalTool to the agent's tools.")
            base_tools.append(RetrievalTool())
        else:
            logger.warning("RetrievalTool not added because vector store or embeddings are not initialized.")
        return base_tools
    def _create_excel_download_tool(self):
        """Tool to download and parse Excel files from a specific URL."""
        # Wrapped with smolagents' @tool so the function becomes a Tool instance the CodeAgent accepts.
        @tool
        def download_and_parse_excel(task_id: str) -> dict:
            """
            Downloads an Excel file from a predefined URL using a task_id and parses its content.
            Returns a dictionary with status, a data sample (first 10 rows), columns, and shape.

            Args:
                task_id: The task identifier used to build the download URL.
            """
            try:
                url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
                logger.info(f"Attempting to download Excel from: {url}")
                response = requests.get(url, timeout=60)
                response.raise_for_status()
                with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
                    tmp.write(response.content)
                    temp_file_path = tmp.name
                df = pd.read_excel(temp_file_path)
                os.unlink(temp_file_path)
                logger.info(f"Successfully downloaded and parsed Excel for task_id: {task_id}")
                return {
                    "task_id": task_id,
                    "data_sample": df.head(10).to_dict(orient="records"),
                    "status": "Success",
                    "columns": df.columns.tolist(),
                    "shape": df.shape
                }
            except requests.exceptions.RequestException as req_err:
                logger.error(f"Network or HTTP error downloading Excel for task_id '{task_id}': {req_err}")
                return {"status": f"Download error: {str(req_err)}"}
            except Exception as e:
                logger.error(f"Error parsing Excel for task_id '{task_id}': {e}")
                return {"status": f"Parsing error: {str(e)}"}
        return download_and_parse_excel
    def _create_keywords_tool(self):
        """Keyword extractor based on simple term frequency (a TF-IDF-style upgrade could come later)."""
        # Wrapped with smolagents' @tool so the function becomes a Tool instance the CodeAgent accepts.
        @tool
        def extract_keywords(text: str, top_n: int = 5) -> list:
            """
            Extracts the most frequent keywords from a given text, excluding common stopwords.

            Args:
                text (str): The input text to extract keywords from.
                top_n (int): The number of top keywords to return.

            Returns:
                list: A list of the most frequent keywords.
            """
            if not text:
                return []
            stopwords = set([
                "a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it",
                "no", "not", "of", "on", "or", "such", "that", "the", "their", "then", "there", "these",
                "they", "this", "to", "was", "will", "with", "he", "she", "it's", "i", "we", "you", "my",
                "your", "our", "us", "him", "her", "his", "hers", "its", "them", "what", "when",
                "where", "why", "how", "which", "who", "whom", "can", "could", "would", "should", "may",
                "might", "must", "have", "has", "had", "do", "does", "did", "am", "were", "been",
                "being", "from", "up", "down", "out", "off", "over", "under", "again", "further",
                "once", "here", "all", "any", "both", "each", "few",
                "more", "most", "other", "some", "nor", "only", "own", "same", "so",
                "than", "too", "very", "s", "t", "just", "don", "now"
            ])
            words = re.findall(r'\b\w+\b', text.lower())
            filtered = [w for w in words if w not in stopwords and len(w) > 2]
            counter = Counter(filtered)
            return [word for word, _ in counter.most_common(top_n)]
        return extract_keywords
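    # Illustrative call (arbitrary sentence):
    #   extract_keywords("The quick brown fox jumps over the lazy dog. The fox is quick.", top_n=3)
    #   -> ["quick", "fox", "brown"]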
    def _create_agent(self) -> CodeAgent:
        """Create the agent with an improved system prompt."""
        system_prompt = """
You are an advanced, helpful, and highly analytical research assistant. Your goal is to provide accurate, comprehensive, and well-structured answers to user queries, leveraging all available tools efficiently.

**Follow this robust process:**

1. **Understand the User's Need:** Carefully analyze the user's question, including any attached files or specific requests (e.g., "summarize," "analyze data," "find facts").
2. **Formulate a Detailed Plan:** Before acting, create a clear, step-by-step plan. This plan should outline:
    * What information needs to be gathered.
    * Which tools are most appropriate for each step.
        * Use `retrieve_from_vector_store` first if the query seems to be related to previously processed information (e.g., "What did we learn about X from the uploaded document?").
        * Use `duckduckgo_search` for general web search.
        * Use `wiki_search` for encyclopedic facts.
        * Use `transcript_video` for YouTube video content.
        * Use `file_analysis` to inspect the content of local files.
        * Use `data_analysis` for structured analysis of CSV/Excel files.
        * Use `analyze_chess_position_api` if the user provides a FEN string for a chess position and asks for the best move.
    * How you will combine information from different sources.
    * How you will verify or synthesize the findings.
3. **Execute the Plan Using Tools:** Call the necessary tools, providing clear and correct arguments. If a tool fails, try to understand why and adapt your plan (e.g., try a different search query or tool).
4. **Synthesize and Verify Information:** Once you have gathered sufficient information, synthesize it into a coherent answer. Do not just list facts; explain their significance and how they relate to the original question. If there are contradictions or uncertainties, mention them.
5. **Formulate the Final Answer:**
    * Present your answer clearly and concisely.
    * Always begin your ultimate response with "FINAL ANSWER:".
    * If the answer is a single number, provide only the number.
    * If the answer is a list, provide comma-separated values.
    * For complex answers, use structured formats like bullet points or JSON where appropriate to enhance readability.
    * **Crucially, always include sources or references (e.g., URLs, Wikipedia titles, file names, "Internal Knowledge Base", "Remote Chess API") where you obtained the information.** This builds trust and allows for verification.
    * If you used the `file_analysis` or `data_analysis` tools on an uploaded file, explicitly state that you analyzed the provided file.

**Important Considerations:**
* **Prioritize:** If the query involves a specific file, start by analyzing that file if appropriate. If the query seems to refer to previously processed data, try `retrieve_from_vector_store` first.
* **Limitations:** If you cannot answer a question with the available tools, state that clearly.
* **Conciseness:** Be as concise as possible while providing an accurate answer.
"""
        agent = CodeAgent(
            model=self.model,
            tools=self.tools,
            add_base_tools=True,
            max_steps=15  # cap the agent's internal reasoning/tool-use steps
        )
        agent.prompt_templates["system_prompt"] = system_prompt
        return agent
    def __call__(self, question: str) -> str:
        logger.info(f"Received question: {question[:200]}...")
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        try:
            global vectorstore
            # Start each session with a fresh vector store, built the same way as the module-level one.
            if embeddings:
                vectorstore = DocArrayInMemorySearch.from_params(embeddings)
                logger.info("DocArrayInMemorySearch re-initialized for new session.")
            else:
                logger.warning("Embeddings not initialized, cannot re-initialize DocArrayInMemorySearch.")
                return "Error: Embedding model not loaded, cannot process request."

            # --- Run the agent with a timeout ---
            AGENT_TIMEOUT_SECONDS = 120  # maximum time in seconds for the agent to respond
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(self.agent.run, question)
                try:
                    response = future.result(timeout=AGENT_TIMEOUT_SECONDS)
                except concurrent.futures.TimeoutError:
                    logger.warning(f"Agent execution timed out after {AGENT_TIMEOUT_SECONDS} seconds for question: {question[:100]}...")
                    future.cancel()  # best effort: a future that is already running in the worker thread cannot be interrupted
                    return "Error: The agent took too long to respond and timed out. Please try again with a simpler query or check the input."
                except Exception as e:
                    # Catch any other exceptions raised inside agent.run
                    logger.error(f"Agent execution failed during run for question '{question[:100]}': {str(e)}", exc_info=True)
                    return f"Error processing your request: {str(e)}. Please try again or rephrase your question."

            logger.info(f"Response generated successfully for question: {question[:200]}")
            # Immediate console feedback of the final answer
            print(f"Agent returning answer: {response}")
            return response
        except Exception as e:
            # The outer catch handles failures before agent.run is called or not handled by the executor block.
            logger.error(f"Agent setup or execution failed (outer catch) for question '{question[:100]}': {str(e)}", exc_info=True)
            return f"Error processing your request: {str(e)}. Please try again or rephrase your question."
# Scoring API base URL; this constant was referenced below but never defined, so it is set here
# to the same endpoint the Excel download tool above uses.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # SPACE_ID is used to build the link to this Space's code
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent (modify this part to create your own agent)
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When the app runs as a Hugging Face Space, this link points to your codebase (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError is a RequestException subclass, so it must be caught first to keep this handler reachable.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Please clone this Space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once you click the submit button, it can take quite some time (this is the time the agent needs to work through all the questions).
        This Space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to avoid the long wait on the submit button, you could cache the answers and submit them in a separate action, or answer the questions asynchronously.
        """
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️  SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️  SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)