import re
import html
import json
from typing import Dict, List, Tuple, Optional, Any, Union


class ContextProcessor:
    """Processes highlighted contexts for the RAG Summarizer Arena"""

    # Common HTML entities that might be incomplete (missing the trailing
    # semicolon), mapped to their complete forms so html.unescape can decode them
    INCOMPLETE_ENTITIES = {
        '&#39': '&#39;',
        '&quot': '&quot;',
        '&lt': '&lt;',
        '&gt': '&gt;',
        '&amp': '&amp;'
    }

    @staticmethod
    def clean_text(text: str) -> str:
        """Cleans text by fixing HTML entities and handling escaped characters"""
        if not text or not isinstance(text, str):
            return text

        # Fix incomplete HTML entities
        for incomplete, complete in ContextProcessor.INCOMPLETE_ENTITIES.items():
            text = re.sub(f"{re.escape(incomplete)}(?!;)", complete, text)

        # Convert HTML entities to characters
        try:
            text = html.unescape(text)
        except Exception:
            pass

        # Handle escaped quotes, special characters, and curly quotes
        replacements = {
            r'\"': '"',
            r"\'": "'",
            r"\n": "\n",
            r"\t": "\t",
            r"\\": "\\",
            '\u201c': '"',   # left double quotation mark
            '\u201d': '"',   # right double quotation mark
            '\u2018': "'",   # left single quotation mark
            '\u2019': "'",   # right single quotation mark
            '`': "'",
            '\u00b4': "'"    # acute accent
        }
        for pattern, replacement in replacements.items():
            text = text.replace(pattern, replacement)

        # Remove trailing backslash if present
        if text.rstrip().endswith('\\'):
            text = text.rstrip().rstrip('\\')

        return text

    @staticmethod
    def balance_highlight_tags(text: str) -> str:
        """Ensures highlight tags are properly balanced"""
        if not text or not isinstance(text, str):
            return text

        # Define highlight tag patterns
        # NOTE: the third pair was an HTML-style tag pair whose literal text was
        # lost when this file was extracted; '<em>'/'</em>' is an assumed placeholder.
        highlight_pairs = [
            ('[[start_highlight]]', '[[end_highlight]]'),
            ('[[highlight_start]]', '[[highlight_end]]'),
            ('<em>', '</em>')
        ]

        # Check and balance each pair
        for start_tag, end_tag in highlight_pairs:
            start_count = text.count(start_tag)
            end_count = text.count(end_tag)

            # Add missing tags if needed
            if start_count > end_count:
                text += end_tag * (start_count - end_count)
            elif end_count > start_count:
                text = start_tag * (end_count - start_count) + text

        return text

    @staticmethod
    def balance_quotes(text: str) -> str:
        """Ensures quotes are properly balanced"""
        if not text or not isinstance(text, str):
            return text

        # First, remove escaped quotes from the count
        plain_text = text.replace('\\"', '')

        # Count quotes and balance if needed
        quote_count = plain_text.count('"')
        if quote_count % 2 == 1:
            text += '"'

        return text

    @staticmethod
    def extract_highlight_parts(text: str) -> List[Tuple[bool, str]]:
        """
        Extracts highlighted and non-highlighted parts from text, preserving order
        """
        # Ensure highlight tags are balanced
        text = ContextProcessor.balance_highlight_tags(text)

        # Define all highlight patterns (the '<em>' pair is the assumed placeholder, see above)
        highlight_patterns = [
            ('[[start_highlight]]', '[[end_highlight]]'),
            ('[[highlight_start]]', '[[highlight_end]]'),
            ('<em>', '</em>')
        ]

        # Collect all highlight sections with their positions
        all_highlights = []
        for start_tag, end_tag in highlight_patterns:
            # Escape special regex characters if needed
            start_esc = re.escape(start_tag)
            end_esc = re.escape(end_tag)

            # Find all occurrences of this highlight pattern
            for match in re.finditer(f"{start_esc}(.*?){end_esc}", text, re.DOTALL):
                all_highlights.append({
                    'start': match.start(),
                    'end': match.end(),
                    'content': match.group(1),
                    'start_tag': start_tag,
                    'end_tag': end_tag
                })

        # If no highlights found, return the whole text as unhighlighted
        if not all_highlights:
            return [(False, text)]

        # Sort highlights by start position
        all_highlights.sort(key=lambda x: x['start'])

        # Build the parts list by processing text portions between and including highlights
        parts = []
        current_pos = 0

        for highlight in all_highlights:
            # Add non-highlighted text before this highlight
            if highlight['start'] > current_pos:
                parts.append((False, text[current_pos:highlight['start']]))

            # Add the highlighted text
            parts.append((True, highlight['content']))

            # Update position to end of this highlight
            current_pos = highlight['end']

        # Add any remaining text after the last highlight
        if current_pos < len(text):
            parts.append((False, text[current_pos:]))

        return parts

    @staticmethod
    def is_markdown_table(text: str) -> bool:
        """Checks if text looks like a markdown table"""
        if not text or not isinstance(text, str):
            return False

        if '|' in text and '\n' in text:
            lines = text.strip().split('\n')
            pipe_lines = sum(1 for line in lines if line.strip().startswith('|'))
            return pipe_lines >= 2

        return False

    @staticmethod
    def process_cell_content(cell_text: str) -> str:
        """Processes a single table cell, handling highlights if present"""
        # Clean and prepare the text
        cell_text = ContextProcessor.clean_text(cell_text)
        cell_text = ContextProcessor.balance_quotes(cell_text)

        # Check if cell has any highlight tags
        has_highlights = False
        highlight_patterns = [
            '[[start_highlight]]', '[[end_highlight]]',
            '[[highlight_start]]', '[[highlight_end]]',
            '<em>', '</em>'
        ]

        for pattern in highlight_patterns:
            if pattern in cell_text:
                has_highlights = True
                break

        if has_highlights:
            # Extract and process highlight parts
            parts = ContextProcessor.extract_highlight_parts(cell_text)

            # Build the result
            # NOTE: the original highlight markup was stripped in extraction;
            # '<span class="highlight">' is an assumed placeholder.
            result = ""
            for is_highlighted, part in parts:
                if is_highlighted:
                    result += f'<span class="highlight">{html.escape(part)}</span>'
                else:
                    result += html.escape(part)
            return result
        else:
            # Just escape HTML in regular cells
            return html.escape(cell_text)
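    # Usage sketch (illustrative; assumes the placeholder <span class="highlight">
    # markup chosen above):
    #
    #     ContextProcessor.process_cell_content("2 [[start_highlight]]mm[[end_highlight]]")
    #     # -> '2 <span class="highlight">mm</span>'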
    @staticmethod
    def convert_table_to_html(text: str) -> str:
        """Converts markdown table to HTML with support for highlights in cells"""
        # Clean the text
        text = ContextProcessor.clean_text(text)

        # Split into lines and get table rows
        lines = text.strip().split('\n')
        table_lines = [line for line in lines if line.strip().startswith('|')]

        # Check if it's a proper table
        if len(table_lines) < 2:
            return ContextProcessor.process_text(text)

        # Check if second line is a separator (----)
        has_header = False
        if len(table_lines) >= 2 and '---' in table_lines[1]:
            has_header = True

        # Start building HTML table
        # NOTE: the table tag literals were stripped in extraction; plain
        # <table>/<thead>/<tbody>/<tr>/<th>/<td> tags are assumed here.
        html_output = '<table>'

        if has_header:
            # Process header row
            header_line = table_lines[0]
            # Split by pipe and remove empty first and last elements
            cells = [cell.strip() for cell in header_line.split('|')]
            if cells and not cells[0]:
                cells.pop(0)
            if cells and not cells[-1]:
                cells.pop()

            html_output += '<thead><tr>'
            for cell in cells:
                cell_html = ContextProcessor.process_cell_content(cell)
                html_output += f'<th>{cell_html}</th>'
            html_output += '</tr></thead>'

            # Process data rows (skip header and separator)
            html_output += '<tbody>'
            for line in table_lines[2:]:
                cells = [cell.strip() for cell in line.split('|')]
                if cells and not cells[0]:
                    cells.pop(0)
                if cells and not cells[-1]:
                    cells.pop()

                html_output += '<tr>'
                for cell in cells:
                    cell_html = ContextProcessor.process_cell_content(cell)
                    html_output += f'<td>{cell_html}</td>'
                html_output += '</tr>'
            html_output += '</tbody>'
        else:
            # All rows are data
            html_output += '<tbody>'
            for line in table_lines:
                cells = [cell.strip() for cell in line.split('|')]
                if cells and not cells[0]:
                    cells.pop(0)
                if cells and not cells[-1]:
                    cells.pop()

                html_output += '<tr>'
                for cell in cells:
                    cell_html = ContextProcessor.process_cell_content(cell)
                    html_output += f'<td>{cell_html}</td>'
                html_output += '</tr>'
            html_output += '</tbody>'

        html_output += '</table>'
        return html_output

    @staticmethod
    def process_text(text: str) -> str:
        """Processes text with highlights, handling all edge cases"""
        # Clean and prepare the text
        text = ContextProcessor.clean_text(text)
        text = ContextProcessor.balance_quotes(text)
        text = ContextProcessor.balance_highlight_tags(text)

        # Extract and process highlight parts
        parts = ContextProcessor.extract_highlight_parts(text)

        # Build the result (same assumed <span class="highlight"> markup as above)
        result = ""
        for is_highlighted, part in parts:
            if is_highlighted:
                escaped_part = html.escape(part)
                result += f'<span class="highlight">{escaped_part}</span>'
            else:
                result += html.escape(part)

        return result

    @staticmethod
    def process_content(content: str, abbreviated_content: Optional[str] = None) -> str:
        """Main function to process any kind of content"""
        # Handle null/empty content
        if not content or not isinstance(content, str):
            return ""

        # Special cases that need abbreviated content
        special_cases = [
            lambda c: c.strip() == "In Oklahoma,",
            lambda c: c.strip().startswith('"') and c.count('"') == 1,
            lambda c: c.rstrip().endswith('\\'),
            lambda c: (c.replace('\\"', '').count('"') % 2) == 1,
            lambda c: any((c.count(start) != c.count(end)) for start, end in [
                ('[[start_highlight]]', '[[end_highlight]]'),
                ('[[highlight_start]]', '[[highlight_end]]'),
                ('<em>', '</em>')
            ])
        ]

        # Check if we need to use abbreviated content
        needs_abbreviated = any(check(content) for check in special_cases)

        # If content needs help and we have abbreviated content, use it
        if needs_abbreviated and abbreviated_content:
            # Handle abbreviated content that might be a JSON string
            if abbreviated_content.strip().startswith('{') and abbreviated_content.strip().endswith('}'):
                try:
                    data = json.loads(abbreviated_content)
                    if "abbreviatedContent" in data:
                        abbreviated_content = data["abbreviatedContent"]
                except json.JSONDecodeError:
                    pass

            # Clean and prepare the abbreviated content
            abbreviated_content = ContextProcessor.clean_text(abbreviated_content)
            abbreviated_content = ContextProcessor.balance_quotes(abbreviated_content)
            abbreviated_content = ContextProcessor.balance_highlight_tags(abbreviated_content)

            # Use abbreviated content instead
            content = abbreviated_content

        # Check if content is a markdown table
        if ContextProcessor.is_markdown_table(content):
            return ContextProcessor.convert_table_to_html(content)
        else:
            return ContextProcessor.process_text(content)

    @staticmethod
    def parse_json_contexts(context_json: str) -> List[Dict[str, Any]]:
        """Parses JSON-formatted context data with fallback to regex extraction"""
        contexts = []

        # First try standard JSON parsing
        try:
            contexts = json.loads(context_json)
            if not isinstance(contexts, list):
                contexts = []
        except json.JSONDecodeError:
            # If standard parsing fails, use regex to extract the data
            try:
                # Extract type field
                type_pattern = r'"type":\s*"(primary|secondary)"'
                types = re.findall(type_pattern, context_json)

                # Extract abbreviatedContent field - more robustly handle quotes
                content_pattern = r'"abbreviatedContent":\s*"((?:\\.|[^"])*?)"'
                contents = re.findall(content_pattern, context_json)

                # Build context objects
                for ctx_type, content in zip(types, contents):
                    contexts.append({
                        'type': ctx_type,
                        'abbreviatedContent': content.replace('\\"', '"')
                    })
            except Exception as e:
                print(f"Error extracting contexts with regex: {e}")

        return contexts

    @staticmethod
    def process_json_contexts(context_json: str) -> List[Dict[str, Any]]:
        """Process JSON-formatted highlighted contexts"""
        processed_contexts = []

        try:
            # Parse the JSON contexts
            contexts = ContextProcessor.parse_json_contexts(context_json)

            # Process each context item
            for i, item in enumerate(contexts):
                if isinstance(item, dict):
                    context_type = item.get('type', 'secondary')
                    content = item.get('abbreviatedContent', '')

                    # Process the content
                    processed_content = ContextProcessor.process_content(content)

                    # Create processed context item
                    processed_contexts.append({
                        'chunk_num': i + 1,
                        'content': processed_content,
                        'is_primary': context_type == 'primary'
                    })
        except Exception as e:
            print(f"Error processing JSON contexts: {e}")

        return processed_contexts
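# Shape of process_json_contexts output (illustrative; the <span class="highlight">
# markup is the placeholder assumed above):
#
#     ContextProcessor.process_json_contexts(
#         '[{"type": "primary", "abbreviatedContent": "A [[start_highlight]]B[[end_highlight]]"}]'
#     )
#     # -> [{'chunk_num': 1, 'content': 'A <span class="highlight">B</span>', 'is_primary': True}]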
# Module-level functions for backward compatibility

def clean_text(text):
    return ContextProcessor.clean_text(text)


def balance_highlight_tags(text):
    return ContextProcessor.balance_highlight_tags(text)


def balance_quotes(text):
    return ContextProcessor.balance_quotes(text)


def extract_highlight_parts(text):
    return ContextProcessor.extract_highlight_parts(text)


def is_markdown_table(text):
    return ContextProcessor.is_markdown_table(text)


def process_cell_content(cell_text):
    return ContextProcessor.process_cell_content(cell_text)


def convert_table_to_html(text):
    return ContextProcessor.convert_table_to_html(text)


def process_text(text):
    return ContextProcessor.process_text(text)


def process_content(content, abbreviated_content=None):
    return ContextProcessor.process_content(content, abbreviated_content)


def process_highlights(text):
    """Main entry point called from data_loader.py"""
    return ContextProcessor.process_content(text)


def get_context_html(example, show_full=False):
    """Format context chunks into HTML for display"""
    # NOTE: the HTML tag and class literals in this function were stripped when
    # the source was extracted; the markup below is an assumed reconstruction.
    # Only the visible text and the " primary-context" modifier are original.
    html_output = ""

    # Process insufficient context warning if needed
    if example.get("insufficient", False):
        insufficient_reason = example.get("insufficient_reason", "")
        reason_html = (
            f"<p>{insufficient_reason}</p>"
            if insufficient_reason
            else "<p>The context may not contain enough information to fully answer the question, "
                 "or the question might be ambiguous. Models should ideally indicate this limitation "
                 "or refuse to answer.</p>"
        )
        html_output += f"""
        <div class="insufficient-context">
            <strong>Insufficient Context</strong>
            {reason_html}
        </div>
        """

    html_output += '<div class="context-display">'
    # Display full contexts if requested
    if show_full and "full_contexts" in example and example["full_contexts"]:
        for context_item in example["full_contexts"]:
            content = context_item.get('content', '')
            abbreviated = context_item.get('abbreviatedContent', None)

            # Process the content
            processed = ContextProcessor.process_content(content, abbreviated)
            html_output += f'<div class="context-chunk">{processed}</div>'
    else:
        # Display regular contexts if available
        if "contexts" in example and example["contexts"]:
            for context_item in example["contexts"]:
                content = context_item.get('content', '')
                abbreviated = context_item.get('abbreviatedContent', None)

                # Process the content
                processed = ContextProcessor.process_content(content, abbreviated)
                is_primary = context_item.get('is_primary', False)
                extra_class = " primary-context" if is_primary else ""
                html_output += f'<div class="context-chunk{extra_class}">{processed}</div>'
        # Or process JSON-structured highlighted contexts
        elif "contexts_highlighted" in example and example["contexts_highlighted"]:
            processed_contexts = ContextProcessor.process_json_contexts(example["contexts_highlighted"])
            for context_item in processed_contexts:
                is_primary = context_item.get('is_primary', False)
                extra_class = " primary-context" if is_primary else ""
                html_output += f'<div class="context-chunk{extra_class}">{context_item["content"]}</div>'
        else:
            html_output += '<div class="no-context">No context available. Try toggling to full context view.</div>'

    html_output += '</div>'
    return html_output
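# ---------------------------------------------------------------------------
# Minimal smoke test (illustrative only). The sample inputs are invented, and
# the printed markup uses the placeholder tags and class names assumed in the
# reconstruction above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Plain text with the [[start_highlight]]...[[end_highlight]] convention
    sample = (
        'The capital of France is '
        '[[start_highlight]]Paris[[end_highlight]].'
    )
    print(process_highlights(sample))

    # A small markdown table goes through convert_table_to_html
    table = "| City | Country |\n|---|---|\n| Paris | France |"
    print(process_highlights(table))

    # A tiny example dict exercising the contexts_highlighted path
    example = {
        "insufficient": False,
        "contexts_highlighted": json.dumps([
            {"type": "primary",
             "abbreviatedContent": "Paris is the capital of "
                                   "[[start_highlight]]France[[end_highlight]]."}
        ]),
    }
    print(get_context_html(example))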