# Spaces: Running on Zero
# Acknowledgement: This demo code is adapted from the original Hugging Face Space "ContextCite" | |
# (https://huggingface.co/spaces/contextcite/context-cite). | |
import os | |
from enum import Enum | |
from dataclasses import dataclass | |
from typing import Dict, List, Any, Optional | |
import gradio as gr | |
import numpy as np | |
import spaces | |
import nltk | |
import base64 | |
import traceback | |
from src.utils import split_into_sentences as split_into_sentences_utils | |
# --- AttnTrace imports (from app_full.py) --- | |
from src.models import create_model | |
from src.attribution import AttnTraceAttribution | |
from src.prompts import wrap_prompt | |
from gradio_highlightedtextbox import HighlightedTextbox | |
from examples import run_example_1, run_example_2, run_example_3, run_example_4, run_example_5, run_example_6 | |
from functools import partial | |
# Load original app constants | |
# HTML fragment rendered as the page heading (brand + subtitle spans).
APP_TITLE = (
    '<div class="app-title">'
    '<span class="brand">AttnTrace: </span>'
    '<span class="subtitle">Attention-based Context Traceback for Long-Context LLMs</span>'
    '</div>'
)
# Markdown blurb shown under the title; two lines separated by a single newline.
APP_DESCRIPTION = (
    "AttnTrace traces a model's generated statements back to specific parts of the context "
    "using attention-based traceback. Try it out with Meta-Llama-3.1-8B-Instruct here! "
    "See the [[paper](https://arxiv.org/abs/2506.04202)] and "
    "[[code](https://github.com/Wang-Yanting/TracLLM-Kit)] for more!\n"
    "Maintained by the AttnTrace team."
)
# NEW_TEXT = """Long-context large language models (LLMs), such as Gemini-2.5-Pro and Claude-Sonnet-4, are increasingly used to empower advanced AI systems, including retrieval-augmented generation (RAG) pipelines and autonomous agents. In these systems, an LLM receives an instruction along with a context—often consisting of texts retrieved from a knowledge database or memory—and generates a response that is contextually grounded by following the instruction. Recent studies have designed solutions to trace back to a subset of texts in the context that contributes most to the response generated by the LLM. These solutions have numerous real-world applications, including performing post-attack forensic analysis and improving the interpretability and trustworthiness of LLM outputs. While significant efforts have been made, state-of-the-art solutions such as TracLLM often lead to a high computation cost, e.g., it takes TracLLM hundreds of seconds to perform traceback for a single response-context pair. In this work, we propose {\name}, a new context traceback method based on the attention weights produced by an LLM for a prompt. To effectively utilize attention weights, we introduce two techniques designed to enhance the effectiveness of {\name}, and we provide theoretical insights for our design choice. %Moreover, we perform both theoretical analysis and empirical evaluation to demonstrate their effectiveness. | |
# We also perform a systematic evaluation for {\name}. The results demonstrate that {\name} is more accurate and efficient than existing state-of-the-art context traceback methods. We also show {\name} can improve state-of-the-art methods in detecting prompt injection under long contexts through the attribution-before-detection paradigm. As a real-world application, we demonstrate that {\name} can effectively pinpoint injected instructions in a paper designed to manipulate LLM-generated reviews. | |
# The code and data will be open-sourced. """ | |
# EDIT_TEXT = "Feel free to edit!" | |
# Error markup shown when the wrapped prompt exceeds the generation length budget.
GENERATE_CONTEXT_TOO_LONG_TEXT = '<em style="color: red;">Context is too long for the current model.</em>'
# Counterpart error markup for the traceback (attribution) step.
ATTRIBUTE_CONTEXT_TOO_LONG_TEXT = (
    '<em style="color: red;">Context is too long for the current traceback method.</em>'
)
# Line counts for the context text box (presumably initial / maximum height; confirm at the UI definition).
CONTEXT_LINES = 20
CONTEXT_MAX_LINES = 40
# Placeholder prompt and its highlighted-text representation (text with no label).
SELECTION_DEFAULT_TEXT = "Click on a sentence in the response to traceback!"
SELECTION_DEFAULT_VALUE = [(SELECTION_DEFAULT_TEXT, None)]
SOURCES_INFO = "These are the texts that contribute most to the response."
# SOURCES_IN_CONTEXT_INFO = ( | |
# "This shows the important sentences highlighted within their surrounding context from the text above. Colors indicate ranking: Red (1st), Orange (2nd), Golden (3rd), Yellow (4th-5th), Light (6th+)." | |
# ) | |
# Models selectable by the demo; the first entry is the default.
MODEL_PATHS = ["meta-llama/Meta-Llama-3.1-8B-Instruct"]
# Per-model length limit consulted by get_max_tokens().
MAX_TOKENS = {"meta-llama/Meta-Llama-3.1-8B-Instruct": 131072}
DEFAULT_MODEL_PATH = MODEL_PATHS[0]
# Granularities accepted for the traceback explanation level.
EXPLANATION_LEVELS = ["sentence", "paragraph", "text segment"]
DEFAULT_EXPLANATION_LEVEL = "sentence"
class WorkflowState(Enum):
    """Stage of the generate -> select -> traceback workflow."""

    WAITING_TO_GENERATE = 0
    WAITING_TO_SELECT = 1
    READY_TO_ATTRIBUTE = 2


@dataclass
class State:
    """Mutable per-session state passed between the Gradio callbacks.

    The ``@dataclass`` decorator is required: ``clear_state()`` constructs
    instances with keyword arguments, which a plain class with bare
    annotations does not accept.
    """

    workflow_state: WorkflowState
    context: str
    query: str
    response: str
    start_index: int
    end_index: int
    scores: np.ndarray
    answer: str
    highlighted_context: str
    full_response: str
    explained_response_part: str
    # Query that produced (or was paired with) the current response.
    last_query_used: str = ""
# --- Dynamic Model and Attribution Management --- | |
# Process-wide cache filled in by initialize_model_and_attr(); None means "not built yet".
current_llm = None  # model object returned by create_model
current_attr = None  # AttnTraceAttribution built on top of current_llm
# Configuration the cached objects were built with, used to detect staleness.
current_model_path = None
current_explanation_level = None
current_api_key = None
def initialize_model_and_attr():
    """Return the cached model and attribution objects, (re)building them when stale.

    The model is rebuilt when none is cached, when the cached model path differs
    from ``DEFAULT_MODEL_PATH``, or when the ``HF_TOKEN`` environment variable
    differs from the key the cached model was built with. The attribution object
    is rebuilt when none is cached, when the explanation level changed, or
    whenever the model was rebuilt.

    Returns:
        ``(llm, attr, None)`` on success, or ``(None, None, error_message)`` if
        any exception was raised (the traceback is printed).
    """
    global current_llm, current_attr, current_model_path, current_explanation_level, current_api_key
    try:
        hf_token = os.getenv("HF_TOKEN")

        # Decide what is stale before touching anything.
        model_is_current = (
            current_llm is not None
            and current_model_path == DEFAULT_MODEL_PATH
            and current_api_key == hf_token
        )
        need_model_update = not model_is_current
        attr_is_current = (
            current_attr is not None
            and current_explanation_level == DEFAULT_EXPLANATION_LEVEL
            and not need_model_update
        )

        if need_model_update:
            print(f"Initializing model: {DEFAULT_MODEL_PATH}")
            current_llm = create_model(model_path=DEFAULT_MODEL_PATH, api_key=hf_token, device="cuda")
            current_model_path = DEFAULT_MODEL_PATH
            current_api_key = hf_token

        if not attr_is_current:
            print(f"Initializing context traceback with explanation level: {DEFAULT_EXPLANATION_LEVEL}")
            current_attr = AttnTraceAttribution(
                current_llm,
                explanation_level=DEFAULT_EXPLANATION_LEVEL,
                K=3,
                q=0.4,
                B=30,
            )
            current_explanation_level = DEFAULT_EXPLANATION_LEVEL

        return current_llm, current_attr, None
    except Exception as e:
        # Callbacks surface this message in the UI instead of crashing.
        error_msg = f"Error initializing model/traceback: {str(e)}"
        print(error_msg)
        traceback.print_exc()
        return None, None, error_msg
# Initialize with defaults.
# Runs at import time; the returned (llm, attr, error) tuple is discarded here.
# Failures are caught inside the function, and callbacks call it again to pick
# up the cached objects (or the error message).
initialize_model_and_attr()
# Images replaced with CSS textures and gradients - no longer needed | |
def clear_state():
    """Return a fresh ``State`` waiting for generation, with every text field empty."""
    empty_text_fields = {
        name: ""
        for name in (
            "context",
            "query",
            "response",
            "answer",
            "highlighted_context",
            "full_response",
            "explained_response_part",
            "last_query_used",
        )
    }
    return State(
        workflow_state=WorkflowState.WAITING_TO_GENERATE,
        start_index=0,
        end_index=0,
        scores=np.array([]),
        **empty_text_fields,
    )
def load_an_example(example_loader_func, state: State):
    """Load an example's context and query into the UI and the session state.

    Args:
        example_loader_func: zero-argument callable returning ``(context, query)``.
        state: session state; updated in place and returned.

    Returns:
        Tuple of (context, query, state, cleared response input text,
        placeholder update for the response box, update selecting the first
        context tab).
    """
    example_context, example_query = example_loader_func()

    # Mirror the example into the state and rewind the workflow.
    state.context, state.query = example_context, example_query
    state.workflow_state = WorkflowState.WAITING_TO_GENERATE

    # Drop anything left over from a previous run.
    for stale_field in ("response", "answer", "full_response", "explained_response_part"):
        setattr(state, stale_field, "")

    print(f"Loaded example - Context: {len(example_context)} chars, Query: {example_query[:50]}...")

    response_placeholder = gr.update(
        value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]
    )  # basic_response_box - keep visible
    first_tab = gr.update(selected=0)  # basic_context_tabs - switch to first tab
    return (
        example_context,  # basic_context_box
        example_query,  # basic_query_box
        state,
        "",  # response_input_box - clear it
        response_placeholder,
        first_tab,
    )
def get_max_tokens(model_path: str):
    """Return the ``MAX_TOKENS`` entry for ``model_path``, or 2048 when it has none."""
    try:
        return MAX_TOKENS[model_path]
    except KeyError:
        return 2048  # Default fallback
def get_scroll_js_code(elem_id):
    """Return JavaScript source defining ``scrollToElement()`` for one element.

    The generated function looks the element up by ``elem_id`` and smoothly
    scrolls it into view. It does nothing when no such element exists, instead
    of throwing on ``null.scrollIntoView``.

    Args:
        elem_id: DOM id, interpolated verbatim into a double-quoted JS string.

    Returns:
        The JavaScript snippet as a string.
    """
    return f"""
    function scrollToElement() {{
        const element = document.getElementById("{elem_id}");
        if (element) {{
            element.scrollIntoView({{ behavior: "smooth", block: "nearest" }});
        }}
    }}
    """
def basic_update(context: str, query: str, state: State):
    """Store edited context/query in the state and rewind the workflow to generation.

    Returns:
        Tuple of (placeholder update for the response box, update selecting the
        first context tab, state).
    """
    state.context, state.query = context, query
    state.workflow_state = WorkflowState.WAITING_TO_GENERATE

    response_placeholder = gr.update(
        value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]
    )  # basic_response_box - keep visible
    first_tab = gr.update(selected=0)  # basic_context_tabs - switch to first tab
    return response_placeholder, first_tab, state
def generate_model_response(state: State):
    """Query the model with the state's context and query and store the answer.

    Returns:
        ``(state, update)``. On success ``update`` hides the error box; on any
        validation, initialization or length failure it shows the error box
        with a message and the state is returned unchanged.
    """

    def _fail(message):
        # Every failure path reports through the same visible error box.
        return state, gr.update(value=[(message, None)], visible=True)

    # Validate inputs first with debug info
    print(f"Validation - Context length: {len(state.context) if state.context else 0}")
    print(f"Validation - Query: {state.query[:50] if state.query else 'empty'}...")

    context_missing = not state.context or not state.context.strip()
    if context_missing:
        print("❌ Validation failed: No context")
        return _fail("❌ Please enter context before generating response! If you just changed configuration, try reloading an example.")

    query_missing = not state.query or not state.query.strip()
    if query_missing:
        print("❌ Validation failed: No query")
        return _fail("❌ Please enter a query before generating response! If you just changed configuration, try reloading an example.")

    # Both the model and the attribution object must be available.
    print(f"🔧 Generating response with explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
    llm, attr, error_msg = initialize_model_and_attr()
    if llm is None or attr is None:
        error_text = error_msg if error_msg else "Model initialization failed!"
        return _fail(f"❌ {error_text}")

    prompt = wrap_prompt(state.query, [state.context])
    print(f"Generated prompt for {DEFAULT_MODEL_PATH}: {prompt[:200]}...")  # Debug log

    # Length check: whitespace-separated word count against the model limit minus 512.
    word_budget = get_max_tokens(DEFAULT_MODEL_PATH) - 512
    if len(prompt.split()) > word_budget:
        return _fail(GENERATE_CONTEXT_TOO_LONG_TEXT)

    answer = llm.query(prompt)
    print(f"Model response: {answer}")  # Debug log

    state.response = state.answer = state.full_response = answer
    state.workflow_state = WorkflowState.WAITING_TO_SELECT
    return state, gr.update(visible=False)
def split_into_sentences(text: str):
    """Split ``text`` into sentences plus the text that precedes each one.

    Each line of ``text`` is tokenized separately with ``nltk.sent_tokenize``.

    Returns:
        ``(sentences, separators)`` of equal length, where ``separators[i]`` is
        the slice of ``text`` between the end of sentence ``i - 1`` (or the
        start of the text) and the start of sentence ``i``. Text after the
        last sentence is not returned.
    """
    sentences = [
        sentence
        for line in text.splitlines()
        for sentence in nltk.sent_tokenize(line)
    ]

    separators = []
    cursor = 0
    for sentence in sentences:
        # Search only forward so repeated sentences map to successive occurrences.
        found_at = text.find(sentence, cursor)
        separators.append(text[cursor:found_at])
        cursor = found_at + len(sentence)
    return sentences, separators
def basic_highlight_response(
    response: str, selected_index: int, num_sources: int = -1
):
    """Build the clickable, sentence-labelled view of ``response``.

    Args:
        response: response text, split into sentences with ``split_into_sentences``.
        selected_index: index of the sentence that gets the citation label;
            every other sentence gets the label ``"Traceback"``.
        num_sources: ``-1`` labels the selected sentence ``"Traceback!"``,
            ``0`` labels it ``"No important text!"``, and a positive ``n``
            labels it ``"[1,...,n]"``.

    Returns:
        A ``gr.HighlightedText`` whose value is a list of ``(text, label)`` pairs.
    """
    sentences, separators = split_into_sentences(response)

    if num_sources == -1:
        citations_text = "Traceback!"
    elif num_sources == 0:
        citations_text = "No important text!"
    else:
        citations_text = f"[{','.join(str(i) for i in range(1, num_sources + 1))}]"

    ht = []
    for i, (sentence, separator) in enumerate(zip(sentences, separators)):
        if len(sentence) < 4:
            # Hack to ignore punctuation: very short fragments stay unlabelled.
            label = None
        elif i == selected_index:
            label = citations_text
        else:
            label = "Traceback"
        ht.append((separator + sentence, label))

    # The keys must match the labels emitted above. The previous key
    # "Click to cite!" was never used as a label, so it coloured nothing.
    color_map = {"Traceback": "blue", citations_text: "yellow"}
    return gr.HighlightedText(value=ht, color_map=color_map)
def basic_highlight_response_with_visibility(
    response: str, selected_index: int, num_sources: int = -1, visible: bool = True
):
    """Version of basic_highlight_response that also sets visibility.

    Args:
        response: response text, split into sentences with ``split_into_sentences``.
        selected_index: index of the sentence that gets the citation label;
            every other sentence gets the label ``"Traceback"``.
        num_sources: ``-1`` labels the selected sentence ``"Traceback!"``,
            ``0`` labels it ``"No important text!"``, and a positive ``n``
            labels it ``"[1,...,n]"``.
        visible: visibility flag forwarded to the component update.

    Returns:
        A ``gr.update(...)`` carrying the ``(text, label)`` pairs, the color
        map and the visibility flag.
    """
    sentences, separators = split_into_sentences(response)

    if num_sources == -1:
        citations_text = "Traceback!"
    elif num_sources == 0:
        citations_text = "No important text!"
    else:
        citations_text = f"[{','.join(str(i) for i in range(1, num_sources + 1))}]"

    ht = []
    for i, (sentence, separator) in enumerate(zip(sentences, separators)):
        if len(sentence) < 4:
            # Hack to ignore punctuation: very short fragments stay unlabelled.
            label = None
        elif i == selected_index:
            label = citations_text
        else:
            label = "Traceback"
        ht.append((separator + sentence, label))

    # The keys must match the labels emitted above. The previous key
    # "Click to cite!" was never used as a label, so it coloured nothing.
    color_map = {"Traceback": "blue", citations_text: "yellow"}
    return gr.update(value=ht, color_map=color_map, visible=visible)
def basic_update_highlighted_response(evt: gr.SelectData, state: State):
    """Re-render the response with the sentence at ``evt.index`` marked as selected.

    Returns:
        ``(highlighted response component, state)``; the state is not modified.
    """
    return basic_highlight_response(state.response, evt.index), state
def unified_response_handler(response_text: str, state: State):
    """Handle both LLM generation and manual input based on whether text is provided.

    A response is generated by the model when ``response_text`` is empty or
    whitespace, or when the state already holds a response that was produced
    for a different query than the current one. Otherwise the stripped
    ``response_text`` is used as a manual response.

    The "query changed" test only applies once a response exists. Comparing
    against the initial empty ``last_query_used`` would always report a change
    and silently replace a user-supplied response with a generated one.

    Args:
        response_text: current content of the response input box.
        state: session state; updated in place on success.

    Returns:
        Tuple of (state, text for the response input box, update for the
        clickable response box, update for the error box). On failure the
        input text is kept, the response box is hidden and the error box shows
        the message.
    """

    def _error_outputs(message):
        return (
            state,
            response_text,  # Keep current text box content
            gr.update(visible=False),  # Keep response box hidden
            gr.update(value=[(message, None)], visible=True),
        )

    has_prior_response = bool(getattr(state, "response", ""))
    # Falling back to the current query means "unchanged" for states without the field.
    query_of_prior_response = getattr(state, "last_query_used", state.query)
    instruction_changed = has_prior_response and query_of_prior_response != state.query
    text_missing = not response_text or not response_text.strip()

    if text_missing or instruction_changed:
        if instruction_changed:
            print("📝 Instruction changed, generating new response from LLM...")
        else:
            print("🤖 Generating response from LLM...")

        # Validate inputs first
        if not state.context or not state.context.strip():
            return _error_outputs("❌ Please enter context before generating response!")
        if not state.query or not state.query.strip():
            return _error_outputs("❌ Please enter a query before generating response!")

        # Only the model is needed here; the attribution object is ignored.
        llm, _attr, error_msg = initialize_model_and_attr()
        if llm is None:
            error_text = error_msg if error_msg else "Model initialization failed!"
            return _error_outputs(f"❌ {error_text}")

        prompt = wrap_prompt(state.query, [state.context])
        # Length check: whitespace-separated word count against the model limit minus 512.
        if len(prompt.split()) > get_max_tokens(DEFAULT_MODEL_PATH) - 512:
            return _error_outputs(GENERATE_CONTEXT_TOO_LONG_TEXT)

        final_text = llm.query(prompt)
        print(f"Generated response: {final_text[:100]}...")
    else:
        # Use provided text as manual response
        print("✏️ Using manual response...")
        final_text = response_text.strip()

    # Both paths record the response the same way.
    state.response = final_text
    state.answer = final_text
    state.full_response = final_text
    state.last_query_used = state.query  # Track which query this response belongs to
    state.workflow_state = WorkflowState.WAITING_TO_SELECT

    # -1 means no sentence is selected yet.
    response_update = basic_highlight_response_with_visibility(state.response, -1, visible=True)
    return (
        state,
        final_text,  # Text shown in the response input box
        response_update,  # Update clickable response content
        gr.update(visible=False),  # Hide error box
    )
def get_color_by_rank(rank, total_items):
    """Map a 1-based rank to a ``(hex_color, rgba_color)`` pair.

    Colors depend on ``rank`` alone, except that ``total_items == 0`` always
    yields a neutral grey. Ranks 1-3 get red, orange and golden yellow; any
    other rank up to 5 gets light yellow; everything above gets very light
    yellow.
    """
    if total_items == 0:
        return "#F0F0F0", "rgba(240, 240, 240, 0.8)"

    top_three = {
        1: ("#FF4444", "rgba(255, 68, 68, 0.9)"),  # Highest importance - bright red
        2: ("#FF8C42", "rgba(255, 140, 66, 0.8)"),  # Second highest - bright orange
        3: ("#FFD93D", "rgba(255, 217, 61, 0.8)"),  # Third highest - golden yellow
    }
    if rank in top_three:
        return top_three[rank]
    if rank <= 5:
        return "#FFF280", "rgba(255, 242, 128, 0.7)"  # 4th-5th - standard yellow
    return "#FFF9C4", "rgba(255, 249, 196, 0.6)"  # Lower importance - very light yellow
def _traceback_error_outputs(message, state):
    """Build the six-element output tuple both traceback handlers return on failure."""
    return (
        gr.update(value=[("", None)], visible=False),  # hide the results box
        gr.update(selected=0),  # back to the first context tab
        gr.update(visible=False),  # hide the highlighted-context tab
        gr.update(value=""),  # clear the dummy trigger value
        gr.update(value=[(message, None)], visible=True),  # show the error
        state,
    )


def _missing_traceback_input(state) -> Optional[str]:
    """Return the error message for the first empty traceback input, or None."""
    required = (
        (state.context, "❌ No context available for traceback!"),
        (state.query, "❌ No query available for traceback!"),
        (state.full_response, "❌ No response available for traceback!"),
    )
    for value, message in required:
        if not value or not value.strip():
            return message
    return None


def _log_attribution_result(texts, important_ids, importance_scores):
    """Print the debug summary of what ``attr.attribute`` returned."""
    print(f"explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
    print(f"texts count: {len(texts)} (how context was segmented)")
    if len(texts) > 0:
        print(f"sample text segments: {[text[:50] + '...' if len(text) > 50 else text for text in texts[:3]]}")
    print(f"important_ids: {important_ids}")
    print("importance_scores: ", importance_scores)


def _split_outer_whitespace(segment):
    """Split ``segment`` into (leading, core, trailing) around its outer spaces, tabs and line breaks."""
    edge_chars = "\n\r\t "
    without_leading = segment.lstrip(edge_chars)
    leading = segment[: len(segment) - len(without_leading)]
    core = without_leading.rstrip(edge_chars)
    trailing = without_leading[len(core):]
    return leading, core, trailing


def _render_traceback_outputs(state, texts, important_ids, header, intro):
    """Build the success outputs: ranked sources followed by the highlighted context.

    ``state.scores`` must already hold the importance scores. Score ``k`` is
    taken to belong to ``important_ids[k]``; scores without a matching id are
    ignored.
    """
    order = np.argsort(state.scores)[::-1]  # score positions, best first
    num_ids = len(important_ids)

    # (1-based rank, segment index) pairs, best first.
    ranked = [
        (position + 1, important_ids[score_idx])
        for position, score_idx in enumerate(order)
        if score_idx < num_ids
    ]
    # If a segment id repeats, the later (lower) rank wins.
    rank_of_segment = {segment_id: rank for rank, segment_id in ranked}

    # Sources at the top (no highlighting for UI elements).
    combined_display = [(header, None), (intro, None)]
    for rank, segment_id in ranked:
        # Strip so surrounding newlines are not highlighted.
        clean_source_text = texts[segment_id].strip()
        if clean_source_text:
            combined_display.append((clean_source_text, f"rank_{rank}"))
            combined_display.append(("\n\n", None))

    combined_display.append(("\n" + "═" * 50 + "\n", None))
    combined_display.append(("FULL CONTEXT WITH HIGHLIGHTS\n", None))
    combined_display.append(
        ("Scroll down to see the complete context with important segments highlighted:\n\n", None)
    )

    # Full context: every segment is shown, only important ones are labelled.
    for segment_index, segment in enumerate(texts):
        rank = rank_of_segment.get(segment_index)
        if rank is None or segment.strip() == "":
            combined_display.append((segment, None))
            continue
        # Keep outer whitespace outside the highlight.
        leading, core, trailing = _split_outer_whitespace(segment)
        if leading:
            combined_display.append((leading, None))
        if core:
            combined_display.append((core, f"rank_{rank}"))
        if trailing:
            combined_display.append((trailing, None))

    color_map = {
        f"rank_{rank}": get_color_by_rank(rank, num_ids)[1]
        for rank in range(1, num_ids + 1)
    }

    combined_sources_update = HighlightedTextbox(
        value=combined_display, color_map=color_map, visible=True
    )
    dummy_update = gr.update(
        value=f"AttnTrace_{state.response}_{state.start_index}_{state.end_index}"
    )
    return (
        combined_sources_update,
        gr.update(selected=1),  # switch to the highlighted context tab
        gr.update(visible=True),  # show the highlighted-context tab
        dummy_update,
        gr.update(visible=False),  # hide the error box
        state,
    )


def _has_no_scores(importance_scores):
    """True for ``None`` or an empty sequence/array (safe for NumPy arrays)."""
    return importance_scores is None or len(importance_scores) == 0


def basic_get_scores_and_sources_full_response(state: State):
    """Traceback the entire response instead of a selected segment.

    Returns:
        Six-element tuple: (results box, context-tabs update,
        highlighted-context tab update, dummy trigger update, error box
        update, state). On any failure the results are hidden and the error
        box shows the message.
    """
    # Use the entire response as the explained part
    state.explained_response_part = state.full_response

    _, attr, error_msg = initialize_model_and_attr()
    if attr is None:
        error_text = error_msg if error_msg else "Traceback initialization failed!"
        return _traceback_error_outputs(f"❌ {error_text}", state)

    try:
        problem = _missing_traceback_input(state)
        if problem is not None:
            return _traceback_error_outputs(problem, state)

        print(f"start full response traceback with explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
        print(f"context length: {len(state.context)}, query: {state.query[:100]}...")
        print(f"full response: {state.full_response[:100]}...")
        print(f"tracing entire response (length: {len(state.full_response)} chars)")
        texts, important_ids, importance_scores, _, _ = attr.attribute(
            state.query, [state.context], state.full_response, state.full_response
        )
        print("end full response traceback")
        _log_attribution_result(texts, important_ids, importance_scores)

        if _has_no_scores(importance_scores):
            return _traceback_error_outputs(
                "❌ No traceback scores generated for full response!", state
            )
        state.scores = np.array(importance_scores)

        return _render_traceback_outputs(
            state,
            texts,
            important_ids,
            header="═══ FULL RESPONSE TRACEBACK RESULTS ═══\n",
            intro="These are the text segments that contribute most to the entire response:\n\n",
        )
    except Exception as e:
        # UI boundary: report the failure in the error box instead of crashing the callback.
        traceback.print_exc()
        return _traceback_error_outputs(f"❌ Error: {str(e)}", state)


def basic_get_scores_and_sources(
    evt: gr.SelectData,
    highlighted_response: List[Dict[str, str]],
    state: State,
):
    """Traceback the response sentence the user clicked.

    Args:
        evt: selection event; ``evt.index`` indexes ``highlighted_response``.
        highlighted_response: current value of the clickable response box,
            whose entries carry the sentence text under the ``'token'`` key.
        state: session state; updated in place.

    Returns:
        Six-element tuple: (results box, context-tabs update,
        highlighted-context tab update, dummy trigger update, error box
        update, state). On any failure the results are hidden and the error
        box shows the message.
    """
    # Get the selected sentence
    print("highlighted_response: ", highlighted_response[evt.index])
    state.explained_response_part = highlighted_response[evt.index]['token']

    _, attr, error_msg = initialize_model_and_attr()
    if attr is None:
        error_text = error_msg if error_msg else "Traceback initialization failed!"
        return _traceback_error_outputs(f"❌ {error_text}", state)

    try:
        problem = _missing_traceback_input(state)
        if problem is not None:
            return _traceback_error_outputs(problem, state)

        print(f"start traceback with explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
        print(f"context length: {len(state.context)}, query: {state.query[:100]}...")
        print(f"response: {state.full_response[:100]}...")
        print(f"selected part: {state.explained_response_part[:100]}...")
        texts, important_ids, importance_scores, _, _ = attr.attribute(
            state.query, [state.context], state.full_response, state.explained_response_part
        )
        print("end traceback")
        _log_attribution_result(texts, important_ids, importance_scores)

        if _has_no_scores(importance_scores):
            return _traceback_error_outputs(
                "❌ No traceback scores generated! Try a different text segment.", state
            )
        state.scores = np.array(importance_scores)

        return _render_traceback_outputs(
            state,
            texts,
            important_ids,
            header="═══ TRACEBACK RESULTS ═══\n",
            intro="These are the text segments that contribute most to the response:\n\n",
        )
    except Exception as e:
        # UI boundary: report the failure in the error box instead of crashing the callback.
        traceback.print_exc()
        return _traceback_error_outputs(f"❌ Error: {str(e)}", state)
def load_custom_css():
    """Return the app's custom CSS, or an empty string if it cannot be read.

    Reads ``assets/app_styles.css`` relative to the current working directory.
    The stylesheet is treated as optional: a missing or unreadable file is
    reported on stdout and the function falls back to ``""`` instead of
    raising.

    Returns:
        str: The stylesheet contents, or ``""`` when the file is missing or
        could not be loaded.
    """
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's locale default encoding.
        with open("assets/app_styles.css", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print("Warning: CSS file not found, using minimal CSS")
        return ""
    except Exception as e:
        # Deliberate best-effort: any other failure (permissions, decoding,
        # ...) is logged and the caller gets no custom CSS.
        print(f"Error loading CSS: {e}")
        return ""
# Load CSS from external file (load_custom_css returns "" when it is unavailable)
custom_css = load_custom_css()
theme = gr.themes.Citrus(
    text_size="lg",
    spacing_size="md",
)

# Placeholder shown in the response-selection box while there is no response
# to analyse. Shared by the initial render and the reset callback so the two
# cannot drift apart (the button lives in the left-hand column).
_RESPONSE_BOX_PLACEHOLDER = (
    "Click the 'Generate/Use Response' button on the left to see response "
    "text here for traceback analysis."
)

# NOTE(review): the nesting of the layout below was reconstructed from a copy
# of this file that had lost its indentation — confirm against the original.
with gr.Blocks(theme=theme, css=custom_css) as demo:
    # ------------------------------------------------------------------
    # Page header and feature overview
    # ------------------------------------------------------------------
    gr.Markdown(f"# {APP_TITLE}")
    gr.Markdown(APP_DESCRIPTION, elem_classes="app-description")
    # gr.Markdown(NEW_TEXT, elem_classes="app-description-2")
    gr.Markdown("""
<div style="font-size: 18px;">
AttnTrace is an efficient context traceback method for long contexts (e.g., full papers). It is over 15× faster than the state-of-the-art context traceback method TracLLM. Compared to previous attention-based approaches, AttnTrace is more accurate, reliable, and memory-efficient.
</div>
""", elem_classes="feature-highlights")
    # Feature highlights
    gr.Markdown("""
<div style="font-size: 18px;">
AttnTrace can be used in many real-world applications, such as tracing back to:
- 📄 prompt injection instructions that manipulate LLM-generated paper reviews.
- 💻 malicious comment & code hiding in the codebase that misleads the AI coding assistant.
- 🤖 malicious instructions that mislead the action of the LLM agent.
- 🖋 source texts in the context from an AI summary.
- 🔍 evidence that supports the LLM-generated answer for a question.
- ❌ misinformation (corrupted knowledge) that manipulates LLM output for a question.
- And a lot more...
</div>
""", elem_classes="feature-highlights")

    # ------------------------------------------------------------------
    # Example buttons with topic-relevant images - moved here for better positioning
    # ------------------------------------------------------------------
    gr.Markdown("### 🚀 Try These Examples!", elem_classes="example-title")
    with gr.Row(elem_classes=["example-button-container"]):
        with gr.Column(scale=1):
            example_1_btn = gr.Button(
                "📄 Prompt Injection Attacks in AI Paper Review",
                elem_classes=["example-button", "example-paper"],
                elem_id="example_1_button",
                scale=None,
                size="sm"
            )
        with gr.Column(scale=1):
            example_2_btn = gr.Button(
                "💻 Malicious Comments & Code in Codebase",
                elem_classes=["example-button", "example-movie"],
                elem_id="example_2_button"
            )
        with gr.Column(scale=1):
            example_3_btn = gr.Button(
                "🤖 Malicious Instructions Misleading the LLM Agent",
                elem_classes=["example-button", "example-code"],
                elem_id="example_3_button"
            )
    with gr.Row(elem_classes=["example-button-container"]):
        with gr.Column(scale=1):
            example_4_btn = gr.Button(
                "🖋 Source Texts for an AI Summary",
                elem_classes=["example-button", "example-paper-alt"],
                elem_id="example_4_button"
            )
        with gr.Column(scale=1):
            example_5_btn = gr.Button(
                "🔍 Evidence that Support Question Answering",
                elem_classes=["example-button", "example-movie-alt"],
                elem_id="example_5_button"
            )
        with gr.Column(scale=1):
            example_6_btn = gr.Button(
                "❌ Misinformation (Corrupted Knowledge) in Question Answering",
                elem_classes=["example-button", "example-code-alt"],
                elem_id="example_6_button"
            )

    # Per-session application state, initialised from clear_state().
    state = gr.State(
        value=clear_state()
    )

    # ------------------------------------------------------------------
    # "Demo" tab layout
    # ------------------------------------------------------------------
    basic_tab = gr.Tab("Demo")
    with basic_tab:
        # gr.Markdown("## Demo")
        gr.Markdown(
            "Enter your context and instruction below to try out AttnTrace! You can also click on the example buttons above to load pre-configured examples."
        )
        gr.Markdown(
            '**Color Legend for Context Traceback (by ranking):** <span style="background-color: #FF4444; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Red</span> = 1st (most important) | <span style="background-color: #FF8C42; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Orange</span> = 2nd | <span style="background-color: #FFD93D; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Golden</span> = 3rd | <span style="background-color: #FFF280; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Yellow</span> = 4th-5th | <span style="background-color: #FFF9C4; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Light</span> = 6th+'
        )

        # Top section: Wide Context box with tabs
        with gr.Row():
            with gr.Column(scale=1):
                with gr.Tabs() as basic_context_tabs:
                    # Tab id=0: raw, editable context.
                    with gr.TabItem("Context", id=0):
                        basic_context_box = gr.Textbox(
                            placeholder="Enter context...",
                            show_label=False,
                            value="",
                            lines=6,
                            max_lines=6,
                            elem_id="basic_context_box",
                            autoscroll=False,
                        )
                    # Tab id=1: read-only view that receives the traceback results.
                    with gr.TabItem("Context with highlighted traceback results", id=1, visible=True) as basic_sources_in_context_tab:
                        basic_sources_in_context_box = HighlightedTextbox(
                            value=[("Click on a sentence in the response below to see highlighted traceback results here.", None)],
                            show_legend_label=False,
                            show_label=False,
                            show_legend=False,
                            interactive=False,
                            elem_id="basic_sources_in_context_box",
                        )

        # Error messages
        basic_generate_error_box = HighlightedTextbox(
            show_legend_label=False,
            show_label=False,
            show_legend=False,
            visible=False,
            interactive=False,
            container=False,
        )

        # Bottom section: Left (instruction + button + response), Right (response selection)
        with gr.Row(equal_height=True):
            # Left: Instruction + Button + Response
            with gr.Column(scale=1):
                basic_query_box = gr.Textbox(
                    label="Instruction",
                    placeholder="Enter an instruction...",
                    value="",
                    lines=3,
                    max_lines=3,
                )
                unified_response_button = gr.Button(
                    "Generate/Use Response",
                    variant="primary",
                    size="lg"
                )
                response_input_box = gr.Textbox(
                    label="Response (Editable)",
                    placeholder="Response will appear here after generation, or type your own response for traceback...",
                    lines=8,
                    max_lines=8,
                    info="Leave empty and click button to generate from LLM, or type your own response to use for traceback"
                )
            # Right: Response for attribution selection
            with gr.Column(scale=1):
                basic_response_box = gr.HighlightedText(
                    label="Click to select text for traceback!",
                    value=[(_RESPONSE_BOX_PLACEHOLDER, None)],
                    interactive=False,
                    combine_adjacent=False,
                    show_label=True,
                    show_legend=False,
                    elem_id="basic_response_box",
                    visible=True,
                )
                # Button for full response traceback
                full_response_traceback_button = gr.Button(
                    "🔍 Traceback Entire Response",
                    variant="secondary",
                    size="sm"
                )

        # Hidden error box and dummy elements
        basic_attribute_error_box = HighlightedTextbox(
            show_legend_label=False,
            show_label=False,
            show_legend=False,
            visible=False,
            interactive=False,
            container=False,
        )
        dummy_basic_sources_box = gr.Textbox(
            visible=False, interactive=False, container=False
        )

    # ------------------------------------------------------------------
    # Callbacks and event wiring
    # ------------------------------------------------------------------
    # Only a single (AttnTrace) method and model in this simplified version
    def basic_clear_state():
        """Reset the Demo tab's inputs, response view and state.

        Returns:
            tuple: New values, in order, for ``basic_context_box``,
            ``basic_query_box``, ``response_input_box``,
            ``basic_response_box``, ``basic_context_tabs`` and ``state``
            (a fresh value from ``clear_state()``).
        """
        state = clear_state()
        return (
            "",  # basic_context_box
            "",  # basic_query_box
            "",  # response_input_box
            gr.update(value=[(_RESPONSE_BOX_PLACEHOLDER, None)]),  # basic_response_box - keep visible
            gr.update(selected=0),  # basic_context_tabs - switch to first tab
            state,
        )

    # Defining behavior of various interactions for the basic tab
    basic_tab.select(
        fn=basic_clear_state,
        inputs=[],
        outputs=[
            basic_context_box,
            basic_query_box,
            response_input_box,
            basic_response_box,
            basic_context_tabs,
            state,
        ],
    )

    # Editing either the context or the instruction triggers basic_update.
    for component in [basic_context_box, basic_query_box]:
        component.change(
            basic_update,
            [basic_context_box, basic_query_box, state],
            [
                basic_response_box,
                basic_context_tabs,
                state,
            ],
        )

    # Example button event handlers - now update both UI and state
    outputs_for_examples = [
        basic_context_box,
        basic_query_box,
        state,
        response_input_box,
        basic_response_box,
        basic_context_tabs,
    ]
    # Every example button is wired identically; only the example loader
    # bound into load_an_example differs.
    example_bindings = [
        (example_1_btn, run_example_1),
        (example_2_btn, run_example_2),
        (example_3_btn, run_example_3),
        (example_4_btn, run_example_4),
        (example_5_btn, run_example_5),
        (example_6_btn, run_example_6),
    ]
    for example_button, example_runner in example_bindings:
        example_button.click(
            fn=partial(load_an_example, example_runner),
            inputs=[state],
            outputs=outputs_for_examples
        )

    # JS-only listeners: no Python work, they just run the snippet returned by
    # get_scroll_js_code (presumably scrolls to the given elem_id — confirm).
    unified_response_button.click(
        fn=lambda: None,
        inputs=[],
        outputs=[],
        js=get_scroll_js_code("basic_response_box"),
    )
    basic_response_box.change(
        fn=lambda: None,
        inputs=[],
        outputs=[],
        js=get_scroll_js_code("basic_sources_in_context_box"),
    )

    # Add immediate tab switch on response selection
    def immediate_tab_switch():
        """Show a progress message and jump to the results tab.

        Returns:
            tuple: Updates for ``basic_sources_in_context_box`` (progress
            text) and ``basic_context_tabs`` (select tab id 1).
        """
        return (
            gr.update(value=[("🔄 Processing traceback... Please wait...", None)]),  # Show progress message
            gr.update(selected=1),  # Switch to annotation tab immediately
        )

    # Selecting text in the response fires three listeners: instant feedback,
    # the traceback computation, and a refresh of the response highlighting.
    basic_response_box.select(
        fn=immediate_tab_switch,
        inputs=[],
        outputs=[basic_sources_in_context_box, basic_context_tabs],
        queue=False,  # Execute immediately without queue
    )
    basic_response_box.select(
        fn=basic_get_scores_and_sources,
        inputs=[basic_response_box, state],
        outputs=[
            basic_sources_in_context_box,
            basic_context_tabs,
            basic_sources_in_context_tab,
            dummy_basic_sources_box,
            basic_attribute_error_box,
            state,
        ],
        show_progress="full",
    )
    basic_response_box.select(
        fn=basic_update_highlighted_response,
        inputs=[state],
        outputs=[basic_response_box, state],
    )

    # Full response traceback button
    full_response_traceback_button.click(
        fn=immediate_tab_switch,
        inputs=[],
        outputs=[basic_sources_in_context_box, basic_context_tabs],
        queue=False,  # Execute immediately without queue
    )
    full_response_traceback_button.click(
        fn=basic_get_scores_and_sources_full_response,
        inputs=[state],
        outputs=[
            basic_sources_in_context_box,
            basic_context_tabs,
            basic_sources_in_context_tab,
            dummy_basic_sources_box,
            basic_attribute_error_box,
            state,
        ],
        show_progress="full",
    )
    # The hidden dummy textbox is an output of both traceback handlers; a
    # change to it triggers the JS snippet for the results box.
    dummy_basic_sources_box.change(
        fn=lambda: None,
        inputs=[],
        outputs=[],
        js=get_scroll_js_code("basic_sources_in_context_box"),
    )

    # Unified response handler
    unified_response_button.click(
        fn=unified_response_handler,
        inputs=[response_input_box, state],
        outputs=[state, response_input_box, basic_response_box, basic_generate_error_box]
    )
    # gr.Markdown(
    #     "Please do not interact with elements while generation/attribution is in progress. This may cause errors. You can refresh the page if you run into issues because of this."
    # )

demo.launch(show_api=False, share=True)