Yago Bolivar committed on
Commit
108e7a1
·
1 Parent(s): 2172594

feat: implement SpreadsheetTool for parsing and querying Excel files with detailed summaries

Browse files
src/spreadsheet_tool.py ADDED
@@ -0,0 +1,243 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import pandas as pd
3
+ from typing import Dict, List, Union, Tuple, Any
4
+ import numpy as np
5
+
6
+
7
class SpreadsheetTool:
    """Tool for parsing and extracting data from Excel (.xlsx) files.

    Every public method reports failure by returning a dictionary with an
    "error" key instead of raising, so calls can be chained: the result of
    parse_spreadsheet can be handed straight to query_data or
    extract_specific_data, which forward any earlier error unchanged.
    """

    def __init__(self):
        """Initialize the SpreadsheetTool (the tool keeps no state)."""
        pass

    def parse_spreadsheet(self, file_path: str) -> Dict[str, Any]:
        """
        Parse an Excel spreadsheet and extract useful information.

        Args:
            file_path: Path to the .xlsx file

        Returns:
            On success, a dictionary containing:
                - sheets: Dictionary of sheet names and their DataFrames
                - sheet_names: List of sheet names
                - summary: Per-sheet summary (see _create_summary)
                - error: None
            On failure, a dictionary holding only an "error" message.
        """
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}"}

        try:
            # The context manager releases the workbook handle even when
            # reading one of the sheets fails.
            with pd.ExcelFile(file_path) as excel_file:
                sheet_names = excel_file.sheet_names
                sheets = {
                    sheet_name: pd.read_excel(excel_file, sheet_name=sheet_name)
                    for sheet_name in sheet_names
                }

            return {
                "sheets": sheets,
                "sheet_names": sheet_names,
                "summary": self._create_summary(sheets),
                "error": None
            }
        except Exception as e:
            # Tool boundary: any reader failure is reported, not raised.
            return {"error": f"Error parsing spreadsheet: {str(e)}"}

    def _create_summary(self, sheets_dict: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Create a summary of the spreadsheet contents.

        Args:
            sheets_dict: Mapping of sheet name to the DataFrame of that sheet

        Returns:
            Mapping of sheet name to a dictionary with the keys "shape",
            "columns", "numeric_columns", "text_columns", "has_nulls" and
            "first_few_rows" (the first three rows as record dictionaries).
        """
        summary = {}

        for sheet_name, df in sheets_dict.items():
            summary[sheet_name] = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                # bool() turns the numpy boolean into a plain Python bool
                "has_nulls": bool(df.isnull().any().any()),
                "first_few_rows": df.head(3).to_dict('records')
            }

        return summary

    def query_data(self, data: Dict[str, Any], query_instructions: str) -> Dict[str, Any]:
        """
        Execute a query on the spreadsheet data based on instructions.

        The instructions are matched case-insensitively by substring. Every
        operation that is mentioned is executed, so "sum and average" yields
        both sums and averages:
            - "sum": "<sheet>_sums" with the sum of each numeric column
            - "average" / "mean": "<sheet>_averages" with the mean of each
              numeric column
            - "count": "<sheet>_counts" with the row count and the non-null
              count of each column
        Sheets without numeric columns produce no sums/averages entry.

        Args:
            data: The parsed spreadsheet data (from parse_spreadsheet)
            query_instructions: Instructions for querying the data (e.g., "Sum column A")

        Returns:
            Dictionary with the query results plus a "data_structure" entry
            (columns and dtypes per sheet), or a dictionary with an "error"
            message.
        """
        if data.get("error"):
            return {"error": data["error"]}

        try:
            sheets = data["sheets"]
            result = {}

            # Lower-case once, then decide which operations were requested.
            instructions = query_instructions.lower()
            wants_sum = "sum" in instructions
            wants_average = "average" in instructions or "mean" in instructions
            wants_count = "count" in instructions

            for sheet_name, df in sheets.items():
                numeric_cols = df.select_dtypes(include=[np.number]).columns

                if wants_sum and not numeric_cols.empty:
                    result[f"{sheet_name}_sums"] = {
                        col: df[col].sum() for col in numeric_cols
                    }

                if wants_average and not numeric_cols.empty:
                    result[f"{sheet_name}_averages"] = {
                        col: df[col].mean() for col in numeric_cols
                    }

                if wants_count:
                    result[f"{sheet_name}_counts"] = {
                        "rows": len(df),
                        "non_null_counts": df.count().to_dict()
                    }

            # Add the raw data structure for more custom processing by the agent
            result["data_structure"] = {
                sheet_name: {
                    "columns": df.columns.tolist(),
                    "dtypes": df.dtypes.astype(str).to_dict()
                } for sheet_name, df in sheets.items()
            }

            return result

        except Exception as e:
            return {"error": f"Error querying data: {str(e)}"}

    def extract_specific_data(self, data: Dict[str, Any], sheet_name: str = None,
                              column_names: List[str] = None,
                              row_indices: List[int] = None) -> Dict[str, Any]:
        """
        Extract specific data from the spreadsheet.

        Args:
            data: The parsed spreadsheet data
            sheet_name: Name of the sheet to extract from (default: first sheet)
            column_names: List of column names to extract (default: all columns)
            row_indices: List of zero-based row positions to extract, returned
                in the order given (default: all rows); negative positions are
                rejected

        Returns:
            Dictionary with "data" (list of row records) and "shape", or a
            dictionary with an "error" message when the sheet, a column or a
            row position does not exist.
        """
        if data.get("error"):
            return {"error": data["error"]}

        try:
            sheets = data["sheets"]

            # Default to the first sheet if not specified
            if sheet_name is None:
                sheet_name = data["sheet_names"][0]

            if sheet_name not in sheets:
                return {"error": f"Sheet '{sheet_name}' not found"}

            df = sheets[sheet_name]

            # Filter columns if specified
            if column_names:
                missing_columns = [col for col in column_names if col not in df.columns]
                if missing_columns:
                    return {"error": f"Columns not found: {missing_columns}"}
                df = df[column_names]

            # Filter rows if specified
            if row_indices:
                max_index = len(df) - 1
                invalid_indices = [i for i in row_indices if i < 0 or i > max_index]
                if invalid_indices:
                    return {"error": f"Row indices out of range: {invalid_indices}. Valid range: 0-{max_index}"}
                df = df.iloc[row_indices]

            return {
                "data": df.to_dict('records'),
                "shape": df.shape
            }

        except Exception as e:
            return {"error": f"Error extracting specific data: {str(e)}"}
182
+
183
+
184
# Demo run: build a small two-sheet workbook, then exercise the tool against it
if __name__ == "__main__":
    demo_dir = "spreadsheet_test"
    os.makedirs(demo_dir, exist_ok=True)
    workbook_path = os.path.join(demo_dir, "test_spreadsheet.xlsx")

    # Sample content for the two sheets
    sales_frame = pd.DataFrame({
        'Product': ['Apple', 'Orange', 'Banana', 'Mango'],
        'Price': [1.2, 0.8, 0.5, 1.5],
        'Quantity': [100, 80, 200, 50],
        'Revenue': [120, 64, 100, 75]
    })
    expenses_frame = pd.DataFrame({
        'Month': ['Jan', 'Feb', 'Mar', 'Apr'],
        'Expenses': [50, 60, 55, 70]
    })

    # Write both sheets into one workbook
    with pd.ExcelWriter(workbook_path) as writer:
        sales_frame.to_excel(writer, sheet_name='Sales', index=False)
        expenses_frame.to_excel(writer, sheet_name='Expenses', index=False)

    print(f"Created test spreadsheet at {workbook_path}")

    tool = SpreadsheetTool()

    print("\nParsing spreadsheet...")
    parsed = tool.parse_spreadsheet(workbook_path)

    if not parsed.get("error"):
        names = parsed['sheet_names']
        print(f"Successfully parsed {len(names)} sheets:")
        print(f"Sheet names: {names}")

        # Preview the first sheet
        leading_name = names[0]
        leading_frame = parsed['sheets'][leading_name]
        print(f"\nFirst few rows of '{leading_name}':")
        print(leading_frame.head())

        # Column sums across every sheet
        print("\nQuerying data (sum operation)...")
        sums = tool.query_data(parsed, "sum")
        print(f"Query result: {sums}")

        # Pull two columns out of the Sales sheet
        print("\nExtracting specific data...")
        extracted = tool.extract_specific_data(
            parsed,
            sheet_name='Sales',
            column_names=['Product', 'Revenue']
        )
        print(f"Extracted data: {extracted}")
    else:
        print(f"Error: {parsed['error']}")
tests/conftest.py ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
"""
Pytest configuration.

Appends the directory that contains this file's folder (the project root)
to sys.path so modules located there can be imported.
"""
import os
import sys

# Two dirname() hops: this file -> its folder -> the folder's parent
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_PARENT_DIR = os.path.dirname(_THIS_DIR)
sys.path.append(_PARENT_DIR)
tests/test_spreadsheet_tool.py ADDED
@@ -0,0 +1,90 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ # Testing the spreadsheet tool with a downloaded Excel file
3
+
4
+ import os
5
+ import sys
6
+
7
+ # Add the parent directory to sys.path to find the src module
8
+ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
9
+
10
+ from src.spreadsheet_tool import SpreadsheetTool
11
+
12
def main():
    """Run SpreadsheetTool end to end against a downloaded workbook, printing each result.

    Parses the file, prints a per-sheet summary, runs the "sum", "average"
    and "count" queries, and finally extracts the first two columns of the
    first sheet. Stops after reporting the error if parsing fails.
    """
    tool = SpreadsheetTool()

    # The workbook sits in <project root>/downloaded_files, where the project
    # root is the directory one level above this file's folder.
    here = os.path.dirname(os.path.abspath(__file__))
    root_dir = os.path.dirname(here)
    workbook = os.path.join(root_dir, "downloaded_files", "7bd855d8-463d-4ed5-93ca-5fe35145f733.xlsx")

    print(f"Testing SpreadsheetTool with file: {workbook}")
    print(f"File exists: {os.path.exists(workbook)}")

    print("\n--- PARSING SPREADSHEET ---")
    parsed = tool.parse_spreadsheet(workbook)

    parse_error = parsed.get("error")
    if parse_error:
        print(f"Error: {parse_error}")
        return

    # Overview of the workbook
    names = parsed['sheet_names']
    print(f"\nSpreadsheet contains {len(names)} sheets:")
    print(f"Sheet names: {names}")

    # One summary section per sheet
    print("\n--- SHEET SUMMARIES ---")
    for name, details in parsed["summary"].items():
        n_rows, n_cols = details['shape']
        print(f"\nSheet: {name}")
        print(f" Dimensions: {n_rows} rows × {n_cols} columns")
        print(f" Column names: {details['columns']}")
        print(f" Numeric columns: {details['numeric_columns']}")
        print(f" Text columns: {details['text_columns']}")
        print(f" Contains null values: {details['has_nulls']}")

        print(f"\n Sample data (first 3 rows):")
        for number, record in enumerate(details['first_few_rows'], start=1):
            print(f" Row {number}: {record}")

    # Aggregate queries
    print("\n--- TESTING QUERY OPERATIONS ---")
    for operation in ("sum", "average", "count"):
        print(f"\nTesting '{operation}' operation:")
        outcome = tool.query_data(parsed, operation)

        if outcome.get("error"):
            print(f" Error: {outcome['error']}")
            continue

        # Drop the structure dump so the printed result stays short
        outcome.pop("data_structure", None)
        print(f" Result: {outcome}")

    # Column extraction from the first sheet
    print("\n--- TESTING DATA EXTRACTION ---")
    target_sheet = names[0]
    available = parsed["summary"][target_sheet]["columns"]

    # Nothing to demonstrate unless the sheet has at least two columns
    if len(available) < 2:
        return

    wanted = available[:2]
    print(f"\nExtracting columns {wanted} from sheet '{target_sheet}':")
    extracted = tool.extract_specific_data(
        parsed,
        sheet_name=target_sheet,
        column_names=wanted
    )

    if extracted.get("error"):
        print(f" Error: {extracted['error']}")
        return

    print(f" Extracted data shape: {extracted['shape']}")
    print(f" First few rows:")
    for number, record in enumerate(extracted['data'][:3], start=1):
        print(f" Row {number}: {record}")


if __name__ == "__main__":
    main()