# coding=utf-8
# Copyright 2024 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import re
import tempfile
import uuid
import warnings
from collections.abc import Generator
from contextlib import nullcontext as does_not_raise
from dataclasses import dataclass
from pathlib import Path
from textwrap import dedent
from typing import Optional
from unittest.mock import MagicMock, patch
import pytest
from huggingface_hub import (
ChatCompletionOutputFunctionDefinition,
ChatCompletionOutputMessage,
ChatCompletionOutputToolCall,
)
from rich.console import Console
from smolagents import EMPTY_PROMPT_TEMPLATES
from smolagents.agent_types import AgentImage, AgentText
from smolagents.agents import (
AgentError,
AgentMaxStepsError,
AgentToolCallError,
CodeAgent,
MultiStepAgent,
ToolCall,
ToolCallingAgent,
ToolOutput,
populate_template,
)
from smolagents.default_tools import DuckDuckGoSearchTool, FinalAnswerTool, PythonInterpreterTool, VisitWebpageTool
from smolagents.memory import (
ActionStep,
PlanningStep,
TaskStep,
)
from smolagents.models import (
ChatMessage,
ChatMessageToolCall,
ChatMessageToolCallFunction,
InferenceClientModel,
MessageRole,
Model,
TransformersModel,
)
from smolagents.monitoring import AgentLogger, LogLevel, TokenUsage
from smolagents.tools import Tool, tool
from smolagents.utils import (
BASE_BUILTIN_MODULES,
AgentExecutionError,
AgentGenerationError,
AgentToolExecutionError,
)
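# Minimal stand-ins for OpenAI-style streaming tool-call deltas, used below to
# fake streamed chunks in the stream_outputs test without importing openai types.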
@dataclass
class ChoiceDeltaToolCallFunction:
arguments: Optional[str] = None
name: Optional[str] = None
@dataclass
class ChoiceDeltaToolCall:
index: Optional[int] = None
id: Optional[str] = None
function: Optional[ChoiceDeltaToolCallFunction] = None
type: Optional[str] = None
@dataclass
class ChoiceDelta:
content: Optional[str] = None
function_call: Optional[str] = None
refusal: Optional[str] = None
role: Optional[str] = None
tool_calls: Optional[list] = None
def get_new_path(suffix="") -> str:
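    """Return a unique file path with the given suffix inside a fresh temporary directory."""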
directory = tempfile.mkdtemp()
return os.path.join(directory, str(uuid.uuid4()) + suffix)
@pytest.fixture
def agent_logger():
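    """AgentLogger recording to an in-memory console so tests can inspect the captured output."""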
return AgentLogger(
LogLevel.DEBUG, console=Console(record=True, no_color=True, force_terminal=False, file=io.StringIO())
)
class FakeToolCallModel(Model):
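    """Fake tool-calling model: first requests a python_interpreter call (2*3.6452), then a final_answer call once the conversation has grown."""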
def generate(self, messages, tools_to_call_from=None, stop_sequences=None):
if len(messages) < 3:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(
name="python_interpreter", arguments={"code": "2*3.6452"}
),
)
],
)
else:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="final_answer", arguments={"answer": "7.2904"}),
)
],
)
class FakeToolCallModelImage(Model):
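    """Fake tool-calling model: first requests an image generation tool call, then answers with the 'image.png' state key via final_answer."""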
def generate(self, messages, tools_to_call_from=None, stop_sequences=None):
if len(messages) < 3:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(
name="fake_image_generation_tool",
arguments={"prompt": "An image of a cat"},
),
)
],
)
else:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="final_answer", arguments="image.png"),
)
],
)
class FakeToolCallModelVL(Model):
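    """Fake tool-calling model: first requests an image understanding tool call (text + image arguments), then a final_answer call."""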
def generate(self, messages, tools_to_call_from=None, stop_sequences=None):
if len(messages) < 3:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(
name="fake_image_understanding_tool",
arguments={
"prompt": "What is in this image?",
"image": "image.png",
},
),
)
],
)
else:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="final_answer", arguments="The image is a cat."),
)
],
)
class FakeCodeModel(Model):
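    """Fake code model: step 1 emits a code action tagged with 'special_marker' so that step 2 can detect it in the prompt and return final_answer(7.2904)."""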
def generate(self, messages, stop_sequences=None):
prompt = str(messages)
if "special_marker" not in prompt:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I should multiply 2 by 3.6452. special_marker
<code>
result = 2**3.6452
</code>
""",
)
else: # We're at step 2
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I can now answer the initial question
<code>
final_answer(7.2904)
</code>
""",
)
class FakeCodeModelPlanning(Model):
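    """Fake code model for planning tests: returns an initial plan, then a code action, then an updated plan, using marker strings to detect which call it is."""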
def generate(self, messages, stop_sequences=None):
prompt = str(messages)
if "planning_marker" not in prompt:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="llm plan update planning_marker",
token_usage=TokenUsage(input_tokens=10, output_tokens=10),
)
elif "action_marker" not in prompt:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I should multiply 2 by 3.6452. action_marker
<code>
result = 2**3.6452
</code>
""",
token_usage=TokenUsage(input_tokens=10, output_tokens=10),
)
else:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="llm plan again",
token_usage=TokenUsage(input_tokens=10, output_tokens=10),
)
class FakeCodeModelError(Model):
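    """Fake code model: step 1 emits code that prints 'Flag!' then raises a ValueError; step 2 returns final_answer("got an error")."""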
def generate(self, messages, stop_sequences=None):
prompt = str(messages)
if "special_marker" not in prompt:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I should multiply 2 by 3.6452. special_marker
<code>
print("Flag!")
def error_function():
raise ValueError("error")
error_function()
</code>
""",
)
else: # We're at step 2
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I faced an error in the previous step.
<code>
final_answer("got an error")
</code>
""",
)
class FakeCodeModelSyntaxError(Model):
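    """Fake code model: step 1 emits code containing an unexpected indent that triggers a syntax error; step 2 returns final_answer("got an error")."""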
def generate(self, messages, stop_sequences=None):
prompt = str(messages)
if "special_marker" not in prompt:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I should multiply 2 by 3.6452. special_marker
<code>
a = 2
b = a * 2
print("Failing due to unexpected indent")
print("Ok, calculation done!")
</code>
""",
)
else: # We're at step 2
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I can now answer the initial question
<code>
final_answer("got an error")
</code>
""",
)
class FakeCodeModelImport(Model):
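    """Single-step fake code model whose code action imports numpy before calling final_answer, to exercise import authorization."""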
def generate(self, messages, stop_sequences=None):
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I can answer the question
<code>
import numpy as np
final_answer("got an error")
</code>
""",
)
class FakeCodeModelFunctionDef(Model):
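    """Fake code model: step 1 defines moving_average, step 2 calls it, checking that definitions persist in the executor across steps."""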
def generate(self, messages, stop_sequences=None):
prompt = str(messages)
if "special_marker" not in prompt:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: Let's define the function. special_marker
<code>
import numpy as np
def moving_average(x, w):
return np.convolve(x, np.ones(w), 'valid') / w
</code>
""",
)
else: # We're at step 2
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I can now answer the initial question
<code>
x, w = [0, 1, 2, 3, 4, 5], 2
res = moving_average(x, w)
final_answer(res)
</code>
""",
)
class FakeCodeModelSingleStep(Model):
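    """Single-step fake code model; note that its code block closes with a bare ``` fence instead of </code>."""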
def generate(self, messages, stop_sequences=None):
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I should multiply 2 by 3.6452. special_marker
<code>
result = python_interpreter(code="2*3.6452")
final_answer(result)
```
""",
)
class FakeCodeModelNoReturn(Model):
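    """Fake code model that never calls final_answer (it only prints), so runs end only at max_steps; its code block also closes with a bare ``` fence instead of </code> (relied upon by test_end_code_appending)."""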
def generate(self, messages, stop_sequences=None):
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: I should multiply 2 by 3.6452. special_marker
<code>
result = python_interpreter(code="2*3.6452")
print(result)
```
""",
)
class TestAgent:
def test_fake_toolcalling_agent(self):
agent = ToolCallingAgent(tools=[PythonInterpreterTool()], model=FakeToolCallModel())
output = agent.run("What is 2 multiplied by 3.6452?")
assert isinstance(output, str)
assert "7.2904" in output
assert agent.memory.steps[0].task == "What is 2 multiplied by 3.6452?"
assert "7.2904" in agent.memory.steps[1].observations
assert (
agent.memory.steps[2].model_output
== "Tool call call_1: calling 'final_answer' with arguments: {'answer': '7.2904'}"
)
def test_toolcalling_agent_handles_image_tool_outputs(self, shared_datadir):
import PIL.Image
@tool
def fake_image_generation_tool(prompt: str) -> PIL.Image.Image:
"""Tool that generates an image.
Args:
prompt: The prompt
"""
import PIL.Image
return PIL.Image.open(shared_datadir / "000000039769.png")
agent = ToolCallingAgent(
tools=[fake_image_generation_tool], model=FakeToolCallModelImage(), verbosity_level=10
)
output = agent.run("Make me an image.")
assert isinstance(output, AgentImage)
assert isinstance(agent.state["image.png"], PIL.Image.Image)
def test_toolcalling_agent_handles_image_inputs(self, shared_datadir):
import PIL.Image
image = PIL.Image.open(shared_datadir / "000000039769.png") # dummy input
@tool
def fake_image_understanding_tool(prompt: str, image: PIL.Image.Image) -> str:
"""Tool that creates a caption for an image.
Args:
prompt: The prompt
image: The image
"""
return "The image is a cat."
agent = ToolCallingAgent(tools=[fake_image_understanding_tool], model=FakeToolCallModelVL())
output = agent.run("Caption this image.", images=[image])
assert output == "The image is a cat."
def test_fake_code_agent(self):
agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModel(), verbosity_level=10)
output = agent.run("What is 2 multiplied by 3.6452?")
assert isinstance(output, float)
assert output == 7.2904
assert agent.memory.steps[0].task == "What is 2 multiplied by 3.6452?"
assert agent.memory.steps[2].tool_calls == [
ToolCall(name="python_interpreter", arguments="final_answer(7.2904)", id="call_2")
]
def test_additional_args_added_to_task(self):
agent = CodeAgent(tools=[], model=FakeCodeModel())
agent.run(
"What is 2 multiplied by 3.6452?",
additional_args={"instruction": "Remember this."},
)
assert "Remember this" in agent.task
def test_reset_conversations(self):
agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModel())
output = agent.run("What is 2 multiplied by 3.6452?", reset=True)
assert output == 7.2904
assert len(agent.memory.steps) == 3
output = agent.run("What is 2 multiplied by 3.6452?", reset=False)
assert output == 7.2904
assert len(agent.memory.steps) == 5
output = agent.run("What is 2 multiplied by 3.6452?", reset=True)
assert output == 7.2904
assert len(agent.memory.steps) == 3
def test_setup_agent_with_empty_toolbox(self):
ToolCallingAgent(model=FakeToolCallModel(), tools=[])
def test_fails_max_steps(self):
agent = CodeAgent(
tools=[PythonInterpreterTool()],
            model=FakeCodeModelNoReturn(),  # this model never calls final_answer, so the run ends only at max_steps
max_steps=5,
)
answer = agent.run("What is 2 multiplied by 3.6452?")
assert len(agent.memory.steps) == 7 # Task step + 5 action steps + Final answer
assert type(agent.memory.steps[-1].error) is AgentMaxStepsError
assert isinstance(answer, str)
agent = CodeAgent(
tools=[PythonInterpreterTool()],
            model=FakeCodeModelNoReturn(),  # this model never calls final_answer, so the run ends only at max_steps
max_steps=5,
)
answer = agent.run("What is 2 multiplied by 3.6452?", max_steps=3)
assert len(agent.memory.steps) == 5 # Task step + 3 action steps + Final answer
assert type(agent.memory.steps[-1].error) is AgentMaxStepsError
assert isinstance(answer, str)
def test_tool_descriptions_get_baked_in_system_prompt(self):
tool = PythonInterpreterTool()
tool.name = "fake_tool_name"
tool.description = "fake_tool_description"
agent = CodeAgent(tools=[tool], model=FakeCodeModel())
agent.run("Empty task")
assert agent.system_prompt is not None
assert f"def {tool.name}(" in agent.system_prompt
assert f'"""{tool.description}' in agent.system_prompt
def test_module_imports_get_baked_in_system_prompt(self):
agent = CodeAgent(tools=[], model=FakeCodeModel())
agent.run("Empty task")
for module in BASE_BUILTIN_MODULES:
assert module in agent.system_prompt
def test_init_agent_with_different_toolsets(self):
toolset_1 = []
agent = CodeAgent(tools=toolset_1, model=FakeCodeModel())
assert len(agent.tools) == 1 # when no tools are provided, only the final_answer tool is added by default
toolset_2 = [PythonInterpreterTool(), PythonInterpreterTool()]
with pytest.raises(ValueError) as e:
agent = CodeAgent(tools=toolset_2, model=FakeCodeModel())
assert "Each tool or managed_agent should have a unique name!" in str(e)
with pytest.raises(ValueError) as e:
agent.name = "python_interpreter"
agent.description = "empty"
CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModel(), managed_agents=[agent])
assert "Each tool or managed_agent should have a unique name!" in str(e)
# check that python_interpreter base tool does not get added to CodeAgent
agent = CodeAgent(tools=[], model=FakeCodeModel(), add_base_tools=True)
assert len(agent.tools) == 3 # added final_answer tool + search + visit_webpage
# check that python_interpreter base tool gets added to ToolCallingAgent
agent = ToolCallingAgent(tools=[], model=FakeCodeModel(), add_base_tools=True)
        assert len(agent.tools) == 4  # added final_answer tool + python_interpreter + search + visit_webpage
def test_function_persistence_across_steps(self):
agent = CodeAgent(
tools=[],
model=FakeCodeModelFunctionDef(),
max_steps=2,
additional_authorized_imports=["numpy"],
verbosity_level=100,
)
res = agent.run("ok")
assert res[0] == 0.5
def test_init_managed_agent(self):
agent = CodeAgent(tools=[], model=FakeCodeModelFunctionDef(), name="managed_agent", description="Empty")
assert agent.name == "managed_agent"
assert agent.description == "Empty"
def test_agent_description_gets_correctly_inserted_in_system_prompt(self):
managed_agent = CodeAgent(
tools=[], model=FakeCodeModelFunctionDef(), name="managed_agent", description="Empty"
)
manager_agent = CodeAgent(
tools=[],
model=FakeCodeModelFunctionDef(),
managed_agents=[managed_agent],
)
assert "You can also give tasks to team members." not in managed_agent.system_prompt
assert "{{managed_agents_descriptions}}" not in managed_agent.system_prompt
assert "You can also give tasks to team members." in manager_agent.system_prompt
def test_replay_shows_logs(self, agent_logger):
agent = CodeAgent(
tools=[],
model=FakeCodeModelImport(),
verbosity_level=0,
additional_authorized_imports=["numpy"],
logger=agent_logger,
)
agent.run("Count to 3")
str_output = agent_logger.console.export_text()
assert "New run" in str_output
assert 'final_answer("got' in str_output
assert "</code>" in str_output
agent = ToolCallingAgent(tools=[PythonInterpreterTool()], model=FakeToolCallModel(), verbosity_level=0)
agent.logger = agent_logger
agent.run("What is 2 multiplied by 3.6452?")
agent.replay()
str_output = agent_logger.console.export_text()
assert "Tool call" in str_output
assert "arguments" in str_output
def test_code_nontrivial_final_answer_works(self):
class FakeCodeModelFinalAnswer(Model):
def generate(self, messages, stop_sequences=None):
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""<code>
def nested_answer():
final_answer("Correct!")
nested_answer()
</code>""",
)
agent = CodeAgent(tools=[], model=FakeCodeModelFinalAnswer())
output = agent.run("Count to 3")
assert output == "Correct!"
def test_transformers_toolcalling_agent(self):
@tool
def weather_api(location: str, celsius: str = "") -> str:
"""
Gets the weather in the next days at given location.
Secretly this tool does not care about the location, it hates the weather everywhere.
Args:
location: the location
celsius: the temperature type
"""
return "The weather is UNGODLY with torrential rains and temperatures below -10°C"
model = TransformersModel(
model_id="HuggingFaceTB/SmolLM2-360M-Instruct",
max_new_tokens=100,
device_map="auto",
do_sample=False,
)
agent = ToolCallingAgent(model=model, tools=[weather_api], max_steps=1, verbosity_level=10)
task = "What is the weather in Paris? "
agent.run(task)
assert agent.memory.steps[0].task == task
assert agent.memory.steps[1].tool_calls[0].name == "weather_api"
step_memory_dict = agent.memory.get_succinct_steps()[1]
assert step_memory_dict["model_output_message"]["tool_calls"][0]["function"]["name"] == "weather_api"
assert step_memory_dict["model_output_message"]["raw"]["completion_kwargs"]["max_new_tokens"] == 100
assert "model_input_messages" in agent.memory.get_full_steps()[1]
assert step_memory_dict["token_usage"]["total_tokens"] > 100
assert step_memory_dict["timing"]["duration"] > 0.1
def test_final_answer_checks(self):
error_string = "failed with error"
def check_always_fails(final_answer, agent_memory):
assert False, "Error raised in check"
agent = CodeAgent(model=FakeCodeModel(), tools=[], final_answer_checks=[check_always_fails])
agent.run("Dummy task.")
assert error_string in str(agent.write_memory_to_messages())
assert "Error raised in check" in str(agent.write_memory_to_messages())
agent = CodeAgent(
model=FakeCodeModel(),
tools=[],
final_answer_checks=[lambda x, y: x == 7.2904],
verbosity_level=1000,
)
output = agent.run("Dummy task.")
assert output == 7.2904 # Check that output is correct
assert len([step for step in agent.memory.steps if isinstance(step, ActionStep)]) == 2
assert error_string not in str(agent.write_memory_to_messages())
def test_generation_errors_are_raised(self):
class FakeCodeModel(Model):
def generate(self, messages, stop_sequences=None):
assert False, "Generation failed"
agent = CodeAgent(model=FakeCodeModel(), tools=[])
with pytest.raises(AgentGenerationError) as e:
agent.run("Dummy task.")
assert len(agent.memory.steps) == 2
assert "Generation failed" in str(e)
def test_planning_step_with_injected_memory(self):
"""Test that agent properly uses update plan prompts when memory is injected before a run.
This test verifies:
1. Planning steps are created with the correct frequency
2. Injected memory is included in planning context
3. Messages are properly formatted with expected roles and content
"""
planning_interval = 1
max_steps = 4
task = "Continuous task"
previous_task = "Previous user request"
# Create agent with planning capability
agent = CodeAgent(
tools=[],
planning_interval=planning_interval,
model=FakeCodeModelPlanning(),
max_steps=max_steps,
)
# Inject memory before run to simulate existing conversation history
previous_step = TaskStep(task=previous_task)
agent.memory.steps.append(previous_step)
# Run the agent
agent.run(task, reset=False)
# Extract and validate planning steps
planning_steps = [step for step in agent.memory.steps if isinstance(step, PlanningStep)]
assert len(planning_steps) > 2, "Expected multiple planning steps to be generated"
# Verify first planning step incorporates injected memory
first_planning_step = planning_steps[0]
input_messages = first_planning_step.model_input_messages
# Check message structure and content
assert len(input_messages) == 4, (
"First planning step should have 4 messages: system-plan-pre-update + memory + task + user-plan-post-update"
)
# Verify system message contains current task
system_message = input_messages[0]
assert system_message.role == "system", "First message should have system role"
assert task in system_message.content[0]["text"], f"System message should contain the current task: '{task}'"
# Verify memory message contains previous task
memory_message = input_messages[1]
assert previous_task in memory_message.content[0]["text"], (
f"Memory message should contain previous task: '{previous_task}'"
)
# Verify task message contains current task
task_message = input_messages[2]
assert task in task_message.content[0]["text"], f"Task message should contain current task: '{task}'"
# Verify user message for planning
user_message = input_messages[3]
assert user_message.role == "user", "Fourth message should have user role"
# Verify second planning step has more context from first agent actions
second_planning_step = planning_steps[1]
second_messages = second_planning_step.model_input_messages
# Check that conversation history is growing appropriately
assert len(second_messages) == 6, "Second planning step should have 6 messages including tool interactions"
# Verify all conversation elements are present
conversation_text = "".join([msg.content[0]["text"] for msg in second_messages if hasattr(msg, "content")])
assert previous_task in conversation_text, "Previous task should be included in the conversation history"
assert task in conversation_text, "Current task should be included in the conversation history"
assert "tools" in conversation_text, "Tool interactions should be included in the conversation history"
class CustomFinalAnswerTool(FinalAnswerTool):
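    """FinalAnswerTool variant that appends 'CUSTOM' to the answer, used to check that the default final_answer tool can be swapped out."""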
def forward(self, answer) -> str:
return answer + "CUSTOM"
class MockTool(Tool):
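    """Bare-bones Tool with a configurable name, for name-collision validation tests."""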
def __init__(self, name):
self.name = name
self.description = "Mock tool description"
self.inputs = {}
self.output_type = "string"
def forward(self):
return "Mock tool output"
class MockAgent:
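    """Bare-bones stand-in for a managed agent, exposing only name, tools, and description."""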
def __init__(self, name, tools, description="Mock agent description"):
self.name = name
self.tools = {t.name: t for t in tools}
self.description = description
class DummyMultiStepAgent(MultiStepAgent):
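    """Minimal concrete MultiStepAgent (no-op step, no system prompt) for testing base-class behavior."""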
def step(self, memory_step: ActionStep) -> Generator[None]:
yield None
def initialize_system_prompt(self):
pass
class TestMultiStepAgent:
def test_instantiation_disables_logging_to_terminal(self):
fake_model = MagicMock()
agent = DummyMultiStepAgent(tools=[], model=fake_model)
assert agent.logger.level == -1, "logging to terminal should be disabled for testing using a fixture"
def test_instantiation_with_prompt_templates(self, prompt_templates):
agent = DummyMultiStepAgent(tools=[], model=MagicMock(), prompt_templates=prompt_templates)
assert agent.prompt_templates == prompt_templates
assert agent.prompt_templates["system_prompt"] == "This is a test system prompt."
assert "managed_agent" in agent.prompt_templates
assert agent.prompt_templates["managed_agent"]["task"] == "Task for {{name}}: {{task}}"
assert agent.prompt_templates["managed_agent"]["report"] == "Report for {{name}}: {{final_answer}}"
@pytest.mark.parametrize(
"tools, expected_final_answer_tool",
[([], FinalAnswerTool), ([CustomFinalAnswerTool()], CustomFinalAnswerTool)],
)
def test_instantiation_with_final_answer_tool(self, tools, expected_final_answer_tool):
agent = DummyMultiStepAgent(tools=tools, model=MagicMock())
assert "final_answer" in agent.tools
assert isinstance(agent.tools["final_answer"], expected_final_answer_tool)
def test_instantiation_with_deprecated_grammar(self):
class SimpleAgent(MultiStepAgent):
def initialize_system_prompt(self) -> str:
return "Test system prompt"
# Test with a non-None grammar parameter
with pytest.warns(
FutureWarning, match="Parameter 'grammar' is deprecated and will be removed in version 1.20."
):
SimpleAgent(tools=[], model=MagicMock(), grammar={"format": "json"}, verbosity_level=LogLevel.DEBUG)
# Verify no warning when grammar is None
with warnings.catch_warnings():
warnings.simplefilter("error") # Turn warnings into errors
SimpleAgent(tools=[], model=MagicMock(), grammar=None, verbosity_level=LogLevel.DEBUG)
def test_system_prompt_property(self):
"""Test that system_prompt property is read-only and calls initialize_system_prompt."""
class SimpleAgent(MultiStepAgent):
def initialize_system_prompt(self) -> str:
return "Test system prompt"
def step(self, memory_step: ActionStep) -> Generator[None]:
yield None
# Create a simple agent with mocked model
model = MagicMock()
agent = SimpleAgent(tools=[], model=model)
# Test reading the property works and calls initialize_system_prompt
assert agent.system_prompt == "Test system prompt"
# Test setting the property raises AttributeError with correct message
with pytest.raises(
AttributeError,
match=re.escape(
"""The 'system_prompt' property is read-only. Use 'self.prompt_templates["system_prompt"]' instead."""
),
):
agent.system_prompt = "New system prompt"
# assert "read-only" in str(exc_info.value)
# assert "Use 'self.prompt_templates[\"system_prompt\"]' instead" in str(exc_info.value)
def test_logs_display_thoughts_even_if_error(self):
class FakeJsonModelNoCall(Model):
def generate(self, messages, stop_sequences=None, tools_to_call_from=None):
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""I don't want to call tools today""",
tool_calls=None,
raw="""I don't want to call tools today""",
)
agent_toolcalling = ToolCallingAgent(model=FakeJsonModelNoCall(), tools=[], max_steps=1, verbosity_level=10)
with agent_toolcalling.logger.console.capture() as capture:
agent_toolcalling.run("Dummy task")
assert "don't" in capture.get() and "want" in capture.get()
class FakeCodeModelNoCall(Model):
def generate(self, messages, stop_sequences=None):
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""I don't want to write an action today""",
)
agent_code = CodeAgent(model=FakeCodeModelNoCall(), tools=[], max_steps=1, verbosity_level=10)
with agent_code.logger.console.capture() as capture:
agent_code.run("Dummy task")
assert "don't" in capture.get() and "want" in capture.get()
def test_step_number(self):
fake_model = MagicMock()
fake_model.generate.return_value = ChatMessage(
role=MessageRole.ASSISTANT,
content="Model output.",
tool_calls=None,
raw="Model output.",
token_usage=None,
)
max_steps = 2
agent = CodeAgent(tools=[], model=fake_model, max_steps=max_steps)
assert hasattr(agent, "step_number"), "step_number attribute should be defined"
assert agent.step_number == 0, "step_number should be initialized to 0"
agent.run("Test task")
assert hasattr(agent, "step_number"), "step_number attribute should be defined"
assert agent.step_number == max_steps + 1, "step_number should be max_steps + 1 after run method is called"
@pytest.mark.parametrize(
"step, expected_messages_list",
[
(
1,
[
[
ChatMessage(
role=MessageRole.USER, content=[{"type": "text", "text": "INITIAL_PLAN_USER_PROMPT"}]
),
],
],
),
(
2,
[
[
ChatMessage(
role=MessageRole.SYSTEM,
content=[{"type": "text", "text": "UPDATE_PLAN_SYSTEM_PROMPT"}],
),
ChatMessage(
role=MessageRole.USER,
content=[{"type": "text", "text": "UPDATE_PLAN_USER_PROMPT"}],
),
],
],
),
],
)
def test_planning_step(self, step, expected_messages_list):
fake_model = MagicMock()
agent = CodeAgent(
tools=[],
model=fake_model,
)
task = "Test task"
planning_step = list(agent._generate_planning_step(task, is_first_step=(step == 1), step=step))[-1]
expected_message_texts = {
"INITIAL_PLAN_USER_PROMPT": populate_template(
agent.prompt_templates["planning"]["initial_plan"],
variables=dict(
task=task,
tools=agent.tools,
managed_agents=agent.managed_agents,
answer_facts=planning_step.model_output_message.content,
),
),
"UPDATE_PLAN_SYSTEM_PROMPT": populate_template(
agent.prompt_templates["planning"]["update_plan_pre_messages"], variables=dict(task=task)
),
"UPDATE_PLAN_USER_PROMPT": populate_template(
agent.prompt_templates["planning"]["update_plan_post_messages"],
variables=dict(
task=task,
tools=agent.tools,
managed_agents=agent.managed_agents,
facts_update=planning_step.model_output_message.content,
remaining_steps=agent.max_steps - step,
),
),
}
for expected_messages in expected_messages_list:
for expected_message in expected_messages:
expected_message.content[0]["text"] = expected_message_texts[expected_message.content[0]["text"]]
assert isinstance(planning_step, PlanningStep)
expected_model_input_messages = expected_messages_list[0]
model_input_messages = planning_step.model_input_messages
assert isinstance(model_input_messages, list)
assert len(model_input_messages) == len(expected_model_input_messages) # 2
for message, expected_message in zip(model_input_messages, expected_model_input_messages):
assert isinstance(message, ChatMessage)
assert message.role in MessageRole.__members__.values()
assert message.role == expected_message.role
assert isinstance(message.content, list)
for content, expected_content in zip(message.content, expected_message.content):
assert content == expected_content
# Test calls to model
assert len(fake_model.generate.call_args_list) == 1
for call_args, expected_messages in zip(fake_model.generate.call_args_list, expected_messages_list):
assert len(call_args.args) == 1
messages = call_args.args[0]
assert isinstance(messages, list)
assert len(messages) == len(expected_messages)
for message, expected_message in zip(messages, expected_messages):
assert isinstance(message, ChatMessage)
assert message.role in MessageRole.__members__.values()
assert message.role == expected_message.role
assert isinstance(message.content, list)
for content, expected_content in zip(message.content, expected_message.content):
assert content == expected_content
@pytest.mark.parametrize(
"images, expected_messages_list",
[
(
None,
[
[
ChatMessage(
role=MessageRole.SYSTEM,
content=[{"type": "text", "text": "FINAL_ANSWER_SYSTEM_PROMPT"}],
),
ChatMessage(
role=MessageRole.USER,
content=[{"type": "text", "text": "FINAL_ANSWER_USER_PROMPT"}],
),
]
],
),
(
["image1.png"],
[
[
ChatMessage(
role=MessageRole.SYSTEM,
content=[
{"type": "text", "text": "FINAL_ANSWER_SYSTEM_PROMPT"},
{"type": "image", "image": "image1.png"},
],
),
ChatMessage(
role=MessageRole.USER,
content=[{"type": "text", "text": "FINAL_ANSWER_USER_PROMPT"}],
),
]
],
),
],
)
def test_provide_final_answer(self, images, expected_messages_list):
fake_model = MagicMock()
fake_model.generate.return_value = ChatMessage(
role=MessageRole.ASSISTANT,
content="Final answer.",
tool_calls=None,
raw="Final answer.",
token_usage=None,
)
agent = CodeAgent(
tools=[],
model=fake_model,
)
task = "Test task"
final_answer = agent.provide_final_answer(task, images=images).content
expected_message_texts = {
"FINAL_ANSWER_SYSTEM_PROMPT": agent.prompt_templates["final_answer"]["pre_messages"],
"FINAL_ANSWER_USER_PROMPT": populate_template(
agent.prompt_templates["final_answer"]["post_messages"], variables=dict(task=task)
),
}
for expected_messages in expected_messages_list:
for expected_message in expected_messages:
for expected_content in expected_message.content:
if "text" in expected_content:
expected_content["text"] = expected_message_texts[expected_content["text"]]
assert final_answer == "Final answer."
# Test calls to model
assert len(fake_model.generate.call_args_list) == 1
for call_args, expected_messages in zip(fake_model.generate.call_args_list, expected_messages_list):
assert len(call_args.args) == 1
messages = call_args.args[0]
assert isinstance(messages, list)
assert len(messages) == len(expected_messages)
for message, expected_message in zip(messages, expected_messages):
assert isinstance(message, ChatMessage)
assert message.role in MessageRole.__members__.values()
assert message.role == expected_message.role
assert isinstance(message.content, list)
for content, expected_content in zip(message.content, expected_message.content):
assert content == expected_content
def test_interrupt(self):
fake_model = MagicMock()
fake_model.generate.return_value = ChatMessage(
role=MessageRole.ASSISTANT,
content="Model output.",
tool_calls=None,
raw="Model output.",
token_usage=None,
)
def interrupt_callback(memory_step, agent):
agent.interrupt()
agent = CodeAgent(
tools=[],
model=fake_model,
step_callbacks=[interrupt_callback],
)
with pytest.raises(AgentError) as e:
agent.run("Test task")
assert "Agent interrupted" in str(e)
@pytest.mark.parametrize(
"tools, managed_agents, name, expectation",
[
# Valid case: no duplicates
(
[MockTool("tool1"), MockTool("tool2")],
[MockAgent("agent1", [MockTool("tool3")])],
"test_agent",
does_not_raise(),
),
# Invalid case: duplicate tool names
([MockTool("tool1"), MockTool("tool1")], [], "test_agent", pytest.raises(ValueError)),
# Invalid case: tool name same as managed agent name
(
[MockTool("tool1")],
[MockAgent("tool1", [MockTool("final_answer")])],
"test_agent",
pytest.raises(ValueError),
),
# Valid case: tool name same as managed agent's tool name
([MockTool("tool1")], [MockAgent("agent1", [MockTool("tool1")])], "test_agent", does_not_raise()),
            # Invalid case: agent's own name duplicates one of its tool names
([MockTool("tool1")], [], "tool1", pytest.raises(ValueError)),
# Valid case: duplicate tool names across managed agents
(
[MockTool("tool1")],
[
MockAgent("agent1", [MockTool("tool2"), MockTool("final_answer")]),
MockAgent("agent2", [MockTool("tool2"), MockTool("final_answer")]),
],
"test_agent",
does_not_raise(),
),
],
)
def test_validate_tools_and_managed_agents(self, tools, managed_agents, name, expectation):
fake_model = MagicMock()
with expectation:
DummyMultiStepAgent(
tools=tools,
model=fake_model,
name=name,
managed_agents=managed_agents,
)
def test_from_dict(self):
# Create a test agent dictionary
agent_dict = {
"model": {"class": "TransformersModel", "data": {"model_id": "test/model"}},
"tools": [
{
"name": "valid_tool_function",
"code": 'from smolagents import Tool\nfrom typing import Any, Optional\n\nclass SimpleTool(Tool):\n name = "valid_tool_function"\n description = "A valid tool function."\n inputs = {"input":{"type":"string","description":"Input string."}}\n output_type = "string"\n\n def forward(self, input: str) -> str:\n """A valid tool function.\n\n Args:\n input (str): Input string.\n """\n return input.upper()',
"requirements": {"smolagents"},
}
],
"managed_agents": {},
"prompt_templates": EMPTY_PROMPT_TEMPLATES,
"max_steps": 15,
"verbosity_level": 2,
"planning_interval": 3,
"name": "test_agent",
"description": "Test agent description",
}
# Call from_dict
with patch("smolagents.models.TransformersModel") as mock_model_class:
mock_model_instance = mock_model_class.from_dict.return_value
agent = DummyMultiStepAgent.from_dict(agent_dict)
# Verify the agent was created correctly
assert agent.model == mock_model_instance
assert mock_model_class.from_dict.call_args.args[0] == {"model_id": "test/model"}
assert agent.max_steps == 15
assert agent.logger.level == 2
assert agent.planning_interval == 3
assert agent.name == "test_agent"
assert agent.description == "Test agent description"
# Verify the tool was created correctly
assert sorted(agent.tools.keys()) == ["final_answer", "valid_tool_function"]
assert agent.tools["valid_tool_function"].name == "valid_tool_function"
assert agent.tools["valid_tool_function"].description == "A valid tool function."
assert agent.tools["valid_tool_function"].inputs == {
"input": {"type": "string", "description": "Input string."}
}
assert agent.tools["valid_tool_function"]("test") == "TEST"
# Test overriding with kwargs
with patch("smolagents.models.TransformersModel") as mock_model_class:
agent = DummyMultiStepAgent.from_dict(agent_dict, max_steps=30)
assert agent.max_steps == 30
class TestToolCallingAgent:
def test_toolcalling_agent_instructions(self):
agent = ToolCallingAgent(tools=[], model=MagicMock(), instructions="Test instructions")
assert agent.instructions == "Test instructions"
assert "Test instructions" in agent.system_prompt
def test_toolcalling_agent_passes_both_tools_and_managed_agents(self, test_tool):
"""Test that both tools and managed agents are passed to the model."""
managed_agent = MagicMock()
managed_agent.name = "managed_agent"
model = MagicMock()
model.generate.return_value = ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "test_value"}),
)
],
)
agent = ToolCallingAgent(tools=[test_tool], managed_agents=[managed_agent], model=model)
# Run the agent one step to trigger the model call
next(agent.run("Test task", stream=True))
# Check that the model was called with both tools and managed agents:
# - Get all tool_to_call_from names passed to the model
tools_to_call_from_names = [tool.name for tool in model.generate.call_args.kwargs["tools_to_call_from"]]
# - Verify both regular tools and managed agents are included
assert "test_tool" in tools_to_call_from_names # The regular tool
assert "managed_agent" in tools_to_call_from_names # The managed agent
assert "final_answer" in tools_to_call_from_names # The final_answer tool (added by default)
@patch("huggingface_hub.InferenceClient")
def test_toolcalling_agent_api(self, mock_inference_client):
mock_client = mock_inference_client.return_value
mock_response = mock_client.chat_completion.return_value
mock_response.choices[0].message = ChatCompletionOutputMessage(
role=MessageRole.ASSISTANT,
content='{"name": "weather_api", "arguments": {"location": "Paris", "date": "today"}}',
)
mock_response.usage.prompt_tokens = 10
mock_response.usage.completion_tokens = 20
model = InferenceClientModel(model_id="test-model")
from smolagents import tool
@tool
def weather_api(location: str, date: str) -> str:
"""
Gets the weather in the next days at given location.
Args:
location: the location
date: the date
"""
return f"The weather in {location} on date:{date} is sunny."
agent = ToolCallingAgent(model=model, tools=[weather_api], max_steps=1)
agent.run("What's the weather in Paris?")
assert agent.memory.steps[0].task == "What's the weather in Paris?"
assert agent.memory.steps[1].tool_calls[0].name == "weather_api"
assert agent.memory.steps[1].tool_calls[0].arguments == {"location": "Paris", "date": "today"}
assert agent.memory.steps[1].observations == "The weather in Paris on date:today is sunny."
mock_response.choices[0].message = ChatCompletionOutputMessage(
role=MessageRole.ASSISTANT,
content=None,
tool_calls=[
ChatCompletionOutputToolCall(
function=ChatCompletionOutputFunctionDefinition(
name="weather_api", arguments='{"location": "Paris", "date": "today"}'
),
id="call_0",
type="function",
)
],
)
agent.run("What's the weather in Paris?")
assert agent.memory.steps[0].task == "What's the weather in Paris?"
assert agent.memory.steps[1].tool_calls[0].name == "weather_api"
assert agent.memory.steps[1].tool_calls[0].arguments == {"location": "Paris", "date": "today"}
assert agent.memory.steps[1].observations == "The weather in Paris on date:today is sunny."
@patch("openai.OpenAI")
def test_toolcalling_agent_stream_outputs_multiple_tool_calls(self, mock_openai_client, test_tool):
"""Test that ToolCallingAgent with stream_outputs=True returns the first final_answer when multiple are called."""
mock_client = mock_openai_client.return_value
from smolagents import OpenAIServerModel
# Mock streaming response with multiple final_answer calls
mock_deltas = [
ChoiceDelta(role=MessageRole.ASSISTANT),
ChoiceDelta(
tool_calls=[
ChoiceDeltaToolCall(
index=0,
id="call_1",
function=ChoiceDeltaToolCallFunction(name="final_answer"),
type="function",
)
]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=0, function=ChoiceDeltaToolCallFunction(arguments='{"an'))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=0, function=ChoiceDeltaToolCallFunction(arguments='swer"'))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=0, function=ChoiceDeltaToolCallFunction(arguments=': "out'))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=0, function=ChoiceDeltaToolCallFunction(arguments="put1"))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=0, function=ChoiceDeltaToolCallFunction(arguments='"}'))]
),
ChoiceDelta(
tool_calls=[
ChoiceDeltaToolCall(
index=1,
id="call_2",
function=ChoiceDeltaToolCallFunction(name="test_tool"),
type="function",
)
]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=1, function=ChoiceDeltaToolCallFunction(arguments='{"in'))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=1, function=ChoiceDeltaToolCallFunction(arguments='put"'))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=1, function=ChoiceDeltaToolCallFunction(arguments=': "out'))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=1, function=ChoiceDeltaToolCallFunction(arguments="put2"))]
),
ChoiceDelta(
tool_calls=[ChoiceDeltaToolCall(index=1, function=ChoiceDeltaToolCallFunction(arguments='"}'))]
),
]
class MockChoice:
def __init__(self, delta):
self.delta = delta
class MockChunk:
def __init__(self, delta):
self.choices = [MockChoice(delta)]
self.usage = None
mock_client.chat.completions.create.return_value = (MockChunk(delta) for delta in mock_deltas)
# Mock usage for non-streaming fallback
mock_usage = MagicMock()
mock_usage.prompt_tokens = 10
mock_usage.completion_tokens = 20
model = OpenAIServerModel(model_id="fakemodel")
agent = ToolCallingAgent(model=model, tools=[test_tool], max_steps=1, stream_outputs=True)
result = agent.run("Make 2 calls to final answer: return both 'output1' and 'output2'")
assert len(agent.memory.steps[-1].model_output_message.tool_calls) == 2
assert agent.memory.steps[-1].model_output_message.tool_calls[0].function.name == "final_answer"
assert agent.memory.steps[-1].model_output_message.tool_calls[1].function.name == "test_tool"
# The agent should return the final answer call
assert result == "output1"
@patch("huggingface_hub.InferenceClient")
def test_toolcalling_agent_api_misformatted_output(self, mock_inference_client):
"""Test that even misformatted json blobs don't interrupt the run for a ToolCallingAgent."""
mock_client = mock_inference_client.return_value
mock_response = mock_client.chat_completion.return_value
mock_response.choices[0].message = ChatCompletionOutputMessage(
role=MessageRole.ASSISTANT,
content='{"name": weather_api", "arguments": {"location": "Paris", "date": "today"}}',
)
mock_response.usage.prompt_tokens = 10
mock_response.usage.completion_tokens = 20
model = InferenceClientModel(model_id="test-model")
logger = AgentLogger(console=Console(markup=False, no_color=True))
agent = ToolCallingAgent(model=model, tools=[], max_steps=2, verbosity_level=1, logger=logger)
with agent.logger.console.capture() as capture:
agent.run("What's the weather in Paris?")
assert agent.memory.steps[0].task == "What's the weather in Paris?"
assert agent.memory.steps[1].tool_calls is None
assert "The JSON blob you used is invalid" in agent.memory.steps[1].error.message
assert "Error while parsing" in capture.get()
assert len(agent.memory.steps) == 4
def test_change_tools_after_init(self):
from smolagents import tool
@tool
def fake_tool_1() -> str:
"""Fake tool"""
return "1"
@tool
def fake_tool_2() -> str:
"""Fake tool"""
return "2"
class FakeCodeModel(Model):
def generate(self, messages, stop_sequences=None):
return ChatMessage(role=MessageRole.ASSISTANT, content="<code>\nfinal_answer(fake_tool_1())\n</code>")
agent = CodeAgent(tools=[fake_tool_1], model=FakeCodeModel())
agent.tools["final_answer"] = CustomFinalAnswerTool()
agent.tools["fake_tool_1"] = fake_tool_2
answer = agent.run("Fake task.")
assert answer == "2CUSTOM"
def test_custom_final_answer_with_custom_inputs(self, test_tool):
class CustomFinalAnswerToolWithCustomInputs(FinalAnswerTool):
inputs = {
"answer1": {"type": "string", "description": "First part of the answer."},
"answer2": {"type": "string", "description": "Second part of the answer."},
}
def forward(self, answer1: str, answer2: str) -> str:
return answer1 + " and " + answer2
model = MagicMock()
model.generate.return_value = ChatMessage(
role=MessageRole.ASSISTANT,
content=None,
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(
name="final_answer", arguments={"answer1": "1", "answer2": "2"}
),
),
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "3"}),
),
],
)
agent = ToolCallingAgent(tools=[test_tool, CustomFinalAnswerToolWithCustomInputs()], model=model)
answer = agent.run("Fake task.")
assert answer == "1 and 2"
assert agent.memory.steps[-1].model_output_message.tool_calls[0].function.name == "final_answer"
assert agent.memory.steps[-1].model_output_message.tool_calls[1].function.name == "test_tool"
@pytest.mark.parametrize(
"test_case",
[
# Case 0: Single valid tool call
{
"tool_calls": [
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "test_value"}),
)
],
"expected_model_output": "Tool call call_1: calling 'test_tool' with arguments: {'input': 'test_value'}",
"expected_observations": "Processed: test_value",
"expected_final_outputs": ["Processed: test_value"],
"expected_error": None,
},
# Case 1: Multiple tool calls
{
"tool_calls": [
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "value1"}),
),
ChatMessageToolCall(
id="call_2",
type="function",
function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "value2"}),
),
],
"expected_model_output": "Tool call call_1: calling 'test_tool' with arguments: {'input': 'value1'}\nTool call call_2: calling 'test_tool' with arguments: {'input': 'value2'}",
"expected_observations": "Processed: value1\nProcessed: value2",
"expected_final_outputs": ["Processed: value1", "Processed: value2"],
"expected_error": None,
},
# Case 2: Invalid tool name
{
"tool_calls": [
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="nonexistent_tool", arguments={"input": "test"}),
)
],
"expected_error": AgentToolExecutionError,
},
# Case 3: Tool execution error
{
"tool_calls": [
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "error"}),
)
],
"expected_error": AgentToolExecutionError,
},
# Case 4: Empty tool calls list
{
"tool_calls": [],
"expected_model_output": "",
"expected_observations": "",
"expected_final_outputs": [],
"expected_error": None,
},
# Case 5: Final answer call
{
"tool_calls": [
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(
name="final_answer", arguments={"answer": "This is the final answer"}
),
)
],
"expected_model_output": "Tool call call_1: calling 'final_answer' with arguments: {'answer': 'This is the final answer'}",
"expected_observations": "This is the final answer",
"expected_final_outputs": ["This is the final answer"],
"expected_error": None,
},
# Case 6: Invalid arguments
{
"tool_calls": [
ChatMessageToolCall(
id="call_1",
type="function",
function=ChatMessageToolCallFunction(name="test_tool", arguments={"wrong_param": "value"}),
)
],
"expected_error": AgentToolCallError,
},
],
)
def test_process_tool_calls(self, test_case, test_tool):
# Create a ToolCallingAgent instance with the test tool
agent = ToolCallingAgent(tools=[test_tool], model=MagicMock())
# Create chat message with the specified tool calls for process_tool_calls
chat_message = ChatMessage(role=MessageRole.ASSISTANT, content="", tool_calls=test_case["tool_calls"])
# Create a memory step for process_tool_calls
memory_step = ActionStep(step_number=10, timing="mock_timing")
# Process tool calls
if test_case["expected_error"]:
with pytest.raises(test_case["expected_error"]):
list(agent.process_tool_calls(chat_message, memory_step))
else:
final_outputs = list(agent.process_tool_calls(chat_message, memory_step))
assert memory_step.model_output == test_case["expected_model_output"]
assert memory_step.observations == test_case["expected_observations"]
assert [
final_output.output for final_output in final_outputs if isinstance(final_output, ToolOutput)
] == test_case["expected_final_outputs"]
# Verify memory step tool calls were updated correctly
if test_case["tool_calls"]:
assert memory_step.tool_calls == [
ToolCall(name=tool_call.function.name, arguments=tool_call.function.arguments, id=tool_call.id)
for tool_call in test_case["tool_calls"]
]
class TestCodeAgent:
def test_code_agent_instructions(self):
agent = CodeAgent(tools=[], model=MagicMock(), instructions="Test instructions")
assert agent.instructions == "Test instructions"
assert "Test instructions" in agent.system_prompt
agent = CodeAgent(
tools=[], model=MagicMock(), instructions="Test instructions", use_structured_outputs_internally=True
)
assert agent.instructions == "Test instructions"
assert "Test instructions" in agent.system_prompt
@pytest.mark.filterwarnings("ignore") # Ignore FutureWarning for deprecated grammar parameter
def test_init_with_incompatible_grammar_and_use_structured_outputs_internally(self):
# Test that using both parameters raises ValueError with correct message
with pytest.raises(
ValueError, match="You cannot use 'grammar' and 'use_structured_outputs_internally' at the same time."
):
CodeAgent(
tools=[],
model=MagicMock(),
grammar={"format": "json"},
use_structured_outputs_internally=True,
verbosity_level=LogLevel.DEBUG,
)
# Verify no error when only one option is used
# Only grammar
agent_with_grammar = CodeAgent(
tools=[],
model=MagicMock(),
grammar={"format": "json"},
use_structured_outputs_internally=False,
verbosity_level=LogLevel.DEBUG,
)
assert agent_with_grammar.grammar is not None
assert agent_with_grammar._use_structured_outputs_internally is False
# Only structured output
agent_with_structured = CodeAgent(
tools=[],
model=MagicMock(),
grammar=None,
use_structured_outputs_internally=True,
verbosity_level=LogLevel.DEBUG,
)
assert agent_with_structured.grammar is None
assert agent_with_structured._use_structured_outputs_internally is True
@pytest.mark.parametrize("provide_run_summary", [False, True])
def test_call_with_provide_run_summary(self, provide_run_summary):
agent = CodeAgent(tools=[], model=MagicMock(), provide_run_summary=provide_run_summary)
assert agent.provide_run_summary is provide_run_summary
agent.name = "test_agent"
agent.run = MagicMock(return_value="Test output")
agent.write_memory_to_messages = MagicMock(return_value=[{"content": "Test summary"}])
result = agent("Test request")
expected_summary = "Here is the final answer from your managed agent 'test_agent':\nTest output"
if provide_run_summary:
expected_summary += (
"\n\nFor more detail, find below a summary of this agent's work:\n"
"<summary_of_work>\n\nTest summary\n---\n</summary_of_work>"
)
assert result == expected_summary
def test_errors_logging(self):
class FakeCodeModel(Model):
def generate(self, messages, stop_sequences=None):
return ChatMessage(role=MessageRole.ASSISTANT, content="<code>\nsecret=3;['1', '2'][secret]\n</code>")
agent = CodeAgent(tools=[], model=FakeCodeModel(), verbosity_level=1)
with agent.logger.console.capture() as capture:
agent.run("Test request")
assert "secret\\\\" in repr(capture.get())
def test_missing_import_triggers_advice_in_error_log(self):
# Set explicit verbosity level to 1 to override the default verbosity level of -1 set in CI fixture
agent = CodeAgent(tools=[], model=FakeCodeModelImport(), verbosity_level=1)
with agent.logger.console.capture() as capture:
agent.run("Count to 3")
str_output = capture.get()
assert "`additional_authorized_imports`" in str_output.replace("\n", "")
def test_errors_show_offending_line_and_error(self):
agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModelError())
output = agent.run("What is 2 multiplied by 3.6452?")
assert isinstance(output, AgentText)
assert output == "got an error"
assert "Code execution failed at line 'error_function()'" in str(agent.memory.steps[1].error)
assert "ValueError" in str(agent.memory.steps)
def test_error_saves_previous_print_outputs(self):
agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModelError(), verbosity_level=10)
agent.run("What is 2 multiplied by 3.6452?")
assert "Flag!" in str(agent.memory.steps[1].observations)
def test_syntax_error_show_offending_lines(self):
agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModelSyntaxError())
output = agent.run("What is 2 multiplied by 3.6452?")
assert isinstance(output, AgentText)
assert output == "got an error"
assert ' print("Failing due to unexpected indent")' in str(agent.memory.steps)
assert isinstance(agent.memory.steps[-2], ActionStep)
assert agent.memory.steps[-2].code_action == dedent("""a = 2
b = a * 2
print("Failing due to unexpected indent")
print("Ok, calculation done!")""")
def test_end_code_appending(self):
# Checking original output message
orig_output = FakeCodeModelNoReturn().generate([])
assert not orig_output.content.endswith("<end_code>")
# Checking the step output
agent = CodeAgent(
tools=[PythonInterpreterTool()],
model=FakeCodeModelNoReturn(),
max_steps=1,
)
answer = agent.run("What is 2 multiplied by 3.6452?")
assert answer
memory_steps = agent.memory.steps
actions_steps = [s for s in memory_steps if isinstance(s, ActionStep)]
outputs = [s.model_output for s in actions_steps if s.model_output]
assert outputs
assert all(o.endswith("<end_code>") for o in outputs)
messages = [s.model_output_message for s in actions_steps if s.model_output_message]
assert messages
assert all(m.content.endswith("<end_code>") for m in messages)
def test_change_tools_after_init(self):
from smolagents import tool
@tool
def fake_tool_1() -> str:
"""Fake tool"""
return "1"
@tool
def fake_tool_2() -> str:
"""Fake tool"""
return "2"
class FakeCodeModel(Model):
def generate(self, messages, stop_sequences=None):
return ChatMessage(role=MessageRole.ASSISTANT, content="<code>\nfinal_answer(fake_tool_1())\n</code>")
agent = CodeAgent(tools=[fake_tool_1], model=FakeCodeModel())
agent.tools["final_answer"] = CustomFinalAnswerTool()
agent.tools["fake_tool_1"] = fake_tool_2
answer = agent.run("Fake task.")
assert answer == "2CUSTOM"
def test_local_python_executor_with_custom_functions(self):
model = MagicMock()
model.generate.return_value = ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=None,
raw="",
token_usage=None,
)
agent = CodeAgent(tools=[], model=model, executor_kwargs={"additional_functions": {"open": open}})
agent.run("Test run")
assert "open" in agent.python_executor.static_tools
@pytest.mark.parametrize("agent_dict_version", ["v1.9", "v1.10"])
def test_from_folder(self, agent_dict_version, get_agent_dict):
agent_dict = get_agent_dict(agent_dict_version)
with (
patch("smolagents.agents.Path") as mock_path,
patch("smolagents.models.InferenceClientModel") as mock_model,
):
import json
mock_path.return_value.__truediv__.return_value.read_text.return_value = json.dumps(agent_dict)
mock_model.from_dict.return_value.model_id = "Qwen/Qwen2.5-Coder-32B-Instruct"
agent = CodeAgent.from_folder("ignored_dummy_folder")
assert isinstance(agent, CodeAgent)
assert agent.name == "test_agent"
assert agent.description == "dummy description"
assert agent.max_steps == 10
assert agent.planning_interval == 2
assert agent.additional_authorized_imports == ["pandas"]
assert "pandas" in agent.authorized_imports
assert agent.executor_type == "local"
assert agent.executor_kwargs == {}
assert agent.max_print_outputs_length is None
assert agent.managed_agents == {}
assert set(agent.tools.keys()) == {"final_answer"}
assert agent.model == mock_model.from_dict.return_value
assert mock_model.from_dict.call_args.args[0]["model_id"] == "Qwen/Qwen2.5-Coder-32B-Instruct"
assert agent.model.model_id == "Qwen/Qwen2.5-Coder-32B-Instruct"
assert agent.logger.level == 2
assert agent.prompt_templates["system_prompt"] == "dummy system prompt"
def test_from_dict(self):
# Create a test agent dictionary
agent_dict = {
"model": {"class": "InferenceClientModel", "data": {"model_id": "Qwen/Qwen2.5-Coder-32B-Instruct"}},
"tools": [
{
"name": "valid_tool_function",
"code": 'from smolagents import Tool\nfrom typing import Any, Optional\n\nclass SimpleTool(Tool):\n name = "valid_tool_function"\n description = "A valid tool function."\n inputs = {"input":{"type":"string","description":"Input string."}}\n output_type = "string"\n\n def forward(self, input: str) -> str:\n """A valid tool function.\n\n Args:\n input (str): Input string.\n """\n return input.upper()',
"requirements": {"smolagents"},
}
],
"managed_agents": {},
"prompt_templates": EMPTY_PROMPT_TEMPLATES,
"max_steps": 15,
"verbosity_level": 2,
"use_structured_output": False,
"planning_interval": 3,
"name": "test_code_agent",
"description": "Test code agent description",
"authorized_imports": ["pandas", "numpy"],
"executor_type": "local",
"executor_kwargs": {"max_print_outputs_length": 10_000},
"max_print_outputs_length": 1000,
}
# Call from_dict
with patch("smolagents.models.InferenceClientModel") as mock_model_class:
mock_model_instance = mock_model_class.from_dict.return_value
agent = CodeAgent.from_dict(agent_dict)
# Verify the agent was created correctly with CodeAgent-specific parameters
assert agent.model == mock_model_instance
assert agent.additional_authorized_imports == ["pandas", "numpy"]
assert agent.executor_type == "local"
assert agent.executor_kwargs == {"max_print_outputs_length": 10_000}
assert agent.max_print_outputs_length == 1000
# Test with missing optional parameters
minimal_agent_dict = {
"model": {"class": "InferenceClientModel", "data": {"model_id": "Qwen/Qwen2.5-Coder-32B-Instruct"}},
"tools": [],
"managed_agents": {},
}
with patch("smolagents.models.InferenceClientModel"):
agent = CodeAgent.from_dict(minimal_agent_dict)
# Verify defaults are used
assert agent.max_steps == 20 # default from MultiStepAgent.__init__
# Test overriding with kwargs
with patch("smolagents.models.InferenceClientModel"):
agent = CodeAgent.from_dict(
agent_dict,
additional_authorized_imports=["matplotlib"],
executor_kwargs={"max_print_outputs_length": 5_000},
)
assert agent.additional_authorized_imports == ["matplotlib"]
assert agent.executor_kwargs == {"max_print_outputs_length": 5_000}
def test_custom_final_answer_with_custom_inputs(self):
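        """A FinalAnswerTool subclass with custom inputs should receive both arguments passed to final_answer."""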
class CustomFinalAnswerToolWithCustomInputs(FinalAnswerTool):
inputs = {
"answer1": {"type": "string", "description": "First part of the answer."},
"answer2": {"type": "string", "description": "Second part of the answer."},
}
def forward(self, answer1: str, answer2: str) -> str:
return answer1 + "CUSTOM" + answer2
model = MagicMock()
model.generate.return_value = ChatMessage(
role=MessageRole.ASSISTANT, content="<code>\nfinal_answer(answer1='1', answer2='2')\n</code>"
)
agent = CodeAgent(tools=[CustomFinalAnswerToolWithCustomInputs()], model=model)
answer = agent.run("Fake task.")
assert answer == "1CUSTOM2"
class TestMultiAgents:
def test_multiagents_save(self, tmp_path):
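        """save() should write the expected folder tree for an agent and its managed agents, and from_folder should round-trip it."""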
model = InferenceClientModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct", max_tokens=2096, temperature=0.5)
web_agent = ToolCallingAgent(
model=model,
tools=[DuckDuckGoSearchTool(max_results=2), VisitWebpageTool()],
name="web_agent",
description="does web searches",
)
code_agent = CodeAgent(model=model, tools=[], name="useless", description="does nothing in particular")
agent = CodeAgent(
model=model,
tools=[],
additional_authorized_imports=["pandas", "datetime"],
managed_agents=[web_agent, code_agent],
max_print_outputs_length=1000,
executor_type="local",
executor_kwargs={"max_print_outputs_length": 10_000},
)
agent.save(tmp_path)
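        # Expected on-disk layout: one folder per managed agent, each with its own tools, agent.json, and prompts.yaml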
expected_structure = {
"managed_agents": {
"useless": {"tools": {"files": ["final_answer.py"]}, "files": ["agent.json", "prompts.yaml"]},
"web_agent": {
"tools": {"files": ["final_answer.py", "visit_webpage.py", "web_search.py"]},
"files": ["agent.json", "prompts.yaml"],
},
},
"tools": {"files": ["final_answer.py"]},
"files": ["app.py", "requirements.txt", "agent.json", "prompts.yaml"],
}
def verify_structure(current_path: Path, structure: dict):
for dir_name, contents in structure.items():
if dir_name != "files":
# For directories, verify they exist and recurse into them
dir_path = current_path / dir_name
assert dir_path.exists(), f"Directory {dir_path} does not exist"
assert dir_path.is_dir(), f"{dir_path} is not a directory"
verify_structure(dir_path, contents)
else:
# For files, verify each exists in the current path
for file_name in contents:
file_path = current_path / file_name
assert file_path.exists(), f"File {file_path} does not exist"
assert file_path.is_file(), f"{file_path} is not a file"
verify_structure(tmp_path, expected_structure)
# Test that re-loaded agents work as expected.
agent2 = CodeAgent.from_folder(tmp_path, planning_interval=5)
assert agent2.planning_interval == 5 # Check that kwargs are used
assert set(agent2.authorized_imports) == set(["pandas", "datetime"] + BASE_BUILTIN_MODULES)
assert agent2.max_print_outputs_length == 1000
assert agent2.executor_type == "local"
assert agent2.executor_kwargs == {"max_print_outputs_length": 10_000}
assert (
agent2.managed_agents["web_agent"].tools["web_search"].max_results == 10
        )  # Tool init parameters are currently not serialized by save(), so the reloaded tool gets the default max_results=10
assert agent2.model.kwargs["temperature"] == pytest.approx(0.5)
def test_multiagents(self):
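        """Both CodeAgent and ToolCallingAgent managers should delegate to a managed agent and return its report."""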
class FakeModelMultiagentsManagerAgent(Model):
model_id = "fake_model"
def generate(
self,
messages,
stop_sequences=None,
tools_to_call_from=None,
):
if tools_to_call_from is not None:
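                    # Tool-calling manager: first delegate to the managed search_agent, then emit final_answer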
if len(messages) < 3:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(
name="search_agent",
arguments="Who is the current US president?",
),
)
],
)
else:
assert "Report on the current US president" in str(messages)
return ChatMessage(
role=MessageRole.ASSISTANT,
content="",
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(
name="final_answer", arguments="Final report."
),
)
],
)
else:
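                    # Code-writing manager: first generate code calling search_agent, then code calling final_answer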
if len(messages) < 3:
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: Let's call our search agent.
<code>
result = search_agent("Who is the current US president?")
</code>
""",
)
else:
assert "Report on the current US president" in str(messages)
return ChatMessage(
role=MessageRole.ASSISTANT,
content="""
Thought: Let's return the report.
<code>
final_answer("Final report.")
</code>
""",
)
manager_model = FakeModelMultiagentsManagerAgent()
class FakeModelMultiagentsManagedAgent(Model):
model_id = "fake_model"
def generate(
self,
messages,
tools_to_call_from=None,
stop_sequences=None,
):
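                # Always return visible content ("FLAG1", later checked in the captured logs) plus a final_answer call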
return ChatMessage(
role=MessageRole.ASSISTANT,
content="Here is the secret content: FLAG1",
tool_calls=[
ChatMessageToolCall(
id="call_0",
type="function",
function=ChatMessageToolCallFunction(
name="final_answer",
arguments="Report on the current US president",
),
)
],
)
managed_model = FakeModelMultiagentsManagedAgent()
web_agent = ToolCallingAgent(
tools=[],
model=managed_model,
max_steps=10,
name="search_agent",
description="Runs web searches for you. Give it your request as an argument. Make the request as detailed as needed, you can ask for thorough reports",
verbosity_level=2,
)
manager_code_agent = CodeAgent(
tools=[],
model=manager_model,
managed_agents=[web_agent],
additional_authorized_imports=["time", "numpy", "pandas"],
)
report = manager_code_agent.run("Fake question.")
assert report == "Final report."
manager_toolcalling_agent = ToolCallingAgent(
tools=[],
model=manager_model,
managed_agents=[web_agent],
)
with web_agent.logger.console.capture() as capture:
report = manager_toolcalling_agent.run("Fake question.")
assert report == "Final report."
assert "FLAG1" in capture.get() # Check that managed agent's output is properly logged
# Test that visualization works
with manager_toolcalling_agent.logger.console.capture() as capture:
manager_toolcalling_agent.visualize()
assert "├──" in capture.get()
@pytest.fixture
def prompt_templates():
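    """Minimal prompt templates covering the system, managed-agent, planning, and final-answer sections."""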
return {
"system_prompt": "This is a test system prompt.",
"managed_agent": {"task": "Task for {{name}}: {{task}}", "report": "Report for {{name}}: {{final_answer}}"},
"planning": {
"initial_plan": "The plan.",
"update_plan_pre_messages": "custom",
"update_plan_post_messages": "custom",
},
"final_answer": {"pre_messages": "custom", "post_messages": "custom"},
}
@pytest.mark.parametrize(
"arguments",
[
{},
{"arg": "bar"},
{None: None},
[1, 2, 3],
],
)
def test_tool_calling_agents_raises_tool_call_error_being_invoked_with_wrong_arguments(arguments):
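    """execute_tool_call should raise AgentToolCallError for empty, wrong-keyed, or mistyped arguments."""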
@tool
def _sample_tool(prompt: str) -> str:
"""Tool that returns same string
Args:
prompt: The string to return
Returns:
The same string
"""
return prompt
agent = ToolCallingAgent(model=FakeToolCallModel(), tools=[_sample_tool])
with pytest.raises(AgentToolCallError):
agent.execute_tool_call(_sample_tool.name, arguments)
def test_tool_calling_agents_raises_agent_execution_error_when_tool_raises():
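    """An exception raised inside a tool should surface as AgentExecutionError from execute_tool_call."""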
@tool
def _sample_tool(_: str) -> float:
"""Tool that fails
Args:
_: The pointless string
Returns:
Some number
"""
return 1 / 0
agent = ToolCallingAgent(model=FakeToolCallModel(), tools=[_sample_tool])
with pytest.raises(AgentExecutionError):
agent.execute_tool_call(_sample_tool.name, "sample")