import logging
from pathlib import Path
from typing import Annotated, Any, Dict, List, Literal, Optional, TypedDict

from langchain_core.messages import AnyMessage, BaseMessage, AIMessage, HumanMessage
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition

from args import Args
from agents import *
from itf_agent import IAgent


class State(TypedDict):
    """State class for the agent graph."""

    initial_query: str
    # NOTE: both histories are plain lists (no `add_messages` reducer is
    # attached), so the nodes below extend them in place.
    messages: List[BaseMessage]  # Manager's context
    task_progress: List[BaseMessage]  # Solver's context
    audit_interval: int
    manager_queries: int
    solver_queries: int
    max_interactions: int
    max_solving_effort: int
    final_response: Optional[str]


class Agents:
    """Shared agent instances used by the graph nodes and edges."""

    # Instantiated once, when this class body is executed.
    manager = Manager()
    auditor = Auditor()
    summarizer = Summarizer()
    solver = Solver()
    researcher = Researcher()
    reasoner = Reasoner()
    guardian = OutputGuard()
    viewer = Viewer()

    @classmethod
    def guard_output(cls, agent: IAgent, messages: List[BaseMessage]) -> BaseMessage:
        """
        Query an agent and pass its answer through the output guard.

        Args:
            agent: The agent to query.
            messages: The conversation history handed to the agent.

        Returns:
            BaseMessage: The guardian's response to the agent's answer.
        """
        response = agent.query(messages)
        # The guardian only sees the fresh answer, not the whole history.
        return cls.guardian.query([response])


class _Helper:
    """
    Collection of helper methods.
    """

    @staticmethod
    def _is_divisible(first: int, second: int) -> bool:
        """
        Determines if the first number is divisible by the second number.

        Args:
            first: The dividend (number to be divided)
            second: The divisor (number to divide by)

        Returns:
            bool: True if first is divisible by second without remainder, False otherwise
        """
        if second == 0:
            return False  # Division by zero is undefined
        return first % second == 0

    @staticmethod
    def append_guarded_reply(agent: IAgent, history: List[BaseMessage]) -> None:
        """
        Query an agent through the output guard and append the reply in place.

        Args:
            agent: The agent to query.
            history: The message list given to the agent; the guarded reply
                is appended to this same list.
        """
        history.append(Agents.guard_output(agent, history))

    @staticmethod
    def solver_handler(
        task_progress: List[BaseMessage],
    ) -> Literal["manager", "researcher", "reasoner", "viewer", "unspecified"]:
        """
        Determines the addressee of the latest message in the solver's context.

        The message text is searched case-insensitively for a "to: <name>"
        marker. When several markers are present, the first match in the order
        researcher, reasoner, viewer, manager wins.

        Args:
            task_progress: The solver's message history (must not be empty).

        Returns:
            The addressee's name, or "unspecified" when no marker is found.
        """
        text = str(task_progress[-1].content).lower()
        if "to: researcher" in text:
            return "researcher"
        if "to: reasoner" in text:
            return "reasoner"
        if "to: viewer" in text:
            return "viewer"
        if "to: manager" in text:
            return "manager"
        return "unspecified"


class Nodes:
    """
    Collection of node functions for the agent graph.
    """

    def manager_node(self, state: State) -> State:
        """
        Orchestrates the workflow by delegating tasks to specialized nodes and integrating their outputs

        Every call increments `manager_queries`. When the new count is a
        multiple of `audit_interval`, the manager is not queried on this turn.
        An `audit_interval` of 0 never counts as a multiple, so the manager is
        then queried on every turn.
        """
        state["manager_queries"] += 1
        if not _Helper._is_divisible(state["manager_queries"], state["audit_interval"]):
            _Helper.append_guarded_reply(Agents.manager, state["messages"])
        # else: wait for auditor's feedback !
        return state

    def final_answer_node(self, state: State) -> State:
        """
        Formats and delivers the final response to the user

        Appends a closing instruction to the manager's context, queries the
        manager directly (without the output guard) and stores the text of
        its answer in `final_response`.
        """
        # HumanMessage rather than the BaseMessage base class: the base class
        # declares a required `type` field without a default, so it cannot be
        # constructed from content alone.
        instruction = HumanMessage(
            "Formulate a definitive final answer in english. "
            "Be very concise and use no redundant words !"
        )
        state["messages"].append(instruction)
        response = Agents.manager.query(state["messages"])
        state["final_response"] = str(response.content)
        return state

    def auditor_node(self, state: State) -> State:
        """
        Reviews manager's outputs for accuracy, safety, and quality and provides feedback
        """
        _Helper.append_guarded_reply(Agents.auditor, state["messages"])
        return state

    def solver_node(self, state: State) -> State:
        """
        Central problem-solving node that coordinates with specialized experts based on task requirements
        """
        _Helper.append_guarded_reply(Agents.solver, state["task_progress"])
        return state

    def researcher_node(self, state: State) -> State:
        """
        Retrieves and synthesizes information from various sources to answer knowledge-based questions
        """
        _Helper.append_guarded_reply(Agents.researcher, state["task_progress"])
        return state

    def reasoner_node(self, state: State) -> State:
        """
        Performs logical reasoning, inference, and step-by-step problem-solving
        """
        _Helper.append_guarded_reply(Agents.reasoner, state["task_progress"])
        return state

    def viewer_node(self, state: State) -> State:
        """
        Processes, analyzes, and generates information related to images
        """
        _Helper.append_guarded_reply(Agents.viewer, state["task_progress"])
        return state


class Edges:
    """
    Collection of conditional edge functions for the agent graph.
    """

    def manager_edge(self, state: State) -> Literal["solver", "auditor", "final_answer"]:
        """
        Conditional edge for manager node.

        Routing priority:
            1. "final_answer" when the last message contains "FINAL ANSWER:"
               or `manager_queries` has reached `max_interactions`.
            2. "auditor" when `manager_queries` is a multiple of
               `audit_interval`.
            3. "solver" otherwise, with the last manager message as the
               solver's fresh task.

        Returns one of: "solver", "auditor", "final_answer"
        """
        last_message = state["messages"][-1]
        answer_ready = "FINAL ANSWER:" in str(last_message.content)
        max_interactions_reached = state["manager_queries"] >= state["max_interactions"]
        if answer_ready or max_interactions_reached:
            return "final_answer"

        if _Helper._is_divisible(state["manager_queries"], state["audit_interval"]):
            return "auditor"

        # Prepare task for Solver
        # NOTE(review): this assignment is made inside a routing function;
        # confirm against the graph wiring that the new list actually reaches
        # the solver node.
        state["task_progress"] = [last_message]
        return "solver"

    def solver_edge(self, state: State) -> Literal["manager", "researcher", "reasoner", "viewer"]:
        """
        Conditional edge for solver node.

        When the solver's last message names no addressee, the solver is asked
        once more (without the output guard) to summarize its findings, and
        that summary is handed to the manager. When the addressee is the
        manager, the solver's last message is copied into the manager's
        context.

        Returns one of: "manager", "researcher", "reasoner", "viewer"
        """
        receiver = _Helper.solver_handler(state["task_progress"])

        if receiver == "unspecified":
            # HumanMessage rather than BaseMessage: the base class cannot be
            # constructed from content alone (its `type` field is required).
            instruction = HumanMessage("Formulate an answer for the manager with your findings so far !")
            state["task_progress"].append(instruction)
            response = Agents.solver.query(state["task_progress"])
            state["messages"].append(response)
            return "manager"

        if receiver == "manager":
            response = state["task_progress"][-1]
            state["messages"].append(response)

        return receiver