Spaces:
Running
on
Zero
Running
on
Zero
| # Acknowledgement: This demo code is adapted from the original Hugging Face Space "ContextCite" | |
| # (https://huggingface.co/spaces/contextcite/context-cite). | |
| import os | |
| from enum import Enum | |
| from dataclasses import dataclass | |
| from typing import Dict, List, Any, Optional | |
| import gradio as gr | |
| import numpy as np | |
| import spaces | |
| import nltk | |
| import base64 | |
| from src.utils import split_into_sentences as split_into_sentences_utils | |
| # --- AttnTrace imports (from app_full.py) --- | |
| from src.models import create_model | |
| from src.attribution import AttnTraceAttribution | |
| from src.prompts import wrap_prompt | |
| from gradio_highlightedtextbox import HighlightedTextbox | |
| from examples import run_example_1, run_example_2, run_example_3, run_example_4, run_example_5, run_example_6 | |
| from functools import partial | |
| # Load original app constants | |
| APP_TITLE = '<div class="app-title"><span class="brand">AttnTrace</span><span class="subtitle">Attention-based Context Traceback for Long-Context LLMs</span></div>' | |
| APP_DESCRIPTION = """AttnTrace traces a model's generated statements back to specific parts of the context using attention-based traceback. Try it out with Meta-Llama-3.1-8B-Instruct here! See the [[paper](https://arxiv.org/abs/2506.04202)] and [[code](https://github.com/Wang-Yanting/TracLLM-Kit)] for more! | |
| Maintained by the AttnTrace team.""" | |
| # NEW_TEXT = """Long-context large language models (LLMs), such as Gemini-2.5-Pro and Claude-Sonnet-4, are increasingly used to empower advanced AI systems, including retrieval-augmented generation (RAG) pipelines and autonomous agents. In these systems, an LLM receives an instruction along with a context—often consisting of texts retrieved from a knowledge database or memory—and generates a response that is contextually grounded by following the instruction. Recent studies have designed solutions to trace back to a subset of texts in the context that contributes most to the response generated by the LLM. These solutions have numerous real-world applications, including performing post-attack forensic analysis and improving the interpretability and trustworthiness of LLM outputs. While significant efforts have been made, state-of-the-art solutions such as TracLLM often lead to a high computation cost, e.g., it takes TracLLM hundreds of seconds to perform traceback for a single response-context pair. In this work, we propose {\name}, a new context traceback method based on the attention weights produced by an LLM for a prompt. To effectively utilize attention weights, we introduce two techniques designed to enhance the effectiveness of {\name}, and we provide theoretical insights for our design choice. %Moreover, we perform both theoretical analysis and empirical evaluation to demonstrate their effectiveness. | |
| # We also perform a systematic evaluation for {\name}. The results demonstrate that {\name} is more accurate and efficient than existing state-of-the-art context traceback methods. We also show {\name} can improve state-of-the-art methods in detecting prompt injection under long contexts through the attribution-before-detection paradigm. As a real-world application, we demonstrate that {\name} can effectively pinpoint injected instructions in a paper designed to manipulate LLM-generated reviews. | |
| # The code and data will be open-sourced. """ | |
| # EDIT_TEXT = "Feel free to edit!" | |
| GENERATE_CONTEXT_TOO_LONG_TEXT = ( | |
| '<em style="color: red;">Context is too long for the current model.</em>' | |
| ) | |
| ATTRIBUTE_CONTEXT_TOO_LONG_TEXT = '<em style="color: red;">Context is too long for the current traceback method.</em>' | |
| CONTEXT_LINES = 20 | |
| CONTEXT_MAX_LINES = 40 | |
| SELECTION_DEFAULT_TEXT = "Click on a sentence in the response to traceback!" | |
| SELECTION_DEFAULT_VALUE = [(SELECTION_DEFAULT_TEXT, None)] | |
| SOURCES_INFO = 'These are the texts that contribute most to the response.' | |
| # SOURCES_IN_CONTEXT_INFO = ( | |
| # "This shows the important sentences highlighted within their surrounding context from the text above. Colors indicate ranking: Red (1st), Orange (2nd), Golden (3rd), Yellow (4th-5th), Light (6th+)." | |
| # ) | |
| MODEL_PATHS = [ | |
| "meta-llama/Meta-Llama-3.1-8B-Instruct", | |
| ] | |
| MAX_TOKENS = { | |
| "meta-llama/Meta-Llama-3.1-8B-Instruct": 131072, | |
| } | |
| DEFAULT_MODEL_PATH = MODEL_PATHS[0] | |
| EXPLANATION_LEVELS = ["sentence", "paragraph", "text segment"] | |
| DEFAULT_EXPLANATION_LEVEL = "sentence" | |
| class WorkflowState(Enum): | |
| WAITING_TO_GENERATE = 0 | |
| WAITING_TO_SELECT = 1 | |
| READY_TO_ATTRIBUTE = 2 | |
| class State: | |
| workflow_state: WorkflowState | |
| context: str | |
| query: str | |
| response: str | |
| start_index: int | |
| end_index: int | |
| scores: np.ndarray | |
| answer: str | |
| highlighted_context: str | |
| full_response: str | |
| explained_response_part: str | |
| last_query_used: str = "" | |
| # --- Dynamic Model and Attribution Management --- | |
| current_llm = None | |
| current_attr = None | |
| current_model_path = None | |
| current_explanation_level = None | |
| current_api_key = None | |
| def initialize_model_and_attr(): | |
| """Initialize model and attribution with default configuration""" | |
| global current_llm, current_attr, current_model_path, current_explanation_level, current_api_key | |
| try: | |
| # Check if we need to reinitialize the model | |
| need_model_update = (current_llm is None or | |
| current_model_path != DEFAULT_MODEL_PATH or | |
| current_api_key != os.getenv("HF_TOKEN")) | |
| # Check if we need to update attribution | |
| need_attr_update = (current_attr is None or | |
| current_explanation_level != DEFAULT_EXPLANATION_LEVEL or | |
| need_model_update) | |
| if need_model_update: | |
| print(f"Initializing model: {DEFAULT_MODEL_PATH}") | |
| effective_api_key = os.getenv("HF_TOKEN") | |
| current_llm = create_model(model_path=DEFAULT_MODEL_PATH, api_key=effective_api_key, device="cuda") | |
| current_model_path = DEFAULT_MODEL_PATH | |
| current_api_key = effective_api_key | |
| if need_attr_update: | |
| print(f"Initializing context traceback with explanation level: {DEFAULT_EXPLANATION_LEVEL}") | |
| current_attr = AttnTraceAttribution( | |
| current_llm, | |
| explanation_level=DEFAULT_EXPLANATION_LEVEL, | |
| K=3, | |
| q=0.4, | |
| B=30 | |
| ) | |
| current_explanation_level = DEFAULT_EXPLANATION_LEVEL | |
| return current_llm, current_attr, None | |
| except Exception as e: | |
| error_msg = f"Error initializing model/traceback: {str(e)}" | |
| print(error_msg) | |
| return None, None, error_msg | |
| # Initialize with defaults | |
| initialize_model_and_attr() | |
| # Images replaced with CSS textures and gradients - no longer needed | |
| def clear_state(): | |
| return State( | |
| workflow_state=WorkflowState.WAITING_TO_GENERATE, | |
| context="", | |
| query="", | |
| response="", | |
| start_index=0, | |
| end_index=0, | |
| scores=np.array([]), | |
| answer="", | |
| highlighted_context="", | |
| full_response="", | |
| explained_response_part="", | |
| last_query_used="" | |
| ) | |
| def load_an_example(example_loader_func, state: State): | |
| context, query = example_loader_func() | |
| # Update both UI and state | |
| state.context = context | |
| state.query = query | |
| state.workflow_state = WorkflowState.WAITING_TO_GENERATE | |
| # Clear previous results | |
| state.response = "" | |
| state.answer = "" | |
| state.full_response = "" | |
| state.explained_response_part = "" | |
| print(f"Loaded example - Context: {len(context)} chars, Query: {query[:50]}...") | |
| return ( | |
| context, # basic_context_box | |
| query, # basic_query_box | |
| state, | |
| "", # response_input_box - clear it | |
| gr.update(value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]), # basic_response_box - keep visible | |
| gr.update(selected=0) # basic_context_tabs - switch to first tab | |
| ) | |
| def get_max_tokens(model_path: str): | |
| return MAX_TOKENS.get(model_path, 2048) # Default fallback | |
| def get_scroll_js_code(elem_id): | |
| return f""" | |
| function scrollToElement() {{ | |
| const element = document.getElementById("{elem_id}"); | |
| element.scrollIntoView({{ behavior: "smooth", block: "nearest" }}); | |
| }} | |
| """ | |
| def basic_update(context: str, query: str, state: State): | |
| state.context = context | |
| state.query = query | |
| state.workflow_state = WorkflowState.WAITING_TO_GENERATE | |
| return ( | |
| gr.update(value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]), # basic_response_box - keep visible | |
| gr.update(selected=0), # basic_context_tabs - switch to first tab | |
| state, | |
| ) | |
| def generate_model_response(state: State): | |
| # Validate inputs first with debug info | |
| print(f"Validation - Context length: {len(state.context) if state.context else 0}") | |
| print(f"Validation - Query: {state.query[:50] if state.query else 'empty'}...") | |
| if not state.context or not state.context.strip(): | |
| print("❌ Validation failed: No context") | |
| return state, gr.update(value=[("❌ Please enter context before generating response! If you just changed configuration, try reloading an example.", None)], visible=True) | |
| if not state.query or not state.query.strip(): | |
| print("❌ Validation failed: No query") | |
| return state, gr.update(value=[("❌ Please enter a query before generating response! If you just changed configuration, try reloading an example.", None)], visible=True) | |
| # Initialize model and attribution with default configuration | |
| print(f"🔧 Generating response with explanation_level: {DEFAULT_EXPLANATION_LEVEL}") | |
| llm, attr, error_msg = initialize_model_and_attr() | |
| if llm is None or attr is None: | |
| error_text = error_msg if error_msg else "Model initialization failed!" | |
| return state, gr.update(value=[(f"❌ {error_text}", None)], visible=True) | |
| prompt = wrap_prompt(state.query, [state.context]) | |
| print(f"Generated prompt for {DEFAULT_MODEL_PATH}: {prompt[:200]}...") # Debug log | |
| # Check context length | |
| if len(prompt.split()) > get_max_tokens(DEFAULT_MODEL_PATH) - 512: | |
| return state, gr.update(value=[(GENERATE_CONTEXT_TOO_LONG_TEXT, None)], visible=True) | |
| answer = llm.query(prompt) | |
| print(f"Model response: {answer}") # Debug log | |
| state.response = answer | |
| state.answer = answer | |
| state.full_response = answer | |
| state.workflow_state = WorkflowState.WAITING_TO_SELECT | |
| return state, gr.update(visible=False) | |
| def split_into_sentences(text: str): | |
| lines = text.splitlines() | |
| sentences = [] | |
| for line in lines: | |
| sentences.extend(nltk.sent_tokenize(line)) | |
| separators = [] | |
| cur_start = 0 | |
| for sentence in sentences: | |
| cur_end = text.find(sentence, cur_start) | |
| separators.append(text[cur_start:cur_end]) | |
| cur_start = cur_end + len(sentence) | |
| return sentences, separators | |
| def basic_highlight_response( | |
| response: str, selected_index: int, num_sources: int = -1 | |
| ): | |
| sentences, separators = split_into_sentences(response) | |
| ht = [] | |
| if num_sources == -1: | |
| citations_text = "Traceback!" | |
| elif num_sources == 0: | |
| citations_text = "No important text!" | |
| else: | |
| citations_text = f"[{','.join(str(i) for i in range(1, num_sources + 1))}]" | |
| for i, (sentence, separator) in enumerate(zip(sentences, separators)): | |
| label = citations_text if i == selected_index else "Traceback" | |
| # Hack to ignore punctuation | |
| if len(sentence) >= 4: | |
| ht.append((separator + sentence, label)) | |
| else: | |
| ht.append((separator + sentence, None)) | |
| color_map = {"Click to cite!": "blue", citations_text: "yellow"} | |
| return gr.HighlightedText(value=ht, color_map=color_map) | |
| def basic_highlight_response_with_visibility( | |
| response: str, selected_index: int, num_sources: int = -1, visible: bool = True | |
| ): | |
| """Version of basic_highlight_response that also sets visibility""" | |
| sentences, separators = split_into_sentences(response) | |
| ht = [] | |
| if num_sources == -1: | |
| citations_text = "Traceback!" | |
| elif num_sources == 0: | |
| citations_text = "No important text!" | |
| else: | |
| citations_text = f"[{','.join(str(i) for i in range(1, num_sources + 1))}]" | |
| for i, (sentence, separator) in enumerate(zip(sentences, separators)): | |
| label = citations_text if i == selected_index else "Traceback" | |
| # Hack to ignore punctuation | |
| if len(sentence) >= 4: | |
| ht.append((separator + sentence, label)) | |
| else: | |
| ht.append((separator + sentence, None)) | |
| color_map = {"Click to cite!": "blue", citations_text: "yellow"} | |
| return gr.update(value=ht, color_map=color_map, visible=visible) | |
| def basic_update_highlighted_response(evt: gr.SelectData, state: State): | |
| response_update = basic_highlight_response(state.response, evt.index) | |
| return response_update, state | |
| def unified_response_handler(response_text: str, state: State): | |
| """Handle both LLM generation and manual input based on whether text is provided""" | |
| # Check if instruction has changed from what was used to generate current response | |
| instruction_changed = hasattr(state, 'last_query_used') and state.last_query_used != state.query | |
| # If response_text is empty, whitespace, or instruction changed, generate from LLM | |
| if not response_text or not response_text.strip() or instruction_changed: | |
| if instruction_changed: | |
| print("📝 Instruction changed, generating new response from LLM...") | |
| else: | |
| print("🤖 Generating response from LLM...") | |
| # Validate inputs first | |
| if not state.context or not state.context.strip(): | |
| return ( | |
| state, | |
| response_text, # Keep current text box content | |
| gr.update(visible=False), # Keep response box hidden | |
| gr.update(value=[("❌ Please enter context before generating response!", None)], visible=True) | |
| ) | |
| if not state.query or not state.query.strip(): | |
| return ( | |
| state, | |
| response_text, # Keep current text box content | |
| gr.update(visible=False), # Keep response box hidden | |
| gr.update(value=[("❌ Please enter a query before generating response!", None)], visible=True) | |
| ) | |
| # Initialize model and generate response | |
| llm, attr, error_msg = initialize_model_and_attr() | |
| if llm is None: | |
| error_text = error_msg if error_msg else "Model initialization failed!" | |
| return ( | |
| state, | |
| response_text, # Keep current text box content | |
| gr.update(visible=False), # Keep response box hidden | |
| gr.update(value=[(f"❌ {error_text}", None)], visible=True) | |
| ) | |
| prompt = wrap_prompt(state.query, [state.context]) | |
| # Check context length | |
| if len(prompt.split()) > get_max_tokens(DEFAULT_MODEL_PATH) - 512: | |
| return ( | |
| state, | |
| response_text, # Keep current text box content | |
| gr.update(visible=False), # Keep response box hidden | |
| gr.update(value=[(GENERATE_CONTEXT_TOO_LONG_TEXT, None)], visible=True) | |
| ) | |
| # Generate response | |
| answer = llm.query(prompt) | |
| print(f"Generated response: {answer[:100]}...") | |
| # Update state and UI | |
| state.response = answer | |
| state.answer = answer | |
| state.full_response = answer | |
| state.last_query_used = state.query # Track which query was used for this response | |
| state.workflow_state = WorkflowState.WAITING_TO_SELECT | |
| # Create highlighted response and show it | |
| response_update = basic_highlight_response_with_visibility(state.response, -1, visible=True) | |
| return ( | |
| state, | |
| answer, # Put generated response in text box | |
| response_update, # Update clickable response content | |
| gr.update(visible=False) # Hide error box | |
| ) | |
| else: | |
| # Use provided text as manual response | |
| print("✏️ Using manual response...") | |
| manual_text = response_text.strip() | |
| # Update state with manual response | |
| state.response = manual_text | |
| state.answer = manual_text | |
| state.full_response = manual_text | |
| state.last_query_used = state.query # Track current query for this response | |
| state.workflow_state = WorkflowState.WAITING_TO_SELECT | |
| # Create highlighted response for selection | |
| response_update = basic_highlight_response_with_visibility(state.response, -1, visible=True) | |
| return ( | |
| state, | |
| manual_text, # Keep text in text box | |
| response_update, # Update clickable response content | |
| gr.update(visible=False) # Hide error box | |
| ) | |
| def get_color_by_rank(rank, total_items): | |
| """Get color based purely on rank position for better visual distinction""" | |
| if total_items == 0: | |
| return "#F0F0F0", "rgba(240, 240, 240, 0.8)" | |
| # Pure ranking-based color assignment for clear visual hierarchy | |
| if rank == 1: # Highest importance - Strong Red | |
| bg_color = "#FF4444" # Bright red | |
| rgba_color = "rgba(255, 68, 68, 0.9)" | |
| elif rank == 2: # Second highest - Orange | |
| bg_color = "#FF8C42" # Bright orange | |
| rgba_color = "rgba(255, 140, 66, 0.8)" | |
| elif rank == 3: # Third highest - Golden Yellow | |
| bg_color = "#FFD93D" # Golden yellow | |
| rgba_color = "rgba(255, 217, 61, 0.8)" | |
| elif rank <= 5: # 4th-5th - Light Yellow | |
| bg_color = "#FFF280" # Standard yellow | |
| rgba_color = "rgba(255, 242, 128, 0.7)" | |
| else: # Lower importance - Very Light Yellow | |
| bg_color = "#FFF9C4" # Very light yellow | |
| rgba_color = "rgba(255, 249, 196, 0.6)" | |
| return bg_color, rgba_color | |
| def basic_get_scores_and_sources_full_response(state: State): | |
| """Traceback the entire response instead of a selected segment""" | |
| # Use the entire response as the explained part | |
| state.explained_response_part = state.full_response | |
| # Attribution using default configuration | |
| _, attr, error_msg = initialize_model_and_attr() | |
| if attr is None: | |
| error_text = error_msg if error_msg else "Traceback initialization failed!" | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[(f"❌ {error_text}", None)], visible=True), | |
| state, | |
| ) | |
| try: | |
| # Validate attribution inputs | |
| if not state.context or not state.context.strip(): | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No context available for traceback!", None)], visible=True), | |
| state, | |
| ) | |
| if not state.query or not state.query.strip(): | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No query available for traceback!", None)], visible=True), | |
| state, | |
| ) | |
| if not state.full_response or not state.full_response.strip(): | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No response available for traceback!", None)], visible=True), | |
| state, | |
| ) | |
| print(f"start full response traceback with explanation_level: {DEFAULT_EXPLANATION_LEVEL}") | |
| print(f"context length: {len(state.context)}, query: {state.query[:100]}...") | |
| print(f"full response: {state.full_response[:100]}...") | |
| print(f"tracing entire response (length: {len(state.full_response)} chars)") | |
| texts, important_ids, importance_scores, _, _ = attr.attribute( | |
| state.query, [state.context], state.full_response, state.full_response | |
| ) | |
| print("end full response traceback") | |
| print(f"explanation_level: {DEFAULT_EXPLANATION_LEVEL}") | |
| print(f"texts count: {len(texts)} (how context was segmented)") | |
| if len(texts) > 0: | |
| print(f"sample text segments: {[text[:50] + '...' if len(text) > 50 else text for text in texts[:3]]}") | |
| print(f"important_ids: {important_ids}") | |
| print("importance_scores: ", importance_scores) | |
| if not importance_scores: | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No traceback scores generated for full response!", None)], visible=True), | |
| state, | |
| ) | |
| state.scores = np.array(importance_scores) | |
| # Highlighted sources with ranking-based colors | |
| highlighted_text = [] | |
| sorted_indices = np.argsort(state.scores)[::-1] | |
| total_sources = len(important_ids) | |
| for rank, i in enumerate(sorted_indices): | |
| source_text = texts[important_ids[i]] | |
| _ = get_color_by_rank(rank + 1, total_sources) | |
| highlighted_text.append( | |
| ( | |
| source_text, | |
| f"rank_{rank+1}", | |
| ) | |
| ) | |
| # In-context highlights with ranking-based colors - show ALL text | |
| in_context_highlighted_text = [] | |
| ranks = {important_ids[i]: rank for rank, i in enumerate(sorted_indices)} | |
| for i in range(len(texts)): | |
| source_text = texts[i] | |
| # Skip or don't highlight segments that are only newlines or whitespace | |
| if source_text.strip() == "": | |
| # For whitespace-only segments, add them without highlighting | |
| in_context_highlighted_text.append((source_text, None)) | |
| elif i in important_ids: | |
| # Only highlight if the segment has actual content (not just newlines) | |
| if source_text.strip(): # Has non-whitespace content | |
| rank = ranks[i] + 1 | |
| # Split the segment to separate leading/trailing newlines from content | |
| # This prevents newlines from being highlighted | |
| leading_whitespace = "" | |
| trailing_whitespace = "" | |
| content = source_text | |
| # Extract leading newlines/whitespace | |
| while content and content[0] in ['\n', '\r', '\t', ' ']: | |
| leading_whitespace += content[0] | |
| content = content[1:] | |
| # Extract trailing newlines/whitespace | |
| while content and content[-1] in ['\n', '\r', '\t', ' ']: | |
| trailing_whitespace = content[-1] + trailing_whitespace | |
| content = content[:-1] | |
| # Add the parts separately: whitespace unhighlighted, content highlighted | |
| if leading_whitespace: | |
| in_context_highlighted_text.append((leading_whitespace, None)) | |
| if content: | |
| in_context_highlighted_text.append((content, f"rank_{rank}")) | |
| if trailing_whitespace: | |
| in_context_highlighted_text.append((trailing_whitespace, None)) | |
| else: | |
| # Even if marked as important, don't highlight whitespace-only segments | |
| in_context_highlighted_text.append((source_text, None)) | |
| else: | |
| # Add unhighlighted text for non-important segments | |
| in_context_highlighted_text.append((source_text, None)) | |
| # Enhanced color map with ranking-based colors | |
| color_map = {} | |
| for rank in range(len(important_ids)): | |
| _, rgba_color = get_color_by_rank(rank + 1, total_sources) | |
| color_map[f"rank_{rank+1}"] = rgba_color | |
| dummy_update = gr.update( | |
| value=f"AttnTrace_{state.response}_{state.start_index}_{state.end_index}" | |
| ) | |
| attribute_error_update = gr.update(visible=False) | |
| # Combine sources and highlighted context into a single display | |
| # Sources at the top | |
| combined_display = [] | |
| # Add sources header (no highlighting for UI elements) | |
| combined_display.append(("═══ FULL RESPONSE TRACEBACK RESULTS ═══\n", None)) | |
| combined_display.append(("These are the text segments that contribute most to the entire response:\n\n", None)) | |
| # Add sources using available data | |
| for rank, i in enumerate(sorted_indices): | |
| if i < len(important_ids): | |
| source_text = texts[important_ids[i]] | |
| # Strip leading/trailing whitespace from source text to avoid highlighting newlines | |
| clean_source_text = source_text.strip() | |
| if clean_source_text: # Only add if there's actual content | |
| # Add the source text with highlighting, then add spacing without highlighting | |
| combined_display.append((clean_source_text, f"rank_{rank+1}")) | |
| combined_display.append(("\n\n", None)) | |
| # Add separator (no highlighting for UI elements) | |
| combined_display.append(("\n" + "═"*50 + "\n", None)) | |
| combined_display.append(("FULL CONTEXT WITH HIGHLIGHTS\n", None)) | |
| combined_display.append(("Scroll down to see the complete context with important segments highlighted:\n\n", None)) | |
| # Add highlighted context using in_context_highlighted_text | |
| combined_display.extend(in_context_highlighted_text) | |
| # Use only the ranking colors (no highlighting for UI elements) | |
| enhanced_color_map = color_map.copy() | |
| combined_sources_update = HighlightedTextbox( | |
| value=combined_display, color_map=enhanced_color_map, visible=True | |
| ) | |
| # Switch to the highlighted context tab and show results | |
| basic_context_tabs_update = gr.update(selected=1) | |
| basic_sources_in_context_tab_update = gr.update(visible=True) | |
| return ( | |
| combined_sources_update, | |
| basic_context_tabs_update, | |
| basic_sources_in_context_tab_update, | |
| dummy_update, | |
| attribute_error_update, | |
| state, | |
| ) | |
| except Exception as e: | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[(f"❌ Error: {str(e)}", None)], visible=True), | |
| state, | |
| ) | |
| def basic_get_scores_and_sources( | |
| evt: gr.SelectData, | |
| highlighted_response: List[Dict[str, str]], | |
| state: State, | |
| ): | |
| # Get the selected sentence | |
| print("highlighted_response: ", highlighted_response[evt.index]) | |
| selected_text = highlighted_response[evt.index]['token'] | |
| state.explained_response_part = selected_text | |
| # Attribution using default configuration | |
| _, attr, error_msg = initialize_model_and_attr() | |
| if attr is None: | |
| error_text = error_msg if error_msg else "Traceback initialization failed!" | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[(f"❌ {error_text}", None)], visible=True), | |
| state, | |
| ) | |
| try: | |
| # Validate attribution inputs | |
| if not state.context or not state.context.strip(): | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No context available for traceback!", None)], visible=True), | |
| state, | |
| ) | |
| if not state.query or not state.query.strip(): | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No query available for traceback!", None)], visible=True), | |
| state, | |
| ) | |
| if not state.full_response or not state.full_response.strip(): | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No response available for traceback!", None)], visible=True), | |
| state, | |
| ) | |
| print(f"start traceback with explanation_level: {DEFAULT_EXPLANATION_LEVEL}") | |
| print(f"context length: {len(state.context)}, query: {state.query[:100]}...") | |
| print(f"response: {state.full_response[:100]}...") | |
| print(f"selected part: {state.explained_response_part[:100]}...") | |
| texts, important_ids, importance_scores, _, _ = attr.attribute( | |
| state.query, [state.context], state.full_response, state.explained_response_part | |
| ) | |
| print("end traceback") | |
| print(f"explanation_level: {DEFAULT_EXPLANATION_LEVEL}") | |
| print(f"texts count: {len(texts)} (how context was segmented)") | |
| if len(texts) > 0: | |
| print(f"sample text segments: {[text[:50] + '...' if len(text) > 50 else text for text in texts[:3]]}") | |
| print(f"important_ids: {important_ids}") | |
| print("importance_scores: ", importance_scores) | |
| if not importance_scores: | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[("❌ No traceback scores generated! Try a different text segment.", None)], visible=True), | |
| state, | |
| ) | |
| state.scores = np.array(importance_scores) | |
| # Highlighted sources with ranking-based colors | |
| highlighted_text = [] | |
| sorted_indices = np.argsort(state.scores)[::-1] | |
| total_sources = len(important_ids) | |
| for rank, i in enumerate(sorted_indices): | |
| source_text = texts[important_ids[i]] | |
| _ = get_color_by_rank(rank + 1, total_sources) | |
| highlighted_text.append( | |
| ( | |
| source_text, | |
| f"rank_{rank+1}", | |
| ) | |
| ) | |
| # In-context highlights with ranking-based colors - show ALL text | |
| in_context_highlighted_text = [] | |
| ranks = {important_ids[i]: rank for rank, i in enumerate(sorted_indices)} | |
| for i in range(len(texts)): | |
| source_text = texts[i] | |
| # Skip or don't highlight segments that are only newlines or whitespace | |
| if source_text.strip() == "": | |
| # For whitespace-only segments, add them without highlighting | |
| in_context_highlighted_text.append((source_text, None)) | |
| elif i in important_ids: | |
| # Only highlight if the segment has actual content (not just newlines) | |
| if source_text.strip(): # Has non-whitespace content | |
| rank = ranks[i] + 1 | |
| # Split the segment to separate leading/trailing newlines from content | |
| # This prevents newlines from being highlighted | |
| leading_whitespace = "" | |
| trailing_whitespace = "" | |
| content = source_text | |
| # Extract leading newlines/whitespace | |
| while content and content[0] in ['\n', '\r', '\t', ' ']: | |
| leading_whitespace += content[0] | |
| content = content[1:] | |
| # Extract trailing newlines/whitespace | |
| while content and content[-1] in ['\n', '\r', '\t', ' ']: | |
| trailing_whitespace = content[-1] + trailing_whitespace | |
| content = content[:-1] | |
| # Add the parts separately: whitespace unhighlighted, content highlighted | |
| if leading_whitespace: | |
| in_context_highlighted_text.append((leading_whitespace, None)) | |
| if content: | |
| in_context_highlighted_text.append((content, f"rank_{rank}")) | |
| if trailing_whitespace: | |
| in_context_highlighted_text.append((trailing_whitespace, None)) | |
| else: | |
| # Even if marked as important, don't highlight whitespace-only segments | |
| in_context_highlighted_text.append((source_text, None)) | |
| else: | |
| # Add unhighlighted text for non-important segments | |
| in_context_highlighted_text.append((source_text, None)) | |
| # Enhanced color map with ranking-based colors | |
| color_map = {} | |
| for rank in range(len(important_ids)): | |
| _, rgba_color = get_color_by_rank(rank + 1, total_sources) | |
| color_map[f"rank_{rank+1}"] = rgba_color | |
| dummy_update = gr.update( | |
| value=f"AttnTrace_{state.response}_{state.start_index}_{state.end_index}" | |
| ) | |
| attribute_error_update = gr.update(visible=False) | |
| # Combine sources and highlighted context into a single display | |
| # Sources at the top | |
| combined_display = [] | |
| # Add sources header (no highlighting for UI elements) | |
| combined_display.append(("═══ TRACEBACK RESULTS ═══\n", None)) | |
| combined_display.append(("These are the text segments that contribute most to the response:\n\n", None)) | |
| # Add sources using available data | |
| for rank, i in enumerate(sorted_indices): | |
| if i < len(important_ids): | |
| source_text = texts[important_ids[i]] | |
| # Strip leading/trailing whitespace from source text to avoid highlighting newlines | |
| clean_source_text = source_text.strip() | |
| if clean_source_text: # Only add if there's actual content | |
| # Add the source text with highlighting, then add spacing without highlighting | |
| combined_display.append((clean_source_text, f"rank_{rank+1}")) | |
| combined_display.append(("\n\n", None)) | |
| # Add separator (no highlighting for UI elements) | |
| combined_display.append(("\n" + "═"*50 + "\n", None)) | |
| combined_display.append(("FULL CONTEXT WITH HIGHLIGHTS\n", None)) | |
| combined_display.append(("Scroll down to see the complete context with important segments highlighted:\n\n", None)) | |
| # Add highlighted context using in_context_highlighted_text | |
| combined_display.extend(in_context_highlighted_text) | |
| # Use only the ranking colors (no highlighting for UI elements) | |
| enhanced_color_map = color_map.copy() | |
| combined_sources_update = HighlightedTextbox( | |
| value=combined_display, color_map=enhanced_color_map, visible=True | |
| ) | |
| # Switch to the highlighted context tab and show results | |
| basic_context_tabs_update = gr.update(selected=1) | |
| basic_sources_in_context_tab_update = gr.update(visible=True) | |
| return ( | |
| combined_sources_update, | |
| basic_context_tabs_update, | |
| basic_sources_in_context_tab_update, | |
| dummy_update, | |
| attribute_error_update, | |
| state, | |
| ) | |
| except Exception as e: | |
| return ( | |
| gr.update(value=[("", None)], visible=False), | |
| gr.update(selected=0), | |
| gr.update(visible=False), | |
| gr.update(value=""), | |
| gr.update(value=[(f"❌ Error: {str(e)}", None)], visible=True), | |
| state, | |
| ) | |
| def load_custom_css(): | |
| """Load CSS from external file""" | |
| try: | |
| with open("assets/app_styles.css", "r") as f: | |
| css_content = f.read() | |
| return css_content | |
| except FileNotFoundError: | |
| print("Warning: CSS file not found, using minimal CSS") | |
| return "" | |
| except Exception as e: | |
| print(f"Error loading CSS: {e}") | |
| return "" | |
| # Load CSS from external file | |
| custom_css = load_custom_css() | |
| theme = gr.themes.Citrus( | |
| text_size="lg", | |
| spacing_size="md", | |
| ) | |
| with gr.Blocks(theme=theme, css=custom_css) as demo: | |
| gr.Markdown(f"# {APP_TITLE}") | |
| gr.Markdown(APP_DESCRIPTION, elem_classes="app-description") | |
| # gr.Markdown(NEW_TEXT, elem_classes="app-description-2") | |
| gr.Markdown(""" | |
| <div style="font-size: 18px;"> | |
| AttnTrace is an efficient context traceback method for long contexts (e.g., full papers). It is over 15× faster than the state-of-the-art context traceback method TracLLM. Compared to previous attention-based approaches, AttnTrace is more accurate, reliable, and memory-efficient. | |
| """, elem_classes="feature-highlights") | |
| # Image | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| pass | |
| with gr.Column(scale=4): | |
| gr.Image("assets/fig1.png", show_label=False, container=False) | |
| with gr.Column(scale=3): | |
| pass | |
| # Feature highlights | |
| gr.Markdown(""" | |
| <div style="font-size: 18px;"> | |
| As shown in the above figure, AttnTrace can trace back to the texts in a long context that contribute to the output of an LLM. AttnTrace can be used in many real-world applications, such as tracing back to: | |
| - 📄 prompt injection instructions that manipulate LLM-generated paper reviews. | |
| - 💻 malicious comment & code hiding in the codebase that misleads the AI coding assistant. | |
| - 🤖 malicious instructions that mislead the action of the LLM agent. | |
| - 🖋 source texts in the context from an AI summary. | |
| - 🔍 evidence that supports the LLM-generated answer for a question. | |
| - ❌ misinformation (corrupted knowledge) that manipulates LLM output for a question. | |
| - And a lot more... | |
| </div> | |
| """, elem_classes="feature-highlights") | |
| # Example buttons with topic-relevant images - moved here for better positioning | |
| gr.Markdown("### 🚀 Try These Examples!", elem_classes="example-title") | |
| with gr.Row(elem_classes=["example-button-container"]): | |
| with gr.Column(scale=1): | |
| example_1_btn = gr.Button( | |
| "📄 Prompt Injection Attacks in AI Paper Review", | |
| elem_classes=["example-button", "example-paper"], | |
| elem_id="example_1_button", | |
| scale=None, | |
| size="sm" | |
| ) | |
| with gr.Column(scale=1): | |
| example_2_btn = gr.Button( | |
| "💻 Malicious Comments & Code in Codebase", | |
| elem_classes=["example-button", "example-movie"], | |
| elem_id="example_2_button" | |
| ) | |
| with gr.Column(scale=1): | |
| example_3_btn = gr.Button( | |
| "🤖 Malicious Instructions Misleading the LLM Agent", | |
| elem_classes=["example-button", "example-code"], | |
| elem_id="example_3_button" | |
| ) | |
| with gr.Row(elem_classes=["example-button-container"]): | |
| with gr.Column(scale=1): | |
| example_4_btn = gr.Button( | |
| "🖋 Source Texts for an AI Summary", | |
| elem_classes=["example-button", "example-paper-alt"], | |
| elem_id="example_4_button" | |
| ) | |
| with gr.Column(scale=1): | |
| example_5_btn = gr.Button( | |
| "🔍 Evidence that Support Question Answering", | |
| elem_classes=["example-button", "example-movie-alt"], | |
| elem_id="example_5_button" | |
| ) | |
| with gr.Column(scale=1): | |
| example_6_btn = gr.Button( | |
| "❌ Misinformation (Corrupted Knowledge) in Question Answering", | |
| elem_classes=["example-button", "example-code-alt"], | |
| elem_id="example_6_button" | |
| ) | |
| state = gr.State( | |
| value=clear_state() | |
| ) | |
| basic_tab = gr.Tab("Demo") | |
| with basic_tab: | |
| # gr.Markdown("## Demo") | |
| gr.Markdown( | |
| "Enter your context and instruction below to try out AttnTrace! You can also click on the example buttons above to load pre-configured examples." | |
| ) | |
| gr.Markdown( | |
| '**Color Legend for Context Traceback (by ranking):** <span style="background-color: #FF4444; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Red</span> = 1st (most important) | <span style="background-color: #FF8C42; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Orange</span> = 2nd | <span style="background-color: #FFD93D; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Golden</span> = 3rd | <span style="background-color: #FFF280; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Yellow</span> = 4th-5th | <span style="background-color: #FFF9C4; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Light</span> = 6th+' | |
| ) | |
| # Top section: Wide Context box with tabs | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Tabs() as basic_context_tabs: | |
| with gr.TabItem("Context", id=0): | |
| basic_context_box = gr.Textbox( | |
| placeholder="Enter context...", | |
| show_label=False, | |
| value="", | |
| lines=6, | |
| max_lines=6, | |
| elem_id="basic_context_box", | |
| autoscroll=False, | |
| ) | |
| with gr.TabItem("Context with highlighted traceback results", id=1, visible=True) as basic_sources_in_context_tab: | |
| basic_sources_in_context_box = HighlightedTextbox( | |
| value=[("Click on a sentence in the response below to see highlighted traceback results here.", None)], | |
| show_legend_label=False, | |
| show_label=False, | |
| show_legend=False, | |
| interactive=False, | |
| elem_id="basic_sources_in_context_box", | |
| ) | |
| # Error messages | |
| basic_generate_error_box = HighlightedTextbox( | |
| show_legend_label=False, | |
| show_label=False, | |
| show_legend=False, | |
| visible=False, | |
| interactive=False, | |
| container=False, | |
| ) | |
| # Bottom section: Left (instruction + button + response), Right (response selection) | |
| with gr.Row(equal_height=True): | |
| # Left: Instruction + Button + Response | |
| with gr.Column(scale=1): | |
| basic_query_box = gr.Textbox( | |
| label="Instruction", | |
| placeholder="Enter an instruction...", | |
| value="", | |
| lines=3, | |
| max_lines=3, | |
| ) | |
| unified_response_button = gr.Button( | |
| "Generate/Use Response", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| response_input_box = gr.Textbox( | |
| label="Response (Editable)", | |
| placeholder="Response will appear here after generation, or type your own response for traceback...", | |
| lines=8, | |
| max_lines=8, | |
| info="Leave empty and click button to generate from LLM, or type your own response to use for traceback" | |
| ) | |
| # Right: Response for attribution selection | |
| with gr.Column(scale=1): | |
| basic_response_box = gr.HighlightedText( | |
| label="Click to select text for traceback!", | |
| value=[("Click the 'Generate/Use Response' button on the left to see response text here for traceback analysis.", None)], | |
| interactive=False, | |
| combine_adjacent=False, | |
| show_label=True, | |
| show_legend=False, | |
| elem_id="basic_response_box", | |
| visible=True, | |
| ) | |
| # Button for full response traceback | |
| full_response_traceback_button = gr.Button( | |
| "🔍 Traceback Entire Response", | |
| variant="secondary", | |
| size="sm" | |
| ) | |
| # Hidden error box and dummy elements | |
| basic_attribute_error_box = HighlightedTextbox( | |
| show_legend_label=False, | |
| show_label=False, | |
| show_legend=False, | |
| visible=False, | |
| interactive=False, | |
| container=False, | |
| ) | |
| dummy_basic_sources_box = gr.Textbox( | |
| visible=False, interactive=False, container=False | |
| ) | |
| # Only a single (AttnTrace) method and model in this simplified version | |
| def basic_clear_state(): | |
| state = clear_state() | |
| return ( | |
| "", # basic_context_box | |
| "", # basic_query_box | |
| "", # response_input_box | |
| gr.update(value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]), # basic_response_box - keep visible | |
| gr.update(selected=0), # basic_context_tabs - switch to first tab | |
| state, | |
| ) | |
| # Defining behavior of various interactions for the basic tab | |
| basic_tab.select( | |
| fn=basic_clear_state, | |
| inputs=[], | |
| outputs=[ | |
| basic_context_box, | |
| basic_query_box, | |
| response_input_box, | |
| basic_response_box, | |
| basic_context_tabs, | |
| state, | |
| ], | |
| ) | |
| for component in [basic_context_box, basic_query_box]: | |
| component.change( | |
| basic_update, | |
| [basic_context_box, basic_query_box, state], | |
| [ | |
| basic_response_box, | |
| basic_context_tabs, | |
| state, | |
| ], | |
| ) | |
| # Example button event handlers - now update both UI and state | |
| outputs_for_examples = [ | |
| basic_context_box, | |
| basic_query_box, | |
| state, | |
| response_input_box, | |
| basic_response_box, | |
| basic_context_tabs, | |
| ] | |
| example_1_btn.click( | |
| fn=partial(load_an_example, run_example_1), | |
| inputs=[state], | |
| outputs=outputs_for_examples | |
| ) | |
| example_2_btn.click( | |
| fn=partial(load_an_example, run_example_2), | |
| inputs=[state], | |
| outputs=outputs_for_examples | |
| ) | |
| example_3_btn.click( | |
| fn=partial(load_an_example, run_example_3), | |
| inputs=[state], | |
| outputs=outputs_for_examples | |
| ) | |
| example_4_btn.click( | |
| fn=partial(load_an_example, run_example_4), | |
| inputs=[state], | |
| outputs=outputs_for_examples | |
| ) | |
| example_5_btn.click( | |
| fn=partial(load_an_example, run_example_5), | |
| inputs=[state], | |
| outputs=outputs_for_examples | |
| ) | |
| example_6_btn.click( | |
| fn=partial(load_an_example, run_example_6), | |
| inputs=[state], | |
| outputs=outputs_for_examples | |
| ) | |
| unified_response_button.click( | |
| fn=lambda: None, | |
| inputs=[], | |
| outputs=[], | |
| js=get_scroll_js_code("basic_response_box"), | |
| ) | |
| basic_response_box.change( | |
| fn=lambda: None, | |
| inputs=[], | |
| outputs=[], | |
| js=get_scroll_js_code("basic_sources_in_context_box"), | |
| ) | |
| # Add immediate tab switch on response selection | |
| def immediate_tab_switch(): | |
| return ( | |
| gr.update(value=[("🔄 Processing traceback... Please wait...", None)]), # Show progress message | |
| gr.update(selected=1), # Switch to annotation tab immediately | |
| ) | |
| basic_response_box.select( | |
| fn=immediate_tab_switch, | |
| inputs=[], | |
| outputs=[basic_sources_in_context_box, basic_context_tabs], | |
| queue=False, # Execute immediately without queue | |
| ) | |
| basic_response_box.select( | |
| fn=basic_get_scores_and_sources, | |
| inputs=[basic_response_box, state], | |
| outputs=[ | |
| basic_sources_in_context_box, | |
| basic_context_tabs, | |
| basic_sources_in_context_tab, | |
| dummy_basic_sources_box, | |
| basic_attribute_error_box, | |
| state, | |
| ], | |
| show_progress="full", | |
| ) | |
| basic_response_box.select( | |
| fn=basic_update_highlighted_response, | |
| inputs=[state], | |
| outputs=[basic_response_box, state], | |
| ) | |
| # Full response traceback button | |
| full_response_traceback_button.click( | |
| fn=immediate_tab_switch, | |
| inputs=[], | |
| outputs=[basic_sources_in_context_box, basic_context_tabs], | |
| queue=False, # Execute immediately without queue | |
| ) | |
| full_response_traceback_button.click( | |
| fn=basic_get_scores_and_sources_full_response, | |
| inputs=[state], | |
| outputs=[ | |
| basic_sources_in_context_box, | |
| basic_context_tabs, | |
| basic_sources_in_context_tab, | |
| dummy_basic_sources_box, | |
| basic_attribute_error_box, | |
| state, | |
| ], | |
| show_progress="full", | |
| ) | |
| dummy_basic_sources_box.change( | |
| fn=lambda: None, | |
| inputs=[], | |
| outputs=[], | |
| js=get_scroll_js_code("basic_sources_in_context_box"), | |
| ) | |
| # Unified response handler | |
| unified_response_button.click( | |
| fn=unified_response_handler, | |
| inputs=[response_input_box, state], | |
| outputs=[state, response_input_box, basic_response_box, basic_generate_error_box] | |
| ) | |
| # gr.Markdown( | |
| # "Please do not interact with elements while generation/attribution is in progress. This may cause errors. You can refresh the page if you run into issues because of this." | |
| # ) | |
| demo.launch(show_api=False, share=True) | |