# Spaces: Sleeping
import sys
import uuid
from typing import Any, Optional, Dict, List

from pydantic import Field

from aworld.output.artifact import Artifact, ArtifactType, ArtifactAttachment
# Maps a fenced-code-block language tag to the file extension (without the
# leading dot) used for that language. Lookups elsewhere in this module fall
# back to "txt" for tags that are not listed here.
CODE_FILE_EXTENSION_MAP = {
    "python": "py",
    "java": "java",
    "javascript": "js",
    "typescript": "ts",
    "html": "html",
    "css": "css",
    "c": "c",
    "cpp": "cpp",
    "csharp": "cs",
    "go": "go",
    "rust": "rs",
    "ruby": "rb",
    "php": "php",
    "swift": "swift",
    "kotlin": "kt",
    "scala": "scala",
    "markdown": "md",
    "txt": "txt",
    "shell": "sh",
    "bash": "sh",
    "sh": "sh",
    "zsh": "zsh",
    "powershell": "ps1",
    "cmd": "cmd",
    "bat": "bat",
    # Short aliases and data formats that commonly appear as fence tags;
    # without these entries they would all be saved with the "txt" fallback.
    "py": "py",
    "js": "js",
    "ts": "ts",
    "c++": "cpp",
    "cs": "cs",
    "rb": "rb",
    "md": "md",
    "json": "json",
    "yaml": "yaml",
    "yml": "yml",
    "xml": "xml",
    "sql": "sql",
}
class CodeArtifact(Artifact):
    """Artifact wrapping a piece of source code.

    On construction the code's language, version, interceptor provider and a
    filename guessed from the first line of the content are recorded in the
    artifact metadata, after which the artifact is archived.
    """

    code_interceptor: Any = Field(default=None, description="code executor type")

    def __init__(self, artifact_type: ArtifactType, content: Any, code_type: Optional[str],
                 code_version: Optional[str],
                 code_interceptor_provider: Optional[str] = None,
                 artifact_id: Optional[str] = None, render_type: Optional[str] = None, **kwargs):
        """Build the code metadata and initialise the underlying artifact.

        Args:
            artifact_type: Artifact type forwarded to the base class.
            content: The code itself (normally a string).
            code_type: Language tag of the code, e.g. ``"python"``.
            code_version: Version label stored in the metadata.
            code_interceptor_provider: Name of the interceptor provider.
            artifact_id: Optional explicit artifact id.
            render_type: Optional render hint forwarded to the base class.
            **kwargs: Extra base-class arguments. A ``metadata`` mapping, if
                present, is merged over the generated metadata (so it can
                override ``filename`` and the other generated keys).
        """
        metadata = {
            "code_type": code_type,
            "code_version": code_version,
            "code_interceptor_provider": code_interceptor_provider,
            # Filename guessed from a leading comment line, or None.
            "filename": self.extract_filename(content),
        }
        # Pop (rather than read) so that 'metadata' is not passed twice to the
        # base initialiser; a None value is tolerated and simply ignored.
        extra_metadata = kwargs.pop('metadata', None)
        if extra_metadata:
            metadata.update(extra_metadata)

        super().__init__(
            artifact_type=artifact_type,
            content=content,
            metadata=metadata,
            artifact_id=artifact_id,
            render_type=render_type,
            **kwargs
        )
        self.archive()
        self.code_interceptor = self.init_code_interceptor(code_interceptor_provider)

    @staticmethod
    def extract_filename(content: Any) -> Optional[str]:
        """Extract a filename from a comment on the first line of the code.

        Recognised comment styles are ``# name``, ``// name``, ``/* name */``
        and ``<!-- name -->``. Shebang lines are never treated as filenames.

        Args:
            content: The code; anything that is not a string yields ``None``.

        Returns:
            The comment text with its markers stripped, or ``None`` when the
            first line is not a comment, is a shebang, or the comment is empty.
        """
        if not isinstance(content, str):
            return None
        lines = content.splitlines()
        if not lines:
            return None
        first_line = lines[0].strip()

        # Any "#!" line is an interpreter directive, not a filename comment.
        # "# /bin/bash" is kept as an explicitly ignored pseudo-shebang.
        if first_line.startswith("#!") or first_line == "# /bin/bash":
            return None

        if first_line.startswith("#"):  # Python, Ruby, Shell
            name = first_line[1:]
        elif first_line.startswith("//"):  # Java, JavaScript, C, C++
            name = first_line[2:]
        elif first_line.startswith("/*") and "*/" in first_line:  # C, C++
            name = first_line.split("*/")[0][2:]
        elif first_line.startswith("<!--"):  # HTML
            # Drop the closing marker as well so it does not end up in the name.
            name = first_line[4:].split("-->")[0]
        else:
            return None
        return name.strip() or None

    @classmethod
    def build_artifact(cls,
                       content: Any,
                       code_type: Optional[str] = None,
                       code_version: Optional[str] = None,
                       code_interceptor_provider: Optional[str] = None,
                       artifact_id: Optional[str] = None,
                       render_type: Optional[str] = None,
                       **kwargs) -> "CodeArtifact":
        """Create the artifact subclass matching ``code_type``.

        Shell-like languages produce a ``ShellArtifact``, ``html`` produces an
        ``HtmlArtifact`` (which receives only ``content``, ``artifact_id`` and
        ``kwargs``), and everything else produces an instance of ``cls`` with
        ``ArtifactType.CODE``.
        """
        if code_type in ('shell', 'sh', 'bash', 'zsh'):
            return ShellArtifact(
                artifact_type=ArtifactType.CODE,
                content=content,
                code_version=code_version,
                code_interceptor_provider=code_interceptor_provider,
                artifact_id=artifact_id,
                render_type=render_type,
                **kwargs
            )
        if code_type == 'html':
            return HtmlArtifact(
                content=content,
                artifact_id=artifact_id,
                **kwargs
            )
        return cls(
            artifact_type=ArtifactType.CODE,
            content=content,
            code_type=code_type,
            code_version=code_version,
            code_interceptor_provider=code_interceptor_provider,
            artifact_id=artifact_id,
            render_type=render_type,
            **kwargs
        )

    @classmethod
    def from_code_content(cls, artifact_type: ArtifactType,
                          content: Any,
                          render_type: Optional[str] = None,
                          **kwargs) -> List["CodeArtifact"]:
        """Create one artifact per fenced code block found in ``content``.

        Args:
            artifact_type: Accepted for interface compatibility; every created
                artifact uses ``ArtifactType.CODE`` regardless of this value.
            content: Markdown text to scan for code blocks.
            render_type: Render hint passed to every created artifact.
            **kwargs: Extra arguments passed to every created artifact.

        Returns:
            The created artifacts, in the order the blocks were extracted.
        """
        artifacts = []
        for block in cls.extract_model_output_to_code_content(content):
            code_type = block['language']
            if code_type in ('python', 'javascript', 'java'):
                code_interceptor_provider = "default_interceptor"
            elif code_type in ('shell', 'sh', 'bash', 'zsh'):
                code_interceptor_provider = "shell_interceptor"
            else:
                code_interceptor_provider = "generic_interceptor"

            # NOTE(review): create_artifact is not defined in this class;
            # presumably it is provided by the Artifact base class — confirm.
            artifacts.append(cls.create_artifact(
                artifact_type=ArtifactType.CODE,
                content=block['content'],
                code_type=code_type,
                code_version="1.0",
                code_interceptor_provider=code_interceptor_provider,
                artifact_id=block['artifact_id'],  # id generated during extraction
                render_type=render_type,
                **kwargs
            ))
        return artifacts

    def init_code_interceptor(self, code_interceptor_provider):
        """Placeholder hook for creating an interceptor; currently returns None."""
        pass

    @classmethod
    def extract_model_output_to_code_content(cls, content) -> List[Dict[str, str]]:
        """Extract fenced code blocks from markdown ``content`` using mistune.

        If mistune is not importable it is installed with pip first.

        Returns:
            One dict per code block with the keys ``artifact_id`` (a fresh
            UUID4 string), ``content``, ``language`` (first word of the fence
            info string, or ``"unknown"``) and ``file_suffix`` (looked up in
            ``CODE_FILE_EXTENSION_MAP``, defaulting to ``"txt"``).
        """
        try:
            import mistune
        except ImportError:
            import subprocess
            # Run pip through the current interpreter so the package lands in
            # the environment that is actually executing this code.
            subprocess.run([sys.executable, "-m", "pip", "install", "mistune>=3.0.0"], check=True)
            import mistune

        extracted_blocks = []

        class CustomRenderer(mistune.HTMLRenderer):
            """Renderer that records code blocks instead of emitting HTML for them."""

            def block_code(self, code, info=None):
                language = info.split()[0] if info else 'unknown'
                extracted_blocks.append({
                    "content": code,
                    "language": language
                })
                return ""

        markdown = mistune.create_markdown(renderer=CustomRenderer())
        # Rendering is done only for its side effect of filling extracted_blocks.
        markdown(content)

        return [
            {
                "artifact_id": str(uuid.uuid4()),
                "content": block['content'],
                "language": block['language'],
                "file_suffix": CODE_FILE_EXTENSION_MAP.get(block['language'], "txt"),
            }
            for block in extracted_blocks
        ]
class ShellArtifact(CodeArtifact):
    """Code artifact for shell scripts; also stores the execution result."""

    shell_result: str = Field(default="", description="shell execution result")

    def __init__(self, artifact_type: ArtifactType, content: Any, code_version: str,
                 code_interceptor_provider: Optional[str] = None,
                 artifact_id: Optional[str] = None, render_type: Optional[str] = None,
                 shell_result: str = "", **kwargs):
        """Initialise a shell artifact.

        Args:
            artifact_type: Artifact type forwarded to ``CodeArtifact``.
            content: The shell script text.
            code_version: Version label stored in the metadata.
            code_interceptor_provider: Interceptor provider name; defaults to
                ``"shell_interceptor"`` when ``None``.
            artifact_id: Optional explicit artifact id.
            render_type: Optional render hint.
            shell_result: Initial value of the execution result.
            **kwargs: Extra arguments for ``CodeArtifact``. A ``metadata``
                mapping is copied and its ``filename`` entry is overwritten.
        """
        # Looked up on the class so the call does not depend on how the helper
        # is bound; fall back to "terminal.txt" when no filename is found.
        filename = type(self).extract_filename(content) or "terminal.txt"

        # Pop 'metadata' so it is not passed twice below, and copy it so the
        # caller's dict is not modified.
        metadata = dict(kwargs.pop('metadata', None) or {})
        metadata['filename'] = filename

        if code_interceptor_provider is None:
            code_interceptor_provider = "shell_interceptor"

        super().__init__(artifact_type, content, "shell", code_version,
                         code_interceptor_provider, artifact_id, render_type,
                         metadata=metadata, **kwargs)
        self.shell_result = shell_result

    def execute(self):
        """Execute the shell script (not implemented yet)."""
        # todo add
        pass
class HtmlArtifact(CodeArtifact):
    """Code artifact for HTML, stored as an attachment instead of inline content."""

    def __init__(self, content: Any, artifact_id: Optional[str] = None, **kwargs):
        """Initialise an HTML artifact.

        The artifact is created with ``ArtifactType.HTML``, code type ``html``
        and code version ``1.0``. Afterwards the markdown code-fence markers
        are stripped from the content, the artifact's ``content`` is set to
        ``None`` and the cleaned HTML is appended as a ``text/html`` attachment.

        Args:
            content: The HTML, optionally wrapped in markdown code fences.
            artifact_id: Optional explicit artifact id; also used as the stem
                of the attachment filename.
            **kwargs: Extra arguments for ``CodeArtifact``; any
                ``artifact_type`` entry is discarded.
        """
        # Remove artifact_type from kwargs if it exists to avoid conflicts
        kwargs.pop('artifact_type', None)
        super().__init__(
            artifact_type=ArtifactType.HTML,
            content=content,
            code_type='html',
            code_version='1.0',
            artifact_id=artifact_id,
            **kwargs
        )

        # Tolerate non-string content instead of failing on .replace().
        if content is None:
            html = ""
        elif isinstance(content, str):
            html = content
        else:
            html = str(content)
        html = html.replace("```html", "").replace("```", "")

        # Avoid naming the attachment "None.html" when no id was passed.
        # NOTE(review): presumably the base class assigns an id of its own in
        # that case and exposes it as `artifact_id` — confirm.
        attachment_id = artifact_id if artifact_id is not None else getattr(self, "artifact_id", None)

        self.content = None
        self.attachments.append(
            ArtifactAttachment(filename=f"{attachment_id}.html", content=html, mime_type="text/html")
        )