"""
Explainability Agent for Cyber-LLM.

Provides rationales and explanations for agent decisions.
"""

import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

import yaml


# Configure root logging at import time (INFO and above) and create the
# module-level logger used by ExplainabilityAgent for its log messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ExplainabilityAgent:
    """
    Agent responsible for providing explainable rationales for decisions
    made by other agents in the Cyber-LLM system.

    The public entry points are :meth:`explain_decision`, which builds an
    explanation dictionary for a decision, and :meth:`format_explanation`,
    which renders such a dictionary as JSON, YAML, Markdown or plain text.
    """

    # Field name -> weight added to the confidence score when the field is
    # present and truthy in the decision data. Order matches summation order.
    _CONFIDENCE_WEIGHTS = (
        ("target", 0.2),
        ("action", 0.3),
        ("parameters", 0.2),
        ("context", 0.3),
    )

    # Known reconnaissance actions; "{target}" is filled in per decision.
    _RECON_EXPLANATIONS = {
        "nmap_scan": "Initiated Nmap scan against {target} to identify open ports and services. This is a standard reconnaissance technique that provides essential information for attack planning.",
        "shodan_search": "Performed Shodan search for {target} to gather passive intelligence about exposed services without direct interaction with the target.",
        "dns_enum": "Conducted DNS enumeration for {target} to map the network infrastructure and identify potential attack vectors.",
    }

    # Known C2 channel types and their fixed explanations.
    _C2_EXPLANATIONS = {
        "http": "Selected HTTP channel for C2 communication due to its ability to blend with normal web traffic and bypass many network filters.",
        "https": "Chose HTTPS channel for encrypted C2 communication, providing both stealth and security for command transmission.",
        "dns": "Implemented DNS tunneling for C2 to leverage a protocol that is rarely blocked and often unmonitored.",
    }

    # Known post-exploitation actions and their fixed explanations.
    _POST_EXPLOIT_EXPLANATIONS = {
        "credential_dump": "Initiated credential dumping to harvest authentication materials for lateral movement and privilege escalation.",
        "lateral_movement": "Attempting lateral movement to expand access within the target network and reach high-value assets.",
        "persistence": "Establishing persistence mechanisms to maintain access even after system reboots or security updates.",
    }

    # Alternative approaches suggested per agent type.
    _ALTERNATIVES = {
        "recon": (
            "Use passive reconnaissance techniques instead of active scanning",
            "Employ slower scan rates to reduce detection probability",
            "Utilize third-party intelligence sources for initial reconnaissance",
        ),
        "c2": (
            "Consider domain fronting techniques for additional stealth",
            "Implement multiple fallback C2 channels",
            "Use legitimate cloud services as C2 infrastructure",
        ),
        "post_exploit": (
            "Use living-off-the-land techniques instead of custom tools",
            "Implement time delays between actions to avoid pattern detection",
            "Utilize legitimate administrative tools for post-exploitation activities",
        ),
    }

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the ExplainabilityAgent.

        Args:
            config_path: Optional path to a YAML file whose top-level keys
                override the built-in defaults.
        """
        self.config = self._load_config(config_path)
        self.explanation_templates = self._load_explanation_templates()

    def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
        """Load configuration for the explainability agent.

        Loading is best-effort: any failure to read or parse the file is
        logged as a warning and the defaults are returned unchanged.

        Args:
            config_path: Path to a YAML config file, or ``None``/empty to use
                the defaults only.

        Returns:
            The default configuration, updated with the user's values when a
            mapping could be loaded from ``config_path``.
        """
        config: Dict[str, Any] = {
            "explanation_depth": "detailed",
            "include_risks": True,
            "include_mitigations": True,
            "include_alternatives": True,
            "format": "json",
        }

        if not config_path:
            return config

        try:
            with open(config_path, "r", encoding="utf-8") as f:
                user_config = yaml.safe_load(f)
        except Exception as e:  # deliberate best-effort: never fail construction
            logger.warning("Could not load config from %s: %s", config_path, e)
            return config

        # safe_load yields None for an empty document; treat it as "no overrides"
        # instead of letting dict.update(None) raise a confusing TypeError.
        if user_config is None:
            return config

        if not isinstance(user_config, dict):
            logger.warning(
                "Could not load config from %s: expected a mapping at the top level, got %s",
                config_path,
                type(user_config).__name__,
            )
            return config

        config.update(user_config)
        return config

    def _load_explanation_templates(self) -> Dict[str, str]:
        """Load explanation templates for different agent types.

        Returns:
            A mapping with the keys ``"recon"``, ``"c2"`` and
            ``"post_exploit"``; each value is a ``str.format``-style template
            with named placeholders. The templates are stored on the instance
            but are not referenced by any other method of this class.
        """

        def block(*lines: str) -> str:
            # Each template starts and ends with a newline.
            return "\n".join(("", *lines, ""))

        return {
            "recon": block(
                "RECONNAISSANCE DECISION EXPLANATION:",
                "Action: {action}",
                "Target: {target}",
                "",
                "Justification:",
                "- {justification}",
                "",
                "Risk Assessment:",
                "- Detection Risk: {detection_risk}",
                "- Network Impact: {network_impact}",
                "- Time Investment: {time_investment}",
                "",
                "OPSEC Considerations:",
                "- {opsec_considerations}",
                "",
                "Alternative Approaches:",
                "- {alternatives}",
            ),
            "c2": block(
                "C2 CHANNEL DECISION EXPLANATION:",
                "Channel Type: {channel_type}",
                "Configuration: {configuration}",
                "",
                "Justification:",
                "- {justification}",
                "",
                "Risk Assessment:",
                "- Stealth Level: {stealth_level}",
                "- Reliability: {reliability}",
                "- Bandwidth: {bandwidth}",
                "",
                "OPSEC Considerations:",
                "- {opsec_considerations}",
                "",
                "Backup Options:",
                "- {backup_options}",
            ),
            "post_exploit": block(
                "POST-EXPLOITATION DECISION EXPLANATION:",
                "Action: {action}",
                "Method: {method}",
                "",
                "Justification:",
                "- {justification}",
                "",
                "Risk Assessment:",
                "- Detection Probability: {detection_probability}",
                "- System Impact: {system_impact}",
                "- Evidence Left: {evidence_left}",
                "",
                "OPSEC Considerations:",
                "- {opsec_considerations}",
                "",
                "Cleanup Required:",
                "- {cleanup_required}",
            ),
        }

    def explain_decision(self, agent_type: str, decision_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate explanation for an agent's decision.

        Args:
            agent_type: Type of agent (recon, c2, post_exploit, etc.)
            decision_data: Data about the decision made

        Returns:
            Dictionary containing detailed explanation. It always holds
            ``timestamp``, ``agent_type``, ``decision_id``, ``explanation``
            and ``confidence_score``; ``risk_assessment``, ``alternatives``
            and ``mitigations`` are included unless the matching
            ``include_*`` config flag is false. If anything fails, a
            dictionary with ``error``, ``timestamp`` and ``agent_type`` is
            returned instead of raising.
        """
        try:
            decision_id = decision_data.get("id", "unknown")

            explanation: Dict[str, Any] = {
                "timestamp": datetime.now().isoformat(),
                "agent_type": agent_type,
                "decision_id": decision_id,
                "explanation": self._generate_explanation(agent_type, decision_data),
            }

            # Optional sections are gated by their config flags (all default
            # to True, so the default output keeps every key in this order).
            if self.config.get("include_risks", True):
                explanation["risk_assessment"] = self._assess_risks(agent_type, decision_data)

            if self.config.get("include_alternatives", True):
                explanation["alternatives"] = self._suggest_alternatives(agent_type, decision_data)

            explanation["confidence_score"] = self._calculate_confidence(decision_data)

            if self.config.get("include_mitigations", True):
                explanation["mitigations"] = self._suggest_mitigations(agent_type, decision_data)

            logger.info("Generated explanation for %s decision: %s", agent_type, decision_id)
            return explanation

        except Exception as e:
            # Top-level boundary: report the failure to the caller as data,
            # and keep the traceback in the log for debugging.
            logger.exception("Error generating explanation: %s", e)
            return {
                "error": f"Failed to generate explanation: {str(e)}",
                "timestamp": datetime.now().isoformat(),
                "agent_type": agent_type,
            }

    def _generate_explanation(self, agent_type: str, decision_data: Dict[str, Any]) -> str:
        """Generate the core explanation for the decision.

        Dispatches to the agent-specific explainer for ``recon``, ``c2`` and
        ``post_exploit``; any other agent type gets a generic sentence.
        """
        if agent_type == "recon":
            return self._explain_recon_decision(decision_data)
        if agent_type == "c2":
            return self._explain_c2_decision(decision_data)
        if agent_type == "post_exploit":
            return self._explain_post_exploit_decision(decision_data)
        return f"Decision made by {agent_type} agent based on available information."

    def _explain_recon_decision(self, decision_data: Dict[str, Any]) -> str:
        """Explain reconnaissance decisions.

        Uses the ``action`` and ``target`` keys (both default to
        ``"unknown"``); unrecognised actions get a generic sentence.
        """
        action = decision_data.get("action", "unknown")
        target = decision_data.get("target", "unknown")

        template = self._RECON_EXPLANATIONS.get(action)
        if template is None:
            return f"Performed {action} against {target} as part of reconnaissance phase."
        return template.format(target=target)

    def _explain_c2_decision(self, decision_data: Dict[str, Any]) -> str:
        """Explain C2 channel decisions.

        Uses the ``channel_type`` key (default ``"unknown"``); unrecognised
        channel types get a generic sentence.
        """
        channel = decision_data.get("channel_type", "unknown")
        return self._C2_EXPLANATIONS.get(
            channel,
            f"Established {channel} C2 channel based on network constraints and stealth requirements.",
        )

    def _explain_post_exploit_decision(self, decision_data: Dict[str, Any]) -> str:
        """Explain post-exploitation decisions.

        Uses the ``action`` key (default ``"unknown"``); unrecognised actions
        get a generic sentence.
        """
        action = decision_data.get("action", "unknown")
        return self._POST_EXPLOIT_EXPLANATIONS.get(
            action,
            f"Executed {action} to advance the attack chain and achieve mission objectives.",
        )

    def _assess_risks(self, agent_type: str, decision_data: Dict[str, Any]) -> Dict[str, str]:
        """Assess risks associated with the decision.

        Starts from fixed baseline levels and raises them based on keywords
        found in the (case-insensitive) ``action`` string: "aggressive"/"fast"
        for recon, "dump"/"extract" for post-exploitation.

        Returns:
            Mapping with the keys ``detection_risk``, ``system_impact``,
            ``evidence_trail`` and ``network_noise``.
        """
        risk_factors = {
            "detection_risk": "medium",
            "system_impact": "low",
            "evidence_trail": "minimal",
            "network_noise": "low",
        }

        # Coerce to text so a missing, None or non-string action cannot make
        # the keyword checks raise.
        action = str(decision_data.get("action") or "").lower()

        if agent_type == "recon":
            if "aggressive" in action or "fast" in action:
                risk_factors["detection_risk"] = "high"
                risk_factors["network_noise"] = "high"
        elif agent_type == "post_exploit":
            if "dump" in action or "extract" in action:
                risk_factors["detection_risk"] = "high"
                risk_factors["system_impact"] = "medium"
                risk_factors["evidence_trail"] = "significant"

        return risk_factors

    def _suggest_alternatives(self, agent_type: str, decision_data: Dict[str, Any]) -> List[str]:
        """Suggest alternative approaches.

        Returns:
            A fresh list of suggestions for ``recon``, ``c2`` or
            ``post_exploit``; an empty list for any other agent type.
            ``decision_data`` is currently not consulted.
        """
        return list(self._ALTERNATIVES.get(agent_type, ()))

    def _suggest_mitigations(self, agent_type: str, decision_data: Dict[str, Any]) -> List[str]:
        """Suggest risk mitigation strategies.

        Returns the same generic list for every call; neither argument is
        currently consulted.
        """
        return [
            "Monitor network traffic for anomalous patterns",
            "Implement rate limiting to slow down automated attacks",
            "Deploy behavioral analysis tools to detect suspicious activities",
            "Maintain updated incident response procedures",
        ]

    def _calculate_confidence(self, decision_data: Dict[str, Any]) -> float:
        """Calculate confidence score for the decision.

        The score is the sum of the weights of the fields (``target``,
        ``action``, ``parameters``, ``context``) that are present and truthy,
        capped at 1.0.
        """
        total = sum(
            (weight for field, weight in self._CONFIDENCE_WEIGHTS if decision_data.get(field)),
            0.0,
        )
        # Rounding guards against binary floating-point artefacts in the sum.
        return round(min(total, 1.0), 2)

    def format_explanation(self, explanation: Dict[str, Any], format_type: str = "json") -> str:
        """Format explanation in the specified format.

        Args:
            explanation: Dictionary as produced by :meth:`explain_decision`.
            format_type: ``"json"``, ``"yaml"`` or ``"markdown"``; any other
                value falls back to ``str(explanation)``.

        Returns:
            The rendered explanation.
        """
        if format_type == "json":
            return json.dumps(explanation, indent=2)
        if format_type == "yaml":
            return yaml.dump(explanation, default_flow_style=False)
        if format_type == "markdown":
            return self._format_as_markdown(explanation)
        return str(explanation)

    def _format_as_markdown(self, explanation: Dict[str, Any]) -> str:
        """Format explanation as markdown.

        Missing header fields are shown as ``Unknown``; the risk, alternatives
        and mitigations sections are emitted only when they have content, and
        an ``error`` entry (as returned on failure) gets its own section.
        """
        # A missing or None score is rendered as 0.00.
        score = explanation.get("confidence_score") or 0.0

        lines = [
            "",
            "# Decision Explanation Report",
            "",
            f"**Agent Type**: {explanation.get('agent_type', 'Unknown')}",
            f"**Decision ID**: {explanation.get('decision_id', 'Unknown')}",
            f"**Timestamp**: {explanation.get('timestamp', 'Unknown')}",
            f"**Confidence Score**: {score:.2f}",
            "",
            "## Explanation",
            f"{explanation.get('explanation', 'No explanation available')}",
        ]

        risks = explanation.get("risk_assessment") or {}
        if risks:
            lines += ["", "## Risk Assessment"]
            lines += [
                f"- **{risk.replace('_', ' ').title()}**: {level}"
                for risk, level in risks.items()
            ]

        if explanation.get("alternatives"):
            lines += ["", "## Alternative Approaches"]
            lines += [f"- {alt}" for alt in explanation["alternatives"]]

        if explanation.get("mitigations"):
            lines += ["", "## Suggested Mitigations"]
            lines += [f"- {mit}" for mit in explanation["mitigations"]]

        if explanation.get("error"):
            lines += ["", "## Error", f"{explanation['error']}"]

        return "\n".join(lines) + "\n"


def main() -> None:
    """Run a small demo: explain a sample recon decision and print it.

    Builds an ExplainabilityAgent with default configuration, explains one
    hard-coded Nmap scan decision, and prints the result as JSON and Markdown.
    """
    explainer = ExplainabilityAgent()

    # Sample decision as a recon agent might report it.
    recon_decision = {
        "id": "recon_001",
        "action": "nmap_scan",
        "target": "192.168.1.1-100",
        "parameters": {
            "scan_type": "TCP SYN",
            "ports": "1-1000",
            "timing": "T3",
        },
        "context": "Initial network reconnaissance",
    }

    explanation = explainer.explain_decision("recon", recon_decision)

    print("JSON Format:")
    print(explainer.format_explanation(explanation, "json"))

    print("\nMarkdown Format:")
    print(explainer.format_explanation(explanation, "markdown"))


if __name__ == "__main__":
    main()