""" | |
Data processor for processing extracted data. | |
""" | |
import re | |
import os | |
import json | |
from typing import Dict, Any, List, Optional, Tuple, Union | |
import pandas as pd | |
class DataProcessor:
    """
    Process data extracted from various file types and answer questions about it.
    """

    def __init__(self):
        """Initialize the data processor."""
        pass

    def process_excel_data(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """
        Process data extracted from an Excel file.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Convert question to lowercase for easier matching
        question_lower = question.lower()

        # Dispatch to a handler based on keywords in the question
        if 'oldest' in question_lower:
            return self._find_oldest_item(data, question_lower)
        elif 'count' in question_lower or 'how many' in question_lower:
            return self._count_items(data, question_lower)
        elif 'average' in question_lower or 'mean' in question_lower:
            return self._calculate_average(data, question_lower)
        elif 'total' in question_lower or 'sum' in question_lower:
            return self._calculate_total(data, question_lower)
        elif 'maximum' in question_lower or 'highest' in question_lower:
            return self._find_maximum(data, question_lower)
        elif 'minimum' in question_lower or 'lowest' in question_lower:
            return self._find_minimum(data, question_lower)
        else:
            # Fall back to extracting specific information
            return self._extract_specific_info(data, question_lower)

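    # Illustrative routing examples (hypothetical questions, not part of the API):
    #   "What is the oldest movie in the list?"  -> _find_oldest_item
    #   "How many CDs cost more than 10?"        -> _count_items
    #   "What is the average price?"             -> _calculate_average
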
    def _find_oldest_item(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the oldest item in the data."""
        # Column names that typically hold year/date information
        year_columns = ['year', 'date', 'time', 'created', 'modified', 'release']
        item_type = None

        # Try to extract the type of item we're looking for
        item_types = [
            'movie', 'film', 'book', 'song', 'album', 'game', 'video game',
            'dvd', 'cd', 'blu-ray', 'blu ray', 'record', 'cassette', 'vhs'
        ]
        for item in item_types:
            if item in question:
                item_type = item
                break

        # Iterate through sheets and find the oldest item
        oldest_year = float('inf')
        oldest_item = None

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Try to find year/date columns
            year_col = None
            for col in df.columns:
                if any(year_term in col.lower() for year_term in year_columns):
                    year_col = col
                    break

            if year_col is None:
                # If no obvious year column, look for numeric columns whose values
                # plausibly look like years (heuristic range 1900-2025)
                for col in df.columns:
                    if pd.api.types.is_numeric_dtype(df[col]):
                        try:
                            if df[col].min() >= 1900 and df[col].max() <= 2025:
                                year_col = col
                                break
                        except Exception:
                            continue

            if year_col is not None:
                # Find title/name column
                title_col = None
                title_columns = ['title', 'name', 'item', 'product', 'description']
                for col in df.columns:
                    if any(title_term in col.lower() for title_term in title_columns):
                        title_col = col
                        break

                if title_col is None and len(df.columns) > 1:
                    # If no obvious title column, use the first non-year column
                    for col in df.columns:
                        if col != year_col:
                            title_col = col
                            break

                # Filter by item type if specified
                if item_type:
                    filtered_df = df
                    # Look for a column that might contain item types
                    type_col = None
                    type_columns = ['type', 'category', 'format', 'medium', 'platform']
                    for col in df.columns:
                        if any(type_term in col.lower() for type_term in type_columns):
                            type_col = col
                            break

                    if type_col:
                        # Filter by item type
                        filtered_df = df[df[type_col].astype(str).str.lower().str.contains(item_type.lower())]
                else:
                    filtered_df = df

                if not filtered_df.empty and title_col:
                    try:
                        # Find the row with the minimum year
                        min_year_idx = filtered_df[year_col].astype(float).idxmin()
                        min_year = float(filtered_df.loc[min_year_idx, year_col])
                        if min_year < oldest_year:
                            oldest_year = min_year
                            oldest_item = filtered_df.loc[min_year_idx, title_col]
                    except Exception:
                        continue

        if oldest_item:
            return str(oldest_item)
        else:
            return "Could not determine the oldest item from the data."

    def _count_items(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Count items matching specific criteria."""
        # Extract conditions from the question
        conditions = self._extract_conditions(question)
        total_count = 0

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Apply conditions to filter the DataFrame
            filtered_df = df
            for condition in conditions:
                col = condition.get('column')
                value = condition.get('value')
                operator = condition.get('operator', '=')

                if col and value is not None:
                    # Find the best matching column
                    best_col = self._find_best_matching_column(df, col)
                    if best_col:
                        try:
                            if operator == '=':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower() == str(value).lower()]
                            elif operator == '>':
                                filtered_df = filtered_df[filtered_df[best_col] > value]
                            elif operator == '<':
                                filtered_df = filtered_df[filtered_df[best_col] < value]
                            elif operator == '>=':
                                filtered_df = filtered_df[filtered_df[best_col] >= value]
                            elif operator == '<=':
                                filtered_df = filtered_df[filtered_df[best_col] <= value]
                            elif operator == 'contains':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower().str.contains(str(value).lower())]
                            elif operator == 'between':
                                if isinstance(value, list) and len(value) == 2:
                                    filtered_df = filtered_df[(filtered_df[best_col] >= value[0]) & (filtered_df[best_col] <= value[1])]
                        except Exception:
                            continue

            # Add the count from this sheet
            total_count += len(filtered_df)

        return str(total_count)

    def _calculate_average(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the average of a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to calculate the average for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col and pd.api.types.is_numeric_dtype(df[best_col]):
                try:
                    avg_value = df[best_col].mean()
                    return str(avg_value)
                except Exception:
                    continue

        return "Could not calculate the average from the data."

    def _calculate_total(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the total of a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to calculate the total for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col and pd.api.types.is_numeric_dtype(df[best_col]):
                try:
                    total_value = df[best_col].sum()
                    return str(total_value)
                except Exception:
                    continue

        return "Could not calculate the total from the data."

    def _find_maximum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the maximum value in a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to find the maximum for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col:
                try:
                    max_value = df[best_col].max()
                    return str(max_value)
                except Exception:
                    continue

        return "Could not find the maximum value from the data."

    def _find_minimum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the minimum value in a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to find the minimum for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col:
                try:
                    min_value = df[best_col].min()
                    return str(min_value)
                except Exception:
                    continue

        return "Could not find the minimum value from the data."

    def _extract_specific_info(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Extract specific information from the data."""
        # Try to identify what we're looking for
        looking_for = self._extract_looking_for(question)
        conditions = self._extract_conditions(question)

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Apply conditions to filter the DataFrame
            filtered_df = df
            for condition in conditions:
                col = condition.get('column')
                value = condition.get('value')
                operator = condition.get('operator', '=')

                if col and value is not None:
                    # Find the best matching column
                    best_col = self._find_best_matching_column(df, col)
                    if best_col:
                        try:
                            if operator == '=':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower() == str(value).lower()]
                            elif operator == '>':
                                filtered_df = filtered_df[filtered_df[best_col] > value]
                            elif operator == '<':
                                filtered_df = filtered_df[filtered_df[best_col] < value]
                            elif operator == '>=':
                                filtered_df = filtered_df[filtered_df[best_col] >= value]
                            elif operator == '<=':
                                filtered_df = filtered_df[filtered_df[best_col] <= value]
                            elif operator == 'contains':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower().str.contains(str(value).lower())]
                            elif operator == 'between':
                                if isinstance(value, list) and len(value) == 2:
                                    filtered_df = filtered_df[(filtered_df[best_col] >= value[0]) & (filtered_df[best_col] <= value[1])]
                        except Exception:
                            continue

            # If we found matching rows and know what to look for
            if not filtered_df.empty and looking_for:
                # Find the best matching column for what we're looking for
                best_col = self._find_best_matching_column(df, looking_for)
                if best_col:
                    try:
                        # Return the first value
                        return str(filtered_df.iloc[0][best_col])
                    except Exception:
                        continue

        # If we couldn't extract specific information, return a more general response
        if data:
            # Return basic info about the first non-empty sheet
            for sheet_name, df in data.items():
                if not df.empty:
                    return f"The sheet contains {len(df)} rows and {len(df.columns)} columns."

        return "Could not extract the requested information from the data."

    def _extract_conditions(self, question: str) -> List[Dict[str, Any]]:
        """Extract filter conditions from the question."""
        conditions = []

        # Check for "between" conditions
        between_pattern = r'(\w+) between (\d+) and (\d+)'
        for match in re.finditer(between_pattern, question):
            column = match.group(1)
            start = int(match.group(2))
            end = int(match.group(3))
            conditions.append({
                'column': column,
                'operator': 'between',
                'value': [start, end],
            })

        # Check for comparison conditions
        comparison_pattern = r'(\w+) (>|<|>=|<=|=|equals|equal to|contains) (\w+)'
        for match in re.finditer(comparison_pattern, question):
            column = match.group(1)
            op = match.group(2)
            value = match.group(3)

            # Normalize textual operators to symbols
            if op in ('equals', 'equal to'):
                op = '='

            # Try to convert value to a number
            try:
                value = float(value)
            except ValueError:
                pass

            conditions.append({
                'column': column,
                'operator': op,
                'value': value,
            })

        # Check for simple equality conditions
        equality_pattern = r'(?:with|where) (\w+) (?:is|=) (\w+)'
        for match in re.finditer(equality_pattern, question):
            column = match.group(1)
            value = match.group(2)

            # Try to convert value to a number
            try:
                value = float(value)
            except ValueError:
                pass

            conditions.append({
                'column': column,
                'operator': '=',
                'value': value,
            })

        return conditions

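    # Example (illustrative): for the question "how many movies with year between
    # 1990 and 2000", _extract_conditions returns
    # [{'column': 'year', 'operator': 'between', 'value': [1990, 2000]}].
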
    def _extract_column_name(self, question: str) -> Optional[str]:
        """Extract column name from the question."""
        # Check for direct mentions of columns
        column_pattern = r'(?:column|field) (?:named|called) ["\']?(\w+)["\']?'
        match = re.search(column_pattern, question)
        if match:
            return match.group(1)

        # Look for common column references
        common_columns = [
            'year', 'date', 'time', 'name', 'title', 'price', 'cost',
            'amount', 'quantity', 'total', 'value', 'age', 'rating',
            'score', 'grade', 'salary', 'income', 'revenue', 'profit',
            'loss', 'height', 'weight', 'length', 'width', 'depth',
            'area', 'volume'
        ]
        for col in common_columns:
            if col in question:
                return col

        return None

    def _extract_looking_for(self, question: str) -> Optional[str]:
        """Extract what we're looking for from the question."""
        # Check for direct mentions of what we're looking for
        looking_for_pattern = r'(?:what is|what are|find|get|return) the (\w+)'
        match = re.search(looking_for_pattern, question)
        if match:
            return match.group(1)

        # Look for common things we might be looking for
        common_items = [
            'name', 'title', 'price', 'cost', 'amount', 'quantity',
            'total', 'value', 'age', 'rating', 'score', 'grade',
            'salary', 'income', 'revenue', 'profit', 'loss',
            'height', 'weight', 'length', 'width', 'depth',
            'area', 'volume', 'year', 'date', 'time'
        ]
        for item in common_items:
            if item in question:
                return item

        return None

    def _find_best_matching_column(self, df: pd.DataFrame, column_name: str) -> Optional[str]:
        """Find the best matching column in a DataFrame."""
        # Check for an exact match
        if column_name in df.columns:
            return column_name

        # Check for a case-insensitive match
        for col in df.columns:
            if str(col).lower() == column_name.lower():
                return col

        # Check for a partial match
        for col in df.columns:
            if column_name.lower() in str(col).lower():
                return col

        return None

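    # Example (illustrative): with columns ["Release Year", "Title"], a request
    # for "year" has no exact or case-insensitive match, so the partial match
    # returns "Release Year".
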
    def process_csv_data(self, data: pd.DataFrame, question: str) -> str:
        """
        Process data extracted from a CSV file.

        Args:
            data: DataFrame containing the CSV data
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Wrap in a dictionary to reuse the Excel processing logic
        return self.process_excel_data({'Sheet1': data}, question)

    def process_text_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a text file.

        Args:
            data: Text content of the file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle questions that count occurrences of a word or phrase
        if 'count' in question_lower or 'how many' in question_lower:
            count_pattern = r'(?:count|how many) (?:occurrences of|instances of|times) ["\']?([^"\']+)["\']?'
            match = re.search(count_pattern, question_lower)
            if match:
                term = match.group(1)
                count = data.lower().count(term.lower())
                return str(count)

        # Check if the question is asking for a specific line or range of lines
        line_pattern = r'(?:what is|what does|what are|show|return) (?:the|on) (?:line|lines) (\d+)(?:\s*(?:to|-)\s*(\d+))?'
        match = re.search(line_pattern, question_lower)
        if match:
            start_line = int(match.group(1))
            end_line = int(match.group(2)) if match.group(2) else start_line
            lines = data.split('\n')
            if 1 <= start_line <= len(lines) and end_line <= len(lines):
                return '\n'.join(lines[start_line - 1:end_line])

        # Check if the question is asking for a specific paragraph
        para_pattern = r'(?:what is|what does|what are|show|return) (?:the|in) paragraph (\d+)(?:\s*(?:to|-)\s*(\d+))?'
        match = re.search(para_pattern, question_lower)
        if match:
            start_para = int(match.group(1))
            end_para = int(match.group(2)) if match.group(2) else start_para
            paragraphs = re.split(r'\n\s*\n', data)
            if 1 <= start_para <= len(paragraphs) and end_para <= len(paragraphs):
                return '\n\n'.join(paragraphs[start_para - 1:end_para])

        # Check for specific information requests
        info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
        match = re.search(info_pattern, question_lower)
        if match:
            info = match.group(1).strip()
            # Look for this information in the text
            sentences = re.split(r'(?<=[.!?])\s+', data)
            for sentence in sentences:
                if info.lower() in sentence.lower():
                    return sentence.strip()

        # If nothing specific was found, return a generic summary
        words = data.split()
        sentences = re.split(r'(?<=[.!?])\s+', data)
        return f"The text contains {len(words)} words and {len(sentences)} sentences."

    def process_pdf_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PDF file.

        Args:
            data: Dictionary mapping page numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Check if the question is asking for a specific page
        page_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) page (\d+)'
        match = re.search(page_pattern, question_lower)
        if match:
            page_num = int(match.group(1))
            if page_num in data:
                return data[page_num]
            else:
                return f"Page {page_num} not found in the PDF."

        # Check if the question is asking for specific information across all pages
        info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
        match = re.search(info_pattern, question_lower)
        if match:
            info = match.group(1).strip()
            # Look for this information in all pages
            for page_num, content in data.items():
                sentences = re.split(r'(?<=[.!?])\s+', content)
                for sentence in sentences:
                    if info.lower() in sentence.lower():
                        return sentence.strip()

        # If nothing specific was found, combine all text and return a summary
        all_text = ' '.join(data.values())
        words = all_text.split()
        return f"The PDF contains {len(data)} pages and approximately {len(words)} words."

    def process_image_metadata(self, metadata: Dict[str, Any], question: str) -> str:
        """
        Process metadata extracted from an image file.

        Args:
            metadata: Dictionary containing image metadata
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'format' in question_lower or 'type' in question_lower:
            return metadata.get('format', 'Unknown format')
        elif 'size' in question_lower or 'resolution' in question_lower:
            width = metadata.get('width', 0)
            height = metadata.get('height', 0)
            return f"{width}x{height}"
        elif 'width' in question_lower:
            return str(metadata.get('width', 0))
        elif 'height' in question_lower:
            return str(metadata.get('height', 0))
        elif 'mode' in question_lower or 'color' in question_lower:
            return metadata.get('mode', 'Unknown mode')
        elif 'exif' in question_lower:
            exif = metadata.get('exif', {})
            if exif:
                return str(exif)
            else:
                return "No EXIF data found."

        # If nothing specific was found, return basic information
        return (
            f"Image format: {metadata.get('format', 'Unknown')}, "
            f"Size: {metadata.get('width', 0)}x{metadata.get('height', 0)}, "
            f"Mode: {metadata.get('mode', 'Unknown')}"
        )

    def process_docx_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a Word document.

        Args:
            data: Text content of the document
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Word documents are handled the same way as plain text
        return self.process_text_data(data, question)

    def process_pptx_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PowerPoint presentation.

        Args:
            data: Dictionary mapping slide numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Check if the question is asking for a specific slide
        slide_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) slide (\d+)'
        match = re.search(slide_pattern, question_lower)
        if match:
            slide_num = int(match.group(1))
            if slide_num in data:
                return data[slide_num]
            else:
                return f"Slide {slide_num} not found in the presentation."

        # Check if the question is asking for specific information across all slides
        info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
        match = re.search(info_pattern, question_lower)
        if match:
            info = match.group(1).strip()
            # Look for this information in all slides
            for slide_num, content in data.items():
                if info.lower() in content.lower():
                    return content.strip()

        # If nothing specific was found, return a summary
        return f"The presentation contains {len(data)} slides."

    def process_json_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a JSON file.

        Args:
            data: Parsed JSON content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Check if the question is asking for a specific key
        key_pattern = r'(?:what is|what are|show|return) (?:the|in) ["\']?(\w+)["\']?'
        match = re.search(key_pattern, question_lower)
        if match:
            key = match.group(1)
            # Look for this key at the top level of the JSON
            if key in data:
                return str(data[key])
            # Look for the key one level down in nested objects
            for k, v in data.items():
                if isinstance(v, dict) and key in v:
                    return str(v[key])

        # If nothing specific was found, return a summary
        return f"The JSON contains {len(data)} top-level keys: {', '.join(data.keys())}"

    def process_zip_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a ZIP archive.

        Args:
            data: Dictionary containing information about the archive
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle counting questions
        if 'how many' in question_lower or 'count' in question_lower:
            if 'files' in question_lower:
                return str(len(data.get('files', [])))

        # Check if the question is asking for a specific file
        file_pattern = r'(?:does it contain|is there) (?:a file named|a file called) ["\']?([^"\']+)["\']?'
        match = re.search(file_pattern, question_lower)
        if match:
            filename = match.group(1)
            # Check if the file exists in the archive
            for file_info in data.get('files', []):
                if filename.lower() in file_info.get('filename', '').lower():
                    return f"Yes, the archive contains {file_info['filename']} ({file_info['size']} bytes)"
            return f"No, the archive does not contain a file named {filename}."

        # If nothing specific was found, return a summary
        return f"The ZIP archive contains {len(data.get('files', []))} files."

    def process_pdb_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a PDB file.

        Args:
            data: Dictionary containing information about the PDB file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'title' in question_lower:
            return data.get('title', 'No title found.')
        elif 'header' in question_lower:
            return data.get('header', 'No header found.')
        elif 'compound' in question_lower or 'compounds' in question_lower:
            compounds = data.get('compounds', [])
            if compounds:
                return '\n'.join(compounds)
            else:
                return 'No compounds found.'
        elif 'author' in question_lower or 'authors' in question_lower:
            authors = data.get('authors', [])
            if authors:
                return '\n'.join(authors)
            else:
                return 'No authors found.'
        elif 'atoms' in question_lower or 'atom count' in question_lower:
            return str(data.get('atoms_count', 0))

        # If nothing specific was found, return a summary
        return f"PDB file with title: {data.get('title', 'No title')}, containing {data.get('atoms_count', 0)} atoms."

    def process_python_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a Python file.

        Args:
            data: Dictionary containing information about the Python file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'class' in question_lower or 'classes' in question_lower:
            classes = data.get('classes', [])
            if classes:
                class_names = [c['name'] for c in classes]
                return ', '.join(class_names)
            else:
                return 'No classes found in the file.'
        elif 'function' in question_lower or 'functions' in question_lower:
            functions = data.get('functions', [])
            if functions:
                func_names = [f['name'] for f in functions]
                return ', '.join(func_names)
            else:
                return 'No functions found in the file.'
        elif 'import' in question_lower or 'imports' in question_lower:
            imports = data.get('imports', [])
            if imports:
                import_strs = []
                for imp in imports:
                    if imp.get('from'):
                        import_strs.append(f"from {imp['from']} import {imp['import']}")
                    else:
                        import_strs.append(f"import {imp['import']}")
                return '\n'.join(import_strs)
            else:
                return 'No imports found in the file.'

        # Check if the question is asking about a specific class
        class_pattern = r'(?:what is|what does) (?:the class|class) ["\']?(\w+)["\']?'
        match = re.search(class_pattern, question_lower)
        if match:
            class_name = match.group(1)
            # Look for this class in the data
            for cls in data.get('classes', []):
                if cls['name'].lower() == class_name.lower():
                    parent = f", inherits from {cls['parent']}" if cls.get('parent') else ""
                    return f"Class {cls['name']}{parent}"

        # Check if the question is asking about a specific function
        func_pattern = r'(?:what is|what does) (?:the function|function) ["\']?(\w+)["\']?'
        match = re.search(func_pattern, question_lower)
        if match:
            func_name = match.group(1)
            # Look for this function in the data
            for func in data.get('functions', []):
                if func['name'].lower() == func_name.lower():
                    return f"Function {func['name']}({func.get('params', '')})"

        # Check if the question is asking for the source code of a function or class
        code_pattern = r'(?:show|return) (?:the code for|code of) (?:the )?(?:function|class) ["\']?(\w+)["\']?'
        match = re.search(code_pattern, question_lower)
        if match:
            entity_name = match.group(1)
            content = data.get('content', '')

            # Collect the lines belonging to this entity based on indentation
            lines = content.split('\n')
            entity_lines = []
            in_entity = False
            indent = 0

            for line in lines:
                # Check for the class or function definition
                if re.match(rf'(class|def)\s+{re.escape(entity_name)}\s*[(:]', line):
                    in_entity = True
                    entity_lines.append(line)
                    indent = len(line) - len(line.lstrip())
                    continue

                if in_entity:
                    # A non-blank line at the same or lower indentation ends the entity
                    if line.strip() and len(line) - len(line.lstrip()) <= indent:
                        in_entity = False
                    else:
                        entity_lines.append(line)

            if entity_lines:
                return '\n'.join(entity_lines)

        # If nothing specific was found, return a summary
        return f"Python file with {len(data.get('classes', []))} classes and {len(data.get('functions', []))} functions."

    def process_jsonl_data(self, data: List[Dict[str, Any]], question: str) -> str:
        """
        Process data extracted from a JSONL file.

        Args:
            data: List of parsed JSON objects
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle counting questions
        if 'how many' in question_lower or 'count' in question_lower:
            return str(len(data))

        # Check if the question is asking for a specific entry
        entry_pattern = r'(?:what is|what are|show|return) (?:the|in) entry (\d+)'
        match = re.search(entry_pattern, question_lower)
        if match:
            entry_num = int(match.group(1))
            if 0 <= entry_num < len(data):
                return str(data[entry_num])
            else:
                return f"Entry {entry_num} not found in the data."

        # Check if the question is asking for entries with a specific key-value pair
        kv_pattern = r'(?:entries|items) where ["\']?(\w+)["\']? (?:is|=|equals|contains) ["\']?([^"\']+)["\']?'
        match = re.search(kv_pattern, question_lower)
        if match:
            key = match.group(1)
            value = match.group(2)

            # Find entries matching the criteria
            matching_entries = []
            for entry in data:
                if key in entry and str(entry[key]).lower() == value.lower():
                    matching_entries.append(entry)

            if matching_entries:
                return str(matching_entries)
            else:
                return f"No entries found where {key} = {value}."

        # If nothing specific was found, return a summary
        if data and isinstance(data[0], dict):
            keys = list(data[0].keys())
            return f"The data contains {len(data)} entries with keys: {', '.join(keys)}"
        else:
            return f"The data contains {len(data)} entries."