# NOTE(review): the three lines that were here ("Spaces:" / "Sleeping" /
# "Sleeping") were page-scrape residue from a hosting UI, not code —
# converted to this comment so the module parses.
import os
from typing import List

from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context

from args import Args
from llm_factory import LLMFactory
from solver import Solver, Summarizer
class Assistant:
    """Conversational agent that answers user queries through an LLM workflow.

    Loads its system prompt from ``system_prompts/01_assistant.txt``, builds an
    LLM via :class:`LLMFactory`, and wraps it in an ``AgentWorkflow`` with a
    persistent :class:`Context` so conversation history carries across queries.
    """

    def __init__(self, temperature, max_tokens):
        """Build the assistant agent.

        Args:
            temperature: Sampling temperature forwarded to the LLM factory.
            max_tokens: Maximum completion tokens forwarded to the LLM factory.
        """
        # NOTE(review): the prompt path is resolved against the process's
        # current working directory, so the app must be launched from the
        # project root — confirm this matches the deployment entry point.
        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "01_assistant.txt")
        with open(system_prompt_path, "r", encoding="utf-8") as file:
            self.system_prompt = file.read().strip()
        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        self.agent = AgentWorkflow.setup_agent(llm=llm)
        # Persistent context: keeps conversation history between query() calls.
        self.ctx = Context(self.agent)

    async def query(self, question: str) -> str:
        """Process a user query and return the agent's response.

        Args:
            question: The user's question or input text.

        Returns:
            The agent's response as a string.
        """
        response = await self.agent.run(question, ctx=self.ctx)
        return str(response)

    def clear_context(self):
        """Reset the agent's conversation history.

        Useful when starting a new conversation or when the context needs to
        be refreshed.
        """
        self.ctx = Context(self.agent)
class Manager:
    """Planner agent that recursively breaks tasks down and delegates solving.

    Exposes two tools to its own LLM agent: ``require_break_up`` (recursive
    decomposition, bounded by ``max_depth``) and ``require_solution`` (direct
    solving via a :class:`Solver`). Tool observations are condensed by a
    :class:`Summarizer` before being returned to the agent.
    """

    def __init__(self, temperature, max_tokens, max_depth):
        """Build the manager agent and its helper components.

        Args:
            temperature: Sampling temperature forwarded to the LLM factory.
            max_tokens: Maximum completion tokens forwarded to the LLM factory.
            max_depth: Maximum recursion depth for ``require_break_up``.
        """
        self.max_depth = max_depth
        # Tracks the current recursion depth of require_break_up calls.
        self.current_depth = 0
        # NOTE(review): path is resolved against the current working directory
        # — confirm the app is always launched from the project root.
        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "02_manager.txt")
        with open(system_prompt_path, "r", encoding="utf-8") as file:
            self.system_prompt = file.read().strip()
        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        self.agent = AgentWorkflow.from_tools_or_functions(
            [
                FunctionTool.from_defaults(
                    name="require_break_up",
                    description="Break a complex task into simpler subtasks. Use when a task needs to be divided into manageable parts.",
                    fn=self.require_break_up,
                ),
                FunctionTool.from_defaults(
                    name="require_solution",
                    description="Request direct solutions for specific tasks. Use when a task is simple enough to be solved directly.",
                    fn=self.require_solution,
                ),
            ],
            llm=llm,
        )
        self.ctx = Context(self.agent)
        self.solver = Solver(temperature, max_tokens)
        self.summarizer = Summarizer(temperature, max_tokens)

    async def query(self, question: str, remember=True) -> str:
        """Process a question using the manager agent and return a response.

        Args:
            question: The question or task to process.
            remember: Whether to maintain context between queries (default: True).

        Returns:
            The agent's response as a string.
        """
        if remember:
            response = await self.agent.run(question, ctx=self.ctx)
        else:
            response = await self.agent.run(question)
        return str(response)

    def clear_context(self):
        """Reset the agent's conversation history.

        Useful when starting a new conversation or when the context needs to
        be refreshed.
        """
        self.ctx = Context(self.agent)

    async def require_break_up(self, tasks: List[str], try_solving=False) -> str:
        """Break down complex tasks into simpler subtasks recursively up to max_depth.

        Args:
            tasks: List of tasks to break down.
            try_solving: Whether to attempt solving tasks at max depth (default: False).

        Returns:
            Summarized report of the task breakdown.
        """
        print(f"-> require_break_up tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to break up. Please provide at least one task."
        self.current_depth += 1
        observation = ""
        # try/finally guarantees the depth counter is restored even if a
        # sub-query raises, so one failure cannot poison later recursion.
        try:
            if self.current_depth < self.max_depth:
                for task in tasks:
                    solution = await self.query(task, remember=False)
                    observation += f"For task:\n\n{task}\n\nThe following break up has been provided:\n\n{solution}\n\n"
            elif try_solving:
                # BUG FIX: the original assigned the Solver's answer to an
                # unused variable and never added it to `observation`, so all
                # solutions were silently discarded and an empty string was
                # summarized. Accumulate them in the same format as
                # require_solution().
                for task in tasks:
                    solution = await self.solver.query(task)
                    observation += f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
            else:
                observation = "Maximum depth for `break_up` tool has been reached ! At this point, you may try to break up the task yourself or try `require_solution`."
        finally:
            self.current_depth -= 1
        report = await self.summarizer.query(observation.strip())
        return report

    async def require_solution(self, tasks: List[str]) -> str:
        """Request direct solutions for the provided tasks using the Solver.

        Args:
            tasks: List of tasks to solve.

        Returns:
            Summarized report of solutions for all tasks.
        """
        print(f"-> require_solution tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to solve. Please provide at least one task."
        observation = ""
        for task in tasks:
            solution = await self.solver.query(task)
            observation += f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
        report = await self.summarizer.query(observation.strip())
        return report