import re
import html
import json
from typing import Dict, List, Tuple, Optional, Any, Union
class ContextProcessor:
"""Processes highlighted contexts for the RAG Summarizer Arena"""
# Common HTML entities that might be incomplete
INCOMPLETE_ENTITIES = {
        '&apos': '&apos;',
        '&quot': '&quot;',
        '&lt': '&lt;',
        '&gt': '&gt;',
        '&amp': '&amp;'
}
@staticmethod
def clean_text(text: str) -> str:
"""Cleans text by fixing HTML entities and handling escaped characters"""
if not text or not isinstance(text, str):
return text
# Fix incomplete HTML entities
for incomplete, complete in ContextProcessor.INCOMPLETE_ENTITIES.items():
text = re.sub(f"{re.escape(incomplete)}(?!;)", complete, text)
# Convert HTML entities to characters
try:
text = html.unescape(text)
except Exception:
pass
# Handle escaped quotes and special characters
        replacements = {
            r'\"': '"', r"\'": "'", r"\n": "\n", r"\t": "\t", r"\\": "\\",
            '“': '"', '”': '"', '‘': "'", '’': "'", '`': "'", '´': "'"  # curly quotes/accents -> straight quotes
        }
for pattern, replacement in replacements.items():
text = text.replace(pattern, replacement)
# Remove trailing backslash if present
if text.rstrip().endswith('\\'):
text = text.rstrip().rstrip('\\')
return text
@staticmethod
def balance_highlight_tags(text: str) -> str:
"""Ensures highlight tags are properly balanced"""
if not text or not isinstance(text, str):
return text
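        # e.g. "x [[start_highlight]]y" becomes "x [[start_highlight]]y[[end_highlight]]"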
# Define highlight tag patterns
highlight_pairs = [
('[[start_highlight]]', '[[end_highlight]]'),
('[[highlight_start]]', '[[highlight_end]]'),
            ('<mark>', '</mark>')  # inline HTML markers (tag name assumed)
]
# Check and balance each pair
for start_tag, end_tag in highlight_pairs:
start_count = text.count(start_tag)
end_count = text.count(end_tag)
# Add missing tags if needed
if start_count > end_count:
text += end_tag * (start_count - end_count)
elif end_count > start_count:
text = start_tag * (end_count - start_count) + text
return text
@staticmethod
def balance_quotes(text: str) -> str:
"""Ensures quotes are properly balanced"""
if not text or not isinstance(text, str):
return text
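        # e.g. 'She said "hello' becomes 'She said "hello"'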
# First, remove escaped quotes from the count
plain_text = text.replace('\\"', '')
# Count quotes and balance if needed
quote_count = plain_text.count('"')
if quote_count % 2 == 1:
text += '"'
return text
@staticmethod
def extract_highlight_parts(text: str) -> List[Tuple[bool, str]]:
"""
Extracts highlighted and non-highlighted parts from text, preserving order
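        Example: "A [[start_highlight]]B[[end_highlight]] C"
        -> [(False, "A "), (True, "B"), (False, " C")]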
"""
# Ensure highlight tags are balanced
text = ContextProcessor.balance_highlight_tags(text)
# Define all highlight patterns
highlight_patterns = [
('[[start_highlight]]', '[[end_highlight]]'),
('[[highlight_start]]', '[[highlight_end]]'),
            ('<mark>', '</mark>')
]
# Collect all highlight sections with their positions
all_highlights = []
for start_tag, end_tag in highlight_patterns:
# Escape special regex characters if needed
start_esc = re.escape(start_tag)
end_esc = re.escape(end_tag)
# Find all occurrences of this highlight pattern
for match in re.finditer(f"{start_esc}(.*?){end_esc}", text, re.DOTALL):
all_highlights.append({
'start': match.start(),
'end': match.end(),
'content': match.group(1),
'start_tag': start_tag,
'end_tag': end_tag
})
# If no highlights found, return the whole text as unhighlighted
if not all_highlights:
return [(False, text)]
# Sort highlights by start position
all_highlights.sort(key=lambda x: x['start'])
# Build the parts list by processing text portions between and including highlights
parts = []
current_pos = 0
for highlight in all_highlights:
# Add non-highlighted text before this highlight
if highlight['start'] > current_pos:
parts.append((False, text[current_pos:highlight['start']]))
# Add the highlighted text
parts.append((True, highlight['content']))
# Update position to end of this highlight
current_pos = highlight['end']
# Add any remaining text after the last highlight
if current_pos < len(text):
parts.append((False, text[current_pos:]))
return parts
@staticmethod
def is_markdown_table(text: str) -> bool:
"""Checks if text looks like a markdown table"""
if not text or not isinstance(text, str):
return False
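        # e.g. "| Name | Age |\n| --- | --- |\n| Bob | 42 |" counts as a table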
if '|' in text and '\n' in text:
lines = text.strip().split('\n')
pipe_lines = sum(1 for line in lines if line.strip().startswith('|'))
return pipe_lines >= 2
return False
@staticmethod
def process_cell_content(cell_text: str) -> str:
"""Processes a single table cell, handling highlights if present"""
# Clean and prepare the text
cell_text = ContextProcessor.clean_text(cell_text)
cell_text = ContextProcessor.balance_quotes(cell_text)
# Check if cell has any highlight tags
has_highlights = False
highlight_patterns = [
'[[start_highlight]]', '[[end_highlight]]',
'[[highlight_start]]', '[[highlight_end]]',
            '<mark>', '</mark>'
]
for pattern in highlight_patterns:
if pattern in cell_text:
has_highlights = True
break
if has_highlights:
# Extract and process highlight parts
parts = ContextProcessor.extract_highlight_parts(cell_text)
# Build the result
result = ""
for is_highlighted, part in parts:
if is_highlighted:
                    result += f'<mark>{html.escape(part)}</mark>'
else:
result += html.escape(part)
return result
else:
# Just escape HTML in regular cells
return html.escape(cell_text)
@staticmethod
def convert_table_to_html(text: str) -> str:
"""Converts markdown table to HTML with support for highlights in cells"""
# Clean the text
text = ContextProcessor.clean_text(text)
# Split into lines and get table rows
lines = text.strip().split('\n')
table_lines = [line for line in lines if line.strip().startswith('|')]
# Check if it's a proper table
if len(table_lines) < 2:
return ContextProcessor.process_text(text)
# Check if second line is a separator (----)
has_header = False
if len(table_lines) >= 2 and '---' in table_lines[1]:
has_header = True
# Start building HTML table
        html_output = '<table>'
        if has_header:
            # Process header row
            header_line = table_lines[0]
            # Split by pipe and remove empty first and last elements
            cells = [cell.strip() for cell in header_line.split('|')]
            if cells and not cells[0]:
                cells.pop(0)
            if cells and not cells[-1]:
                cells.pop()
            html_output += '<thead><tr>'
            for cell in cells:
                cell_html = ContextProcessor.process_cell_content(cell)
                html_output += f'<th>{cell_html}</th>'
            html_output += '</tr></thead>'
            # Process data rows (skip header and separator)
            html_output += '<tbody>'
            for line in table_lines[2:]:
                cells = [cell.strip() for cell in line.split('|')]
                if cells and not cells[0]:
                    cells.pop(0)
                if cells and not cells[-1]:
                    cells.pop()
                html_output += '<tr>'
                for cell in cells:
                    cell_html = ContextProcessor.process_cell_content(cell)
                    html_output += f'<td>{cell_html}</td>'
                html_output += '</tr>'
            html_output += '</tbody>'
        else:
            # All rows are data
            html_output += '<tbody>'
            for line in table_lines:
                cells = [cell.strip() for cell in line.split('|')]
                if cells and not cells[0]:
                    cells.pop(0)
                if cells and not cells[-1]:
                    cells.pop()
                html_output += '<tr>'
                for cell in cells:
                    cell_html = ContextProcessor.process_cell_content(cell)
                    html_output += f'<td>{cell_html}</td>'
                html_output += '</tr>'
            html_output += '</tbody>'
        html_output += '</table>'
return html_output
@staticmethod
def process_text(text: str) -> str:
"""Processes text with highlights, handling all edge cases"""
# Clean and prepare the text
text = ContextProcessor.clean_text(text)
text = ContextProcessor.balance_quotes(text)
text = ContextProcessor.balance_highlight_tags(text)
# Extract and process highlight parts
parts = ContextProcessor.extract_highlight_parts(text)
# Build the result
result = ""
for is_highlighted, part in parts:
if is_highlighted:
escaped_part = html.escape(part)
                result += f'<mark>{escaped_part}</mark>'
else:
result += html.escape(part)
return result
@staticmethod
def process_content(content: str, abbreviated_content: Optional[str] = None) -> str:
"""Main function to process any kind of content"""
# Handle null/empty content
if not content or not isinstance(content, str):
return ""
# Special cases that need abbreviated content
special_cases = [
lambda c: c.strip() == "In Oklahoma,",
lambda c: c.strip().startswith('"') and c.count('"') == 1,
lambda c: c.rstrip().endswith('\\'),
lambda c: (c.replace('\\"', '').count('"') % 2) == 1,
lambda c: any((c.count(start) != c.count(end)) for start, end in [
('[[start_highlight]]', '[[end_highlight]]'),
('[[highlight_start]]', '[[highlight_end]]'),
                ('<mark>', '</mark>')
])
]
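        # Each predicate flags content that looks truncated or malformed: a known-bad
        # fragment, an unterminated opening quote, a trailing backslash, an odd number
        # of unescaped quotes, or unbalanced highlight tags.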
# Check if we need to use abbreviated content
needs_abbreviated = any(check(content) for check in special_cases)
# If content needs help and we have abbreviated content, use it
if needs_abbreviated and abbreviated_content:
# Handle abbreviated content that might be a JSON string
if abbreviated_content.strip().startswith('{') and abbreviated_content.strip().endswith('}'):
try:
data = json.loads(abbreviated_content)
if "abbreviatedContent" in data:
abbreviated_content = data["abbreviatedContent"]
except json.JSONDecodeError:
pass
# Clean and prepare the abbreviated content
abbreviated_content = ContextProcessor.clean_text(abbreviated_content)
abbreviated_content = ContextProcessor.balance_quotes(abbreviated_content)
abbreviated_content = ContextProcessor.balance_highlight_tags(abbreviated_content)
# Use abbreviated content instead
content = abbreviated_content
# Check if content is a markdown table
if ContextProcessor.is_markdown_table(content):
return ContextProcessor.convert_table_to_html(content)
else:
return ContextProcessor.process_text(content)
@staticmethod
def parse_json_contexts(context_json: str) -> List[Dict[str, Any]]:
"""Parses JSON-formatted context data with fallback to regex extraction"""
contexts = []
# First try standard JSON parsing
try:
contexts = json.loads(context_json)
if not isinstance(contexts, list):
contexts = []
except json.JSONDecodeError:
# If standard parsing fails, use regex to extract the data
try:
# Extract type field
type_pattern = r'"type":\s*"(primary|secondary)"'
types = re.findall(type_pattern, context_json)
# Extract abbreviatedContent field - more robustly handle quotes
content_pattern = r'"abbreviatedContent":\s*"((?:\\.|[^"])*?)"'
contents = re.findall(content_pattern, context_json)
# Build context objects
for i, (ctx_type, content) in enumerate(zip(types, contents)):
contexts.append({
'type': ctx_type,
'abbreviatedContent': content.replace('\\"', '"')
})
except Exception as e:
print(f"Error extracting contexts with regex: {e}")
return contexts
@staticmethod
def process_json_contexts(context_json: str) -> List[Dict[str, Any]]:
"""Process JSON-formatted highlighted contexts"""
processed_contexts = []
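        # e.g. '[{"type": "primary", "abbreviatedContent": "The [[start_highlight]]answer[[end_highlight]]"}]'
        # yields [{'chunk_num': 1, 'content': 'The <mark>answer</mark>', 'is_primary': True}]
        # (assuming the <mark> wrapper used in process_text)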
try:
# Parse the JSON contexts
contexts = ContextProcessor.parse_json_contexts(context_json)
# Process each context item
for i, item in enumerate(contexts):
if isinstance(item, dict):
context_type = item.get('type', 'secondary')
content = item.get('abbreviatedContent', '')
# Process the content
processed_content = ContextProcessor.process_content(content)
# Create processed context item
processed_contexts.append({
'chunk_num': i + 1,
'content': processed_content,
'is_primary': context_type == 'primary'
})
except Exception as e:
print(f"Error processing JSON contexts: {e}")
return processed_contexts
# Module-level functions for backward compatibility
def clean_text(text):
return ContextProcessor.clean_text(text)
def balance_highlight_tags(text):
return ContextProcessor.balance_highlight_tags(text)
def balance_quotes(text):
return ContextProcessor.balance_quotes(text)
def extract_highlight_parts(text):
return ContextProcessor.extract_highlight_parts(text)
def is_markdown_table(text):
return ContextProcessor.is_markdown_table(text)
def process_cell_content(cell_text):
return ContextProcessor.process_cell_content(cell_text)
def convert_table_to_html(text):
return ContextProcessor.convert_table_to_html(text)
def process_text(text):
return ContextProcessor.process_text(text)
def process_content(content, abbreviated_content=None):
return ContextProcessor.process_content(content, abbreviated_content)
def process_highlights(text):
"""Main entry point called from data_loader.py"""
return ContextProcessor.process_content(text)
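# Example round trip (illustrative input; output assumes the <mark> wrapper used in process_text):
#   process_highlights('Paris is the capital of [[start_highlight]]France[[end_highlight]].')
#   -> 'Paris is the capital of <mark>France</mark>.'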
def get_context_html(example, show_full=False):
"""Format context chunks into HTML for display"""
html_output = ""
# Process insufficient context warning if needed
if example.get("insufficient", False):
insufficient_reason = example.get("insufficient_reason", "")
        reason_html = (
            f"<p>{insufficient_reason}</p>" if insufficient_reason else
            "<p>The context may not contain enough information to fully answer the question, "
            "or the question might be ambiguous. Models should ideally indicate this limitation "
            "or refuse to answer.</p>"
        )
        # Warning banner (CSS class name assumed)
        html_output += f"""
        <div class="insufficient-context-warning">
            <strong>Insufficient Context</strong>
            {reason_html}
        </div>
        """
    html_output += '<div class="context-container">'  # container for context chunks (class name assumed)
# Display full contexts if requested
if show_full and "full_contexts" in example and example["full_contexts"]:
for context_item in example["full_contexts"]:
content = context_item.get('content', '')
abbreviated = context_item.get('abbreviatedContent', None)
# Process the content
processed = ContextProcessor.process_content(content, abbreviated)
            html_output += f'<div class="context-chunk">{processed}</div>'
else:
# Display regular contexts if available
if "contexts" in example and example["contexts"]:
for context_item in example["contexts"]:
content = context_item.get('content', '')
abbreviated = context_item.get('abbreviatedContent', None)
# Process the content
processed = ContextProcessor.process_content(content, abbreviated)
is_primary = context_item.get('is_primary', False)
extra_class = " primary-context" if is_primary else ""
                html_output += f'<div class="context-chunk{extra_class}">{processed}</div>'
# Or process JSON-structured highlighted contexts
elif "contexts_highlighted" in example and example["contexts_highlighted"]:
processed_contexts = ContextProcessor.process_json_contexts(example["contexts_highlighted"])
for context_item in processed_contexts:
is_primary = context_item.get('is_primary', False)
extra_class = " primary-context" if is_primary else ""
                html_output += f'<div class="context-chunk{extra_class}">{context_item["content"]}</div>'
else:
            html_output += '<div class="context-chunk">No context available. Try toggling to full context view.</div>'