|
""" |
|
RestrictedPythonTools - Self-Healing Python Execution with Shell Backend |
|
|
|
This toolkit provides Python code execution with built-in directory constraints, |
|
path auto-correction, and self-healing capabilities. Uses RestrictedShellTools |
|
as the backend execution engine, mirroring Claude Code's architecture. |
|
""" |
|
|
|
import os |
|
import re |
|
import ast |
|
import sys |
|
import json |
|
import time |
|
import uuid |
|
import tempfile |
|
from pathlib import Path |
|
from typing import Optional, Dict, Any, List |
|
from agno.tools import Toolkit |
|
from agno.utils.log import logger |
|
|
|
from .shell_toolkit import RestrictedShellTools |
|
|
|
|
|
class RestrictedPythonTools(Toolkit): |
|
""" |
|
Self-healing Python execution toolkit with directory constraints. |
|
|
|
Uses RestrictedShellTools as backend for secure, constrained Python execution. |
|
Includes automatic path correction, package installation, and error recovery. |
|
""" |
|
|
|
def __init__(self, base_dir: Optional[Path] = None, **kwargs): |
|
""" |
|
Initialize the restricted Python toolkit. |
|
|
|
Args: |
|
base_dir: Base directory to constrain all Python operations to |
|
**kwargs: Additional arguments passed to parent Toolkit |
|
""" |
|
self.base_dir = Path(base_dir) if base_dir else Path.cwd() |
|
self.base_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
self.shell_tools = RestrictedShellTools(base_dir=self.base_dir) |
|
|
|
|
|
self.installed_packages = set() |
|
|
|
|
|
super().__init__( |
|
name="restricted_python_tools", |
|
tools=[ |
|
self.run_python_code, |
|
self.install_package, |
|
self.save_python_file, |
|
self.list_python_files, |
|
self.validate_python_syntax |
|
], |
|
**kwargs |
|
) |
|
|
|
logger.info(f"RestrictedPythonTools initialized with base_dir: {self.base_dir}") |
|
|
|
def run_python_code(self, code: str, timeout: int = 120) -> str: |
|
""" |
|
Execute Python code with self-healing and directory constraints. |
|
|
|
Args: |
|
code (str): Python code to execute |
|
timeout (int): Maximum execution time in seconds |
|
|
|
Returns: |
|
str: Output from code execution or error message |
|
""" |
|
try: |
|
|
|
healed_code = self._heal_python_code(code) |
|
|
|
|
|
syntax_result = self.validate_python_syntax(healed_code) |
|
if "Error" in syntax_result: |
|
return f"Syntax Error: {syntax_result}" |
|
|
|
|
|
self._auto_install_packages(healed_code) |
|
|
|
|
|
temp_filename = f"temp_script_{uuid.uuid4().hex[:8]}.py" |
|
temp_filepath = self.base_dir / temp_filename |
|
|
|
try: |
|
|
|
with open(temp_filepath, 'w', encoding='utf-8') as f: |
|
f.write(healed_code) |
|
|
|
logger.info(f"Executing Python code via shell backend: {temp_filename}") |
|
|
|
|
|
execution_command = f"python3 {temp_filename}" |
|
result = self.shell_tools.run_shell_command(execution_command, timeout=timeout) |
|
|
|
|
|
if self._has_execution_errors(result): |
|
recovery_result = self._attempt_error_recovery(healed_code, result, temp_filename, timeout) |
|
if recovery_result: |
|
result = recovery_result |
|
|
|
return result |
|
|
|
finally: |
|
|
|
if temp_filepath.exists(): |
|
temp_filepath.unlink() |
|
|
|
except Exception as e: |
|
error_msg = f"Error executing Python code: {str(e)}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
def _heal_python_code(self, code: str) -> str: |
|
""" |
|
Auto-correct common path and directory issues in Python code. |
|
|
|
Args: |
|
code (str): Original Python code |
|
|
|
Returns: |
|
str: Healed Python code with corrected paths |
|
""" |
|
healed_code = code |
|
|
|
|
|
path_corrections = [ |
|
|
|
(r'\.\./', ''), |
|
(r'\.\./\.\./', ''), |
|
(r'\.\.\\', ''), |
|
|
|
|
|
(r'["\']\/[^"\']*\/([^"\'\/]+\.(xlsx?|csv|json|txt|py))["\']', r'"\1"'), |
|
|
|
|
|
(r'pd\.to_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.to_excel("\1"'), |
|
(r'pd\.read_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.read_excel("\1"'), |
|
(r'pd\.to_csv\(["\'][^"\']*\/([^"\'\/]+\.csv)["\']', r'pd.to_csv("\1"'), |
|
|
|
|
|
(r'open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'open("\1"'), |
|
(r'with open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'with open("\1"'), |
|
] |
|
|
|
for pattern, replacement in path_corrections: |
|
healed_code = re.sub(pattern, replacement, healed_code) |
|
|
|
|
|
directory_insurance = f""" |
|
import os |
|
import sys |
|
|
|
# Ensure we're in the correct working directory |
|
base_dir = r'{self.base_dir}' |
|
if os.getcwd() != base_dir: |
|
os.chdir(base_dir) |
|
print(f"Working directory corrected to: {{os.getcwd()}}") |
|
|
|
""" |
|
|
|
|
|
healed_code = directory_insurance + healed_code |
|
|
|
logger.debug(f"Code healing applied - original length: {len(code)}, healed length: {len(healed_code)}") |
|
return healed_code |
|
|
|
def _extract_required_packages(self, code: str) -> List[str]: |
|
""" |
|
Extract package names from import statements in Python code. |
|
|
|
Args: |
|
code (str): Python code to analyze |
|
|
|
Returns: |
|
List[str]: List of package names that need to be installed |
|
""" |
|
packages = set() |
|
|
|
|
|
builtin_modules = { |
|
'os', 'sys', 'json', 'time', 'datetime', 'uuid', 'tempfile', |
|
're', 'ast', 'pathlib', 'math', 'random', 'subprocess', |
|
'collections', 'itertools', 'functools', 'logging', 'io', |
|
'csv', 'xml', 'urllib', 'http', 'email', 'sqlite3' |
|
} |
|
|
|
|
|
package_mappings = { |
|
'pandas': 'pandas', |
|
'numpy': 'numpy', |
|
'openpyxl': 'openpyxl', |
|
'xlsxwriter': 'xlsxwriter', |
|
'matplotlib': 'matplotlib', |
|
'seaborn': 'seaborn', |
|
'plotly': 'plotly', |
|
'requests': 'requests', |
|
'beautifulsoup4': 'beautifulsoup4', |
|
'bs4': 'beautifulsoup4', |
|
'sklearn': 'scikit-learn', |
|
'cv2': 'opencv-python', |
|
'PIL': 'Pillow', |
|
'yaml': 'PyYAML', |
|
} |
|
|
|
|
|
import_patterns = [ |
|
r'^import\s+([a-zA-Z_][a-zA-Z0-9_]*)', |
|
r'^from\s+([a-zA-Z_][a-zA-Z0-9_]*)\s+import', |
|
] |
|
|
|
for line in code.split('\n'): |
|
line = line.strip() |
|
for pattern in import_patterns: |
|
match = re.match(pattern, line) |
|
if match: |
|
package_name = match.group(1) |
|
|
|
|
|
if package_name in builtin_modules: |
|
continue |
|
|
|
|
|
pip_package = package_mappings.get(package_name, package_name) |
|
packages.add(pip_package) |
|
|
|
return list(packages) |
|
|
|
def _auto_install_packages(self, code: str) -> None: |
|
""" |
|
Automatically install required packages for the Python code. |
|
|
|
Args: |
|
code (str): Python code to analyze for package requirements |
|
""" |
|
required_packages = self._extract_required_packages(code) |
|
|
|
for package in required_packages: |
|
if package not in self.installed_packages: |
|
logger.info(f"Auto-installing package: {package}") |
|
install_result = self.install_package(package) |
|
if "successfully" in install_result.lower(): |
|
self.installed_packages.add(package) |
|
else: |
|
logger.warning(f"Failed to install package {package}: {install_result}") |
|
|
|
def _has_execution_errors(self, result: str) -> bool: |
|
""" |
|
Check if execution result contains errors that might be recoverable. |
|
|
|
Args: |
|
result (str): Execution result to check |
|
|
|
Returns: |
|
bool: True if recoverable errors are detected |
|
""" |
|
error_indicators = [ |
|
"ModuleNotFoundError", |
|
"ImportError", |
|
"FileNotFoundError", |
|
"PermissionError", |
|
"No such file or directory", |
|
] |
|
|
|
return any(error in result for error in error_indicators) |
|
|
|
def _attempt_error_recovery(self, code: str, error_result: str, temp_filename: str, timeout: int) -> Optional[str]: |
|
""" |
|
Attempt to recover from execution errors. |
|
|
|
Args: |
|
code (str): Original code that failed |
|
error_result (str): Error message from failed execution |
|
temp_filename (str): Temporary file name used |
|
timeout (int): Execution timeout |
|
|
|
Returns: |
|
Optional[str]: Recovery result if successful, None if recovery failed |
|
""" |
|
try: |
|
|
|
if "ModuleNotFoundError" in error_result or "ImportError" in error_result: |
|
logger.info("Attempting recovery: Installing missing packages") |
|
|
|
|
|
missing_package_match = re.search(r"No module named '([^']+)'", error_result) |
|
if missing_package_match: |
|
missing_package = missing_package_match.group(1) |
|
install_result = self.install_package(missing_package) |
|
|
|
if "successfully" in install_result.lower(): |
|
logger.info(f"Recovery successful: Installed {missing_package}") |
|
|
|
retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout) |
|
return retry_result |
|
|
|
|
|
if "FileNotFoundError" in error_result or "No such file or directory" in error_result: |
|
logger.info("Attempting recovery: Fixing file path issues") |
|
|
|
|
|
self.shell_tools.run_shell_command("mkdir -p data reports output") |
|
|
|
|
|
retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout) |
|
return retry_result |
|
|
|
except Exception as e: |
|
logger.error(f"Error recovery failed: {str(e)}") |
|
|
|
return None |
|
|
|
def install_package(self, package_name: str) -> str: |
|
""" |
|
Install a Python package using pip via shell backend. |
|
|
|
Args: |
|
package_name (str): Name of the package to install |
|
|
|
Returns: |
|
str: Installation result message |
|
""" |
|
try: |
|
logger.info(f"Installing Python package: {package_name}") |
|
|
|
|
|
install_commands = [ |
|
f"pip3 install {package_name}", |
|
f"python3 -m pip install {package_name}", |
|
f"pip install {package_name}", |
|
] |
|
|
|
for command in install_commands: |
|
result = self.shell_tools.run_shell_command(command, timeout=120) |
|
|
|
if "Successfully installed" in result or "already satisfied" in result: |
|
self.installed_packages.add(package_name) |
|
return f"Package '{package_name}' installed successfully" |
|
|
|
|
|
if "error" not in result.lower(): |
|
break |
|
|
|
return f"Package installation failed: {result}" |
|
|
|
except Exception as e: |
|
error_msg = f"Error installing package '{package_name}': {str(e)}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
def save_python_file(self, filename: str, code: str) -> str: |
|
""" |
|
Save Python code to a file in the base directory. |
|
|
|
Args: |
|
filename (str): Name of the Python file |
|
code (str): Python code content |
|
|
|
Returns: |
|
str: Success/failure message |
|
""" |
|
try: |
|
if not filename.endswith('.py'): |
|
filename += '.py' |
|
|
|
filepath = self.base_dir / filename |
|
|
|
|
|
healed_code = self._heal_python_code(code) |
|
|
|
with open(filepath, 'w', encoding='utf-8') as f: |
|
f.write(healed_code) |
|
|
|
logger.info(f"Python file saved: {filename}") |
|
return f"Python file '{filename}' saved successfully to {self.base_dir}" |
|
|
|
except Exception as e: |
|
error_msg = f"Error saving Python file '{filename}': {str(e)}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
def list_python_files(self) -> str: |
|
""" |
|
List all Python files in the base directory. |
|
|
|
Returns: |
|
str: List of Python files |
|
""" |
|
try: |
|
python_files = list(self.base_dir.glob("*.py")) |
|
|
|
if not python_files: |
|
return "No Python files found in the base directory" |
|
|
|
file_list = [] |
|
for file_path in python_files: |
|
file_stat = file_path.stat() |
|
file_info = f"{file_path.name} ({file_stat.st_size} bytes, modified: {time.ctime(file_stat.st_mtime)})" |
|
file_list.append(file_info) |
|
|
|
return "Python files in base directory:\n" + "\n".join(file_list) |
|
|
|
except Exception as e: |
|
error_msg = f"Error listing Python files: {str(e)}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
def validate_python_syntax(self, code: str) -> str: |
|
""" |
|
Validate Python code syntax without executing it. |
|
|
|
Args: |
|
code (str): Python code to validate |
|
|
|
Returns: |
|
str: Validation result message |
|
""" |
|
try: |
|
|
|
ast.parse(code) |
|
return "Python syntax is valid" |
|
|
|
except SyntaxError as e: |
|
error_msg = f"Syntax Error at line {e.lineno}: {e.msg}" |
|
logger.warning(f"Python syntax validation failed: {error_msg}") |
|
return error_msg |
|
|
|
except Exception as e: |
|
error_msg = f"Error validating Python syntax: {str(e)}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
def get_base_directory(self) -> str: |
|
""" |
|
Get the current base directory path. |
|
|
|
Returns: |
|
str: Absolute path of the base directory |
|
""" |
|
return str(self.base_dir.absolute()) |
|
|
|
def clear_temp_files(self) -> str: |
|
""" |
|
Clean up any temporary Python files in the base directory. |
|
|
|
Returns: |
|
str: Cleanup result message |
|
""" |
|
try: |
|
temp_files = list(self.base_dir.glob("temp_script_*.py")) |
|
|
|
if not temp_files: |
|
return "No temporary files to clean up" |
|
|
|
for temp_file in temp_files: |
|
temp_file.unlink() |
|
|
|
return f"Cleaned up {len(temp_files)} temporary Python files" |
|
|
|
except Exception as e: |
|
error_msg = f"Error cleaning up temporary files: {str(e)}" |
|
logger.error(error_msg) |
|
return error_msg |