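# Spaces: Sleeping / Sleeping
# NOTE(review): the line above looks like page-header residue picked up when this file was copied from a web page, not program text - confirm and remove.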
from typing import List, Literal, Optional, TypedDict | |
from agents import * | |
from itf_agent import IAgent | |
class State(TypedDict):
    """State class for the agent graph.

    A single dictionary of this shape is handed to every node and edge
    function in this file; nodes update it in place and return it.
    """

    # NOTE(review): presumably the user's original question - not read by
    # the code visible in this file; confirm against the caller.
    initial_query: str
    # Manager's context
    messages: List[str]
    # Solver's context
    task_progress: List[str]
    # The auditor is selected whenever ``manager_queries`` is divisible by
    # this value (an interval of 0 never selects the auditor).
    audit_interval: int
    # Incremented at the start of every manager node invocation.
    manager_queries: int
    # NOTE(review): not read or written by the code visible in this file.
    solver_queries: int
    # Once ``manager_queries`` reaches this value the final answer is forced.
    max_interactions: int
    # NOTE(review): not read or written by the code visible in this file.
    max_solving_effort: int
    # Written by the final answer node with the post-processed answer text.
    final_response: Optional[str]
class Agents:
    """Registry of the agent instances used by the graph nodes.

    Every agent is created once, when this class body is executed, and the
    nodes in this file reach them through the class itself
    (for example ``Agents.manager``).
    """

    manager = Manager()
    auditor = Auditor()
    summarizer = Summarizer()
    solver = Solver()
    researcher = Researcher()
    reasoner = Reasoner()
    hyper_reasoner = HyperReasoner()
    viewer = Viewer()
    guardian = OutputGuard()
    final_answer = FinalAnswer()

    @classmethod
    def guard_output(cls, agent: IAgent, messages: List[str]) -> str:
        """Query ``agent`` and pass its reply through the output guard.

        Args:
            agent: The agent to query.
            messages: The message history handed to ``agent.query``.

        Returns:
            The guardian's reply to the agent's raw response.
        """
        # ``classmethod`` binds ``cls`` automatically, so call sites of the
        # form ``Agents.guard_output(agent, messages)`` match the signature;
        # without it the agent would be taken for ``cls`` and ``messages``
        # would be reported missing.
        raw_response = agent.query(messages)
        # The guardian receives only the raw response, not the full history.
        return cls.guardian.query([raw_response])
class _Helper:
    """
    Collection of helper methods.

    All helpers are stateless static methods, so they can be called either
    on the class or on an instance.
    """

    @staticmethod
    def _is_divisible(first: int, second: int) -> bool:
        """
        Determines if the first number is divisible by the second number.

        Args:
            first: The dividend (number to be divided)
            second: The divisor (number to divide by)

        Returns:
            bool: True if first is divisible by second without remainder, False otherwise
        """
        if second == 0:
            return False  # Division by zero is undefined
        return first % second == 0

    @staticmethod
    def solver_successor(task_progress: List[str]) -> Literal["manager", "researcher", "reasoner", "viewer", "unspecified"]:
        """
        Determines the receiver named in the latest entry of the solver's context.

        The last entry is searched case-insensitively for a ``to: <receiver>``
        marker. When several markers are present the first match in the order
        researcher, reasoner, viewer, manager wins.

        Args:
            task_progress: The solver's context; only the last entry is inspected

        Returns:
            The receiver's name, or "unspecified" when the context is empty
            or its last entry contains no known marker
        """
        if not task_progress:
            # Nothing to inspect yet, so no receiver can have been named.
            return "unspecified"
        # Lower-case once instead of once per branch.
        response = str(task_progress[-1]).lower()
        if "to: researcher" in response:
            return "researcher"
        if "to: reasoner" in response:
            return "reasoner"
        if "to: viewer" in response:
            return "viewer"
        if "to: manager" in response:
            return "manager"
        return "unspecified"

    @staticmethod
    def manager_successor(state: "State") -> Literal["solver", "auditor", "final_answer"]:
        """
        Determines which node follows the manager.

        Args:
            state: The graph state; reads "messages", "manager_queries",
                "max_interactions" and "audit_interval"

        Returns:
            "final_answer" when the last message contains "FINAL ANSWER:" or
            the interaction limit is reached, "auditor" when the number of
            manager queries is divisible by the audit interval, "solver" otherwise
        """
        messages = state["messages"]
        # An empty history cannot contain a final answer.
        answer_ready = bool(messages) and "FINAL ANSWER:" in messages[-1]
        max_interactions_reached = state["manager_queries"] >= state["max_interactions"]
        if answer_ready or max_interactions_reached:
            return "final_answer"
        if _Helper._is_divisible(state["manager_queries"], state["audit_interval"]):
            return "auditor"
        return "solver"
class Nodes:
    """
    Collection of node functions for the agent graph.

    Every node receives the graph state, updates it in place and returns it.
    """

    def manager_node(self, state: State) -> State:
        """
        Orchestrates the workflow by delegating tasks to specialized nodes and integrating their outputs

        Increments the manager query counter, then queries the manager only
        when the solver is the next node; the manager's reply becomes both
        the newest message and the sole entry of the solver's context.
        """
        state["manager_queries"] += 1
        # The successor is computed from the state as it is *before* the
        # manager replies (the counter has already been incremented).
        successor = _Helper.manager_successor(state)
        if successor == "solver":
            response = Agents.guard_output(Agents.manager, state["messages"])
            state["messages"].append(response)
            # Prepare task for Solver
            state["task_progress"] = [response]
        # else: [wait for auditor's feedback] or [is final answer]
        return state

    @staticmethod
    def _extract_final_answer(response: str) -> str:
        """
        Reduces a raw model reply to the bare answer text.

        Keeps what follows the first "</think>" (if present), then what
        follows the first "FINAL ANSWER:" (if present), and trims whitespace.
        """
        # Drop the reasoning block first: if the marker is also mentioned
        # inside the reasoning, cutting at the marker first would leave the
        # real answer still carrying its "FINAL ANSWER:" prefix.
        if "</think>" in response:
            response = response.split("</think>", 1)[1]
        if "FINAL ANSWER:" in response:
            response = response.split("FINAL ANSWER:", 1)[1]
        return response.strip()

    def final_answer_node(self, state: State) -> State:
        """
        Formats and delivers the final response to the user

        Appends a closing instruction to the messages, queries the final
        answer agent and stores the post-processed reply in "final_response".
        """
        instruction = "Formulate a definitive final answer in english. Be very concise and use no redundant words !"
        state["messages"].append(instruction)
        response = Agents.final_answer.query(state["messages"])
        # Post process the response
        state["final_response"] = Nodes._extract_final_answer(response)
        return state

    def auditor_node(self, state: State) -> State:
        """
        Reviews manager's outputs for accuracy, safety, and quality and provides feedback

        The guarded reply is appended to the messages.
        """
        response = Agents.guard_output(Agents.auditor, state["messages"])
        state["messages"].append(response)
        return state

    def solver_node(self, state: State) -> State:
        """
        Central problem-solving node that coordinates with specialized experts based on task requirements

        The solver's guarded reply is appended to its own context. When the
        reply addresses the manager it is also appended to the messages; when
        it names no receiver the solver is asked once more for a summary,
        and that summary is appended to the messages instead.
        """
        response = Agents.guard_output(Agents.solver, state["task_progress"])
        state["task_progress"].append(response)
        successor = _Helper.solver_successor(state["task_progress"])
        if successor == "unspecified":
            instruction = "Formulate an answer for the manager with your findings so far !"
            state["task_progress"].append(instruction)
            # This follow-up query bypasses the output guard.
            response = Agents.solver.query(state["task_progress"])
            state["messages"].append(response)
        elif successor == "manager":
            state["messages"].append(response)
        return state

    def researcher_node(self, state: State) -> State:
        """
        Retrieves and synthesizes information from various sources to answer knowledge-based questions

        The unguarded reply is appended to the solver's context.
        """
        # We do not use the output guard here as it might hallucinate results if there are none.
        response = Agents.researcher.query(state["task_progress"])
        state["task_progress"].append(response)
        return state

    def reasoner_node(self, state: State) -> State:
        """
        Performs logical reasoning, inference, and step-by-step problem-solving

        Queries the reasoner and the hyper reasoner on the same context,
        summarizes the latter's reply and appends both results as a single
        entry to the solver's context.
        """
        pragmatic_response = Agents.guard_output(Agents.reasoner, state["task_progress"])
        deep_thought_response = Agents.guard_output(Agents.hyper_reasoner, state["task_progress"])
        # Only the summary of the hyper reasoner's reply is forwarded.
        deep_thought_summary = Agents.guard_output(Agents.summarizer, [deep_thought_response])
        response = f"The reasoner offered 2 responses:\n\nFirst, a more pragmatic response:\n{pragmatic_response}\n\nSecond, a deeper, more mathematical response:\n{deep_thought_summary}\n"
        state["task_progress"].append(response)
        return state

    def viewer_node(self, state: State) -> State:
        """
        Processes, analyzes, and generates information related to images

        The guarded reply is appended to the solver's context.
        """
        response = Agents.guard_output(Agents.viewer, state["task_progress"])
        state["task_progress"].append(response)
        return state
class Edges:
    """
    Routing functions that pick the next node of the agent graph.
    """

    def manager_edge(self, state: State) -> Literal["solver", "auditor", "final_answer"]:
        """
        Routing decision taken after the manager node.

        Delegates to ``_Helper.manager_successor`` and yields one of:
        "solver", "auditor", "final_answer"
        """
        next_node = _Helper.manager_successor(state)
        return next_node

    def solver_edge(self, state: State) -> Literal["manager", "researcher", "reasoner", "viewer"]:
        """
        Routing decision taken after the solver node.

        Yields one of: "manager", "researcher", "reasoner", "viewer".
        A solver reply that names no receiver is routed to the manager.
        """
        target = _Helper.solver_successor(state["task_progress"])
        return "manager" if target == "unspecified" else target