AttnTrace / app.py
SecureLLMSys's picture
update
3a7a5c6
raw
history blame
51.6 kB
# Acknowledgement: This demo code is adapted from the original Hugging Face Space "ContextCite"
# (https://huggingface.co/spaces/contextcite/context-cite).
import os
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
import gradio as gr
import numpy as np
import spaces
import nltk
import base64
import traceback
from src.utils import split_into_sentences as split_into_sentences_utils
# --- AttnTrace imports (from app_full.py) ---
from src.models import create_model
from src.attribution import AttnTraceAttribution
from src.prompts import wrap_prompt
from gradio_highlightedtextbox import HighlightedTextbox
from examples import run_example_1, run_example_2, run_example_3, run_example_4, run_example_5, run_example_6
from functools import partial
# Page header strings. APP_TITLE is raw HTML and APP_DESCRIPTION is Markdown;
# both are rendered with gr.Markdown when the Blocks UI is built.
APP_TITLE = '<div class="app-title"><span class="brand">AttnTrace: </span><span class="subtitle">Attention-based Context Traceback for Long-Context LLMs</span></div>'
APP_DESCRIPTION = """AttnTrace traces a model's generated statements back to specific parts of the context using attention-based traceback. Try it out with Meta-Llama-3.1-8B-Instruct here! See the [[paper](https://arxiv.org/abs/2506.04202)] and [[code](https://github.com/Wang-Yanting/TracLLM-Kit)] for more!
Maintained by the AttnTrace team."""
# NEW_TEXT = """Long-context large language models (LLMs), such as Gemini-2.5-Pro and Claude-Sonnet-4, are increasingly used to empower advanced AI systems, including retrieval-augmented generation (RAG) pipelines and autonomous agents. In these systems, an LLM receives an instruction along with a context—often consisting of texts retrieved from a knowledge database or memory—and generates a response that is contextually grounded by following the instruction. Recent studies have designed solutions to trace back to a subset of texts in the context that contributes most to the response generated by the LLM. These solutions have numerous real-world applications, including performing post-attack forensic analysis and improving the interpretability and trustworthiness of LLM outputs. While significant efforts have been made, state-of-the-art solutions such as TracLLM often lead to a high computation cost, e.g., it takes TracLLM hundreds of seconds to perform traceback for a single response-context pair. In this work, we propose {\name}, a new context traceback method based on the attention weights produced by an LLM for a prompt. To effectively utilize attention weights, we introduce two techniques designed to enhance the effectiveness of {\name}, and we provide theoretical insights for our design choice. %Moreover, we perform both theoretical analysis and empirical evaluation to demonstrate their effectiveness.
# We also perform a systematic evaluation for {\name}. The results demonstrate that {\name} is more accurate and efficient than existing state-of-the-art context traceback methods. We also show {\name} can improve state-of-the-art methods in detecting prompt injection under long contexts through the attribution-before-detection paradigm. As a real-world application, we demonstrate that {\name} can effectively pinpoint injected instructions in a paper designed to manipulate LLM-generated reviews.
# The code and data will be open-sourced. """
# EDIT_TEXT = "Feel free to edit!"
# Error shown instead of a response when the wrapped prompt exceeds the
# model's length budget (see the length checks in the generation handlers).
GENERATE_CONTEXT_TOO_LONG_TEXT = (
    '<em style="color: red;">Context is too long for the current model.</em>'
)
# Counterpart message for the traceback step.
# NOTE(review): not referenced in the visible part of this file.
ATTRIBUTE_CONTEXT_TOO_LONG_TEXT = '<em style="color: red;">Context is too long for the current traceback method.</em>'
# Presumably the default / maximum visible line counts of the context box —
# TODO confirm against the UI definition further down the file.
CONTEXT_LINES = 20
CONTEXT_MAX_LINES = 40
# Placeholder prompting the user to pick a response sentence, and the same
# text in the [(text, label)] shape used for highlighted-text values.
SELECTION_DEFAULT_TEXT = "Click on a sentence in the response to traceback!"
SELECTION_DEFAULT_VALUE = [(SELECTION_DEFAULT_TEXT, None)]
SOURCES_INFO = 'These are the texts that contribute most to the response.'
# Disabled help text for the in-context view, kept for reference:
# SOURCES_IN_CONTEXT_INFO = (
# "This shows the important sentences highlighted within their surrounding context from the text above. Colors indicate ranking: Red (1st), Orange (2nd), Golden (3rd), Yellow (4th-5th), Light (6th+)."
# )
# Model identifiers the demo knows about; the first entry is the default.
MODEL_PATHS = [
    "meta-llama/Meta-Llama-3.1-8B-Instruct",
]
# Per-model length limit, looked up by get_max_tokens().
MAX_TOKENS = {
    "meta-llama/Meta-Llama-3.1-8B-Instruct": 131072,
}
DEFAULT_MODEL_PATH = MODEL_PATHS[0]
# Granularities passed to AttnTraceAttribution as `explanation_level`;
# only the default is used in the visible part of this file.
EXPLANATION_LEVELS = ["sentence", "paragraph", "text segment"]
DEFAULT_EXPLANATION_LEVEL = "sentence"
class WorkflowState(Enum):
    """Stages of the demo workflow, stored in ``State.workflow_state``."""

    # Context/query were just set, loaded or reset; no response is recorded
    # for them yet.
    WAITING_TO_GENERATE = 0
    # A response (generated or manually supplied) has been stored in the
    # state and can now be traced back.
    WAITING_TO_SELECT = 1
    # NOTE(review): never assigned in the visible part of this file.
    READY_TO_ATTRIBUTE = 2
@dataclass
class State:
    """Per-session data passed into and returned from the Gradio handlers."""

    # Current stage of the generate -> select -> traceback workflow.
    workflow_state: WorkflowState
    # Context text and instruction/query as last entered or loaded.
    context: str
    query: str
    # The handlers store the same response text in `response`, `answer`
    # and `full_response` (the last one is what traceback reads).
    response: str
    # Only initialised to 0 and echoed into the dummy update value in the
    # visible code.
    start_index: int
    end_index: int
    # Importance scores from the most recent traceback run.
    scores: np.ndarray
    answer: str
    # NOTE(review): only ever initialised to "" in the visible code.
    highlighted_context: str
    full_response: str
    # The part of the response most recently traced (a selected sentence or
    # the whole response).
    explained_response_part: str
    # Query that was in effect when the current response was recorded.
    last_query_used: str = ""
# --- Dynamic Model and Attribution Management ---
# Module-level cache owned by initialize_model_and_attr(): the loaded model,
# the traceback object built on it, and the settings (model path,
# explanation level, HF token) they were created with, so later calls can
# tell whether anything needs to be rebuilt.
current_llm = None
current_attr = None
current_model_path = None
current_explanation_level = None
current_api_key = None
def initialize_model_and_attr():
    """Create or reuse the cached model and traceback objects.

    The model is (re)built when none is cached yet, or when the cached one
    was created for a different model path or a different ``HF_TOKEN``
    value.  The traceback object is (re)built when none is cached, when the
    explanation level differs, or whenever the model was rebuilt.

    Returns:
        ``(llm, attr, None)`` on success, or ``(None, None, error_message)``
        if anything raised; the exception is printed, not propagated.
    """
    global current_llm, current_attr, current_model_path, current_explanation_level, current_api_key
    try:
        hf_token = os.getenv("HF_TOKEN")

        model_is_stale = (
            current_llm is None
            or current_model_path != DEFAULT_MODEL_PATH
            or current_api_key != hf_token
        )
        # A rebuilt model always invalidates the traceback object built on it.
        attr_is_stale = (
            current_attr is None
            or current_explanation_level != DEFAULT_EXPLANATION_LEVEL
            or model_is_stale
        )

        if model_is_stale:
            print(f"Initializing model: {DEFAULT_MODEL_PATH}")
            current_llm = create_model(
                model_path=DEFAULT_MODEL_PATH, api_key=hf_token, device="cuda"
            )
            current_model_path = DEFAULT_MODEL_PATH
            current_api_key = hf_token

        if attr_is_stale:
            print(f"Initializing context traceback with explanation level: {DEFAULT_EXPLANATION_LEVEL}")
            current_attr = AttnTraceAttribution(
                current_llm,
                explanation_level=DEFAULT_EXPLANATION_LEVEL,
                K=3,
                q=0.4,
                B=30,
            )
            current_explanation_level = DEFAULT_EXPLANATION_LEVEL
    except Exception as e:
        error_msg = f"Error initializing model/traceback: {str(e)}"
        print(error_msg)
        traceback.print_exc()
        return None, None, error_msg
    return current_llm, current_attr, None
# Eagerly build the model and traceback objects when the module is imported.
# The returned tuple (including any error message) is discarded here; the
# handlers call initialize_model_and_attr() again and report failures then.
initialize_model_and_attr()
# Images replaced with CSS textures and gradients - no longer needed
def clear_state():
    """Return a brand-new State: empty texts, zero indices, no scores."""
    empty_text_fields = dict.fromkeys(
        (
            "context",
            "query",
            "response",
            "answer",
            "highlighted_context",
            "full_response",
            "explained_response_part",
            "last_query_used",
        ),
        "",
    )
    return State(
        workflow_state=WorkflowState.WAITING_TO_GENERATE,
        start_index=0,
        end_index=0,
        scores=np.array([]),
        **empty_text_fields,
    )
def load_an_example(example_loader_func, state: State):
    """Load an example into the state and reset the response widgets.

    Args:
        example_loader_func: zero-argument callable returning
            ``(context, query)``.
        state: session state; updated in place and returned.

    Returns:
        Six outputs: context text, query text, the state, an empty string
        for the response input box, a placeholder update for the clickable
        response box, and an update selecting the first context tab.
    """
    context, query = example_loader_func()

    # Mirror the example into the state and go back to the first stage.
    state.context, state.query = context, query
    state.workflow_state = WorkflowState.WAITING_TO_GENERATE

    # Results belonging to the previous context/query are no longer valid.
    for stale_field in ("response", "answer", "full_response", "explained_response_part"):
        setattr(state, stale_field, "")

    print(f"Loaded example - Context: {len(context)} chars, Query: {query[:50]}...")

    placeholder = [("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]
    response_box_update = gr.update(value=placeholder)  # basic_response_box - keep visible
    tabs_update = gr.update(selected=0)  # basic_context_tabs - switch to first tab
    return (
        context,  # basic_context_box
        query,  # basic_query_box
        state,
        "",  # response_input_box - clear it
        response_box_update,
        tabs_update,
    )
def get_max_tokens(model_path: str):
    """Return the length limit registered for `model_path` in MAX_TOKENS.

    Models without an entry get a fallback limit of 2048.
    """
    try:
        return MAX_TOKENS[model_path]
    except KeyError:
        return 2048  # Default fallback
def get_scroll_js_code(elem_id):
    """Return JavaScript source defining ``scrollToElement()``.

    The generated function looks up the DOM element whose id is `elem_id`
    and smoothly scrolls it into view.  `elem_id` is interpolated into the
    script as-is.
    """
    js_lines = [
        "",
        "function scrollToElement() {",
        f'const element = document.getElementById("{elem_id}");',
        'element.scrollIntoView({ behavior: "smooth", block: "nearest" });',
        "}",
        "",
    ]
    return "\n".join(js_lines)
def basic_update(context: str, query: str, state: State):
    """Store edited context/query in the state and reset the response view.

    Returns:
        ``(response_box_update, tabs_update, state)``: the clickable response
        box gets its placeholder text back and the first context tab is
        selected.
    """
    state.context, state.query = context, query
    state.workflow_state = WorkflowState.WAITING_TO_GENERATE

    placeholder = [("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]
    response_box_update = gr.update(value=placeholder)  # basic_response_box - keep visible
    tabs_update = gr.update(selected=0)  # basic_context_tabs - switch to first tab
    return response_box_update, tabs_update, state
@spaces.GPU
def generate_model_response(state: State):
    """Generate a response for ``state.query`` over ``state.context``.

    Args:
        state: session state; updated in place with the generated response.

    Returns:
        ``(state, error_box_update)``.  On success the error box is hidden;
        on any validation, initialisation or length failure the state is
        returned unchanged and the error box shows the reason.
    """

    def _fail(message):
        # Every failure path returns the untouched state plus a visible error.
        return state, gr.update(value=[(message, None)], visible=True)

    # Validate inputs first with debug info
    print(f"Validation - Context length: {len(state.context) if state.context else 0}")
    print(f"Validation - Query: {state.query[:50] if state.query else 'empty'}...")
    if not state.context or not state.context.strip():
        print("❌ Validation failed: No context")
        return _fail("❌ Please enter context before generating response! If you just changed configuration, try reloading an example.")
    if not state.query or not state.query.strip():
        print("❌ Validation failed: No query")
        return _fail("❌ Please enter a query before generating response! If you just changed configuration, try reloading an example.")

    # Initialize model and attribution with default configuration
    print(f"🔧 Generating response with explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
    llm, attr, error_msg = initialize_model_and_attr()
    if llm is None or attr is None:
        error_text = error_msg if error_msg else "Model initialization failed!"
        return _fail(f"❌ {error_text}")

    prompt = wrap_prompt(state.query, [state.context])
    print(f"Generated prompt for {DEFAULT_MODEL_PATH}: {prompt[:200]}...")  # Debug log

    # Check context length.  This compares a whitespace-separated word count
    # (not a tokenizer count) with the model limit, keeping 512 in reserve.
    if len(prompt.split()) > get_max_tokens(DEFAULT_MODEL_PATH) - 512:
        return _fail(GENERATE_CONTEXT_TOO_LONG_TEXT)

    answer = llm.query(prompt)
    print(f"Model response: {answer}")  # Debug log

    state.response = answer
    state.answer = answer
    state.full_response = answer
    # Record which query produced this response, as unified_response_handler
    # does; otherwise that handler always sees this response as stale.
    state.last_query_used = state.query
    state.workflow_state = WorkflowState.WAITING_TO_SELECT
    return state, gr.update(visible=False)
def split_into_sentences(text: str):
    """Split `text` into sentences plus the text preceding each sentence.

    Each line of `text` is tokenised separately with
    ``nltk.sent_tokenize``.  ``separators[k]`` is the slice of `text`
    between the end of sentence ``k - 1`` (or the start of `text`) and the
    start of sentence ``k``.

    Returns:
        ``(sentences, separators)``, two lists of equal length.
    """
    sentences = [
        sentence
        for line in text.splitlines()
        for sentence in nltk.sent_tokenize(line)
    ]

    separators = []
    cursor = 0
    for sentence in sentences:
        # Locate this sentence at or after the end of the previous one.
        found_at = text.find(sentence, cursor)
        separators.append(text[cursor:found_at])
        cursor = found_at + len(sentence)
    return sentences, separators
def basic_highlight_response(
    response: str, selected_index: int, num_sources: int = -1
):
    """Build the clickable, sentence-level HighlightedText for `response`.

    Args:
        response: response text, split with split_into_sentences().
        selected_index: index of the sentence that gets the citation label;
            every other labelled sentence gets "Traceback".
        num_sources: -1 labels the selected sentence "Traceback!", 0 labels
            it "No important text!", and any other value ``n`` produces
            "[1,...,n]".

    Returns:
        A ``gr.HighlightedText`` whose value is a list of
        ``(separator + sentence, label)`` pairs.
    """
    if num_sources == -1:
        citations_text = "Traceback!"
    elif num_sources == 0:
        citations_text = "No important text!"
    else:
        numbers = (str(n) for n in range(1, num_sources + 1))
        citations_text = f"[{','.join(numbers)}]"

    def label_for(position, sentence):
        # Hack to ignore punctuation: very short "sentences" get no label.
        if len(sentence) < 4:
            return None
        return citations_text if position == selected_index else "Traceback"

    sentences, separators = split_into_sentences(response)
    ht = [
        (separator + sentence, label_for(position, sentence))
        for position, (sentence, separator) in enumerate(zip(sentences, separators))
    ]
    color_map = {"Click to cite!": "blue", citations_text: "yellow"}
    return gr.HighlightedText(value=ht, color_map=color_map)
def basic_highlight_response_with_visibility(
    response: str, selected_index: int, num_sources: int = -1, visible: bool = True
):
    """Same labelling as basic_highlight_response, returned as ``gr.update``.

    Args:
        response: response text, split with split_into_sentences().
        selected_index: index of the sentence that gets the citation label.
        num_sources: -1 -> "Traceback!", 0 -> "No important text!",
            otherwise "[1,...,n]".
        visible: visibility to set on the target component.

    Returns:
        ``gr.update(value=..., color_map=..., visible=visible)``.
    """
    fixed_labels = {-1: "Traceback!", 0: "No important text!"}
    if num_sources in fixed_labels:
        citations_text = fixed_labels[num_sources]
    else:
        citations_text = f"[{','.join(str(i) for i in range(1, num_sources + 1))}]"

    sentences, separators = split_into_sentences(response)
    ht = []
    for index, (sentence, separator) in enumerate(zip(sentences, separators)):
        chunk = separator + sentence
        # Hack to ignore punctuation
        if len(sentence) < 4:
            ht.append((chunk, None))
        elif index == selected_index:
            ht.append((chunk, citations_text))
        else:
            ht.append((chunk, "Traceback"))

    color_map = {"Click to cite!": "blue", citations_text: "yellow"}
    return gr.update(value=ht, color_map=color_map, visible=visible)
def basic_update_highlighted_response(evt: gr.SelectData, state: State):
    """Re-render the response with the clicked sentence (``evt.index``) marked.

    Returns:
        ``(highlighted_response_component, state)``.
    """
    return basic_highlight_response(state.response, evt.index), state
def unified_response_handler(response_text: str, state: State):
    """Generate a response with the LLM, or adopt the text the user typed.

    The LLM is used when `response_text` is empty/whitespace, or when a
    response is already stored in the state but was recorded for a
    different query than the current one (the text box is then assumed to
    hold a stale response).  Otherwise the stripped `response_text` is
    adopted as a manual response.

    The staleness check requires an existing stored response.  Comparing
    ``last_query_used`` with the query alone is not enough: it starts out as
    ``""``, so on a fresh state every manual response would be discarded and
    regenerated.

    Args:
        response_text: current content of the response input box.
        state: session state; updated in place on success.

    Returns:
        Four outputs: the state, the text for the response input box, an
        update for the clickable response box, and an update for the error
        box (visible only on failure).
    """

    def _error_outputs(message):
        # Keep the text box content, hide the response box, show the error.
        return (
            state,
            response_text,
            gr.update(visible=False),
            gr.update(value=[(message, None)], visible=True),
        )

    def _accept(text):
        # Store `text` as the current response and make it clickable.
        state.response = text
        state.answer = text
        state.full_response = text
        state.last_query_used = state.query  # Track which query this response belongs to
        state.workflow_state = WorkflowState.WAITING_TO_SELECT
        response_update = basic_highlight_response_with_visibility(state.response, -1, visible=True)
        return (
            state,
            text,  # Text box shows the accepted response
            response_update,  # Update clickable response content
            gr.update(visible=False),  # Hide error box
        )

    has_manual_text = bool(response_text and response_text.strip())
    # Only a previously recorded response can be stale.
    instruction_changed = bool(state.response) and state.last_query_used != state.query

    if has_manual_text and not instruction_changed:
        print("✏️ Using manual response...")
        return _accept(response_text.strip())

    if instruction_changed:
        print("📝 Instruction changed, generating new response from LLM...")
    else:
        print("🤖 Generating response from LLM...")

    # Validate inputs first
    if not state.context or not state.context.strip():
        return _error_outputs("❌ Please enter context before generating response!")
    if not state.query or not state.query.strip():
        return _error_outputs("❌ Please enter a query before generating response!")

    # Initialize model and generate response
    llm, _, error_msg = initialize_model_and_attr()
    if llm is None:
        error_text = error_msg if error_msg else "Model initialization failed!"
        return _error_outputs(f"❌ {error_text}")

    prompt = wrap_prompt(state.query, [state.context])
    # Check context length.  This compares a whitespace-separated word count
    # (not a tokenizer count) with the model limit, keeping 512 in reserve.
    if len(prompt.split()) > get_max_tokens(DEFAULT_MODEL_PATH) - 512:
        return _error_outputs(GENERATE_CONTEXT_TOO_LONG_TEXT)

    answer = llm.query(prompt)
    print(f"Generated response: {answer[:100]}...")
    return _accept(answer)
def get_color_by_rank(rank, total_items):
    """Map a 1-based rank to a ``(hex_color, rgba_color)`` pair.

    Rank 1 is red, 2 orange, 3 golden yellow, any other rank up to 5 a
    standard yellow, and everything beyond that a very light yellow.  When
    `total_items` is 0 a neutral grey pair is returned regardless of rank.
    """
    if total_items == 0:
        return "#F0F0F0", "rgba(240, 240, 240, 0.8)"

    # Distinct colors for the podium positions.
    top_three = {
        1: ("#FF4444", "rgba(255, 68, 68, 0.9)"),  # Strong red
        2: ("#FF8C42", "rgba(255, 140, 66, 0.8)"),  # Orange
        3: ("#FFD93D", "rgba(255, 217, 61, 0.8)"),  # Golden yellow
    }
    if rank in top_three:
        return top_three[rank]
    if rank <= 5:
        return "#FFF280", "rgba(255, 242, 128, 0.7)"  # Standard yellow
    return "#FFF9C4", "rgba(255, 249, 196, 0.6)"  # Very light yellow
def _traceback_failure(message: str, state: State):
    """Return the six handler outputs that hide results and show `message`."""
    return (
        gr.update(value=[("", None)], visible=False),  # combined sources box
        gr.update(selected=0),  # context tabs - back to first tab
        gr.update(visible=False),  # sources-in-context tab
        gr.update(value=""),  # dummy output
        gr.update(value=[(message, None)], visible=True),  # error box
        state,
    )


def _split_outer_whitespace(segment: str):
    """Split `segment` into ``(leading, core, trailing)`` around its content.

    Only newline, carriage return, tab and space are peeled off, so that
    they can be displayed without a highlight.
    """
    whitespace = "\n\r\t "
    without_leading = segment.lstrip(whitespace)
    core = without_leading.rstrip(whitespace)
    leading = segment[: len(segment) - len(without_leading)]
    trailing = without_leading[len(core):]
    return leading, core, trailing


def _run_traceback(state: State, explained_part: str, *, full_response: bool):
    """Trace `explained_part` of the response back to the context and render it.

    Shared implementation of the two traceback handlers below.

    Args:
        state: session state; ``state.scores`` is overwritten on success.
        explained_part: the response text to explain (a selected sentence,
            or the whole response).
        full_response: selects the log lines, result header and "no scores"
            message of the full-response variant.

    Returns:
        Six outputs: the combined sources/context display, a context-tabs
        update, a sources-tab update, a dummy value update, an error-box
        update, and the state.  Every failure, including any exception
        raised during attribution or rendering, yields the hidden-results
        tuple with a visible error message instead.
    """
    log_name = "full response traceback" if full_response else "traceback"

    # Attribution using default configuration
    _, attr, error_msg = initialize_model_and_attr()
    if attr is None:
        error_text = error_msg if error_msg else "Traceback initialization failed!"
        return _traceback_failure(f"❌ {error_text}", state)

    try:
        # Validate attribution inputs
        required_inputs = (
            (state.context, "❌ No context available for traceback!"),
            (state.query, "❌ No query available for traceback!"),
            (state.full_response, "❌ No response available for traceback!"),
        )
        for value, message in required_inputs:
            if not value or not value.strip():
                return _traceback_failure(message, state)

        print(f"start {log_name} with explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
        print(f"context length: {len(state.context)}, query: {state.query[:100]}...")
        if full_response:
            print(f"full response: {state.full_response[:100]}...")
            print(f"tracing entire response (length: {len(state.full_response)} chars)")
        else:
            print(f"response: {state.full_response[:100]}...")
            print(f"selected part: {explained_part[:100]}...")

        texts, important_ids, importance_scores, _, _ = attr.attribute(
            state.query, [state.context], state.full_response, explained_part
        )

        print(f"end {log_name}")
        print(f"explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
        print(f"texts count: {len(texts)} (how context was segmented)")
        if len(texts) > 0:
            print(f"sample text segments: {[text[:50] + '...' if len(text) > 50 else text for text in texts[:3]]}")
        print(f"important_ids: {important_ids}")
        print("importance_scores: ", importance_scores)

        # Explicit emptiness test: `not scores` raises for a multi-element
        # NumPy array, whereas this works for lists and arrays alike.
        if importance_scores is None or len(importance_scores) == 0:
            if full_response:
                return _traceback_failure("❌ No traceback scores generated for full response!", state)
            return _traceback_failure("❌ No traceback scores generated! Try a different text segment.", state)

        state.scores = np.array(importance_scores)
        # Positions into `important_ids`, best score first.  This assumes
        # `important_ids` and `importance_scores` are parallel sequences; a
        # mismatch raises below and is reported through the except branch.
        sorted_indices = np.argsort(state.scores)[::-1]
        total_sources = len(important_ids)
        # Zero-based rank of every important text id.
        ranks = {important_ids[i]: rank for rank, i in enumerate(sorted_indices)}

        # In-context highlights with ranking-based colors - show ALL text
        in_context_highlighted_text = []
        for i, segment in enumerate(texts):
            if segment.strip() == "" or i not in ranks:
                # Whitespace-only or non-important segments stay unhighlighted.
                in_context_highlighted_text.append((segment, None))
                continue
            # Keep surrounding newlines/whitespace out of the highlight.
            leading, core, trailing = _split_outer_whitespace(segment)
            if leading:
                in_context_highlighted_text.append((leading, None))
            if core:
                in_context_highlighted_text.append((core, f"rank_{ranks[i] + 1}"))
            if trailing:
                in_context_highlighted_text.append((trailing, None))

        # One color per rank label.
        color_map = {
            f"rank_{rank}": get_color_by_rank(rank, total_sources)[1]
            for rank in range(1, total_sources + 1)
        }

        dummy_update = gr.update(
            value=f"AttnTrace_{state.response}_{state.start_index}_{state.end_index}"
        )
        attribute_error_update = gr.update(visible=False)

        # Combined display: ranked sources on top, full context underneath.
        # Headers and spacing carry no label so they are never highlighted.
        if full_response:
            combined_display = [
                ("═══ FULL RESPONSE TRACEBACK RESULTS ═══\n", None),
                ("These are the text segments that contribute most to the entire response:\n\n", None),
            ]
        else:
            combined_display = [
                ("═══ TRACEBACK RESULTS ═══\n", None),
                ("These are the text segments that contribute most to the response:\n\n", None),
            ]
        for rank, i in enumerate(sorted_indices):
            clean_source_text = texts[important_ids[i]].strip()
            if clean_source_text:  # Only add if there's actual content
                combined_display.append((clean_source_text, f"rank_{rank + 1}"))
                combined_display.append(("\n\n", None))

        combined_display.append(("\n" + "═" * 50 + "\n", None))
        combined_display.append(("FULL CONTEXT WITH HIGHLIGHTS\n", None))
        combined_display.append(("Scroll down to see the complete context with important segments highlighted:\n\n", None))
        combined_display.extend(in_context_highlighted_text)

        combined_sources_update = HighlightedTextbox(
            value=combined_display, color_map=color_map, visible=True
        )
        # Switch to the highlighted context tab and show results
        basic_context_tabs_update = gr.update(selected=1)
        basic_sources_in_context_tab_update = gr.update(visible=True)
        return (
            combined_sources_update,
            basic_context_tabs_update,
            basic_sources_in_context_tab_update,
            dummy_update,
            attribute_error_update,
            state,
        )
    except Exception as e:
        # UI boundary: log the stack trace and surface the error in the page.
        traceback.print_exc()
        return _traceback_failure(f"❌ Error: {str(e)}", state)


@spaces.GPU
def basic_get_scores_and_sources_full_response(state: State):
    """Traceback the entire response instead of a selected segment.

    Sets ``state.explained_response_part`` to the full response and returns
    the six outputs described in ``_run_traceback``.
    """
    # Use the entire response as the explained part
    state.explained_response_part = state.full_response
    return _run_traceback(state, state.full_response, full_response=True)


def basic_get_scores_and_sources(
    evt: gr.SelectData,
    highlighted_response: List[Dict[str, str]],
    state: State,
):
    """Traceback the response segment the user clicked.

    Args:
        evt: selection event; ``evt.index`` indexes `highlighted_response`.
        highlighted_response: current value of the clickable response box;
            each entry's ``'token'`` holds the segment text.
        state: session state; ``explained_response_part`` is set to the
            clicked text.

    Returns:
        The six outputs described in ``_run_traceback``.
    """
    # NOTE(review): unlike the full-response handler this one is not wrapped
    # in @spaces.GPU — confirm whether that is intentional.
    print("highlighted_response: ", highlighted_response[evt.index])
    selected_text = highlighted_response[evt.index]['token']
    state.explained_response_part = selected_text
    return _run_traceback(state, selected_text, full_response=False)
def load_custom_css():
    """Return the text of ``assets/app_styles.css``, or ``""`` on failure.

    The stylesheet is decoded as UTF-8 explicitly instead of with the
    platform's locale encoding, so non-ASCII characters in the CSS are read
    the same way on every host. The path is relative to the current working
    directory.

    Returns:
        str: The stylesheet contents. An empty string is returned (and a
        message is printed) when the file is missing or cannot be
        read/decoded, so the app can still start without custom styling.
    """
    try:
        with open("assets/app_styles.css", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print("Warning: CSS file not found, using minimal CSS")
        return ""
    except Exception as e:
        # Deliberate best-effort: styling problems must never stop the app.
        print(f"Error loading CSS: {e}")
        return ""
# Stylesheet text handed to gr.Blocks below ("" when the CSS file could not be read)
custom_css = load_custom_css()
# Citrus theme with large text and medium spacing
theme = gr.themes.Citrus(text_size="lg", spacing_size="md")
with gr.Blocks(theme=theme, css=custom_css) as demo:
    # Page header: title followed by the short project description.
    gr.Markdown(f"# {APP_TITLE}")
    gr.Markdown(APP_DESCRIPTION, elem_classes="app-description")
    # gr.Markdown(NEW_TEXT, elem_classes="app-description-2")
    # One-paragraph summary. The wrapper <div> is closed explicitly so the
    # 18px font size stays scoped to this paragraph, matching the block below.
    gr.Markdown("""
<div style="font-size: 18px;">
AttnTrace is an efficient context traceback method for long contexts (e.g., full papers). It is over 15× faster than the state-of-the-art context traceback method TracLLM. Compared to previous attention-based approaches, AttnTrace is more accurate, reliable, and memory-efficient.
</div>
""", elem_classes="feature-highlights")
    # Feature highlights
    gr.Markdown("""
<div style="font-size: 18px;">
AttnTrace can be used in many real-world applications, such as tracing back to:
- 📄 prompt injection instructions that manipulate LLM-generated paper reviews.
- 💻 malicious comment & code hiding in the codebase that misleads the AI coding assistant.
- 🤖 malicious instructions that mislead the action of the LLM agent.
- 🖋 source texts in the context from an AI summary.
- 🔍 evidence that supports the LLM-generated answer for a question.
- ❌ misinformation (corrupted knowledge) that manipulates LLM output for a question.
- And a lot more...
</div>
""", elem_classes="feature-highlights")
# Example launcher buttons: two rows of three equal-width columns, one button per column.
gr.Markdown("### 🚀 Try These Examples!", elem_classes="example-title")
# Each entry is (label, topic CSS class, extra gr.Button keyword arguments).
# Only the first button passes explicit scale/size values.
_example_button_rows = (
    (
        ("📄 Prompt Injection Attacks in AI Paper Review", "example-paper", {"scale": None, "size": "sm"}),
        ("💻 Malicious Comments & Code in Codebase", "example-movie", {}),
        ("🤖 Malicious Instructions Misleading the LLM Agent", "example-code", {}),
    ),
    (
        ("🖋 Source Texts for an AI Summary", "example-paper-alt", {}),
        ("🔍 Evidence that Support Question Answering", "example-movie-alt", {}),
        ("❌ Misinformation (Corrupted Knowledge) in Question Answering", "example-code-alt", {}),
    ),
)
_example_buttons = []
for _row_specs in _example_button_rows:
    with gr.Row(elem_classes=["example-button-container"]):
        for _label, _topic_class, _extra_kwargs in _row_specs:
            with gr.Column(scale=1):
                # Buttons are numbered 1..6 in creation order; the number is part of the elem_id.
                _example_buttons.append(
                    gr.Button(
                        _label,
                        elem_classes=["example-button", _topic_class],
                        elem_id=f"example_{len(_example_buttons) + 1}_button",
                        **_extra_kwargs,
                    )
                )
# Expose the buttons under the names the click wiring further down expects.
(
    example_1_btn,
    example_2_btn,
    example_3_btn,
    example_4_btn,
    example_5_btn,
    example_6_btn,
) = _example_buttons
# NOTE(review): indentation is missing in this copy of the file, so the exact
# nesting of the statements below cannot be confirmed from here.
# State component whose initial value comes from clear_state(); it is passed to
# and returned from the callbacks wired further down.
state = gr.State(
value=clear_state()
)
# The "Demo" tab; its select event is bound to basic_clear_state below.
basic_tab = gr.Tab("Demo")
with basic_tab:
# gr.Markdown("## Demo")
# Short usage instructions.
gr.Markdown(
"Enter your context and instruction below to try out AttnTrace! You can also click on the example buttons above to load pre-configured examples."
)
# Legend explaining which highlight colour corresponds to which traceback rank.
gr.Markdown(
'**Color Legend for Context Traceback (by ranking):** <span style="background-color: #FF4444; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Red</span> = 1st (most important) | <span style="background-color: #FF8C42; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Orange</span> = 2nd | <span style="background-color: #FFD93D; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Golden</span> = 3rd | <span style="background-color: #FFF280; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Yellow</span> = 4th-5th | <span style="background-color: #FFF9C4; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Light</span> = 6th+'
)
# Top section: Wide Context box with tabs
# NOTE(review): indentation is missing in this copy of the file, so the exact
# nesting of the statements below cannot be confirmed from here.
with gr.Row():
with gr.Column(scale=1):
# Tab container; callbacks switch tabs by returning gr.update(selected=<id>) for it.
with gr.Tabs() as basic_context_tabs:
# Tab id 0: editable textbox holding the context text.
with gr.TabItem("Context", id=0):
basic_context_box = gr.Textbox(
placeholder="Enter context...",
show_label=False,
value="",
lines=6,
max_lines=6,
elem_id="basic_context_box",
autoscroll=False,
)
# Tab id 1: read-only highlighted view that the traceback callbacks write into.
with gr.TabItem("Context with highlighted traceback results", id=1, visible=True) as basic_sources_in_context_tab:
basic_sources_in_context_box = HighlightedTextbox(
value=[("Click on a sentence in the response below to see highlighted traceback results here.", None)],
show_legend_label=False,
show_label=False,
show_legend=False,
interactive=False,
# This id is the target passed to get_scroll_js_code in the wiring below.
elem_id="basic_sources_in_context_box",
)
# Error messages
# Created hidden and read-only; it is the last output of the
# unified_response_handler click wiring below.
basic_generate_error_box = HighlightedTextbox(
show_legend_label=False,
show_label=False,
show_legend=False,
visible=False,
interactive=False,
container=False,
)
# Bottom section: Left (instruction + button + response), Right (response selection)
# NOTE(review): indentation is missing in this copy of the file, so the exact
# nesting of the statements below cannot be confirmed from here.
with gr.Row(equal_height=True):
# Left: Instruction + Button + Response
with gr.Column(scale=1):
# Instruction text; changes to it trigger basic_update (wired below).
basic_query_box = gr.Textbox(
label="Instruction",
placeholder="Enter an instruction...",
value="",
lines=3,
max_lines=3,
)
# Single button for both modes described in response_input_box's info text.
unified_response_button = gr.Button(
"Generate/Use Response",
variant="primary",
size="lg"
)
# Editable response text; passed to unified_response_handler as its first input.
response_input_box = gr.Textbox(
label="Response (Editable)",
placeholder="Response will appear here after generation, or type your own response for traceback...",
lines=8,
max_lines=8,
info="Leave empty and click button to generate from LLM, or type your own response to use for traceback"
)
# Right: Response for attribution selection
with gr.Column(scale=1):
# Read-only response viewer; its select event starts a traceback (wired below).
basic_response_box = gr.HighlightedText(
label="Click to select text for traceback!",
value=[("Click the 'Generate/Use Response' button on the left to see response text here for traceback analysis.", None)],
interactive=False,
combine_adjacent=False,
show_label=True,
show_legend=False,
# This id is the target passed to get_scroll_js_code in the wiring below.
elem_id="basic_response_box",
visible=True,
)
# Button for full response traceback
full_response_traceback_button = gr.Button(
"🔍 Traceback Entire Response",
variant="secondary",
size="sm"
)
# Hidden error box and dummy elements
# Created hidden; it is one of the outputs of both traceback callbacks below.
basic_attribute_error_box = HighlightedTextbox(
show_legend_label=False,
show_label=False,
show_legend=False,
visible=False,
interactive=False,
container=False,
)
# Invisible textbox written by the traceback callbacks; its change event is
# used below only to run the scroll JavaScript.
dummy_basic_sources_box = gr.Textbox(
visible=False, interactive=False, container=False
)
# Only a single (AttnTrace) method and model in this simplified version
def basic_clear_state():
    """Build the reset values for the Demo tab.

    Returns:
        tuple: Six values — empty strings for the context, instruction and
        response textboxes, an update restoring the placeholder text in the
        response viewer, an update selecting tab 0 of the context tabs, and
        a fresh state obtained from ``clear_state()``.
    """
    fresh_state = clear_state()
    blank = ""
    # The response viewer stays visible; only its content is reset.
    placeholder_update = gr.update(
        value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]
    )
    first_tab_update = gr.update(selected=0)
    return (
        blank,  # basic_context_box
        blank,  # basic_query_box
        blank,  # response_input_box
        placeholder_update,  # basic_response_box
        first_tab_update,  # basic_context_tabs
        fresh_state,
    )
# Event wiring for the basic tab.
# Selecting the tab resets the three textboxes, the response viewer, the
# active context tab and the state via basic_clear_state.
basic_tab.select(
    fn=basic_clear_state,
    inputs=[],
    outputs=[
        basic_context_box, basic_query_box, response_input_box,
        basic_response_box, basic_context_tabs, state,
    ],
)
# A change to either the context or the instruction calls basic_update with
# both texts plus the state.
for component in (basic_context_box, basic_query_box):
    component.change(
        fn=basic_update,
        inputs=[basic_context_box, basic_query_box, state],
        outputs=[basic_response_box, basic_context_tabs, state],
    )
# Example buttons: each click loads one pre-configured example into both the
# UI components and the state.
outputs_for_examples = [
    basic_context_box, basic_query_box, state,
    response_input_box, basic_response_box, basic_context_tabs,
]
# Button -> example runner pairs, registered in button order.
_example_bindings = (
    (example_1_btn, run_example_1),
    (example_2_btn, run_example_2),
    (example_3_btn, run_example_3),
    (example_4_btn, run_example_4),
    (example_5_btn, run_example_5),
    (example_6_btn, run_example_6),
)
for _example_button, _example_runner in _example_bindings:
    # partial() binds the runner immediately, so each button keeps its own example.
    _example_button.click(
        fn=partial(load_an_example, _example_runner),
        inputs=[state],
        outputs=outputs_for_examples,
    )
# Scroll hooks: these listeners do no Python work (the callback returns None
# and has no inputs/outputs); they exist only to run the JavaScript produced
# by get_scroll_js_code for the given element id.
for _register, _scroll_target_id in (
    (unified_response_button.click, "basic_response_box"),
    (basic_response_box.change, "basic_sources_in_context_box"),
):
    _register(
        fn=lambda: None,
        inputs=[],
        outputs=[],
        js=get_scroll_js_code(_scroll_target_id),
    )
# Add immediate tab switch on response selection
def immediate_tab_switch():
    """Return the updates shown right after a traceback is requested.

    Returns:
        tuple: An update that puts a "processing" message into the
        highlighted-context box, and an update that selects tab 1 of the
        context tabs.
    """
    progress_notice = gr.update(value=[("🔄 Processing traceback... Please wait...", None)])
    show_results_tab = gr.update(selected=1)
    return (progress_notice, show_results_tab)
# Components written by the instant "processing" placeholder callback.
_progress_outputs = [basic_sources_in_context_box, basic_context_tabs]
# Components written by both traceback computations (selection and full response).
_traceback_outputs = [
    basic_sources_in_context_box,
    basic_context_tabs,
    basic_sources_in_context_tab,
    dummy_basic_sources_box,
    basic_attribute_error_box,
    state,
]

# --- Selecting text in the response viewer ---
# 1) Show the placeholder and switch tabs without going through the queue.
basic_response_box.select(
    fn=immediate_tab_switch,
    inputs=[],
    outputs=_progress_outputs,
    queue=False,
)
# 2) Run the traceback for the selection.
basic_response_box.select(
    fn=basic_get_scores_and_sources,
    inputs=[basic_response_box, state],
    outputs=_traceback_outputs,
    show_progress="full",
)
# 3) Refresh the response viewer from the state.
basic_response_box.select(
    fn=basic_update_highlighted_response,
    inputs=[state],
    outputs=[basic_response_box, state],
)

# --- "Traceback Entire Response" button ---
full_response_traceback_button.click(
    fn=immediate_tab_switch,
    inputs=[],
    outputs=_progress_outputs,
    queue=False,
)
full_response_traceback_button.click(
    fn=basic_get_scores_and_sources_full_response,
    inputs=[state],
    outputs=_traceback_outputs,
    show_progress="full",
)

# The hidden dummy box is an output of both traceback callbacks; its change
# event only runs the scroll JavaScript for the highlighted-context box.
dummy_basic_sources_box.change(
    fn=lambda: None,
    inputs=[],
    outputs=[],
    js=get_scroll_js_code("basic_sources_in_context_box"),
)
# Main button: unified_response_handler receives the editable response text and
# the state, and writes the state, the response textbox, the response viewer
# and the generation error box.
unified_response_button.click(
    fn=unified_response_handler,
    inputs=[response_input_box, state],
    outputs=[
        state,
        response_input_box,
        basic_response_box,
        basic_generate_error_box,
    ],
)
# gr.Markdown(
# "Please do not interact with elements while generation/attribution is in progress. This may cause errors. You can refresh the page if you run into issues because of this."
# )
# Start the app with a share link requested and the API page disabled.
demo.launch(share=True, show_api=False)