File size: 9,419 Bytes
108e7a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import os
import pandas as pd
from typing import Dict, List, Union, Tuple, Any
import numpy as np


class SpreadsheetTool:
    """Tool for parsing and extracting data from Excel (.xlsx) files.

    Public methods report failures by returning a dictionary that carries an
    ``"error"`` message instead of raising, so callers only need to inspect
    that key.
    """

    def __init__(self):
        """Initialize the SpreadsheetTool (the tool keeps no state)."""

    @staticmethod
    def _to_native(value: Any) -> Any:
        """Return NumPy scalars as built-in Python values; pass others through."""
        return value.item() if isinstance(value, np.generic) else value

    def parse_spreadsheet(self, file_path: str) -> Dict[str, Any]:
        """
        Parse an Excel spreadsheet and extract useful information.

        Args:
            file_path: Path to the .xlsx file

        Returns:
            Dictionary containing:
                - sheets: Dictionary of sheet names and their DataFrames
                - sheet_names: List of sheet names
                - summary: Basic spreadsheet summary
                - error: Error message if any (None on success)
            On failure only the "error" key is present.
        """
        try:
            # The existence check lives inside the try so that an invalid path
            # value (e.g. None) yields an error dictionary instead of raising.
            if not os.path.exists(file_path):
                return {"error": f"File not found: {file_path}"}

            # The context manager closes the workbook handle once every sheet
            # has been loaded, even if reading one of them fails.
            with pd.ExcelFile(file_path) as excel_file:
                sheet_names = list(excel_file.sheet_names)
                sheets = {
                    sheet_name: pd.read_excel(excel_file, sheet_name=sheet_name)
                    for sheet_name in sheet_names
                }

            return {
                "sheets": sheets,
                "sheet_names": sheet_names,
                "summary": self._create_summary(sheets),
                "error": None
            }
        except Exception as e:
            return {"error": f"Error parsing spreadsheet: {str(e)}"}

    def _create_summary(self, sheets_dict: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """
        Create a summary of the spreadsheet contents.

        Args:
            sheets_dict: Mapping of sheet name to its DataFrame

        Returns:
            Mapping of sheet name to a dictionary with the sheet's shape,
            column names, numeric and object-typed column names, whether any
            cell is null, and the first three rows as records.
        """
        summary = {}

        for sheet_name, df in sheets_dict.items():
            summary[sheet_name] = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                # pandas returns a numpy.bool_ here; store a plain bool so the
                # summary can be serialized (e.g. with json.dumps).
                "has_nulls": bool(df.isnull().any().any()),
                "first_few_rows": df.head(3).to_dict('records')
            }

        return summary

    def _aggregate_numeric(self, sheets: Dict[str, pd.DataFrame], suffix: str,
                           aggregate: Any) -> Dict[str, Any]:
        """Apply ``aggregate`` to each numeric column of every sheet.

        Sheets without numeric columns are skipped. Results are keyed as
        ``"<sheet_name>_<suffix>"``.
        """
        aggregated = {}
        for sheet_name, df in sheets.items():
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if not numeric_cols.empty:
                aggregated[f"{sheet_name}_{suffix}"] = {
                    col: self._to_native(aggregate(df[col])) for col in numeric_cols
                }
        return aggregated

    def query_data(self, data: Dict[str, Any], query_instructions: str) -> Dict[str, Any]:
        """
        Execute a query on the spreadsheet data based on instructions.

        The first keyword found in the instructions decides the operation, in
        this order: "sum", then "average"/"mean", then "count". Sums and
        averages cover every numeric column of every sheet and are returned as
        built-in Python numbers.

        Args:
            data: The parsed spreadsheet data (from parse_spreadsheet)
            query_instructions: Instructions for querying the data (e.g., "Sum column A")

        Returns:
            Dictionary with query results plus a "data_structure" entry that
            lists each sheet's columns and dtypes, or a dictionary with an
            "error" message.
        """
        if data.get("error"):
            return {"error": data["error"]}

        try:
            sheets = data["sheets"]
            instructions = query_instructions.lower()
            result = {}

            # NOTE: keywords are matched as plain substrings, so e.g. "summary"
            # also triggers the sum operation.
            if "sum" in instructions:
                result.update(
                    self._aggregate_numeric(sheets, "sums", lambda values: values.sum())
                )
            elif "average" in instructions or "mean" in instructions:
                result.update(
                    self._aggregate_numeric(sheets, "averages", lambda values: values.mean())
                )
            elif "count" in instructions:
                for sheet_name, df in sheets.items():
                    result[f"{sheet_name}_counts"] = {
                        "rows": len(df),
                        "non_null_counts": df.count().to_dict()
                    }

            # Add the raw data structure for more custom processing by the agent
            result["data_structure"] = {
                sheet_name: {
                    "columns": df.columns.tolist(),
                    "dtypes": df.dtypes.astype(str).to_dict()
                } for sheet_name, df in sheets.items()
            }

            return result

        except Exception as e:
            return {"error": f"Error querying data: {str(e)}"}

    def extract_specific_data(self, data: Dict[str, Any], sheet_name: Union[str, None] = None,
                              column_names: Union[List[str], None] = None,
                              row_indices: Union[List[int], None] = None) -> Dict[str, Any]:
        """
        Extract specific data from the spreadsheet.

        Args:
            data: The parsed spreadsheet data
            sheet_name: Name of the sheet to extract from (default: first sheet)
            column_names: List of column names to extract (default: all columns;
                an empty list also selects all columns)
            row_indices: List of zero-based row positions to extract (default:
                all rows; an empty list also selects all rows). Negative
                positions are rejected.

        Returns:
            Dictionary with the extracted rows under "data" (list of records)
            and the resulting "shape", or a dictionary with an "error" message.
        """
        if data.get("error"):
            return {"error": data["error"]}

        try:
            sheets = data["sheets"]

            # Default to the first sheet if not specified
            if sheet_name is None:
                sheet_name = data["sheet_names"][0]

            if sheet_name not in sheets:
                return {"error": f"Sheet '{sheet_name}' not found"}

            df = sheets[sheet_name]

            # Filter columns if specified
            if column_names:
                missing_columns = [col for col in column_names if col not in df.columns]
                if missing_columns:
                    return {"error": f"Columns not found: {missing_columns}"}
                df = df[column_names]

            # Filter rows if specified
            if row_indices:
                max_index = len(df) - 1
                invalid_indices = [i for i in row_indices if i < 0 or i > max_index]
                if invalid_indices:
                    # An empty sheet has no valid position at all; avoid
                    # printing the nonsensical range "0--1".
                    valid_range = f"0-{max_index}" if max_index >= 0 else "none (sheet has no rows)"
                    return {
                        "error": f"Row indices out of range: {invalid_indices}. "
                                 f"Valid range: {valid_range}"
                    }
                df = df.iloc[row_indices]

            return {
                "data": df.to_dict('records'),
                "shape": df.shape
            }

        except Exception as e:
            return {"error": f"Error extracting specific data: {str(e)}"}


# Example usage (if this script is run directly)
if __name__ == "__main__":
    # Demo: write a small two-sheet workbook to disk, then run it through
    # each public method of SpreadsheetTool and print what comes back.
    demo_dir = "spreadsheet_test"
    os.makedirs(demo_dir, exist_ok=True)
    workbook_path = os.path.join(demo_dir, "test_spreadsheet.xlsx")

    # Sheet name -> contents, in the order the sheets are written.
    demo_sheets = {
        'Sales': pd.DataFrame({
            'Product': ['Apple', 'Orange', 'Banana', 'Mango'],
            'Price': [1.2, 0.8, 0.5, 1.5],
            'Quantity': [100, 80, 200, 50],
            'Revenue': [120, 64, 100, 75]
        }),
        'Expenses': pd.DataFrame({
            'Month': ['Jan', 'Feb', 'Mar', 'Apr'],
            'Expenses': [50, 60, 55, 70]
        }),
    }

    with pd.ExcelWriter(workbook_path) as writer:
        for title, frame in demo_sheets.items():
            frame.to_excel(writer, sheet_name=title, index=False)

    print(f"Created test spreadsheet at {workbook_path}")

    tool = SpreadsheetTool()

    print("\nParsing spreadsheet...")
    parsed = tool.parse_spreadsheet(workbook_path)
    parse_error = parsed.get("error")

    if not parse_error:
        names = parsed['sheet_names']
        print(f"Successfully parsed {len(names)} sheets:")
        print(f"Sheet names: {names}")

        # Preview the first sheet
        leading_name = names[0]
        leading_frame = parsed['sheets'][leading_name]
        print(f"\nFirst few rows of '{leading_name}':")
        print(leading_frame.head())

        # Exercise the keyword-driven query
        print("\nQuerying data (sum operation)...")
        sum_result = tool.query_data(parsed, "sum")
        print(f"Query result: {sum_result}")

        # Exercise column-filtered extraction
        print("\nExtracting specific data...")
        extracted = tool.extract_specific_data(
            parsed,
            sheet_name='Sales',
            column_names=['Product', 'Revenue'],
        )
        print(f"Extracted data: {extracted}")
    else:
        print(f"Error: {parse_error}")