Spaces:
Sleeping
Sleeping
File size: 5,891 Bytes
cc6bd3b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context
from typing import List
import os
from llm_factory import LLMFactory
from solver import Solver, Summarizer
from args import Args
class Assistant:
    """
    Conversational agent configured from the ``01_assistant.txt`` system prompt.

    One ``Context`` object is kept on the instance and passed to every
    ``query`` call until ``clear_context`` replaces it.
    """

    def __init__(self, temperature, max_tokens):
        """
        Load the system prompt, create the LLM and build the agent.

        Args:
            temperature: Forwarded unchanged to ``LLMFactory.create``.
            max_tokens: Forwarded unchanged to ``LLMFactory.create``.

        Raises:
            OSError: If the system prompt file cannot be opened.
        """
        # The prompt path is resolved against the current working directory,
        # so the process has to be started from the directory that contains
        # the "system_prompts" folder.
        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "01_assistant.txt")
        with open(system_prompt_path, "r", encoding="utf-8") as file:
            self.system_prompt = file.read().strip()

        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)

        # Build the workflow through the same factory classmethod Manager uses.
        # ``AgentWorkflow.setup_agent`` is one of the workflow's internal steps,
        # not a constructor, so calling it here fails at instantiation time.
        # The assistant exposes no tools, hence the empty list.
        self.agent = AgentWorkflow.from_tools_or_functions([], llm=llm)
        self.ctx = Context(self.agent)

    async def query(self, question: str) -> str:
        """
        Process a user query and return a response using the agent.

        Args:
            question: The user's question or input text

        Returns:
            The agent's response converted to a string
        """
        response = await self.agent.run(question, ctx=self.ctx)
        return str(response)

    def clear_context(self):
        """
        Replace the agent's context with a fresh one.

        Subsequent ``query`` calls run against the new ``Context`` instead of
        the one used so far.
        """
        self.ctx = Context(self.agent)
class Manager:
    """
    Agent that delegates work through two tools: ``require_break_up`` and
    ``require_solution``.

    ``require_break_up`` re-enters the manager agent for each task, and
    ``current_depth`` / ``max_depth`` bound how deep that nesting may go.
    Tool outputs are condensed by a ``Summarizer`` before being returned.
    """

    def __init__(self, temperature, max_tokens, max_depth):
        """
        Load the system prompt and build the agent, solver and summarizer.

        Args:
            temperature: Forwarded to ``LLMFactory.create``, ``Solver`` and ``Summarizer``.
            max_tokens: Forwarded to ``LLMFactory.create``, ``Solver`` and ``Summarizer``.
            max_depth: Upper bound compared against ``current_depth`` in ``require_break_up``.

        Raises:
            OSError: If the system prompt file cannot be opened.
        """
        self.max_depth = max_depth
        # Number of require_break_up calls currently in progress on this instance.
        self.current_depth = 0

        # The prompt path is resolved against the current working directory.
        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "02_manager.txt")
        with open(system_prompt_path, "r", encoding="utf-8") as file:
            self.system_prompt = file.read().strip()

        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        self.agent = AgentWorkflow.from_tools_or_functions(
            [
                FunctionTool.from_defaults(
                    name="require_break_up",
                    description="Break a complex task into simpler subtasks. Use when a task needs to be divided into manageable parts.",
                    fn=self.require_break_up
                ),
                FunctionTool.from_defaults(
                    name="require_solution",
                    description="Request direct solutions for specific tasks. Use when a task is simple enough to be solved directly.",
                    fn=self.require_solution
                )
            ],
            llm=llm
        )
        self.ctx = Context(self.agent)
        self.solver = Solver(temperature, max_tokens)
        self.summarizer = Summarizer(temperature, max_tokens)

    async def query(self, question: str, remember: bool = True) -> str:
        """
        Process a question using the manager agent and return a response.

        Args:
            question: The question or task to process
            remember: When True the instance's context is passed to the agent;
                when False the agent is run without it (default: True)

        Returns:
            The agent's response converted to a string
        """
        if remember:
            response = await self.agent.run(question, ctx=self.ctx)
        else:
            response = await self.agent.run(question)
        return str(response)

    def clear_context(self):
        """
        Replace the agent's context with a fresh one.

        Subsequent remembered ``query`` calls run against the new ``Context``.
        """
        self.ctx = Context(self.agent)

    async def require_break_up(self, tasks: List[str], try_solving: bool = False) -> str:
        """
        Break down complex tasks into simpler subtasks recursively up to max_depth.

        Below the depth limit each task is sent back to the manager agent
        (without the shared context). At the limit, tasks are either handed to
        the solver (``try_solving=True``) or a notice about the limit is
        produced. The collected text is passed through the summarizer.

        Args:
            tasks: List of tasks to break down
            try_solving: Whether to attempt solving tasks at max depth (default: False)

        Returns:
            Summarized report of the task breakdown, or an error message when
            ``tasks`` is empty
        """
        print(f"-> require_break_up tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to break up. Please provide at least one task."

        self.current_depth += 1
        try:
            if self.current_depth < self.max_depth:
                parts = []
                for task in tasks:
                    solution = await self.query(task, remember=False)
                    parts.append(
                        f"For task:\n\n{task}\n\nThe following break up has been provided:\n\n{solution}\n\n"
                    )
                observation = "".join(parts)
            elif try_solving:
                # Record every solver answer; previously the answers were
                # awaited and then dropped, so the summarizer got an empty string.
                parts = []
                for task in tasks:
                    solution = await self.solver.query(task)
                    parts.append(
                        f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
                    )
                observation = "".join(parts)
            else:
                observation = "Maximum depth for `break_up` tool has been reached ! At this point, you may try to break up the task yourself or try `require_solution`."
        finally:
            # Always undo the increment, even if a nested call raised, so one
            # failure does not permanently consume a depth level.
            self.current_depth -= 1

        report = await self.summarizer.query(observation.strip())
        return report

    async def require_solution(self, tasks: List[str]) -> str:
        """
        Request direct solutions for the provided tasks using the Solver.

        Args:
            tasks: List of tasks to solve

        Returns:
            Summarized report of solutions for all tasks, or an error message
            when ``tasks`` is empty
        """
        print(f"-> require_solution tool used (input: {tasks}) !")
        if not tasks:
            return "Error: No tasks provided to solve. Please provide at least one task."

        parts = []
        for task in tasks:
            solution = await self.solver.query(task)
            parts.append(
                f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
            )
        observation = "".join(parts)

        report = await self.summarizer.query(observation.strip())
        return report
|