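# NOTE: the hosting page's status banner ("Spaces: Running on Zero") was
# captured with this file; it is page residue, not part of the module.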
import re | |
import html | |
import json | |
from typing import Dict, List, Tuple, Optional, Any, Union | |
class ContextProcessor:
    """Processes highlighted contexts for the RAG Summarizer Arena.

    The class is a stateless namespace: every public method is a static
    method, so it can be called either as ``ContextProcessor.method(...)``
    or on an instance.
    """

    # Common HTML entities that might be incomplete (missing the trailing
    # semicolon), mapped to their complete form.
    # NOTE(review): these literals were reconstructed from an HTML-rendered
    # copy of the file in which the entities had been decoded; confirm the
    # exact spellings against the upstream source.
    INCOMPLETE_ENTITIES = {
        '&#39': '&#39;',
        '&quot': '&quot;',
        '&lt': '&lt;',
        '&gt': '&gt;',
        '&amp': '&amp;',
    }

    # (opening tag, closing tag) for every supported highlight syntax.
    _HIGHLIGHT_PAIRS = (
        ('[[start_highlight]]', '[[end_highlight]]'),
        ('[[highlight_start]]', '[[highlight_end]]'),
        ('<span class="highlight">', '</span>'),
    )

    # Backslash escape sequences that ``clean_text`` decodes, keyed by the
    # character following the backslash.
    _BACKSLASH_ESCAPES = {'"': '"', "'": "'", 'n': '\n', 't': '\t', '\\': '\\'}
    _BACKSLASH_ESCAPE_RE = re.compile(r'\\(["\'nt\\])')

    # Leftover entities / quote look-alikes normalised to straight quotes.
    # NOTE(review): the four entity keys are reconstructed (see the note on
    # INCOMPLETE_ENTITIES); the backtick and acute accent are as in the
    # original.
    _QUOTE_REPLACEMENTS = {
        '&quot;': '"', '&#34;': '"', '&#39;': "'", '&#x27;': "'",
        '`': "'", '´': "'",
    }

    @staticmethod
    def clean_text(text: str) -> str:
        """Cleans text by fixing HTML entities and handling escaped characters.

        Falsy or non-string input is returned unchanged.
        """
        if not text or not isinstance(text, str):
            return text

        # Fix incomplete HTML entities (add the missing ';').
        for incomplete, complete in ContextProcessor.INCOMPLETE_ENTITIES.items():
            text = re.sub(f"{re.escape(incomplete)}(?!;)", complete, text)

        # Convert HTML entities to characters. ``html.unescape`` can raise
        # ValueError for absurdly long numeric references (int digit limit);
        # in that case keep the text as-is rather than failing.
        try:
            text = html.unescape(text)
        except ValueError:
            pass

        # Decode backslash escapes in ONE pass. Chained ``str.replace`` calls
        # are order-dependent: ``\\n`` (escaped backslash + 'n') would first
        # have its ``\n`` turned into a newline.
        escapes = ContextProcessor._BACKSLASH_ESCAPES
        text = ContextProcessor._BACKSLASH_ESCAPE_RE.sub(
            lambda match: escapes[match.group(1)], text
        )

        # Normalise leftover entities and quote look-alikes.
        for pattern, replacement in ContextProcessor._QUOTE_REPLACEMENTS.items():
            text = text.replace(pattern, replacement)

        # Remove trailing backslash(es) (and trailing whitespace) if present.
        stripped = text.rstrip()
        if stripped.endswith('\\'):
            text = stripped.rstrip('\\')
        return text

    @staticmethod
    def balance_highlight_tags(text: str) -> str:
        """Ensures highlight tags are properly balanced.

        For each tag syntax, missing closing tags are appended to the end of
        the text and missing opening tags are prepended to the start. Only
        tag counts are compared; tag order is not inspected.
        """
        if not text or not isinstance(text, str):
            return text

        for start_tag, end_tag in ContextProcessor._HIGHLIGHT_PAIRS:
            missing_ends = text.count(start_tag) - text.count(end_tag)
            if missing_ends > 0:
                text += end_tag * missing_ends
            elif missing_ends < 0:
                text = start_tag * (-missing_ends) + text
        return text

    @staticmethod
    def balance_quotes(text: str) -> str:
        """Ensures double quotes are balanced by appending one if the count is odd.

        Backslash-escaped quotes (``\\"``) are ignored when counting.
        """
        if not text or not isinstance(text, str):
            return text

        unescaped_quote_count = text.replace('\\"', '').count('"')
        if unescaped_quote_count % 2 == 1:
            text += '"'
        return text

    @staticmethod
    def extract_highlight_parts(text: str) -> List[Tuple[bool, str]]:
        """Extracts highlighted and non-highlighted parts from text, preserving order.

        Returns a list of ``(is_highlighted, fragment)`` tuples. Highlighted
        fragments exclude their surrounding tags. Non-string input yields an
        empty list.
        """
        if not isinstance(text, str):
            return []

        # Ensure highlight tags are balanced so every opening tag can match.
        text = ContextProcessor.balance_highlight_tags(text)

        # Collect (start, end, content) for every highlight of every syntax.
        spans = []
        for start_tag, end_tag in ContextProcessor._HIGHLIGHT_PAIRS:
            pattern = f"{re.escape(start_tag)}(.*?){re.escape(end_tag)}"
            spans.extend(
                (match.start(), match.end(), match.group(1))
                for match in re.finditer(pattern, text, re.DOTALL)
            )

        # No highlights: the whole text is one unhighlighted part.
        if not spans:
            return [(False, text)]

        spans.sort(key=lambda span: span[0])

        parts = []
        current_pos = 0
        for start, end, content in spans:
            # A span that begins inside an already-emitted highlight (nested
            # or overlapping syntaxes) would duplicate text; skip it.
            if start < current_pos:
                continue
            if start > current_pos:
                parts.append((False, text[current_pos:start]))
            parts.append((True, content))
            current_pos = end

        # Remaining text after the last highlight.
        if current_pos < len(text):
            parts.append((False, text[current_pos:]))
        return parts

    @staticmethod
    def is_markdown_table(text: str) -> bool:
        """Checks if text looks like a markdown table.

        True when the text is multi-line and at least two lines start with
        ``|`` (after stripping whitespace).
        """
        if not text or not isinstance(text, str):
            return False
        if '|' not in text or '\n' not in text:
            return False
        lines = text.strip().split('\n')
        pipe_lines = sum(1 for line in lines if line.strip().startswith('|'))
        return pipe_lines >= 2

    @staticmethod
    def _render_parts(parts: List[Tuple[bool, str]]) -> str:
        """HTML-escapes each part, wrapping highlighted ones in a highlight span."""
        return "".join(
            f'<span class="highlight">{html.escape(part)}</span>'
            if is_highlighted else html.escape(part)
            for is_highlighted, part in parts
        )

    @staticmethod
    def process_cell_content(cell_text: str) -> str:
        """Processes a single table cell, handling highlights if present.

        Returns HTML-escaped cell content with highlighted fragments wrapped
        in ``<span class="highlight">``.
        """
        cell_text = ContextProcessor.clean_text(cell_text)
        cell_text = ContextProcessor.balance_quotes(cell_text)
        # A cell without highlight tags comes back from the extractor as a
        # single unhighlighted part, so no separate "has tags" check is needed.
        parts = ContextProcessor.extract_highlight_parts(cell_text)
        return ContextProcessor._render_parts(parts)

    @staticmethod
    def _split_table_row(line: str) -> List[str]:
        """Splits a markdown table row on '|' and drops empty edge cells."""
        cells = [cell.strip() for cell in line.split('|')]
        if cells and not cells[0]:
            cells.pop(0)
        if cells and not cells[-1]:
            cells.pop()
        return cells

    @staticmethod
    def _render_table_row(line: str, cell_tag: str) -> str:
        """Renders one markdown row as ``<tr>`` using ``cell_tag`` (th/td) cells."""
        cells_html = "".join(
            f'<{cell_tag}>{ContextProcessor.process_cell_content(cell)}</{cell_tag}>'
            for cell in ContextProcessor._split_table_row(line)
        )
        return f'<tr>{cells_html}</tr>'

    @staticmethod
    def convert_table_to_html(text: str) -> str:
        """Converts markdown table to HTML with support for highlights in cells.

        Only lines starting with ``|`` are rendered. If fewer than two such
        lines exist, the text is rendered with ``process_text`` instead. When
        the second table line contains ``---`` the first line becomes the
        header and the separator line is dropped.
        """
        text = ContextProcessor.clean_text(text)

        lines = text.strip().split('\n')
        table_lines = [line for line in lines if line.strip().startswith('|')]

        # Not a proper table: fall back to plain text processing.
        if len(table_lines) < 2:
            return ContextProcessor.process_text(text)

        render_row = ContextProcessor._render_table_row
        has_header = '---' in table_lines[1]

        pieces = ['<table class="md-table">']
        if has_header:
            pieces.append('<thead>')
            pieces.append(render_row(table_lines[0], 'th'))
            pieces.append('</thead>')
            body_lines = table_lines[2:]  # skip header and separator
        else:
            body_lines = table_lines  # all rows are data
        pieces.append('<tbody>')
        pieces.extend(render_row(line, 'td') for line in body_lines)
        pieces.append('</tbody>')
        pieces.append('</table>')
        return "".join(pieces)

    @staticmethod
    def process_text(text: str) -> str:
        """Processes text with highlights into escaped HTML.

        Cleans the text, balances quotes and highlight tags, then returns the
        HTML-escaped text with highlighted fragments wrapped in
        ``<span class="highlight">``.
        """
        text = ContextProcessor.clean_text(text)
        text = ContextProcessor.balance_quotes(text)
        text = ContextProcessor.balance_highlight_tags(text)
        parts = ContextProcessor.extract_highlight_parts(text)
        return ContextProcessor._render_parts(parts)

    @staticmethod
    def _needs_abbreviated(content: str) -> bool:
        """Reports whether ``content`` looks truncated or malformed."""
        stripped = content.strip()
        # Hard-coded special case carried over from the original code.
        if stripped == "In Oklahoma,":
            return True
        # A lone opening quote.
        if stripped.startswith('"') and content.count('"') == 1:
            return True
        # Dangling trailing backslash.
        if content.rstrip().endswith('\\'):
            return True
        # Odd number of unescaped double quotes.
        if content.replace('\\"', '').count('"') % 2 == 1:
            return True
        # Unbalanced highlight tags of any syntax.
        return any(
            content.count(start_tag) != content.count(end_tag)
            for start_tag, end_tag in ContextProcessor._HIGHLIGHT_PAIRS
        )

    @staticmethod
    def process_content(content: str, abbreviated_content: Optional[str] = None) -> str:
        """Main function to process any kind of content.

        Args:
            content: Raw context text. Falsy or non-string values yield "".
            abbreviated_content: Optional replacement used when ``content``
                looks truncated/malformed. May be a JSON object string with
                an ``abbreviatedContent`` key. Non-string values are ignored.

        Returns:
            HTML for the content: a ``<table>`` when it looks like a markdown
            table, otherwise escaped text with highlight spans.
        """
        if not content or not isinstance(content, str):
            return ""

        use_abbreviated = (
            bool(abbreviated_content)
            and isinstance(abbreviated_content, str)
            and ContextProcessor._needs_abbreviated(content)
        )
        if use_abbreviated:
            # Abbreviated content might itself be a JSON object string.
            candidate = abbreviated_content.strip()
            if candidate.startswith('{') and candidate.endswith('}'):
                try:
                    data = json.loads(abbreviated_content)
                    if "abbreviatedContent" in data:
                        abbreviated_content = data["abbreviatedContent"]
                except json.JSONDecodeError:
                    pass  # not JSON after all; use the raw string

            abbreviated_content = ContextProcessor.clean_text(abbreviated_content)
            abbreviated_content = ContextProcessor.balance_quotes(abbreviated_content)
            abbreviated_content = ContextProcessor.balance_highlight_tags(abbreviated_content)
            content = abbreviated_content

        if ContextProcessor.is_markdown_table(content):
            return ContextProcessor.convert_table_to_html(content)
        return ContextProcessor.process_text(content)

    @staticmethod
    def parse_json_contexts(
        context_json: Union[str, List[Dict[str, Any]]]
    ) -> List[Dict[str, Any]]:
        """Parses JSON-formatted context data with fallback to regex extraction.

        Args:
            context_json: JSON text expected to encode a list of context
                objects. An already-parsed list is returned unchanged.

        Returns:
            The parsed list; ``[]`` when the JSON is valid but not a list or
            the input is not parseable. For malformed JSON text, ``type`` and
            ``abbreviatedContent`` fields are recovered with regexes and
            paired up in order of appearance.
        """
        if isinstance(context_json, list):
            return context_json

        # First try standard JSON parsing. JSONDecodeError is a ValueError;
        # TypeError covers non-text input such as None.
        try:
            parsed = json.loads(context_json)
        except (ValueError, TypeError):
            pass
        else:
            return parsed if isinstance(parsed, list) else []

        # The regex fallback only makes sense for text.
        if not isinstance(context_json, str):
            return []

        type_pattern = r'"type":\s*"(primary|secondary)"'
        types = re.findall(type_pattern, context_json)
        # Allows backslash-escaped characters inside the quoted value.
        content_pattern = r'"abbreviatedContent":\s*"((?:\\.|[^"])*?)"'
        contents = re.findall(content_pattern, context_json)

        return [
            {'type': ctx_type, 'abbreviatedContent': content.replace('\\"', '"')}
            for ctx_type, content in zip(types, contents)
        ]

    @staticmethod
    def process_json_contexts(
        context_json: Union[str, List[Dict[str, Any]]]
    ) -> List[Dict[str, Any]]:
        """Process JSON-formatted highlighted contexts.

        Returns one dict per context object with keys ``chunk_num`` (1-based
        position in the parsed list), ``content`` (processed HTML) and
        ``is_primary``. Non-dict items are skipped. Any error is printed and
        the contexts processed so far are returned.
        """
        processed_contexts = []
        try:
            contexts = ContextProcessor.parse_json_contexts(context_json)
            for index, item in enumerate(contexts):
                if not isinstance(item, dict):
                    continue
                context_type = item.get('type', 'secondary')
                content = item.get('abbreviatedContent', '')
                processed_contexts.append({
                    'chunk_num': index + 1,
                    'content': ContextProcessor.process_content(content),
                    'is_primary': context_type == 'primary',
                })
        except Exception as e:
            # Best-effort boundary: report and return what we have.
            print(f"Error processing JSON contexts: {e}")
        return processed_contexts
# Module-level functions kept for backward compatibility; each one simply delegates to ContextProcessor.
def clean_text(text):
    """Module-level alias that delegates to ``ContextProcessor.clean_text``."""
    cleaned = ContextProcessor.clean_text(text)
    return cleaned
def balance_highlight_tags(text):
    """Module-level alias that delegates to ``ContextProcessor.balance_highlight_tags``."""
    balanced = ContextProcessor.balance_highlight_tags(text)
    return balanced
def balance_quotes(text):
    """Module-level alias that delegates to ``ContextProcessor.balance_quotes``."""
    balanced = ContextProcessor.balance_quotes(text)
    return balanced
def extract_highlight_parts(text):
    """Module-level alias that delegates to ``ContextProcessor.extract_highlight_parts``."""
    parts = ContextProcessor.extract_highlight_parts(text)
    return parts
def is_markdown_table(text):
    """Module-level alias that delegates to ``ContextProcessor.is_markdown_table``."""
    looks_like_table = ContextProcessor.is_markdown_table(text)
    return looks_like_table
def process_cell_content(cell_text):
    """Module-level alias that delegates to ``ContextProcessor.process_cell_content``."""
    cell_html = ContextProcessor.process_cell_content(cell_text)
    return cell_html
def convert_table_to_html(text):
    """Module-level alias that delegates to ``ContextProcessor.convert_table_to_html``."""
    table_html = ContextProcessor.convert_table_to_html(text)
    return table_html
def process_text(text):
    """Module-level alias that delegates to ``ContextProcessor.process_text``."""
    rendered = ContextProcessor.process_text(text)
    return rendered
def process_content(content, abbreviated_content=None):
    """Module-level alias that delegates to ``ContextProcessor.process_content``.

    Both arguments are forwarded positionally, unchanged.
    """
    rendered = ContextProcessor.process_content(content, abbreviated_content)
    return rendered
def process_highlights(text):
    """Main entry point called from data_loader.py.

    Delegates to ``ContextProcessor.process_content`` with no abbreviated
    fallback content.
    """
    rendered = ContextProcessor.process_content(text)
    return rendered
def get_context_html(example, show_full=False):
    """Format context chunks into HTML for display.

    Reads these keys from ``example``: ``insufficient`` and
    ``insufficient_reason`` (optional warning banner), then the first
    non-empty source among ``full_contexts`` (only when ``show_full`` is
    true), ``contexts`` and ``contexts_highlighted``. Each context is
    rendered through ``ContextProcessor`` and wrapped in a
    ``<div class="context-item">``; a placeholder item is emitted when no
    source is available.

    Note: ``insufficient_reason`` is inserted into the markup as-is, without
    HTML escaping.
    """
    fragments = []

    # Optional "insufficient context" banner.
    if example.get("insufficient", False):
        insufficient_reason = example.get("insufficient_reason", "")
        if insufficient_reason:
            reason_html = f"<p>{insufficient_reason}</p>"
        else:
            reason_html = (
                "<p>The context may not contain enough information to fully answer the question, "
                "or the question might be ambiguous. Models should ideally indicate this limitation "
                "or refuse to answer.</p>"
            )
        fragments.append(f"""
<div class="insufficient-alert">
<strong>
<svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" viewBox="0 0 24 24" fill="none"
stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
style="vertical-align: middle; margin-right: 5px;">
<path d="m21.73 18-8-14a2 2 0 0 0-3.48 0l-8 14A2 2 0 0 0 4 21h16a2 2 0 0 0 1.73-3Z"></path>
<line x1="12" y1="9" x2="12" y2="13"></line>
<line x1="12" y1="17" x2="12.01" y2="17"></line>
</svg>
Insufficient Context
</strong>
{reason_html}
</div>
""")

    fragments.append('<div class="context-items-container">')

    if show_full and "full_contexts" in example and example["full_contexts"]:
        # Full contexts: no primary/secondary styling is applied.
        for context_item in example["full_contexts"]:
            processed = ContextProcessor.process_content(
                context_item.get('content', ''),
                context_item.get('abbreviatedContent', None),
            )
            fragments.append(f'<div class="context-item">{processed}</div>')
    elif "contexts" in example and example["contexts"]:
        # Regular contexts, with primary ones flagged via an extra CSS class.
        for context_item in example["contexts"]:
            processed = ContextProcessor.process_content(
                context_item.get('content', ''),
                context_item.get('abbreviatedContent', None),
            )
            extra_class = " primary-context" if context_item.get('is_primary', False) else ""
            fragments.append(f'<div class="context-item{extra_class}">{processed}</div>')
    elif "contexts_highlighted" in example and example["contexts_highlighted"]:
        # JSON-structured highlighted contexts.
        for context_item in ContextProcessor.process_json_contexts(example["contexts_highlighted"]):
            extra_class = " primary-context" if context_item.get('is_primary', False) else ""
            fragments.append(f'<div class="context-item{extra_class}">{context_item["content"]}</div>')
    else:
        fragments.append('<div class="context-item">No context available. Try toggling to full context view.</div>')

    fragments.append('</div>')
    return "".join(fragments)