# Source: HF_Agents_Final_Project/src/spreadsheet_tool.py
# Author: Yago Bolivar
# Commit 108e7a1 - feat: implement SpreadsheetTool for parsing and querying Excel files with detailed summaries
# (File-viewer residue: "raw" / "history" / "blame" links; file size 9.42 kB)
import os
import pandas as pd
from typing import Dict, List, Union, Tuple, Any
import numpy as np
class SpreadsheetTool:
    """Tool for parsing and extracting data from Excel (.xlsx) files.

    The tool is stateless. Expected failures (missing file, unreadable
    workbook, unknown sheet/column, out-of-range row) are reported by
    returning a dictionary with an ``"error"`` key rather than by raising.
    """

    def __init__(self):
        """Initialize the SpreadsheetTool (no state is kept)."""
        pass

    @staticmethod
    def _to_native(value: Any) -> Any:
        """Return the built-in Python equivalent of a NumPy scalar.

        Values that are not NumPy scalars are returned unchanged. This keeps
        aggregate results JSON-serializable and readable when printed.
        """
        return value.item() if isinstance(value, np.generic) else value

    def parse_spreadsheet(self, file_path: str) -> Dict[str, Any]:
        """
        Parse an Excel spreadsheet and extract useful information.

        Args:
            file_path: Path to the .xlsx file

        Returns:
            On success, a dictionary containing:
                - sheets: Dictionary of sheet names and their DataFrames
                - sheet_names: List of sheet names
                - summary: Basic spreadsheet summary (see _create_summary)
                - error: None
            On failure, a dictionary containing only:
                - error: Error message
        """
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}"}
        try:
            # Use the workbook as a context manager so the underlying file
            # handle is released even if reading one of the sheets fails.
            with pd.ExcelFile(file_path) as excel_file:
                sheet_names = excel_file.sheet_names
                sheets = {
                    sheet_name: pd.read_excel(excel_file, sheet_name=sheet_name)
                    for sheet_name in sheet_names
                }
            # Create a summary of the spreadsheet
            summary = self._create_summary(sheets)
            return {
                "sheets": sheets,
                "sheet_names": sheet_names,
                "summary": summary,
                "error": None
            }
        except Exception as e:
            return {"error": f"Error parsing spreadsheet: {str(e)}"}

    def _create_summary(self, sheets_dict: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Create a summary of the spreadsheet contents.

        Args:
            sheets_dict: Mapping of sheet name to its DataFrame

        Returns:
            Mapping of sheet name to a dictionary with the sheet's shape,
            column names, numeric and object-dtype column names, a plain
            ``bool`` telling whether any cell is null, and the first three
            rows as a list of records.
        """
        summary = {}
        for sheet_name, df in sheets_dict.items():
            summary[sheet_name] = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                # bool() turns numpy.bool_ into a built-in bool.
                "has_nulls": bool(df.isnull().any().any()),
                "first_few_rows": df.head(3).to_dict('records')
            }
        return summary

    def query_data(self, data: Dict[str, Any], query_instructions: str) -> Dict[str, Any]:
        """
        Execute a query on the spreadsheet data based on instructions.

        The instructions are matched case-insensitively by substring, and
        only the first matching operation runs, checked in this order:
        "sum", then "average"/"mean", then "count". Because matching is by
        substring, a word such as "summary" also triggers the sum operation.

        Args:
            data: The parsed spreadsheet data (from parse_spreadsheet)
            query_instructions: Instructions for querying the data (e.g., "Sum column A")

        Returns:
            Dictionary with the per-sheet results of the matched operation
            (keys "<sheet>_sums", "<sheet>_averages" or "<sheet>_counts")
            plus a "data_structure" entry listing each sheet's columns and
            dtypes; or a dictionary with an "error" key.
        """
        if data.get("error"):
            return {"error": data["error"]}
        try:
            sheets = data["sheets"]
            result = {}
            # Lower-case once instead of once per branch.
            instructions = query_instructions.lower()

            if "sum" in instructions:
                for sheet_name, df in sheets.items():
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
                    # Sheets without numeric columns are skipped entirely.
                    if not numeric_cols.empty:
                        result[f"{sheet_name}_sums"] = {
                            col: self._to_native(df[col].sum()) for col in numeric_cols
                        }
            elif "average" in instructions or "mean" in instructions:
                for sheet_name, df in sheets.items():
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
                    if not numeric_cols.empty:
                        result[f"{sheet_name}_averages"] = {
                            col: self._to_native(df[col].mean()) for col in numeric_cols
                        }
            elif "count" in instructions:
                for sheet_name, df in sheets.items():
                    result[f"{sheet_name}_counts"] = {
                        "rows": len(df),
                        "non_null_counts": df.count().to_dict()
                    }

            # Add the raw data structure for more custom processing by the agent
            result["data_structure"] = {
                sheet_name: {
                    "columns": df.columns.tolist(),
                    "dtypes": df.dtypes.astype(str).to_dict()
                } for sheet_name, df in sheets.items()
            }
            return result
        except Exception as e:
            return {"error": f"Error querying data: {str(e)}"}

    def extract_specific_data(self, data: Dict[str, Any], sheet_name: Union[str, None] = None,
                              column_names: Union[List[str], None] = None,
                              row_indices: Union[List[int], None] = None) -> Dict[str, Any]:
        """
        Extract specific data from the spreadsheet.

        Args:
            data: The parsed spreadsheet data
            sheet_name: Name of the sheet to extract from (default: first sheet)
            column_names: List of column names to extract (default: all columns;
                an empty list is treated like the default)
            row_indices: List of zero-based row positions to extract (default:
                all rows; an empty list is treated like the default). Negative
                positions are rejected as out of range.

        Returns:
            Dictionary with "data" (list of row records) and "shape" of the
            extracted frame, or a dictionary with an "error" key.
        """
        if data.get("error"):
            return {"error": data["error"]}
        try:
            sheets = data["sheets"]
            # Default to the first sheet if not specified
            if sheet_name is None:
                sheet_name = data["sheet_names"][0]
            if sheet_name not in sheets:
                return {"error": f"Sheet '{sheet_name}' not found"}
            df = sheets[sheet_name]

            # Filter columns if specified
            if column_names:
                # Check if all requested columns exist
                missing_columns = [col for col in column_names if col not in df.columns]
                if missing_columns:
                    return {"error": f"Columns not found: {missing_columns}"}
                df = df[column_names]

            # Filter rows if specified
            if row_indices:
                # Check if indices are in range
                max_index = len(df) - 1
                invalid_indices = [i for i in row_indices if i < 0 or i > max_index]
                if invalid_indices:
                    return {"error": f"Row indices out of range: {invalid_indices}. Valid range: 0-{max_index}"}
                df = df.iloc[row_indices]

            return {
                "data": df.to_dict('records'),
                "shape": df.shape
            }
        except Exception as e:
            return {"error": f"Error extracting specific data: {str(e)}"}
# Demonstration: runs only when this file is executed directly as a script.
if __name__ == "__main__":
    # Prepare the output folder and the path of the demo workbook.
    demo_dir = "spreadsheet_test"
    os.makedirs(demo_dir, exist_ok=True)
    workbook_path = os.path.join(demo_dir, "test_spreadsheet.xlsx")

    # Two small frames, one per worksheet.
    sales_frame = pd.DataFrame({
        'Product': ['Apple', 'Orange', 'Banana', 'Mango'],
        'Price': [1.2, 0.8, 0.5, 1.5],
        'Quantity': [100, 80, 200, 50],
        'Revenue': [120, 64, 100, 75],
    })
    expenses_frame = pd.DataFrame({
        'Month': ['Jan', 'Feb', 'Mar', 'Apr'],
        'Expenses': [50, 60, 55, 70],
    })

    # Write both sheets into a single .xlsx file.
    with pd.ExcelWriter(workbook_path) as xlsx_writer:
        sales_frame.to_excel(xlsx_writer, sheet_name='Sales', index=False)
        expenses_frame.to_excel(xlsx_writer, sheet_name='Expenses', index=False)
    print(f"Created test spreadsheet at {workbook_path}")

    # Exercise the tool against the file that was just written.
    tool = SpreadsheetTool()
    print("\nParsing spreadsheet...")
    parsed = tool.parse_spreadsheet(workbook_path)

    if parsed.get("error"):
        print(f"Error: {parsed['error']}")
    else:
        names = parsed['sheet_names']
        print(f"Successfully parsed {len(names)} sheets:")
        print(f"Sheet names: {names}")

        # Preview the first worksheet.
        leading_name = names[0]
        leading_frame = parsed['sheets'][leading_name]
        print(f"\nFirst few rows of '{leading_name}':")
        print(leading_frame.head())

        # Aggregate query.
        print("\nQuerying data (sum operation)...")
        sum_outcome = tool.query_data(parsed, "sum")
        print(f"Query result: {sum_outcome}")

        # Column-restricted extraction from a named sheet.
        print("\nExtracting specific data...")
        picked = tool.extract_specific_data(
            parsed,
            sheet_name='Sales',
            column_names=['Product', 'Revenue'],
        )
        print(f"Extracted data: {picked}")