import os
from typing import List

from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context

from llm_factory import LLMFactory
from solver import Solver, Summarizer
from args import Args


class Assistant:
    def __init__(self, temperature, max_tokens):
        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "01_assistant.txt")
        with open(system_prompt_path, "r") as file:
            self.system_prompt = file.read().strip()
        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        # AgentWorkflow exposes no setup_agent factory (setup_agent is an internal
        # workflow step), so build the agent the same way Manager does below.
        # Assumption: an empty tool list is accepted and yields a plain chat agent.
        self.agent = AgentWorkflow.from_tools_or_functions([], llm=llm)
        self.ctx = Context(self.agent)

    async def query(self, question: str) -> str:
        """
        Process a user query and return a response using the agent.
        
        Args:
            question: The user's question or input text
            
        Returns:
            The agent's response as a string
        """
        response = await self.agent.run(question, ctx=self.ctx)
        response = str(response)
        return response

    def clear_context(self):
        """
        Clears the current context of the agent, resetting any conversation history.
        This is useful when starting a new conversation or when the context needs to be refreshed.
        """
        self.ctx = Context(self.agent)
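
# Hypothetical usage sketch (illustration only): assumes an async caller,
# a configured Args.primary_llm_interface, and system_prompts/01_assistant.txt.
#
#     assistant = Assistant(temperature=0.2, max_tokens=1024)
#     answer = await assistant.query("What does this repo do?")
#     assistant.clear_context()  # drop history before a new conversation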


class Manager:
    def __init__(self, temperature, max_tokens, max_depth):
        self.max_depth = max_depth
        self.current_depth = 0

        system_prompt_path = os.path.join(os.getcwd(), "system_prompts", "02_manager.txt")
        with open(system_prompt_path, "r") as file:
            self.system_prompt = file.read().strip()

        llm = LLMFactory.create(Args.primary_llm_interface, self.system_prompt, temperature, max_tokens)
        self.agent = AgentWorkflow.from_tools_or_functions(
            [
                FunctionTool.from_defaults(
                    name="require_break_up",
                    description="Break a complex task into simpler subtasks. Use when a task needs to be divided into manageable parts.",
                    fn=self.require_break_up,
                ),
                FunctionTool.from_defaults(
                    name="require_solution",
                    description="Request direct solutions for specific tasks. Use when a task is simple enough to be solved directly.",
                    fn=self.require_solution,
                ),
            ],
            llm=llm,
        )
        self.ctx = Context(self.agent)
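        # Helper agents (see solver.py): Solver answers individual tasks,
        # Summarizer condenses tool observations into a single report.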
        self.solver = Solver(temperature, max_tokens)
        self.summarizer = Summarizer(temperature, max_tokens)

    async def query(self, question: str, remember: bool = True) -> str:
        """
        Process a question using the manager agent and return a response.
        
        Args:
            question: The question or task to process
            remember: Whether to maintain context between queries (default: True)
            
        Returns:
            The agent's response as a string
        """
        if remember:
            response = await self.agent.run(question, ctx=self.ctx)
        else:
            response = await self.agent.run(question)
        response = str(response)
        return response

    def clear_context(self):
        """
        Clears the current context of the agent, resetting any conversation history.
        This is useful when starting a new conversation or when the context needs to be refreshed.
        """
        self.ctx = Context(self.agent)

    async def require_break_up(self, tasks: List[str], try_solving: bool = False) -> str:
        """
        Break down complex tasks into simpler subtasks recursively up to max_depth.
        
        Args:
            tasks: List of tasks to break down
            try_solving: Whether to attempt solving tasks at max depth (default: False)
            
        Returns:
            Summarized report of the task breakdown
        """
        print(f"-> require_break_up tool used (input: {tasks})!")
        if not tasks:
            return "Error: No tasks provided to break up. Please provide at least one task."
        
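        # Track recursion depth so nested require_break_up calls stop at max_depth.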
        self.current_depth += 1

        observation = ""
        if self.current_depth < self.max_depth:
            for task in tasks:
                solution = await self.query(task, remember=False)
                response = f"For task:\n\n{task}\n\nThe following break-up has been provided:\n\n{solution}\n\n"
                observation += response
        elif try_solving:
            for task in tasks:
                solution = await self.solver.query(task)
                # Record each solution so the summarizer receives the full picture.
                observation += f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
        else:
            observation = "Maximum depth for the `require_break_up` tool has been reached! At this point, you may try to break the task up yourself or call `require_solution`."
        
        self.current_depth -= 1
        report = await self.summarizer.query(observation.strip())
        return report

    async def require_solution(self, tasks: List[str]) -> str:
        """
        Request direct solutions for the provided tasks using the Solver.
        
        Args:
            tasks: List of tasks to solve
            
        Returns:
            Summarized report of solutions for all tasks
        """
        print(f"-> require_solution tool used (input: {tasks})!")
        if not tasks:
            return "Error: No tasks provided to solve. Please provide at least one task."
        
        observation = ""
        for task in tasks:
            solution = await self.solver.query(task)
            response = f"For task:\n\n{task}\n\nThe Solver provided the solution:\n\n{solution}\n\n"
            observation += response

        report = await self.summarizer.query(observation.strip())
        return report
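

if __name__ == "__main__":
    # Hypothetical usage sketch (illustration only): assumes the system_prompts/
    # directory is present and Args.primary_llm_interface points at a configured
    # backend; the entry point and parameter values are made up for this demo.
    import asyncio

    async def _demo():
        manager = Manager(temperature=0.2, max_tokens=1024, max_depth=2)
        report = await manager.query("Plan and outline a small ETL pipeline.")
        print(report)

    asyncio.run(_demo())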