|
""" |
|
shell_executor.py |
|
YAML symbolic shells interpreter for transformerOS |
|
|
|
This module serves as the core execution engine for symbolic shells, |
|
managing the parsing, validation, and execution of shell packs across |
|
different model architectures. |
|
""" |
|
|
|
import os |
|
import sys |
|
import yaml |
|
import json |
|
import logging |
|
import importlib |
|
from typing import Dict, List, Optional, Tuple, Union, Any |
|
from pathlib import Path |
|
from dataclasses import dataclass, field |
|
|
|
|
|
log = logging.getLogger("transformerOS.shell_executor") |
|
log.setLevel(logging.INFO) |
|
|
|
|
|
from transformerOS.core.symbolic_engine import SymbolicEngine |
|
from transformerOS.models.model_interface import ModelInterface, get_model_interface |
|
from transformerOS.utils.visualization import ShellVisualizer |
|
from transformerOS.core.attribution import AttributionTracer |
|
from transformerOS.core.collapse_detector import CollapseDetector |
|
|
|
|
|
from transformerOS.modules.reflect_module import ReflectOperation |
|
from transformerOS.modules.collapse_module import CollapseOperation |
|
from transformerOS.modules.ghostcircuits_module import GhostCircuitOperation |
|
|
|
@dataclass |
|
class ShellExecutionResult: |
|
"""Structured result from shell execution""" |
|
shell_id: str |
|
success: bool |
|
execution_time: float |
|
result: Dict |
|
residue: Optional[Dict] = None |
|
collapse_detected: bool = False |
|
collapse_type: Optional[str] = None |
|
attribution_map: Optional[Dict] = None |
|
visualization: Optional[Dict] = None |
|
metadata: Dict = field(default_factory=dict) |
|
|
|
|
|
class ShellExecutor: |
|
""" |
|
Core executor for symbolic shell packs |
|
|
|
This class handles the loading, validation, and execution of shell packs, |
|
providing a standardized interface for triggering and analyzing model |
|
behavior through structured symbolic shells. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
config_path: Optional[str] = None, |
|
model_id: str = "default", |
|
custom_model: Optional[ModelInterface] = None, |
|
log_path: Optional[str] = None |
|
): |
|
""" |
|
Initialize the shell executor |
|
|
|
Parameters: |
|
----------- |
|
config_path : Optional[str] |
|
Path to executor configuration file |
|
model_id : str |
|
Identifier for the model to use |
|
custom_model : Optional[ModelInterface] |
|
Custom model interface (if provided, model_id is ignored) |
|
log_path : Optional[str] |
|
Path for execution logs |
|
""" |
|
|
|
if log_path: |
|
file_handler = logging.FileHandler(log_path) |
|
file_handler.setLevel(logging.INFO) |
|
log.addHandler(file_handler) |
|
|
|
|
|
self.config = self._load_config(config_path) |
|
|
|
|
|
if custom_model: |
|
self.model = custom_model |
|
else: |
|
self.model = get_model_interface(model_id) |
|
|
|
|
|
self.symbolic_engine = SymbolicEngine() |
|
self.attribution_tracer = AttributionTracer(self.model) |
|
self.collapse_detector = CollapseDetector() |
|
self.visualizer = ShellVisualizer() |
|
|
|
|
|
self.reflect_op = ReflectOperation(self.model, self.symbolic_engine) |
|
self.collapse_op = CollapseOperation(self.model, self.symbolic_engine) |
|
self.ghost_op = GhostCircuitOperation(self.model, self.symbolic_engine) |
|
|
|
|
|
self.loaded_shells = {} |
|
self.shell_cache = {} |
|
|
|
|
|
if self.config.get("auto_load_shells", False): |
|
default_packs = self.config.get("default_shell_packs", []) |
|
for pack_path in default_packs: |
|
self.load_shell_pack(pack_path) |
|
|
|
log.info(f"ShellExecutor initialized with model: {self.model.model_id}") |
|
|
|
def _load_config(self, config_path: Optional[str]) -> Dict: |
|
"""Load configuration from file or use defaults""" |
|
default_config = { |
|
"auto_load_shells": True, |
|
"default_shell_packs": [ |
|
"symbolic-shells/core-shells.yml", |
|
"symbolic-shells/constitutional-shells.yml", |
|
"symbolic-shells/meta-shells.yml" |
|
], |
|
"default_visualization": True, |
|
"attribution_tracing": True, |
|
"residue_logging": True, |
|
"shell_timeout": 60, |
|
"max_token_count": 2048 |
|
} |
|
|
|
if not config_path: |
|
return default_config |
|
|
|
try: |
|
with open(config_path, 'r') as f: |
|
user_config = yaml.safe_load(f) |
|
|
|
|
|
config = {**default_config, **user_config} |
|
log.info(f"Loaded configuration from {config_path}") |
|
return config |
|
except Exception as e: |
|
log.warning(f"Failed to load config from {config_path}: {e}") |
|
log.info("Using default configuration") |
|
return default_config |
|
|
|
def load_shell_pack(self, pack_path: str) -> Dict: |
|
""" |
|
Load a shell pack from YAML file |
|
|
|
Parameters: |
|
----------- |
|
pack_path : str |
|
Path to the shell pack YAML file |
|
|
|
Returns: |
|
-------- |
|
Dict with information about loaded shells |
|
""" |
|
try: |
|
|
|
if not os.path.isabs(pack_path): |
|
|
|
standard_locations = [ |
|
"", |
|
"symbolic-shells/", |
|
"../symbolic-shells/", |
|
os.path.join(os.path.dirname(__file__), "../symbolic-shells/") |
|
] |
|
|
|
for location in standard_locations: |
|
full_path = os.path.join(location, pack_path) |
|
if os.path.exists(full_path): |
|
pack_path = full_path |
|
break |
|
|
|
|
|
with open(pack_path, 'r') as f: |
|
shell_pack = yaml.safe_load(f) |
|
|
|
|
|
if not isinstance(shell_pack, dict) or "shells" not in shell_pack: |
|
raise ValueError(f"Invalid shell pack format in {pack_path}") |
|
|
|
|
|
pack_metadata = { |
|
"name": shell_pack.get("name", os.path.basename(pack_path)), |
|
"description": shell_pack.get("description", ""), |
|
"version": shell_pack.get("version", "1.0.0"), |
|
"author": shell_pack.get("author", "Unknown"), |
|
"shells": [] |
|
} |
|
|
|
|
|
for shell_id, shell_def in shell_pack["shells"].items(): |
|
|
|
if not self._validate_shell(shell_id, shell_def): |
|
log.warning(f"Skipping invalid shell definition: {shell_id}") |
|
continue |
|
|
|
|
|
self.loaded_shells[shell_id] = shell_def |
|
pack_metadata["shells"].append(shell_id) |
|
|
|
log.info(f"Loaded shell: {shell_id}") |
|
|
|
log.info(f"Successfully loaded shell pack from {pack_path} with {len(pack_metadata['shells'])} shells") |
|
return pack_metadata |
|
|
|
except Exception as e: |
|
log.error(f"Failed to load shell pack from {pack_path}: {e}") |
|
raise |
|
|
|
def _validate_shell(self, shell_id: str, shell_def: Dict) -> bool: |
|
"""Validate shell definition structure""" |
|
required_fields = ["description", "type", "operations"] |
|
|
|
|
|
for field in required_fields: |
|
if field not in shell_def: |
|
log.warning(f"Shell {shell_id} missing required field: {field}") |
|
return False |
|
|
|
|
|
if not isinstance(shell_def["operations"], list) or not shell_def["operations"]: |
|
log.warning(f"Shell {shell_id} has invalid or empty operations list") |
|
return False |
|
|
|
|
|
for operation in shell_def["operations"]: |
|
if "type" not in operation or "parameters" not in operation: |
|
log.warning(f"Shell {shell_id} has operation missing type or parameters") |
|
return False |
|
|
|
return True |
|
|
|
def list_shells(self, shell_type: Optional[str] = None) -> List[Dict]: |
|
""" |
|
List available shells, optionally filtered by type |
|
|
|
Parameters: |
|
----------- |
|
shell_type : Optional[str] |
|
Filter shells by type if provided |
|
|
|
Returns: |
|
-------- |
|
List of shell information dictionaries |
|
""" |
|
shells = [] |
|
|
|
for shell_id, shell_def in self.loaded_shells.items(): |
|
|
|
if shell_type and shell_def.get("type") != shell_type: |
|
continue |
|
|
|
shells.append({ |
|
"id": shell_id, |
|
"description": shell_def.get("description", ""), |
|
"type": shell_def.get("type", "unknown"), |
|
"operations_count": len(shell_def.get("operations", [])), |
|
"tags": shell_def.get("tags", []) |
|
}) |
|
|
|
return shells |
|
|
|
def execute_shell( |
|
self, |
|
shell_id: str, |
|
prompt: str, |
|
parameters: Optional[Dict] = None, |
|
visualize: bool = None, |
|
trace_attribution: bool = None, |
|
return_residue: bool = None |
|
) -> ShellExecutionResult: |
|
""" |
|
Execute a symbolic shell with the given prompt |
|
|
|
Parameters: |
|
----------- |
|
shell_id : str |
|
ID of the shell to execute |
|
prompt : str |
|
Input prompt for the shell |
|
parameters : Optional[Dict] |
|
Additional parameters to override shell defaults |
|
visualize : bool |
|
Whether to generate visualization (overrides config) |
|
trace_attribution : bool |
|
Whether to trace attribution (overrides config) |
|
return_residue : bool |
|
Whether to return symbolic residue (overrides config) |
|
|
|
Returns: |
|
-------- |
|
ShellExecutionResult object with execution results |
|
""" |
|
import time |
|
|
|
|
|
if shell_id not in self.loaded_shells: |
|
raise ValueError(f"Shell not found: {shell_id}") |
|
|
|
shell_def = self.loaded_shells[shell_id] |
|
log.info(f"Executing shell: {shell_id}") |
|
|
|
|
|
if parameters is None: |
|
parameters = {} |
|
|
|
|
|
if visualize is None: |
|
visualize = self.config.get("default_visualization", True) |
|
if trace_attribution is None: |
|
trace_attribution = self.config.get("attribution_tracing", True) |
|
if return_residue is None: |
|
return_residue = self.config.get("residue_logging", True) |
|
|
|
|
|
start_time = time.time() |
|
result = {} |
|
collapse_detected = False |
|
collapse_type = None |
|
attribution_map = None |
|
residue = None |
|
|
|
try: |
|
|
|
for operation_idx, operation in enumerate(shell_def["operations"]): |
|
|
|
operation_type = operation["type"] |
|
operation_params = {**operation.get("parameters", {}), **parameters} |
|
|
|
|
|
operation_result = self._execute_operation( |
|
operation_type, |
|
prompt, |
|
operation_params |
|
) |
|
|
|
|
|
if operation_result.get("collapse_detected", False): |
|
collapse_detected = True |
|
collapse_type = operation_result.get("collapse_type") |
|
log.warning(f"Collapse detected in operation {operation_idx}: {collapse_type}") |
|
|
|
|
|
if return_residue: |
|
residue = operation_result.get("residue", {}) |
|
|
|
|
|
result[f"operation_{operation_idx}"] = operation_result |
|
|
|
|
|
if operation.get("update_prompt", False) and "output" in operation_result: |
|
prompt = operation_result["output"] |
|
|
|
|
|
if trace_attribution: |
|
attribution_map = self.attribution_tracer.trace(prompt) |
|
|
|
|
|
visualization = None |
|
if visualize: |
|
visualization = self.visualizer.generate_shell_execution( |
|
shell_id, |
|
result, |
|
collapse_detected=collapse_detected, |
|
attribution_map=attribution_map |
|
) |
|
|
|
|
|
execution_time = time.time() - start_time |
|
execution_result = ShellExecutionResult( |
|
shell_id=shell_id, |
|
success=True, |
|
execution_time=execution_time, |
|
result=result, |
|
residue=residue, |
|
collapse_detected=collapse_detected, |
|
collapse_type=collapse_type, |
|
attribution_map=attribution_map, |
|
visualization=visualization, |
|
metadata={ |
|
"shell_type": shell_def.get("type", "unknown"), |
|
"timestamp": self.symbolic_engine.get_timestamp(), |
|
"prompt_length": len(prompt), |
|
"operations_count": len(shell_def["operations"]) |
|
} |
|
) |
|
|
|
|
|
self.shell_cache[shell_id] = execution_result |
|
|
|
log.info(f"Shell execution completed in {execution_time:.2f}s") |
|
return execution_result |
|
|
|
except Exception as e: |
|
log.error(f"Error executing shell {shell_id}: {e}") |
|
|
|
|
|
execution_time = time.time() - start_time |
|
return ShellExecutionResult( |
|
shell_id=shell_id, |
|
success=False, |
|
execution_time=execution_time, |
|
result={"error": str(e)}, |
|
metadata={ |
|
"shell_type": shell_def.get("type", "unknown"), |
|
"timestamp": self.symbolic_engine.get_timestamp(), |
|
"error_type": type(e).__name__ |
|
} |
|
) |
|
|
|
def _execute_operation( |
|
self, |
|
operation_type: str, |
|
prompt: str, |
|
parameters: Dict |
|
) -> Dict: |
|
"""Execute a single shell operation""" |
|
|
|
|
|
if operation_type == "reflect.trace": |
|
|
|
target = parameters.get("target", "reasoning") |
|
depth = parameters.get("depth", 3) |
|
detailed = parameters.get("detailed", True) |
|
visualize = parameters.get("visualize", False) |
|
|
|
return self.reflect_op.trace( |
|
content=prompt, |
|
target=target, |
|
depth=depth, |
|
detailed=detailed, |
|
visualize=visualize |
|
) |
|
|
|
elif operation_type == "reflect.attribution": |
|
|
|
sources = parameters.get("sources", "all") |
|
confidence = parameters.get("confidence", True) |
|
visualize = parameters.get("visualize", False) |
|
|
|
return self.reflect_op.attribution( |
|
content=prompt, |
|
sources=sources, |
|
confidence=confidence, |
|
visualize=visualize |
|
) |
|
|
|
elif operation_type == "collapse.detect": |
|
|
|
threshold = parameters.get("threshold", 0.7) |
|
alert = parameters.get("alert", True) |
|
|
|
return self.collapse_op.detect( |
|
content=prompt, |
|
threshold=threshold, |
|
alert=alert |
|
) |
|
|
|
elif operation_type == "collapse.prevent": |
|
|
|
trigger = parameters.get("trigger", "recursive_depth") |
|
threshold = parameters.get("threshold", 5) |
|
|
|
return self.collapse_op.prevent( |
|
content=prompt, |
|
trigger=trigger, |
|
threshold=threshold |
|
) |
|
|
|
elif operation_type == "ghostcircuit.identify": |
|
|
|
sensitivity = parameters.get("sensitivity", 0.7) |
|
threshold = parameters.get("threshold", 0.2) |
|
trace_type = parameters.get("trace_type", "full") |
|
visualize = parameters.get("visualize", False) |
|
|
|
return self.ghost_op.identify( |
|
content=prompt, |
|
sensitivity=sensitivity, |
|
threshold=threshold, |
|
trace_type=trace_type, |
|
visualize=visualize |
|
) |
|
|
|
elif operation_type == "model.generate": |
|
|
|
max_tokens = parameters.get("max_tokens", self.config.get("max_token_count", 2048)) |
|
temperature = parameters.get("temperature", 0.7) |
|
|
|
output = self.model.generate( |
|
prompt, |
|
max_tokens=max_tokens, |
|
temperature=temperature |
|
) |
|
|
|
return { |
|
"output": output, |
|
"prompt": prompt, |
|
"max_tokens": max_tokens, |
|
"temperature": temperature |
|
} |
|
|
|
else: |
|
raise ValueError(f"Unknown operation type: {operation_type}") |
|
|
|
def compare_shells( |
|
self, |
|
shell_ids: List[str], |
|
prompt: str, |
|
parameters: Optional[Dict] = None, |
|
visualize: bool = True |
|
) -> Dict: |
|
""" |
|
Compare execution results from multiple shells |
|
|
|
Parameters: |
|
----------- |
|
shell_ids : List[str] |
|
IDs of shells to compare |
|
prompt : str |
|
Input prompt for all shells |
|
parameters : Optional[Dict] |
|
Additional parameters to override shell defaults |
|
visualize : bool |
|
Whether to generate comparison visualization |
|
|
|
Returns: |
|
-------- |
|
Dict with comparison results |
|
""" |
|
|
|
results = {} |
|
for shell_id in shell_ids: |
|
try: |
|
result = self.execute_shell( |
|
shell_id, |
|
prompt, |
|
parameters=parameters, |
|
visualize=False |
|
) |
|
results[shell_id] = result |
|
except Exception as e: |
|
log.error(f"Error executing shell {shell_id} for comparison: {e}") |
|
results[shell_id] = { |
|
"error": str(e), |
|
"success": False |
|
} |
|
|
|
|
|
comparison_viz = None |
|
if visualize: |
|
comparison_viz = self.visualizer.generate_shell_comparison( |
|
results |
|
) |
|
|
|
|
|
comparison = { |
|
"results": results, |
|
"prompt": prompt, |
|
"visualization": comparison_viz, |
|
"timestamp": self.symbolic_engine.get_timestamp(), |
|
"shells_compared": shell_ids |
|
} |
|
|
|
return comparison |
|
|
|
def log_execution_result(self, result: ShellExecutionResult, log_path: Optional[str] = None) -> str: |
|
""" |
|
Log shell execution result to file |
|
|
|
Parameters: |
|
----------- |
|
result : ShellExecutionResult |
|
Execution result to log |
|
log_path : Optional[str] |
|
Path for log file (if None, uses default) |
|
|
|
Returns: |
|
-------- |
|
Path to log file |
|
""" |
|
if log_path is None: |
|
|
|
log_dir = self.config.get("log_directory", "logs") |
|
os.makedirs(log_dir, exist_ok=True) |
|
|
|
|
|
timestamp = result.metadata.get("timestamp", "").replace(":", "-").replace(" ", "_") |
|
log_path = os.path.join(log_dir, f"{result.shell_id}_{timestamp}.json") |
|
|
|
|
|
result_dict = { |
|
"shell_id": result.shell_id, |
|
"success": result.success, |
|
"execution_time": result.execution_time, |
|
"result": result.result, |
|
"residue": result.residue, |
|
"collapse_detected": result.collapse_detected, |
|
"collapse_type": result.collapse_type, |
|
"attribution_map": result.attribution_map, |
|
"metadata": result.metadata |
|
} |
|
|
|
|
|
if "visualization" in result_dict: |
|
del result_dict["visualization"] |
|
|
|
|
|
with open(log_path, 'w') as f: |
|
json.dump(result_dict, f, indent=2) |
|
|
|
log.info(f"Execution result logged to {log_path}") |
|
return log_path |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
import argparse |
|
import json |
|
|
|
|
|
parser = argparse.ArgumentParser(description="Symbolic Shell Executor for transformerOS") |
|
parser.add_argument("--shell", required=True, help="Shell ID to execute") |
|
parser.add_argument("--prompt", required=True, help="Input prompt") |
|
parser.add_argument("--pack", help="Path to shell pack to load first") |
|
parser.add_argument("--model", default="default", help="Model ID to use") |
|
parser.add_argument("--config", help="Path to executor configuration") |
|
parser.add_argument("--visualize", action="store_true", help="Generate visualization") |
|
parser.add_argument("--log", action="store_true", help="Log execution result") |
|
parser.add_argument("--output", help="Output file for result") |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
executor = ShellExecutor( |
|
config_path=args.config, |
|
model_id=args.model |
|
) |
|
|
|
|
|
if args.pack: |
|
executor.load_shell_pack(args.pack) |
|
|
|
|
|
result = executor.execute_shell( |
|
args.shell, |
|
args.prompt, |
|
visualize=args.visualize |
|
) |
|
|
|
|
|
if args.log: |
|
executor.log_execution_result(result) |
|
|
|
|
|
if args.output: |
|
|
|
result_dict = { |
|
"shell_id": result.shell_id, |
|
"success": result.success, |
|
"execution_time": result.execution_time, |
|
"result": result.result, |
|
"collapse_detected": result.collapse_detected, |
|
"collapse_type": result.collapse_type, |
|
"metadata": result.metadata |
|
} |
|
|
|
|
|
if result.visualization: |
|
result_dict["visualization"] = result.visualization |
|
|
|
|
|
with open(args.output, 'w') as f: |
|
json.dump(result_dict, f, indent=2) |
|
else: |
|
|
|
print(f"Shell: {result.shell_id}") |
|
print(f"Success: {result.success}") |
|
print(f"Execution time: {result.execution_time:.2f}s") |
|
print(f"Collapse detected: {result.collapse_detected}") |
|
if result.collapse_detected: |
|
print(f"Collapse type: {result.collapse_type}") |
|
print("\nResult summary:") |
|
for op_key, op_result in result.result.items(): |
|
print(f" - {op_key}: {op_result.get('status', 'completed')}") |
|
|