"""
Data processor for processing extracted data.
"""

import datetime
import json
import math
import operator
import os
import re
from typing import Dict, Any, List, Optional, Tuple, Union

import pandas as pd


class DataProcessor:
    """
    Class for processing extracted data.

    Every public ``process_*`` method takes the data extracted from one kind
    of file plus a natural-language question, and returns a string answer
    produced by keyword and regular-expression heuristics.
    """

    # Substrings that mark a column as holding a year/date.
    _YEAR_TERMS = ('year', 'date', 'time', 'created', 'modified', 'release')
    # Substrings that mark a column as holding an item title.
    _TITLE_TERMS = ('title', 'name', 'item', 'product', 'description')
    # Substrings that mark a column as holding an item type.
    _TYPE_TERMS = ('type', 'category', 'format', 'medium', 'platform')
    # Item types recognised in "oldest ..." questions (first match wins).
    _ITEM_TYPES = (
        'movie', 'film', 'book', 'song', 'album', 'game', 'video game',
        'dvd', 'cd', 'blu-ray', 'blu ray', 'record', 'cassette', 'vhs',
    )
    # Column words looked for in aggregate questions (first match wins).
    _COLUMN_KEYWORDS = (
        'year', 'date', 'time', 'name', 'title', 'price', 'cost', 'amount',
        'quantity', 'total', 'value', 'age', 'rating', 'score', 'grade',
        'salary', 'income', 'revenue', 'profit', 'loss', 'height', 'weight',
        'length', 'width', 'depth', 'area', 'volume',
    )
    # Target words looked for in lookup questions (first match wins).
    _LOOKING_FOR_KEYWORDS = (
        'name', 'title', 'price', 'cost', 'amount', 'quantity', 'total',
        'value', 'age', 'rating', 'score', 'grade', 'salary', 'income',
        'revenue', 'profit', 'loss', 'height', 'weight', 'length', 'width',
        'depth', 'area', 'volume', 'year', 'date', 'time',
    )
    # Ordering operators supported in extracted conditions.
    _COMPARATORS = {
        '>': operator.gt,
        '<': operator.lt,
        '>=': operator.ge,
        '<=': operator.le,
    }
    # Generic "what is X" style question; group 1 is the phrase to look for.
    _INFO_PATTERN = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
    # Sentinel distinguishing "key not found" from a stored None value.
    _MISSING = object()

    def __init__(self):
        """Initialize the data processor."""
        pass

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _mentions(text: str, *words: str) -> bool:
        """Return True if any of ``words`` occurs in ``text`` as a whole word.

        Common inflections (s, es, d, ed, ing) are accepted so that "counts"
        still matches "count", while unrelated words such as "country",
        "account" or "summary" do not.
        """
        alternatives = '|'.join(re.escape(word) for word in words)
        pattern = rf'\b(?:{alternatives})(?:s|es|d|ed|ing)?\b'
        return re.search(pattern, text) is not None

    @staticmethod
    def _keyword_hits(question: str, keywords) -> List[str]:
        """Return the keywords (in their given order) that occur as whole words.

        A plural "s"/"es" suffix is tolerated. Whole-word matching prevents
        'age' from matching inside "average" or 'time' inside "sometimes".
        """
        return [
            keyword for keyword in keywords
            if re.search(rf'\b{re.escape(keyword)}(?:s|es)?\b', question)
        ]

    @staticmethod
    def _clean_phrase(text: str) -> str:
        """Strip surrounding whitespace and a trailing question mark."""
        return text.strip().rstrip('?').strip()

    @staticmethod
    def _coerce_number(text: str) -> Any:
        """Return ``text`` as a float when it parses as one, else unchanged."""
        try:
            return float(text)
        except ValueError:
            return text

    @staticmethod
    def _select_range(items: List[str], match) -> Optional[List[str]]:
        """Return the 1-based inclusive slice described by a regex match.

        Group 1 of ``match`` is the start and optional group 2 the end.
        Returns None when the range is not within ``1..len(items)`` or is
        reversed, so that callers can fall through to other strategies.
        """
        start = int(match.group(1))
        end = int(match.group(2)) if match.group(2) else start
        if 1 <= start <= end <= len(items):
            return items[start - 1:end]
        return None

    @staticmethod
    def _find_sentence(text: str, phrase: str) -> Optional[str]:
        """Return the first sentence of ``text`` containing ``phrase``.

        ``phrase`` is expected to be lowercase; the comparison lowercases
        each sentence. Returns None when no sentence contains the phrase.
        """
        for sentence in re.split(r'(?<=[.!?])\s+', text):
            if phrase in sentence.lower():
                return sentence.strip()
        return None

    @classmethod
    def _lookup_key(cls, mapping: Dict[Any, Any], key: str) -> Any:
        """Look up ``key`` in ``mapping``, falling back to a case-insensitive match.

        Returns ``cls._MISSING`` when the key is absent.
        """
        if key in mapping:
            return mapping[key]
        wanted = key.lower()
        for candidate, value in mapping.items():
            if isinstance(candidate, str) and candidate.lower() == wanted:
                return value
        return cls._MISSING

    # ------------------------------------------------------------------
    # Excel / CSV
    # ------------------------------------------------------------------

    def process_excel_data(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """
        Process data extracted from an Excel file.

        The question is lowercased and routed to a handler based on the
        first keyword group it mentions (as whole words); if none matches,
        a specific-value lookup is attempted.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Order matters: the first matching keyword group wins.
        handlers = (
            (('oldest',), self._find_oldest_item),
            (('count', 'how many'), self._count_items),
            (('average', 'mean'), self._calculate_average),
            (('total', 'sum'), self._calculate_total),
            (('maximum', 'highest'), self._find_maximum),
            (('minimum', 'lowest'), self._find_minimum),
        )
        for keywords, handler in handlers:
            if self._mentions(question_lower, *keywords):
                return handler(data, question_lower)

        return self._extract_specific_info(data, question_lower)

    def _find_year_column(self, df: pd.DataFrame, current_year: int) -> Optional[Any]:
        """Return the label of the column most likely to hold years, or None.

        A column whose name contains a year/date term is preferred;
        otherwise the first numeric column whose values all lie between
        1900 and ``current_year`` is used.
        """
        for col in df.columns:
            if any(term in str(col).lower() for term in self._YEAR_TERMS):
                return col

        for col in df.columns:
            try:
                series = df[col]
                if not pd.api.types.is_numeric_dtype(series):
                    continue
                if series.min() >= 1900 and series.max() <= current_year:
                    return col
            except (TypeError, ValueError):
                continue
        return None

    def _find_title_column(self, df: pd.DataFrame, year_col: Any) -> Optional[Any]:
        """Return the label of the column holding item titles, or None.

        Falls back to the first column other than ``year_col`` when no
        column name contains a title term and there is more than one column.
        """
        for col in df.columns:
            if any(term in str(col).lower() for term in self._TITLE_TERMS):
                return col
        if len(df.columns) > 1:
            for col in df.columns:
                if col != year_col:
                    return col
        return None

    def _filter_by_item_type(self, df: pd.DataFrame, item_type: str) -> pd.DataFrame:
        """Keep rows whose type-like column contains ``item_type``.

        The frame is returned unfiltered when no type-like column exists.
        """
        for col in df.columns:
            if any(term in str(col).lower() for term in self._TYPE_TERMS):
                mask = df[col].astype(str).str.lower().str.contains(item_type, regex=False)
                return df[mask]
        return df

    def _find_oldest_item(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the oldest item in the data.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: The lowercased question

        Returns:
            The title of the row with the smallest year across all sheets,
            or a fixed message when no sheet yields a usable year and title.
        """
        item_type = next((t for t in self._ITEM_TYPES if t in question), None)
        current_year = datetime.date.today().year

        oldest_year = float('inf')
        oldest_item = None

        for df in data.values():
            if df.empty:
                continue

            year_col = self._find_year_column(df, current_year)
            if year_col is None:
                continue
            title_col = self._find_title_column(df, year_col)
            if title_col is None:
                continue

            candidates = self._filter_by_item_type(df, item_type) if item_type else df
            if candidates.empty:
                continue

            try:
                column = candidates[year_col]
                if pd.api.types.is_datetime64_any_dtype(column):
                    years = column.dt.year
                else:
                    # Coerce so that text years ("1995") compare numerically.
                    years = pd.to_numeric(column, errors='coerce')
                # Positional index: immune to duplicate index labels.
                years = years.astype(float).reset_index(drop=True)
            except (TypeError, ValueError, AttributeError):
                continue

            if years.isna().all():
                continue

            position = int(years.idxmin())
            year = years.iloc[position]
            if year < oldest_year:
                oldest_year = year
                oldest_item = candidates[title_col].iloc[position]

        if oldest_item is not None:
            return str(oldest_item)
        return "Could not determine the oldest item from the data."

    def _condition_mask(self, series: pd.Series, op: str, value: Any) -> Optional[pd.Series]:
        """Build a boolean mask for one condition, or None if it cannot apply.

        Args:
            series: Column the condition refers to
            op: One of '=', '>', '<', '>=', '<=', 'contains', 'between'
            value: Comparison value (a two-element list for 'between')

        Returns:
            Boolean Series aligned with ``series``, or None when the
            operator is unknown or the comparison is invalid for the column.
        """
        try:
            if op == '=':
                is_number = (
                    isinstance(value, (int, float))
                    and not isinstance(value, bool)
                    and math.isfinite(value)
                )
                if is_number:
                    # str(2000.0) == "2000.0" never equals "2000", so numeric
                    # values must be compared numerically.
                    return pd.to_numeric(series, errors='coerce') == value
                return series.astype(str).str.lower() == str(value).lower()
            if op == 'contains':
                return series.astype(str).str.lower().str.contains(
                    str(value).lower(), regex=False
                )
            if op == 'between':
                if isinstance(value, list) and len(value) == 2:
                    return (series >= value[0]) & (series <= value[1])
                return None
            compare = self._COMPARATORS.get(op)
            if compare is None:
                return None
            return compare(series, value)
        except (TypeError, ValueError):
            # e.g. ordering a text column against a number: skip the condition.
            return None

    def _apply_conditions(self, df: pd.DataFrame, conditions: List[Dict[str, Any]]) -> pd.DataFrame:
        """Filter ``df`` by every condition that can be applied to it.

        Conditions naming a column that cannot be matched, or whose
        comparison is invalid for the column, are skipped (best effort).
        """
        filtered = df
        for condition in conditions:
            column = condition.get('column')
            value = condition.get('value')
            if not column or value is None:
                continue
            best_col = self._find_best_matching_column(df, column)
            if best_col is None:
                continue
            mask = self._condition_mask(
                filtered[best_col], condition.get('operator', '='), value
            )
            if mask is not None:
                filtered = filtered[mask]
        return filtered

    def _count_items(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Count items matching specific criteria.

        Returns:
            The number of rows, summed over all non-empty sheets, that
            satisfy the conditions extracted from the question.
        """
        conditions = self._extract_conditions(question)
        total_count = sum(
            len(self._apply_conditions(df, conditions))
            for df in data.values()
            if not df.empty
        )
        return str(total_count)

    def _aggregate_column(
        self,
        data: Dict[str, pd.DataFrame],
        question: str,
        method: str,
        numeric_only: bool,
        missing_column_msg: str,
        failure_msg: str,
    ) -> str:
        """Apply a Series aggregate to the column named in the question.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: The lowercased question
            method: Name of the Series method to call ('mean', 'sum', ...)
            numeric_only: Skip columns that are not of numeric dtype
            missing_column_msg: Returned when no column can be identified
            failure_msg: Returned when no sheet yields a result

        Returns:
            The aggregate of the first sheet with a usable matching column.
        """
        column_name = self._extract_column_name(question)
        if not column_name:
            return missing_column_msg

        for df in data.values():
            if df.empty:
                continue
            best_col = self._find_best_matching_column(df, column_name)
            if best_col is None:
                continue
            series = df[best_col]
            if numeric_only and not pd.api.types.is_numeric_dtype(series):
                continue
            try:
                return str(getattr(series, method)())
            except (TypeError, ValueError):
                # e.g. max() over a column mixing text and numbers.
                continue

        return failure_msg

    def _calculate_average(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the average of a column."""
        return self._aggregate_column(
            data, question, 'mean', True,
            "Could not determine which column to calculate the average for.",
            "Could not calculate the average from the data.",
        )

    def _calculate_total(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the total of a column."""
        return self._aggregate_column(
            data, question, 'sum', True,
            "Could not determine which column to calculate the total for.",
            "Could not calculate the total from the data.",
        )

    def _find_maximum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the maximum value in a column."""
        return self._aggregate_column(
            data, question, 'max', False,
            "Could not determine which column to find the maximum for.",
            "Could not find the maximum value from the data.",
        )

    def _find_minimum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the minimum value in a column."""
        return self._aggregate_column(
            data, question, 'min', False,
            "Could not determine which column to find the minimum for.",
            "Could not find the minimum value from the data.",
        )

    def _extract_specific_info(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Extract specific information from the data.

        Filters each sheet by the question's conditions and returns the
        first matching row's value in the column the question asks for.
        Falls back to the dimensions of the first non-empty sheet.
        """
        looking_for = self._extract_looking_for(question)
        conditions = self._extract_conditions(question)

        for df in data.values():
            if df.empty:
                continue
            filtered = self._apply_conditions(df, conditions)
            if filtered.empty or not looking_for:
                continue
            best_col = self._find_best_matching_column(df, looking_for)
            if best_col is None:
                continue
            try:
                return str(filtered.iloc[0][best_col])
            except (KeyError, IndexError):
                continue

        # Nothing specific found: describe the first non-empty sheet.
        for df in data.values():
            if not df.empty:
                return f"The sheet contains {len(df)} rows and {len(df.columns)} columns."

        return "Could not extract the requested information from the data."

    def _extract_conditions(self, question: str) -> List[Dict[str, Any]]:
        """Extract conditions from the question.

        Returns:
            A list of dicts with keys 'column', 'operator' and 'value'.
            Values are floats when the text parses as a number (except for
            'contains', which keeps its text) and a two-int list for
            'between'. Identical conditions produced by overlapping
            patterns are emitted once.
        """
        conditions: List[Dict[str, Any]] = []

        def add(column: str, op: str, value: Any) -> None:
            condition = {'column': column, 'operator': op, 'value': value}
            if condition not in conditions:
                conditions.append(condition)

        # "<column> between <a> and <b>"
        between_pattern = r'(\w+) between (\d+) and (\d+)'
        for match in re.finditer(between_pattern, question):
            add(match.group(1), 'between', [int(match.group(2)), int(match.group(3))])

        # "<column> <operator> <value>"
        comparison_pattern = r'(\w+) (>|<|>=|<=|=|equals|equal to|contains) (\w+)'
        for match in re.finditer(comparison_pattern, question):
            op = match.group(2)
            if op in ('equals', 'equal to'):
                op = '='
            raw_value = match.group(3)
            # Substring search needs the text exactly as written.
            value = raw_value if op == 'contains' else self._coerce_number(raw_value)
            add(match.group(1), op, value)

        # "with/where <column> is <value>"
        equality_pattern = r'(?:with|where) (\w+) (?:is|=) (\w+)'
        for match in re.finditer(equality_pattern, question):
            add(match.group(1), '=', self._coerce_number(match.group(2)))

        return conditions

    def _extract_column_name(self, question: str) -> Optional[str]:
        """Extract column name from the question.

        An explicit 'column named X' / 'field called X' wins. Otherwise the
        first common column word present as a whole word is returned;
        'total' is chosen only when no other word matches, because it
        usually is the aggregation request itself ("total revenue").
        """
        column_pattern = r'(?:column|field) (?:named|called) ["\']?(\w+)["\']?'
        match = re.search(column_pattern, question)
        if match:
            return match.group(1)

        hits = self._keyword_hits(question, self._COLUMN_KEYWORDS)
        if not hits:
            return None
        preferred = [keyword for keyword in hits if keyword != 'total']
        return (preferred or hits)[0]

    def _extract_looking_for(self, question: str) -> Optional[str]:
        """Extract what we're looking for from the question.

        Returns the word following 'what is the' / 'find the' / ..., or
        else the first common item word present as a whole word.
        """
        looking_for_pattern = r'(?:what is|what are|find|get|return) the (\w+)'
        match = re.search(looking_for_pattern, question)
        if match:
            return match.group(1)

        hits = self._keyword_hits(question, self._LOOKING_FOR_KEYWORDS)
        return hits[0] if hits else None

    def _find_best_matching_column(self, df: pd.DataFrame, column_name: str) -> Optional[str]:
        """Find the best matching column in a DataFrame.

        Tries an exact match, then a case-insensitive match, then a
        substring match. Column labels are compared through ``str()`` so
        non-string labels do not raise; the label is returned as stored.
        """
        if column_name in df.columns:
            return column_name

        wanted = column_name.lower()
        for col in df.columns:
            if str(col).lower() == wanted:
                return col
        for col in df.columns:
            if wanted in str(col).lower():
                return col
        return None

    def process_csv_data(self, data: pd.DataFrame, question: str) -> str:
        """
        Process data extracted from a CSV file.

        Args:
            data: DataFrame containing the CSV data
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Wrap in a dictionary to reuse Excel processing logic
        return self.process_excel_data({'Sheet1': data}, question)

    # ------------------------------------------------------------------
    # Text-like documents
    # ------------------------------------------------------------------

    def process_text_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a text file.

        Handles, in order: occurrence counts, line ranges, paragraph
        ranges, and "what is X" sentence lookups; otherwise returns word
        and sentence counts.

        Args:
            data: Text content of the file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Count occurrences of a word or phrase
        count_pattern = r'(?:count|how many) (?:occurrences of|instances of|times) ["\']?([^"\']+)["\']?'
        match = re.search(count_pattern, question_lower)
        if match:
            term = self._clean_phrase(match.group(1))
            if term:
                return str(data.lower().count(term))

        # Specific line or line range (1-based, inclusive)
        line_pattern = r'(?:what is|what does|what are|show|return) (?:the|on) (?:line|lines) (\d+)(?:\s*(?:to|-)\s*(\d+))?'
        match = re.search(line_pattern, question_lower)
        if match:
            selected = self._select_range(data.split('\n'), match)
            if selected is not None:
                return '\n'.join(selected)

        # Specific paragraph or paragraph range (1-based, inclusive)
        para_pattern = r'(?:what is|what does|what are|show|return) (?:the|in) paragraph (\d+)(?:\s*(?:to|-)\s*(\d+))?'
        match = re.search(para_pattern, question_lower)
        if match:
            selected = self._select_range(re.split(r'\n\s*\n', data), match)
            if selected is not None:
                return '\n\n'.join(selected)

        # Sentence mentioning the requested information
        match = re.search(self._INFO_PATTERN, question_lower)
        if match:
            sentence = self._find_sentence(data, match.group(1).strip())
            if sentence is not None:
                return sentence

        # If nothing specific was found, return a generic summary
        words = data.split()
        return f"The text contains {len(words)} words and {len(data.split('. '))} sentences."

    def process_pdf_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PDF file.

        Args:
            data: Dictionary mapping page numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Specific page
        page_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) page (\d+)'
        match = re.search(page_pattern, question_lower)
        if match:
            page_num = int(match.group(1))
            if page_num in data:
                return data[page_num]
            return f"Page {page_num} not found in the PDF."

        # Sentence mentioning the requested information, on any page
        match = re.search(self._INFO_PATTERN, question_lower)
        if match:
            info = match.group(1).strip()
            for content in data.values():
                sentence = self._find_sentence(content, info)
                if sentence is not None:
                    return sentence

        # If nothing specific was found, return a summary
        words = ' '.join(data.values()).split()
        return f"The PDF contains {len(data)} pages and approximately {len(words)} words."

    def process_image_metadata(self, metadata: Dict[str, Any], question: str) -> str:
        """
        Process metadata extracted from an image file.

        Args:
            metadata: Dictionary containing image metadata
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        width = metadata.get('width', 0)
        height = metadata.get('height', 0)

        # The order of these checks is significant (first match wins).
        if 'format' in question_lower or 'type' in question_lower:
            return metadata.get('format', 'Unknown format')
        if 'size' in question_lower or 'resolution' in question_lower:
            return f"{width}x{height}"
        if 'width' in question_lower:
            return str(width)
        if 'height' in question_lower:
            return str(height)
        if 'mode' in question_lower or 'color' in question_lower:
            return metadata.get('mode', 'Unknown mode')
        if 'exif' in question_lower:
            exif = metadata.get('exif', {})
            return str(exif) if exif else "No EXIF data found."

        # If nothing specific was found, return basic information
        return f"Image format: {metadata.get('format', 'Unknown')}, Size: {width}x{height}, Mode: {metadata.get('mode', 'Unknown')}"

    def process_docx_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a Word document.

        Args:
            data: Text content of the document
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Similar to text processing
        return self.process_text_data(data, question)

    def process_pptx_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PowerPoint presentation.

        Args:
            data: Dictionary mapping slide numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Specific slide
        slide_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) slide (\d+)'
        match = re.search(slide_pattern, question_lower)
        if match:
            slide_num = int(match.group(1))
            if slide_num in data:
                return data[slide_num]
            return f"Slide {slide_num} not found in the presentation."

        # First slide mentioning the requested information
        match = re.search(self._INFO_PATTERN, question_lower)
        if match:
            info = match.group(1).strip()
            for content in data.values():
                if info in content.lower():
                    return content.strip()

        # If nothing specific was found, return a summary
        return f"The presentation contains {len(data)} slides."

    # ------------------------------------------------------------------
    # Structured data
    # ------------------------------------------------------------------

    def process_json_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a JSON file.

        The key is taken from the lowercased question, so it is matched
        against the JSON keys case-insensitively, first at the top level
        and then one level down.

        Args:
            data: Parsed JSON content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        key_pattern = r'(?:what is|what are|show|return) (?:the|in) ["\']?(\w+)["\']?'
        match = re.search(key_pattern, question_lower)
        if match:
            key = match.group(1)

            found = self._lookup_key(data, key)
            if found is not self._MISSING:
                return str(found)

            # Look for nested keys
            for value in data.values():
                if isinstance(value, dict):
                    found = self._lookup_key(value, key)
                    if found is not self._MISSING:
                        return str(found)

        # If nothing specific was found, return a summary
        return f"The JSON contains {len(data)} top-level keys: {', '.join(map(str, data))}"

    def process_zip_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a ZIP archive.

        Args:
            data: Dictionary containing information about the archive
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        files = data.get('files', [])

        asks_count = 'how many' in question_lower or 'count' in question_lower
        if asks_count and 'files' in question_lower:
            return str(len(files))

        # Check if the question is asking for a specific file
        file_pattern = r'(?:does it contain|is there) (?:a file named|a file called) ["\']?([^"\']+)["\']?'
        match = re.search(file_pattern, question_lower)
        if match:
            # Drop the question's own "?" so "readme.txt?" can match.
            filename = self._clean_phrase(match.group(1))
            if filename:
                for file_info in files:
                    stored_name = file_info.get('filename', '')
                    if filename in stored_name.lower():
                        size = file_info.get('size', 'unknown')
                        return f"Yes, the archive contains {stored_name} ({size} bytes)"
            return f"No, the archive does not contain a file named {filename}."

        # If nothing specific was found, return a summary
        return f"The ZIP archive contains {len(files)} files."

    def process_pdb_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a PDB file.

        Args:
            data: Dictionary containing information about the PDB file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        if 'title' in question_lower:
            return data.get('title', 'No title found.')
        if 'header' in question_lower:
            return data.get('header', 'No header found.')
        if 'compound' in question_lower:
            compounds = data.get('compounds', [])
            return '\n'.join(compounds) if compounds else 'No compounds found.'
        if 'author' in question_lower:
            authors = data.get('authors', [])
            return '\n'.join(authors) if authors else 'No authors found.'
        if 'atoms' in question_lower or 'atom count' in question_lower:
            return str(data.get('atoms_count', 0))

        # If nothing specific was found, return a summary
        return f"PDB file with title: {data.get('title', 'No title')}, containing {data.get('atoms_count', 0)} atoms."

    @staticmethod
    def _extract_entity_source(content: str, entity_name: str) -> str:
        """Return the source lines of the named class or function.

        The definition line is matched case-insensitively (the name comes
        from a lowercased question) at any indentation. Collection stops at
        the first non-blank line indented no deeper than the definition.
        Returns an empty string when the entity is not found.
        """
        header = re.compile(
            rf'^\s*(?:async\s+)?(?:class|def)\s+{re.escape(entity_name)}\b',
            re.IGNORECASE,
        )
        collected: List[str] = []
        indent = 0
        for line in content.split('\n'):
            if not collected:
                if header.match(line):
                    collected.append(line)
                    indent = len(line) - len(line.lstrip())
                continue
            if line.strip() and len(line) - len(line.lstrip()) <= indent:
                break
            collected.append(line)

        # Blank lines separating the entity from the next one are not part of it.
        while collected and not collected[-1].strip():
            collected.pop()
        return '\n'.join(collected)

    def process_python_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a Python file.

        Questions about one named class or function (its source code or
        its signature) are answered first; generic questions that merely
        mention classes, functions or imports list all of them.

        Args:
            data: Dictionary containing information about the Python file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        classes = data.get('classes', [])
        functions = data.get('functions', [])

        # Source code of a specific function or class
        code_pattern = r'(?:show|return) (?:the code for|code of) (?:the )?(?:function|class) ["\']?(\w+)["\']?'
        match = re.search(code_pattern, question_lower)
        if match:
            source = self._extract_entity_source(data.get('content', ''), match.group(1))
            if source:
                return source

        # Description of a specific class
        class_pattern = r'(?:what is|what does) (?:the class|class) ["\']?(\w+)["\']?'
        match = re.search(class_pattern, question_lower)
        if match:
            class_name = match.group(1)
            for cls in classes:
                if cls['name'].lower() == class_name:
                    parent = cls.get('parent')
                    suffix = f", inherits from {parent}" if parent else ""
                    return f"Class {cls['name']}{suffix}"

        # Description of a specific function
        func_pattern = r'(?:what is|what does) (?:the function|function) ["\']?(\w+)["\']?'
        match = re.search(func_pattern, question_lower)
        if match:
            func_name = match.group(1)
            for func in functions:
                if func['name'].lower() == func_name:
                    return f"Function {func['name']}({func.get('params', '')})"

        # Generic listings
        if 'class' in question_lower:
            if classes:
                return ', '.join(c['name'] for c in classes)
            return 'No classes found in the file.'
        if 'function' in question_lower:
            if functions:
                return ', '.join(f['name'] for f in functions)
            return 'No functions found in the file.'
        if 'import' in question_lower:
            imports = data.get('imports', [])
            if not imports:
                return 'No imports found in the file.'
            import_strs = []
            for imp in imports:
                if imp.get('from'):
                    import_strs.append(f"from {imp['from']} import {imp['import']}")
                else:
                    import_strs.append(f"import {imp['import']}")
            return '\n'.join(import_strs)

        # If nothing specific was found, return a summary
        return f"Python file with {len(classes)} classes and {len(functions)} functions."

    def process_jsonl_data(self, data: List[Dict[str, Any]], question: str) -> str:
        """
        Process data extracted from a JSONL file.

        Args:
            data: List of parsed JSON objects
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Whole-word check so that e.g. "account" does not trigger a count.
        if self._mentions(question_lower, 'how many', 'count'):
            return str(len(data))

        # Specific entry (0-based index)
        entry_pattern = r'(?:what is|what are|show|return) (?:the|in) entry (\d+)'
        match = re.search(entry_pattern, question_lower)
        if match:
            entry_num = int(match.group(1))
            if 0 <= entry_num < len(data):
                return str(data[entry_num])
            return f"Entry {entry_num} not found in the data."

        # Entries with a specific key-value pair
        kv_pattern = r'(?:entries|items) where ["\']?(\w+)["\']? (is|=|equals|contains) ["\']?([^"\']+)["\']?'
        match = re.search(kv_pattern, question_lower)
        if match:
            key = match.group(1)
            use_substring = match.group(2) == 'contains'
            value = self._clean_phrase(match.group(3))

            matching_entries = []
            for entry in data:
                if not isinstance(entry, dict):
                    continue
                found = self._lookup_key(entry, key)
                if found is self._MISSING:
                    continue
                actual = str(found).lower()
                if (value in actual) if use_substring else (actual == value):
                    matching_entries.append(entry)

            if matching_entries:
                return str(matching_entries)
            return f"No entries found where {key} = {value}."

        # If nothing specific was found, return a summary
        if data and isinstance(data[0], dict):
            return f"The data contains {len(data)} entries with keys: {', '.join(map(str, data[0]))}"
        return f"The data contains {len(data)} entries."