Yago Bolivar
feat: implement SpreadsheetTool for parsing and querying Excel files with detailed summaries
108e7a1
import os
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
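
# Note: pandas relies on an optional Excel engine (typically openpyxl for .xlsx
# files) for read_excel/ExcelWriter; if it is missing, pandas raises an
# ImportError, so install it separately (e.g. pip install openpyxl).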


class SpreadsheetTool:
    """Tool for parsing and extracting data from Excel (.xlsx) files."""

    def __init__(self):
        """Initialize the SpreadsheetTool."""
        pass

    def parse_spreadsheet(self, file_path: str) -> Dict[str, Any]:
        """
        Parse an Excel spreadsheet and extract useful information.

        Args:
            file_path: Path to the .xlsx file

        Returns:
            Dictionary containing:
                - sheets: Dictionary of sheet names and their DataFrames
                - sheet_names: List of sheet names
                - summary: Basic spreadsheet summary
                - error: Error message if any
        """
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}"}

        try:
            # Read all sheets in the Excel file
            excel_file = pd.ExcelFile(file_path)
            sheet_names = excel_file.sheet_names

            sheets = {}
            for sheet_name in sheet_names:
                sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)

            # Create a summary of the spreadsheet
            summary = self._create_summary(sheets)

            return {
                "sheets": sheets,
                "sheet_names": sheet_names,
                "summary": summary,
                "error": None
            }
        except Exception as e:
            return {"error": f"Error parsing spreadsheet: {str(e)}"}

    def _create_summary(self, sheets_dict: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Create a summary of the spreadsheet contents."""
        summary = {}
        for sheet_name, df in sheets_dict.items():
            summary[sheet_name] = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                "has_nulls": df.isnull().any().any(),
                "first_few_rows": df.head(3).to_dict('records')
            }
        return summary
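
    # Illustrative summary entry for one sheet ('Sales' from the demo workbook):
    #   {"shape": (4, 4), "columns": ["Product", "Price", "Quantity", "Revenue"],
    #    "numeric_columns": ["Price", "Quantity", "Revenue"],
    #    "text_columns": ["Product"], "has_nulls": False,
    #    "first_few_rows": [{"Product": "Apple", "Price": 1.2, ...}, ...]}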

    def query_data(self, data: Dict[str, Any], query_instructions: str) -> Dict[str, Any]:
        """
        Execute a query on the spreadsheet data based on instructions.

        Args:
            data: The parsed spreadsheet data (from parse_spreadsheet)
            query_instructions: Instructions for querying the data (e.g., "Sum column A")

        Returns:
            Dictionary with query results and potential explanation
        """
        if data.get("error"):
            return {"error": data["error"]}

        try:
            # This is where you'd implement more sophisticated query logic
            # For now, we'll implement some basic operations
            sheets = data["sheets"]
            result = {}

            # Handle common operations based on query_instructions
            if "sum" in query_instructions.lower():
                # Extract column or range to sum
                # This is a simple implementation - a more robust one would use regex or NLP
                for sheet_name, df in sheets.items():
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
                    if not numeric_cols.empty:
                        result[f"{sheet_name}_sums"] = {
                            col: df[col].sum() for col in numeric_cols
                        }
            elif "average" in query_instructions.lower() or "mean" in query_instructions.lower():
                for sheet_name, df in sheets.items():
                    numeric_cols = df.select_dtypes(include=[np.number]).columns
                    if not numeric_cols.empty:
                        result[f"{sheet_name}_averages"] = {
                            col: df[col].mean() for col in numeric_cols
                        }
            elif "count" in query_instructions.lower():
                for sheet_name, df in sheets.items():
                    result[f"{sheet_name}_counts"] = {
                        "rows": len(df),
                        "non_null_counts": df.count().to_dict()
                    }

            # Add the raw data structure for more custom processing by the agent
            result["data_structure"] = {
                sheet_name: {
                    "columns": df.columns.tolist(),
                    "dtypes": df.dtypes.astype(str).to_dict()
                } for sheet_name, df in sheets.items()
            }
            return result
        except Exception as e:
            return {"error": f"Error querying data: {str(e)}"}

    def extract_specific_data(self, data: Dict[str, Any], sheet_name: Optional[str] = None,
                              column_names: Optional[List[str]] = None,
                              row_indices: Optional[List[int]] = None) -> Dict[str, Any]:
        """
        Extract specific data from the spreadsheet.

        Args:
            data: The parsed spreadsheet data
            sheet_name: Name of the sheet to extract from (default: first sheet)
            column_names: List of column names to extract (default: all columns)
            row_indices: List of row indices to extract (default: all rows)

        Returns:
            Dictionary with extracted data
        """
        if data.get("error"):
            return {"error": data["error"]}

        try:
            sheets = data["sheets"]

            # Default to the first sheet if not specified
            if sheet_name is None:
                sheet_name = data["sheet_names"][0]
            if sheet_name not in sheets:
                return {"error": f"Sheet '{sheet_name}' not found"}

            df = sheets[sheet_name]

            # Filter columns if specified
            if column_names:
                # Check if all requested columns exist
                missing_columns = [col for col in column_names if col not in df.columns]
                if missing_columns:
                    return {"error": f"Columns not found: {missing_columns}"}
                df = df[column_names]

            # Filter rows if specified
            if row_indices:
                # Check if indices are in range
                max_index = len(df) - 1
                invalid_indices = [i for i in row_indices if i < 0 or i > max_index]
                if invalid_indices:
                    return {"error": f"Row indices out of range: {invalid_indices}. Valid range: 0-{max_index}"}
                df = df.iloc[row_indices]

            return {
                "data": df.to_dict('records'),
                "shape": df.shape
            }
        except Exception as e:
            return {"error": f"Error extracting specific data: {str(e)}"}


# Example usage (if this script is run directly)
if __name__ == "__main__":
    # Create a simple test spreadsheet for demonstration
    test_dir = "spreadsheet_test"
    os.makedirs(test_dir, exist_ok=True)

    # Create a test DataFrame
    test_data = {
        'Product': ['Apple', 'Orange', 'Banana', 'Mango'],
        'Price': [1.2, 0.8, 0.5, 1.5],
        'Quantity': [100, 80, 200, 50],
        'Revenue': [120, 64, 100, 75]
    }
    df = pd.DataFrame(test_data)
    test_file_path = os.path.join(test_dir, "test_spreadsheet.xlsx")

    # Save to Excel
    with pd.ExcelWriter(test_file_path) as writer:
        df.to_excel(writer, sheet_name='Sales', index=False)
        # Create a second sheet with different data
        pd.DataFrame({
            'Month': ['Jan', 'Feb', 'Mar', 'Apr'],
            'Expenses': [50, 60, 55, 70]
        }).to_excel(writer, sheet_name='Expenses', index=False)
    print(f"Created test spreadsheet at {test_file_path}")

    # Test the tool
    spreadsheet_tool = SpreadsheetTool()

    # Parse the spreadsheet
    print("\nParsing spreadsheet...")
    parsed_data = spreadsheet_tool.parse_spreadsheet(test_file_path)

    if parsed_data.get("error"):
        print(f"Error: {parsed_data['error']}")
    else:
        print(f"Successfully parsed {len(parsed_data['sheet_names'])} sheets:")
        print(f"Sheet names: {parsed_data['sheet_names']}")

        # Show a sample of the first sheet
        first_sheet_name = parsed_data['sheet_names'][0]
        first_sheet = parsed_data['sheets'][first_sheet_name]
        print(f"\nFirst few rows of '{first_sheet_name}':")
        print(first_sheet.head())

    # Test query
    print("\nQuerying data (sum operation)...")
    query_result = spreadsheet_tool.query_data(parsed_data, "sum")
    print(f"Query result: {query_result}")

    # Test specific data extraction
    print("\nExtracting specific data...")
    extract_result = spreadsheet_tool.extract_specific_data(
        parsed_data,
        sheet_name='Sales',
        column_names=['Product', 'Revenue']
    )
    print(f"Extracted data: {extract_result}")