from langgraph.graph import END, StateGraph from langgraph.graph.state import CompiledStateGraph from typing import Dict, Any, TypedDict, Literal, Optional import asyncio from management import Manager, Assistant # Maximum number of interactions between Assistant and Manager MAX_INTERACTIONS = 5 # Maximum depth of recursion for Manager MAX_DEPTH = 3 # For both Assistant and Manager: TEMPERATURE = 0.7 MAX_TOKENS = 100 class State(TypedDict): """State for the agent graph.""" initial_query: str current_message: str nr_interactions: int final_answer: Optional[str] class GraphBuilder: def __init__(self): """ Initializes the GraphBuilder. """ self.assistant_agent = Assistant(TEMPERATURE, MAX_TOKENS) self.manager_agent = Manager(TEMPERATURE, MAX_TOKENS, MAX_DEPTH) self.final_answer_hint = "Final answer:" def clear_chat_history(self): self.assistant_agent.clear_context() self.manager_agent.clear_context() async def assistant_node(self, state: State) -> State: """ Assistant agent that evaluates the query and decides whether to give a final answer or continue the conversation with the Manager. Uses the existing Assistant implementation. """ response = await self.assistant_agent.query(state["current_message"]) # Check if this is a final answer if self.final_answer_hint in response: # Extract the text after final answer hint state["final_answer"] = response.split(self.final_answer_hint, 1)[1].strip() state["current_message"] = response state["nr_interactions"] += 1 return state async def manager_node(self, state: State) -> State: """ Manager agent that handles the queries from the Assistant and provides responses. Uses the existing Manager implementation. """ response = await self.manager_agent.query(state["current_message"]) state["current_message"] = response return state async def final_answer_node(self, state: State) -> State: """ Final answer node that formats and returns the final response. If there's already a final answer in the state, it uses that. Otherwise, it asks the assistant to formulate a final answer. """ # If we already have a final answer, use it if state.get("final_answer") is not None: return state # Otherwise, have the assistant formulate a final answer prompt = f"Based on the conversation so far, provide a final answer to the original query:\n\n{state['initial_query']}" state["current_message"] = prompt response = await self.assistant_agent.query(state["current_message"]) # Format the response if self.final_answer_hint not in response: response = f"{self.final_answer_hint}{response}" # Extract the text after final answer hint state["final_answer"] = response.split(self.final_answer_hint, 1)[1].strip() return state def should_continue(self, state: State) -> Literal["manager", "final_answer"]: """ Decides whether to continue to the Manager or to provide a final answer. Returns: "manager": If the Assistant has decided to continue the conversation "final_answer": If the Assistant has decided to provide a final answer """ message = state["current_message"] if state["nr_interactions"] >= MAX_INTERACTIONS or self.final_answer_hint in message: return "final_answer" else: return "manager" def build_agent_graph(self) -> CompiledStateGraph: """Build and return the agent graph.""" graph = StateGraph(State) # Convert async functions to sync functions using asyncio.run def sync_assistant_node(state: State) -> State: return asyncio.run(self.assistant_node(state)) def sync_manager_node(state: State) -> State: return asyncio.run(self.manager_node(state)) # Add the nodes with sync wrappers graph.add_node("assistant", sync_assistant_node) graph.add_node("manager", sync_manager_node) graph.add_node("final_answer", self.final_answer_node) # Add the edges graph.add_edge("START", "assistant") graph.add_conditional_edges( "assistant", self.should_continue, { "manager": "manager", "final_answer": "final_answer" } ) graph.add_edge("manager", "assistant") graph.add_edge("final_answer", END) return graph.compile() class Alfred: def __init__(self): print("Agent initialized.") self.graph_builder = GraphBuilder() self.agent_graph = self.graph_builder.build_agent_graph() async def __call__(self, question: str) -> str: print(f"Agent received question (first 50 chars): {question[:50]}...") result = await self.process_query(question) response = result["final_answer"] print(f"Agent processed the response: {response}") return response async def process_query(self, query: str) -> Dict[str, Any]: """ Process a query through the agent graph. Args: query: The initial query to process Returns: The final state of the graph execution """ initial_state: State = { "initial_query": query, "current_message": query, "nr_interactions": 0, "final_answer": None } self.graph_builder.clear_chat_history() # Since agent_graph.invoke is synchronous, we don't need to await it # But we might need to run it in an executor if it's computationally intensive loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, self.agent_graph.invoke, initial_state) return result