Spaces:
Sleeping
Sleeping
File size: 6,748 Bytes
4fb4269 910ae58 4fb4269 910ae58 4fb4269 910ae58 98ada6c 910ae58 4fb4269 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c 910ae58 98ada6c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
from langchain_core.messages import AnyMessage, BaseMessage, AIMessage, HumanMessage
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from typing import Annotated, Any, Dict, List, Literal, Optional, TypedDict
import logging
from pathlib import Path
from args import Args
from agents import *
from itf_agent import IAgent
class State(TypedDict):
"""State class for the agent graph."""
initial_query: str
# messages: List[Dict[str, Any]]
# messages: Annotated[list[BaseMessage], add_messages]
messages: List[BaseMessage] # Manager's context
task_progress: List[BaseMessage] # Solver's context
audit_interval: int
manager_queries: int
solver_queries: int
max_interactions: int
max_solving_effort: int
final_response: Optional[str]
class Agents:
manager = Manager()
auditor = Auditor()
summarizer = Summarizer()
solver = Solver()
researcher = Researcher()
reasoner = Reasoner()
guardian = OutputGuard()
viewer = Viewer()
@classmethod
def guard_output(cls, agent: IAgent, messages: List[BaseMessage]) -> BaseMessage:
response = agent.query(messages)
guarded_response = cls.guardian.query([response])
return guarded_response
class _Helper:
"""
Collection of helper methods.
"""
@staticmethod
def _is_divisible(first: int, second: int) -> bool:
"""
Determines if the first number is divisible by the second number.
Args:
first: The dividend (number to be divided)
second: The divisor (number to divide by)
Returns:
bool: True if first is divisible by second without remainder, False otherwise
"""
if second == 0:
return False # Division by zero is undefined
return first % second == 0
@staticmethod
def solver_handler(task_progress: List[BaseMessage]) -> Literal["manager", "researcher", "reasoner", "viewer", "unspecified"]:
response = str(task_progress[-1].content)
if "to: researcher" in response.lower():
return "researcher"
elif "to: reasoner" in response.lower():
return "reasoner"
elif "to: viewer" in response.lower():
return "viewer"
elif "to: manager" in response.lower():
return "manager"
else:
return "unspecified"
class Nodes:
"""
Collection of node functions for the agent graph.
"""
def manager_node(self, state: State) -> State:
"""
Orchestrates the workflow by delegating tasks to specialized nodes and integrating their outputs
"""
state["manager_queries"] += 1
if not _Helper._is_divisible(state["manager_queries"], state["audit_interval"]):
response = Agents.guard_output(Agents.manager, state["messages"])
state["messages"].append(response)
# else: wait for auditor's feedback !
return state
def final_answer_node(self, state: State) -> State:
"""
Formats and delivers the final response to the user
"""
instruction = BaseMessage("Formulate a definitive final answer in english. Be very concise and use no redundant words !")
state["messages"].append(instruction)
response = Agents.manager.query(state["messages"])
state["final_response"] = str(response.content)
return state
def auditor_node(self, state: State) -> State:
"""
Reviews manager's outputs for accuracy, safety, and quality and provides feedback
"""
response = Agents.guard_output(Agents.auditor, state["messages"])
state["messages"].append(response)
return state
def solver_node(self, state: State) -> State:
"""
Central problem-solving node that coordinates with specialized experts based on task requirements
"""
response = Agents.guard_output(Agents.solver, state["task_progress"])
state["task_progress"].append(response)
return state
def researcher_node(self, state: State) -> State:
"""
Retrieves and synthesizes information from various sources to answer knowledge-based questions
"""
response = Agents.guard_output(Agents.researcher, state["task_progress"])
state["task_progress"].append(response)
return state
def reasoner_node(self, state: State) -> State:
"""
Performs logical reasoning, inference, and step-by-step problem-solving
"""
response = Agents.guard_output(Agents.reasoner, state["task_progress"])
state["task_progress"].append(response)
return state
def viewer_node(self, state: State) -> State:
"""
Processes, analyzes, and generates information related to images
"""
response = Agents.guard_output(Agents.viewer, state["task_progress"])
state["task_progress"].append(response)
return state
class Edges:
"""
Collection of conditional edge functions for the agent graph.
"""
def manager_edge(self, state: State) -> Literal["solver", "auditor", "final_answer"]:
"""
Conditional edge for manager node.
Returns one of: "solver", "auditor", "final_answer"
"""
last_message = state["messages"][-1]
answer_ready = "FINAL ANSWER:" in str(last_message.content)
max_interractions_reached = state["manager_queries"] >= state["max_interactions"]
if answer_ready or max_interractions_reached:
return "final_answer"
if _Helper._is_divisible(state["manager_queries"], state["audit_interval"]):
return "auditor"
# Prepare task for Solver
state["task_progress"] = [last_message]
return "solver"
def solver_edge(self, state: State) -> Literal["manager", "researcher", "reasoner", "viewer"]:
"""
Conditional edge for solver node.
Returns one of: "manager", "researcher", "reasoner", "viewer"
"""
receiver = _Helper.solver_handler(state["task_progress"])
if receiver == "unspecified":
instruction = BaseMessage("Formulate an answer for the manager with your findings so far !")
state["task_progress"].append(instruction)
response = Agents.solver.query(state["task_progress"])
state["messages"].append(response)
return "manager"
if receiver == "manager":
response = state["task_progress"][-1]
state["messages"].append(response)
return receiver
|