import os import logging import json from typing import List, Dict, Optional, Union, Literal from llama_index.core.agent.workflow import ReActAgent from llama_index.core.tools import FunctionTool, QueryEngineTool from llama_index.llms.google_genai import GoogleGenAI from llama_index.core import Document, VectorStoreIndex, Settings from llama_index.core.node_parser import SentenceSplitter from llama_index.core.query_engine import RetrieverQueryEngine from llama_index.core.retrievers import VectorIndexRetriever # Setup logging logger = logging.getLogger(__name__) # Configure LlamaIndex Settings (optional, but good practice) # Ensure embedding model is set if not using default OpenAI # Settings.embed_model = ... # Example: HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") # Settings.llm = ... # Can set a default LLM here if needed # Helper function to load prompt from file def load_prompt_from_file(filename: str, default_prompt: str) -> str: """Loads a prompt from a text file.""" try: script_dir = os.path.dirname(__file__) prompt_path = os.path.join(script_dir, filename) with open(prompt_path, "r") as f: prompt = f.read() logger.info(f"Successfully loaded prompt from {prompt_path}") return prompt except FileNotFoundError: logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.") return default_prompt except Exception as e: logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True) return default_prompt # --- Internal Context Index Management --- # Store index and text globally for simplicity in this example # In a real application, consider a more robust state management approach _context_index: Optional[VectorStoreIndex] = None _context_text: Optional[str] = None _context_source: Optional[str] = None # e.g., filename or description def _build_or_get_index(text: Optional[str] = None, source: Optional[str] = "loaded_context") -> Optional[VectorStoreIndex]: """Builds or retrieves the VectorStoreIndex for the loaded context.""" global _context_index, _context_text, _context_source if text is not None and (text != _context_text or _context_index is None): logger.info(f"Building new context index from text (length: {len(text)} chars). Source: {source}") _context_text = text _context_source = source try: # Use SentenceSplitter for chunking splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=200) Settings.node_parser = splitter # Set globally or pass to index construction documents = [Document(text=_context_text)] _context_index = VectorStoreIndex.from_documents(documents, show_progress=True) logger.info("Context index built successfully.") except Exception as e: logger.error(f"Failed to build context index: {e}", exc_info=True) _context_index = None _context_text = None _context_source = None return None elif _context_index is None: logger.warning("No context loaded or index built yet.") return None return _context_index def load_text_context(text: str, source: str = "provided_text") -> str: """Loads text into the agent's context and builds an index. Replaces existing context.""" logger.info(f"Loading new text context (length: {len(text)} chars). Source: {source}") index = _build_or_get_index(text=text, source=source) if index: return f"Successfully loaded and indexed text context from {source} (Length: {len(text)} chars)." else: return "Error: Failed to load or index the provided text context." # --- Tool Functions --- def summarize_long_context(detail_level: Literal["brief", "standard", "detailed"] = "standard", max_length: Optional[int] = None, min_length: Optional[int] = None) -> str: """Summarizes the currently loaded long text context. Args: detail_level (str): Level of detail: "brief" (1-2 sentences), "standard" (1-2 paragraphs), "detailed" (multiple paragraphs). max_length (Optional[int]): Approximate maximum words (overrides detail_level if set). min_length (Optional[int]): Approximate minimum words. Returns: str: The summary or an error message. """ global _context_text, _context_source if _context_text is None: return "Error: No long context has been loaded yet. Use 'load_text_context' first." logger.info(f"Summarizing loaded context (Source: {_context_source}, Length: {len(_context_text)} chars). Detail: {detail_level}") # Determine length guidance based on detail_level if max/min not set if max_length is None: if detail_level == "brief": max_length = 50 min_length = min_length or 10 elif detail_level == "detailed": max_length = 500 min_length = min_length or 150 else: # standard max_length = 200 min_length = min_length or 50 min_length = min_length or int(max_length * 0.3) # Default min length # LLM configuration llm_model = os.getenv("CONTEXT_LLM_MODEL", "gemini-2.5-pro-preview-03-25") # Use Pro for potentially long context gemini_api_key = os.getenv("GEMINI_API_KEY") if not gemini_api_key: logger.error("GEMINI_API_KEY not found for summarization LLM.") return "Error: GEMINI_API_KEY not set." # Truncate input text only if extremely long, as Pro handles large contexts # Let the LLM handle context window limits if possible # max_input_chars = 100000 # Example high limit # text_to_summarize = _context_text[:max_input_chars] if len(_context_text) > max_input_chars else _context_text text_to_summarize = _context_text # Rely on LLM context window prompt = ( f"Summarize the following text concisely, focusing on the main points and key information. " f"Aim for a length between {min_length} and {max_length} words. " f"The requested level of detail is '{detail_level}'.\n\n" f"TEXT:\n{text_to_summarize}\n\nSUMMARY:" ) try: llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05) logger.info(f"Using summarization LLM: {llm_model}") response = llm.complete(prompt) summary = response.text.strip() logger.info(f"Summarization successful (output length: {len(summary.split())} words).") return summary except Exception as e: logger.error(f"LLM call failed during summarization: {e}", exc_info=True) return f"Error during summarization: {e}" def extract_key_information(query: str, max_results: int = 10) -> Union[List[str], str]: """Extracts specific information or answers a question based on the loaded long context using the index. Args: query (str): The question or description of information to extract (e.g., "List all decisions made", "What was mentioned about Project X?"). max_results (int): Maximum number of distinct pieces of information or text snippets to return. Returns: List[str]: A list of extracted text snippets or answers, or str: Error message. """ logger.info(f"Extracting information for query: {query} from loaded context. Max results: {max_results}") index = _build_or_get_index() # Get existing index if index is None: return "Error: No context loaded or index available. Use 'load_text_context' first." try: # Use a query engine for extraction # Configure retriever for potentially broader search retriever = VectorIndexRetriever(index=index, similarity_top_k=max_results * 2) # Retrieve more initially # Configure response synthesis (optional, can customize prompt) # response_synthesizer = ... query_engine = RetrieverQueryEngine.from_args(retriever=retriever, # response_synthesizer=response_synthesizer, # llm=Settings.llm # Use default or specify ) # Formulate a prompt that encourages extraction rather than synthesis if needed extraction_prompt = f"Based *only* on the provided context, extract the key information or answer the following query. List distinct findings or provide relevant text snippets. Query: {query}" response = query_engine.query(extraction_prompt) # Process response - might need refinement based on LLM output format # Assuming response.response contains the extracted info, potentially needing splitting # This part is heuristic and depends on how the LLM responds to the extraction prompt. extracted_items = [item.strip() for item in response.response.split("\n") if item.strip()] # Limit results if necessary final_results = extracted_items[:max_results] logger.info(f"Extraction successful. Found {len(final_results)} items.") return final_results if final_results else ["No specific information found matching the query in the context."] except Exception as e: logger.error(f"Error during information extraction: {e}", exc_info=True) return f"Error during extraction: {e}" def filter_by_relevance(topic: str, threshold: float = 0.75) -> str: """Filters the loaded long context, retaining sections relevant to the topic using the index. Args: topic (str): The topic or query to filter relevance by. threshold (float): Similarity threshold (0.0 to 1.0) for relevance. Higher means more strict. Returns: str: The filtered text containing only relevant sections, or an error message. """ logger.info(f"Filtering loaded context for relevance to topic: {topic}. Threshold: {threshold}") index = _build_or_get_index() # Get existing index if index is None: return "Error: No context loaded or index available. Use 'load_text_context' first." try: retriever = VectorIndexRetriever(index=index, similarity_top_k=20) # Retrieve a decent number of candidates retrieved_nodes = retriever.retrieve(topic) relevant_texts = [] for node_with_score in retrieved_nodes: if node_with_score.score >= threshold: relevant_texts.append(node_with_score.node.get_content()) else: # Since results are ordered by score, we can stop early break if not relevant_texts: logger.info("No sections found meeting the relevance threshold.") return "No content found matching the specified relevance threshold for the topic." # Combine relevant sections (consider adding separators) filtered_text = "\n\n---\n\n".join(relevant_texts) logger.info(f"Filtering successful. Combined relevant text length: {len(filtered_text)} chars.") return filtered_text except Exception as e: logger.error(f"Error during relevance filtering: {e}", exc_info=True) return f"Error during filtering: {e}" def query_context_index(query: str) -> str | None: """Answers a specific question based on the information contained within the loaded long context using the index. Args: query (str): The question to answer. Returns: str: The answer derived from the context, or an error/"not found" message. """ logger.info(f"Querying loaded context index with: {query}") index = _build_or_get_index() # Get existing index if index is None: return "Error: No context loaded or index available. Use 'load_text_context' first." try: query_engine = index.as_query_engine(similarity_top_k=5) # Default query engine response = query_engine.query(query) answer = response.response.strip() logger.info("Context query successful.") # Check if the LLM indicated it couldn't answer if "don't know" in answer.lower() or "no information" in answer.lower() or "context does not mention" in answer.lower(): logger.warning(f"Query response suggests information not found: {answer}") return f"The loaded context does not seem to contain the answer to: {query}" return answer except Exception as e: logger.error(f"Error during context query: {e}", exc_info=True) return f"Error querying context: {e}" # --- Tool Definitions --- load_context_tool = FunctionTool.from_defaults( fn=load_text_context, name="load_text_context", description=( "Loads/replaces the long text context for the agent and builds an internal index. " "Input: text (str), Optional: source (str). Output: Status message (str)." ), ) summarize_context_tool = FunctionTool.from_defaults( fn=summarize_long_context, name="summarize_long_context", description=( "Summarizes the currently loaded long text context. " "Input: Optional: detail_level ('brief', 'standard', 'detailed'), max_length (int), min_length (int). Output: Summary (str) or error." ), ) extract_info_tool = FunctionTool.from_defaults( fn=extract_key_information, name="extract_key_information", description=( "Extracts specific information or answers questions from the loaded context using its index. " "Input: query (str), Optional: max_results (int). Output: List[str] of findings or error string." ), ) filter_context_tool = FunctionTool.from_defaults( fn=filter_by_relevance, name="filter_by_relevance", description=( "Filters the loaded context to retain only sections relevant to a topic, using the index. " "Input: topic (str), Optional: threshold (float 0-1). Output: Filtered text (str) or error." ), ) query_context_tool = FunctionTool.from_defaults( fn=query_context_index, name="query_context_index", description=( "Answers a specific question based *only* on the loaded long context using its index. " "Input: query (str). Output: Answer (str) or error/'not found' message." ), ) # --- Agent Initialization --- def initialize_long_context_management_agent() -> ReActAgent: """Initializes the Long Context Management Agent.""" logger.info("Initializing LongContextManagementAgent...") # Configuration for the agent's main LLM agent_llm_model = os.getenv("CONTEXT_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25") # Needs to handle planning gemini_api_key = os.getenv("GEMINI_API_KEY") if not gemini_api_key: logger.error("GEMINI_API_KEY not found for LongContextManagementAgent.") raise ValueError("GEMINI_API_KEY must be set for LongContextManagementAgent") try: llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05) logger.info(f"Using agent LLM: {agent_llm_model}") Settings.llm = llm # Set default LLM for LlamaIndex components used by tools # Load system prompt default_system_prompt = ("You are LongContextManagementAgent... [Default prompt content - replace with actual]" # Placeholder ) system_prompt = load_prompt_from_file("../prompts/long_context_management_agent_prompt.txt", default_system_prompt) if system_prompt == default_system_prompt: logger.warning("Using default/fallback system prompt for LongContextManagementAgent.") # Define available tools tools = [ load_context_tool, summarize_context_tool, extract_info_tool, filter_context_tool, query_context_tool ] # Define valid handoff targets valid_handoffs = [ "planner_agent", # To return results "text_analyzer_agent", # If further analysis of extracted/filtered text is needed "reasoning_agent", "research_agent" ] agent = ReActAgent( name="long_context_management_agent", description=( "Manages and processes long textual context efficiently. Handles large documents, transcripts, or datasets " "by summarizing (`summarize_long_context`), extracting key information (`extract_key_information`), " "filtering relevant content (`filter_by_relevance`), and answering questions based on the context (`query_context_index`). " "Supports internal indexing for efficient retrieval and repeated queries. Optimized for chunked input processing " "and contextual distillation. Only relies on the provided input and avoids external augmentation unless explicitly requested." ), tools=tools, llm=llm, system_prompt=system_prompt, can_handoff_to=valid_handoffs, ) logger.info("LongContextManagementAgent initialized successfully.") return agent except Exception as e: logger.error(f"Error during LongContextManagementAgent initialization: {e}", exc_info=True) raise # Example usage (for testing if run directly) if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # Set LlamaIndex log level higher to reduce noise during testing logging.getLogger("llama_index.core.indices.vector_store").setLevel(logging.WARNING) logging.getLogger("llama_index.core.query_engine").setLevel(logging.WARNING) logging.getLogger("llama_index.core.token_counter").setLevel(logging.ERROR) # Suppress token counting logs logger.info("Running long_context_management_agent.py directly for testing...") # Check required keys required_keys = ["GEMINI_API_KEY"] missing_keys = [key for key in required_keys if not os.getenv(key)] if missing_keys: print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.") else: try: # Example long text long_text = """ Meeting Minutes - Project Phoenix - April 28, 2025 Attendees: Alice, Bob, Charlie, David Agenda: Review Q1 results, Plan Q2 roadmap, Budget allocation Q1 Results Discussion: Alice presented the sales figures. Sales increased by 15% compared to Q4 2024, exceeding the target of 10%. Bob highlighted the success of the marketing campaign launched in February. Customer acquisition cost decreased by 5%. Charlie noted a slight dip in user engagement metrics in March, possibly due to a recent UI change. Action Item: David to investigate the engagement dip. Q2 Roadmap Planning: The team discussed potential features for Q2. Feature A (enhanced reporting) was prioritized. Feature B (mobile app improvements) was deferred to Q3. Alice emphasized the need for stability improvements. Bob suggested focusing on performance optimization. Decision: Q2 focus will be on Feature A and performance/stability improvements. Budget Allocation: Charlie presented the proposed budget. An additional $50,000 was requested for cloud infrastructure scaling due to increased usage. David questioned the necessity of the full amount. After discussion, the team approved an additional $40,000 for infrastructure. Decision: Allocate $40,000 extra for Q2 infrastructure. Next Steps: David to report on engagement metrics by May 5th. Alice to finalize Q2 feature specifications by May 10th. Meeting adjourned. """ * 5 # Make it longer # Test loading context print("\nTesting load_text_context...") load_status = load_text_context(long_text, source="Meeting Minutes Test") print(load_status) if "Error" not in load_status: # Test summarization print("\nTesting summarize_long_context (brief)...") summary_brief = summarize_long_context(detail_level="brief") print(f"Brief Summary: {summary_brief}") # Test extraction print("\nTesting extract_key_information (decisions)...") decisions = extract_key_information(query="List all decisions made in the meeting") print(f"Decisions Extracted: {decisions}") # Test filtering print("\nTesting filter_by_relevance (budget)...") budget_text = filter_by_relevance(topic="budget allocation", threshold=0.7) print(f"Filtered Budget Text (first 300 chars):\n{budget_text[:300]}...") # Test querying print("\nTesting query_context_index (Q1 sales)...") sales_query = "What was the sales increase in Q1?" sales_answer = query_context_index(sales_query) print(f"Answer to '{sales_query}': {sales_answer}") print("\nTesting query_context_index (non-existent info)...") non_existent_query = "Who is the CEO?" non_existent_answer = query_context_index(non_existent_query) print(f"Answer to '{non_existent_query}': {non_existent_answer}") # Initialize the agent (optional) # test_agent = initialize_long_context_management_agent() # print("\nLong Context Management Agent initialized successfully for testing.") except Exception as e: print(f"Error during testing: {e}")