Spaces:
Running
Running
| from collections.abc import AsyncIterator | |
| from typing import Any | |
| from unittest.mock import AsyncMock | |
| from langchain_core.agents import AgentFinish | |
| from langflow.base.agents.agent import process_agent_events | |
| from langflow.base.agents.events import ( | |
| handle_on_chain_end, | |
| handle_on_chain_start, | |
| handle_on_chain_stream, | |
| handle_on_tool_end, | |
| handle_on_tool_error, | |
| handle_on_tool_start, | |
| ) | |
| from langflow.schema.content_block import ContentBlock | |
| from langflow.schema.content_types import ToolContent | |
| from langflow.schema.message import Message | |
| from langflow.utils.constants import MESSAGE_SENDER_AI | |
| async def create_event_iterator(events: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: | |
| """Helper function to create an async iterator from a list of events.""" | |
| for event in events: | |
| yield event | |
| async def test_chain_start_event(): | |
| """Test handling of on_chain_start event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| events = [ | |
| {"event": "on_chain_start", "data": {"input": {"input": "test input", "chat_history": []}}, "start_time": 0} | |
| ] | |
| # Initialize message with content blocks | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| session_id="test_session_id", | |
| ) | |
| send_message.return_value = agent_message | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| assert result.properties.icon == "Bot" | |
| assert len(result.content_blocks) == 1 | |
| assert result.content_blocks[0].title == "Agent Steps" | |
| async def test_chain_end_event(): | |
| """Test handling of on_chain_end event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| # Create a mock AgentFinish output | |
| output = AgentFinish(return_values={"output": "final output"}, log="test log") | |
| events = [{"event": "on_chain_end", "data": {"output": output}, "start_time": 0}] | |
| # Initialize message with content blocks | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| session_id="test_session_id", | |
| ) | |
| send_message.return_value = agent_message | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| assert result.properties.icon == "Bot" | |
| assert result.properties.state == "complete" | |
| assert result.text == "final output" | |
| async def test_tool_start_event(): | |
| """Test handling of on_tool_start event.""" | |
| send_message = AsyncMock() | |
| # Set up the send_message mock to return the modified message | |
| def update_message(message): | |
| # Return a copy of the message to simulate real behavior | |
| return Message(**message.model_dump()) | |
| send_message.side_effect = update_message | |
| events = [ | |
| { | |
| "event": "on_tool_start", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"input": {"query": "tool input"}}, | |
| "start_time": 0, | |
| } | |
| ] | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| session_id="test_session_id", | |
| ) | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| assert result.properties.icon == "Bot" | |
| assert len(result.content_blocks) == 1 | |
| assert result.content_blocks[0].title == "Agent Steps" | |
| assert len(result.content_blocks[0].contents) > 0 | |
| tool_content = result.content_blocks[0].contents[-1] | |
| assert isinstance(tool_content, ToolContent) | |
| assert tool_content.name == "test_tool" | |
| assert tool_content.tool_input == {"query": "tool input"}, tool_content | |
| async def test_tool_end_event(): | |
| """Test handling of on_tool_end event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| events = [ | |
| { | |
| "event": "on_tool_start", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"input": {"query": "tool input"}}, | |
| "start_time": 0, | |
| }, | |
| { | |
| "event": "on_tool_end", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"output": "tool output"}, | |
| "start_time": 0, | |
| }, | |
| ] | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| session_id="test_session_id", | |
| ) | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| assert len(result.content_blocks) == 1 | |
| tool_content = result.content_blocks[0].contents[-1] | |
| assert tool_content.name == "test_tool" | |
| assert tool_content.output == "tool output" | |
| async def test_tool_error_event(): | |
| """Test handling of on_tool_error event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| events = [ | |
| { | |
| "event": "on_tool_start", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"input": {"query": "tool input"}}, | |
| "start_time": 0, | |
| }, | |
| { | |
| "event": "on_tool_error", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"error": "error message"}, | |
| "start_time": 0, | |
| }, | |
| ] | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| session_id="test_session_id", | |
| ) | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| tool_content = result.content_blocks[0].contents[-1] | |
| assert tool_content.name == "test_tool" | |
| assert tool_content.error == "error message" | |
| assert tool_content.header["title"] == "Error using **test_tool**" | |
| async def test_chain_stream_event(): | |
| """Test handling of on_chain_stream event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| events = [{"event": "on_chain_stream", "data": {"chunk": {"output": "streamed output"}}, "start_time": 0}] | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| session_id="test_session_id", | |
| ) | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| assert result.properties.state == "complete" | |
| assert result.text == "streamed output" | |
| async def test_multiple_events(): | |
| """Test handling of multiple events in sequence.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| # Create a mock AgentFinish output instead of MockOutput | |
| output = AgentFinish(return_values={"output": "final output"}, log="test log") | |
| events = [ | |
| {"event": "on_chain_start", "data": {"input": {"input": "initial input", "chat_history": []}}, "start_time": 0}, | |
| { | |
| "event": "on_tool_start", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"input": {"query": "tool input"}}, | |
| "start_time": 0, | |
| }, | |
| { | |
| "event": "on_tool_end", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"output": "tool output"}, | |
| "start_time": 0, | |
| }, | |
| {"event": "on_chain_end", "data": {"output": output}, "start_time": 0}, | |
| ] | |
| # Initialize message with content blocks | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| send_message.return_value = agent_message | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| assert result.properties.state == "complete" | |
| assert result.properties.icon == "Bot" | |
| assert len(result.content_blocks) == 1 | |
| assert result.text == "final output" | |
| async def test_unknown_event(): | |
| """Test handling of unknown event type.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], # Initialize with empty content block | |
| ) | |
| send_message.return_value = agent_message | |
| events = [{"event": "unknown_event", "data": {"some": "data"}, "start_time": 0}] | |
| result = await process_agent_events(create_event_iterator(events), agent_message, send_message) | |
| # Should complete without error and maintain default state | |
| assert result.properties.state == "complete" | |
| # Content blocks should be empty but present | |
| assert len(result.content_blocks) == 1 | |
| assert len(result.content_blocks[0].contents) == 0 | |
| # Additional tests for individual handler functions | |
| async def test_handle_on_chain_start_with_input(): | |
| """Test handle_on_chain_start with input.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| event = {"event": "on_chain_start", "data": {"input": {"input": "test input", "chat_history": []}}, "start_time": 0} | |
| updated_message, start_time = await handle_on_chain_start(event, agent_message, send_message, 0.0) | |
| assert updated_message.properties.icon == "Bot" | |
| assert len(updated_message.content_blocks) == 1 | |
| assert updated_message.content_blocks[0].title == "Agent Steps" | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_chain_start_no_input(): | |
| """Test handle_on_chain_start without input.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| event = {"event": "on_chain_start", "data": {}, "start_time": 0} | |
| updated_message, start_time = await handle_on_chain_start(event, agent_message, send_message, 0.0) | |
| assert updated_message.properties.icon == "Bot" | |
| assert len(updated_message.content_blocks) == 1 | |
| assert len(updated_message.content_blocks[0].contents) == 0 | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_chain_end_with_output(): | |
| """Test handle_on_chain_end with output.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| output = AgentFinish(return_values={"output": "final output"}, log="test log") | |
| event = {"event": "on_chain_end", "data": {"output": output}, "start_time": 0} | |
| updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0) | |
| assert updated_message.properties.icon == "Bot" | |
| assert updated_message.properties.state == "complete" | |
| assert updated_message.text == "final output" | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_chain_end_no_output(): | |
| """Test handle_on_chain_end without output key in data.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| event = {"event": "on_chain_end", "data": {}, "start_time": 0} | |
| updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0) | |
| assert updated_message.properties.icon == "Bot" | |
| assert updated_message.properties.state == "partial" | |
| assert updated_message.text == "" | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_chain_end_empty_data(): | |
| """Test handle_on_chain_end with empty data.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| event = {"event": "on_chain_end", "data": {"output": None}, "start_time": 0} | |
| updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0) | |
| assert updated_message.properties.icon == "Bot" | |
| assert updated_message.properties.state == "partial" | |
| assert updated_message.text == "" | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_chain_end_with_empty_return_values(): | |
| """Test handle_on_chain_end with empty return_values.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| class MockOutputEmptyReturnValues: | |
| def __init__(self): | |
| self.return_values = {} | |
| event = {"event": "on_chain_end", "data": {"output": MockOutputEmptyReturnValues()}, "start_time": 0} | |
| updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0) | |
| assert updated_message.properties.icon == "Bot" | |
| assert updated_message.properties.state == "partial" | |
| assert updated_message.text == "" | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_tool_start(): | |
| """Test handle_on_tool_start event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| tool_blocks_map = {} | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| event = { | |
| "event": "on_tool_start", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"input": {"query": "tool input"}}, | |
| "start_time": 0, | |
| } | |
| updated_message, start_time = await handle_on_tool_start(event, agent_message, tool_blocks_map, send_message, 0.0) | |
| assert len(updated_message.content_blocks) == 1 | |
| assert len(updated_message.content_blocks[0].contents) > 0 | |
| tool_key = f"{event['name']}_{event['run_id']}" | |
| tool_content = updated_message.content_blocks[0].contents[-1] | |
| assert tool_content == tool_blocks_map.get(tool_key) | |
| assert isinstance(tool_content, ToolContent) | |
| assert tool_content.name == "test_tool" | |
| assert tool_content.tool_input == {"query": "tool input"} | |
| assert isinstance(tool_content.duration, int) | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_tool_end(): | |
| """Test handle_on_tool_end event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| tool_blocks_map = {} | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| start_event = { | |
| "event": "on_tool_start", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"input": {"query": "tool input"}}, | |
| } | |
| agent_message, _ = await handle_on_tool_start(start_event, agent_message, tool_blocks_map, send_message, 0.0) | |
| end_event = { | |
| "event": "on_tool_end", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"output": "tool output"}, | |
| "start_time": 0, | |
| } | |
| updated_message, start_time = await handle_on_tool_end(end_event, agent_message, tool_blocks_map, send_message, 0.0) | |
| f"{end_event['name']}_{end_event['run_id']}" | |
| tool_content = updated_message.content_blocks[0].contents[-1] | |
| assert tool_content.name == "test_tool" | |
| assert tool_content.output == "tool output" | |
| assert isinstance(tool_content.duration, int) | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_tool_error(): | |
| """Test handle_on_tool_error event.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| tool_blocks_map = {} | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| start_event = { | |
| "event": "on_tool_start", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"input": {"query": "tool input"}}, | |
| } | |
| agent_message, _ = await handle_on_tool_start(start_event, agent_message, tool_blocks_map, send_message, 0.0) | |
| error_event = { | |
| "event": "on_tool_error", | |
| "name": "test_tool", | |
| "run_id": "test_run", | |
| "data": {"error": "error message"}, | |
| "start_time": 0, | |
| } | |
| updated_message, start_time = await handle_on_tool_error( | |
| error_event, agent_message, tool_blocks_map, send_message, 0.0 | |
| ) | |
| tool_content = updated_message.content_blocks[0].contents[-1] | |
| assert tool_content.name == "test_tool" | |
| assert tool_content.error == "error message" | |
| assert tool_content.header["title"] == "Error using **test_tool**" | |
| assert isinstance(tool_content.duration, int) | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_chain_stream_with_output(): | |
| """Test handle_on_chain_stream with output.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| ) | |
| event = { | |
| "event": "on_chain_stream", | |
| "data": {"chunk": {"output": "streamed output"}}, | |
| } | |
| updated_message, start_time = await handle_on_chain_stream(event, agent_message, send_message, 0.0) | |
| assert updated_message.text == "streamed output" | |
| assert updated_message.properties.state == "complete" | |
| assert isinstance(start_time, float) | |
| async def test_handle_on_chain_stream_no_output(): | |
| """Test handle_on_chain_stream without output.""" | |
| send_message = AsyncMock(side_effect=lambda message: message) | |
| agent_message = Message( | |
| sender=MESSAGE_SENDER_AI, | |
| sender_name="Agent", | |
| properties={"icon": "Bot", "state": "partial"}, | |
| content_blocks=[ContentBlock(title="Agent Steps", contents=[])], | |
| session_id="test_session_id", | |
| ) | |
| event = { | |
| "event": "on_chain_stream", | |
| "data": {"chunk": {}}, | |
| } | |
| updated_message, start_time = await handle_on_chain_stream(event, agent_message, send_message, 0.0) | |
| assert updated_message.text == "" | |
| assert updated_message.properties.state == "partial" | |
| assert isinstance(start_time, float) | |