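# NOTE(review): page-status residue captured together with this file
# ("Spaces: Sleeping / Sleeping"); it is not part of the program.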
from typing import Any, Dict

from args import Args
from graph import State
from graph_builder import GraphBuilder
# Module-level tuning constants.
# NOTE(review): none of these are referenced in the visible part of this file;
# presumably other modules import them - confirm before renaming or removing.

# Maximum number of interactions between Assistant and Manager
MAX_INTERACTIONS = 5
# Maximum depth of recursion for Manager
MAX_DEPTH = 2
# For both Assistant and Manager:
TEMPERATURE = 0.7
MAX_TOKENS = 2000
class Alfred:
    """Callable agent that answers a question by running it through the agent graph.

    The graph is built once, at construction time, by ``GraphBuilder`` and is
    reused for every subsequent query.
    """

    def __init__(self):
        """Build the agent graph and keep both the builder and the graph."""
        print("Agent initialized.")
        self.graph_builder = GraphBuilder()
        self.agent_graph = self.graph_builder.build_agent_graph()

    async def __call__(self, question: str) -> str:
        """
        Answer a question.

        Args:
            question: The question to hand to the agent graph

        Returns:
            The ``final_response`` entry of the graph's final state
        """
        # Only the first 50 characters are logged to keep the output short.
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        result = await self.process_query(question)
        # NOTE(review): the state is seeded with final_response=None, so this is
        # only a str if the graph fills it in - confirm against the graph nodes.
        response = result["final_response"]
        print(f"Agent processed the response: {response}")
        return response

    async def process_query(self, query: str) -> Dict[str, Any]:
        """
        Process a query through the agent graph.

        Args:
            query: The initial query to process

        Returns:
            The final state of the graph execution
        """
        # Fresh state for every query: no messages yet, no interactions
        # counted, and no final response produced.
        initial_state: State = {
            "initial_query": query,
            "messages": [],
            "nr_interactions": 0,
            "final_response": None,
        }
        result = await self.agent_graph.ainvoke(initial_state)
        return result