#!/usr/bin/env python
# coding=utf-8

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import inspect
import json
import pickle
import time
from io import BytesIO
from pathlib import Path
from textwrap import dedent
from typing import Any

import PIL.Image
import requests

from .default_tools import FinalAnswerTool
from .local_python_executor import PythonExecutor
from .monitoring import LogLevel
from .tools import Tool, get_tools_definition_code
from .utils import AgentError


try:
    from dotenv import load_dotenv

    load_dotenv()
except ModuleNotFoundError:
    # python-dotenv is optional: without it, environment variables are used as-is.
    pass


class RemotePythonExecutor(PythonExecutor):
    """Base class for executors that run Python code outside the current process.

    Subclasses implement `run_code_raise_errors`; this class builds tool sending,
    variable transfer, package installation and final-answer detection on top of it.

    Args:
        additional_imports (`list[str]`): Additional imports to install.
        logger (`Logger`): Logger to use.
    """

    # Name of the exception raised remotely by the patched `final_answer` tool;
    # subclasses compare the remote error name against it to detect a final answer.
    FINAL_ANSWER_EXCEPTION = "FinalAnswerException"

    def __init__(self, additional_imports: list[str], logger):
        self.additional_imports = additional_imports
        self.logger = logger
        self.logger.log("Initializing executor, hold on...")
        # Packages already installed remotely; `send_tools` uses it to skip reinstalls.
        self.installed_packages = []

    def run_code_raise_errors(self, code: str) -> tuple[Any, str, bool]:
        """
        Execute code, return the result and output, also determining if the result is the final answer.
        """
        raise NotImplementedError

    def send_tools(self, tools: dict[str, Tool]):
        """Make `tools` available in the remote environment.

        Patches the `final_answer` tool (if present) so it signals through an exception,
        installs tool requirements not yet installed, then executes the tools'
        definition code remotely and logs its output.

        Args:
            tools (`dict[str, Tool]`): Tools to send, keyed by name.
        """
        if "final_answer" in tools:
            self._patch_final_answer_with_exception(tools["final_answer"])
        # Install tool packages (smolagents itself is never installed remotely).
        # The exclusion set is loop-invariant, so build it once.
        already_available = set(self.installed_packages) | {"smolagents"}
        packages_to_install = {
            pkg
            for tool in tools.values()
            for pkg in tool.to_dict()["requirements"]
            if pkg not in already_available
        }
        if packages_to_install:
            self.installed_packages += self.install_packages(list(packages_to_install))
        # Get tool definitions
        code = get_tools_definition_code(tools)
        if code:
            execution = self.run_code_raise_errors(code)
            self.logger.log(execution[1])

    def send_variables(self, variables: dict):
        """
        Send variables to the kernel namespace using pickle.

        Args:
            variables (`dict`): Name -> value mapping; values must be picklable.
        """
        # The payload is produced locally and only unpickled inside the remote environment.
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        # NOTE(review): `locals().update` presumably works because this snippet runs at
        # the remote top level, where locals() is the module namespace — confirm.
        code = f"""
import pickle, base64
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
        self.run_code_raise_errors(code)

    def __call__(self, code_action: str) -> tuple[Any, str, bool]:
        """Run the code and determine if it is the final answer."""
        return self.run_code_raise_errors(code_action)

    def install_packages(self, additional_imports: list[str]):
        """Install packages remotely via a `!pip install` command.

        Args:
            additional_imports (`list[str]`): Packages to install; nothing runs if empty.

        Returns:
            `list[str]`: The `additional_imports` argument, unchanged.
        """
        if additional_imports:
            _, execution_logs, _ = self.run_code_raise_errors(f"!pip install {' '.join(additional_imports)}")
            self.logger.log(execution_logs)
        return additional_imports

    def _patch_final_answer_with_exception(self, final_answer_tool: FinalAnswerTool):
        """Patch the FinalAnswerTool to raise an exception.

        This is necessary because the remote executors
        rely on the FinalAnswerTool to detect the final answer.
        It modifies the `forward` method of the FinalAnswerTool to raise
        a `FinalAnswerException` with the final answer as a pickled value.
        This allows the executor to catch this exception and return the final answer.

        Args:
            final_answer_tool (`FinalAnswerTool`): FinalAnswerTool instance to patch.
        """

        # Create a new class that inherits from the original FinalAnswerTool
        class _FinalAnswerTool(final_answer_tool.__class__):
            pass

        # Add a new forward method that raises the FinalAnswerException
        # - Define the new forward method function
        def forward(self, *args, **kwargs) -> Any:
            # Imports and the exception class live inside the function so the code
            # is self-contained when its source is shipped to the remote environment.
            import base64
            import pickle

            class FinalAnswerException(Exception):
                def __init__(self, value):
                    self.value = value

            raise FinalAnswerException(base64.b64encode(pickle.dumps(self._forward(*args, **kwargs))).decode())

        # - Set the new forward method function to the _FinalAnswerTool class
        _FinalAnswerTool.forward = forward

        # Rename the original forward method to _forward
        # - Get the original forward method function from the final_answer_tool instance
        original_forward_function = final_answer_tool.forward.__func__
        # - Set the new _forward method function to the _FinalAnswerTool class
        _FinalAnswerTool._forward = original_forward_function
        # - Update the source code of the new forward method to match the original but with the new name
        # NOTE(review): `_forward` is the same function object as the original `forward`,
        # so this `__source__` attribute is also visible through the original class — confirm intended.
        _FinalAnswerTool._forward.__source__ = inspect.getsource(original_forward_function).replace(
            "def forward(", "def _forward("
        )

        # Set the new class as the class of the final_answer_tool instance
        final_answer_tool.__class__ = _FinalAnswerTool


class E2BExecutor(RemotePythonExecutor):
    """
    Executes Python code using E2B.

    Args:
        additional_imports (`list[str]`): Additional imports to install.
        logger (`Logger`): Logger to use.
        **kwargs: Additional arguments to pass to the E2B Sandbox.
    """

    def __init__(self, additional_imports: list[str], logger, **kwargs):
        super().__init__(additional_imports, logger)
        try:
            from e2b_code_interpreter import Sandbox
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                """Please install 'e2b' extra to use E2BExecutor: `pip install 'smolagents[e2b]'`"""
            )
        self.sandbox = Sandbox(**kwargs)
        self.installed_packages = self.install_packages(additional_imports)
        self.logger.log("E2B is running", level=LogLevel.INFO)

    def run_code_raise_errors(self, code: str) -> tuple[Any, str, bool]:
        """Run `code` in the E2B sandbox.

        Returns:
            `tuple[Any, str, bool]`: (result, stdout logs, is_final_answer). The result is
            the unpickled final answer, a PIL image for jpeg/png main results, the first
            non-None data attribute of the main result, or None.

        Raises:
            AgentError: If execution fails with anything other than the final-answer exception.
        """
        execution = self.sandbox.run_code(code)
        execution_logs = "\n".join([str(log) for log in execution.logs.stdout])

        # Handle errors
        if execution.error:
            # Check if the error is a FinalAnswerException
            if execution.error.name == RemotePythonExecutor.FINAL_ANSWER_EXCEPTION:
                final_answer = pickle.loads(base64.b64decode(execution.error.value))
                return final_answer, execution_logs, True

            # Construct error message
            error_message = (
                f"{execution_logs}\n"
                f"Executing code yielded an error:\n"
                f"{execution.error.name}\n"
                f"{execution.error.value}\n"
                f"{execution.error.traceback}"
            )
            raise AgentError(error_message, self.logger)

        # Handle results
        if not execution.results:
            return None, execution_logs, False

        for result in execution.results:
            if not result.is_main_result:
                continue
            # Handle image outputs
            for attribute_name in ["jpeg", "png"]:
                img_data = getattr(result, attribute_name, None)
                if img_data is not None:
                    decoded_bytes = base64.b64decode(img_data.encode("utf-8"))
                    return PIL.Image.open(BytesIO(decoded_bytes)), execution_logs, False
            # Handle other data formats
            for attribute_name in [
                "chart",
                "data",
                "html",
                "javascript",
                "json",
                "latex",
                "markdown",
                "pdf",
                "svg",
                "text",
            ]:
                data = getattr(result, attribute_name, None)
                if data is not None:
                    return data, execution_logs, False
        # If no main result found, return None
        return None, execution_logs, False

    def cleanup(self):
        """Clean up the E2B sandbox and resources."""
        try:
            if hasattr(self, "sandbox"):
                self.logger.log("Shutting down sandbox...", level=LogLevel.INFO)
                self.sandbox.kill()
                self.logger.log("Sandbox cleanup completed", level=LogLevel.INFO)
                del self.sandbox
        except Exception as e:
            # Best-effort cleanup: report but do not propagate.
            self.logger.log_error(f"Error during cleanup: {e}")


class DockerExecutor(RemotePythonExecutor):
    """
    Executes Python code using Jupyter Kernel Gateway in a Docker container.
    """

    def __init__(
        self,
        additional_imports: list[str],
        logger,
        host: str = "127.0.0.1",
        port: int = 8888,
        image_name: str = "jupyter-kernel",
        build_new_image: bool = True,
        container_run_kwargs: dict[str, Any] | None = None,
    ):
        """
        Initialize the Docker-based Jupyter Kernel Gateway executor.

        Args:
            additional_imports: Additional imports to install.
            logger: Logger to use.
            host: Host to bind to.
            port: Port to bind to.
            image_name: Name of the Docker image to use. If the image doesn't exist, it will be built.
            build_new_image: If True, the image will be rebuilt even if it already exists.
            container_run_kwargs: Additional keyword arguments to pass to the Docker container run command.
                This dict (including any nested `ports` dict) is not modified.

        Raises:
            ModuleNotFoundError: If the `docker` extra is not installed.
            RuntimeError: If Docker is unreachable or the kernel cannot be started.
        """
        super().__init__(additional_imports, logger)
        try:
            import docker
            from websocket import create_connection
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                "Please install 'docker' extra to use DockerExecutor: `pip install 'smolagents[docker]'`"
            )
        self.host = host
        self.port = port
        self.image_name = image_name

        # Initialize Docker
        try:
            self.client = docker.from_env()
        except docker.errors.DockerException as e:
            raise RuntimeError("Could not connect to Docker daemon: make sure Docker is running.") from e

        # Build and start container
        try:
            # Check if image exists, unless forced to rebuild
            if not build_new_image:
                try:
                    self.client.images.get(self.image_name)
                    self.logger.log(f"Using existing Docker image: {self.image_name}", level=LogLevel.INFO)
                except docker.errors.ImageNotFound:
                    self.logger.log(f"Image {self.image_name} not found, building...", level=LogLevel.INFO)
                    build_new_image = True

            if build_new_image:
                self.logger.log(f"Building Docker image {self.image_name}...", level=LogLevel.INFO)
                dockerfile_path = Path(__file__).parent / "Dockerfile"
                # A default Dockerfile is written next to this module only when none exists.
                if not dockerfile_path.exists():
                    with open(dockerfile_path, "w") as f:
                        f.write(
                            dedent(
                                """\
                                FROM python:3.12-slim

                                RUN pip install jupyter_kernel_gateway jupyter_client

                                EXPOSE 8888
                                CMD ["jupyter", "kernelgateway", "--KernelGatewayApp.ip='0.0.0.0'", "--KernelGatewayApp.port=8888", "--KernelGatewayApp.allow_origin='*'"]
                                """
                            )
                        )
                _, build_logs = self.client.images.build(
                    path=str(dockerfile_path.parent), dockerfile=str(dockerfile_path), tag=self.image_name
                )
                for log_chunk in build_logs:
                    # Only log non-empty messages
                    if log_message := log_chunk.get("stream", "").rstrip():
                        self.logger.log(log_message, level=LogLevel.DEBUG)

            self.logger.log(f"Starting container on {host}:{port}...", level=LogLevel.INFO)
            # Create base container parameters (a shallow copy of the user's kwargs)
            container_kwargs = dict(container_run_kwargs or {})
            # Ensure required port mapping and background running. The user's `ports`
            # dict is copied first so adding the gateway mapping does not mutate it.
            user_ports = container_kwargs.get("ports")
            container_kwargs["ports"] = dict(user_ports) if isinstance(user_ports, dict) else {}
            container_kwargs["ports"]["8888/tcp"] = (host, port)
            container_kwargs["detach"] = True

            self.container = self.client.containers.run(self.image_name, **container_kwargs)

            # Poll the container state up to 5 times, one second apart.
            retries = 0
            while self.container.status != "running" and retries < 5:
                self.logger.log(f"Container status: {self.container.status}, waiting...", level=LogLevel.INFO)
                time.sleep(1)
                self.container.reload()
                retries += 1

            self.base_url = f"http://{host}:{port}"

            # Create new kernel via HTTP
            r = requests.post(f"{self.base_url}/api/kernels")
            if r.status_code != 201:
                error_details = {
                    "status_code": r.status_code,
                    "headers": dict(r.headers),
                    "url": r.url,
                    "body": r.text,
                    "request_method": r.request.method,
                    "request_headers": dict(r.request.headers),
                    "request_body": r.request.body,
                }
                self.logger.log_error(f"Failed to create kernel. Details: {json.dumps(error_details, indent=2)}")
                raise RuntimeError(f"Failed to create kernel: Status {r.status_code}\nResponse: {r.text}") from None

            self.kernel_id = r.json()["id"]

            # Kernel messages are exchanged over this websocket (see run_code_raise_errors).
            ws_url = f"ws://{host}:{port}/api/kernels/{self.kernel_id}/channels"
            self.ws = create_connection(ws_url)

            self.installed_packages = self.install_packages(additional_imports)
            self.logger.log(
                f"Container {self.container.short_id} is running with kernel {self.kernel_id}", level=LogLevel.INFO
            )

        except Exception as e:
            # Release whatever was created (websocket, container) before re-raising.
            self.cleanup()
            raise RuntimeError(f"Failed to initialize Jupyter kernel: {e}") from e

    def run_code_raise_errors(self, code_action: str) -> tuple[Any, str, bool]:
        """Execute `code_action` in the Jupyter kernel and collect its output.

        Messages whose parent `msg_id` does not match this request are skipped; the
        loop ends when the kernel reports the `idle` status for this request.

        Returns:
            `tuple[Any, str, bool]`: (result, concatenated stream output, is_final_answer).
            The result is the `text/plain` of the last `execute_result`, or the unpickled
            final answer.

        Raises:
            AgentError: If the kernel reports an error other than the final-answer exception.
        """
        try:
            # Send execute request
            msg_id = self._send_execute_request(code_action)

            # Collect output and results
            outputs = []
            result = None
            is_final_answer = False

            while True:
                msg = json.loads(self.ws.recv())
                parent_msg_id = msg.get("parent_header", {}).get("msg_id")
                # Skip unrelated messages
                if parent_msg_id != msg_id:
                    continue
                msg_type = msg.get("msg_type", "")
                msg_content = msg.get("content", {})
                if msg_type == "stream":
                    outputs.append(msg_content["text"])
                elif msg_type == "execute_result":
                    result = msg_content["data"].get("text/plain", None)
                elif msg_type == "error":
                    if msg_content.get("ename", "") == RemotePythonExecutor.FINAL_ANSWER_EXCEPTION:
                        result = pickle.loads(base64.b64decode(msg_content.get("evalue", "")))
                        is_final_answer = True
                    else:
                        raise AgentError("\n".join(msg_content.get("traceback", [])), self.logger)
                elif msg_type == "status" and msg_content["execution_state"] == "idle":
                    break

            return result, "".join(outputs), is_final_answer

        except Exception as e:
            self.logger.log_error(f"Code execution failed: {e}")
            raise

    def _send_execute_request(self, code: str) -> str:
        """Send code execution request to kernel.

        Returns:
            `str`: The generated `msg_id`, used to match replies to this request.
        """
        import uuid

        # Generate a unique message ID
        msg_id = str(uuid.uuid4())

        # Create execute request
        execute_request = {
            "header": {
                "msg_id": msg_id,
                "username": "anonymous",
                "session": str(uuid.uuid4()),
                "msg_type": "execute_request",
                "version": "5.0",
            },
            "parent_header": {},
            "metadata": {},
            "content": {
                "code": code,
                "silent": False,
                "store_history": True,
                "user_expressions": {},
                "allow_stdin": False,
            },
        }

        self.ws.send(json.dumps(execute_request))
        return msg_id

    def cleanup(self):
        """Clean up the kernel websocket, the Docker container and resources.

        Best-effort: errors are logged, not raised. The websocket is closed in its own
        step so a failure there does not prevent the container from being removed.
        """
        if hasattr(self, "ws"):
            try:
                self.ws.close()
            except Exception as e:
                self.logger.log_error(f"Error during cleanup: {e}")
            del self.ws
        try:
            if hasattr(self, "container"):
                self.logger.log(f"Stopping and removing container {self.container.short_id}...", level=LogLevel.INFO)
                self.container.stop()
                self.container.remove()
                self.logger.log("Container cleanup completed", level=LogLevel.INFO)
                del self.container
        except Exception as e:
            self.logger.log_error(f"Error during cleanup: {e}")

    def delete(self):
        """Ensure cleanup on deletion."""
        self.cleanup()


__all__ = ["E2BExecutor", "DockerExecutor"]