""" RestrictedPythonTools - Self-Healing Python Execution with Shell Backend This toolkit provides Python code execution with built-in directory constraints, path auto-correction, and self-healing capabilities. Uses RestrictedShellTools as the backend execution engine, mirroring Claude Code's architecture. """ import os import re import ast import sys import json import time import uuid import tempfile from pathlib import Path from typing import Optional, Dict, Any, List from agno.tools import Toolkit from agno.utils.log import logger from .shell_toolkit import RestrictedShellTools class RestrictedPythonTools(Toolkit): """ Self-healing Python execution toolkit with directory constraints. Uses RestrictedShellTools as backend for secure, constrained Python execution. Includes automatic path correction, package installation, and error recovery. """ def __init__(self, base_dir: Optional[Path] = None, **kwargs): """ Initialize the restricted Python toolkit. Args: base_dir: Base directory to constrain all Python operations to **kwargs: Additional arguments passed to parent Toolkit """ self.base_dir = Path(base_dir) if base_dir else Path.cwd() self.base_dir.mkdir(parents=True, exist_ok=True) # Initialize backend tools self.shell_tools = RestrictedShellTools(base_dir=self.base_dir) # Track installed packages to avoid redundant installations self.installed_packages = set() # Initialize toolkit with Python execution functions super().__init__( name="restricted_python_tools", tools=[ self.run_python_code, self.install_package, self.save_python_file, self.list_python_files, self.validate_python_syntax ], **kwargs ) logger.info(f"RestrictedPythonTools initialized with base_dir: {self.base_dir}") def run_python_code(self, code: str, timeout: int = 120) -> str: """ Execute Python code with self-healing and directory constraints. Args: code (str): Python code to execute timeout (int): Maximum execution time in seconds Returns: str: Output from code execution or error message """ try: # Step 1: Auto-correct and heal the code healed_code = self._heal_python_code(code) # Step 2: Validate syntax before execution syntax_result = self.validate_python_syntax(healed_code) if "Error" in syntax_result: return f"Syntax Error: {syntax_result}" # Step 3: Extract and auto-install required packages self._auto_install_packages(healed_code) # Step 4: Create temporary Python file temp_filename = f"temp_script_{uuid.uuid4().hex[:8]}.py" temp_filepath = self.base_dir / temp_filename try: # Save healed code to temporary file with open(temp_filepath, 'w', encoding='utf-8') as f: f.write(healed_code) logger.info(f"Executing Python code via shell backend: {temp_filename}") # Step 5: Execute via RestrictedShellTools backend execution_command = f"python3 {temp_filename}" result = self.shell_tools.run_shell_command(execution_command, timeout=timeout) # Step 6: Check for common errors and attempt recovery if self._has_execution_errors(result): recovery_result = self._attempt_error_recovery(healed_code, result, temp_filename, timeout) if recovery_result: result = recovery_result return result finally: # Cleanup temporary file if temp_filepath.exists(): temp_filepath.unlink() except Exception as e: error_msg = f"Error executing Python code: {str(e)}" logger.error(error_msg) return error_msg def _heal_python_code(self, code: str) -> str: """ Auto-correct common path and directory issues in Python code. Args: code (str): Original Python code Returns: str: Healed Python code with corrected paths """ healed_code = code # Path correction patterns path_corrections = [ # Fix relative paths that go outside base directory (r'\.\./', ''), (r'\.\./\.\./', ''), (r'\.\.\\', ''), # Convert absolute paths to relative paths within base directory (r'["\']\/[^"\']*\/([^"\'\/]+\.(xlsx?|csv|json|txt|py))["\']', r'"\1"'), # Fix common pandas path issues (r'pd\.to_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.to_excel("\1"'), (r'pd\.read_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.read_excel("\1"'), (r'pd\.to_csv\(["\'][^"\']*\/([^"\'\/]+\.csv)["\']', r'pd.to_csv("\1"'), # Fix file operations (r'open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'open("\1"'), (r'with open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'with open("\1"'), ] for pattern, replacement in path_corrections: healed_code = re.sub(pattern, replacement, healed_code) # Add working directory insurance at the beginning directory_insurance = f""" import os import sys # Ensure we're in the correct working directory base_dir = r'{self.base_dir}' if os.getcwd() != base_dir: os.chdir(base_dir) print(f"Working directory corrected to: {{os.getcwd()}}") """ # Add directory insurance to the beginning of the code healed_code = directory_insurance + healed_code logger.debug(f"Code healing applied - original length: {len(code)}, healed length: {len(healed_code)}") return healed_code def _extract_required_packages(self, code: str) -> List[str]: """ Extract package names from import statements in Python code. Args: code (str): Python code to analyze Returns: List[str]: List of package names that need to be installed """ packages = set() # Built-in modules that don't need installation builtin_modules = { 'os', 'sys', 'json', 'time', 'datetime', 'uuid', 'tempfile', 're', 'ast', 'pathlib', 'math', 'random', 'subprocess', 'collections', 'itertools', 'functools', 'logging', 'io', 'csv', 'xml', 'urllib', 'http', 'email', 'sqlite3' } # Common package mappings (import name -> pip package name) package_mappings = { 'pandas': 'pandas', 'numpy': 'numpy', 'openpyxl': 'openpyxl', 'xlsxwriter': 'xlsxwriter', 'matplotlib': 'matplotlib', 'seaborn': 'seaborn', 'plotly': 'plotly', 'requests': 'requests', 'beautifulsoup4': 'beautifulsoup4', 'bs4': 'beautifulsoup4', 'sklearn': 'scikit-learn', 'cv2': 'opencv-python', 'PIL': 'Pillow', 'yaml': 'PyYAML', } # Extract import statements using regex import_patterns = [ r'^import\s+([a-zA-Z_][a-zA-Z0-9_]*)', r'^from\s+([a-zA-Z_][a-zA-Z0-9_]*)\s+import', ] for line in code.split('\n'): line = line.strip() for pattern in import_patterns: match = re.match(pattern, line) if match: package_name = match.group(1) # Skip built-in modules if package_name in builtin_modules: continue # Map to pip package name if known pip_package = package_mappings.get(package_name, package_name) packages.add(pip_package) return list(packages) def _auto_install_packages(self, code: str) -> None: """ Automatically install required packages for the Python code. Args: code (str): Python code to analyze for package requirements """ required_packages = self._extract_required_packages(code) for package in required_packages: if package not in self.installed_packages: logger.info(f"Auto-installing package: {package}") install_result = self.install_package(package) if "successfully" in install_result.lower(): self.installed_packages.add(package) else: logger.warning(f"Failed to install package {package}: {install_result}") def _has_execution_errors(self, result: str) -> bool: """ Check if execution result contains errors that might be recoverable. Args: result (str): Execution result to check Returns: bool: True if recoverable errors are detected """ error_indicators = [ "ModuleNotFoundError", "ImportError", "FileNotFoundError", "PermissionError", "No such file or directory", ] return any(error in result for error in error_indicators) def _attempt_error_recovery(self, code: str, error_result: str, temp_filename: str, timeout: int) -> Optional[str]: """ Attempt to recover from execution errors. Args: code (str): Original code that failed error_result (str): Error message from failed execution temp_filename (str): Temporary file name used timeout (int): Execution timeout Returns: Optional[str]: Recovery result if successful, None if recovery failed """ try: # Recovery attempt 1: Install missing packages if "ModuleNotFoundError" in error_result or "ImportError" in error_result: logger.info("Attempting recovery: Installing missing packages") # Extract package name from error message missing_package_match = re.search(r"No module named '([^']+)'", error_result) if missing_package_match: missing_package = missing_package_match.group(1) install_result = self.install_package(missing_package) if "successfully" in install_result.lower(): logger.info(f"Recovery successful: Installed {missing_package}") # Retry execution retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout) return retry_result # Recovery attempt 2: Fix file path issues if "FileNotFoundError" in error_result or "No such file or directory" in error_result: logger.info("Attempting recovery: Fixing file path issues") # Create any missing directories that might be referenced self.shell_tools.run_shell_command("mkdir -p data reports output") # Retry execution retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout) return retry_result except Exception as e: logger.error(f"Error recovery failed: {str(e)}") return None def install_package(self, package_name: str) -> str: """ Install a Python package using pip via shell backend. Args: package_name (str): Name of the package to install Returns: str: Installation result message """ try: logger.info(f"Installing Python package: {package_name}") # Try multiple installation methods install_commands = [ f"pip3 install {package_name}", f"python3 -m pip install {package_name}", f"pip install {package_name}", ] for command in install_commands: result = self.shell_tools.run_shell_command(command, timeout=120) if "Successfully installed" in result or "already satisfied" in result: self.installed_packages.add(package_name) return f"Package '{package_name}' installed successfully" # If first method fails, try the next one if "error" not in result.lower(): break return f"Package installation failed: {result}" except Exception as e: error_msg = f"Error installing package '{package_name}': {str(e)}" logger.error(error_msg) return error_msg def save_python_file(self, filename: str, code: str) -> str: """ Save Python code to a file in the base directory. Args: filename (str): Name of the Python file code (str): Python code content Returns: str: Success/failure message """ try: if not filename.endswith('.py'): filename += '.py' filepath = self.base_dir / filename # Heal the code before saving healed_code = self._heal_python_code(code) with open(filepath, 'w', encoding='utf-8') as f: f.write(healed_code) logger.info(f"Python file saved: {filename}") return f"Python file '{filename}' saved successfully to {self.base_dir}" except Exception as e: error_msg = f"Error saving Python file '{filename}': {str(e)}" logger.error(error_msg) return error_msg def list_python_files(self) -> str: """ List all Python files in the base directory. Returns: str: List of Python files """ try: python_files = list(self.base_dir.glob("*.py")) if not python_files: return "No Python files found in the base directory" file_list = [] for file_path in python_files: file_stat = file_path.stat() file_info = f"{file_path.name} ({file_stat.st_size} bytes, modified: {time.ctime(file_stat.st_mtime)})" file_list.append(file_info) return "Python files in base directory:\n" + "\n".join(file_list) except Exception as e: error_msg = f"Error listing Python files: {str(e)}" logger.error(error_msg) return error_msg def validate_python_syntax(self, code: str) -> str: """ Validate Python code syntax without executing it. Args: code (str): Python code to validate Returns: str: Validation result message """ try: # Parse the code to check for syntax errors ast.parse(code) return "Python syntax is valid" except SyntaxError as e: error_msg = f"Syntax Error at line {e.lineno}: {e.msg}" logger.warning(f"Python syntax validation failed: {error_msg}") return error_msg except Exception as e: error_msg = f"Error validating Python syntax: {str(e)}" logger.error(error_msg) return error_msg def get_base_directory(self) -> str: """ Get the current base directory path. Returns: str: Absolute path of the base directory """ return str(self.base_dir.absolute()) def clear_temp_files(self) -> str: """ Clean up any temporary Python files in the base directory. Returns: str: Cleanup result message """ try: temp_files = list(self.base_dir.glob("temp_script_*.py")) if not temp_files: return "No temporary files to clean up" for temp_file in temp_files: temp_file.unlink() return f"Cleaned up {len(temp_files)} temporary Python files" except Exception as e: error_msg = f"Error cleaning up temporary files: {str(e)}" logger.error(error_msg) return error_msg