cyber_llm / src /orchestration /advanced_workflows.py
unit731's picture
Upload core Cyber-LLM platform components
23804b3 verified
"""
Advanced Workflow Orchestration System for Cyber-LLM
Implements complex multi-agent scenarios, dynamic adaptation, and external tool integration
"""
import os
import json
import yaml
import asyncio
from typing import Dict, List, Any, Optional, Callable, Union
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, asdict, field
from enum import Enum
import uuid
import subprocess
from concurrent.futures import ThreadPoolExecutor
import aiohttp
import redis
from ..utils.logging_system import CyberLLMLogger, CyberLLMError, ErrorCategory, retry_with_backoff
from ..utils.secrets_manager import get_secrets_manager
from ..agents.orchestrator_agent import OrchestratorAgent
class WorkflowStatus(Enum):
"""Workflow execution status"""
PENDING = "pending"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TaskPriority(Enum):
"""Task priority levels"""
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
@dataclass
class WorkflowContext:
"""Dynamic workflow context with adaptive capabilities"""
workflow_id: str
current_stage: str
variables: Dict[str, Any] = field(default_factory=dict)
agent_states: Dict[str, Dict[str, Any]] = field(default_factory=dict)
execution_history: List[Dict[str, Any]] = field(default_factory=list)
metrics: Dict[str, float] = field(default_factory=dict)
external_tool_configs: Dict[str, Dict[str, Any]] = field(default_factory=dict)
adaptation_rules: List[Dict[str, Any]] = field(default_factory=list)
rollback_points: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class ExternalToolConfig:
"""Configuration for external tool integration"""
tool_name: str
tool_type: str # metasploit, burp_suite, nmap, etc.
endpoint: str
api_key: Optional[str] = None
credentials: Optional[Dict[str, str]] = None
timeout: int = 300
retry_attempts: int = 3
custom_headers: Dict[str, str] = field(default_factory=dict)
class AdvancedWorkflowEngine:
"""Advanced workflow orchestration engine with dynamic adaptation"""
def __init__(self,
redis_url: str = "redis://localhost:6379",
max_concurrent_workflows: int = 10,
logger: Optional[CyberLLMLogger] = None):
self.logger = logger or CyberLLMLogger(name="advanced_workflow_engine")
self.max_concurrent_workflows = max_concurrent_workflows
# Initialize Redis for state management
try:
self.redis_client = redis.from_url(redis_url, decode_responses=True)
self.redis_client.ping()
except Exception as e:
self.logger.warning(f"Redis not available, using in-memory storage: {e}")
self.redis_client = None
# Workflow state management
self.active_workflows: Dict[str, WorkflowContext] = {}
self.workflow_templates: Dict[str, Dict[str, Any]] = {}
self.external_tools: Dict[str, ExternalToolConfig] = {}
# Execution control
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_workflows)
self.is_running = False
# Load workflow templates and external tool configs
self._load_workflow_templates()
self._load_external_tool_configs()
def _load_workflow_templates(self):
"""Load advanced workflow templates"""
templates_dir = Path(__file__).parent / "templates"
if not templates_dir.exists():
templates_dir.mkdir(parents=True)
self._create_default_templates(templates_dir)
for template_file in templates_dir.glob("*.yaml"):
try:
with open(template_file, 'r') as f:
template = yaml.safe_load(f)
template_name = template_file.stem
self.workflow_templates[template_name] = template
self.logger.info(f"Loaded workflow template: {template_name}")
except Exception as e:
self.logger.error(f"Failed to load template {template_file}", error=str(e))
def _create_default_templates(self, templates_dir: Path):
"""Create default advanced workflow templates"""
# Advanced Red Team Exercise Template
red_team_template = {
"name": "advanced_red_team_exercise",
"description": "Comprehensive red team exercise with dynamic adaptation",
"version": "2.0",
"timeout": 3600, # 1 hour
"parallel_execution": True,
"dynamic_adaptation": True,
"rollback_enabled": True,
"stages": {
"reconnaissance": {
"type": "parallel",
"agents": ["recon_agent"],
"external_tools": ["nmap", "shodan", "amass"],
"timeout": 600,
"adaptation_rules": [
{
"condition": "target_ports_found > 50",
"action": "increase_scan_depth",
"parameters": {"depth": 2}
},
{
"condition": "stealth_score < 0.7",
"action": "reduce_scan_frequency",
"parameters": {"delay": 5}
}
],
"tasks": [
{
"name": "network_discovery",
"agent": "recon_agent",
"action": "discover_network",
"parameters": {
"target": "${workflow.target}",
"scan_type": "stealth",
"timeout": 300
},
"success_criteria": {
"min_hosts_found": 1,
"stealth_score": 0.7
}
},
{
"name": "service_enumeration",
"agent": "recon_agent",
"action": "enumerate_services",
"dependencies": ["network_discovery"],
"parameters": {
"targets": "${reconnaissance.network_discovery.hosts}",
"depth": "moderate"
}
},
{
"name": "vulnerability_assessment",
"external_tool": "nessus",
"parameters": {
"targets": "${reconnaissance.service_enumeration.services}",
"scan_template": "advanced"
},
"parallel": True
}
]
},
"initial_access": {
"type": "sequential",
"agents": ["c2_agent"],
"external_tools": ["metasploit", "burp_suite"],
"dependencies": ["reconnaissance"],
"timeout": 900,
"rollback_point": True,
"adaptation_rules": [
{
"condition": "exploit_attempts > 3 and success_rate < 0.3",
"action": "switch_strategy",
"parameters": {"strategy": "social_engineering"}
}
],
"tasks": [
{
"name": "exploit_selection",
"agent": "c2_agent",
"action": "select_exploits",
"parameters": {
"vulnerabilities": "${reconnaissance.vulnerability_assessment.findings}",
"target_os": "${reconnaissance.network_discovery.os_info}",
"stealth_required": True
}
},
{
"name": "payload_generation",
"external_tool": "metasploit",
"action": "generate_payload",
"parameters": {
"exploit": "${initial_access.exploit_selection.chosen_exploit}",
"target": "${workflow.target}",
"avoid_detection": True
}
},
{
"name": "exploitation",
"agent": "c2_agent",
"action": "execute_exploit",
"parameters": {
"payload": "${initial_access.payload_generation.payload}",
"target": "${workflow.target}",
"max_attempts": 3
},
"success_criteria": {
"shell_obtained": True,
"detection_avoided": True
}
}
]
},
"post_exploitation": {
"type": "parallel",
"agents": ["post_exploit_agent"],
"dependencies": ["initial_access"],
"timeout": 1200,
"conditional": {
"condition": "initial_access.exploitation.success == True",
"else_action": "skip_stage"
},
"tasks": [
{
"name": "privilege_escalation",
"agent": "post_exploit_agent",
"action": "escalate_privileges",
"parameters": {
"session": "${initial_access.exploitation.session}",
"target_privilege": "system"
},
"priority": "high"
},
{
"name": "persistence",
"agent": "post_exploit_agent",
"action": "establish_persistence",
"parameters": {
"session": "${initial_access.exploitation.session}",
"method": "service",
"stealth": True
},
"priority": "normal"
},
{
"name": "lateral_movement",
"agent": "post_exploit_agent",
"action": "move_laterally",
"dependencies": ["privilege_escalation"],
"parameters": {
"session": "${initial_access.exploitation.session}",
"discovery_method": "active_directory"
},
"priority": "high"
}
]
},
"cleanup": {
"type": "sequential",
"agents": ["post_exploit_agent", "safety_agent"],
"always_execute": True,
"timeout": 300,
"tasks": [
{
"name": "remove_persistence",
"agent": "post_exploit_agent",
"action": "cleanup_persistence",
"parameters": {
"persistence_info": "${post_exploitation.persistence.info}"
}
},
{
"name": "close_sessions",
"agent": "post_exploit_agent",
"action": "cleanup_sessions",
"parameters": {
"sessions": "${workflow.active_sessions}"
}
},
{
"name": "safety_verification",
"agent": "safety_agent",
"action": "verify_cleanup",
"parameters": {
"target": "${workflow.target}",
"cleanup_actions": "${cleanup.remove_persistence.actions}"
}
}
]
}
},
"success_criteria": {
"overall": {
"min_stages_completed": 3,
"stealth_score": 0.8,
"safety_compliance": 0.95
}
},
"failure_handling": {
"max_retries": 2,
"rollback_on_failure": True,
"escalation_rules": [
{
"condition": "detection_probability > 0.8",
"action": "immediate_cleanup"
}
]
}
}
# Web Application Security Assessment Template
web_app_template = {
"name": "web_app_security_assessment",
"description": "Comprehensive web application security testing",
"version": "2.0",
"timeout": 2400, # 40 minutes
"parallel_execution": True,
"stages": {
"reconnaissance": {
"type": "parallel",
"agents": ["recon_agent"],
"external_tools": ["burp_suite", "dirb", "nikto"],
"tasks": [
{
"name": "web_discovery",
"agent": "recon_agent",
"action": "discover_web_assets",
"parameters": {
"target": "${workflow.target}",
"depth": 3,
"follow_redirects": True
}
},
{
"name": "technology_fingerprinting",
"external_tool": "whatweb",
"parameters": {
"target": "${workflow.target}",
"aggression": 3
}
}
]
},
"vulnerability_scanning": {
"type": "parallel",
"external_tools": ["burp_suite", "owasp_zap", "sqlmap"],
"dependencies": ["reconnaissance"],
"tasks": [
{
"name": "automated_scan",
"external_tool": "owasp_zap",
"parameters": {
"target": "${workflow.target}",
"scan_policy": "full",
"spider_depth": 5
}
},
{
"name": "sql_injection_test",
"external_tool": "sqlmap",
"parameters": {
"urls": "${reconnaissance.web_discovery.forms}",
"risk": 2,
"level": 3
}
}
]
},
"manual_testing": {
"type": "sequential",
"agents": ["recon_agent", "c2_agent"],
"dependencies": ["vulnerability_scanning"],
"tasks": [
{
"name": "business_logic_testing",
"agent": "recon_agent",
"action": "test_business_logic",
"parameters": {
"application_map": "${reconnaissance.web_discovery.map}",
"user_roles": "${workflow.user_roles}"
}
},
{
"name": "authentication_bypass",
"agent": "c2_agent",
"action": "test_auth_bypass",
"parameters": {
"auth_endpoints": "${reconnaissance.web_discovery.auth_endpoints}",
"methods": ["parameter_pollution", "race_condition", "jwt_manipulation"]
}
}
]
}
}
}
# Save templates
with open(templates_dir / "advanced_red_team_exercise.yaml", 'w') as f:
yaml.dump(red_team_template, f, default_flow_style=False)
with open(templates_dir / "web_app_security_assessment.yaml", 'w') as f:
yaml.dump(web_app_template, f, default_flow_style=False)
self.logger.info("Created default advanced workflow templates")
def _load_external_tool_configs(self):
"""Load external tool configurations"""
secrets_manager = get_secrets_manager()
# Common external tools configuration
tool_configs = {
"metasploit": ExternalToolConfig(
tool_name="metasploit",
tool_type="exploitation_framework",
endpoint="http://localhost:55553",
api_key=None, # Will be loaded from secrets
timeout=300
),
"burp_suite": ExternalToolConfig(
tool_name="burp_suite",
tool_type="web_security",
endpoint="http://localhost:1337",
timeout=600
),
"nmap": ExternalToolConfig(
tool_name="nmap",
tool_type="network_scanner",
endpoint="localhost", # CLI tool
timeout=300
),
"owasp_zap": ExternalToolConfig(
tool_name="owasp_zap",
tool_type="web_security",
endpoint="http://localhost:8080",
timeout=900
),
"sqlmap": ExternalToolConfig(
tool_name="sqlmap",
tool_type="sql_injection",
endpoint="localhost", # CLI tool
timeout=600
),
"nessus": ExternalToolConfig(
tool_name="nessus",
tool_type="vulnerability_scanner",
endpoint="https://localhost:8834",
timeout=1800
)
}
# Load API keys from secrets manager
for tool_name, config in tool_configs.items():
try:
api_key_name = f"{tool_name}_api_key"
# This would normally load from secrets manager
# config.api_key = asyncio.run(secrets_manager.get_secret(api_key_name))
self.logger.debug(f"External tool configured: {tool_name}")
except Exception as e:
self.logger.warning(f"Failed to load API key for {tool_name}", error=str(e))
self.external_tools = tool_configs
self.logger.info(f"Configured {len(tool_configs)} external tools")
async def create_workflow(self,
template_name: str,
parameters: Dict[str, Any],
workflow_id: Optional[str] = None) -> str:
"""Create a new workflow instance"""
if template_name not in self.workflow_templates:
raise CyberLLMError(f"Workflow template not found: {template_name}", ErrorCategory.VALIDATION)
workflow_id = workflow_id or str(uuid.uuid4())
# Create workflow context
context = WorkflowContext(
workflow_id=workflow_id,
current_stage="pending",
variables=parameters.copy()
)
# Add template-specific configuration
template = self.workflow_templates[template_name]
context.variables.update({
"template_name": template_name,
"template_version": template.get("version", "1.0"),
"created_at": datetime.now().isoformat(),
"timeout": template.get("timeout", 1800)
})
# Load adaptation rules from template
if template.get("dynamic_adaptation"):
context.adaptation_rules = self._extract_adaptation_rules(template)
# Store workflow context
self.active_workflows[workflow_id] = context
if self.redis_client:
await self._save_workflow_state(workflow_id, context)
self.logger.info(f"Created workflow: {workflow_id} from template: {template_name}")
return workflow_id
def _extract_adaptation_rules(self, template: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extract adaptation rules from workflow template"""
rules = []
for stage_name, stage_config in template.get("stages", {}).items():
stage_rules = stage_config.get("adaptation_rules", [])
for rule in stage_rules:
rule["stage"] = stage_name
rules.append(rule)
return rules
async def execute_workflow(self, workflow_id: str) -> Dict[str, Any]:
"""Execute workflow with advanced orchestration"""
if workflow_id not in self.active_workflows:
raise CyberLLMError(f"Workflow not found: {workflow_id}", ErrorCategory.VALIDATION)
context = self.active_workflows[workflow_id]
template = self.workflow_templates[context.variables["template_name"]]
try:
context.current_stage = "running"
context.variables["started_at"] = datetime.now().isoformat()
self.logger.info(f"Starting workflow execution: {workflow_id}")
# Execute stages
stages = template["stages"]
execution_results = {}
for stage_name, stage_config in stages.items():
self.logger.info(f"Executing stage: {stage_name}")
# Check dependencies
dependencies = stage_config.get("dependencies", [])
if not self._check_dependencies(dependencies, execution_results):
self.logger.warning(f"Dependencies not met for stage: {stage_name}")
continue
# Check conditional execution
if not self._check_conditional(stage_config.get("conditional"), context, execution_results):
self.logger.info(f"Skipping stage due to condition: {stage_name}")
continue
# Create rollback point if specified
if stage_config.get("rollback_point", False):
await self._create_rollback_point(context, stage_name)
# Execute stage
stage_result = await self._execute_stage(
stage_name, stage_config, context, execution_results
)
execution_results[stage_name] = stage_result
# Apply dynamic adaptations
if template.get("dynamic_adaptation"):
await self._apply_adaptations(context, stage_name, stage_result)
# Check for failure and handle rollback
if not stage_result.get("success", False):
failure_config = template.get("failure_handling", {})
if failure_config.get("rollback_on_failure", False):
await self._handle_failure_rollback(context, stage_name)
break
# Calculate overall success
success_criteria = template.get("success_criteria", {}).get("overall", {})
overall_success = self._evaluate_success_criteria(success_criteria, execution_results, context)
# Update workflow status
context.current_stage = "completed" if overall_success else "failed"
context.variables["completed_at"] = datetime.now().isoformat()
context.variables["success"] = overall_success
# Generate execution report
execution_report = {
"workflow_id": workflow_id,
"template": context.variables["template_name"],
"status": context.current_stage,
"duration": self._calculate_duration(context),
"stages_executed": len(execution_results),
"overall_success": overall_success,
"results": execution_results,
"metrics": context.metrics,
"adaptation_actions": len(context.execution_history)
}
self.logger.info(f"Workflow execution completed: {workflow_id}, Success: {overall_success}")
return execution_report
except Exception as e:
context.current_stage = "failed"
self.logger.error(f"Workflow execution failed: {workflow_id}", error=str(e))
raise CyberLLMError(f"Workflow execution failed: {str(e)}", ErrorCategory.SYSTEM)
async def _execute_stage(self,
stage_name: str,
stage_config: Dict[str, Any],
context: WorkflowContext,
previous_results: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a workflow stage with parallel/sequential support"""
stage_type = stage_config.get("type", "sequential")
tasks = stage_config.get("tasks", [])
timeout = stage_config.get("timeout", 300)
stage_result = {
"stage": stage_name,
"type": stage_type,
"started_at": datetime.now().isoformat(),
"tasks": {},
"success": True,
"errors": []
}
try:
if stage_type == "parallel":
# Execute tasks in parallel
task_futures = []
for task in tasks:
if self._check_task_dependencies(task, stage_result["tasks"]):
future = asyncio.create_task(
self._execute_task(task, context, previous_results, stage_result)
)
task_futures.append((task["name"], future))
# Wait for all tasks with timeout
try:
results = await asyncio.wait_for(
asyncio.gather(*[future for _, future in task_futures], return_exceptions=True),
timeout=timeout
)
for (task_name, _), result in zip(task_futures, results):
if isinstance(result, Exception):
stage_result["errors"].append(f"Task {task_name} failed: {str(result)}")
stage_result["success"] = False
else:
stage_result["tasks"][task_name] = result
except asyncio.TimeoutError:
stage_result["errors"].append(f"Stage timeout after {timeout} seconds")
stage_result["success"] = False
else: # sequential execution
for task in tasks:
if not self._check_task_dependencies(task, stage_result["tasks"]):
continue
try:
task_result = await asyncio.wait_for(
self._execute_task(task, context, previous_results, stage_result),
timeout=task.get("timeout", timeout)
)
stage_result["tasks"][task["name"]] = task_result
# Check if task failed and should stop stage
if not task_result.get("success", False) and task.get("critical", False):
stage_result["success"] = False
break
except asyncio.TimeoutError:
error_msg = f"Task {task['name']} timeout"
stage_result["errors"].append(error_msg)
if task.get("critical", False):
stage_result["success"] = False
break
except Exception as e:
error_msg = f"Task {task['name']} failed: {str(e)}"
stage_result["errors"].append(error_msg)
if task.get("critical", False):
stage_result["success"] = False
break
except Exception as e:
stage_result["errors"].append(f"Stage execution error: {str(e)}")
stage_result["success"] = False
stage_result["completed_at"] = datetime.now().isoformat()
stage_result["duration"] = (
datetime.fromisoformat(stage_result["completed_at"]) -
datetime.fromisoformat(stage_result["started_at"])
).total_seconds()
return stage_result
async def _execute_task(self,
task: Dict[str, Any],
context: WorkflowContext,
previous_results: Dict[str, Any],
stage_result: Dict[str, Any]) -> Dict[str, Any]:
"""Execute individual task (agent action or external tool)"""
task_name = task["name"]
task_result = {
"task": task_name,
"started_at": datetime.now().isoformat(),
"success": False,
"output": {},
"errors": []
}
try:
# Resolve parameters with variable substitution
resolved_params = self._resolve_parameters(
task.get("parameters", {}),
context,
previous_results,
stage_result
)
if "agent" in task:
# Execute agent task
task_result = await self._execute_agent_task(
task["agent"],
task["action"],
resolved_params,
task_result
)
elif "external_tool" in task:
# Execute external tool task
task_result = await self._execute_external_tool_task(
task["external_tool"],
task.get("action"),
resolved_params,
task_result
)
else:
raise CyberLLMError(f"Task type not specified: {task_name}", ErrorCategory.VALIDATION)
# Validate success criteria
success_criteria = task.get("success_criteria", {})
if success_criteria and not self._validate_success_criteria(success_criteria, task_result["output"]):
task_result["success"] = False
task_result["errors"].append("Success criteria not met")
except Exception as e:
task_result["errors"].append(f"Task execution error: {str(e)}")
task_result["success"] = False
task_result["completed_at"] = datetime.now().isoformat()
return task_result
async def _execute_agent_task(self,
agent_name: str,
action: str,
parameters: Dict[str, Any],
task_result: Dict[str, Any]) -> Dict[str, Any]:
"""Execute task using cyber-llm agent"""
# This would integrate with actual agent implementations
# For now, we'll simulate the execution
self.logger.info(f"Executing agent task: {agent_name}.{action}")
# Simulate agent execution
await asyncio.sleep(0.1) # Simulate processing time
task_result["output"] = {
"agent": agent_name,
"action": action,
"parameters": parameters,
"simulated": True,
"result": f"Simulated execution of {action} by {agent_name}"
}
task_result["success"] = True
return task_result
async def _execute_external_tool_task(self,
tool_name: str,
action: Optional[str],
parameters: Dict[str, Any],
task_result: Dict[str, Any]) -> Dict[str, Any]:
"""Execute task using external security tool"""
if tool_name not in self.external_tools:
raise CyberLLMError(f"External tool not configured: {tool_name}", ErrorCategory.CONFIGURATION)
tool_config = self.external_tools[tool_name]
self.logger.info(f"Executing external tool task: {tool_name}")
try:
if tool_config.tool_type in ["network_scanner", "sql_injection"]:
# CLI-based tools
result = await self._execute_cli_tool(tool_name, parameters)
else:
# API-based tools
result = await self._execute_api_tool(tool_config, action, parameters)
task_result["output"] = result
task_result["success"] = True
except Exception as e:
self.logger.error(f"External tool execution failed: {tool_name}", error=str(e))
task_result["errors"].append(f"External tool error: {str(e)}")
task_result["success"] = False
return task_result
async def _execute_cli_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute CLI-based security tool"""
if tool_name == "nmap":
target = parameters.get("target", "localhost")
scan_type = parameters.get("scan_type", "stealth")
cmd = ["nmap", "-sS", "-O", target] if scan_type == "stealth" else ["nmap", "-sV", target]
elif tool_name == "sqlmap":
url = parameters.get("url", "")
cmd = ["sqlmap", "-u", url, "--batch", "--risk=2", "--level=3"]
else:
raise CyberLLMError(f"Unsupported CLI tool: {tool_name}", ErrorCategory.VALIDATION)
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
return {
"tool": tool_name,
"command": " ".join(cmd),
"exit_code": process.returncode,
"stdout": stdout.decode() if stdout else "",
"stderr": stderr.decode() if stderr else "",
"success": process.returncode == 0
}
except Exception as e:
raise CyberLLMError(f"CLI tool execution failed: {str(e)}", ErrorCategory.SYSTEM)
async def _execute_api_tool(self,
tool_config: ExternalToolConfig,
action: Optional[str],
parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute API-based security tool"""
headers = {"Content-Type": "application/json"}
headers.update(tool_config.custom_headers)
if tool_config.api_key:
headers["Authorization"] = f"Bearer {tool_config.api_key}"
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=tool_config.timeout)) as session:
if tool_config.tool_name == "metasploit":
# Metasploit RPC API simulation
endpoint = f"{tool_config.endpoint}/api/v1/exploit"
payload = {
"exploit": parameters.get("exploit", ""),
"target": parameters.get("target", ""),
"payload_type": parameters.get("payload_type", "reverse_tcp")
}
elif tool_config.tool_name == "burp_suite":
# Burp Suite API simulation
endpoint = f"{tool_config.endpoint}/burp/scanner/scans"
payload = {
"scan_type": "active",
"target": parameters.get("target", "")
}
elif tool_config.tool_name == "owasp_zap":
# OWASP ZAP API simulation
endpoint = f"{tool_config.endpoint}/JSON/ascan/action/scan/"
payload = {
"url": parameters.get("target", ""),
"policy": parameters.get("scan_policy", "default")
}
else:
raise CyberLLMError(f"Unsupported API tool: {tool_config.tool_name}", ErrorCategory.VALIDATION)
try:
async with session.post(endpoint, json=payload, headers=headers) as response:
result_data = await response.json() if response.content_type == 'application/json' else await response.text()
return {
"tool": tool_config.tool_name,
"endpoint": endpoint,
"status_code": response.status,
"response": result_data,
"success": response.status < 400
}
except aiohttp.ClientError as e:
raise CyberLLMError(f"API tool request failed: {str(e)}", ErrorCategory.NETWORK)
def _resolve_parameters(self,
parameters: Dict[str, Any],
context: WorkflowContext,
previous_results: Dict[str, Any],
stage_result: Dict[str, Any]) -> Dict[str, Any]:
"""Resolve parameter variables with workflow context"""
resolved = {}
for key, value in parameters.items():
if isinstance(value, str) and value.startswith("${"):
# Variable substitution
var_path = value[2:-1] # Remove ${ and }
resolved_value = self._resolve_variable(var_path, context, previous_results, stage_result)
resolved[key] = resolved_value
else:
resolved[key] = value
return resolved
def _resolve_variable(self,
var_path: str,
context: WorkflowContext,
previous_results: Dict[str, Any],
stage_result: Dict[str, Any]) -> Any:
"""Resolve variable from context"""
parts = var_path.split(".")
if parts[0] == "workflow":
# Workflow variables
current = context.variables
for part in parts[1:]:
current = current.get(part, "")
return current
elif parts[0] in previous_results:
# Previous stage results
current = previous_results[parts[0]]
for part in parts[1:]:
if isinstance(current, dict):
current = current.get(part, "")
else:
current = ""
return current
else:
self.logger.warning(f"Unable to resolve variable: {var_path}")
return ""
async def _apply_adaptations(self,
context: WorkflowContext,
stage_name: str,
stage_result: Dict[str, Any]):
"""Apply dynamic workflow adaptations based on results"""
for rule in context.adaptation_rules:
if rule.get("stage") != stage_name:
continue
condition = rule.get("condition", "")
if self._evaluate_condition(condition, context, stage_result):
action = rule.get("action", "")
parameters = rule.get("parameters", {})
await self._execute_adaptation_action(action, parameters, context, stage_result)
# Log adaptation
adaptation_record = {
"timestamp": datetime.now().isoformat(),
"stage": stage_name,
"condition": condition,
"action": action,
"parameters": parameters
}
context.execution_history.append(adaptation_record)
self.logger.info(f"Applied adaptation: {action} in stage {stage_name}")
def _evaluate_condition(self,
condition: str,
context: WorkflowContext,
stage_result: Dict[str, Any]) -> bool:
"""Evaluate adaptation condition"""
# Simple condition evaluation (in production, use a proper expression evaluator)
# Examples: "target_ports_found > 50", "stealth_score < 0.7"
try:
# This is a simplified implementation
# In production, use a safe expression evaluator
if ">" in condition:
left, right = condition.split(">")
left_val = self._get_condition_value(left.strip(), context, stage_result)
right_val = float(right.strip())
return float(left_val) > right_val
elif "<" in condition:
left, right = condition.split("<")
left_val = self._get_condition_value(left.strip(), context, stage_result)
right_val = float(right.strip())
return float(left_val) < right_val
elif "==" in condition:
left, right = condition.split("==")
left_val = self._get_condition_value(left.strip(), context, stage_result)
right_val = right.strip().strip('"\'')
return str(left_val) == right_val
return False
except Exception as e:
self.logger.warning(f"Failed to evaluate condition: {condition}", error=str(e))
return False
def _get_condition_value(self,
variable: str,
context: WorkflowContext,
stage_result: Dict[str, Any]) -> Any:
"""Get value for condition evaluation"""
if variable in context.metrics:
return context.metrics[variable]
elif variable in context.variables:
return context.variables[variable]
else:
# Try to extract from stage result
return 0 # Default value
async def _execute_adaptation_action(self,
action: str,
parameters: Dict[str, Any],
context: WorkflowContext,
stage_result: Dict[str, Any]):
"""Execute adaptation action"""
if action == "increase_scan_depth":
depth = parameters.get("depth", 1)
context.variables["scan_depth"] = context.variables.get("scan_depth", 1) + depth
elif action == "reduce_scan_frequency":
delay = parameters.get("delay", 1)
context.variables["scan_delay"] = delay
elif action == "switch_strategy":
new_strategy = parameters.get("strategy", "default")
context.variables["current_strategy"] = new_strategy
elif action == "immediate_cleanup":
context.variables["emergency_cleanup"] = True
else:
self.logger.warning(f"Unknown adaptation action: {action}")
async def _create_rollback_point(self, context: WorkflowContext, stage_name: str):
"""Create rollback point for workflow recovery"""
rollback_point = {
"stage": stage_name,
"timestamp": datetime.now().isoformat(),
"context_snapshot": {
"variables": context.variables.copy(),
"agent_states": context.agent_states.copy(),
"metrics": context.metrics.copy()
}
}
context.rollback_points.append(rollback_point)
self.logger.info(f"Created rollback point at stage: {stage_name}")
async def _handle_failure_rollback(self, context: WorkflowContext, failed_stage: str):
"""Handle workflow rollback on failure"""
if not context.rollback_points:
self.logger.warning("No rollback points available")
return
# Find the most recent rollback point before the failed stage
rollback_point = context.rollback_points[-1]
# Restore context
context.variables.update(rollback_point["context_snapshot"]["variables"])
context.agent_states.update(rollback_point["context_snapshot"]["agent_states"])
context.metrics.update(rollback_point["context_snapshot"]["metrics"])
self.logger.info(f"Rolled back workflow to stage: {rollback_point['stage']}")
# Additional utility methods would continue here...
async def _save_workflow_state(self, workflow_id: str, context: WorkflowContext):
"""Save workflow state to Redis"""
if self.redis_client:
try:
state_data = asdict(context)
await asyncio.get_event_loop().run_in_executor(
None,
self.redis_client.setex,
f"workflow:{workflow_id}",
3600, # 1 hour TTL
json.dumps(state_data, default=str)
)
except Exception as e:
self.logger.warning(f"Failed to save workflow state: {workflow_id}", error=str(e))
def _check_dependencies(self, dependencies: List[str], results: Dict[str, Any]) -> bool:
"""Check if stage dependencies are satisfied"""
return all(dep in results and results[dep].get("success", False) for dep in dependencies)
def _check_conditional(self,
conditional: Optional[Dict[str, Any]],
context: WorkflowContext,
results: Dict[str, Any]) -> bool:
"""Check conditional execution requirements"""
if not conditional:
return True
condition = conditional.get("condition", "")
return self._evaluate_condition(condition, context, {"tasks": results})
def _check_task_dependencies(self, task: Dict[str, Any], completed_tasks: Dict[str, Any]) -> bool:
"""Check if task dependencies are satisfied"""
dependencies = task.get("dependencies", [])
return all(dep in completed_tasks and completed_tasks[dep].get("success", False) for dep in dependencies)
def _validate_success_criteria(self, criteria: Dict[str, Any], output: Dict[str, Any]) -> bool:
"""Validate task success criteria"""
for key, expected_value in criteria.items():
if key not in output:
return False
actual_value = output[key]
if isinstance(expected_value, (int, float)):
if actual_value < expected_value:
return False
elif actual_value != expected_value:
return False
return True
def _evaluate_success_criteria(self,
criteria: Dict[str, Any],
results: Dict[str, Any],
context: WorkflowContext) -> bool:
"""Evaluate overall workflow success criteria"""
min_stages = criteria.get("min_stages_completed", 0)
completed_stages = sum(1 for r in results.values() if r.get("success", False))
if completed_stages < min_stages:
return False
# Check metric thresholds
for metric, threshold in criteria.items():
if metric.startswith("min_"):
continue
if metric in context.metrics:
if context.metrics[metric] < threshold:
return False
return True
def _calculate_duration(self, context: WorkflowContext) -> float:
"""Calculate workflow execution duration"""
if "started_at" in context.variables and "completed_at" in context.variables:
start = datetime.fromisoformat(context.variables["started_at"])
end = datetime.fromisoformat(context.variables["completed_at"])
return (end - start).total_seconds()
return 0.0
# Example usage and testing
if __name__ == "__main__":
import asyncio
async def test_advanced_workflow():
# Initialize advanced workflow engine
engine = AdvancedWorkflowEngine()
# Create red team exercise workflow
workflow_id = await engine.create_workflow(
"advanced_red_team_exercise",
{
"target": "192.168.1.100",
"stealth_required": True,
"max_duration": 3600
}
)
print(f"Created workflow: {workflow_id}")
# Execute workflow
result = await engine.execute_workflow(workflow_id)
print("\nWorkflow Results:")
print(f"Status: {result['status']}")
print(f"Duration: {result['duration']}s")
print(f"Stages Executed: {result['stages_executed']}")
print(f"Overall Success: {result['overall_success']}")
asyncio.run(test_advanced_workflow())