# GAIA_Agent/agents/long_context_management_agent.py
import os
import logging
from typing import List, Literal, Optional, Union
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
# Setup logging
logger = logging.getLogger(__name__)
# Configure LlamaIndex Settings (optional, but good practice)
# Ensure embedding model is set if not using default OpenAI
# Settings.embed_model = ... # Example: HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
# Settings.llm = ... # Can set a default LLM here if needed
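# A minimal sketch of explicit configuration (assumes the optional
# llama-index-embeddings-huggingface package is installed; the model name is
# only an example):
#
#     from llama_index.embeddings.huggingface import HuggingFaceEmbedding
#     Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")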
# Helper function to load prompt from file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
"""Loads a prompt from a text file."""
try:
script_dir = os.path.dirname(__file__)
prompt_path = os.path.join(script_dir, filename)
        with open(prompt_path, "r", encoding="utf-8") as f:
prompt = f.read()
logger.info(f"Successfully loaded prompt from {prompt_path}")
return prompt
except FileNotFoundError:
logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
return default_prompt
except Exception as e:
logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
return default_prompt
# --- Internal Context Index Management ---
# Store index and text globally for simplicity in this example
# In a real application, consider a more robust state management approach
_context_index: Optional[VectorStoreIndex] = None
_context_text: Optional[str] = None
_context_source: Optional[str] = None # e.g., filename or description
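# A hedged sketch of the more robust alternative mentioned above (illustrative,
# not wired in here): a small dataclass holding the context state, so several
# contexts could coexist.
#
#     from dataclasses import dataclass
#
#     @dataclass
#     class ContextState:
#         index: Optional[VectorStoreIndex] = None
#         text: Optional[str] = None
#         source: Optional[str] = None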
def _build_or_get_index(text: Optional[str] = None, source: Optional[str] = "loaded_context") -> Optional[VectorStoreIndex]:
"""Builds or retrieves the VectorStoreIndex for the loaded context."""
global _context_index, _context_text, _context_source
if text is not None and (text != _context_text or _context_index is None):
logger.info(f"Building new context index from text (length: {len(text)} chars). Source: {source}")
_context_text = text
_context_source = source
try:
            # Use SentenceSplitter for chunking; pass it to the index build directly
            # rather than mutating the global Settings, so other components are unaffected.
            splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=200)
            documents = [Document(text=_context_text)]
            _context_index = VectorStoreIndex.from_documents(
                documents, transformations=[splitter], show_progress=True
            )
logger.info("Context index built successfully.")
except Exception as e:
logger.error(f"Failed to build context index: {e}", exc_info=True)
_context_index = None
_context_text = None
_context_source = None
return None
elif _context_index is None:
logger.warning("No context loaded or index built yet.")
return None
return _context_index
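# Note: the index above lives only in memory. A hedged sketch of persisting it
# between runs with LlamaIndex's storage context (the directory is illustrative):
#
#     _context_index.storage_context.persist(persist_dir="./context_index")
#     # ...and later:
#     from llama_index.core import StorageContext, load_index_from_storage
#     storage = StorageContext.from_defaults(persist_dir="./context_index")
#     _context_index = load_index_from_storage(storage)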
def load_text_context(text: str, source: str = "provided_text") -> str:
"""Loads text into the agent's context and builds an index. Replaces existing context."""
logger.info(f"Loading new text context (length: {len(text)} chars). Source: {source}")
index = _build_or_get_index(text=text, source=source)
if index:
return f"Successfully loaded and indexed text context from {source} (Length: {len(text)} chars)."
else:
return "Error: Failed to load or index the provided text context."
# --- Tool Functions ---
def summarize_long_context(detail_level: Literal["brief", "standard", "detailed"] = "standard",
max_length: Optional[int] = None,
min_length: Optional[int] = None) -> str:
"""Summarizes the currently loaded long text context.
Args:
detail_level (str): Level of detail: "brief" (1-2 sentences), "standard" (1-2 paragraphs), "detailed" (multiple paragraphs).
max_length (Optional[int]): Approximate maximum words (overrides detail_level if set).
min_length (Optional[int]): Approximate minimum words.
Returns:
str: The summary or an error message.
"""
global _context_text, _context_source
if _context_text is None:
return "Error: No long context has been loaded yet. Use 'load_text_context' first."
logger.info(f"Summarizing loaded context (Source: {_context_source}, Length: {len(_context_text)} chars). Detail: {detail_level}")
# Determine length guidance based on detail_level if max/min not set
if max_length is None:
if detail_level == "brief":
max_length = 50
min_length = min_length or 10
elif detail_level == "detailed":
max_length = 500
min_length = min_length or 150
else: # standard
max_length = 200
min_length = min_length or 50
min_length = min_length or int(max_length * 0.3) # Default min length
# LLM configuration
llm_model = os.getenv("CONTEXT_LLM_MODEL", "gemini-2.5-pro-preview-03-25") # Use Pro for potentially long context
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY not found for summarization LLM.")
return "Error: GEMINI_API_KEY not set."
# Truncate input text only if extremely long, as Pro handles large contexts
# Let the LLM handle context window limits if possible
# max_input_chars = 100000 # Example high limit
# text_to_summarize = _context_text[:max_input_chars] if len(_context_text) > max_input_chars else _context_text
text_to_summarize = _context_text # Rely on LLM context window
prompt = (
f"Summarize the following text concisely, focusing on the main points and key information. "
f"Aim for a length between {min_length} and {max_length} words. "
f"The requested level of detail is '{detail_level}'.\n\n"
f"TEXT:\n{text_to_summarize}\n\nSUMMARY:"
)
try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model, temperature=0.05)
logger.info(f"Using summarization LLM: {llm_model}")
response = llm.complete(prompt)
summary = response.text.strip()
logger.info(f"Summarization successful (output length: {len(summary.split())} words).")
return summary
except Exception as e:
logger.error(f"LLM call failed during summarization: {e}", exc_info=True)
return f"Error during summarization: {e}"
def extract_key_information(query: str, max_results: int = 10) -> Union[List[str], str]:
"""Extracts specific information or answers a question based on the loaded long context using the index.
Args:
query (str): The question or description of information to extract (e.g., "List all decisions made", "What was mentioned about Project X?").
max_results (int): Maximum number of distinct pieces of information or text snippets to return.
    Returns:
        Union[List[str], str]: A list of extracted text snippets or answers, or an error message string.
"""
logger.info(f"Extracting information for query: {query} from loaded context. Max results: {max_results}")
index = _build_or_get_index() # Get existing index
if index is None:
return "Error: No context loaded or index available. Use 'load_text_context' first."
try:
# Use a query engine for extraction
# Configure retriever for potentially broader search
retriever = VectorIndexRetriever(index=index, similarity_top_k=max_results * 2) # Retrieve more initially
# Configure response synthesis (optional, can customize prompt)
# response_synthesizer = ...
query_engine = RetrieverQueryEngine.from_args(retriever=retriever,
# response_synthesizer=response_synthesizer,
# llm=Settings.llm # Use default or specify
)
# Formulate a prompt that encourages extraction rather than synthesis if needed
extraction_prompt = f"Based *only* on the provided context, extract the key information or answer the following query. List distinct findings or provide relevant text snippets. Query: {query}"
response = query_engine.query(extraction_prompt)
# Process response - might need refinement based on LLM output format
# Assuming response.response contains the extracted info, potentially needing splitting
# This part is heuristic and depends on how the LLM responds to the extraction prompt.
extracted_items = [item.strip() for item in response.response.split("\n") if item.strip()]
# Limit results if necessary
final_results = extracted_items[:max_results]
logger.info(f"Extraction successful. Found {len(final_results)} items.")
return final_results if final_results else ["No specific information found matching the query in the context."]
except Exception as e:
logger.error(f"Error during information extraction: {e}", exc_info=True)
return f"Error during extraction: {e}"
def filter_by_relevance(topic: str, threshold: float = 0.75) -> str:
"""Filters the loaded long context, retaining sections relevant to the topic using the index.
Args:
topic (str): The topic or query to filter relevance by.
threshold (float): Similarity threshold (0.0 to 1.0) for relevance. Higher means more strict.
Returns:
str: The filtered text containing only relevant sections, or an error message.
"""
logger.info(f"Filtering loaded context for relevance to topic: {topic}. Threshold: {threshold}")
index = _build_or_get_index() # Get existing index
if index is None:
return "Error: No context loaded or index available. Use 'load_text_context' first."
try:
retriever = VectorIndexRetriever(index=index, similarity_top_k=20) # Retrieve a decent number of candidates
retrieved_nodes = retriever.retrieve(topic)
relevant_texts = []
        for node_with_score in retrieved_nodes:
            # Results come back ordered by descending score, so stop at the first
            # node below the threshold. Treat a missing score as 0.0.
            if (node_with_score.score or 0.0) >= threshold:
                relevant_texts.append(node_with_score.node.get_content())
            else:
                break
if not relevant_texts:
logger.info("No sections found meeting the relevance threshold.")
return "No content found matching the specified relevance threshold for the topic."
# Combine relevant sections (consider adding separators)
filtered_text = "\n\n---\n\n".join(relevant_texts)
logger.info(f"Filtering successful. Combined relevant text length: {len(filtered_text)} chars.")
return filtered_text
except Exception as e:
logger.error(f"Error during relevance filtering: {e}", exc_info=True)
return f"Error during filtering: {e}"
def query_context_index(query: str) -> str:
"""Answers a specific question based on the information contained within the loaded long context using the index.
Args:
query (str): The question to answer.
Returns:
str: The answer derived from the context, or an error/"not found" message.
"""
logger.info(f"Querying loaded context index with: {query}")
index = _build_or_get_index() # Get existing index
if index is None:
return "Error: No context loaded or index available. Use 'load_text_context' first."
try:
query_engine = index.as_query_engine(similarity_top_k=5) # Default query engine
response = query_engine.query(query)
answer = response.response.strip()
logger.info("Context query successful.")
# Check if the LLM indicated it couldn't answer
if "don't know" in answer.lower() or "no information" in answer.lower() or "context does not mention" in answer.lower():
logger.warning(f"Query response suggests information not found: {answer}")
return f"The loaded context does not seem to contain the answer to: {query}"
return answer
except Exception as e:
logger.error(f"Error during context query: {e}", exc_info=True)
return f"Error querying context: {e}"
# --- Tool Definitions ---
load_context_tool = FunctionTool.from_defaults(
fn=load_text_context,
name="load_text_context",
description=(
"Loads/replaces the long text context for the agent and builds an internal index. "
"Input: text (str), Optional: source (str). Output: Status message (str)."
),
)
summarize_context_tool = FunctionTool.from_defaults(
fn=summarize_long_context,
name="summarize_long_context",
description=(
"Summarizes the currently loaded long text context. "
"Input: Optional: detail_level ('brief', 'standard', 'detailed'), max_length (int), min_length (int). Output: Summary (str) or error."
),
)
extract_info_tool = FunctionTool.from_defaults(
fn=extract_key_information,
name="extract_key_information",
description=(
"Extracts specific information or answers questions from the loaded context using its index. "
"Input: query (str), Optional: max_results (int). Output: List[str] of findings or error string."
),
)
filter_context_tool = FunctionTool.from_defaults(
fn=filter_by_relevance,
name="filter_by_relevance",
description=(
"Filters the loaded context to retain only sections relevant to a topic, using the index. "
"Input: topic (str), Optional: threshold (float 0-1). Output: Filtered text (str) or error."
),
)
query_context_tool = FunctionTool.from_defaults(
fn=query_context_index,
name="query_context_index",
description=(
"Answers a specific question based *only* on the loaded long context using its index. "
"Input: query (str). Output: Answer (str) or error/'not found' message."
),
)
# --- Agent Initialization ---
def initialize_long_context_management_agent() -> ReActAgent:
"""Initializes the Long Context Management Agent."""
logger.info("Initializing LongContextManagementAgent...")
# Configuration for the agent's main LLM
agent_llm_model = os.getenv("CONTEXT_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25") # Needs to handle planning
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY not found for LongContextManagementAgent.")
raise ValueError("GEMINI_API_KEY must be set for LongContextManagementAgent")
try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
logger.info(f"Using agent LLM: {agent_llm_model}")
Settings.llm = llm # Set default LLM for LlamaIndex components used by tools
# Load system prompt
default_system_prompt = ("You are LongContextManagementAgent... [Default prompt content - replace with actual]" # Placeholder
)
system_prompt = load_prompt_from_file("../prompts/long_context_management_agent_prompt.txt", default_system_prompt)
if system_prompt == default_system_prompt:
logger.warning("Using default/fallback system prompt for LongContextManagementAgent.")
# Define available tools
tools = [
load_context_tool,
summarize_context_tool,
extract_info_tool,
filter_context_tool,
query_context_tool
]
# Define valid handoff targets
valid_handoffs = [
"planner_agent", # To return results
"text_analyzer_agent", # If further analysis of extracted/filtered text is needed
"reasoning_agent",
"research_agent"
]
agent = ReActAgent(
name="long_context_management_agent",
description=(
"Manages and processes long textual context efficiently. Handles large documents, transcripts, or datasets "
"by summarizing (`summarize_long_context`), extracting key information (`extract_key_information`), "
"filtering relevant content (`filter_by_relevance`), and answering questions based on the context (`query_context_index`). "
"Supports internal indexing for efficient retrieval and repeated queries. Optimized for chunked input processing "
"and contextual distillation. Only relies on the provided input and avoids external augmentation unless explicitly requested."
),
tools=tools,
llm=llm,
system_prompt=system_prompt,
can_handoff_to=valid_handoffs,
)
logger.info("LongContextManagementAgent initialized successfully.")
return agent
except Exception as e:
logger.error(f"Error during LongContextManagementAgent initialization: {e}", exc_info=True)
raise
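# A hedged sketch of driving the agent once initialized (AgentWorkflow usage may
# differ across llama_index versions):
#
#     import asyncio
#     from llama_index.core.agent.workflow import AgentWorkflow
#
#     async def _demo():
#         agent = initialize_long_context_management_agent()
#         workflow = AgentWorkflow(agents=[agent], root_agent=agent.name)
#         return await workflow.run(user_msg="Briefly summarize the loaded context.")
#
#     result = asyncio.run(_demo())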
# Example usage (for testing if run directly)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Set LlamaIndex log level higher to reduce noise during testing
logging.getLogger("llama_index.core.indices.vector_store").setLevel(logging.WARNING)
logging.getLogger("llama_index.core.query_engine").setLevel(logging.WARNING)
logging.getLogger("llama_index.core.token_counter").setLevel(logging.ERROR) # Suppress token counting logs
logger.info("Running long_context_management_agent.py directly for testing...")
# Check required keys
required_keys = ["GEMINI_API_KEY"]
missing_keys = [key for key in required_keys if not os.getenv(key)]
if missing_keys:
print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.")
else:
try:
# Example long text
long_text = """
Meeting Minutes - Project Phoenix - April 28, 2025
Attendees: Alice, Bob, Charlie, David
Agenda: Review Q1 results, Plan Q2 roadmap, Budget allocation
Q1 Results Discussion:
Alice presented the sales figures. Sales increased by 15% compared to Q4 2024, exceeding the target of 10%.
Bob highlighted the success of the marketing campaign launched in February. Customer acquisition cost decreased by 5%.
Charlie noted a slight dip in user engagement metrics in March, possibly due to a recent UI change.
Action Item: David to investigate the engagement dip.
Q2 Roadmap Planning:
The team discussed potential features for Q2. Feature A (enhanced reporting) was prioritized.
Feature B (mobile app improvements) was deferred to Q3.
Alice emphasized the need for stability improvements. Bob suggested focusing on performance optimization.
Decision: Q2 focus will be on Feature A and performance/stability improvements.
Budget Allocation:
Charlie presented the proposed budget.
An additional $50,000 was requested for cloud infrastructure scaling due to increased usage.
David questioned the necessity of the full amount.
After discussion, the team approved an additional $40,000 for infrastructure.
Decision: Allocate $40,000 extra for Q2 infrastructure.
Next Steps:
David to report on engagement metrics by May 5th.
Alice to finalize Q2 feature specifications by May 10th.
Meeting adjourned.
""" * 5 # Make it longer
# Test loading context
print("\nTesting load_text_context...")
load_status = load_text_context(long_text, source="Meeting Minutes Test")
print(load_status)
if "Error" not in load_status:
# Test summarization
print("\nTesting summarize_long_context (brief)...")
summary_brief = summarize_long_context(detail_level="brief")
print(f"Brief Summary: {summary_brief}")
# Test extraction
print("\nTesting extract_key_information (decisions)...")
decisions = extract_key_information(query="List all decisions made in the meeting")
print(f"Decisions Extracted: {decisions}")
# Test filtering
print("\nTesting filter_by_relevance (budget)...")
budget_text = filter_by_relevance(topic="budget allocation", threshold=0.7)
print(f"Filtered Budget Text (first 300 chars):\n{budget_text[:300]}...")
# Test querying
print("\nTesting query_context_index (Q1 sales)...")
sales_query = "What was the sales increase in Q1?"
sales_answer = query_context_index(sales_query)
print(f"Answer to '{sales_query}': {sales_answer}")
print("\nTesting query_context_index (non-existent info)...")
non_existent_query = "Who is the CEO?"
non_existent_answer = query_context_index(non_existent_query)
print(f"Answer to '{non_existent_query}': {non_existent_answer}")
# Initialize the agent (optional)
# test_agent = initialize_long_context_management_agent()
# print("\nLong Context Management Agent initialized successfully for testing.")
except Exception as e:
print(f"Error during testing: {e}")