|
""" |
|
parse.py |
|
Translate .p/ commands to function calls in transformerOS |
|
|
|
This module handles the parsing and translation of the .p/ command language, |
|
transforming structured .p/ commands into executable function calls in the |
|
transformerOS framework. |
|
""" |
|
|
|
import re |
|
import yaml |
|
import json |
|
import logging |
|
from typing import Dict, List, Optional, Tuple, Union, Any |
|
from pathlib import Path |
|
|
|
|
|
log = logging.getLogger("transformerOS.pareto_lang.parse") |
|
log.setLevel(logging.INFO) |
|
|
|
class CommandParser: |
|
""" |
|
Parser for the .p/ command language |
|
|
|
This class handles the parsing and translation of .p/ commands into |
|
executable function calls, enabling a consistent interface between |
|
the symbolic command language and the underlying system operations. |
|
""" |
|
|
|
def __init__(self, commands_file: Optional[str] = None): |
|
""" |
|
Initialize the command parser |
|
|
|
Parameters: |
|
----------- |
|
commands_file : Optional[str] |
|
Path to the YAML file containing command definitions |
|
If None, uses the default commands.yaml in the same directory |
|
""" |
|
|
|
if commands_file is None: |
|
|
|
commands_file = Path(__file__).parent / "commands.yaml" |
|
|
|
self.commands = self._load_commands(commands_file) |
|
|
|
|
|
self.command_pattern = re.compile(r'\.p/([a-zA-Z_]+)\.([a-zA-Z_]+)(\{.*\})?') |
|
self.param_pattern = re.compile(r'([a-zA-Z_]+)\s*=\s*([^,}]+)') |
|
|
|
log.info(f"CommandParser initialized with {len(self.commands)} command definitions") |
|
|
|
def _load_commands(self, commands_file: str) -> Dict: |
|
"""Load command definitions from YAML file""" |
|
try: |
|
with open(commands_file, 'r') as f: |
|
commands = yaml.safe_load(f) |
|
|
|
|
|
if not isinstance(commands, dict) or "commands" not in commands: |
|
raise ValueError(f"Invalid command file format: {commands_file}") |
|
|
|
log.info(f"Loaded {len(commands['commands'])} command definitions from {commands_file}") |
|
return commands |
|
|
|
except Exception as e: |
|
log.error(f"Failed to load commands from {commands_file}: {e}") |
|
|
|
return {"commands": {}, "version": "unknown"} |
|
|
|
def parse_command(self, command_str: str) -> Dict: |
|
""" |
|
Parse a .p/ command string into a structured command object |
|
|
|
Parameters: |
|
----------- |
|
command_str : str |
|
The .p/ command string to parse |
|
|
|
Returns: |
|
-------- |
|
Dict containing the parsed command structure |
|
""" |
|
|
|
command_str = command_str.strip() |
|
|
|
|
|
if not command_str.startswith(".p/"): |
|
raise ValueError(f"Not a valid .p/ command: {command_str}") |
|
|
|
|
|
match = self.command_pattern.match(command_str) |
|
if not match: |
|
raise ValueError(f"Invalid command format: {command_str}") |
|
|
|
domain, operation, params_str = match.groups() |
|
|
|
|
|
params = {} |
|
if params_str: |
|
|
|
params_str = params_str.strip('{}') |
|
|
|
|
|
param_matches = self.param_pattern.findall(params_str) |
|
for param_name, param_value in param_matches: |
|
|
|
params[param_name] = self._parse_parameter_value(param_value) |
|
|
|
|
|
parsed_command = { |
|
"domain": domain, |
|
"operation": operation, |
|
"parameters": params, |
|
"original": command_str |
|
} |
|
|
|
log.debug(f"Parsed command: {parsed_command}") |
|
return parsed_command |
|
|
|
def _parse_parameter_value(self, value_str: str) -> Any: |
|
"""Parse a parameter value string into the appropriate type""" |
|
|
|
value_str = value_str.strip() |
|
|
|
|
|
if (value_str.startswith('"') and value_str.endswith('"')) or \ |
|
(value_str.startswith("'") and value_str.endswith("'")): |
|
return value_str[1:-1] |
|
|
|
|
|
try: |
|
|
|
if value_str.isdigit(): |
|
return int(value_str) |
|
|
|
|
|
return float(value_str) |
|
except ValueError: |
|
pass |
|
|
|
|
|
if value_str.lower() == "true": |
|
return True |
|
elif value_str.lower() == "false": |
|
return False |
|
|
|
|
|
if value_str.lower() == "null" or value_str.lower() == "none": |
|
return None |
|
|
|
|
|
if value_str.startswith('[') and value_str.endswith(']'): |
|
|
|
items = value_str[1:-1].split(',') |
|
return [self._parse_parameter_value(item) for item in items] |
|
|
|
|
|
if value_str.lower() == "complete": |
|
return "complete" |
|
|
|
|
|
return value_str |
|
|
|
def validate_command(self, parsed_command: Dict) -> Tuple[bool, Optional[str]]: |
|
""" |
|
Validate a parsed command against command definitions |
|
|
|
Parameters: |
|
----------- |
|
parsed_command : Dict |
|
The parsed command structure to validate |
|
|
|
Returns: |
|
-------- |
|
Tuple of (is_valid, error_message) |
|
""" |
|
domain = parsed_command["domain"] |
|
operation = parsed_command["operation"] |
|
parameters = parsed_command["parameters"] |
|
|
|
|
|
if domain not in self.commands["commands"]: |
|
return False, f"Unknown command domain: {domain}" |
|
|
|
|
|
domain_commands = self.commands["commands"][domain] |
|
if operation not in domain_commands: |
|
return False, f"Unknown operation '{operation}' in domain '{domain}'" |
|
|
|
|
|
command_def = domain_commands[operation] |
|
|
|
|
|
if "required_parameters" in command_def: |
|
for required_param in command_def["required_parameters"]: |
|
if required_param not in parameters: |
|
return False, f"Missing required parameter: {required_param}" |
|
|
|
|
|
if "parameters" in command_def: |
|
for param_name, param_value in parameters.items(): |
|
if param_name in command_def["parameters"]: |
|
expected_type = command_def["parameters"][param_name]["type"] |
|
|
|
|
|
if not self._validate_parameter_type(param_value, expected_type): |
|
return False, f"Parameter '{param_name}' has invalid type. Expected {expected_type}." |
|
|
|
return True, None |
|
|
|
def _validate_parameter_type(self, value: Any, expected_type: str) -> bool: |
|
"""Validate a parameter value against its expected type""" |
|
if expected_type == "string": |
|
return isinstance(value, str) |
|
elif expected_type == "int": |
|
return isinstance(value, int) |
|
elif expected_type == "float": |
|
return isinstance(value, (int, float)) |
|
elif expected_type == "bool": |
|
return isinstance(value, bool) |
|
elif expected_type == "list": |
|
return isinstance(value, list) |
|
elif expected_type == "any": |
|
return True |
|
elif expected_type == "string_or_int": |
|
return isinstance(value, (str, int)) |
|
elif expected_type == "null": |
|
return value is None |
|
else: |
|
|
|
return True |
|
|
|
def get_function_mapping(self, parsed_command: Dict) -> Dict: |
|
""" |
|
Get the function mapping for a parsed command |
|
|
|
Parameters: |
|
----------- |
|
parsed_command : Dict |
|
The parsed command structure |
|
|
|
Returns: |
|
-------- |
|
Dict containing function mapping information |
|
""" |
|
domain = parsed_command["domain"] |
|
operation = parsed_command["operation"] |
|
|
|
|
|
try: |
|
command_def = self.commands["commands"][domain][operation] |
|
except KeyError: |
|
log.warning(f"No function mapping found for {domain}.{operation}") |
|
return {"function": None, "module": None, "parameters": {}} |
|
|
|
|
|
function_mapping = command_def.get("function_mapping", {}) |
|
|
|
|
|
mapping = { |
|
"function": function_mapping.get("function", None), |
|
"module": function_mapping.get("module", None), |
|
"parameters": self._map_parameters(parsed_command["parameters"], function_mapping), |
|
"original_command": parsed_command["original"] |
|
} |
|
|
|
return mapping |
|
|
|
def _map_parameters(self, cmd_params: Dict, function_mapping: Dict) -> Dict: |
|
"""Map command parameters to function parameters based on mapping rules""" |
|
result_params = {} |
|
|
|
|
|
param_mapping = function_mapping.get("parameter_mapping", {}) |
|
for cmd_param, func_param in param_mapping.items(): |
|
if cmd_param in cmd_params: |
|
result_params[func_param] = cmd_params[cmd_param] |
|
|
|
|
|
for param_name, param_value in cmd_params.items(): |
|
if param_name not in param_mapping.values(): |
|
|
|
result_params[param_name] = param_value |
|
|
|
return result_params |
|
|
|
def extract_commands(self, text: str) -> List[Dict]: |
|
""" |
|
Extract all .p/ commands from a text |
|
|
|
Parameters: |
|
----------- |
|
text : str |
|
The text to extract commands from |
|
|
|
Returns: |
|
-------- |
|
List of parsed command dictionaries |
|
""" |
|
|
|
pattern = r'(\.p/[a-zA-Z_]+\.[a-zA-Z_]+(?:\{[^}]*\})?)' |
|
command_matches = re.findall(pattern, text) |
|
|
|
|
|
parsed_commands = [] |
|
for cmd_str in command_matches: |
|
try: |
|
parsed_cmd = self.parse_command(cmd_str) |
|
parsed_commands.append(parsed_cmd) |
|
except ValueError as e: |
|
log.warning(f"Failed to parse command '{cmd_str}': {e}") |
|
|
|
log.info(f"Extracted {len(parsed_commands)} commands from text") |
|
return parsed_commands |
|
|
|
|
|
|
|
def execute_parsed_command(parsed_mapping: Dict) -> Dict: |
|
""" |
|
Execute a parsed command mapping using dynamic imports |
|
|
|
Parameters: |
|
----------- |
|
parsed_mapping : Dict |
|
The parsed function mapping to execute |
|
|
|
Returns: |
|
-------- |
|
Dict containing execution results |
|
""" |
|
function_name = parsed_mapping["function"] |
|
module_name = parsed_mapping["module"] |
|
parameters = parsed_mapping["parameters"] |
|
|
|
if not function_name or not module_name: |
|
raise ValueError("Invalid function mapping: missing function or module name") |
|
|
|
try: |
|
|
|
module = __import__(module_name, fromlist=[function_name]) |
|
|
|
|
|
function = getattr(module, function_name) |
|
|
|
|
|
result = function(**parameters) |
|
|
|
|
|
return { |
|
"status": "success", |
|
"result": result, |
|
"command": parsed_mapping["original_command"] |
|
} |
|
|
|
except ImportError as e: |
|
log.error(f"Failed to import module {module_name}: {e}") |
|
return { |
|
"status": "error", |
|
"error": f"Module not found: {module_name}", |
|
"command": parsed_mapping["original_command"] |
|
} |
|
|
|
except AttributeError as e: |
|
log.error(f"Function {function_name} not found in module {module_name}: {e}") |
|
return { |
|
"status": "error", |
|
"error": f"Function not found: {function_name}", |
|
"command": parsed_mapping["original_command"] |
|
} |
|
|
|
except Exception as e: |
|
log.error(f"Error executing function {function_name}: {e}") |
|
return { |
|
"status": "error", |
|
"error": str(e), |
|
"command": parsed_mapping["original_command"] |
|
} |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
import argparse |
|
|
|
parser = argparse.ArgumentParser(description="Parse .p/ commands") |
|
parser.add_argument("command", help=".p/ command to parse") |
|
parser.add_argument("--commands-file", help="Path to commands YAML file") |
|
parser.add_argument("--execute", action="store_true", help="Execute the parsed command") |
|
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
if args.verbose: |
|
logging.basicConfig(level=logging.DEBUG) |
|
else: |
|
logging.basicConfig(level=logging.INFO) |
|
|
|
|
|
cmd_parser = CommandParser(args.commands_file) |
|
|
|
try: |
|
|
|
parsed_cmd = cmd_parser.parse_command(args.command) |
|
print("Parsed Command:") |
|
print(json.dumps(parsed_cmd, indent=2)) |
|
|
|
|
|
valid, error = cmd_parser.validate_command(parsed_cmd) |
|
if valid: |
|
print("Command validation: PASSED") |
|
else: |
|
print(f"Command validation: FAILED - {error}") |
|
|
|
|
|
func_mapping = cmd_parser.get_function_mapping(parsed_cmd) |
|
print("\nFunction Mapping:") |
|
print(json.dumps(func_mapping, indent=2)) |
|
|
|
|
|
if args.execute and valid: |
|
print("\nExecuting command...") |
|
result = execute_parsed_command(func_mapping) |
|
print("\nExecution Result:") |
|
print(json.dumps(result, indent=2)) |
|
|
|
except Exception as e: |
|
print(f"Error: {e}") |
|
|