Final_Assignment_Template / management.py
24Arys11's picture
built management agents with llama_index, recursive self call for task breakdown, fixed LLMFactory to create LLMs from different interfaces
cc6bd3b
raw
history blame
5.89 kB
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context
from typing import List
import os
from llm_factory import LLMFactory
from solver import Solver, Summarizer
from args import Args
class Assistant:
    """Thin wrapper around a single LLM-backed agent with a resettable context.

    The system prompt is read from ``system_prompts/01_assistant.txt`` relative
    to the current working directory and handed to ``LLMFactory.create``
    together with the sampling settings.
    """

    def __init__(self, temperature, max_tokens):
        """
        Load the assistant system prompt and build the agent and its context.
        Args:
            temperature: Forwarded unchanged to ``LLMFactory.create``
            max_tokens: Forwarded unchanged to ``LLMFactory.create``
        Raises:
            OSError: If the system prompt file cannot be opened
        """
        # The path is resolved against the process working directory, not this file.
        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "01_assistant.txt")
        # Explicit encoding so the prompt decodes identically on every platform
        # instead of depending on the locale's default codec.
        with open(system_prompt_path, "r", encoding="utf-8") as file:
            self.system_prompt = file.read().strip()

        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        # NOTE(review): confirm that `AgentWorkflow.setup_agent` is usable as a
        # class-level constructor in the installed llama_index version; the
        # Manager class in this file builds its agent via `from_tools_or_functions`.
        self.agent = AgentWorkflow.setup_agent(llm=llm)
        self.ctx = Context(self.agent)

    async def query(self, question: str) -> str:
        """
        Run the agent on a question, reusing the stored context.
        Args:
            question: The user's question or input text
        Returns:
            The agent's response converted with ``str()``
        """
        result = await self.agent.run(question, ctx=self.ctx)
        return str(result)

    def clear_context(self):
        """
        Replace the stored context with a fresh ``Context`` bound to the same agent.
        Subsequent ``query`` calls will use the new context object.
        """
        self.ctx = Context(self.agent)
class Manager:
    """Tool-using agent that either breaks tasks up recursively or has them solved.

    The agent is given two tools, both bound methods of this instance:
    ``require_break_up`` (re-queries this same manager per task, bounded by
    ``max_depth``) and ``require_solution`` (delegates each task to a
    ``Solver``). Tool results are passed through a ``Summarizer`` before being
    returned to the agent.
    """

    def __init__(self, temperature, max_tokens, max_depth):
        """
        Load the manager system prompt and build the agent, solver and summarizer.
        Args:
            temperature: Forwarded to ``LLMFactory.create``, ``Solver`` and ``Summarizer``
            max_tokens: Forwarded to ``LLMFactory.create``, ``Solver`` and ``Summarizer``
            max_depth: Upper bound compared against the break-up nesting depth
        Raises:
            OSError: If the system prompt file cannot be opened
        """
        self.max_depth = max_depth
        # Number of `require_break_up` calls currently in progress on this instance.
        self.current_depth = 0

        # The path is resolved against the process working directory, not this file.
        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "02_manager.txt")
        # Explicit encoding so the prompt decodes identically on every platform.
        with open(system_prompt_path, "r", encoding="utf-8") as file:
            self.system_prompt = file.read().strip()

        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        self.agent = AgentWorkflow.from_tools_or_functions(
            [
                FunctionTool.from_defaults(
                    name="require_break_up",
                    description="Break a complex task into simpler subtasks. Use when a task needs to be divided into manageable parts.",
                    fn=self.require_break_up,
                ),
                FunctionTool.from_defaults(
                    name="require_solution",
                    description="Request direct solutions for specific tasks. Use when a task is simple enough to be solved directly.",
                    fn=self.require_solution,
                ),
            ],
            llm=llm,
        )
        self.ctx = Context(self.agent)
        self.solver = Solver(temperature, max_tokens)
        self.summarizer = Summarizer(temperature, max_tokens)

    async def query(self, question: str, remember: bool = True) -> str:
        """
        Run the manager agent on a question and return its response.
        Args:
            question: The question or task to process
            remember: When True the stored context is passed to the agent;
                when False the agent is run without it (default: True)
        Returns:
            The agent's response converted with ``str()``
        """
        if remember:
            result = await self.agent.run(question, ctx=self.ctx)
        else:
            result = await self.agent.run(question)
        return str(result)

    def clear_context(self):
        """
        Replace the stored context with a fresh ``Context`` bound to the same agent.
        Subsequent ``query`` calls with ``remember=True`` will use the new context.
        """
        self.ctx = Context(self.agent)

    async def require_break_up(self, tasks: List[str], try_solving: bool = False) -> str:
        """
        Break down tasks by re-querying this manager, up to ``max_depth`` levels.
        Args:
            tasks: List of tasks to break down
            try_solving: At maximum depth, send the tasks to the Solver instead
                of reporting that the depth limit was reached (default: False)
        Returns:
            The Summarizer's report over the collected observations, or an
            error string when ``tasks`` is empty
        """
        print(f"-> require_break_up tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to break up. Please provide at least one task."

        self.current_depth += 1
        try:
            if self.current_depth < self.max_depth:
                sections = []
                for task in tasks:
                    # remember=False: the nested run does not use the stored context.
                    breakdown = await self.query(task, remember=False)
                    sections.append(
                        f"For task:\n\n{task}\n\nThe following break up has been provided:\n\n{breakdown}\n\n"
                    )
                observation = "".join(sections)
            elif try_solving:
                sections = []
                for task in tasks:
                    solution = await self.solver.query(task)
                    # Record the solver output; previously it was awaited and
                    # then dropped, so the summarizer received an empty string.
                    sections.append(
                        f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
                    )
                observation = "".join(sections)
            else:
                observation = "Maximum depth for `break_up` tool has been reached ! At this point, you may try to break up the task yourself or try `require_solution`."
        finally:
            # Always restore the depth, even when a nested call raises, so a
            # failure does not leave the counter permanently elevated.
            self.current_depth -= 1

        report = await self.summarizer.query(observation.strip())
        return report

    async def require_solution(self, tasks: List[str]) -> str:
        """
        Request direct solutions for the provided tasks using the Solver.
        Args:
            tasks: List of tasks to solve
        Returns:
            The Summarizer's report over all task/solution pairs, or an error
            string when ``tasks`` is empty
        """
        print(f"-> require_solution tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to solve. Please provide at least one task."

        sections = []
        for task in tasks:
            solution = await self.solver.query(task)
            sections.append(
                f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
            )

        report = await self.summarizer.query("".join(sections).strip())
        return report