"""Agent tool that runs short Python snippets with static checks and a timeout.

The static checks are a best-effort filter, not a sandbox: executed code runs
in this process with the real builtins available.
"""

import ast
import contextlib
import io
import os
import re
import signal
import traceback
from typing import Any, Dict, List, Optional, Tuple, Union

from smolagents.tools import Tool

# First integer/decimal literal found in a piece of text.
_NUMBER_PATTERN = re.compile(r'[-+]?\d*\.?\d+')


class CodeExecutionTool(Tool):
    """
    Executes Python code in a controlled environment for safe code interpretation.
    Useful for evaluating code snippets and returning their output or errors.

    Every public entry point returns a dict with a boolean ``'success'`` key and
    either ``'output'`` (captured stdout/stderr) or ``'error'`` (a message).
    """
    name = "python_code_executor"
    description = "Executes a given Python code string or Python code from a file. Returns the output or error."
    inputs = {
        'code_string': {'type': 'string', 'description': 'The Python code to execute directly.', 'nullable': True},
        'filepath': {'type': 'string', 'description': 'The path to a Python file to execute.', 'nullable': True}
    }
    outputs = {'result': {'type': 'object', 'description': 'A dictionary containing \'success\', \'output\', and/or \'error\'.'}}
    output_type = "object"

    def __init__(self, timeout: int = 10, max_output_size: int = 20000, *args, **kwargs):
        """Configure the executor.

        Args:
            timeout: Whole seconds the executed code may run before it is
                interrupted (passed to ``signal.alarm``).
            max_output_size: Maximum number of characters of captured output
                or traceback kept before truncation.
            *args, **kwargs: Forwarded unchanged to ``Tool.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.timeout = timeout
        self.max_output_size = max_output_size
        self.banned_modules = [
            'os', 'subprocess', 'sys', 'builtins', 'importlib',
            'pickle', 'requests', 'socket', 'shutil', 'ctypes', 'multiprocessing',
            # Previously rejected only because their names happen to contain a
            # banned name as a substring; listed explicitly now that matching
            # is exact. 'nt'/'ntpath' are the Windows counterparts.
            'posix', 'nt', 'posixpath', 'ntpath', 'socketserver',
        ]
        self.is_initialized = True

    def _is_banned_module(self, module_name: str) -> bool:
        """Return True if any dotted component of *module_name* is banned.

        Components are compared exactly (ignoring leading underscores, so
        ``_socket`` counts as ``socket``). Substring matching is deliberately
        avoided: it rejects harmless modules such as ``torch.nn.modules.loss``
        merely because ``'os'`` occurs inside ``'loss'``.
        """
        banned = set(self.banned_modules)
        return any(
            part in banned or part.lstrip('_') in banned
            for part in module_name.split('.')
        )

    def _analyze_code_safety(self, code: str) -> Dict[str, Any]:
        """Perform static analysis to check for potentially harmful code.

        Returns:
            ``{"safe": True}`` when nothing suspicious is found, otherwise
            ``{"safe": False, "reason": <message>}``.
        """
        try:
            parsed = ast.parse(code)
        except (SyntaxError, ValueError):
            # ValueError: e.g. source containing null bytes on older Pythons.
            return {"safe": False, "reason": "Invalid Python syntax"}

        nodes = list(ast.walk(parsed))

        # Check for banned imports
        dangerous_imports: List[str] = []
        for node in nodes:
            if isinstance(node, ast.Import):
                dangerous_imports.extend(
                    alias.name for alias in node.names
                    if self._is_banned_module(alias.name)
                )
            elif isinstance(node, ast.ImportFrom):
                # node.module is None for "from . import x".
                if node.module and self._is_banned_module(node.module):
                    dangerous_imports.append(node.module)
                    continue
                # "from pathlib import os" hands out a banned module through
                # an allowed one, so the imported names are checked as well.
                for alias in node.names:
                    if self._is_banned_module(alias.name):
                        prefix = f"{node.module}." if node.module else ""
                        dangerous_imports.append(f"{prefix}{alias.name}")

        if dangerous_imports:
            return {
                "safe": False,
                "reason": f"Potentially harmful imports detected: {dangerous_imports}"
            }

        # Check for exec/eval usage, and for __import__ which would otherwise
        # bypass the import check above.
        for node in nodes:
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                if node.func.id in ('exec', 'eval'):
                    return {
                        "safe": False,
                        "reason": "Contains exec() or eval() calls"
                    }
                if node.func.id == '__import__':
                    return {
                        "safe": False,
                        "reason": "Contains __import__() calls"
                    }

        return {"safe": True}

    def _timeout_handler(self, signum, frame):
        """Handler for timeout signal."""
        raise TimeoutError("Code execution timed out")

    def _extract_numeric_value(self, output: str) -> Optional[Union[int, float]]:
        """Extract a numeric value from the last non-empty lines of *output*.

        Lines are scanned from the bottom up. A line that is a number by
        itself wins; otherwise the first numeric literal inside the line is
        used. Returns ``None`` when no line contains a number.
        """
        lines = [line.strip() for line in output.strip().split('\n') if line.strip()]

        for line in reversed(lines):
            # Try direct conversion first
            try:
                return float(line)
            except ValueError:
                pass

            # Try to extract numeric portion if embedded in text
            numeric_match = _NUMBER_PATTERN.search(line)
            if numeric_match:
                try:
                    return float(numeric_match.group())
                except ValueError:
                    pass

        return None

    def _read_code_file(self, filepath: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
        """Read a ``.py`` file.

        Returns:
            ``(source, None)`` on success, or ``(None, error_dict)`` where
            ``error_dict`` is ready to be returned to the caller.
        """
        if not os.path.exists(filepath):
            return None, {"success": False, "error": f"File not found: {filepath}"}
        if not filepath.endswith(".py"):
            return None, {"success": False, "error": f"File is not a Python file: {filepath}"}
        try:
            # Python source files are UTF-8 by default; don't depend on the locale.
            with open(filepath, 'r', encoding='utf-8') as file:
                return file.read(), None
        except Exception as e:
            return None, {"success": False, "error": f"Error reading file {filepath}: {str(e)}"}

    def _truncate(self, text: str, marker: str) -> str:
        """Cut *text* to ``max_output_size`` characters, appending *marker* if cut."""
        if len(text) > self.max_output_size:
            return text[:self.max_output_size] + marker
        return text

    # Main entry point for the agent
    def forward(self, code_string: Optional[str] = None, filepath: Optional[str] = None) -> Dict[str, Any]:
        """Run either *code_string* or the file at *filepath* (exactly one).

        Returns:
            A result dict; see the class docstring.
        """
        if not code_string and not filepath:
            return {"success": False, "error": "No code string or filepath provided."}
        if code_string and filepath:
            return {"success": False, "error": "Provide either a code string or a filepath, not both."}

        if filepath:
            code_to_execute, error = self._read_code_file(filepath)
            if error is not None:
                return error
        else:
            code_to_execute = code_string

        return self._execute_actual_code(code_to_execute)

    # Renamed from execute_code to _execute_actual_code to be internal
    def _execute_actual_code(self, code: str) -> Dict[str, Any]:
        """Execute Python code and capture the output or error.

        The code is statically checked first, then run with stdout and stderr
        redirected into a buffer while a ``SIGALRM`` timer enforces
        ``self.timeout``. The timer needs a POSIX platform and the main thread;
        when it cannot be armed, an error dict is returned rather than running
        the code without a time limit.
        """
        safety_check = self._analyze_code_safety(code)
        if not safety_check["safe"]:
            return {"success": False, "error": f"Safety check failed: {safety_check['reason']}"}

        # Setup timeout
        if not hasattr(signal, "SIGALRM"):
            return {
                "success": False,
                "error": "Cannot enforce execution timeout: signal.SIGALRM is not available on this platform",
            }
        try:
            previous_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
        except ValueError as e:
            # signal.signal() only works in the main thread of the main interpreter.
            return {"success": False, "error": f"Cannot enforce execution timeout: {str(e)}"}

        captured_output = io.StringIO()

        # Fresh globals per run so snippets cannot see this module's names.
        # __name__ is set explicitly: with empty globals it would resolve to
        # 'builtins', and an `if __name__ == "__main__":` guard in an executed
        # file would be skipped silently.
        exec_globals = {"__name__": "__main__"}

        try:
            signal.alarm(self.timeout)
            with contextlib.redirect_stdout(captured_output), \
                    contextlib.redirect_stderr(captured_output):  # Capture stderr as well
                exec(code, exec_globals)  # Execute in a controlled global scope

            output = self._truncate(captured_output.getvalue(), "... [output truncated]")
            return {
                "success": True,
                "output": output,
            }
        except TimeoutError:
            return {"success": False, "error": "Code execution timed out"}
        except SystemExit as e:
            # exit()/quit()/raise SystemExit is not an Exception subclass; left
            # uncaught it would terminate the host process.
            output = self._truncate(captured_output.getvalue(), "... [output truncated]")
            return {
                "success": False,
                "output": output,
                "error": f"Execution failed: code attempted to exit the interpreter (SystemExit: {e.code})",
            }
        except Exception as e:
            # Get detailed traceback
            tb_lines = traceback.format_exception(type(e), e, e.__traceback__)
            error_details = self._truncate("".join(tb_lines), "... [error truncated]")
            return {"success": False, "error": f"Execution failed: {str(e)}\nTraceback:\n{error_details}"}
        finally:
            signal.alarm(0)  # Disable the alarm
            # Put back whatever handler the host application had installed.
            # signal.signal() returns None for handlers not set from Python.
            signal.signal(
                signal.SIGALRM,
                previous_handler if previous_handler is not None else signal.SIG_DFL,
            )
            captured_output.close()

    # Kept execute_file and execute_code as helper methods if direct access is ever needed,
    # but they now call the main _execute_actual_code method.
    def execute_file(self, filepath: str) -> Dict[str, Any]:
        """Helper to execute Python code from file."""
        code, error = self._read_code_file(filepath)
        if error is not None:
            return error
        # Run outside the read step so execution problems are not reported
        # as "Error reading file".
        return self._execute_actual_code(code)

    def execute_code(self, code: str) -> Dict[str, Any]:
        """Helper to execute Python code from a string."""
        return self._execute_actual_code(code)


if __name__ == '__main__':
    tool = CodeExecutionTool(timeout=5)

    # Test 1: Safe code string
    safe_code = "print('Hello from safe code!'); result = 10 * 2; print(result)"
    print("\n--- Test 1: Safe Code String ---")
    result1 = tool.forward(code_string=safe_code)
    print(result1)
    assert result1['success']
    assert "Hello from safe code!" in result1['output']
    assert "20" in result1['output']

    # Test 2: Code with an error
    error_code = "print(1/0)"
    print("\n--- Test 2: Code with Error ---")
    result2 = tool.forward(code_string=error_code)
    print(result2)
    assert not result2['success']
    assert "ZeroDivisionError" in result2['error']

    # Test 3: Code with a banned import
    unsafe_import_code = "import os; print(os.getcwd())"
    print("\n--- Test 3: Unsafe Import ---")
    result3 = tool.forward(code_string=unsafe_import_code)
    print(result3)
    assert not result3['success']
    assert "Safety check failed" in result3['error']
    assert "os" in result3['error']

    # Test 4: Timeout (uses the main tool instance, so this waits for its 5 s limit)
    timeout_code = "import time; time.sleep(10); print('Done sleeping')"
    print("\n--- Test 4: Timeout ---")
    result4 = tool.forward(code_string=timeout_code)
    print(result4)
    assert not result4['success']
    assert "timed out" in result4['error']

    # Test 5: Execute from file
    test_file_content = "print('Hello from file!'); x = 5; y = 7; print(f'Sum: {x+y}')"
    test_filename = "temp_test_script.py"
    with open(test_filename, "w") as f:
        f.write(test_file_content)
    print("\n--- Test 5: Execute from File ---")
    try:
        result5 = tool.forward(filepath=test_filename)
    finally:
        os.remove(test_filename)
    print(result5)
    assert result5['success']
    assert "Hello from file!" in result5['output']
    assert "Sum: 12" in result5['output']

    # Test 6: File not found
    print("\n--- Test 6: File Not Found ---")
    result6 = tool.forward(filepath="non_existent_script.py")
    print(result6)
    assert not result6['success']
    assert "File not found" in result6['error']

    # Test 7: Provide both code_string and filepath
    # (the file no longer exists; the both-arguments check runs before any file access)
    print("\n--- Test 7: Both code_string and filepath ---")
    result7 = tool.forward(code_string="print('hello')", filepath=test_filename)
    print(result7)
    assert not result7['success']
    assert "Provide either a code string or a filepath, not both" in result7['error']

    # Test 8: Provide neither
    print("\n--- Test 8: Neither code_string nor filepath ---")
    result8 = tool.forward()
    print(result8)
    assert not result8['success']
    assert "No code string or filepath provided" in result8['error']

    # Test 9: Code that defines a function and calls it
    # The call must be on its own line: after "def f(): return x; print(...)"
    # the print belongs to the function body and never runs.
    func_def_code = "def my_func(a, b):\n    return a + b\nprint(my_func(3,4))"
    print("\n--- Test 9: Function Definition and Call ---")
    result9 = tool.forward(code_string=func_def_code)
    print(result9)
    assert result9['success']
    assert "7" in result9['output']

    # Test 10: Max output size
    tool_max_output = CodeExecutionTool(max_output_size=50)
    long_output_code = "for i in range(20): print(f'Line {i}')"
    print("\n--- Test 10: Max Output Size ---")
    result10 = tool_max_output.forward(code_string=long_output_code)
    print(result10)
    assert result10['success']
    assert "... [output truncated]" in result10['output']
    assert len(result10['output']) <= 50 + len("... [output truncated]") + 5  # a bit of leeway

    print("\nAll tests seem to have passed (check output for details).")