File size: 10,107 Bytes
fbf421d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
"""This module contains the Graph class, which represents the main orchestration graph for the Chattr application."""

from json import dumps
from logging import getLogger
from pathlib import Path
from typing import AsyncGenerator, Self

from gradio import ChatMessage
from gradio.components.chatbot import MetadataDict
from langchain_core.messages import HumanMessage
from langchain_core.runnables import Runnable, RunnableConfig
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.sessions import (
    SSEConnection,
    StdioConnection,
    StreamableHttpConnection,
    WebsocketConnection,
)
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.graph import START, StateGraph
from langgraph.graph.message import MessagesState
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.store.redis.aio import AsyncRedisStore

from chattr.settings import Settings
from chattr.utils import convert_audio_to_wav, download_file, is_url

logger = getLogger(__name__)


class Graph:
    """
    Represents the main orchestration graph for the Chattr application.
    This class manages the setup and execution of the conversational agent, tools, and state graph.

    Instances are normally obtained through the async ``create`` factory, which
    stores the settings on the class and prepares the memory backends and tools
    before calling the constructor.
    """

    # Application settings. Assigned on the class itself by ``create``, so the
    # value is shared by every instance and read by the classmethod helpers.
    settings: Settings

    def __init__(
        self,
        store: AsyncRedisStore,
        saver: AsyncRedisSaver,
        tools: list[BaseTool],
    ):
        """
        Wire together the memory backends, tools, language model, and compiled graph.

        Args:
            store: Redis store kept as the graph's long-term memory.
            saver: Redis checkpointer kept as the graph's short-term memory.
            tools: Tools bound to the model and executed by the graph's tool node.
        """
        self._tools: list[BaseTool] = tools

        # The chat model is created once and then bound to the available tools.
        llm = self._initialize_llm()
        self._llm: ChatOpenAI = llm
        self._model: Runnable = llm.bind_tools(tools)

        self._long_term_memory: AsyncRedisStore = store
        self._short_term_memory: AsyncRedisSaver = saver

        # Compiled last: the graph builder reads the tools and both memory backends.
        self._graph: CompiledStateGraph = self._build_state_graph()

    @classmethod
    async def create(cls, settings: Settings) -> Self:
        """
        Build a fully initialized Graph; use this instead of calling the constructor.

        The settings are stored on the class (not the instance) because the
        classmethod helpers invoked below read them from ``cls.settings``.

        Args:
            settings: Application settings used for memory, MCP servers, and the model.

        Returns:
            Self: A Graph wired with Redis memory and the tools fetched from the MCP servers.
        """
        cls.settings = settings
        store, saver = await cls._setup_memory()
        mcp_client = MultiServerMCPClient(cls._create_mcp_config())
        tools = await cls._setup_tools(mcp_client)
        return cls(store, saver, tools)

    def _build_state_graph(self) -> CompiledStateGraph:
        """
        Assemble and compile the agent/tools state graph.

        The graph has two nodes: ``agent`` (the tool-bound model) and ``tools``
        (a ``ToolNode`` over the configured tools). Execution starts at
        ``agent``, leaves it through the ``tools_condition`` conditional edges,
        and always returns from ``tools`` to ``agent``.

        Returns:
            CompiledStateGraph: The graph compiled in debug mode with the
            short-term memory as checkpointer and the long-term memory as store.
        """

        async def _run_agent(state: MessagesState) -> MessagesState:
            # The system message is prepended on every turn, ahead of the history.
            prompt = [self.settings.model.system_message, *state["messages"]]
            reply = await self._model.ainvoke(prompt)
            return MessagesState(messages=[reply])

        builder: StateGraph = StateGraph(MessagesState)

        # Nodes.
        builder.add_node("agent", _run_agent)
        builder.add_node("tools", ToolNode(self._tools))

        # Edges.
        builder.add_edge(START, "agent")
        builder.add_conditional_edges("agent", tools_condition)
        builder.add_edge("tools", "agent")

        compile_options = {
            "debug": True,
            "checkpointer": self._short_term_memory,
            "store": self._long_term_memory,
        }
        return builder.compile(**compile_options)

    @classmethod
    def _create_mcp_config(
        cls,
    ) -> dict[
        str,
        StdioConnection
        | SSEConnection
        | StreamableHttpConnection
        | WebsocketConnection,
    ]:
        """
        Describe the MCP (Model Context Protocol) servers the application connects to.

        Three servers are configured: a Qdrant vector database and a time server,
        both launched through ``uvx`` over stdio, and the voice generator, whose
        name, URL, and transport come from the settings.

        Returns:
            dict: Connection configurations keyed by server name.
        """
        settings = cls.settings
        voice = settings.voice_generator_mcp

        qdrant_server = StdioConnection(
            command="uvx",
            args=["mcp-server-qdrant"],
            env={
                "QDRANT_URL": str(settings.vector_database.url),
                "COLLECTION_NAME": settings.vector_database.name,
            },
            transport="stdio",
        )
        time_server = StdioConnection(
            command="uvx",
            args=["mcp-server-time"],
            transport="stdio",
        )
        voice_server = SSEConnection(
            url=str(voice.url),
            transport=voice.transport,
        )

        return {
            "vector_database": qdrant_server,
            "time": time_server,
            voice.name: voice_server,
        }

    def _initialize_llm(self) -> ChatOpenAI:
        """
        Initialize the ChatOpenAI language model using the provided settings.
        The model is configured with the URL, name, API key, and temperature
        found under ``settings.model``.

        Returns:
            ChatOpenAI: The initialized ChatOpenAI language model instance.

        Raises:
            Exception: Whatever error the settings lookup or the ChatOpenAI
                constructor raised; it is logged and then re-raised unchanged.
        """
        try:
            model_settings = self.settings.model
            return ChatOpenAI(
                base_url=str(model_settings.url),
                model=model_settings.name,
                api_key=model_settings.api_key,
                temperature=model_settings.temperature,
            )
        except Exception as e:
            # logger.exception keeps the same message but also records the
            # traceback, which logger.error with an f-string discarded.
            logger.exception("Failed to initialize ChatOpenAI model: %s", e)
            raise

    @classmethod
    async def _setup_memory(cls) -> tuple[AsyncRedisStore, AsyncRedisSaver]:
        """
        Initialize and set up the Redis store and checkpointer for state persistence.

        Both context managers are entered through an ``AsyncExitStack``. If any
        step fails, everything entered so far is exited before the error
        propagates. On success the pending cleanup is moved to
        ``cls._memory_exit_stack`` so the connections stay open and can be
        released later with ``await Graph._memory_exit_stack.aclose()``.

        Returns:
            tuple[AsyncRedisStore, AsyncRedisSaver]: Configured Redis store and saver instances.

        Raises:
            Exception: Any error raised while connecting to or setting up Redis.
        """
        # Imported locally so the module's import block does not need to change.
        from contextlib import AsyncExitStack

        memory_url = str(cls.settings.memory.url)
        async with AsyncExitStack() as stack:
            store = await stack.enter_async_context(
                AsyncRedisStore.from_conn_string(memory_url)
            )
            checkpointer = await stack.enter_async_context(
                AsyncRedisSaver.from_conn_string(memory_url)
            )
            await store.setup()
            await checkpointer.asetup()
            # Success: detach the cleanup callbacks so leaving this block does not
            # close the contexts, and keep them reachable for a later shutdown.
            cls._memory_exit_stack = stack.pop_all()
        return store, checkpointer

    @staticmethod
    async def _setup_tools(_mcp_client: MultiServerMCPClient) -> list[BaseTool]:
        """
        Fetch the tools exposed by the configured MCP servers.

        Args:
            _mcp_client: The MultiServerMCPClient to query for tools.

        Returns:
            list[BaseTool]: The tools returned by the client's ``get_tools`` call.
        """
        available_tools = await _mcp_client.get_tools()
        return available_tools

    def draw_graph(self) -> None:
        """Draw the compiled graph as a Mermaid PNG and write it to ``graph.png`` in the assets directory."""
        drawable = self._graph.get_graph()
        target = self.settings.directory.assets / "graph.png"
        drawable.draw_mermaid_png(output_file_path=target)

    async def generate_response(
        self, message: str, history: list[ChatMessage]
    ) -> AsyncGenerator[tuple[str, list[ChatMessage], Path | None], None]:
        """
        Generate a response to a user message and update the conversation history.
        This asynchronous generator streams per-node updates from the state graph,
        appends a chat entry for every tool call, tool result, or assistant reply,
        and yields after each update.

        When a tool result is a URL, the file is downloaded as ``.aac``, converted
        to ``.wav``, and the ``.wav`` path is yielded once. The regular yield for
        that update (with ``None`` as the audio path) follows immediately after.

        Args:
            message: The user's input message as a string.
            history: The conversation history; it is extended in place.

        Yields:
            tuple[str, list[ChatMessage], Path | None]: An empty string, the
            updated history, and the path of a freshly converted audio file or
            ``None`` when the update produced no audio.
        """
        # NOTE(review): the thread and user ids are fixed to "1", so every call
        # appears to share one checkpoint thread - confirm this is intended.
        graph_input = MessagesState(messages=[HumanMessage(content=message)])
        config = RunnableConfig(configurable={"thread_id": "1", "user_id": "1"})

        async for update in self._graph.astream(
            graph_input, config, stream_mode="updates"
        ):
            agent_update = update.get("agent")
            tools_update = update.get("tools")

            if agent_update is not None:
                agent_message = agent_update["messages"][-1]
                if agent_message.tool_calls:
                    # Show every requested tool call, not just the first, so
                    # parallel calls are all visible in the chat.
                    for tool_call in agent_message.tool_calls:
                        history.append(
                            ChatMessage(
                                role="assistant",
                                content=dumps(tool_call["args"], indent=4),
                                metadata=MetadataDict(
                                    title=tool_call["name"],
                                    id=tool_call["id"],
                                ),
                            )
                        )
                else:
                    history.append(
                        ChatMessage(role="assistant", content=agent_message.content)
                    )

            if tools_update is not None:
                # One tool message is produced per executed call; report all of them.
                for tool_message in tools_update["messages"]:
                    history.append(
                        ChatMessage(
                            role="assistant",
                            content=tool_message.content,
                            metadata=MetadataDict(
                                title=tool_message.name,
                                id=tool_message.id,
                            ),
                        )
                    )
                    if is_url(tool_message.content):
                        logger.info(
                            "Downloading audio from %s", tool_message.content
                        )
                        # Both files share the tool message id as their stem.
                        audio_stem: Path = (
                            self.settings.directory.audio / tool_message.id
                        )
                        aac_path = audio_stem.with_suffix(".aac")
                        wav_path = audio_stem.with_suffix(".wav")
                        download_file(tool_message.content, aac_path)
                        logger.info("Audio downloaded to %s", aac_path)
                        convert_audio_to_wav(aac_path, wav_path)
                        yield "", history, wav_path

            # Updates from any other source add nothing to the history but
            # still produce a yield, instead of failing with a KeyError.
            yield "", history, None