Spaces:
Sleeping
Sleeping
File size: 4,589 Bytes
1094f14 96f6b21 1094f14 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
from typing import Annotated, Type
from langgraph.graph import StateGraph
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
from base_service import BaseService
from get_answer_gigachat import AnswerGigaChat
import logging
import os
logger = logging.getLogger(__name__)
class State(TypedDict):
    """State schema handed to ``StateGraph``: a single ``messages`` entry."""

    # The list is annotated with ``add_messages`` from
    # ``langgraph.graph.message``; presumably LangGraph uses it as the
    # reducer that merges node outputs into this key -- confirm in its docs.
    messages: Annotated[list, add_messages]
class BaseGraph:
    """Agent/tool loop built on a LangGraph ``StateGraph`` for one service.

    The instance keeps the currently bound service, a name -> tool lookup
    table, a tool-bound ``AnswerGigaChat`` model, the running message
    history and the compiled graph.  The graph has two nodes, ``agent`` and
    ``tool``: ``agent`` is the entry point, ``tool`` always returns to
    ``agent``, and ``agent`` either goes to ``tool`` or ends the run.
    """

    def __init__(self, service: "Type[BaseService]"):
        """Bind ``service``, seed the history and compile the graph.

        Args:
            service: Object exposing ``tools`` (each with a ``name``) and
                ``get_initial_messages()``.
        """
        self._bind_service(service)
        self.messages = service.get_initial_messages()
        self.graph = self._build_graph()
        logger.info("BaseGraph with service %s was built", service)

    def rebuild_with_new_service(self, service: "Type[BaseService]"):
        """Switch to another service and recompile the graph.

        The history is replaced with whatever
        ``service.get_messages_from_redirect`` returns for the current one.

        Args:
            service: The service to bind from now on.
        """
        self._bind_service(service)
        self.graph = self._build_graph()
        self.messages = service.get_messages_from_redirect(self.messages)
        logger.info("BaseGraph was rebuilt with service %s", service)

    def _bind_service(self, service):
        """Store ``service`` and derive the tool table and tool-bound model."""
        self.service = service
        self.tools_dict = {tool.name: tool for tool in service.tools}
        self.llm_with_tools = AnswerGigaChat().bind_tools(service.tools)

    def _agent_node(self, state: "State"):
        """Call the tool-bound model on the state's messages.

        Returns:
            ``{"messages": [response]}`` where the response content has been
            passed through ``_clean_response``.

        Raises:
            Exception: Whatever the model call raises; logged and re-raised.
        """
        try:
            logger.info("Starting agent_node")
            response = self.llm_with_tools.invoke(state["messages"])
            response.content = self._clean_response(response.content)
            return {"messages": [response]}
        except Exception as e:
            logger.exception("Error in agent_node: %s", e)
            raise

    def _tool_node(self, state: "State"):
        """Run every tool call requested by the last message.

        One ``ToolMessage`` is produced per call, each carrying the id of
        the call it answers, so results are never attributed to the wrong
        request.  When the ``make_redirect`` tool runs, its return value is
        handed to ``rebuild_with_new_service``.

        Returns:
            ``{"messages": [...]}`` with one ``ToolMessage`` per tool call,
            in call order.

        Raises:
            ValueError: If the last message has no tool calls, or a call
                names a tool missing from ``self.tools_dict``.
            Exception: Whatever a tool raises; logged and re-raised.
        """
        try:
            logger.info("Starting tool_node")
            tool_calls = state["messages"][-1].tool_calls
            if not tool_calls:
                # Routing looks at ``additional_kwargs`` while this node
                # reads ``tool_calls``; fail loudly if the two disagree.
                raise ValueError("Last message contains no tool calls")

            tool_messages = []
            for call in tool_calls:
                tool_name = call["name"]
                logger.info("Running tool %s", tool_name)
                args = call["args"]
                tool = self.tools_dict.get(tool_name)
                if tool is None:
                    raise ValueError(f"Tool {tool_name} not found")
                tool_response = tool.invoke(args)
                if tool_name == "make_redirect":
                    self.rebuild_with_new_service(tool_response)
                tool_messages.append(
                    ToolMessage(
                        content=str(tool_response),
                        tool_call_id=call["id"],
                    )
                )
            return {"messages": tool_messages}
        except Exception as e:
            logger.exception("Error in tool_node: %s", e)
            raise

    def _should_continue(self, state: "State"):
        """Choose the edge leaving ``agent``.

        Returns:
            ``"tool"`` when the last message's ``additional_kwargs`` contain
            a ``"function_call"`` key, otherwise ``"end"``.
        """
        try:
            logger.info("Checking should continue")
            last_message = state["messages"][-1]
            if "function_call" in last_message.additional_kwargs:
                return "tool"
            return "end"
        except Exception as e:
            logger.exception("Error in should_continue: %s", e)
            raise

    def _build_graph(self):
        """Assemble and compile the agent <-> tool graph.

        Returns:
            The result of ``StateGraph.compile()``.
        """
        try:
            logger.info("Building graph")
            graph_builder = StateGraph(State)
            graph_builder.add_node("agent", self._agent_node)
            graph_builder.add_node("tool", self._tool_node)
            graph_builder.add_conditional_edges(
                "agent",
                self._should_continue,
                {"tool": "tool", "end": "__end__"},
            )
            graph_builder.add_edge("tool", "agent")
            graph_builder.set_entry_point("agent")
            return graph_builder.compile()
        except Exception as e:
            logger.exception("Error building graph: %s", e)
            raise

    def _clean_response(self, content: str) -> str:
        """Strip ``<thinking>`` / ``<final_answer>`` markup from model text.

        Closing tags are first rewritten to their opening form so a single
        split handles both.  If a ``<final_answer>`` tag is present, the text
        right after its first occurrence (up to the next such tag) is kept;
        then, if a ``<thinking>`` tag remains, only the text after the last
        one is kept.

        Args:
            content: Message content.  Non-string content (for example a
                list of content blocks) is returned untouched.

        Returns:
            The cleaned text, or ``content`` itself when it is not a string.
        """
        if not isinstance(content, str):
            return content

        answer_tag = "<final_answer>"
        thinking_tag = "<thinking>"
        content = content.replace("</final_answer>", answer_tag)
        content = content.replace("</thinking>", thinking_tag)
        if answer_tag in content:
            content = content.split(answer_tag)[1]
        if thinking_tag in content:
            content = content.split(thinking_tag)[-1]
        return content

    def invoke(self, user_input):
        """Append the user's message, run the graph and store the history.

        Args:
            user_input: Content for the new ``HumanMessage``.

        Returns:
            The graph result; its ``"messages"`` entry becomes the new
            ``self.messages``.

        Raises:
            Exception: Whatever the graph run raises; logged and re-raised.
        """
        try:
            self.messages.append(HumanMessage(content=user_input))
            result = self.graph.invoke({"messages": self.messages})
            # NOTE(review): if ``make_redirect`` ran during this call,
            # ``rebuild_with_new_service`` has already replaced
            # ``self.messages``; the assignment below overwrites that value.
            # Confirm this is intended.
            self.messages = result["messages"]
            return result
        except Exception as e:
            logger.exception("Error invoking graph: %s", e)
            raise
|