|
from daytona_sdk.process import SessionExecuteRequest |
|
from typing import Optional |
|
|
|
from agentpress.tool import ToolResult, openapi_schema, xml_schema |
|
from sandbox.sandbox import SandboxToolsBase, Sandbox, get_or_start_sandbox |
|
from utils.files_utils import EXCLUDED_FILES, EXCLUDED_DIRS, EXCLUDED_EXT, should_exclude_file, clean_path |
|
from agentpress.thread_manager import ThreadManager |
|
from utils.logger import logger |
|
import os |
|
|
|
class SandboxFilesTool(SandboxToolsBase):
    """Tool for executing file system operations in a Daytona sandbox. All operations are performed relative to the /workspace directory.

    Every public tool method first awaits ``self._ensure_sandbox()`` and then
    talks to the sandbox through ``self.sandbox.fs``.  Tool methods never raise:
    any exception is converted into ``self.fail_response(...)``.
    """

    def __init__(self, project_id: str, thread_manager: ThreadManager):
        """Initialise the tool for one project.

        Args:
            project_id: Forwarded unchanged to ``SandboxToolsBase.__init__``.
            thread_manager: Forwarded unchanged to ``SandboxToolsBase.__init__``.
        """
        super().__init__(project_id, thread_manager)
        # Kept for backward compatibility; no method of this class reads it.
        self.SNIPPET_LINES = 4
        # Root directory that every tool path is resolved against.
        self.workspace_path = "/workspace"

    def clean_path(self, path: str) -> str:
        """Clean and normalize a path to be relative to /workspace.

        Delegates to the module-level ``clean_path`` helper imported from
        ``utils.files_utils`` (inside this body the bare name refers to that
        function, not to this method).
        """
        return clean_path(path, self.workspace_path)

    def _should_exclude_file(self, rel_path: str) -> bool:
        """Check if a file should be excluded based on path, name, or extension.

        Thin wrapper around ``utils.files_utils.should_exclude_file``.
        """
        return should_exclude_file(rel_path)

    def _file_exists(self, path: str) -> bool:
        """Check if a file exists in the sandbox.

        Existence is probed with ``fs.get_file_info``; *any* exception from
        that call is treated as "does not exist" (deliberate best-effort).
        """
        try:
            self.sandbox.fs.get_file_info(path)
        except Exception:
            return False
        return True

    def _workspace_file(self, rel_path: str) -> str:
        """Return the absolute sandbox path for a workspace-relative path."""
        return f"{self.workspace_path}/{rel_path}"

    @staticmethod
    def _occurrence_lines(content: str, needle: str) -> list:
        """Return the 1-based line numbers on which matches of *needle* start.

        Matches are scanned left to right without overlap (the same matches
        ``str.count`` counts).  Each line is reported at most once.  Works for
        needles that span several lines, which a per-line ``in`` test cannot
        detect.
        """
        lines = []
        line_no = 1
        scanned = 0
        # Advance by at least one character so an empty needle terminates.
        step = max(len(needle), 1)
        pos = content.find(needle)
        while pos != -1:
            line_no += content.count('\n', scanned, pos)
            scanned = pos
            if not lines or lines[-1] != line_no:
                lines.append(line_no)
            pos = content.find(needle, pos + step)
        return lines

    async def get_workspace_state(self) -> dict:
        """Get the current workspace state by reading the listed files.

        Only the entries returned by ``fs.list_files(self.workspace_path)`` are
        considered (whether that listing is recursive depends on the sandbox
        SDK -- TODO confirm).  Directories and entries rejected by
        ``_should_exclude_file`` are skipped, as are files whose bytes cannot
        be decoded as text.

        Returns:
            A dict mapping each entry name to a dict with the keys
            ``"content"``, ``"is_dir"``, ``"size"`` and ``"modified"``.
            An empty dict is returned if the sandbox cannot be reached or the
            listing fails.
        """
        files_state = {}
        try:
            await self._ensure_sandbox()

            files = self.sandbox.fs.list_files(self.workspace_path)
            for file_info in files:
                rel_path = file_info.name

                if self._should_exclude_file(rel_path) or file_info.is_dir:
                    continue

                try:
                    full_path = self._workspace_file(rel_path)
                    content = self.sandbox.fs.download_file(full_path).decode()
                    files_state[rel_path] = {
                        "content": content,
                        "is_dir": file_info.is_dir,
                        "size": file_info.size,
                        "modified": file_info.mod_time,
                    }
                # The specific handler must come first: UnicodeDecodeError is a
                # subclass of Exception and would otherwise never be reached.
                except UnicodeDecodeError:
                    logger.debug(f"Skipping binary file: {rel_path}")
                except Exception as e:
                    logger.error(f"Error reading file {rel_path}: {e}")

            return files_state

        except Exception as e:
            logger.error(f"Error getting workspace state: {str(e)}")
            return {}

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "create_file",
            "description": "Create a new file with the provided contents at a given path in the workspace. The path must be relative to /workspace (e.g., 'src/main.py' for /workspace/src/main.py)",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the file to be created, relative to /workspace (e.g., 'src/main.py')"
                    },
                    "file_contents": {
                        "type": "string",
                        "description": "The content to write to the file"
                    },
                    "permissions": {
                        "type": "string",
                        "description": "File permissions in octal format (e.g., '644')",
                        "default": "644"
                    }
                },
                "required": ["file_path", "file_contents"]
            }
        }
    })
    @xml_schema(
        tag_name="create-file",
        mappings=[
            {"param_name": "file_path", "node_type": "attribute", "path": "."},
            {"param_name": "file_contents", "node_type": "content", "path": "."}
        ],
        example='''
        <create-file file_path="src/main.py">
        File contents go here
        </create-file>
        '''
    )
    async def create_file(self, file_path: str, file_contents: str, permissions: str = "644") -> ToolResult:
        """Create a new file in the workspace.

        Args:
            file_path: Target path; normalised with ``clean_path`` and resolved
                against ``self.workspace_path``.
            file_contents: Text written to the file (encoded with ``str.encode``).
            permissions: Mode string passed to ``fs.set_file_permissions``.

        Returns:
            A success response on creation; a failure response if the file
            already exists or any sandbox call raises.
        """
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = self._workspace_file(file_path)
            if self._file_exists(full_path):
                return self.fail_response(f"File '{file_path}' already exists. Use update_file to modify existing files.")

            # Everything before the final '/' is the directory to create.
            parent_dir = full_path.rsplit('/', 1)[0]
            if parent_dir:
                self.sandbox.fs.create_folder(parent_dir, "755")

            self.sandbox.fs.upload_file(full_path, file_contents.encode())
            self.sandbox.fs.set_file_permissions(full_path, permissions)

            return self.success_response(f"File '{file_path}' created successfully.")
        except Exception as e:
            return self.fail_response(f"Error creating file: {str(e)}")

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "str_replace",
            "description": "Replace specific text in a file. The file path must be relative to /workspace (e.g., 'src/main.py' for /workspace/src/main.py). Use this when you need to replace a unique string that appears exactly once in the file.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the target file, relative to /workspace (e.g., 'src/main.py')"
                    },
                    "old_str": {
                        "type": "string",
                        "description": "Text to be replaced (must appear exactly once)"
                    },
                    "new_str": {
                        "type": "string",
                        "description": "Replacement text"
                    }
                },
                "required": ["file_path", "old_str", "new_str"]
            }
        }
    })
    @xml_schema(
        tag_name="str-replace",
        mappings=[
            {"param_name": "file_path", "node_type": "attribute", "path": "."},
            {"param_name": "old_str", "node_type": "element", "path": "old_str"},
            {"param_name": "new_str", "node_type": "element", "path": "new_str"}
        ],
        example='''
        <str-replace file_path="src/main.py">
            <old_str>text to replace (must appear exactly once in the file)</old_str>
            <new_str>replacement text that will be inserted instead</new_str>
        </str-replace>
        '''
    )
    async def str_replace(self, file_path: str, old_str: str, new_str: str) -> ToolResult:
        """Replace the single occurrence of ``old_str`` in a workspace file.

        Args:
            file_path: Target path; normalised with ``clean_path`` and resolved
                against ``self.workspace_path``.
            old_str: Text to find; it must occur exactly once.
            new_str: Text that replaces ``old_str``.

        Returns:
            A success response after the file has been re-uploaded; a failure
            response if the file is missing, the text is absent or ambiguous,
            or any sandbox call raises.
        """
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = self._workspace_file(file_path)
            if not self._file_exists(full_path):
                return self.fail_response(f"File '{file_path}' does not exist")

            content = self.sandbox.fs.download_file(full_path).decode()

            # Default behaviour: match and insert the tab-expanded strings.
            search = old_str.expandtabs()
            replacement = new_str.expandtabs()
            # The file content itself is not tab-expanded, so in a file that
            # uses literal tabs the expanded text can never match.  Fall back
            # to the strings exactly as given, but only in the case that would
            # otherwise fail with "not found".
            if search not in content and old_str in content:
                search = old_str
                replacement = new_str

            occurrences = content.count(search)
            if occurrences == 0:
                return self.fail_response(f"String '{search}' not found in file")
            if occurrences > 1:
                lines = self._occurrence_lines(content, search)
                return self.fail_response(f"Multiple occurrences found in lines {lines}. Please ensure string is unique")

            new_content = content.replace(search, replacement)
            self.sandbox.fs.upload_file(full_path, new_content.encode())

            return self.success_response("Replacement successful.")

        except Exception as e:
            return self.fail_response(f"Error replacing string: {str(e)}")

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "full_file_rewrite",
            "description": "Completely rewrite an existing file with new content. The file path must be relative to /workspace (e.g., 'src/main.py' for /workspace/src/main.py). Use this when you need to replace the entire file content or make extensive changes throughout the file.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the file to be rewritten, relative to /workspace (e.g., 'src/main.py')"
                    },
                    "file_contents": {
                        "type": "string",
                        "description": "The new content to write to the file, replacing all existing content"
                    },
                    "permissions": {
                        "type": "string",
                        "description": "File permissions in octal format (e.g., '644')",
                        "default": "644"
                    }
                },
                "required": ["file_path", "file_contents"]
            }
        }
    })
    @xml_schema(
        tag_name="full-file-rewrite",
        mappings=[
            {"param_name": "file_path", "node_type": "attribute", "path": "."},
            {"param_name": "file_contents", "node_type": "content", "path": "."}
        ],
        example='''
        <full-file-rewrite file_path="src/main.py">
        This completely replaces the entire file content.
        Use when making major changes to a file or when the changes
        are too extensive for str-replace.
        All previous content will be lost and replaced with this text.
        </full-file-rewrite>
        '''
    )
    async def full_file_rewrite(self, file_path: str, file_contents: str, permissions: str = "644") -> ToolResult:
        """Overwrite an existing workspace file with new contents.

        Args:
            file_path: Target path; normalised with ``clean_path`` and resolved
                against ``self.workspace_path``.
            file_contents: Text that replaces the whole file.
            permissions: Mode string passed to ``fs.set_file_permissions``.

        Returns:
            A success response after upload; a failure response if the file
            does not exist or any sandbox call raises.
        """
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = self._workspace_file(file_path)
            if not self._file_exists(full_path):
                return self.fail_response(f"File '{file_path}' does not exist. Use create_file to create a new file.")

            self.sandbox.fs.upload_file(full_path, file_contents.encode())
            self.sandbox.fs.set_file_permissions(full_path, permissions)

            return self.success_response(f"File '{file_path}' completely rewritten successfully.")
        except Exception as e:
            return self.fail_response(f"Error rewriting file: {str(e)}")

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "delete_file",
            "description": "Delete a file at the given path. The path must be relative to /workspace (e.g., 'src/main.py' for /workspace/src/main.py)",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the file to be deleted, relative to /workspace (e.g., 'src/main.py')"
                    }
                },
                "required": ["file_path"]
            }
        }
    })
    @xml_schema(
        tag_name="delete-file",
        mappings=[
            {"param_name": "file_path", "node_type": "attribute", "path": "."}
        ],
        example='''
        <delete-file file_path="src/main.py">
        </delete-file>
        '''
    )
    async def delete_file(self, file_path: str) -> ToolResult:
        """Delete a file from the workspace.

        Args:
            file_path: Target path; normalised with ``clean_path`` and resolved
                against ``self.workspace_path``.

        Returns:
            A success response after deletion; a failure response if the file
            does not exist or any sandbox call raises.
        """
        try:
            await self._ensure_sandbox()

            file_path = self.clean_path(file_path)
            full_path = self._workspace_file(file_path)
            if not self._file_exists(full_path):
                return self.fail_response(f"File '{file_path}' does not exist")

            self.sandbox.fs.delete_file(full_path)
            return self.success_response(f"File '{file_path}' deleted successfully.")
        except Exception as e:
            return self.fail_response(f"Error deleting file: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|