Spaces:
Sleeping
Sleeping
| from typing import Annotated, Type | |
| from langgraph.graph import StateGraph | |
| from langchain_core.messages import HumanMessage, ToolMessage | |
| from langgraph.graph.message import add_messages | |
| from typing_extensions import TypedDict | |
| from base_service import BaseService | |
| from get_answer_gigachat import AnswerGigaChat | |
| import logging | |
| import os | |
| logger = logging.getLogger(__name__) | |
| class State(TypedDict): | |
| messages: Annotated[list, add_messages] | |
| class BaseGraph: | |
| def __init__(self, service: Type[BaseService]): | |
| self.service = service | |
| self.tools_dict = {tool.name: tool for tool in service.tools} | |
| self.llm_with_tools = AnswerGigaChat().bind_tools(service.tools) | |
| self.messages = service.get_initial_messages() | |
| self.graph = self._build_graph() | |
| logger.info(f"BaseGraph with service {service} was built") | |
| def rebuild_with_new_service(self, service: Type[BaseService]): | |
| self.service = service | |
| self.tools_dict = {tool.name: tool for tool in service.tools} | |
| self.llm_with_tools = AnswerGigaChat().bind_tools(service.tools) | |
| self.graph = self._build_graph() | |
| self.messages = service.get_messages_from_redirect(self.messages) | |
| logger.info(f"BaseGraph was rebuilt with service {service}") | |
| def _agent_node(self, state: State): | |
| try: | |
| logger.info("Starting agent_node") | |
| messages = state["messages"] | |
| response = self.llm_with_tools.invoke(messages) | |
| response.content = self._clean_response(response.content) | |
| return {"messages": [response]} | |
| except Exception as e: | |
| logger.error(f"Error in agent_node: {str(e)}", exc_info=True) | |
| raise | |
| def _tool_node(self, state: State): | |
| try: | |
| logger.info("Starting tool_node") | |
| last_message = state["messages"][-1] | |
| tool_calls = last_message.tool_calls | |
| results = [] | |
| for call in tool_calls: | |
| tool_name = call["name"] | |
| logger.info(f"Running tool {tool_name}") | |
| args = call["args"] | |
| tool = self.tools_dict.get(tool_name) | |
| if not tool: | |
| raise ValueError(f"Tool {tool_name} not found") | |
| tool_response = tool.invoke(args) | |
| if tool_name == "make_redirect": | |
| self.rebuild_with_new_service(tool_response) | |
| results.append(str(tool_response)) | |
| return {"messages": [ToolMessage(content=", ".join(results), tool_call_id=call["id"])]} | |
| except Exception as e: | |
| logger.error(f"Error in tool_node: {str(e)}", exc_info=True) | |
| raise | |
| def _should_continue(self, state: State): | |
| try: | |
| logger.info("Checking should continue") | |
| last_message = state["messages"][-1] | |
| return "tool" if "function_call" in last_message.additional_kwargs else "end" | |
| except Exception as e: | |
| logger.error(f"Error in should_continue: {str(e)}", exc_info=True) | |
| raise | |
| def _build_graph(self): | |
| try: | |
| logger.info("Building graph") | |
| graph_builder = StateGraph(State) | |
| graph_builder.add_node("agent", self._agent_node) | |
| graph_builder.add_node("tool", self._tool_node) | |
| graph_builder.add_conditional_edges( | |
| "agent", | |
| self._should_continue, | |
| {"tool": "tool", "end": "__end__"} | |
| ) | |
| graph_builder.add_edge("tool", "agent") | |
| graph_builder.set_entry_point("agent") | |
| return graph_builder.compile() | |
| except Exception as e: | |
| logger.error(f"Error building graph: {str(e)}", exc_info=True) | |
| raise | |
| def _clean_response(self, content: str) -> str: | |
| content = content.replace("</final_answer>", "<final_answer>").replace("</thinking>", "<thinking>") | |
| if "<final_answer>" in content: | |
| content = content.split("<final_answer>")[1] | |
| if "<thinking>" in content: | |
| content = content.split("<thinking>")[-1] | |
| return content | |
| def invoke(self, user_input): | |
| try: | |
| self.messages.append(HumanMessage(content=user_input)) | |
| result = self.graph.invoke({"messages": self.messages}) | |
| self.messages = result["messages"] | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error invoking graph: {str(e)}", exc_info=True) | |
| raise | |