import asyncio
from typing import Dict, Any, TypedDict, Literal, Optional

from langgraph.graph import START, END, StateGraph
from langgraph.graph.state import CompiledStateGraph

from management import Manager, Assistant


# Maximum number of interactions between Assistant and Manager
MAX_INTERACTIONS = 5
# Maximum depth of recursion for Manager
MAX_DEPTH = 3
# For both Assistant and Manager:
TEMPERATURE = 0.7
MAX_TOKENS = 100


class State(TypedDict):
    """State for the agent graph."""

    # The query the graph was started with; never modified by the nodes.
    initial_query: str
    # The message handed to the next agent (initially the query itself).
    current_message: str
    # Number of times the assistant node has run.
    nr_interactions: int
    # Text extracted after the final-answer hint, or None while undecided.
    final_response: Optional[str]


class GraphBuilder:
    """Owns the Assistant/Manager agents and wires them into a LangGraph graph."""

    def __init__(self):
        """
        Initializes the GraphBuilder.

        Creates one Assistant and one Manager from the module-level settings
        and defines the marker string that flags a final answer.
        """
        self.assistant_agent = Assistant(TEMPERATURE, MAX_TOKENS)
        self.manager_agent = Manager(TEMPERATURE, MAX_TOKENS, MAX_DEPTH)
        self.final_answer_hint = "Final answer:"

    def clear_chat_history(self):
        """Clear the conversation context of both agents."""
        self.assistant_agent.clear_context()
        self.manager_agent.clear_context()

    def _extract_final_answer(self, response: str) -> Optional[str]:
        """
        Return the stripped text following the first final-answer hint.

        Args:
            response: Raw agent response to inspect.

        Returns:
            The text after the first occurrence of ``self.final_answer_hint``
            with surrounding whitespace removed, or None if the hint does not
            occur in ``response``.
        """
        _, hint, answer = response.partition(self.final_answer_hint)
        # partition() yields an empty separator when the hint is absent.
        return answer.strip() if hint else None

    async def assistant_node(self, state: State) -> State:
        """
        Assistant agent that evaluates the query and decides whether to give
        a final answer or continue the conversation with the Manager.
        Uses the existing Assistant implementation.

        Args:
            state: Current graph state; ``current_message`` is sent to the
                assistant.

        Returns:
            The same state with ``current_message`` replaced by the
            assistant's response, ``nr_interactions`` incremented, and
            ``final_response`` set if the response contains the final-answer
            hint.
        """
        # NOTE(review): assumes Assistant.query returns a str - confirm.
        response = await self.assistant_agent.query(state["current_message"])

        # Only record a final response when the hint is actually present.
        answer = self._extract_final_answer(response)
        if answer is not None:
            state["final_response"] = answer

        state["current_message"] = response
        state["nr_interactions"] += 1
        return state

    async def manager_node(self, state: State) -> State:
        """
        Manager agent that handles the queries from the Assistant and
        provides responses. Uses the existing Manager implementation.

        Args:
            state: Current graph state; ``current_message`` is sent to the
                manager.

        Returns:
            The same state with ``current_message`` replaced by the manager's
            response.
        """
        response = await self.manager_agent.query(state["current_message"])
        state["current_message"] = response
        return state

    async def final_answer_node(self, state: State) -> State:
        """
        Final answer node that formats and returns the final response.

        If there's already a final answer in the state, it uses that.
        Otherwise, it asks the assistant to formulate a final answer and
        stores it in ``final_response``.

        Args:
            state: Current graph state.

        Returns:
            The same state with ``final_response`` populated.
        """
        print("========== final_answer_node ==========")

        if state.get("final_response") is None:
            # No final answer yet (this node is also reached when the
            # interaction limit is hit), so have the assistant formulate one.
            prompt = f"Based on the conversation so far, provide a final answer to the original query:\n\n{state['initial_query']}"
            state["current_message"] = prompt
            response = await self.assistant_agent.query(state["current_message"])

            answer = self._extract_final_answer(response)
            if answer is None:
                print(f"WARNING: final_answer_hint '{self.final_answer_hint}' not in response !")
                # Without the hint, the whole response is the answer.
                answer = response.strip()
            state["final_response"] = answer

        final_response = state.get("final_response")
        print(f"====================\nFinal response:\n{final_response}\n====================")
        return state

    def should_continue(self, state: State) -> Literal["manager", "final_answer"]:
        """
        Decides whether to continue to the Manager or to provide a final answer.

        Args:
            state: Current graph state, as left by the assistant node.

        Returns:
            "manager": If the Assistant has decided to continue the conversation
            "final_answer": If the Assistant has decided to provide a final
                answer, or ``nr_interactions`` has reached MAX_INTERACTIONS
        """
        if state["nr_interactions"] >= MAX_INTERACTIONS:
            return "final_answer"
        if self.final_answer_hint in state["current_message"]:
            return "final_answer"
        return "manager"

    def build_agent_graph(self) -> CompiledStateGraph:
        """
        Build and return the agent graph.

        Topology: START -> assistant; assistant -> manager or final_answer
        (chosen by ``should_continue``); manager -> assistant;
        final_answer -> END.
        """
        graph = StateGraph(State)

        # Register the async node methods directly.
        graph.add_node("assistant", self.assistant_node)
        graph.add_node("manager", self.manager_node)
        graph.add_node("final_answer", self.final_answer_node)

        # Add the edges
        graph.add_edge(START, "assistant")
        graph.add_conditional_edges(
            "assistant",
            self.should_continue,
            {
                "manager": "manager",
                "final_answer": "final_answer",
            },
        )
        graph.add_edge("manager", "assistant")
        graph.add_edge("final_answer", END)

        return graph.compile()


class Alfred:
    """Callable agent that answers a question by running the agent graph."""

    def __init__(self):
        """Create the GraphBuilder and compile the agent graph."""
        self.graph_builder = GraphBuilder()
        self.agent_graph = self.graph_builder.build_agent_graph()
        # Report only once construction has actually succeeded.
        print("Agent initialized.")

    async def __call__(self, question: str) -> str:
        """
        Answer a question.

        Args:
            question: The question to process.

        Returns:
            The ``final_response`` entry of the final graph state.
        """
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        result = await self.process_query(question)
        response = result["final_response"]
        print(f"Agent processed the response: {response}")
        return response

    async def process_query(self, query: str) -> Dict[str, Any]:
        """
        Process a query through the agent graph.

        Both agents' chat histories are cleared before the graph is invoked.

        Args:
            query: The initial query to process

        Returns:
            The final state of the graph execution
        """
        initial_state: State = {
            "initial_query": query,
            "current_message": query,
            "nr_interactions": 0,
            "final_response": None,
        }
        self.graph_builder.clear_chat_history()
        result = await self.agent_graph.ainvoke(initial_state)
        return result