File size: 4,959 Bytes
1094f14
 
 
 
 
96f6b21
 
1094f14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from typing import Annotated, Type
from langgraph.graph import StateGraph
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
from base_service import BaseService
from get_answer_gigachat import AnswerGigaChat
import logging
import os

# Module-level logger named after this module; handlers and levels are
# expected to be configured by the application (or via
# BaseGraph._initialize_logging, which is not called automatically).
logger = logging.getLogger(__name__)

class State(TypedDict):
    """Graph state schema passed between the nodes of the StateGraph."""

    # Conversation history. The `add_messages` annotation is the reducer
    # LangGraph uses to combine a node's returned messages with this list.
    messages: Annotated[list, add_messages]

class BaseGraph:
    """Agent/tool loop built on a LangGraph ``StateGraph`` for one service.

    The graph has two nodes: ``agent`` (calls the tool-bound LLM) and
    ``tool`` (executes the tool calls requested by the last message). The
    conversation history is kept on the instance in ``self.messages`` and is
    carried over between calls to :meth:`invoke`.
    """

    def __init__(self, service: "Type[BaseService]"):
        """Bind the graph to ``service`` and compile it.

        Args:
            service: Object exposing ``tools`` (each with a ``name``),
                ``get_initial_messages()`` and
                ``get_messages_from_redirect(messages)``.
        """
        # File logging is opt-in: _initialize_logging() is intentionally not
        # called here.
        self._bind_service(service)
        self.messages = service.get_initial_messages()
        self.graph = self._build_graph()
        logger.info("BaseGraph with service %s was built", service)

    def rebuild_with_new_service(self, service: "Type[BaseService]"):
        """Switch to another service, keeping the conversation.

        Tools, the tool-bound LLM and the compiled graph are recreated for
        ``service``; the current history is then passed through
        ``service.get_messages_from_redirect``.
        """
        self._bind_service(service)
        self.graph = self._build_graph()
        self.messages = service.get_messages_from_redirect(self.messages)
        logger.info("BaseGraph was rebuilt with service %s", service)

    def _bind_service(self, service):
        """Store ``service`` and derive the tool lookup and tool-bound LLM."""
        self.service = service
        self.tools_dict = {tool.name: tool for tool in service.tools}
        self.llm_with_tools = AnswerGigaChat().bind_tools(service.tools)

    def _initialize_logging(self):
        """Start a fresh log file at ``self.service.log_path``.

        Any existing file at that path is removed and the root logger is
        configured with a single ``FileHandler``. Note that
        ``logging.basicConfig`` has no effect if the root logger already has
        handlers.
        """
        log_path = self.service.log_path
        try:
            os.remove(log_path)
        except FileNotFoundError:
            # Nothing to clean up: the log file has not been created yet.
            pass
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[logging.FileHandler(log_path)],
        )

    def _agent_node(self, state: "State"):
        """Call the tool-bound LLM on the current messages.

        Returns:
            A state update containing the single LLM response, with its
            content passed through :meth:`_clean_response`.
        """
        try:
            logger.info("Starting agent_node")
            response = self.llm_with_tools.invoke(state["messages"])
            response.content = self._clean_response(response.content)
            return {"messages": [response]}
        except Exception as e:
            logger.exception("Error in agent_node: %s", e)
            raise

    def _tool_node(self, state: "State"):
        """Execute every tool call requested by the last message.

        Each call is answered by its own ``ToolMessage`` carrying that call's
        id, so no requested call is left without a matching response.

        Returns:
            A state update with one ``ToolMessage`` per executed tool call.

        Raises:
            ValueError: If the last message carries no tool calls, or a
                requested tool is not registered for the current service.
        """
        try:
            logger.info("Starting tool_node")
            tool_calls = state["messages"][-1].tool_calls
            if not tool_calls:
                # The router sent us here but there is nothing to execute;
                # fail with a clear message instead of an unbound-variable
                # error further down.
                raise ValueError("tool_node reached without any tool calls to run")

            tool_messages = []
            for call in tool_calls:
                tool_name = call["name"]
                logger.info("Running tool %s", tool_name)

                tool = self.tools_dict.get(tool_name)
                if tool is None:
                    raise ValueError(f"Tool {tool_name} not found")

                tool_response = tool.invoke(call["args"])
                if tool_name == "make_redirect":
                    # The redirect tool's return value is used as the new
                    # service; this swaps tools/LLM/graph on the instance
                    # while the current run is still in progress.
                    self.rebuild_with_new_service(tool_response)
                tool_messages.append(
                    ToolMessage(content=str(tool_response), tool_call_id=call["id"])
                )

            return {"messages": tool_messages}
        except Exception as e:
            logger.exception("Error in tool_node: %s", e)
            raise

    def _should_continue(self, state: "State"):
        """Route after the agent node: ``"tool"`` or ``"end"``.

        The decision is based on whether the last message's
        ``additional_kwargs`` contains a ``"function_call"`` entry
        (presumably how the LLM wrapper reports a requested tool call;
        confirm against AnswerGigaChat).
        """
        try:
            logger.info("Checking should continue")
            last_message = state["messages"][-1]
            wants_tool = "function_call" in last_message.additional_kwargs
            return "tool" if wants_tool else "end"
        except Exception as e:
            logger.exception("Error in should_continue: %s", e)
            raise

    def _build_graph(self):
        """Assemble and compile the agent/tool graph.

        Layout: entry point ``agent``; ``agent`` goes to ``tool`` or to the
        end depending on :meth:`_should_continue`; ``tool`` always returns
        to ``agent``.
        """
        try:
            logger.info("Building graph")
            graph_builder = StateGraph(State)

            graph_builder.add_node("agent", self._agent_node)
            graph_builder.add_node("tool", self._tool_node)

            graph_builder.add_conditional_edges(
                "agent",
                self._should_continue,
                {"tool": "tool", "end": "__end__"},
            )
            graph_builder.add_edge("tool", "agent")
            graph_builder.set_entry_point("agent")

            return graph_builder.compile()
        except Exception as e:
            logger.exception("Error building graph: %s", e)
            raise

    def _clean_response(self, content: str) -> str:
        """Strip ``<thinking>``/``<final_answer>`` markup from LLM output.

        Closing tags are first rewritten to opening tags so both act as
        plain delimiters. If a ``<final_answer>`` delimiter is present, the
        text right after its first occurrence is kept; then, if a
        ``<thinking>`` delimiter remains, only the text after its last
        occurrence is kept.

        Non-string content (e.g. ``None`` or a list of content parts) is
        returned unchanged instead of raising.
        """
        if not isinstance(content, str):
            return content

        final_tag = "<final_answer>"
        thinking_tag = "<thinking>"
        content = content.replace("</final_answer>", final_tag)
        content = content.replace("</thinking>", thinking_tag)
        if final_tag in content:
            content = content.split(final_tag)[1]
        if thinking_tag in content:
            content = content.split(thinking_tag)[-1]
        return content

    def invoke(self, user_input):
        """Run one conversation turn.

        ``user_input`` is appended to the stored history as a
        ``HumanMessage``, the graph is run on that history, and the stored
        history is replaced by the messages of the resulting state.

        Returns:
            The final graph state (a mapping with a ``"messages"`` key).
        """
        try:
            self.messages.append(HumanMessage(content=user_input))
            result = self.graph.invoke({"messages": self.messages})
            # NOTE(review): if a make_redirect tool ran during this call,
            # rebuild_with_new_service() already replaced self.messages and
            # the assignment below overwrites that value - confirm this is
            # the intended behavior.
            self.messages = result["messages"]
            return result
        except Exception as e:
            logger.exception("Error invoking graph: %s", e)
            raise