import os
import logging
from typing import List, Optional, Union, Literal

from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever

# Setup logging
logger = logging.getLogger(__name__)

# Configure LlamaIndex Settings (optional, but good practice).
# Ensure an embedding model is set if you are not using the default OpenAI one.
# Settings.embed_model = ...  # e.g. HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
# Settings.llm = ...          # A default LLM can be set here if needed.
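# Minimal sketch of overriding the embedding model, assuming the optional
# llama-index-embeddings-huggingface package is installed. Left commented out
# so this module keeps its defaults:
#
# from llama_index.embeddings.huggingface import HuggingFaceEmbedding
# Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")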

# Helper function to load a prompt from a text file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
    """Loads a prompt from a text file, falling back to a default on failure."""
    try:
        script_dir = os.path.dirname(__file__)
        prompt_path = os.path.join(script_dir, filename)
        with open(prompt_path, "r") as f:
            prompt = f.read()
        logger.info(f"Successfully loaded prompt from {prompt_path}")
        return prompt
    except FileNotFoundError:
        logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
        return default_prompt
    except Exception as e:
        logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
        return default_prompt

# --- Internal Context Index Management ---
# Store the index and text globally for simplicity in this example.
# In a real application, consider a more robust state management approach,
# such as the sketch below.
_context_index: Optional[VectorStoreIndex] = None
_context_text: Optional[str] = None
_context_source: Optional[str] = None  # e.g., filename or description
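
# Illustrative alternative (not used here): encapsulate the state in a small
# class so several contexts could coexist instead of sharing module globals.
#
# from dataclasses import dataclass
#
# @dataclass
# class ContextState:
#     index: Optional[VectorStoreIndex] = None
#     text: Optional[str] = None
#     source: Optional[str] = None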

def _build_or_get_index(text: Optional[str] = None, source: Optional[str] = "loaded_context") -> Optional[VectorStoreIndex]:
    """Builds or retrieves the VectorStoreIndex for the loaded context."""
    global _context_index, _context_text, _context_source
    if text is not None and (text != _context_text or _context_index is None):
        logger.info(f"Building new context index from text (length: {len(text)} chars). Source: {source}")
        _context_text = text
        _context_source = source
        try:
            # Use SentenceSplitter for chunking; passed per-index rather than
            # mutating the global Settings.
            splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=200)
            documents = [Document(text=_context_text)]
            _context_index = VectorStoreIndex.from_documents(
                documents, transformations=[splitter], show_progress=True
            )
            logger.info("Context index built successfully.")
        except Exception as e:
            logger.error(f"Failed to build context index: {e}", exc_info=True)
            _context_index = None
            _context_text = None
            _context_source = None
            return None
    elif _context_index is None:
        logger.warning("No context loaded or index built yet.")
        return None
    return _context_index

def load_text_context(text: str, source: str = "provided_text") -> str:
    """Loads text into the agent's context and builds an index. Replaces any existing context."""
    logger.info(f"Loading new text context (length: {len(text)} chars). Source: {source}")
    index = _build_or_get_index(text=text, source=source)
    if index:
        return f"Successfully loaded and indexed text context from {source} (Length: {len(text)} chars)."
    else:
        return "Error: Failed to load or index the provided text context."

# --- Tool Functions ---
def summarize_long_context(detail_level: Literal["brief", "standard", "detailed"] = "standard",
                           max_length: Optional[int] = None,
                           min_length: Optional[int] = None) -> str:
    """Summarizes the currently loaded long text context.

    Args:
        detail_level (str): Level of detail: "brief" (1-2 sentences), "standard" (1-2 paragraphs), "detailed" (multiple paragraphs).
        max_length (Optional[int]): Approximate maximum words (overrides detail_level if set).
        min_length (Optional[int]): Approximate minimum words.

    Returns:
        str: The summary or an error message.
    """
    global _context_text, _context_source
    if _context_text is None:
        return "Error: No long context has been loaded yet. Use 'load_text_context' first."
    logger.info(f"Summarizing loaded context (Source: {_context_source}, Length: {len(_context_text)} chars). Detail: {detail_level}")

    # Derive length guidance from detail_level when max/min are not set.
    if max_length is None:
        if detail_level == "brief":
            max_length = 50
            min_length = min_length or 10
        elif detail_level == "detailed":
            max_length = 500
            min_length = min_length or 150
        else:  # standard
            max_length = 200
            min_length = min_length or 50
    min_length = min_length or int(max_length * 0.3)  # Default min length when only max_length was given

    # LLM configuration (Pro-class models handle long context well)
    llm_model = os.getenv("CONTEXT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for summarization LLM.")
        return "Error: GEMINI_API_KEY not set."

    # Pass the full text and rely on the LLM's context window; truncate here
    # only if inputs routinely exceed it.
    text_to_summarize = _context_text

    prompt = (
        f"Summarize the following text concisely, focusing on the main points and key information. "
        f"Aim for a length between {min_length} and {max_length} words. "
        f"The requested level of detail is '{detail_level}'.\n\n"
        f"TEXT:\n{text_to_summarize}\n\nSUMMARY:"
    )
    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model, temperature=0.05)
        logger.info(f"Using summarization LLM: {llm_model}")
        response = llm.complete(prompt)
        summary = response.text.strip()
        logger.info(f"Summarization successful (output length: {len(summary.split())} words).")
        return summary
    except Exception as e:
        logger.error(f"LLM call failed during summarization: {e}", exc_info=True)
        return f"Error during summarization: {e}"

def extract_key_information(query: str, max_results: int = 10) -> Union[List[str], str]:
    """Extracts specific information or answers a question based on the loaded long context using the index.

    Args:
        query (str): The question or description of information to extract (e.g., "List all decisions made", "What was mentioned about Project X?").
        max_results (int): Maximum number of distinct pieces of information or text snippets to return.

    Returns:
        Union[List[str], str]: A list of extracted text snippets or answers, or an error message.
    """
    logger.info(f"Extracting information for query: {query} from loaded context. Max results: {max_results}")
    index = _build_or_get_index()  # Get the existing index
    if index is None:
        return "Error: No context loaded or index available. Use 'load_text_context' first."
    try:
        # Retrieve more candidates than requested so the synthesizer has room to filter.
        retriever = VectorIndexRetriever(index=index, similarity_top_k=max_results * 2)
        query_engine = RetrieverQueryEngine.from_args(retriever=retriever)
        # Phrase the prompt to encourage extraction rather than free synthesis.
        extraction_prompt = f"Based *only* on the provided context, extract the key information or answer the following query. List distinct findings or provide relevant text snippets. Query: {query}"
        response = query_engine.query(extraction_prompt)
        # Heuristic post-processing: assume the LLM returns one finding per line.
        extracted_items = [item.strip() for item in response.response.split("\n") if item.strip()]
        final_results = extracted_items[:max_results]
        logger.info(f"Extraction successful. Found {len(final_results)} items.")
        return final_results if final_results else ["No specific information found matching the query in the context."]
    except Exception as e:
        logger.error(f"Error during information extraction: {e}", exc_info=True)
        return f"Error during extraction: {e}"

def filter_by_relevance(topic: str, threshold: float = 0.75) -> str:
    """Filters the loaded long context, retaining sections relevant to the topic using the index.

    Args:
        topic (str): The topic or query to filter relevance by.
        threshold (float): Similarity threshold (0.0 to 1.0); higher is stricter.

    Returns:
        str: The filtered text containing only relevant sections, or an error message.
    """
    logger.info(f"Filtering loaded context for relevance to topic: {topic}. Threshold: {threshold}")
    index = _build_or_get_index()  # Get the existing index
    if index is None:
        return "Error: No context loaded or index available. Use 'load_text_context' first."
    try:
        retriever = VectorIndexRetriever(index=index, similarity_top_k=20)  # Retrieve a healthy pool of candidates
        retrieved_nodes = retriever.retrieve(topic)
        relevant_texts = []
        for node_with_score in retrieved_nodes:
            if node_with_score.score >= threshold:
                relevant_texts.append(node_with_score.node.get_content())
            else:
                # Results are ordered by descending score, so we can stop early.
                break
        if not relevant_texts:
            logger.info("No sections found meeting the relevance threshold.")
            return "No content found matching the specified relevance threshold for the topic."
        filtered_text = "\n\n---\n\n".join(relevant_texts)  # Separate sections for readability
        logger.info(f"Filtering successful. Combined relevant text length: {len(filtered_text)} chars.")
        return filtered_text
    except Exception as e:
        logger.error(f"Error during relevance filtering: {e}", exc_info=True)
        return f"Error during filtering: {e}"

def query_context_index(query: str) -> str:
    """Answers a specific question based on the information contained within the loaded long context using the index.

    Args:
        query (str): The question to answer.

    Returns:
        str: The answer derived from the context, or an error/"not found" message.
    """
    logger.info(f"Querying loaded context index with: {query}")
    index = _build_or_get_index()  # Get the existing index
    if index is None:
        return "Error: No context loaded or index available. Use 'load_text_context' first."
    try:
        query_engine = index.as_query_engine(similarity_top_k=5)  # Default query engine
        response = query_engine.query(query)
        answer = response.response.strip()
        logger.info("Context query successful.")
        # Heuristic check for the LLM signalling that it could not answer.
        if "don't know" in answer.lower() or "no information" in answer.lower() or "context does not mention" in answer.lower():
            logger.warning(f"Query response suggests information not found: {answer}")
            return f"The loaded context does not seem to contain the answer to: {query}"
        return answer
    except Exception as e:
        logger.error(f"Error during context query: {e}", exc_info=True)
        return f"Error querying context: {e}"

# --- Tool Definitions ---
load_context_tool = FunctionTool.from_defaults(
    fn=load_text_context,
    name="load_text_context",
    description=(
        "Loads/replaces the long text context for the agent and builds an internal index. "
        "Input: text (str), Optional: source (str). Output: Status message (str)."
    ),
)

summarize_context_tool = FunctionTool.from_defaults(
    fn=summarize_long_context,
    name="summarize_long_context",
    description=(
        "Summarizes the currently loaded long text context. "
        "Input: Optional: detail_level ('brief', 'standard', 'detailed'), max_length (int), min_length (int). Output: Summary (str) or error."
    ),
)

extract_info_tool = FunctionTool.from_defaults(
    fn=extract_key_information,
    name="extract_key_information",
    description=(
        "Extracts specific information or answers questions from the loaded context using its index. "
        "Input: query (str), Optional: max_results (int). Output: List[str] of findings or error string."
    ),
)

filter_context_tool = FunctionTool.from_defaults(
    fn=filter_by_relevance,
    name="filter_by_relevance",
    description=(
        "Filters the loaded context to retain only sections relevant to a topic, using the index. "
        "Input: topic (str), Optional: threshold (float 0-1). Output: Filtered text (str) or error."
    ),
)

query_context_tool = FunctionTool.from_defaults(
    fn=query_context_index,
    name="query_context_index",
    description=(
        "Answers a specific question based *only* on the loaded long context using its index. "
        "Input: query (str). Output: Answer (str) or error/'not found' message."
    ),
)

# --- Agent Initialization ---
def initialize_long_context_management_agent() -> ReActAgent:
    """Initializes the Long Context Management Agent."""
    logger.info("Initializing LongContextManagementAgent...")
    # Configuration for the agent's main LLM (needs to handle planning)
    agent_llm_model = os.getenv("CONTEXT_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for LongContextManagementAgent.")
        raise ValueError("GEMINI_API_KEY must be set for LongContextManagementAgent")
    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
        logger.info(f"Using agent LLM: {agent_llm_model}")
        Settings.llm = llm  # Default LLM for LlamaIndex components used by the tools

        # Load system prompt
        default_system_prompt = "You are LongContextManagementAgent... [Default prompt content - replace with actual]"  # Placeholder
        system_prompt = load_prompt_from_file("../prompts/long_context_management_agent_prompt.txt", default_system_prompt)
        if system_prompt == default_system_prompt:
            logger.warning("Using default/fallback system prompt for LongContextManagementAgent.")

        # Define available tools
        tools = [
            load_context_tool,
            summarize_context_tool,
            extract_info_tool,
            filter_context_tool,
            query_context_tool,
        ]

        # Define valid handoff targets
        valid_handoffs = [
            "planner_agent",        # To return results
            "text_analyzer_agent",  # If further analysis of extracted/filtered text is needed
            "reasoning_agent",
            "research_agent",
        ]

        agent = ReActAgent(
            name="long_context_management_agent",
            description=(
                "Manages and processes long textual context efficiently. Handles large documents, transcripts, or datasets "
                "by summarizing (`summarize_long_context`), extracting key information (`extract_key_information`), "
                "filtering relevant content (`filter_by_relevance`), and answering questions based on the context (`query_context_index`). "
                "Builds an internal index for efficient retrieval and repeated queries, and relies only on the provided input, "
                "avoiding external augmentation unless explicitly requested."
            ),
            tools=tools,
            llm=llm,
            system_prompt=system_prompt,
            can_handoff_to=valid_handoffs,
        )
        logger.info("LongContextManagementAgent initialized successfully.")
        return agent
    except Exception as e:
        logger.error(f"Error during LongContextManagementAgent initialization: {e}", exc_info=True)
        raise
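
# Illustrative usage sketch (commented out; the exact ReActAgent run API varies
# across llama-index versions, so treat this as an assumption, not a contract):
#
# import asyncio
# agent = initialize_long_context_management_agent()
# async def _demo():
#     return await agent.run(user_msg="Load this text, then give a brief summary.")
# print(asyncio.run(_demo()))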

# Example usage (for testing if run directly)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # Raise LlamaIndex log levels to reduce noise during testing
    logging.getLogger("llama_index.core.indices.vector_store").setLevel(logging.WARNING)
    logging.getLogger("llama_index.core.query_engine").setLevel(logging.WARNING)
    logging.getLogger("llama_index.core.token_counter").setLevel(logging.ERROR)  # Suppress token counting logs

    logger.info("Running long_context_management_agent.py directly for testing...")

    # Check required keys
    required_keys = ["GEMINI_API_KEY"]
    missing_keys = [key for key in required_keys if not os.getenv(key)]
    if missing_keys:
        print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.")
    else:
        try:
            # Example long text
            long_text = """
            Meeting Minutes - Project Phoenix - April 28, 2025
            Attendees: Alice, Bob, Charlie, David
            Agenda: Review Q1 results, Plan Q2 roadmap, Budget allocation

            Q1 Results Discussion:
            Alice presented the sales figures. Sales increased by 15% compared to Q4 2024, exceeding the target of 10%.
            Bob highlighted the success of the marketing campaign launched in February. Customer acquisition cost decreased by 5%.
            Charlie noted a slight dip in user engagement metrics in March, possibly due to a recent UI change.
            Action Item: David to investigate the engagement dip.

            Q2 Roadmap Planning:
            The team discussed potential features for Q2. Feature A (enhanced reporting) was prioritized.
            Feature B (mobile app improvements) was deferred to Q3.
            Alice emphasized the need for stability improvements. Bob suggested focusing on performance optimization.
            Decision: Q2 focus will be on Feature A and performance/stability improvements.

            Budget Allocation:
            Charlie presented the proposed budget.
            An additional $50,000 was requested for cloud infrastructure scaling due to increased usage.
            David questioned the necessity of the full amount.
            After discussion, the team approved an additional $40,000 for infrastructure.
            Decision: Allocate $40,000 extra for Q2 infrastructure.

            Next Steps:
            David to report on engagement metrics by May 5th.
            Alice to finalize Q2 feature specifications by May 10th.
            Meeting adjourned.
            """ * 5  # Repeat to simulate a long document

            # Test loading context
            print("\nTesting load_text_context...")
            load_status = load_text_context(long_text, source="Meeting Minutes Test")
            print(load_status)

            if "Error" not in load_status:
                # Test summarization
                print("\nTesting summarize_long_context (brief)...")
                summary_brief = summarize_long_context(detail_level="brief")
                print(f"Brief Summary: {summary_brief}")

                # Test extraction
                print("\nTesting extract_key_information (decisions)...")
                decisions = extract_key_information(query="List all decisions made in the meeting")
                print(f"Decisions Extracted: {decisions}")

                # Test filtering
                print("\nTesting filter_by_relevance (budget)...")
                budget_text = filter_by_relevance(topic="budget allocation", threshold=0.7)
                print(f"Filtered Budget Text (first 300 chars):\n{budget_text[:300]}...")

                # Test querying
                print("\nTesting query_context_index (Q1 sales)...")
                sales_query = "What was the sales increase in Q1?"
                sales_answer = query_context_index(sales_query)
                print(f"Answer to '{sales_query}': {sales_answer}")

                print("\nTesting query_context_index (non-existent info)...")
                non_existent_query = "Who is the CEO?"
                non_existent_answer = query_context_index(non_existent_query)
                print(f"Answer to '{non_existent_query}': {non_existent_answer}")

            # Initialize the agent (optional)
            # test_agent = initialize_long_context_management_agent()
            # print("\nLong Context Management Agent initialized successfully for testing.")
        except Exception as e:
            print(f"Error during testing: {e}")