# Data_Extractor_Using_Gemini/utils/restricted_python_tools.py
# methunraj
# feat: Implement revenue data organization workflow with JSON output
# 8b21729
"""
RestrictedPythonTools - Self-Healing Python Execution with Shell Backend
This toolkit provides Python code execution with built-in directory constraints,
path auto-correction, and self-healing capabilities. Uses RestrictedShellTools
as the backend execution engine, mirroring Claude Code's architecture.
"""
import ast
import json
import os
import re
import shlex
import sys
import tempfile
import time
import uuid
from pathlib import Path
from typing import Optional, Dict, Any, List

from agno.tools import Toolkit
from agno.utils.log import logger

from .shell_toolkit import RestrictedShellTools
class RestrictedPythonTools(Toolkit):
    """
    Self-healing Python execution toolkit with directory constraints.

    Uses RestrictedShellTools as backend for secure, constrained Python execution.
    Includes automatic path correction, package installation, and error recovery.
    """

    # Message returned by validate_python_syntax() for code that parses cleanly.
    _SYNTAX_OK = "Python syntax is valid"

    # Import name -> pip distribution name, shared by the pre-execution
    # auto-installer and the post-failure recovery path so both agree.
    _PIP_NAMES = {
        'pandas': 'pandas',
        'numpy': 'numpy',
        'openpyxl': 'openpyxl',
        'xlsxwriter': 'xlsxwriter',
        'matplotlib': 'matplotlib',
        'seaborn': 'seaborn',
        'plotly': 'plotly',
        'requests': 'requests',
        'beautifulsoup4': 'beautifulsoup4',
        'bs4': 'beautifulsoup4',
        'sklearn': 'scikit-learn',
        'cv2': 'opencv-python',
        'PIL': 'Pillow',
        'yaml': 'PyYAML',
    }

    # Standard-library modules that must never be pip-installed. Only a
    # fallback: sys.stdlib_module_names is merged in when the interpreter has it.
    _FALLBACK_STDLIB = frozenset({
        'os', 'sys', 'json', 'time', 'datetime', 'uuid', 'tempfile',
        're', 'ast', 'pathlib', 'math', 'random', 'subprocess',
        'collections', 'itertools', 'functools', 'logging', 'io',
        'csv', 'xml', 'urllib', 'http', 'email', 'sqlite3',
        '__future__', 'abc', 'argparse', 'array', 'asyncio', 'base64',
        'bisect', 'builtins', 'calendar', 'codecs', 'concurrent',
        'configparser', 'contextlib', 'copy', 'ctypes', 'dataclasses',
        'decimal', 'enum', 'errno', 'fnmatch', 'fractions', 'getpass',
        'glob', 'gzip', 'hashlib', 'heapq', 'html', 'importlib', 'inspect',
        'locale', 'multiprocessing', 'numbers', 'operator', 'pickle',
        'platform', 'pprint', 'queue', 'secrets', 'shlex', 'shutil',
        'signal', 'socket', 'ssl', 'stat', 'statistics', 'string', 'struct',
        'tarfile', 'textwrap', 'threading', 'traceback', 'types', 'typing',
        'unicodedata', 'unittest', 'warnings', 'zipfile', 'zlib',
    })

    # Matches one parent-directory step, POSIX or Windows style.
    _TRAVERSAL_RE = re.compile(r'\.\.[/\\]')

    # (pattern, replacement) pairs that reduce a path literal to its bare file
    # name so the file is looked up in the working directory.
    _PATH_CORRECTIONS = (
        # Convert absolute paths to relative paths within base directory
        (r'["\']\/[^"\']*\/([^"\'\/]+\.(xlsx?|csv|json|txt|py))["\']', r'"\1"'),
        # Fix common pandas path issues
        (r'pd\.to_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.to_excel("\1"'),
        (r'pd\.read_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.read_excel("\1"'),
        (r'pd\.to_csv\(["\'][^"\']*\/([^"\'\/]+\.csv)["\']', r'pd.to_csv("\1"'),
        # Fix file operations (this rule also covers "with open(...)")
        (r'open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'open("\1"'),
    )

    def __init__(self, base_dir: Optional[Path] = None, **kwargs):
        """
        Initialize the restricted Python toolkit.

        Args:
            base_dir: Base directory to constrain all Python operations to.
                Defaults to the current working directory. Created if missing.
            **kwargs: Additional arguments passed to parent Toolkit
        """
        self.base_dir = Path(base_dir) if base_dir else Path.cwd()
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Initialize backend tools
        self.shell_tools = RestrictedShellTools(base_dir=self.base_dir)

        # Track installed packages to avoid redundant installations
        self.installed_packages = set()

        # Initialize toolkit with Python execution functions
        super().__init__(
            name="restricted_python_tools",
            tools=[
                self.run_python_code,
                self.install_package,
                self.save_python_file,
                self.list_python_files,
                self.validate_python_syntax,
            ],
            **kwargs
        )
        logger.info(f"RestrictedPythonTools initialized with base_dir: {self.base_dir}")

    def run_python_code(self, code: str, timeout: int = 120) -> str:
        """
        Execute Python code with self-healing and directory constraints.

        The code is syntax-checked, healed, written to a temporary script in the
        base directory, run through the shell backend, and the script is removed
        afterwards. One recovery attempt is made for recognised failures.

        Args:
            code (str): Python code to execute
            timeout (int): Maximum execution time in seconds

        Returns:
            str: Output from code execution or error message
        """
        try:
            # Step 1: Validate the caller's code first, so that line numbers in
            # a syntax error refer to what they wrote rather than to the healed
            # script, which has a preamble inserted.
            syntax_result = self.validate_python_syntax(code)
            healed_code = code
            if syntax_result == self._SYNTAX_OK:
                # Step 2: Auto-correct and heal, then re-check in case the
                # textual rewrites damaged the code.
                healed_code = self._heal_python_code(code)
                syntax_result = self.validate_python_syntax(healed_code)
            if syntax_result != self._SYNTAX_OK:
                return f"Syntax Error: {syntax_result}"

            # Step 3: Extract and auto-install required packages
            self._auto_install_packages(healed_code)

            # Step 4: Create temporary Python file
            temp_filename = f"temp_script_{uuid.uuid4().hex[:8]}.py"
            temp_filepath = self.base_dir / temp_filename
            try:
                with open(temp_filepath, 'w', encoding='utf-8') as f:
                    f.write(healed_code)
                logger.info(f"Executing Python code via shell backend: {temp_filename}")

                # Step 5: Execute via RestrictedShellTools backend. The bare
                # file name is used, so this presumably relies on the backend
                # running commands inside base_dir -- TODO confirm.
                execution_command = f"python3 {temp_filename}"
                result = self.shell_tools.run_shell_command(execution_command, timeout=timeout)

                # Step 6: Check for common errors and attempt recovery
                if self._has_execution_errors(result):
                    recovery_result = self._attempt_error_recovery(
                        healed_code, result, temp_filename, timeout
                    )
                    if recovery_result:
                        result = recovery_result
                return result
            finally:
                # Cleanup temporary file. A cleanup failure is logged rather
                # than raised so it cannot replace the execution result.
                try:
                    temp_filepath.unlink()
                except FileNotFoundError:
                    pass
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove temporary file {temp_filename}: {cleanup_error}")
        except Exception as e:
            error_msg = f"Error executing Python code: {str(e)}"
            logger.error(error_msg)
            return error_msg

    @staticmethod
    def _future_import_end(code: str) -> int:
        """
        Return the line number on which the last ``from __future__`` import ends.

        Args:
            code (str): Python code to inspect

        Returns:
            int: 1-based line number, or 0 when there is no such import or the
                code cannot be parsed
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError, RecursionError):
            return 0
        last_line = 0
        for node in tree.body:
            if isinstance(node, ast.ImportFrom) and node.module == '__future__':
                # end_lineno is absent on older interpreters; fall back to lineno.
                end_line = getattr(node, 'end_lineno', None) or node.lineno
                last_line = max(last_line, end_line)
        return last_line

    def _heal_python_code(self, code: str) -> str:
        """
        Auto-correct common path and directory issues in Python code.

        Parent-directory steps are stripped, recognised path literals are
        reduced to bare file names, and a preamble that changes into the base
        directory is inserted.

        Args:
            code (str): Original Python code

        Returns:
            str: Healed Python code with corrected paths
        """
        healed_code = code

        # Strip "../" and "..\" until nothing changes: a single pass can be
        # defeated by input such as "....//", which collapses back into "../".
        previous = None
        while previous != healed_code:
            previous = healed_code
            healed_code = self._TRAVERSAL_RE.sub('', healed_code)

        for pattern, replacement in self._PATH_CORRECTIONS:
            healed_code = re.sub(pattern, replacement, healed_code)

        # Working directory insurance. repr() yields a valid string literal even
        # when the path contains quotes or backslashes, and the resolved path is
        # what os.getcwd() reports once the script is inside the directory.
        resolved_base = str(Path(self.base_dir).resolve())
        directory_insurance = "\n".join([
            "",
            "import os",
            "import sys",
            "# Ensure we're in the correct working directory",
            f"base_dir = {resolved_base!r}",
            "if os.getcwd() != base_dir:",
            "    os.chdir(base_dir)",
            '    print(f"Working directory corrected to: {os.getcwd()}")',
            "",
        ])

        # "from __future__" imports must stay ahead of other statements, so the
        # preamble goes after the last one instead of at the very top.
        split_at = self._future_import_end(healed_code)
        if split_at:
            lines = healed_code.split('\n')
            head = '\n'.join(lines[:split_at]) + '\n'
            tail = '\n'.join(lines[split_at:])
            healed_code = head + directory_insurance + tail
        else:
            healed_code = directory_insurance + healed_code

        logger.debug(f"Code healing applied - original length: {len(code)}, healed length: {len(healed_code)}")
        return healed_code

    def _stdlib_modules(self) -> frozenset:
        """
        Return the names of modules that ship with Python.

        Returns:
            frozenset: The built-in fallback list, plus
                ``sys.stdlib_module_names`` where the interpreter provides it
        """
        return self._FALLBACK_STDLIB | frozenset(getattr(sys, 'stdlib_module_names', ()))

    @staticmethod
    def _imported_top_level_modules(code: str) -> set:
        """
        Collect the top-level module names imported anywhere in the code.

        Relative imports are ignored. When the code does not parse, a
        line-based regex is used instead.

        Args:
            code (str): Python code to analyze

        Returns:
            set: Top-level module names
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError, RecursionError):
            line_pattern = re.compile(r'^(?:import|from)\s+([a-zA-Z_][a-zA-Z0-9_]*)')
            matches = (line_pattern.match(line.strip()) for line in code.split('\n'))
            return {match.group(1) for match in matches if match}

        modules = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                modules.update(alias.name.split('.')[0] for alias in node.names)
            elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
                modules.add(node.module.split('.')[0])
        return modules

    def _is_local_module(self, module: str) -> bool:
        """
        Tell whether a module name refers to code stored in the base directory.

        Args:
            module (str): Top-level module name

        Returns:
            bool: True for ``<module>.py`` or a ``<module>/__init__.py`` package
        """
        if (self.base_dir / f"{module}.py").is_file():
            return True
        return (self.base_dir / module / "__init__.py").is_file()

    def _extract_required_packages(self, code: str) -> List[str]:
        """
        Extract package names from import statements in Python code.

        Standard-library modules and modules found in the base directory are
        skipped; the rest are translated to their pip names where known.

        Args:
            code (str): Python code to analyze

        Returns:
            List[str]: Sorted list of package names that need to be installed
        """
        stdlib = self._stdlib_modules()
        packages = set()
        for module in self._imported_top_level_modules(code):
            if module in stdlib or self._is_local_module(module):
                continue
            packages.add(self._PIP_NAMES.get(module, module))
        return sorted(packages)

    def _auto_install_packages(self, code: str) -> None:
        """
        Automatically install required packages for the Python code.

        Args:
            code (str): Python code to analyze for package requirements
        """
        for package in self._extract_required_packages(code):
            if package in self.installed_packages:
                continue
            logger.info(f"Auto-installing package: {package}")
            install_result = self.install_package(package)
            if "successfully" in install_result.lower():
                self.installed_packages.add(package)
            else:
                logger.warning(f"Failed to install package {package}: {install_result}")

    def _has_execution_errors(self, result: str) -> bool:
        """
        Check if execution result contains errors that might be recoverable.

        Args:
            result (str): Execution result to check

        Returns:
            bool: True if recoverable errors are detected
        """
        error_indicators = (
            "ModuleNotFoundError",
            "ImportError",
            "FileNotFoundError",
            "PermissionError",
            "No such file or directory",
        )
        return any(indicator in result for indicator in error_indicators)

    def _attempt_error_recovery(self, code: str, error_result: str, temp_filename: str, timeout: int) -> Optional[str]:
        """
        Attempt to recover from execution errors.

        Args:
            code (str): Original code that failed
            error_result (str): Error message from failed execution
            temp_filename (str): Temporary file name used
            timeout (int): Execution timeout

        Returns:
            Optional[str]: Output of the retried run, or None when no recovery
                was attempted or the attempt itself raised
        """
        retry_command = f"python3 {temp_filename}"
        try:
            # Recovery attempt 1: Install missing packages
            if "ModuleNotFoundError" in error_result or "ImportError" in error_result:
                logger.info("Attempting recovery: Installing missing packages")
                missing_package_match = re.search(r"No module named '([^']+)'", error_result)
                if missing_package_match:
                    # The traceback names the import (possibly dotted); pip
                    # needs the distribution name of its top-level package.
                    module = missing_package_match.group(1).split('.')[0]
                    missing_package = self._PIP_NAMES.get(module, module)
                    install_result = self.install_package(missing_package)
                    if "successfully" in install_result.lower():
                        logger.info(f"Recovery successful: Installed {missing_package}")
                        # Retry execution
                        return self.shell_tools.run_shell_command(retry_command, timeout=timeout)

            # Recovery attempt 2: Fix file path issues
            if "FileNotFoundError" in error_result or "No such file or directory" in error_result:
                logger.info("Attempting recovery: Fixing file path issues")
                # Create any missing directories that might be referenced
                self.shell_tools.run_shell_command("mkdir -p data reports output")
                # Retry execution
                return self.shell_tools.run_shell_command(retry_command, timeout=timeout)
        except Exception as e:
            logger.error(f"Error recovery failed: {str(e)}")
        return None

    def install_package(self, package_name: str) -> str:
        """
        Install a Python package using pip via shell backend.

        Args:
            package_name (str): Name of the package to install; a version
                specifier such as ``pandas>=2.0`` is accepted

        Returns:
            str: Installation result message
        """
        try:
            requirement = (package_name or "").strip()
            # A leading "-" would be read by pip as an option, not a package.
            if not requirement or requirement.startswith("-"):
                error_msg = f"Package installation failed: invalid package name {package_name!r}"
                logger.error(error_msg)
                return error_msg

            logger.info(f"Installing Python package: {requirement}")

            # Quoting keeps characters such as ">", ";" or spaces from being
            # interpreted by the shell. Plain names are left unchanged.
            quoted = shlex.quote(requirement)

            # "python3 -m pip" goes first because scripts are run with python3,
            # so the package lands in the interpreter that will import it.
            install_commands = [
                f"python3 -m pip install {quoted}",
                f"pip3 install {quoted}",
                f"pip install {quoted}",
            ]
            result = ""
            for command in install_commands:
                result = self.shell_tools.run_shell_command(command, timeout=120)
                if "Successfully installed" in result or "already satisfied" in result:
                    self.installed_packages.add(requirement)
                    return f"Package '{requirement}' installed successfully"
                # Otherwise fall through to the next command. A missing
                # executable does not necessarily print the word "error".
            return f"Package installation failed: {result}"
        except Exception as e:
            error_msg = f"Error installing package '{package_name}': {str(e)}"
            logger.error(error_msg)
            return error_msg

    def save_python_file(self, filename: str, code: str) -> str:
        """
        Save Python code to a file in the base directory.

        The code is healed before it is written. Paths that would land outside
        the base directory are refused.

        Args:
            filename (str): Name of the Python file; ".py" is appended if missing
            code (str): Python code content

        Returns:
            str: Success/failure message
        """
        try:
            if not filename.endswith('.py'):
                filename += '.py'

            # Joining with an absolute path or with ".." segments would leave
            # the base directory, so check the resolved location.
            resolved_base = Path(self.base_dir).resolve()
            filepath = (resolved_base / filename).resolve()
            try:
                filepath.relative_to(resolved_base)
            except ValueError:
                error_msg = f"Error saving Python file '{filename}': path is outside the base directory"
                logger.error(error_msg)
                return error_msg

            filepath.parent.mkdir(parents=True, exist_ok=True)

            # Heal the code before saving
            healed_code = self._heal_python_code(code)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(healed_code)
            logger.info(f"Python file saved: {filename}")
            return f"Python file '{filename}' saved successfully to {self.base_dir}"
        except Exception as e:
            error_msg = f"Error saving Python file '{filename}': {str(e)}"
            logger.error(error_msg)
            return error_msg

    def list_python_files(self) -> str:
        """
        List all Python files in the base directory.

        Only the top level of the directory is searched.

        Returns:
            str: One line per file with its size and modification time, sorted
                by name, or a message when there are none
        """
        try:
            python_files = sorted(self.base_dir.glob("*.py"))
            if not python_files:
                return "No Python files found in the base directory"
            file_list = []
            for file_path in python_files:
                file_stat = file_path.stat()
                file_list.append(
                    f"{file_path.name} ({file_stat.st_size} bytes, "
                    f"modified: {time.ctime(file_stat.st_mtime)})"
                )
            return "Python files in base directory:\n" + "\n".join(file_list)
        except Exception as e:
            error_msg = f"Error listing Python files: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def validate_python_syntax(self, code: str) -> str:
        """
        Validate Python code syntax without executing it.

        Args:
            code (str): Python code to validate

        Returns:
            str: "Python syntax is valid", or a message describing the problem
        """
        try:
            # Parse the code to check for syntax errors
            ast.parse(code)
            return self._SYNTAX_OK
        except SyntaxError as e:
            error_msg = f"Syntax Error at line {e.lineno}: {e.msg}"
            logger.warning(f"Python syntax validation failed: {error_msg}")
            return error_msg
        except Exception as e:
            error_msg = f"Error validating Python syntax: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def get_base_directory(self) -> str:
        """
        Get the current base directory path.

        Returns:
            str: Absolute path of the base directory
        """
        return str(self.base_dir.absolute())

    def clear_temp_files(self) -> str:
        """
        Clean up any temporary Python files in the base directory.

        Removes files named ``temp_script_*.py``, the pattern used by
        run_python_code for its scratch scripts.

        Returns:
            str: Cleanup result message
        """
        try:
            temp_files = list(self.base_dir.glob("temp_script_*.py"))
            if not temp_files:
                return "No temporary files to clean up"
            for temp_file in temp_files:
                temp_file.unlink()
            return f"Cleaned up {len(temp_files)} temporary Python files"
        except Exception as e:
            error_msg = f"Error cleaning up temporary files: {str(e)}"
            logger.error(error_msg)
            return error_msg