cyber_llm / src /agents /recon_agent.py
unit731's picture
Upload core Cyber-LLM platform components
23804b3 verified
"""
ReconAgent: Cybersecurity Reconnaissance Agent
Performs stealth reconnaissance and information gathering operations.
"""
import json
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path
# HUMAN_APPROVAL_REQUIRED: Review reconnaissance strategies before execution
@dataclass
class ReconTarget:
"""Target information for reconnaissance operations."""
target: str
target_type: str # 'domain', 'ip', 'network', 'organization'
constraints: Dict[str, Any]
opsec_level: str = 'medium' # 'low', 'medium', 'high', 'maximum'
@dataclass
class ReconResult:
"""Results from reconnaissance operations."""
target: str
commands: Dict[str, List[str]]
passive_techniques: List[str]
opsec_notes: List[str]
risk_assessment: str
next_steps: List[str]
class ReconAgent:
"""Advanced reconnaissance agent with OPSEC awareness."""
def __init__(self, config_path: Optional[Path] = None):
self.logger = logging.getLogger(__name__)
self.config = self._load_config(config_path)
self.opsec_profiles = self._load_opsec_profiles()
def _load_config(self, config_path: Optional[Path]) -> Dict:
"""Load agent configuration."""
default_config = {
'max_scan_ports': 1000,
'scan_timing': 'T3', # Normal timing
'stealth_mode': True,
'passive_only': False,
'shodan_api_key': None,
'censys_api_key': None
}
if config_path and config_path.exists():
with open(config_path, 'r') as f:
user_config = json.load(f)
default_config.update(user_config)
return default_config
def _load_opsec_profiles(self) -> Dict:
"""Load OPSEC profiles for different stealth levels."""
return {
'low': {
'timing': 'T4',
'port_limit': 65535,
'techniques': ['tcp_connect', 'udp_scan', 'service_detection'],
'delay_between_scans': 0
},
'medium': {
'timing': 'T3',
'port_limit': 1000,
'techniques': ['syn_scan', 'service_detection'],
'delay_between_scans': 1
},
'high': {
'timing': 'T2',
'port_limit': 100,
'techniques': ['syn_scan'],
'delay_between_scans': 5
},
'maximum': {
'timing': 'T1',
'port_limit': 22, # Common ports only
'techniques': ['passive_only'],
'delay_between_scans': 30
}
}
def analyze_target(self, target_info: ReconTarget) -> ReconResult:
"""
Analyze target and generate reconnaissance strategy.
HUMAN_APPROVAL_REQUIRED: Review target analysis before proceeding
"""
self.logger.info(f"Analyzing target: {target_info.target}")
# Get OPSEC profile
opsec_profile = self.opsec_profiles.get(target_info.opsec_level, self.opsec_profiles['medium'])
# Generate reconnaissance commands
commands = {
'nmap': self._generate_nmap_commands(target_info, opsec_profile),
'passive_dns': self._generate_passive_dns_commands(target_info),
'osint': self._generate_osint_commands(target_info),
'shodan': self._generate_shodan_queries(target_info)
}
# Generate passive techniques
passive_techniques = self._generate_passive_techniques(target_info)
# OPSEC considerations
opsec_notes = self._generate_opsec_notes(target_info, opsec_profile)
# Risk assessment
risk_assessment = self._assess_reconnaissance_risk(target_info, commands)
# Next steps
next_steps = self._suggest_next_steps(target_info, commands)
return ReconResult(
target=target_info.target,
commands=commands,
passive_techniques=passive_techniques,
opsec_notes=opsec_notes,
risk_assessment=risk_assessment,
next_steps=next_steps
)
def _generate_nmap_commands(self, target: ReconTarget, opsec_profile: Dict) -> List[str]:
"""Generate OPSEC-aware Nmap commands."""
commands = []
timing = opsec_profile['timing']
port_limit = min(opsec_profile['port_limit'], self.config['max_scan_ports'])
if 'passive_only' in opsec_profile['techniques']:
return [] # No active scanning for maximum stealth
# Host discovery
if target.opsec_level in ['low', 'medium']:
commands.append(f"nmap -sn {target.target}")
# Port scanning
if 'syn_scan' in opsec_profile['techniques']:
commands.append(f"nmap -sS -{timing} --top-ports {port_limit} {target.target}")
elif 'tcp_connect' in opsec_profile['techniques']:
commands.append(f"nmap -sT -{timing} --top-ports {port_limit} {target.target}")
# Service detection (careful with stealth)
if 'service_detection' in opsec_profile['techniques'] and target.opsec_level != 'high':
commands.append(f"nmap -sV -{timing} --version-intensity 2 {target.target}")
# OS detection (only for low OPSEC)
if target.opsec_level == 'low':
commands.append(f"nmap -O -{timing} {target.target}")
# Add stealth flags
for i, cmd in enumerate(commands):
if target.opsec_level in ['high', 'maximum']:
commands[i] += " -f --scan-delay 1000ms" # Fragment packets, add delay
return commands
def _generate_passive_dns_commands(self, target: ReconTarget) -> List[str]:
"""Generate passive DNS reconnaissance commands."""
commands = []
if target.target_type == 'domain':
commands.extend([
f"dig {target.target} ANY",
f"dig {target.target} TXT",
f"dig {target.target} MX",
f"dig {target.target} NS",
f"whois {target.target}",
f"curl -s 'https://crt.sh/?q={target.target}&output=json'"
])
return commands
def _generate_osint_commands(self, target: ReconTarget) -> List[str]:
"""Generate OSINT gathering commands."""
commands = []
if target.target_type in ['domain', 'organization']:
commands.extend([
f"theharvester -d {target.target} -b google,bing,linkedin",
f"amass enum -d {target.target}",
f"subfinder -d {target.target}",
f"curl -s 'https://api.github.com/search/code?q={target.target}'"
])
return commands
def _generate_shodan_queries(self, target: ReconTarget) -> List[str]:
"""Generate Shodan search queries."""
if not self.config.get('shodan_api_key'):
return ["# Shodan API key not configured"]
queries = []
if target.target_type == 'ip':
queries.append(f"host:{target.target}")
elif target.target_type == 'domain':
queries.extend([
f"hostname:{target.target}",
f"ssl:{target.target}",
f"org:\"{target.target}\""
])
elif target.target_type == 'organization':
queries.append(f"org:\"{target.target}\"")
return queries
def _generate_passive_techniques(self, target: ReconTarget) -> List[str]:
"""Generate list of passive reconnaissance techniques."""
techniques = [
"Certificate Transparency log analysis",
"DNS cache snooping",
"BGP route analysis",
"Social media reconnaissance",
"Job posting analysis",
"Public document metadata extraction",
"Wayback Machine analysis",
"GitHub/GitLab repository search"
]
if target.target_type == 'organization':
techniques.extend([
"LinkedIn employee enumeration",
"SEC filing analysis",
"Press release analysis",
"Conference presentation search"
])
return techniques
def _generate_opsec_notes(self, target: ReconTarget, opsec_profile: Dict) -> List[str]:
"""Generate OPSEC considerations and warnings."""
notes = []
if target.opsec_level == 'maximum':
notes.extend([
"MAXIMUM STEALTH: Use only passive techniques",
"Consider using Tor or VPN for all queries",
"Spread reconnaissance over multiple days",
"Use different source IPs for different queries"
])
elif target.opsec_level == 'high':
notes.extend([
"HIGH STEALTH: Minimize active scanning",
"Use packet fragmentation and timing delays",
"Consider using decoy IPs",
"Monitor for defensive responses"
])
elif target.opsec_level == 'medium':
notes.extend([
"MEDIUM STEALTH: Balance speed and stealth",
"Use moderate timing delays",
"Avoid aggressive service detection"
])
else: # low
notes.extend([
"LOW STEALTH: Speed prioritized over stealth",
"Full port ranges and service detection enabled",
"Monitor logs for potential detection"
])
# General OPSEC notes
notes.extend([
"Log all reconnaissance activities",
"Use legitimate-looking User-Agent strings",
"Vary timing between different techniques",
"Document any anomalous responses"
])
return notes
def _assess_reconnaissance_risk(self, target: ReconTarget, commands: Dict) -> str:
"""Assess the risk level of the reconnaissance plan."""
risk_factors = []
# Count active scanning commands
active_commands = len(commands.get('nmap', []))
if active_commands > 5:
risk_factors.append("High number of active scans")
# Check OPSEC level vs techniques
if target.opsec_level == 'maximum' and active_commands > 0:
risk_factors.append("Active scanning conflicts with maximum stealth requirement")
# Check for aggressive techniques
nmap_commands = ' '.join(commands.get('nmap', []))
if '-A' in nmap_commands or '--script' in nmap_commands:
risk_factors.append("Aggressive scanning techniques detected")
if not risk_factors:
return "LOW: Reconnaissance plan follows OPSEC guidelines"
elif len(risk_factors) <= 2:
return f"MEDIUM: Consider addressing: {'; '.join(risk_factors)}"
else:
return f"HIGH: Multiple risk factors identified: {'; '.join(risk_factors)}"
def _suggest_next_steps(self, target: ReconTarget, commands: Dict) -> List[str]:
"""Suggest next steps based on reconnaissance results."""
steps = [
"Execute passive reconnaissance first",
"Analyze results for interesting services/ports",
"Proceed with active scanning if OPSEC allows",
"Document all findings in structured format",
"Identify potential attack vectors",
"Plan next phase based on discovered services"
]
if target.opsec_level in ['high', 'maximum']:
steps.insert(1, "Wait 24-48 hours between reconnaissance phases")
return steps
def execute_reconnaissance(self, target_info: ReconTarget) -> Dict:
"""
Execute reconnaissance plan (simulation/planning mode).
HUMAN_APPROVAL_REQUIRED: Manual execution required for actual scanning
"""
self.logger.warning("SIMULATION MODE: Actual command execution disabled for safety")
recon_result = self.analyze_target(target_info)
# Return structured results for logging/analysis
return {
'target': target_info.target,
'opsec_level': target_info.opsec_level,
'plan': recon_result.__dict__,
'execution_status': 'SIMULATION_ONLY',
'timestamp': str(Path().cwd())
}
# Example usage and testing
if __name__ == "__main__":
# Configure logging
logging.basicConfig(level=logging.INFO)
# Initialize agent
agent = ReconAgent()
# Example target
target = ReconTarget(
target="example.com",
target_type="domain",
constraints={"time_limit": "2h", "stealth": True},
opsec_level="high"
)
# Analyze target
result = agent.execute_reconnaissance(target)
print(json.dumps(result, indent=2))