"""
Data processor for processing extracted data.
"""
import re
import os
import json
from typing import Dict, Any, List, Optional, Tuple, Union
import pandas as pd
class DataProcessor:
"""
Class for processing extracted data.
"""
def __init__(self):
"""Initialize the data processor."""
pass

    def process_excel_data(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """
        Process data extracted from an Excel file.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Convert question to lowercase for easier matching
        question_lower = question.lower()

        # Handle specific question types
        if 'oldest' in question_lower:
            return self._find_oldest_item(data, question_lower)
        elif 'count' in question_lower or 'how many' in question_lower:
            return self._count_items(data, question_lower)
        elif 'average' in question_lower or 'mean' in question_lower:
            return self._calculate_average(data, question_lower)
        elif 'total' in question_lower or 'sum' in question_lower:
            return self._calculate_total(data, question_lower)
        elif 'maximum' in question_lower or 'highest' in question_lower:
            return self._find_maximum(data, question_lower)
        elif 'minimum' in question_lower or 'lowest' in question_lower:
            return self._find_minimum(data, question_lower)
        else:
            # Try to extract specific information
            return self._extract_specific_info(data, question_lower)
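
    # Illustrative usage sketch (hypothetical data, not part of the module):
    #   df = pd.DataFrame({"Title": ["A", "B"], "Year": [1985, 1999]})
    #   DataProcessor().process_excel_data({"Movies": df}, "What is the oldest movie?")
    # The keyword dispatch above routes this question to _find_oldest_item,
    # which would return "A" for this data.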

    def _find_oldest_item(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the oldest item in the data."""
        # Look for mentions of specific columns or items
        year_columns = ['year', 'date', 'time', 'created', 'modified', 'release']
        item_type = None

        # Try to extract the type of item we're looking for
        item_types = [
            'movie', 'film', 'book', 'song', 'album', 'game', 'video game',
            'dvd', 'cd', 'blu-ray', 'blu ray', 'record', 'cassette', 'vhs'
        ]
        for item in item_types:
            if item in question:
                item_type = item
                break

        # Iterate through sheets and find the oldest item
        oldest_year = float('inf')
        oldest_item = None
        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Try to find year/date columns
            year_col = None
            for col in df.columns:
                if any(year_term in col.lower() for year_term in year_columns):
                    year_col = col
                    break

            if year_col is None:
                # If no obvious year column, look for columns with numeric values
                for col in df.columns:
                    if pd.api.types.is_numeric_dtype(df[col]):
                        try:
                            # Check if values might be years (between 1900 and the current year)
                            if df[col].min() >= 1900 and df[col].max() <= datetime.now().year:
                                year_col = col
                                break
                        except Exception:
                            continue

            if year_col is not None:
                # Find title/name column
                title_col = None
                title_columns = ['title', 'name', 'item', 'product', 'description']
                for col in df.columns:
                    if any(title_term in col.lower() for title_term in title_columns):
                        title_col = col
                        break
                if title_col is None and len(df.columns) > 1:
                    # If no obvious title column, use the first non-year column
                    for col in df.columns:
                        if col != year_col:
                            title_col = col
                            break

                # Filter by item type if specified
                if item_type:
                    filtered_df = df
                    # Look for a column that might contain item types
                    type_col = None
                    type_columns = ['type', 'category', 'format', 'medium', 'platform']
                    for col in df.columns:
                        if any(type_term in col.lower() for type_term in type_columns):
                            type_col = col
                            break
                    if type_col:
                        # Filter by item type
                        filtered_df = df[df[type_col].astype(str).str.lower().str.contains(item_type.lower())]
                else:
                    filtered_df = df

                if not filtered_df.empty and title_col:
                    try:
                        # Find the row with the minimum year
                        min_year_idx = filtered_df[year_col].astype(float).idxmin()
                        min_year = float(filtered_df.loc[min_year_idx, year_col])
                        if min_year < oldest_year:
                            oldest_year = min_year
                            oldest_item = filtered_df.loc[min_year_idx, title_col]
                    except Exception:
                        continue

        if oldest_item:
            return str(oldest_item)
        else:
            return "Could not determine the oldest item from the data."

    def _count_items(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Count items matching specific criteria."""
        # Extract conditions from the question
        conditions = self._extract_conditions(question)
        total_count = 0
        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Apply conditions to filter the DataFrame
            filtered_df = df
            for condition in conditions:
                col = condition.get('column')
                value = condition.get('value')
                operator = condition.get('operator', '=')
                if col and value is not None:
                    # Find the best matching column
                    best_col = self._find_best_matching_column(df, col)
                    if best_col:
                        try:
                            if operator == '=':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower() == str(value).lower()]
                            elif operator == '>':
                                filtered_df = filtered_df[filtered_df[best_col] > value]
                            elif operator == '<':
                                filtered_df = filtered_df[filtered_df[best_col] < value]
                            elif operator == '>=':
                                filtered_df = filtered_df[filtered_df[best_col] >= value]
                            elif operator == '<=':
                                filtered_df = filtered_df[filtered_df[best_col] <= value]
                            elif operator == 'contains':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower().str.contains(str(value).lower())]
                            elif operator == 'between':
                                if isinstance(value, list) and len(value) == 2:
                                    filtered_df = filtered_df[(filtered_df[best_col] >= value[0]) & (filtered_df[best_col] <= value[1])]
                        except Exception:
                            continue

            # Add the count from this sheet
            total_count += len(filtered_df)
        return str(total_count)
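
    # Illustrative usage sketch (hypothetical data, not part of the module):
    #   df = pd.DataFrame({"Title": ["A", "B", "C"], "Year": [1985, 1995, 2005]})
    #   DataProcessor().process_excel_data({"Movies": df}, "How many movies with year between 1990 and 2000?")
    # _extract_conditions parses "year between 1990 and 2000" into a 'between'
    # condition; the filter above leaves a single row, so "1" is returned.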

    def _calculate_average(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the average of a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to calculate the average for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue
            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col and pd.api.types.is_numeric_dtype(df[best_col]):
                try:
                    avg_value = df[best_col].mean()
                    return str(avg_value)
                except Exception:
                    continue
        return "Could not calculate the average from the data."

    def _calculate_total(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the total of a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to calculate the total for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue
            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col and pd.api.types.is_numeric_dtype(df[best_col]):
                try:
                    total_value = df[best_col].sum()
                    return str(total_value)
                except Exception:
                    continue
        return "Could not calculate the total from the data."

    def _find_maximum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the maximum value in a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to find the maximum for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue
            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col:
                try:
                    max_value = df[best_col].max()
                    return str(max_value)
                except Exception:
                    continue
        return "Could not find the maximum value from the data."

    def _find_minimum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the minimum value in a column."""
        # Extract column name from the question
        column_name = self._extract_column_name(question)
        if not column_name:
            return "Could not determine which column to find the minimum for."

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue
            # Find the best matching column
            best_col = self._find_best_matching_column(df, column_name)
            if best_col:
                try:
                    min_value = df[best_col].min()
                    return str(min_value)
                except Exception:
                    continue
        return "Could not find the minimum value from the data."

    def _extract_specific_info(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Extract specific information from the data."""
        # Try to identify what we're looking for
        looking_for = self._extract_looking_for(question)
        conditions = self._extract_conditions(question)

        for sheet_name, df in data.items():
            # Skip empty sheets
            if df.empty:
                continue

            # Apply conditions to filter the DataFrame
            filtered_df = df
            for condition in conditions:
                col = condition.get('column')
                value = condition.get('value')
                operator = condition.get('operator', '=')
                if col and value is not None:
                    # Find the best matching column
                    best_col = self._find_best_matching_column(df, col)
                    if best_col:
                        try:
                            if operator == '=':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower() == str(value).lower()]
                            elif operator == '>':
                                filtered_df = filtered_df[filtered_df[best_col] > value]
                            elif operator == '<':
                                filtered_df = filtered_df[filtered_df[best_col] < value]
                            elif operator == '>=':
                                filtered_df = filtered_df[filtered_df[best_col] >= value]
                            elif operator == '<=':
                                filtered_df = filtered_df[filtered_df[best_col] <= value]
                            elif operator == 'contains':
                                filtered_df = filtered_df[filtered_df[best_col].astype(str).str.lower().str.contains(str(value).lower())]
                            elif operator == 'between':
                                if isinstance(value, list) and len(value) == 2:
                                    filtered_df = filtered_df[(filtered_df[best_col] >= value[0]) & (filtered_df[best_col] <= value[1])]
                        except Exception:
                            continue

            # If we found matching rows and know what to look for
            if not filtered_df.empty and looking_for:
                # Find the best matching column for what we're looking for
                best_col = self._find_best_matching_column(df, looking_for)
                if best_col:
                    try:
                        # Return the first value
                        return str(filtered_df.iloc[0][best_col])
                    except Exception:
                        continue

        # If we couldn't extract specific information, return a more general response
        if data:
            # Return basic info about the first non-empty sheet
            for sheet_name, df in data.items():
                if not df.empty:
                    return f"The sheet contains {len(df)} rows and {len(df.columns)} columns."
        return "Could not extract the requested information from the data."

    def _extract_conditions(self, question: str) -> List[Dict[str, Any]]:
        """Extract conditions from the question."""
        conditions = []

        # Check for "between" conditions
        between_pattern = r'(\w+) between (\d+) and (\d+)'
        for match in re.finditer(between_pattern, question):
            column = match.group(1)
            start = int(match.group(2))
            end = int(match.group(3))
            conditions.append({
                'column': column,
                'operator': 'between',
                'value': [start, end],
            })

        # Check for comparison conditions
        comparison_pattern = r'(\w+) (>=|<=|>|<|=|equals|equal to|contains) (\w+)'
        for match in re.finditer(comparison_pattern, question):
            column = match.group(1)
            op = match.group(2)
            value = match.group(3)
            # Convert operator text to symbols
            if op in ('equals', 'equal to'):
                op = '='
            # Try to convert value to number
            try:
                value = float(value)
            except ValueError:
                pass
            conditions.append({
                'column': column,
                'operator': op,
                'value': value,
            })

        # Check for simple equality conditions
        equality_pattern = r'(?:with|where) (\w+) (?:is|=) (\w+)'
        for match in re.finditer(equality_pattern, question):
            column = match.group(1)
            value = match.group(2)
            # Try to convert value to number
            try:
                value = float(value)
            except ValueError:
                pass
            conditions.append({
                'column': column,
                'operator': '=',
                'value': value,
            })

        return conditions
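
    # Illustrative sketch of the parsing above (hypothetical question):
    #   DataProcessor()._extract_conditions("how many items where rating > 8")
    # would yield [{'column': 'rating', 'operator': '>', 'value': 8.0}].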

    def _extract_column_name(self, question: str) -> Optional[str]:
        """Extract column name from the question."""
        # Check for direct mentions of columns
        column_pattern = r'(?:column|field) (?:named|called) ["\']?(\w+)["\']?'
        match = re.search(column_pattern, question)
        if match:
            return match.group(1)

        # Look for common column references
        common_columns = [
            'year', 'date', 'time', 'name', 'title', 'price', 'cost',
            'amount', 'quantity', 'total', 'value', 'age', 'rating',
            'score', 'grade', 'salary', 'income', 'revenue', 'profit',
            'loss', 'height', 'weight', 'length', 'width', 'depth',
            'area', 'volume'
        ]
        for col in common_columns:
            if col in question:
                return col
        return None

    def _extract_looking_for(self, question: str) -> Optional[str]:
        """Extract what we're looking for from the question."""
        # Check for direct mentions of what we're looking for
        looking_for_pattern = r'(?:what is|what are|find|get|return) the (\w+)'
        match = re.search(looking_for_pattern, question)
        if match:
            return match.group(1)

        # Look for common things we might be looking for
        common_items = [
            'name', 'title', 'price', 'cost', 'amount', 'quantity',
            'total', 'value', 'age', 'rating', 'score', 'grade',
            'salary', 'income', 'revenue', 'profit', 'loss',
            'height', 'weight', 'length', 'width', 'depth',
            'area', 'volume', 'year', 'date', 'time'
        ]
        for item in common_items:
            if item in question:
                return item
        return None

    def _find_best_matching_column(self, df: pd.DataFrame, column_name: str) -> Optional[str]:
        """Find the best matching column in a DataFrame."""
        # Check for exact match
        if column_name in df.columns:
            return column_name
        # Check for case-insensitive match (column labels may not be strings)
        for col in df.columns:
            if str(col).lower() == column_name.lower():
                return col
        # Check for partial match
        for col in df.columns:
            if column_name.lower() in str(col).lower():
                return col
        return None

    def process_csv_data(self, data: pd.DataFrame, question: str) -> str:
        """
        Process data extracted from a CSV file.

        Args:
            data: DataFrame containing the CSV data
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Wrap in a dictionary to reuse the Excel processing logic
        return self.process_excel_data({'Sheet1': data}, question)
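
    # Illustrative usage sketch (hypothetical data, not part of the module):
    #   df = pd.DataFrame({"Item": ["pen", "pad"], "Price": [2.5, 4.0]})
    #   DataProcessor().process_csv_data(df, "What is the total price?")
    # The DataFrame is wrapped as a single-sheet workbook, so the Excel
    # handlers above apply unchanged and would sum the numeric Price column.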

    def process_text_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a text file.

        Args:
            data: Text content of the file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'count' in question_lower or 'how many' in question_lower:
            # Count occurrences of a word or phrase
            count_pattern = r'(?:count|how many) (?:occurrences of|instances of|times) ["\']?([^"\']+)["\']?'
            match = re.search(count_pattern, question_lower)
            if match:
                term = match.group(1)
                count = data.lower().count(term.lower())
                return str(count)

        # Check if the question is asking for a specific line
        line_pattern = r'(?:what is|what does|what are|show|return) (?:the|on) (?:line|lines) (\d+)(?:\s*(?:to|-)\s*(\d+))?'
        match = re.search(line_pattern, question_lower)
        if match:
            start_line = int(match.group(1))
            end_line = int(match.group(2)) if match.group(2) else start_line
            lines = data.split('\n')
            if start_line <= len(lines) and end_line <= len(lines):
                return '\n'.join(lines[start_line - 1:end_line])

        # Check if the question is asking for a specific paragraph
        para_pattern = r'(?:what is|what does|what are|show|return) (?:the|in) paragraph (\d+)(?:\s*(?:to|-)\s*(\d+))?'
        match = re.search(para_pattern, question_lower)
        if match:
            start_para = int(match.group(1))
            end_para = int(match.group(2)) if match.group(2) else start_para
            paragraphs = re.split(r'\n\s*\n', data)
            if start_para <= len(paragraphs) and end_para <= len(paragraphs):
                return '\n\n'.join(paragraphs[start_para - 1:end_para])

        # Check for specific information requests
        info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
        match = re.search(info_pattern, question_lower)
        if match:
            info = match.group(1).strip()
            # Look for this information in the text
            sentences = re.split(r'(?<=[.!?])\s+', data)
            for sentence in sentences:
                if info.lower() in sentence.lower():
                    return sentence.strip()

        # If nothing specific was found, return a generic summary
        words = data.split()
        sentences = re.split(r'(?<=[.!?])\s+', data)
        return f"The text contains {len(words)} words and approximately {len(sentences)} sentences."

    def process_pdf_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PDF file.

        Args:
            data: Dictionary mapping page numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Check if the question is asking for a specific page
        page_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) page (\d+)'
        match = re.search(page_pattern, question_lower)
        if match:
            page_num = int(match.group(1))
            if page_num in data:
                return data[page_num]
            else:
                return f"Page {page_num} not found in the PDF."

        # Check if the question is asking for specific information across all pages
        info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
        match = re.search(info_pattern, question_lower)
        if match:
            info = match.group(1).strip()
            # Look for this information in all pages
            for page_num, content in data.items():
                sentences = re.split(r'(?<=[.!?])\s+', content)
                for sentence in sentences:
                    if info.lower() in sentence.lower():
                        return sentence.strip()

        # If nothing specific was found, combine all text and return a summary
        all_text = ' '.join(data.values())
        words = all_text.split()
        return f"The PDF contains {len(data)} pages and approximately {len(words)} words."

    def process_image_metadata(self, metadata: Dict[str, Any], question: str) -> str:
        """
        Process metadata extracted from an image file.

        Args:
            metadata: Dictionary containing image metadata
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'format' in question_lower or 'type' in question_lower:
            return metadata.get('format', 'Unknown format')
        elif 'size' in question_lower or 'resolution' in question_lower:
            width = metadata.get('width', 0)
            height = metadata.get('height', 0)
            return f"{width}x{height}"
        elif 'width' in question_lower:
            return str(metadata.get('width', 0))
        elif 'height' in question_lower:
            return str(metadata.get('height', 0))
        elif 'mode' in question_lower or 'color' in question_lower:
            return metadata.get('mode', 'Unknown mode')
        elif 'exif' in question_lower:
            exif = metadata.get('exif', {})
            if exif:
                return str(exif)
            else:
                return "No EXIF data found."

        # If nothing specific was found, return basic information
        return f"Image format: {metadata.get('format', 'Unknown')}, Size: {metadata.get('width', 0)}x{metadata.get('height', 0)}, Mode: {metadata.get('mode', 'Unknown')}"

    def process_docx_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a Word document.

        Args:
            data: Text content of the document
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Similar to text processing
        return self.process_text_data(data, question)

    def process_pptx_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PowerPoint presentation.

        Args:
            data: Dictionary mapping slide numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Check if the question is asking for a specific slide
        slide_pattern = r'(?:what is|what does|what are|show|return) (?:on|in) slide (\d+)'
        match = re.search(slide_pattern, question_lower)
        if match:
            slide_num = int(match.group(1))
            if slide_num in data:
                return data[slide_num]
            else:
                return f"Slide {slide_num} not found in the presentation."

        # Check if the question is asking for specific information across all slides
        info_pattern = r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
        match = re.search(info_pattern, question_lower)
        if match:
            info = match.group(1).strip()
            # Look for this information in all slides
            for slide_num, content in data.items():
                if info.lower() in content.lower():
                    return content.strip()

        # If nothing specific was found, return a summary
        return f"The presentation contains {len(data)} slides."

    def process_json_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a JSON file.

        Args:
            data: Parsed JSON content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Check if the question is asking for a specific key
        key_pattern = r'(?:what is|what are|show|return) (?:the|in) ["\']?(\w+)["\']?'
        match = re.search(key_pattern, question_lower)
        if match:
            key = match.group(1)
            # Look for this key at the top level of the JSON
            if key in data:
                return str(data[key])
            # Look for the key one level down in nested objects
            for k, v in data.items():
                if isinstance(v, dict) and key in v:
                    return str(v[key])

        # If nothing specific was found, return a summary
        return f"The JSON contains {len(data)} top-level keys: {', '.join(data.keys())}"

    def process_zip_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a ZIP archive.

        Args:
            data: Dictionary containing information about the archive
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'how many' in question_lower or 'count' in question_lower:
            if 'files' in question_lower:
                return str(len(data.get('files', [])))

        # Check if the question is asking for a specific file
        file_pattern = r'(?:does it contain|is there) (?:a file named|a file called) ["\']?([^"\']+)["\']?'
        match = re.search(file_pattern, question_lower)
        if match:
            filename = match.group(1)
            # Check if the file exists in the archive
            for file_info in data.get('files', []):
                if filename.lower() in file_info.get('filename', '').lower():
                    return f"Yes, the archive contains {file_info['filename']} ({file_info['size']} bytes)"
            return f"No, the archive does not contain a file named {filename}."

        # If nothing specific was found, return a summary
        return f"The ZIP archive contains {len(data.get('files', []))} files."

    def process_pdb_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a PDB file.

        Args:
            data: Dictionary containing information about the PDB file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'title' in question_lower:
            return data.get('title', 'No title found.')
        elif 'header' in question_lower:
            return data.get('header', 'No header found.')
        elif 'compound' in question_lower or 'compounds' in question_lower:
            compounds = data.get('compounds', [])
            if compounds:
                return '\n'.join(compounds)
            else:
                return 'No compounds found.'
        elif 'author' in question_lower or 'authors' in question_lower:
            authors = data.get('authors', [])
            if authors:
                return '\n'.join(authors)
            else:
                return 'No authors found.'
        elif 'atoms' in question_lower or 'atom count' in question_lower:
            return str(data.get('atoms_count', 0))

        # If nothing specific was found, return a summary
        return f"PDB file with title: {data.get('title', 'No title')}, containing {data.get('atoms_count', 0)} atoms."

    def process_python_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a Python file.

        Args:
            data: Dictionary containing information about the Python file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Check first whether the question asks about a specific class or function;
        # the generic keyword checks further down would otherwise shadow these.
        class_pattern = r'(?:what is|what does) (?:the class|class) ["\']?(\w+)["\']?'
        match = re.search(class_pattern, question_lower)
        if match:
            class_name = match.group(1)
            # Look for this class in the data
            for cls in data.get('classes', []):
                if cls['name'].lower() == class_name.lower():
                    parent = f", inherits from {cls['parent']}" if cls.get('parent') else ""
                    return f"Class {cls['name']}{parent}"

        func_pattern = r'(?:what is|what does) (?:the function|function) ["\']?(\w+)["\']?'
        match = re.search(func_pattern, question_lower)
        if match:
            func_name = match.group(1)
            # Look for this function in the data
            for func in data.get('functions', []):
                if func['name'].lower() == func_name.lower():
                    return f"Function {func['name']}({func['params']})"

        # Check whether the question asks for the code of a specific function or class
        code_pattern = r'(?:show|return) (?:the code for|code of) (?:the )?(?:function|class) ["\']?(\w+)["\']?'
        match = re.search(code_pattern, question_lower)
        if match:
            entity_name = match.group(1)
            content = data.get('content', '')
            # Look for the code of this entity
            lines = content.split('\n')
            entity_lines = []
            in_entity = False
            indent = 0
            for line in lines:
                # Check for a class or function definition (possibly indented, e.g. a method)
                if re.match(rf'\s*(class|def)\s+{re.escape(entity_name)}\s*\(', line):
                    in_entity = True
                    entity_lines.append(line)
                    indent = len(line) - len(line.lstrip())
                    continue
                if in_entity:
                    # Check if we're still in the entity based on indentation
                    if line.strip() and len(line) - len(line.lstrip()) <= indent:
                        in_entity = False
                    else:
                        entity_lines.append(line)
            if entity_lines:
                return '\n'.join(entity_lines)

        # Handle generic listing questions
        if 'class' in question_lower or 'classes' in question_lower:
            classes = data.get('classes', [])
            if classes:
                class_names = [c['name'] for c in classes]
                return ', '.join(class_names)
            else:
                return 'No classes found in the file.'
        elif 'function' in question_lower or 'functions' in question_lower:
            functions = data.get('functions', [])
            if functions:
                func_names = [f['name'] for f in functions]
                return ', '.join(func_names)
            else:
                return 'No functions found in the file.'
        elif 'import' in question_lower or 'imports' in question_lower:
            imports = data.get('imports', [])
            if imports:
                import_strs = []
                for imp in imports:
                    if imp.get('from'):
                        import_strs.append(f"from {imp['from']} import {imp['import']}")
                    else:
                        import_strs.append(f"import {imp['import']}")
                return '\n'.join(import_strs)
            else:
                return 'No imports found in the file.'

        # If nothing specific was found, return a summary
        return f"Python file with {len(data.get('classes', []))} classes and {len(data.get('functions', []))} functions."

    def process_jsonl_data(self, data: List[Dict[str, Any]], question: str) -> str:
        """
        Process data extracted from a JSONL file.

        Args:
            data: List of parsed JSON objects
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        # Handle specific question types
        if 'how many' in question_lower or 'count' in question_lower:
            return str(len(data))

        # Check if the question is asking for a specific entry
        entry_pattern = r'(?:what is|what are|show|return) (?:the|in) entry (\d+)'
        match = re.search(entry_pattern, question_lower)
        if match:
            entry_num = int(match.group(1))
            if 0 <= entry_num < len(data):
                return str(data[entry_num])
            else:
                return f"Entry {entry_num} not found in the data."

        # Check if the question is asking for entries with a specific key-value pair
        kv_pattern = r'(?:entries|items) where ["\']?(\w+)["\']? (?:is|=|equals|contains) ["\']?([^"\']+)["\']?'
        match = re.search(kv_pattern, question_lower)
        if match:
            key = match.group(1)
            value = match.group(2)
            # Find entries matching the criteria
            matching_entries = []
            for entry in data:
                if key in entry and str(entry[key]).lower() == value.lower():
                    matching_entries.append(entry)
            if matching_entries:
                return str(matching_entries)
            else:
                return f"No entries found where {key} = {value}."

        # If nothing specific was found, return a summary
        if data and isinstance(data[0], dict):
            keys = list(data[0].keys())
            return f"The data contains {len(data)} entries with keys: {', '.join(keys)}"
        else:
            return f"The data contains {len(data)} entries."