ai_agents_final / agent /utils /data_processor.py
Arbnor Tefiki
Test the agent in HF
5d9aa5e
raw
history blame
38.9 kB
"""
Data processor for processing extracted data.
"""
import re
import os
import json
from typing import Dict, Any, List, Optional, Tuple, Union
import pandas as pd
class DataProcessor:
"""
Class for processing extracted data.
"""
def __init__(self):
"""Initialize the data processor."""
pass
def process_excel_data(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""
Process data extracted from an Excel file.
Args:
data: Dictionary mapping sheet names to DataFrames
question: The question to answer
Returns:
Answer to the question
"""
# Convert question to lowercase for easier matching
question_lower = question.lower()
# Handle specific question types
if 'oldest' in question_lower:
return self._find_oldest_item(data, question_lower)
elif 'count' in question_lower or 'how many' in question_lower:
return self._count_items(data, question_lower)
elif 'average' in question_lower or 'mean' in question_lower:
return self._calculate_average(data, question_lower)
elif 'total' in question_lower or 'sum' in question_lower:
return self._calculate_total(data, question_lower)
elif 'maximum' in question_lower or 'highest' in question_lower:
return self._find_maximum(data, question_lower)
elif 'minimum' in question_lower or 'lowest' in question_lower:
return self._find_minimum(data, question_lower)
else:
# Try to extract specific information
return self._extract_specific_info(data, question_lower)
def _find_oldest_item(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""Find the oldest item in the data."""
# Look for mentions of specific columns or items
year_columns = ['year', 'date', 'time', 'created', 'modified', 'release']
item_type = None
# Try to extract the type of item we're looking for
item_types = [
'movie', 'film', 'book', 'song', 'album', 'game', 'video game',
'dvd', 'cd', 'blu-ray', 'blu ray', 'record', 'cassette', 'vhs'
]
for item in item_types:
if item in question:
item_type = item
break
# Iterate through sheets and find the oldest item
oldest_year = float('inf')
oldest_item = None
for sheet_name, df in data.items():
# Skip empty sheets
if df.empty:
continue
# Try to find year/date columns
year_col = None
for col in df.columns:
if any(year_term in col.lower() for year_term in year_columns):
year_col = col
break
if year_col is None:
# If no obvious year column, look for columns with numeric values
for col in df.columns:
if pd.api.types.is_numeric_dtype(df[col]):
try:
# Check if values might be years (between 1900 and current year)
if df[col].min() >= 1900 and df[col].max() <= 2025:
year_col = col
break
except:
continue
if year_col is not None:
# Find title/name column
title_col = None
title_columns = ['title', 'name', 'item', 'product', 'description']
for col in df.columns:
if any(title_term in col.lower() for title_term in title_columns):
title_col = col
break
if title_col is None and len(df.columns) > 1:
# If no obvious title column, use the first non-year column
for col in df.columns:
if col != year_col:
title_col = col
break
# Filter by item type if specified
if item_type:
filtered_df = df
# Look for a column that might contain item types
type_col = None
type_columns = ['type', 'category', 'format', 'medium', 'platform']
for col in df.columns:
if any(type_term in col.lower() for type_term in type_columns):
type_col = col
break
if type_col:
# Filter by item type
filtered_df = df[df[type_col].astype(str).str.lower().str.contains(item_type.lower())]
else:
filtered_df = df
if not filtered_df.empty and title_col:
try:
# Find the row with the minimum year
min_year_idx = filtered_df[year_col].astype(float).idxmin()
min_year = filtered_df.loc[min_year_idx, year_col]
if min_year < oldest_year:
oldest_year = min_year
oldest_item = filtered_df.loc[min_year_idx, title_col]
except:
continue
if oldest_item:
return str(oldest_item)
else:
return "Could not determine the oldest item from the data."
def _count_items(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""Count items matching specific criteria."""
# Extract conditions from the question
conditions = self._extract_conditions(question)
total_count = 0
for sheet_name, df in data.items():
# Skip empty sheets
if df.empty:
continue
# Apply conditions to filter the DataFrame
filtered_df = df
for condition in conditions:
col = condition.get('column')
value = condition.get('value')
operator = condition.get('operator', '=')
if col and value is not None:
# Find the best matching column
best_col = self._find_best_matching_column(df, col)
if best_col:
try:
if operator == '=':
filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower() == str(value).lower()]
elif operator == '>':
filtered_df = filtered_df[filtered_df[best_col] > value]
elif operator == '<':
filtered_df = filtered_df[filtered_df[best_col] < value]
elif operator == '>=':
filtered_df = filtered_df[filtered_df[best_col] >= value]
elif operator == '<=':
filtered_df = filtered_df[filtered_df[best_col] <= value]
elif operator == 'contains':
filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower().str.contains(str(value).lower())]
elif operator == 'between':
if isinstance(value, list) and len(value) == 2:
filtered_df = filtered_df[(filtered_df[best_col] >= value[0]) & (filtered_df[best_col] <= value[1])]
except:
continue
# Add the count from this sheet
total_count += len(filtered_df)
return str(total_count)
def _calculate_average(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""Calculate the average of a column."""
# Extract column name from the question
column_name = self._extract_column_name(question)
if not column_name:
return "Could not determine which column to calculate the average for."
for sheet_name, df in data.items():
# Skip empty sheets
if df.empty:
continue
# Find the best matching column
best_col = self._find_best_matching_column(df, column_name)
if best_col and pd.api.types.is_numeric_dtype(df[best_col]):
try:
avg_value = df[best_col].mean()
return str(avg_value)
except:
continue
return "Could not calculate the average from the data."
def _calculate_total(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""Calculate the total of a column."""
# Extract column name from the question
column_name = self._extract_column_name(question)
if not column_name:
return "Could not determine which column to calculate the total for."
for sheet_name, df in data.items():
# Skip empty sheets
if df.empty:
continue
# Find the best matching column
best_col = self._find_best_matching_column(df, column_name)
if best_col and pd.api.types.is_numeric_dtype(df[best_col]):
try:
total_value = df[best_col].sum()
return str(total_value)
except:
continue
return "Could not calculate the total from the data."
def _find_maximum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""Find the maximum value in a column."""
# Extract column name from the question
column_name = self._extract_column_name(question)
if not column_name:
return "Could not determine which column to find the maximum for."
for sheet_name, df in data.items():
# Skip empty sheets
if df.empty:
continue
# Find the best matching column
best_col = self._find_best_matching_column(df, column_name)
if best_col:
try:
max_value = df[best_col].max()
return str(max_value)
except:
continue
return "Could not find the maximum value from the data."
def _find_minimum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""Find the minimum value in a column."""
# Extract column name from the question
column_name = self._extract_column_name(question)
if not column_name:
return "Could not determine which column to find the minimum for."
for sheet_name, df in data.items():
# Skip empty sheets
if df.empty:
continue
# Find the best matching column
best_col = self._find_best_matching_column(df, column_name)
if best_col:
try:
min_value = df[best_col].min()
return str(min_value)
except:
continue
return "Could not find the minimum value from the data."
def _extract_specific_info(self, data: Dict[str, pd.DataFrame], question: str) -> str:
"""Extract specific information from the data."""
# Try to identify what we're looking for
looking_for = self._extract_looking_for(question)
conditions = self._extract_conditions(question)
for sheet_name, df in data.items():
# Skip empty sheets
if df.empty:
continue
# Apply conditions to filter the DataFrame
filtered_df = df
for condition in conditions:
col = condition.get('column')
value = condition.get('value')
operator = condition.get('operator', '=')
if col and value is not None:
# Find the best matching column
best_col = self._find_best_matching_column(df, col)
if best_col:
try:
if operator == '=':
filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower() == str(value).lower()]
elif operator == '>':
filtered_df = filtered_df[filtered_df[best_col] > value]
elif operator == '<':
filtered_df = filtered_df[filtered_df[best_col] < value]
elif operator == '>=':
filtered_df = filtered_df[filtered_df[best_col] >= value]
elif operator == '<=':
filtered_df = filtered_df[filtered_df[best_col] <= value]
elif operator == 'contains':
filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower().str.contains(str(value).lower())]
elif operator == 'between':
if isinstance(value, list) and len(value) == 2:
filtered_df = filtered_df[(filtered_df[best_col] >= value[0]) & (filtered_df[best_col] <= value[1])]
except:
continue
# If we found matching rows and know what to look for
if not filtered_df.empty and looking_for:
# Find the best matching column for what we're looking for
best_col = self._find_best_matching_column(df, looking_for)
if best_col:
try:
# Return the first value
return str(filtered_df.iloc[0][best_col])
except:
continue
# If we couldn't extract specific information, return a more general response
if data:
# Return basic info about the first non-empty sheet
for sheet_name, df in data.items():
if not df.empty:
return f"The sheet contains {len(df)} rows and {len(df.columns)} columns."
return "Could not extract the requested information from the data."
def _extract_conditions(self, question: str) -> List[Dict[str, Any]]:
"""Extract conditions from the question."""
conditions = []
# Check for "between" conditions
between_pattern = r'(\w+) between (\d+) and (\d+)'
for match in re.finditer(between_pattern, question):
column = match.group(1)
start = int(match.group(2))
end = int(match.group(3))
conditions.append({
'column': column,
'operator': 'between',
'value': [start, end],
})
# Check for comparison conditions
comparison_pattern = r'(\w+) (>|<|>=|<=|=|equals|equal to|contains) (\w+)'
for match in re.finditer(comparison_pattern, question):
column = match.group(1)
op = match.group(2)
value = match.group(3)
# Convert operator text to symbols
if op == 'equals' or op == 'equal to':
op = '='
elif op == 'contains':
op = 'contains'
# Try to convert value to number
try:
value = float(value)
except:
pass
conditions.append({
'column': column,
'operator': op,
'value': value,
})
# Check for simple equality conditions
equality_pattern = r'(?:with|where) (\w+) (?:is|=) (\w+)'
for match in re.finditer(equality_pattern, question):
column = match.group(1)
value = match.group(2)
# Try to convert value to number
try:
value = float(value)
except:
pass
conditions.append({
'column': column,
'operator': '=',
'value': value,
})
return conditions
def _extract_column_name(self, question: str) -> Optional[str]:
"""Extract column name from the question."""
# Check for direct mentions of columns
column_pattern = r'(?:column|field) (?:named|called) ["\']?(\w+)["\']?'
match = re.search(column_pattern, question)
if match:
return match.group(1)
# Look for common column references
common_columns = [
'year', 'date', 'time', 'name', 'title', 'price', 'cost',
'amount', 'quantity', 'total', 'value', 'age', 'rating',
'score', 'grade', 'salary', 'income', 'revenue', 'profit',
'loss', 'height', 'weight', 'length', 'width', 'depth',
'area', 'volume'
]
for col in common_columns:
if col in question:
return col
return None
def _extract_looking_for(self, question: str) -> Optional[str]:
"""Extract what we're looking for from the question."""
# Check for direct mentions of what we're looking for
looking_for_pattern = r'(?:what is|what are|find|get|return) the (\w+)'
match = re.search(looking_for_pattern, question)
if match:
return match.group(1)
# Look for common things we might be looking for
common_items = [
'name', 'title', 'price', 'cost', 'amount', 'quantity',
'total', 'value', 'age', 'rating', 'score', 'grade',
'salary', 'income', 'revenue', 'profit', 'loss',
'height', 'weight', 'length', 'width', 'depth',
'area', 'volume', 'year', 'date', 'time'
]
for item in common_items:
if item in question:
return item
return None
def _find_best_matching_column(self, df: pd.DataFrame, column_name: str) -> Optional[str]:
"""Find the best matching column in a DataFrame."""
# Check for exact match
if column_name in df.columns:
return column_name
# Check for case-insensitive match
for col in df.columns:
if col.lower() == column_name.lower():
return col
# Check for partial match
for col in df.columns:
if column_name.lower() in col.lower():
return col
return None
def process_csv_data(self, data: pd.DataFrame, question: str) -> str:
"""
Process data extracted from a CSV file.
Args:
data: DataFrame containing the CSV data
question: The question to answer
Returns:
Answer to the question
"""
# Wrap in a dictionary to reuse Excel processing logic
return self.process_excel_data({'Sheet1': data}, question)
def process_text_data(self, data: str, question: str) -> str:
"""
Process data extracted from a text file.
Args:
data: Text content of the file
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Handle specific question types
if 'count' in question_lower or 'how many' in question_lower:
# Count occurrences of a word or phrase
count_pattern = r'(?:count|how many) (?:occurrences of|instances of|times) ["\']?([^"\']+)["\']?'
match = re.search(count_pattern, question_lower)
if match:
term = match.group(1)
count = data.lower().count(term.lower())
return str(count)
# Check if the question is asking for a specific line
line_pattern = r'(?:what is|what does|what are|show|return) (?:the|on) (?:line|lines) (\d+)(?:\s*(?:to|-)\s*(\d+))?'
match = re.search(line_pattern, question_lower)
if match:
start_line = int(match.group(1))
end_line = int(match.group(2)) if match.group(2) else start_line
lines = data.split('\n')
if start_line <= len(lines) and end_line <= len(lines):
return '\n'.join(lines[start_line-1:end_line])
# Check if the question is asking for a specific paragraph
para_pattern = r'(?:what is|what does|what are|show|return) (?:the|in) paragraph (\d+)(?:\s*(?:to|-)\s*(\d+))?'
match = re.search(para_pattern, question_lower)
if match:
start_para = int(match.group(1))
end_para = int(match.group(2)) if match.group(2) else start_para
paragraphs = re.split(r'\n\s*\n', data)
if start_para <= len(paragraphs) and end_para <= len(paragraphs):
return '\n\n'.join(paragraphs[start_para-1:end_para])
# Check for specific information requests
info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
match = re.search(info_pattern, question_lower)
if match:
info = match.group(1).strip()
# Look for this information in the text
sentences = re.split(r'(?<=[.!?])\s+', data)
for sentence in sentences:
if info.lower() in sentence.lower():
return sentence.strip()
# If nothing specific was found, return a generic summary
words = data.split()
return f"The text contains {len(words)} words and {len(data.split('. '))} sentences."
def process_pdf_data(self, data: Dict[int, str], question: str) -> str:
"""
Process data extracted from a PDF file.
Args:
data: Dictionary mapping page numbers to text content
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Check if the question is asking for a specific page
page_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) page (\d+)'
match = re.search(page_pattern, question_lower)
if match:
page_num = int(match.group(1))
if page_num in data:
return data[page_num]
else:
return f"Page {page_num} not found in the PDF."
# Check if the question is asking for a specific information across all pages
info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
match = re.search(info_pattern, question_lower)
if match:
info = match.group(1).strip()
# Look for this information in all pages
for page_num, content in data.items():
sentences = re.split(r'(?<=[.!?])\s+', content)
for sentence in sentences:
if info.lower() in sentence.lower():
return sentence.strip()
# If nothing specific was found, combine all text and return a summary
all_text = ' '.join(data.values())
words = all_text.split()
return f"The PDF contains {len(data)} pages and approximately {len(words)} words."
def process_image_metadata(self, metadata: Dict[str, Any], question: str) -> str:
"""
Process metadata extracted from an image file.
Args:
metadata: Dictionary containing image metadata
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Handle specific question types
if 'format' in question_lower or 'type' in question_lower:
return metadata.get('format', 'Unknown format')
elif 'size' in question_lower or 'resolution' in question_lower:
width = metadata.get('width', 0)
height = metadata.get('height', 0)
return f"{width}x{height}"
elif 'width' in question_lower:
return str(metadata.get('width', 0))
elif 'height' in question_lower:
return str(metadata.get('height', 0))
elif 'mode' in question_lower or 'color' in question_lower:
return metadata.get('mode', 'Unknown mode')
elif 'exif' in question_lower:
exif = metadata.get('exif', {})
if exif:
return str(exif)
else:
return "No EXIF data found."
# If nothing specific was found, return basic information
return f"Image format: {metadata.get('format', 'Unknown')}, Size: {metadata.get('width', 0)}x{metadata.get('height', 0)}, Mode: {metadata.get('mode', 'Unknown')}"
def process_docx_data(self, data: str, question: str) -> str:
"""
Process data extracted from a Word document.
Args:
data: Text content of the document
question: The question to answer
Returns:
Answer to the question
"""
# Similar to text processing
return self.process_text_data(data, question)
def process_pptx_data(self, data: Dict[int, str], question: str) -> str:
"""
Process data extracted from a PowerPoint presentation.
Args:
data: Dictionary mapping slide numbers to text content
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Check if the question is asking for a specific slide
slide_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) slide (\d+)'
match = re.search(slide_pattern, question_lower)
if match:
slide_num = int(match.group(1))
if slide_num in data:
return data[slide_num]
else:
return f"Slide {slide_num} not found in the presentation."
# Check if the question is asking for a specific information across all slides
info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
match = re.search(info_pattern, question_lower)
if match:
info = match.group(1).strip()
# Look for this information in all slides
for slide_num, content in data.items():
if info.lower() in content.lower():
return content.strip()
# If nothing specific was found, return a summary
return f"The presentation contains {len(data)} slides."
def process_json_data(self, data: Dict[str, Any], question: str) -> str:
"""
Process data extracted from a JSON file.
Args:
data: Parsed JSON content
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Check if the question is asking for a specific key
key_pattern = r'(?:what is|what are|show|return) (?:the|in) ["\']?(\w+)["\']?'
match = re.search(key_pattern, question_lower)
if match:
key = match.group(1)
# Look for this key in the JSON
if key in data:
return str(data[key])
# Look for nested keys
for k, v in data.items():
if isinstance(v, dict) and key in v:
return str(v[key])
# If nothing specific was found, return a summary
return f"The JSON contains {len(data)} top-level keys: {', '.join(data.keys())}"
def process_zip_data(self, data: Dict[str, Any], question: str) -> str:
"""
Process data extracted from a ZIP archive.
Args:
data: Dictionary containing information about the archive
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Handle specific question types
if 'how many' in question_lower or 'count' in question_lower:
if 'files' in question_lower:
return str(len(data.get('files', [])))
# Check if the question is asking for a specific file
file_pattern = r'(?:does it contain|is there) (?:a file named|a file called) ["\']?([^"\']+)["\']?'
match = re.search(file_pattern, question_lower)
if match:
filename = match.group(1)
# Check if the file exists in the archive
for file_info in data.get('files', []):
if filename.lower() in file_info.get('filename', '').lower():
return f"Yes, the archive contains {file_info['filename']} ({file_info['size']} bytes)"
return f"No, the archive does not contain a file named {filename}."
# If nothing specific was found, return a summary
return f"The ZIP archive contains {len(data.get('files', []))} files."
def process_pdb_data(self, data: Dict[str, Any], question: str) -> str:
"""
Process data extracted from a PDB file.
Args:
data: Dictionary containing information about the PDB file
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Handle specific question types
if 'title' in question_lower:
return data.get('title', 'No title found.')
elif 'header' in question_lower:
return data.get('header', 'No header found.')
elif 'compound' in question_lower or 'compounds' in question_lower:
compounds = data.get('compounds', [])
if compounds:
return '\n'.join(compounds)
else:
return 'No compounds found.'
elif 'author' in question_lower or 'authors' in question_lower:
authors = data.get('authors', [])
if authors:
return '\n'.join(authors)
else:
return 'No authors found.'
elif 'atoms' in question_lower or 'atom count' in question_lower:
return str(data.get('atoms_count', 0))
# If nothing specific was found, return a summary
return f"PDB file with title: {data.get('title', 'No title')}, containing {data.get('atoms_count', 0)} atoms."
def process_python_data(self, data: Dict[str, Any], question: str) -> str:
"""
Process data extracted from a Python file.
Args:
data: Dictionary containing information about the Python file
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Handle specific question types
if 'class' in question_lower or 'classes' in question_lower:
classes = data.get('classes', [])
if classes:
class_names = [c['name'] for c in classes]
return ', '.join(class_names)
else:
return 'No classes found in the file.'
elif 'function' in question_lower or 'functions' in question_lower:
functions = data.get('functions', [])
if functions:
func_names = [f['name'] for f in functions]
return ', '.join(func_names)
else:
return 'No functions found in the file.'
elif 'import' in question_lower or 'imports' in question_lower:
imports = data.get('imports', [])
if imports:
import_strs = []
for imp in imports:
if imp.get('from'):
import_strs.append(f"from {imp['from']} import {imp['import']}")
else:
import_strs.append(f"import {imp['import']}")
return '\n'.join(import_strs)
else:
return 'No imports found in the file.'
# Check if the question is asking for a specific class or function
class_pattern = r'(?:what is|what does) (?:the class|class) ["\']?(\w+)["\']?'
match = re.search(class_pattern, question_lower)
if match:
class_name = match.group(1)
# Look for this class in the data
for cls in data.get('classes', []):
if cls['name'].lower() == class_name.lower():
parent = f", inherits from {cls['parent']}" if cls['parent'] else ""
return f"Class {cls['name']}{parent}"
func_pattern = r'(?:what is|what does) (?:the function|function) ["\']?(\w+)["\']?'
match = re.search(func_pattern, question_lower)
if match:
func_name = match.group(1)
# Look for this function in the data
for func in data.get('functions', []):
if func['name'].lower() == func_name.lower():
return f"Function {func['name']}({func['params']})"
# If nothing specific was found, look for the code of a specific function or class
code_pattern = r'(?:show|return) (?:the code for|code of) (?:the )?(?:function|class) ["\']?(\w+)["\']?'
match = re.search(code_pattern, question_lower)
if match:
entity_name = match.group(1)
content = data.get('content', '')
# Look for the code of this entity
lines = content.split('\n')
entity_lines = []
in_entity = False
indent = 0
for i, line in enumerate(lines):
# Check for class or function definition
if re.match(rf'(class|def)\s+{re.escape(entity_name)}\s*\(', line):
in_entity = True
entity_lines.append(line)
indent = len(line) - len(line.lstrip())
continue
if in_entity:
# Check if we're still in the entity based on indentation
if line.strip() and len(line) - len(line.lstrip()) <= indent:
in_entity = False
else:
entity_lines.append(line)
if entity_lines:
return '\n'.join(entity_lines)
# If nothing specific was found, return a summary
return f"Python file with {len(data.get('classes', []))} classes and {len(data.get('functions', []))} functions."
def process_jsonl_data(self, data: List[Dict[str, Any]], question: str) -> str:
"""
Process data extracted from a JSONL file.
Args:
data: List of parsed JSON objects
question: The question to answer
Returns:
Answer to the question
"""
question_lower = question.lower()
# Handle specific question types
if 'how many' in question_lower or 'count' in question_lower:
return str(len(data))
# Check if the question is asking for a specific entry
entry_pattern = r'(?:what is|what are|show|return) (?:the|in) entry (\d+)'
match = re.search(entry_pattern, question_lower)
if match:
entry_num = int(match.group(1))
if 0 <= entry_num < len(data):
return str(data[entry_num])
else:
return f"Entry {entry_num} not found in the data."
# Check if the question is asking for entries with a specific key-value pair
kv_pattern = r'(?:entries|items) where ["\']?(\w+)["\']? (?:is|=|equals|contains) ["\']?([^"\']+)["\']?'
match = re.search(kv_pattern, question_lower)
if match:
key = match.group(1)
value = match.group(2)
# Find entries matching the criteria
matching_entries = []
for entry in data:
if key in entry and str(entry[key]).lower() == value.lower():
matching_entries.append(entry)
if matching_entries:
return str(matching_entries)
else:
return f"No entries found where {key} = {value}."
# If nothing specific was found, return a summary
if data and isinstance(data[0], dict):
keys = list(data[0].keys())
return f"The data contains {len(data)} entries with keys: {', '.join(keys)}"
else:
return f"The data contains {len(data)} entries."