""" |
|
Universal Tool Integration Framework for Cyber-LLM |
|
Plugin architecture and standardized API for external security tools |
|
|
|
Author: Muzan Sano <[email protected]> |
|
""" |
|
|
|
import asyncio |
|
import json |
|
import logging |
|
import importlib |
|
import inspect |
|
from datetime import datetime, timedelta |
|
from typing import Dict, List, Any, Optional, Tuple, Union, Callable |
|
from dataclasses import dataclass, field |
|
from enum import Enum |
|
from pathlib import Path |
|
import yaml |
|
import aiohttp |
|
import docker |
|
import subprocess |
|
|
|
from ..utils.logging_system import CyberLLMLogger, CyberLLMError, ErrorCategory |
|
from ..memory.persistent_memory import PersistentMemoryManager |
|
|
|


class ToolType(Enum):
    """Types of security tools"""
    SCANNER = "scanner"
    ANALYZER = "analyzer"
    MONITOR = "monitor"
    FORENSICS = "forensics"
    THREAT_INTEL = "threat_intel"
    VULNERABILITY_MGMT = "vulnerability_mgmt"
    INCIDENT_RESPONSE = "incident_response"
    COMPLIANCE = "compliance"
    REPORTING = "reporting"
    AUTOMATION = "automation"


class IntegrationMethod(Enum):
    """Tool integration methods"""
    REST_API = "rest_api"
    CLI_WRAPPER = "cli_wrapper"
    PYTHON_LIBRARY = "python_library"
    DOCKER_CONTAINER = "docker_container"
    WEBHOOK = "webhook"
    SOCKET = "socket"
    DATABASE = "database"
    FILE_SYSTEM = "file_system"


class ToolStatus(Enum):
    """Tool availability status"""
    AVAILABLE = "available"
    BUSY = "busy"
    ERROR = "error"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"


@dataclass
class ToolCapability:
    """Tool capability definition"""
    capability_id: str
    name: str
    description: str

    # Input/output contracts (JSON-schema-style dictionaries)
    input_schema: Dict[str, Any]
    output_schema: Dict[str, Any]

    # Performance characteristics
    typical_execution_time: float
    resource_requirements: Dict[str, float]

    # Reliability metrics
    success_rate: float = 0.95
    error_rate: float = 0.05

    # Access requirements
    required_credentials: List[str] = field(default_factory=list)
    required_permissions: List[str] = field(default_factory=list)


@dataclass
class ToolMetadata:
    """Comprehensive tool metadata"""
    tool_id: str
    name: str
    version: str
    vendor: str

    # Classification
    tool_type: ToolType
    integration_method: IntegrationMethod

    # Capabilities and supported data formats
    capabilities: List[ToolCapability]
    supported_formats: List[str]

    # REST API integration settings
    endpoint_url: Optional[str] = None
    api_key_required: bool = False
    authentication_method: Optional[str] = None

    # Docker integration settings
    docker_image: Optional[str] = None
    docker_config: Dict[str, Any] = field(default_factory=dict)

    # CLI integration settings
    executable_path: Optional[str] = None
    command_template: Optional[str] = None

    # Health and availability
    status: ToolStatus = ToolStatus.OFFLINE
    last_health_check: Optional[datetime] = None
    health_check_interval: int = 300  # seconds

    # Usage statistics
    total_invocations: int = 0
    successful_invocations: int = 0
    average_response_time: float = 0.0
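

# Illustrative sketch only: how a scanner might be described with the dataclasses
# above. The "example_port_scanner" values are hypothetical, not a shipped
# integration; real definitions are expected to come from discovery (YAML) or plugins.
def _example_scanner_metadata() -> ToolMetadata:
    """Build a hypothetical ToolMetadata instance (for illustration/testing only)."""
    port_scan = ToolCapability(
        capability_id="port_scan",
        name="TCP port scan",
        description="Scan a target host for open TCP ports",
        input_schema={"type": "object", "properties": {"target": {"type": "string"}}},
        output_schema={"type": "object", "properties": {"open_ports": {"type": "array"}}},
        typical_execution_time=30.0,
        resource_requirements={"cpu": 0.5, "memory_mb": 256.0},
        required_permissions=["network_scan"],
    )
    return ToolMetadata(
        tool_id="example_port_scanner",
        name="Example Port Scanner",
        version="1.0.0",
        vendor="example",
        tool_type=ToolType.SCANNER,
        integration_method=IntegrationMethod.CLI_WRAPPER,
        capabilities=[port_scan],
        supported_formats=["json"],
        executable_path="/usr/bin/example-scanner",
        command_template="example-scanner --target {target} --output json",
    )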


@dataclass
class ToolExecutionRequest:
    """Tool execution request"""
    request_id: str
    tool_id: str
    capability_id: str

    input_data: Dict[str, Any]
    parameters: Dict[str, Any] = field(default_factory=dict)

    timeout: int = 300
    priority: int = 5
    retry_count: int = 3

    correlation_id: Optional[str] = None
    requested_by: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class ToolExecutionResult:
    """Tool execution result"""
    request_id: str
    tool_id: str
    capability_id: str

    success: bool
    output_data: Dict[str, Any]
    error_message: Optional[str] = None

    execution_time: float = 0.0
    resource_usage: Dict[str, float] = field(default_factory=dict)

    executed_at: datetime = field(default_factory=datetime.now)
    tool_version: Optional[str] = None
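

# Illustrative sketch only: what a caller is expected to put into a
# ToolExecutionRequest. The tool/capability IDs and target below are hypothetical
# and match the _example_scanner_metadata() sketch above.
def _example_execution_request() -> ToolExecutionRequest:
    """Build a hypothetical ToolExecutionRequest (for illustration/testing only)."""
    import uuid
    return ToolExecutionRequest(
        request_id=str(uuid.uuid4()),
        tool_id="example_port_scanner",
        capability_id="port_scan",
        input_data={"target": "10.0.0.5"},
        parameters={"ports": "1-1024"},
        timeout=120,
        requested_by="recon_agent",
    )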


class UniversalToolRegistry:
    """Central registry for all integrated tools"""

    def __init__(self, logger: Optional[CyberLLMLogger] = None):
        self.logger = logger or CyberLLMLogger(name="tool_registry")

        # Tool bookkeeping
        self.registered_tools: Dict[str, ToolMetadata] = {}
        self.tool_instances: Dict[str, Any] = {}
        self.capability_index: Dict[str, str] = {}  # capability_id -> tool_id

        # Discovery and validation
        self.discovery_paths: List[str] = []
        self.validation_rules: Dict[str, Any] = {}

        # Health monitoring and usage statistics
        self.health_monitors: Dict[str, Any] = {}
        self.usage_statistics: Dict[str, Any] = {}

        self.logger.info("Universal Tool Registry initialized")

    async def register_tool(self, metadata: ToolMetadata) -> bool:
        """Register a new tool"""

        try:
            # Validate metadata before accepting the tool
            if not await self._validate_tool_metadata(metadata):
                self.logger.error("Invalid tool metadata", tool_id=metadata.tool_id)
                return False

            # Reject duplicate registrations
            if metadata.tool_id in self.registered_tools:
                self.logger.warning("Tool already registered", tool_id=metadata.tool_id)
                return False

            # Create the concrete integration instance (API client, CLI wrapper, ...)
            tool_instance = await self._create_tool_instance(metadata)
            if not tool_instance:
                self.logger.error("Failed to create tool instance", tool_id=metadata.tool_id)
                return False

            # Initial health check determines the starting status
            health_status = await self._perform_health_check(tool_instance)
            metadata.status = ToolStatus.AVAILABLE if health_status else ToolStatus.ERROR
            metadata.last_health_check = datetime.now()

            # Register the tool and index its capabilities
            self.registered_tools[metadata.tool_id] = metadata
            self.tool_instances[metadata.tool_id] = tool_instance

            for capability in metadata.capabilities:
                self.capability_index[capability.capability_id] = metadata.tool_id

            # Start background health monitoring
            asyncio.create_task(self._monitor_tool_health(metadata.tool_id))

            self.logger.info("Tool registered successfully",
                             tool_id=metadata.tool_id,
                             name=metadata.name,
                             capabilities_count=len(metadata.capabilities))

            return True

        except Exception as e:
            self.logger.error("Tool registration failed",
                              tool_id=metadata.tool_id,
                              error=str(e))
            return False

    async def discover_tools(self, discovery_paths: List[str]) -> List[ToolMetadata]:
        """Discover tools from specified paths"""

        discovered_tools = []

        for path in discovery_paths:
            try:
                if Path(path).is_file() and path.endswith('.yaml'):
                    # Single tool definition file
                    metadata = await self._load_tool_from_yaml(path)
                    if metadata:
                        discovered_tools.append(metadata)

                elif Path(path).is_dir():
                    # Directory of tool definition files
                    for yaml_file in Path(path).glob('*.yaml'):
                        metadata = await self._load_tool_from_yaml(str(yaml_file))
                        if metadata:
                            discovered_tools.append(metadata)

            except Exception as e:
                self.logger.error("Tool discovery failed for path",
                                  path=path,
                                  error=str(e))

        self.logger.info("Tool discovery completed",
                         discovered_count=len(discovered_tools))

        return discovered_tools
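
    # Example tool definition file (illustrative only): _load_tool_from_yaml is
    # assumed to map YAML keys onto ToolMetadata/ToolCapability fields roughly
    # like this; the exact schema lives with that loader.
    #
    #   tool_id: example_port_scanner
    #   name: Example Port Scanner
    #   version: "1.0.0"
    #   vendor: example
    #   tool_type: scanner
    #   integration_method: cli_wrapper
    #   supported_formats: [json]
    #   command_template: "example-scanner --target {target} --output json"
    #   capabilities:
    #     - capability_id: port_scan
    #       name: TCP port scan
    #       description: Scan a target host for open TCP ports
    #       input_schema: {type: object}
    #       output_schema: {type: object}
    #       typical_execution_time: 30.0
    #       resource_requirements: {cpu: 0.5, memory_mb: 256.0}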

    async def get_tool_by_capability(self, capability_id: str) -> Optional[ToolMetadata]:
        """Get tool that provides specific capability"""

        tool_id = self.capability_index.get(capability_id)
        if tool_id:
            return self.registered_tools.get(tool_id)
        return None

    async def list_tools_by_type(self, tool_type: ToolType) -> List[ToolMetadata]:
        """List all tools of specified type"""

        return [tool for tool in self.registered_tools.values()
                if tool.tool_type == tool_type and tool.status == ToolStatus.AVAILABLE]


class ToolExecutionEngine:
    """Engine for executing tools with advanced features"""

    def __init__(self,
                 registry: UniversalToolRegistry,
                 memory_manager: PersistentMemoryManager,
                 logger: Optional[CyberLLMLogger] = None):

        self.registry = registry
        self.memory_manager = memory_manager
        self.logger = logger or CyberLLMLogger(name="tool_execution")

        # Execution state
        self.active_executions = {}
        self.execution_queue = asyncio.Queue()
        self.execution_history = []

        # Resource limits
        self.resource_limits = {
            "max_concurrent_executions": 10,
            "max_memory_per_execution": 2048,  # MB
            "max_cpu_per_execution": 2.0       # cores
        }

        # Result caching and load balancing
        self.execution_cache: Dict[str, ToolExecutionResult] = {}
        self.load_balancing = True

        # Start the background execution worker.
        # NOTE: asyncio.create_task requires a running event loop, so the engine
        # must be constructed from within async code.
        asyncio.create_task(self._execution_worker())

        self.logger.info("Tool Execution Engine initialized")

    async def execute_tool(self, request: ToolExecutionRequest) -> ToolExecutionResult:
        """Execute a tool with specified capability"""

        try:
            self.logger.info("Tool execution requested",
                             request_id=request.request_id,
                             tool_id=request.tool_id,
                             capability=request.capability_id)

            # Resolve the tool and make sure it is usable
            tool_metadata = self.registry.registered_tools.get(request.tool_id)
            if not tool_metadata:
                return ToolExecutionResult(
                    request_id=request.request_id,
                    tool_id=request.tool_id,
                    capability_id=request.capability_id,
                    success=False,
                    output_data={},
                    error_message="Tool not found"
                )

            if tool_metadata.status != ToolStatus.AVAILABLE:
                return ToolExecutionResult(
                    request_id=request.request_id,
                    tool_id=request.tool_id,
                    capability_id=request.capability_id,
                    success=False,
                    output_data={},
                    error_message=f"Tool not available: {tool_metadata.status.value}"
                )

            # Return a cached result if one is still valid
            cache_key = self._generate_cache_key(request)
            if cache_key in self.execution_cache:
                cached_result = self.execution_cache[cache_key]
                if self._is_cache_valid(cached_result):
                    self.logger.info("Returning cached result", request_id=request.request_id)
                    return cached_result

            tool_instance = self.registry.tool_instances[request.tool_id]
            start_time = datetime.now()

            try:
                # Dispatch on the tool's integration method
                if tool_metadata.integration_method == IntegrationMethod.REST_API:
                    result = await self._execute_rest_api(tool_instance, request)
                elif tool_metadata.integration_method == IntegrationMethod.CLI_WRAPPER:
                    result = await self._execute_cli_wrapper(tool_instance, request)
                elif tool_metadata.integration_method == IntegrationMethod.PYTHON_LIBRARY:
                    result = await self._execute_python_library(tool_instance, request)
                elif tool_metadata.integration_method == IntegrationMethod.DOCKER_CONTAINER:
                    result = await self._execute_docker_container(tool_instance, request)
                else:
                    raise NotImplementedError(
                        f"Integration method {tool_metadata.integration_method} not implemented")

                execution_time = (datetime.now() - start_time).total_seconds()

                execution_result = ToolExecutionResult(
                    request_id=request.request_id,
                    tool_id=request.tool_id,
                    capability_id=request.capability_id,
                    success=True,
                    output_data=result,
                    execution_time=execution_time,
                    executed_at=start_time,
                    tool_version=tool_metadata.version
                )

                # Cache the result and update tool statistics
                self.execution_cache[cache_key] = execution_result
                self._update_execution_statistics(tool_metadata, execution_result)

                self.logger.info("Tool execution completed",
                                 request_id=request.request_id,
                                 execution_time=execution_time,
                                 success=True)

                return execution_result

            except asyncio.TimeoutError:
                return ToolExecutionResult(
                    request_id=request.request_id,
                    tool_id=request.tool_id,
                    capability_id=request.capability_id,
                    success=False,
                    output_data={},
                    error_message="Execution timeout",
                    execution_time=(datetime.now() - start_time).total_seconds()
                )

        except Exception as e:
            self.logger.error("Tool execution failed",
                              request_id=request.request_id,
                              error=str(e))

            return ToolExecutionResult(
                request_id=request.request_id,
                tool_id=request.tool_id,
                capability_id=request.capability_id,
                success=False,
                output_data={},
                error_message=str(e)
            )

    async def _execute_rest_api(self, tool_instance: Any, request: ToolExecutionRequest) -> Dict[str, Any]:
        """Execute REST API based tool"""

        async with aiohttp.ClientSession() as session:
            headers = await self._get_api_headers(tool_instance, request)

            async with session.post(
                tool_instance.endpoint,
                json=request.input_data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=request.timeout)
            ) as response:

                if response.status == 200:
                    return await response.json()
                else:
                    raise Exception(
                        f"API call failed with status {response.status}: {await response.text()}")

    async def _execute_cli_wrapper(self, tool_instance: Any, request: ToolExecutionRequest) -> Dict[str, Any]:
        """Execute CLI wrapper based tool"""

        # Build the command from the tool's template and the request input.
        # NOTE: input_data is interpolated into a shell command, so it must be
        # validated/sanitized upstream.
        command = tool_instance.command_template.format(**request.input_data)

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=request.timeout
            )

            if process.returncode == 0:
                return {"stdout": stdout.decode(), "stderr": stderr.decode()}
            else:
                raise Exception(
                    f"Command failed with return code {process.returncode}: {stderr.decode()}")

        finally:
            # Clean up the subprocess if it is still running (e.g. on timeout)
            if process.returncode is None:
                process.terminate()
                await process.wait()


class PluginManager:
    """Manager for dynamic plugin loading and lifecycle"""

    def __init__(self,
                 registry: UniversalToolRegistry,
                 logger: Optional[CyberLLMLogger] = None):

        self.registry = registry
        self.logger = logger or CyberLLMLogger(name="plugin_manager")

        # Plugin state
        self.loaded_plugins: Dict[str, Dict[str, Any]] = {}
        self.plugin_hooks: Dict[str, Any] = {}
        self.plugin_dependencies: Dict[str, Any] = {}

        self.logger.info("Plugin Manager initialized")

    async def load_plugin(self, plugin_path: str) -> bool:
        """Dynamically load a plugin"""

        try:
            # Load the plugin module from its file path
            spec = importlib.util.spec_from_file_location("plugin", plugin_path)
            if spec is None or spec.loader is None:
                raise Exception(f"Cannot load plugin spec from {plugin_path}")
            plugin_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(plugin_module)

            # Enforce the minimal plugin contract
            if not hasattr(plugin_module, 'PLUGIN_METADATA'):
                raise Exception("Plugin missing PLUGIN_METADATA")

            if not hasattr(plugin_module, 'initialize_plugin'):
                raise Exception("Plugin missing initialize_plugin function")

            plugin_metadata = plugin_module.PLUGIN_METADATA
            plugin_instance = await plugin_module.initialize_plugin()

            # Plugins that expose tools register them with the central registry
            if hasattr(plugin_instance, 'get_tool_metadata'):
                tool_metadata = await plugin_instance.get_tool_metadata()
                await self.registry.register_tool(tool_metadata)

            plugin_id = plugin_metadata['id']
            self.loaded_plugins[plugin_id] = {
                'module': plugin_module,
                'instance': plugin_instance,
                'metadata': plugin_metadata,
                'path': plugin_path
            }

            self.logger.info("Plugin loaded successfully",
                             plugin_id=plugin_id,
                             name=plugin_metadata.get('name'))

            return True

        except Exception as e:
            self.logger.error("Plugin loading failed",
                              plugin_path=plugin_path,
                              error=str(e))
            return False
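

# Illustrative plugin module sketch (what load_plugin above expects). This is a
# hypothetical example, not a shipped plugin; a real plugin lives in its own .py
# file and is passed to PluginManager.load_plugin() by path.
#
#   PLUGIN_METADATA = {
#       "id": "example_scanner_plugin",
#       "name": "Example Scanner Plugin",
#       "version": "0.1.0",
#   }
#
#   class ExampleScannerPlugin:
#       async def get_tool_metadata(self):
#           # Return a ToolMetadata describing the tool this plugin wraps
#           ...
#
#   async def initialize_plugin():
#       return ExampleScannerPlugin()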


def create_tool_registry(**kwargs) -> UniversalToolRegistry:
    """Create universal tool registry"""
    return UniversalToolRegistry(**kwargs)


def create_tool_execution_engine(registry: UniversalToolRegistry,
                                 memory_manager: PersistentMemoryManager,
                                 **kwargs) -> ToolExecutionEngine:
    """Create tool execution engine"""
    return ToolExecutionEngine(registry, memory_manager, **kwargs)


def create_plugin_manager(registry: UniversalToolRegistry,
                          **kwargs) -> PluginManager:
    """Create plugin manager"""
    return PluginManager(registry, **kwargs)
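

# Minimal end-to-end usage sketch. Assumptions: PersistentMemoryManager can be
# constructed with defaults (adjust to its real signature), the registry/engine
# private helpers referenced above (_validate_tool_metadata, _execution_worker,
# ...) are implemented elsewhere in the project, and the tool/capability IDs and
# discovery path below are hypothetical.
async def _demo() -> None:
    import uuid

    registry = create_tool_registry()
    memory_manager = PersistentMemoryManager()  # assumption: default constructor
    engine = create_tool_execution_engine(registry, memory_manager)

    # Discover tool definitions from YAML files and register them
    for metadata in await registry.discover_tools(["./config/tools"]):
        await registry.register_tool(metadata)

    # Execute one capability on a registered tool
    request = ToolExecutionRequest(
        request_id=str(uuid.uuid4()),
        tool_id="example_port_scanner",   # hypothetical tool id
        capability_id="port_scan",        # hypothetical capability
        input_data={"target": "10.0.0.5"},
    )
    result = await engine.execute_tool(request)
    print(result.success, result.output_data or result.error_message)


if __name__ == "__main__":
    asyncio.run(_demo())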