File size: 17,519 Bytes
8b21729
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
"""
RestrictedPythonTools - Self-Healing Python Execution with Shell Backend

This toolkit provides Python code execution with built-in directory constraints,
path auto-correction, and self-healing capabilities. Uses RestrictedShellTools
as the backend execution engine, mirroring Claude Code's architecture.
"""

import os
import re
import ast
import sys
import json
import time
import uuid
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List
from agno.tools import Toolkit
from agno.utils.log import logger

from .shell_toolkit import RestrictedShellTools


class RestrictedPythonTools(Toolkit):
    """
    Self-healing Python execution toolkit with directory constraints.
    
    Uses RestrictedShellTools as backend for secure, constrained Python execution.
    Includes automatic path correction, package installation, and error recovery.
    """
    
    def __init__(self, base_dir: Optional[Path] = None, **kwargs):
        """
        Set up the toolkit and register its Python execution tools.

        Args:
            base_dir: Directory every Python operation is confined to. When
                omitted (or falsy) the current working directory is used. The
                directory is created if it does not exist yet.
            **kwargs: Extra keyword arguments forwarded to the parent Toolkit
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path.cwd()
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Names of packages already installed through this instance, so the
        # auto-installer does not repeat work.
        self.installed_packages = set()

        # Every command (python3, pip, mkdir) is delegated to the shell toolkit.
        self.shell_tools = RestrictedShellTools(base_dir=self.base_dir)

        # Functions registered with the parent Toolkit.
        exposed_tools = [
            self.run_python_code,
            self.install_package,
            self.save_python_file,
            self.list_python_files,
            self.validate_python_syntax,
        ]
        super().__init__(
            name="restricted_python_tools",
            tools=exposed_tools,
            **kwargs
        )

        logger.info(f"RestrictedPythonTools initialized with base_dir: {self.base_dir}")
    
    def run_python_code(self, code: str, timeout: int = 120) -> str:
        """
        Execute Python code with self-healing and directory constraints.
        
        Args:
            code (str): Python code to execute
            timeout (int): Maximum execution time in seconds
            
        Returns:
            str: Output from code execution or error message
        """
        try:
            # Step 1: Auto-correct and heal the code
            healed_code = self._heal_python_code(code)
            
            # Step 2: Validate syntax before execution
            syntax_result = self.validate_python_syntax(healed_code)
            if "Error" in syntax_result:
                return f"Syntax Error: {syntax_result}"
            
            # Step 3: Extract and auto-install required packages
            self._auto_install_packages(healed_code)
            
            # Step 4: Create temporary Python file
            temp_filename = f"temp_script_{uuid.uuid4().hex[:8]}.py"
            temp_filepath = self.base_dir / temp_filename
            
            try:
                # Save healed code to temporary file
                with open(temp_filepath, 'w', encoding='utf-8') as f:
                    f.write(healed_code)
                
                logger.info(f"Executing Python code via shell backend: {temp_filename}")
                
                # Step 5: Execute via RestrictedShellTools backend
                execution_command = f"python3 {temp_filename}"
                result = self.shell_tools.run_shell_command(execution_command, timeout=timeout)
                
                # Step 6: Check for common errors and attempt recovery
                if self._has_execution_errors(result):
                    recovery_result = self._attempt_error_recovery(healed_code, result, temp_filename, timeout)
                    if recovery_result:
                        result = recovery_result
                
                return result
                
            finally:
                # Cleanup temporary file
                if temp_filepath.exists():
                    temp_filepath.unlink()
                    
        except Exception as e:
            error_msg = f"Error executing Python code: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def _heal_python_code(self, code: str) -> str:
        """
        Auto-correct common path and directory issues in Python code.
        
        Args:
            code (str): Original Python code
            
        Returns:
            str: Healed Python code with corrected paths
        """
        healed_code = code
        
        # Path correction patterns
        path_corrections = [
            # Fix relative paths that go outside base directory
            (r'\.\./', ''),
            (r'\.\./\.\./', ''),
            (r'\.\.\\', ''),
            
            # Convert absolute paths to relative paths within base directory
            (r'["\']\/[^"\']*\/([^"\'\/]+\.(xlsx?|csv|json|txt|py))["\']', r'"\1"'),
            
            # Fix common pandas path issues
            (r'pd\.to_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.to_excel("\1"'),
            (r'pd\.read_excel\(["\'][^"\']*\/([^"\'\/]+\.xlsx?)["\']', r'pd.read_excel("\1"'),
            (r'pd\.to_csv\(["\'][^"\']*\/([^"\'\/]+\.csv)["\']', r'pd.to_csv("\1"'),
            
            # Fix file operations
            (r'open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'open("\1"'),
            (r'with open\(["\'][^"\']*\/([^"\'\/]+)["\']', r'with open("\1"'),
        ]
        
        for pattern, replacement in path_corrections:
            healed_code = re.sub(pattern, replacement, healed_code)
        
        # Add working directory insurance at the beginning
        directory_insurance = f"""
import os
import sys

# Ensure we're in the correct working directory
base_dir = r'{self.base_dir}'
if os.getcwd() != base_dir:
    os.chdir(base_dir)
    print(f"Working directory corrected to: {{os.getcwd()}}")

"""
        
        # Add directory insurance to the beginning of the code
        healed_code = directory_insurance + healed_code
        
        logger.debug(f"Code healing applied - original length: {len(code)}, healed length: {len(healed_code)}")
        return healed_code
    
    def _extract_required_packages(self, code: str) -> List[str]:
        """
        Extract package names from import statements in Python code.
        
        Args:
            code (str): Python code to analyze
            
        Returns:
            List[str]: List of package names that need to be installed
        """
        packages = set()
        
        # Built-in modules that don't need installation
        builtin_modules = {
            'os', 'sys', 'json', 'time', 'datetime', 'uuid', 'tempfile', 
            're', 'ast', 'pathlib', 'math', 'random', 'subprocess', 
            'collections', 'itertools', 'functools', 'logging', 'io',
            'csv', 'xml', 'urllib', 'http', 'email', 'sqlite3'
        }
        
        # Common package mappings (import name -> pip package name)
        package_mappings = {
            'pandas': 'pandas',
            'numpy': 'numpy', 
            'openpyxl': 'openpyxl',
            'xlsxwriter': 'xlsxwriter',
            'matplotlib': 'matplotlib',
            'seaborn': 'seaborn',
            'plotly': 'plotly',
            'requests': 'requests',
            'beautifulsoup4': 'beautifulsoup4',
            'bs4': 'beautifulsoup4',
            'sklearn': 'scikit-learn',
            'cv2': 'opencv-python',
            'PIL': 'Pillow',
            'yaml': 'PyYAML',
        }
        
        # Extract import statements using regex
        import_patterns = [
            r'^import\s+([a-zA-Z_][a-zA-Z0-9_]*)',
            r'^from\s+([a-zA-Z_][a-zA-Z0-9_]*)\s+import',
        ]
        
        for line in code.split('\n'):
            line = line.strip()
            for pattern in import_patterns:
                match = re.match(pattern, line)
                if match:
                    package_name = match.group(1)
                    
                    # Skip built-in modules
                    if package_name in builtin_modules:
                        continue
                    
                    # Map to pip package name if known
                    pip_package = package_mappings.get(package_name, package_name)
                    packages.add(pip_package)
        
        return list(packages)
    
    def _auto_install_packages(self, code: str) -> None:
        """
        Automatically install required packages for the Python code.
        
        Args:
            code (str): Python code to analyze for package requirements
        """
        required_packages = self._extract_required_packages(code)
        
        for package in required_packages:
            if package not in self.installed_packages:
                logger.info(f"Auto-installing package: {package}")
                install_result = self.install_package(package)
                if "successfully" in install_result.lower():
                    self.installed_packages.add(package)
                else:
                    logger.warning(f"Failed to install package {package}: {install_result}")
    
    def _has_execution_errors(self, result: str) -> bool:
        """
        Check if execution result contains errors that might be recoverable.
        
        Args:
            result (str): Execution result to check
            
        Returns:
            bool: True if recoverable errors are detected
        """
        error_indicators = [
            "ModuleNotFoundError",
            "ImportError",
            "FileNotFoundError",
            "PermissionError",
            "No such file or directory",
        ]
        
        return any(error in result for error in error_indicators)
    
    def _attempt_error_recovery(self, code: str, error_result: str, temp_filename: str, timeout: int) -> Optional[str]:
        """
        Attempt to recover from execution errors.
        
        Args:
            code (str): Original code that failed
            error_result (str): Error message from failed execution
            temp_filename (str): Temporary file name used
            timeout (int): Execution timeout
            
        Returns:
            Optional[str]: Recovery result if successful, None if recovery failed
        """
        try:
            # Recovery attempt 1: Install missing packages
            if "ModuleNotFoundError" in error_result or "ImportError" in error_result:
                logger.info("Attempting recovery: Installing missing packages")
                
                # Extract package name from error message
                missing_package_match = re.search(r"No module named '([^']+)'", error_result)
                if missing_package_match:
                    missing_package = missing_package_match.group(1)
                    install_result = self.install_package(missing_package)
                    
                    if "successfully" in install_result.lower():
                        logger.info(f"Recovery successful: Installed {missing_package}")
                        # Retry execution
                        retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout)
                        return retry_result
            
            # Recovery attempt 2: Fix file path issues
            if "FileNotFoundError" in error_result or "No such file or directory" in error_result:
                logger.info("Attempting recovery: Fixing file path issues")
                
                # Create any missing directories that might be referenced
                self.shell_tools.run_shell_command("mkdir -p data reports output")
                
                # Retry execution
                retry_result = self.shell_tools.run_shell_command(f"python3 {temp_filename}", timeout=timeout)
                return retry_result
            
        except Exception as e:
            logger.error(f"Error recovery failed: {str(e)}")
        
        return None
    
    def install_package(self, package_name: str) -> str:
        """
        Install a Python package using pip via shell backend.
        
        Args:
            package_name (str): Name of the package to install
            
        Returns:
            str: Installation result message
        """
        try:
            logger.info(f"Installing Python package: {package_name}")
            
            # Try multiple installation methods
            install_commands = [
                f"pip3 install {package_name}",
                f"python3 -m pip install {package_name}",
                f"pip install {package_name}",
            ]
            
            for command in install_commands:
                result = self.shell_tools.run_shell_command(command, timeout=120)
                
                if "Successfully installed" in result or "already satisfied" in result:
                    self.installed_packages.add(package_name)
                    return f"Package '{package_name}' installed successfully"
                
                # If first method fails, try the next one
                if "error" not in result.lower():
                    break
            
            return f"Package installation failed: {result}"
            
        except Exception as e:
            error_msg = f"Error installing package '{package_name}': {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def save_python_file(self, filename: str, code: str) -> str:
        """
        Save Python code to a file in the base directory.
        
        Args:
            filename (str): Name of the Python file
            code (str): Python code content
            
        Returns:
            str: Success/failure message
        """
        try:
            if not filename.endswith('.py'):
                filename += '.py'
            
            filepath = self.base_dir / filename
            
            # Heal the code before saving
            healed_code = self._heal_python_code(code)
            
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(healed_code)
            
            logger.info(f"Python file saved: {filename}")
            return f"Python file '{filename}' saved successfully to {self.base_dir}"
            
        except Exception as e:
            error_msg = f"Error saving Python file '{filename}': {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def list_python_files(self) -> str:
        """
        List all Python files in the base directory.
        
        Returns:
            str: List of Python files
        """
        try:
            python_files = list(self.base_dir.glob("*.py"))
            
            if not python_files:
                return "No Python files found in the base directory"
            
            file_list = []
            for file_path in python_files:
                file_stat = file_path.stat()
                file_info = f"{file_path.name} ({file_stat.st_size} bytes, modified: {time.ctime(file_stat.st_mtime)})"
                file_list.append(file_info)
            
            return "Python files in base directory:\n" + "\n".join(file_list)
            
        except Exception as e:
            error_msg = f"Error listing Python files: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def validate_python_syntax(self, code: str) -> str:
        """
        Validate Python code syntax without executing it.
        
        Args:
            code (str): Python code to validate
            
        Returns:
            str: Validation result message
        """
        try:
            # Parse the code to check for syntax errors
            ast.parse(code)
            return "Python syntax is valid"
            
        except SyntaxError as e:
            error_msg = f"Syntax Error at line {e.lineno}: {e.msg}"
            logger.warning(f"Python syntax validation failed: {error_msg}")
            return error_msg
            
        except Exception as e:
            error_msg = f"Error validating Python syntax: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def get_base_directory(self) -> str:
        """
        Get the current base directory path.
        
        Returns:
            str: Absolute path of the base directory
        """
        return str(self.base_dir.absolute())
    
    def clear_temp_files(self) -> str:
        """
        Clean up any temporary Python files in the base directory.
        
        Returns:
            str: Cleanup result message
        """
        try:
            temp_files = list(self.base_dir.glob("temp_script_*.py"))
            
            if not temp_files:
                return "No temporary files to clean up"
            
            for temp_file in temp_files:
                temp_file.unlink()
            
            return f"Cleaned up {len(temp_files)} temporary Python files"
            
        except Exception as e:
            error_msg = f"Error cleaning up temporary files: {str(e)}"
            logger.error(error_msg)
            return error_msg