|
""" |
|
ReconAgent: Cybersecurity Reconnaissance Agent |
|
Performs stealth reconnaissance and information gathering operations. |
|
""" |
|
|
|
import json
import logging
import shlex
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any
from urllib.parse import quote
|
|
|
|
|
|
|
@dataclass
class ReconTarget:
    """Describe one reconnaissance target and how quietly to approach it."""

    # Identifier of the target; ReconAgent interpolates it verbatim into the
    # command strings and search queries it generates.
    target: str
    # Category of ``target``. ReconAgent compares it against the literals
    # 'ip', 'domain' and 'organization'.
    target_type: str
    # Caller-supplied engagement constraints (free-form mapping).
    constraints: Dict[str, Any]
    # Requested stealth level. ReconAgent defines profiles named 'low',
    # 'medium', 'high' and 'maximum'.
    opsec_level: str = "medium"
|
|
|
@dataclass
class ReconResult:
    """Bundle the reconnaissance plan produced for a single target."""

    # Target identifier the plan was generated for.
    target: str
    # Generated command/query strings, grouped by tool name.
    commands: Dict[str, List[str]]
    # Names of passive information-gathering techniques to consider.
    passive_techniques: List[str]
    # OPSEC reminders and warnings for the operator.
    opsec_notes: List[str]
    # One-line risk verdict for the plan.
    risk_assessment: str
    # Ordered follow-up actions.
    next_steps: List[str]
|
|
|
class ReconAgent:
    """Build OPSEC-aware reconnaissance plans for a single target.

    The agent only *generates* command strings, search queries and guidance
    text. No method in this class executes a command; running anything is
    left to a human operator (see the ``HUMAN_APPROVAL_REQUIRED`` markers).
    """

    # Stealth level substituted when a target names a level that has no entry
    # in ``opsec_profiles``.
    _FALLBACK_LEVEL = 'medium'

    def __init__(self, config_path: Optional[Path] = None):
        """Create an agent.

        Args:
            config_path: Optional path to a JSON file whose top-level object
                overrides the built-in defaults. ``None`` or a missing file
                leaves the defaults untouched.
        """
        self.logger = logging.getLogger(__name__)
        self.config = self._load_config(config_path)
        self.opsec_profiles = self._load_opsec_profiles()

    def _load_config(self, config_path: Optional[Path]) -> Dict:
        """Return the default configuration merged with the user's overrides.

        Args:
            config_path: Path to a JSON config file, or ``None``.

        Returns:
            A fresh dict with the defaults, updated by the file's contents
            when the file exists.

        Raises:
            ValueError: If the file parses but its top level is not a JSON
                object (``json.JSONDecodeError`` for malformed JSON is itself
                a ``ValueError`` subclass).
        """
        config = {
            'max_scan_ports': 1000,
            'scan_timing': 'T3',
            'stealth_mode': True,
            'passive_only': False,
            'shodan_api_key': None,
            'censys_api_key': None,
        }

        if not config_path:
            return config

        path = Path(config_path)
        if not path.exists():
            return config

        with open(path, 'r', encoding='utf-8') as handle:
            overrides = json.load(handle)

        # dict.update() would fail obscurely on scalars and silently accept a
        # list of pairs, so insist on a real JSON object.
        if not isinstance(overrides, dict):
            raise ValueError(
                f"Config file {path} must contain a JSON object, "
                f"got {type(overrides).__name__}"
            )

        config.update(overrides)
        return config

    def _load_opsec_profiles(self) -> Dict:
        """Return the scan parameters for each supported stealth level."""
        return {
            'low': {
                'timing': 'T4',
                'port_limit': 65535,
                'techniques': ['tcp_connect', 'udp_scan', 'service_detection'],
                'delay_between_scans': 0,
            },
            'medium': {
                'timing': 'T3',
                'port_limit': 1000,
                'techniques': ['syn_scan', 'service_detection'],
                'delay_between_scans': 1,
            },
            'high': {
                'timing': 'T2',
                'port_limit': 100,
                'techniques': ['syn_scan'],
                'delay_between_scans': 5,
            },
            'maximum': {
                'timing': 'T1',
                'port_limit': 22,
                'techniques': ['passive_only'],
                'delay_between_scans': 30,
            },
        }

    def _effective_level(self, target: ReconTarget) -> str:
        """Return the target's stealth level, or the fallback if it is unknown.

        Every method that branches on the level goes through this helper so
        that an unrecognised level is treated as 'medium' everywhere, matching
        the profile ``analyze_target`` selects for it.
        """
        level = target.opsec_level
        if level in self.opsec_profiles:
            return level
        return self._FALLBACK_LEVEL

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO 8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def analyze_target(self, target_info: ReconTarget) -> ReconResult:
        """
        Analyze target and generate reconnaissance strategy.

        HUMAN_APPROVAL_REQUIRED: Review target analysis before proceeding

        Args:
            target_info: The target, its type and the requested stealth level.

        Returns:
            A ``ReconResult`` holding generated commands, passive techniques,
            OPSEC notes, a risk verdict and suggested next steps.
        """
        self.logger.info("Analyzing target: %s", target_info.target)

        level = self._effective_level(target_info)
        if level != target_info.opsec_level:
            self.logger.warning(
                "Unknown OPSEC level %r; falling back to %r",
                target_info.opsec_level,
                level,
            )
        opsec_profile = self.opsec_profiles[level]

        commands = {
            'nmap': self._generate_nmap_commands(target_info, opsec_profile),
            'passive_dns': self._generate_passive_dns_commands(target_info),
            'osint': self._generate_osint_commands(target_info),
            'shodan': self._generate_shodan_queries(target_info),
        }

        return ReconResult(
            target=target_info.target,
            commands=commands,
            passive_techniques=self._generate_passive_techniques(target_info),
            opsec_notes=self._generate_opsec_notes(target_info, opsec_profile),
            risk_assessment=self._assess_reconnaissance_risk(target_info, commands),
            next_steps=self._suggest_next_steps(target_info, commands),
        )

    def _generate_nmap_commands(self, target: ReconTarget, opsec_profile: Dict) -> List[str]:
        """Generate OPSEC-aware Nmap commands.

        Args:
            target: Target whose identifier is placed in each command.
            opsec_profile: One entry of ``opsec_profiles``; supplies timing,
                port limit and the list of enabled techniques.

        Returns:
            Command strings in suggested order. Empty when the profile is
            passive-only or the ``passive_only`` config flag is set.
        """
        techniques = opsec_profile['techniques']

        # Honour both the per-profile marker and the global config switch.
        if self.config.get('passive_only') or 'passive_only' in techniques:
            return []

        level = self._effective_level(target)
        timing = opsec_profile['timing']
        port_limit = min(opsec_profile['port_limit'], self.config['max_scan_ports'])
        # The commands are meant to be pasted into a shell, so quote the
        # target; plain hostnames/IPs/CIDRs come back unchanged.
        host = shlex.quote(target.target)

        commands = []

        # Host discovery only at the noisier levels.
        if level in ('low', 'medium'):
            commands.append(f"nmap -sn {host}")

        if 'syn_scan' in techniques:
            commands.append(f"nmap -sS -{timing} --top-ports {port_limit} {host}")
        elif 'tcp_connect' in techniques:
            commands.append(f"nmap -sT -{timing} --top-ports {port_limit} {host}")

        # Declared by the 'low' profile; previously no command was emitted.
        if 'udp_scan' in techniques:
            commands.append(f"nmap -sU -{timing} --top-ports {port_limit} {host}")

        if 'service_detection' in techniques and level != 'high':
            commands.append(f"nmap -sV -{timing} --version-intensity 2 {host}")

        if level == 'low':
            commands.append(f"nmap -O -{timing} {host}")

        # Fragmentation and probe delay for the stealthier levels.
        if level in ('high', 'maximum'):
            commands = [f"{cmd} -f --scan-delay 1000ms" for cmd in commands]

        return commands

    def _generate_passive_dns_commands(self, target: ReconTarget) -> List[str]:
        """Generate DNS, WHOIS and certificate-log lookups for a domain target.

        Returns an empty list for any other ``target_type``.
        """
        if target.target_type != 'domain':
            return []

        host = shlex.quote(target.target)
        # The URL sits inside single quotes in the command, so percent-encode
        # the target instead of shell-quoting it.
        url_term = quote(target.target, safe='')

        return [
            f"dig {host} ANY",
            f"dig {host} TXT",
            f"dig {host} MX",
            f"dig {host} NS",
            f"whois {host}",
            f"curl -s 'https://crt.sh/?q={url_term}&output=json'",
        ]

    def _generate_osint_commands(self, target: ReconTarget) -> List[str]:
        """Generate OSINT gathering commands for domain/organization targets.

        Returns an empty list for any other ``target_type``.
        """
        if target.target_type not in ('domain', 'organization'):
            return []

        # Organization names commonly contain spaces; quote for the shell and
        # percent-encode for the URL.
        name = shlex.quote(target.target)
        url_term = quote(target.target, safe='')

        return [
            f"theharvester -d {name} -b google,bing,linkedin",
            f"amass enum -d {name}",
            f"subfinder -d {name}",
            f"curl -s 'https://api.github.com/search/code?q={url_term}'",
        ]

    def _generate_shodan_queries(self, target: ReconTarget) -> List[str]:
        """Generate Shodan search queries.

        Returns a single placeholder comment line when no ``shodan_api_key``
        is configured, and an empty list for unrecognised target types.
        """
        if not self.config.get('shodan_api_key'):
            return ["# Shodan API key not configured"]

        name = target.target
        kind = target.target_type

        if kind == 'ip':
            return [f"host:{name}"]
        if kind == 'domain':
            return [
                f"hostname:{name}",
                f"ssl:{name}",
                f"org:\"{name}\"",
            ]
        if kind == 'organization':
            return [f"org:\"{name}\""]
        return []

    def _generate_passive_techniques(self, target: ReconTarget) -> List[str]:
        """Generate list of passive reconnaissance techniques.

        Organization targets get four additional people/company-oriented
        entries appended to the common list.
        """
        techniques = [
            "Certificate Transparency log analysis",
            "DNS cache snooping",
            "BGP route analysis",
            "Social media reconnaissance",
            "Job posting analysis",
            "Public document metadata extraction",
            "Wayback Machine analysis",
            "GitHub/GitLab repository search",
        ]

        if target.target_type == 'organization':
            techniques += [
                "LinkedIn employee enumeration",
                "SEC filing analysis",
                "Press release analysis",
                "Conference presentation search",
            ]

        return techniques

    def _generate_opsec_notes(self, target: ReconTarget, opsec_profile: Dict) -> List[str]:
        """Generate OPSEC considerations and warnings.

        Args:
            target: Supplies the stealth level (unknown levels are treated as
                the fallback level).
            opsec_profile: Accepted for signature compatibility; not read.

        Returns:
            Level-specific notes followed by four notes common to all levels.
        """
        notes_by_level = {
            'maximum': [
                "MAXIMUM STEALTH: Use only passive techniques",
                "Consider using Tor or VPN for all queries",
                "Spread reconnaissance over multiple days",
                "Use different source IPs for different queries",
            ],
            'high': [
                "HIGH STEALTH: Minimize active scanning",
                "Use packet fragmentation and timing delays",
                "Consider using decoy IPs",
                "Monitor for defensive responses",
            ],
            'medium': [
                "MEDIUM STEALTH: Balance speed and stealth",
                "Use moderate timing delays",
                "Avoid aggressive service detection",
            ],
            'low': [
                "LOW STEALTH: Speed prioritized over stealth",
                "Full port ranges and service detection enabled",
                "Monitor logs for potential detection",
            ],
        }

        level = self._effective_level(target)
        # .get() guards against a custom profile name with no note set.
        notes = list(notes_by_level.get(level, notes_by_level['low']))

        notes += [
            "Log all reconnaissance activities",
            "Use legitimate-looking User-Agent strings",
            "Vary timing between different techniques",
            "Document any anomalous responses",
        ]
        return notes

    def _assess_reconnaissance_risk(self, target: ReconTarget, commands: Dict) -> str:
        """Assess the risk level of the reconnaissance plan.

        Args:
            target: Supplies the stealth level.
            commands: Mapping of tool name to command strings; only the
                ``'nmap'`` entry is inspected.

        Returns:
            A string starting with ``LOW``, ``MEDIUM`` (one or two risk
            factors) or ``HIGH`` (more than two).
        """
        nmap_commands = commands.get('nmap', [])
        active_count = len(nmap_commands)
        risk_factors = []

        if active_count > 5:
            risk_factors.append("High number of active scans")

        if self._effective_level(target) == 'maximum' and active_count > 0:
            risk_factors.append("Active scanning conflicts with maximum stealth requirement")

        # Inspect whole tokens so a hostname such as "corp-AD.example.com"
        # is not mistaken for the -A flag.
        aggressive = any(
            token.startswith(('-A', '--script'))
            for cmd in nmap_commands
            for token in cmd.split()
        )
        if aggressive:
            risk_factors.append("Aggressive scanning techniques detected")

        if not risk_factors:
            return "LOW: Reconnaissance plan follows OPSEC guidelines"

        summary = '; '.join(risk_factors)
        if len(risk_factors) <= 2:
            return f"MEDIUM: Consider addressing: {summary}"
        return f"HIGH: Multiple risk factors identified: {summary}"

    def _suggest_next_steps(self, target: ReconTarget, commands: Dict) -> List[str]:
        """Suggest next steps for the operator.

        Args:
            target: Supplies the stealth level; 'high' and 'maximum' add a
                waiting step right after the first entry.
            commands: Accepted for signature compatibility; not read.
        """
        steps = [
            "Execute passive reconnaissance first",
            "Analyze results for interesting services/ports",
            "Proceed with active scanning if OPSEC allows",
            "Document all findings in structured format",
            "Identify potential attack vectors",
            "Plan next phase based on discovered services",
        ]

        if self._effective_level(target) in ('high', 'maximum'):
            steps.insert(1, "Wait 24-48 hours between reconnaissance phases")

        return steps

    def execute_reconnaissance(self, target_info: ReconTarget) -> Dict:
        """
        Execute reconnaissance plan (simulation/planning mode).

        HUMAN_APPROVAL_REQUIRED: Manual execution required for actual scanning

        Args:
            target_info: The target to plan for.

        Returns:
            A dict with the target, the requested OPSEC level, the plan (the
            ``ReconResult`` fields as a dict), the fixed status
            ``'SIMULATION_ONLY'`` and a UTC ISO 8601 timestamp.
        """
        self.logger.warning("SIMULATION MODE: Actual command execution disabled for safety")

        recon_result = self.analyze_target(target_info)

        return {
            'target': target_info.target,
            'opsec_level': target_info.opsec_level,
            # Shallow copy so the report does not alias the result's own
            # attribute dictionary.
            'plan': dict(vars(recon_result)),
            'execution_status': 'SIMULATION_ONLY',
            # Previously the current working directory was stored here.
            'timestamp': self._utc_timestamp(),
        }
|
|
|
|
|
if __name__ == "__main__":
    # Demo run: plan (but do not execute) reconnaissance against a sample
    # domain at the 'high' stealth level and print the plan as JSON.
    logging.basicConfig(level=logging.INFO)

    demo_target = ReconTarget(
        target="example.com",
        target_type="domain",
        constraints={"time_limit": "2h", "stealth": True},
        opsec_level="high",
    )

    # No config path is given, so the agent runs on its built-in defaults.
    report = ReconAgent().execute_reconnaissance(demo_target)
    print(json.dumps(report, indent=2))
|
|