"""Agent wrappers built on llama_index's ``AgentWorkflow``.

``Assistant`` is a plain conversational agent. ``Manager`` is an agent equipped
with two tools: one that recursively asks the manager itself to break tasks up,
and one that delegates tasks to a ``Solver``. Tool output is condensed by a
``Summarizer`` before being handed back to the agent.
"""

import os
from typing import List

from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context

from llm_factory import LLMFactory
from solver import Solver, Summarizer
from args import Args


def _load_system_prompt(filename: str) -> str:
    """Return the stripped contents of ``<cwd>/system_prompts/<filename>``.

    Args:
        filename: Name of the prompt file inside the ``system_prompts`` folder

    Returns:
        The prompt text with leading/trailing whitespace removed

    Raises:
        OSError: If the prompt file cannot be opened
    """
    prompt_path = os.path.join(os.getcwd(), "system_prompts", filename)
    # Explicit encoding so the prompt is decoded the same way on every platform.
    with open(prompt_path, "r", encoding="utf-8") as file:
        return file.read().strip()


class Assistant:
    """Conversational agent that keeps its history in a workflow ``Context``."""

    def __init__(self, temperature, max_tokens):
        """
        Build the assistant's LLM and agent from ``01_assistant.txt``.

        Args:
            temperature: Forwarded to ``LLMFactory.create``
            max_tokens: Forwarded to ``LLMFactory.create``
        """
        self.system_prompt = _load_system_prompt("01_assistant.txt")
        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        # NOTE(review): `Manager` builds its agent with
        # `AgentWorkflow.from_tools_or_functions`; `setup_agent` looks like a
        # workflow step rather than a constructor in upstream llama_index.
        # Confirm this call works with the pinned llama_index version.
        self.agent = AgentWorkflow.setup_agent(llm=llm)
        self.ctx = Context(self.agent)

    async def query(self, question: str) -> str:
        """
        Process a user query and return a response using the agent.

        Args:
            question: The user's question or input text

        Returns:
            The agent's response as a string
        """
        response = await self.agent.run(question, ctx=self.ctx)
        return str(response)

    def clear_context(self):
        """
        Replace the agent's context with a fresh one.

        Subsequent queries run against the new ``Context`` object instead of
        the one used so far.
        """
        self.ctx = Context(self.agent)


class Manager:
    """Agent that can break tasks up recursively or delegate them to a Solver."""

    def __init__(self, temperature, max_tokens, max_depth):
        """
        Build the manager's LLM, tool-equipped agent, solver and summarizer.

        Args:
            temperature: Forwarded to ``LLMFactory.create``, ``Solver`` and ``Summarizer``
            max_tokens: Forwarded to ``LLMFactory.create``, ``Solver`` and ``Summarizer``
            max_depth: Nesting limit for ``require_break_up``
        """
        self.max_depth = max_depth
        # Number of `require_break_up` calls currently in progress (nesting level).
        self.current_depth = 0

        self.system_prompt = _load_system_prompt("02_manager.txt")
        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)

        self.agent = AgentWorkflow.from_tools_or_functions(
            [
                FunctionTool.from_defaults(
                    name="require_break_up",
                    description=(
                        "Break a complex task into simpler subtasks. "
                        "Use when a task needs to be divided into manageable parts."
                    ),
                    fn=self.require_break_up,
                ),
                FunctionTool.from_defaults(
                    name="require_solution",
                    description=(
                        "Request direct solutions for specific tasks. "
                        "Use when a task is simple enough to be solved directly."
                    ),
                    fn=self.require_solution,
                ),
            ],
            llm=llm,
        )
        self.ctx = Context(self.agent)
        self.solver = Solver(temperature, max_tokens)
        self.summarizer = Summarizer(temperature, max_tokens)

    async def query(self, question: str, remember: bool = True) -> str:
        """
        Process a question using the manager agent and return a response.

        Args:
            question: The question or task to process
            remember: When True, run with the stored context; when False, run
                without passing a context (default: True)

        Returns:
            The agent's response as a string
        """
        if remember:
            response = await self.agent.run(question, ctx=self.ctx)
        else:
            response = await self.agent.run(question)
        return str(response)

    def clear_context(self):
        """
        Replace the agent's context with a fresh one.

        Subsequent ``query(..., remember=True)`` calls run against the new
        ``Context`` object instead of the one used so far.
        """
        self.ctx = Context(self.agent)

    async def _solve_tasks(self, tasks: List[str]) -> str:
        """Send each task to the Solver and return the concatenated per-task report."""
        parts = []
        for task in tasks:
            solution = await self.solver.query(task)
            parts.append(f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n")
        return "".join(parts)

    async def require_break_up(self, tasks: List[str], try_solving: bool = False) -> str:
        """
        Break down complex tasks into simpler subtasks recursively up to max_depth.

        Below the depth limit every task is sent back to the manager agent
        (without context). At the limit the tasks are either handed to the
        Solver (``try_solving=True``) or a "maximum depth" notice is produced.
        Whatever text results is passed through the Summarizer.

        Args:
            tasks: List of tasks to break down
            try_solving: Whether to attempt solving tasks at max depth (default: False)

        Returns:
            Summarized report of the task breakdown, or an error message when
            ``tasks`` is empty
        """
        print(f"-> require_break_up tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to break up. Please provide at least one task."

        self.current_depth += 1
        try:
            if self.current_depth < self.max_depth:
                parts = []
                for task in tasks:
                    solution = await self.query(task, remember=False)
                    parts.append(
                        f"For task:\n\n{task}\n\nThe following break up has been provided:\n\n{solution}\n\n"
                    )
                observation = "".join(parts)
            elif try_solving:
                # Previously the Solver answers were computed and then dropped,
                # so the summarizer received an empty observation.
                observation = await self._solve_tasks(tasks)
            else:
                observation = "Maximum depth for `break_up` tool has been reached ! At this point, you may try to break up the task yourself or try `require_solution`."
        finally:
            # Always undo the increment, even if a nested query raised;
            # otherwise the depth counter would stay raised for later calls.
            self.current_depth -= 1

        return await self.summarizer.query(observation.strip())

    async def require_solution(self, tasks: List[str]) -> str:
        """
        Request direct solutions for the provided tasks using the Solver.

        Args:
            tasks: List of tasks to solve

        Returns:
            Summarized report of solutions for all tasks, or an error message
            when ``tasks`` is empty
        """
        print(f"-> require_solution tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to solve. Please provide at least one task."

        observation = await self._solve_tasks(tasks)
        return await self.summarizer.query(observation.strip())