transformerOS / pareto-lang.parse.py
recursivelabs's picture
Upload 11 files
37b8741 verified
"""
parse.py
Translate .p/ commands to function calls in transformerOS
This module handles the parsing and translation of the .p/ command language,
transforming structured .p/ commands into executable function calls in the
transformerOS framework.
"""
import re
import yaml
import json
import logging
from typing import Dict, List, Optional, Tuple, Union, Any
from pathlib import Path
# Configure parser logging
log = logging.getLogger("transformerOS.pareto_lang.parse")
log.setLevel(logging.INFO)
class CommandParser:
"""
Parser for the .p/ command language
This class handles the parsing and translation of .p/ commands into
executable function calls, enabling a consistent interface between
the symbolic command language and the underlying system operations.
"""
def __init__(self, commands_file: Optional[str] = None):
"""
Initialize the command parser
Parameters:
-----------
commands_file : Optional[str]
Path to the YAML file containing command definitions
If None, uses the default commands.yaml in the same directory
"""
# Load command definitions
if commands_file is None:
# Use default commands file in the same directory
commands_file = Path(__file__).parent / "commands.yaml"
self.commands = self._load_commands(commands_file)
# Compile regex patterns for command parsing
self.command_pattern = re.compile(r'\.p/([a-zA-Z_]+)\.([a-zA-Z_]+)(\{.*\})?')
self.param_pattern = re.compile(r'([a-zA-Z_]+)\s*=\s*([^,}]+)')
log.info(f"CommandParser initialized with {len(self.commands)} command definitions")
def _load_commands(self, commands_file: str) -> Dict:
"""Load command definitions from YAML file"""
try:
with open(commands_file, 'r') as f:
commands = yaml.safe_load(f)
# Validate command structure
if not isinstance(commands, dict) or "commands" not in commands:
raise ValueError(f"Invalid command file format: {commands_file}")
log.info(f"Loaded {len(commands['commands'])} command definitions from {commands_file}")
return commands
except Exception as e:
log.error(f"Failed to load commands from {commands_file}: {e}")
# Return empty commands dict as fallback
return {"commands": {}, "version": "unknown"}
def parse_command(self, command_str: str) -> Dict:
"""
Parse a .p/ command string into a structured command object
Parameters:
-----------
command_str : str
The .p/ command string to parse
Returns:
--------
Dict containing the parsed command structure
"""
# Strip whitespace
command_str = command_str.strip()
# Check if this is a .p/ command
if not command_str.startswith(".p/"):
raise ValueError(f"Not a valid .p/ command: {command_str}")
# Extract command components using regex
match = self.command_pattern.match(command_str)
if not match:
raise ValueError(f"Invalid command format: {command_str}")
domain, operation, params_str = match.groups()
# Parse parameters if present
params = {}
if params_str:
# Remove the braces
params_str = params_str.strip('{}')
# Extract parameters using regex
param_matches = self.param_pattern.findall(params_str)
for param_name, param_value in param_matches:
# Process the parameter value based on type
params[param_name] = self._parse_parameter_value(param_value)
# Create command structure
parsed_command = {
"domain": domain,
"operation": operation,
"parameters": params,
"original": command_str
}
log.debug(f"Parsed command: {parsed_command}")
return parsed_command
def _parse_parameter_value(self, value_str: str) -> Any:
"""Parse a parameter value string into the appropriate type"""
# Strip whitespace and quotes
value_str = value_str.strip()
# Handle quoted strings
if (value_str.startswith('"') and value_str.endswith('"')) or \
(value_str.startswith("'") and value_str.endswith("'")):
return value_str[1:-1]
# Handle numeric values
try:
# Try as int
if value_str.isdigit():
return int(value_str)
# Try as float
return float(value_str)
except ValueError:
pass
# Handle boolean values
if value_str.lower() == "true":
return True
elif value_str.lower() == "false":
return False
# Handle special values
if value_str.lower() == "null" or value_str.lower() == "none":
return None
# Handle lists
if value_str.startswith('[') and value_str.endswith(']'):
# Simple list parsing - this could be enhanced for nested structures
items = value_str[1:-1].split(',')
return [self._parse_parameter_value(item) for item in items]
# Handle complete as a special string case
if value_str.lower() == "complete":
return "complete"
# Default to string
return value_str
def validate_command(self, parsed_command: Dict) -> Tuple[bool, Optional[str]]:
"""
Validate a parsed command against command definitions
Parameters:
-----------
parsed_command : Dict
The parsed command structure to validate
Returns:
--------
Tuple of (is_valid, error_message)
"""
domain = parsed_command["domain"]
operation = parsed_command["operation"]
parameters = parsed_command["parameters"]
# Check if domain exists
if domain not in self.commands["commands"]:
return False, f"Unknown command domain: {domain}"
# Check if operation exists in domain
domain_commands = self.commands["commands"][domain]
if operation not in domain_commands:
return False, f"Unknown operation '{operation}' in domain '{domain}'"
# Get command definition
command_def = domain_commands[operation]
# Check required parameters
if "required_parameters" in command_def:
for required_param in command_def["required_parameters"]:
if required_param not in parameters:
return False, f"Missing required parameter: {required_param}"
# Check parameter types (basic validation)
if "parameters" in command_def:
for param_name, param_value in parameters.items():
if param_name in command_def["parameters"]:
expected_type = command_def["parameters"][param_name]["type"]
# Check parameter type
if not self._validate_parameter_type(param_value, expected_type):
return False, f"Parameter '{param_name}' has invalid type. Expected {expected_type}."
return True, None
def _validate_parameter_type(self, value: Any, expected_type: str) -> bool:
"""Validate a parameter value against its expected type"""
if expected_type == "string":
return isinstance(value, str)
elif expected_type == "int":
return isinstance(value, int)
elif expected_type == "float":
return isinstance(value, (int, float))
elif expected_type == "bool":
return isinstance(value, bool)
elif expected_type == "list":
return isinstance(value, list)
elif expected_type == "any":
return True
elif expected_type == "string_or_int":
return isinstance(value, (str, int))
elif expected_type == "null":
return value is None
else:
# For complex types, just return True
return True
def get_function_mapping(self, parsed_command: Dict) -> Dict:
"""
Get the function mapping for a parsed command
Parameters:
-----------
parsed_command : Dict
The parsed command structure
Returns:
--------
Dict containing function mapping information
"""
domain = parsed_command["domain"]
operation = parsed_command["operation"]
# Get command definition
try:
command_def = self.commands["commands"][domain][operation]
except KeyError:
log.warning(f"No function mapping found for {domain}.{operation}")
return {"function": None, "module": None, "parameters": {}}
# Extract function mapping
function_mapping = command_def.get("function_mapping", {})
# Create mapping result
mapping = {
"function": function_mapping.get("function", None),
"module": function_mapping.get("module", None),
"parameters": self._map_parameters(parsed_command["parameters"], function_mapping),
"original_command": parsed_command["original"]
}
return mapping
def _map_parameters(self, cmd_params: Dict, function_mapping: Dict) -> Dict:
"""Map command parameters to function parameters based on mapping rules"""
result_params = {}
# Direct parameter mappings
param_mapping = function_mapping.get("parameter_mapping", {})
for cmd_param, func_param in param_mapping.items():
if cmd_param in cmd_params:
result_params[func_param] = cmd_params[cmd_param]
# Add unmapped parameters if they match function parameter names
for param_name, param_value in cmd_params.items():
if param_name not in param_mapping.values():
# Add parameter directly if no specific mapping
result_params[param_name] = param_value
return result_params
def extract_commands(self, text: str) -> List[Dict]:
"""
Extract all .p/ commands from a text
Parameters:
-----------
text : str
The text to extract commands from
Returns:
--------
List of parsed command dictionaries
"""
# Find all command matches
pattern = r'(\.p/[a-zA-Z_]+\.[a-zA-Z_]+(?:\{[^}]*\})?)'
command_matches = re.findall(pattern, text)
# Parse each command
parsed_commands = []
for cmd_str in command_matches:
try:
parsed_cmd = self.parse_command(cmd_str)
parsed_commands.append(parsed_cmd)
except ValueError as e:
log.warning(f"Failed to parse command '{cmd_str}': {e}")
log.info(f"Extracted {len(parsed_commands)} commands from text")
return parsed_commands
# Command execution helper function
def execute_parsed_command(parsed_mapping: Dict) -> Dict:
"""
Execute a parsed command mapping using dynamic imports
Parameters:
-----------
parsed_mapping : Dict
The parsed function mapping to execute
Returns:
--------
Dict containing execution results
"""
function_name = parsed_mapping["function"]
module_name = parsed_mapping["module"]
parameters = parsed_mapping["parameters"]
if not function_name or not module_name:
raise ValueError("Invalid function mapping: missing function or module name")
try:
# Dynamically import the module
module = __import__(module_name, fromlist=[function_name])
# Get the function from the module
function = getattr(module, function_name)
# Execute the function with parameters
result = function(**parameters)
# Return execution result
return {
"status": "success",
"result": result,
"command": parsed_mapping["original_command"]
}
except ImportError as e:
log.error(f"Failed to import module {module_name}: {e}")
return {
"status": "error",
"error": f"Module not found: {module_name}",
"command": parsed_mapping["original_command"]
}
except AttributeError as e:
log.error(f"Function {function_name} not found in module {module_name}: {e}")
return {
"status": "error",
"error": f"Function not found: {function_name}",
"command": parsed_mapping["original_command"]
}
except Exception as e:
log.error(f"Error executing function {function_name}: {e}")
return {
"status": "error",
"error": str(e),
"command": parsed_mapping["original_command"]
}
# Main execution for command-line usage
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Parse .p/ commands")
parser.add_argument("command", help=".p/ command to parse")
parser.add_argument("--commands-file", help="Path to commands YAML file")
parser.add_argument("--execute", action="store_true", help="Execute the parsed command")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
args = parser.parse_args()
# Configure logging
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
# Create parser
cmd_parser = CommandParser(args.commands_file)
try:
# Parse command
parsed_cmd = cmd_parser.parse_command(args.command)
print("Parsed Command:")
print(json.dumps(parsed_cmd, indent=2))
# Validate command
valid, error = cmd_parser.validate_command(parsed_cmd)
if valid:
print("Command validation: PASSED")
else:
print(f"Command validation: FAILED - {error}")
# Get function mapping
func_mapping = cmd_parser.get_function_mapping(parsed_cmd)
print("\nFunction Mapping:")
print(json.dumps(func_mapping, indent=2))
# Execute if requested
if args.execute and valid:
print("\nExecuting command...")
result = execute_parsed_command(func_mapping)
print("\nExecution Result:")
print(json.dumps(result, indent=2))
except Exception as e:
print(f"Error: {e}")