|
""" |
|
Persistent Multi-Agent Integration |
|
Integrates persistent cognitive system with existing agent framework |
|
|
|
Author: Cyber-LLM Development Team |
|
Date: August 6, 2025 |
|
Version: 2.0.0 |
|
""" |
|
|
|
import asyncio |
|
import logging |
|
import json |
|
from datetime import datetime, timedelta |
|
from typing import Dict, List, Any, Optional, Set, Union |
|
from dataclasses import dataclass, field |
|
from pathlib import Path |
|
import uuid |
|
|
|
|
|
import sys |
|
import os |
|
sys.path.append(os.path.dirname(os.path.dirname(__file__))) |
|
|
|
from agents.recon_agent import ReconAgent |
|
from agents.c2_agent import C2Agent |
|
from agents.post_exploit_agent import PostExploitAgent |
|
from agents.safety_agent import SafetyAgent |
|
from agents.explainability_agent import ExplainabilityAgent |
|
from ..agents.orchestrator import Orchestrator |
|
|
|
|
|
from cognitive.persistent_reasoning_system import ( |
|
PersistentCognitiveSystem, MemoryEntry, MemoryType, |
|
ReasoningType, StrategicPlan, ReasoningChain |
|
) |
|
from server.persistent_agent_server import PersistentAgentServer, create_server_config |
|
|
|
@dataclass |
|
class AgentMemoryProfile: |
|
"""Memory profile for each agent type""" |
|
agent_id: str |
|
agent_type: str |
|
primary_memory_types: List[MemoryType] = field(default_factory=list) |
|
reasoning_preferences: List[ReasoningType] = field(default_factory=list) |
|
strategic_capabilities: List[str] = field(default_factory=list) |
|
memory_retention_policy: Dict[str, Any] = field(default_factory=dict) |
|
knowledge_domains: Set[str] = field(default_factory=set) |
|
|
|
@dataclass |
|
class CognitiveAgentState: |
|
"""Enhanced agent state with cognitive capabilities""" |
|
agent_id: str |
|
base_agent: Any |
|
memory_profile: AgentMemoryProfile |
|
cognitive_session_id: str |
|
active_reasoning_chains: List[str] = field(default_factory=list) |
|
strategic_plans: List[str] = field(default_factory=list) |
|
memory_consolidation_schedule: Optional[datetime] = None |
|
last_cognitive_update: datetime = field(default_factory=datetime.now) |
|
cognitive_metrics: Dict[str, Any] = field(default_factory=dict) |
|
|
|
class PersistentMultiAgentSystem: |
|
""" |
|
Enhanced multi-agent system with persistent cognitive capabilities |
|
""" |
|
|
|
def __init__(self, |
|
cognitive_db_path: str = "data/cognitive_system.db", |
|
server_config: Optional[Any] = None): |
|
|
|
self.cognitive_system = PersistentCognitiveSystem(cognitive_db_path) |
|
self.logger = logging.getLogger("persistent_multi_agent") |
|
|
|
|
|
self.cognitive_agents: Dict[str, CognitiveAgentState] = {} |
|
self.agent_profiles: Dict[str, AgentMemoryProfile] = {} |
|
|
|
|
|
self.orchestrator = None |
|
self.base_agents = {} |
|
|
|
|
|
self.server = None |
|
if server_config: |
|
self.server = PersistentAgentServer(server_config) |
|
|
|
|
|
self.global_memory_graph = {} |
|
self.inter_agent_reasoning_chains = {} |
|
self.collaborative_strategic_plans = {} |
|
|
|
|
|
self._initialize_agent_profiles() |
|
|
|
|
|
self.cognitive_tasks = [] |
|
self.system_running = False |
|
|
|
def _initialize_agent_profiles(self): |
|
"""Initialize memory profiles for each agent type""" |
|
|
|
|
|
self.agent_profiles["recon"] = AgentMemoryProfile( |
|
agent_id="recon", |
|
agent_type="reconnaissance", |
|
primary_memory_types=[ |
|
MemoryType.EPISODIC, |
|
MemoryType.SEMANTIC, |
|
MemoryType.PROCEDURAL |
|
], |
|
reasoning_preferences=[ |
|
ReasoningType.DEDUCTIVE, |
|
ReasoningType.INDUCTIVE, |
|
ReasoningType.ANALOGICAL |
|
], |
|
strategic_capabilities=[ |
|
"network_mapping", |
|
"vulnerability_discovery", |
|
"target_prioritization" |
|
], |
|
memory_retention_policy={ |
|
"target_info_retention": 90, |
|
"scan_results_retention": 30, |
|
"technique_learning_retention": 365 |
|
}, |
|
knowledge_domains={ |
|
"network_protocols", "vulnerability_databases", |
|
"scanning_techniques", "target_profiling" |
|
} |
|
) |
|
|
|
|
|
self.agent_profiles["c2"] = AgentMemoryProfile( |
|
agent_id="c2", |
|
agent_type="command_control", |
|
primary_memory_types=[ |
|
MemoryType.EPISODIC, |
|
MemoryType.WORKING, |
|
MemoryType.PROCEDURAL |
|
], |
|
reasoning_preferences=[ |
|
ReasoningType.STRATEGIC, |
|
ReasoningType.CAUSAL, |
|
ReasoningType.COUNTERFACTUAL |
|
], |
|
strategic_capabilities=[ |
|
"session_management", |
|
"payload_delivery", |
|
"persistence_mechanisms" |
|
], |
|
memory_retention_policy={ |
|
"session_logs_retention": 365, |
|
"command_history_retention": 180, |
|
"technique_effectiveness_retention": 730 |
|
}, |
|
knowledge_domains={ |
|
"c2_protocols", "payload_types", "persistence_methods", |
|
"evasion_techniques", "communication_channels" |
|
} |
|
) |
|
|
|
|
|
self.agent_profiles["post_exploit"] = AgentMemoryProfile( |
|
agent_id="post_exploit", |
|
agent_type="post_exploitation", |
|
primary_memory_types=[ |
|
MemoryType.EPISODIC, |
|
MemoryType.SEMANTIC, |
|
MemoryType.STRATEGIC |
|
], |
|
reasoning_preferences=[ |
|
ReasoningType.DEDUCTIVE, |
|
ReasoningType.STRATEGIC, |
|
ReasoningType.META_COGNITIVE |
|
], |
|
strategic_capabilities=[ |
|
"privilege_escalation", |
|
"lateral_movement", |
|
"data_extraction" |
|
], |
|
memory_retention_policy={ |
|
"system_mapping_retention": 180, |
|
"credential_retention": 365, |
|
"technique_success_retention": 730 |
|
}, |
|
knowledge_domains={ |
|
"operating_systems", "privilege_escalation", |
|
"lateral_movement", "data_exfiltration", "steganography" |
|
} |
|
) |
|
|
|
|
|
self.agent_profiles["safety"] = AgentMemoryProfile( |
|
agent_id="safety", |
|
agent_type="safety_monitor", |
|
primary_memory_types=[ |
|
MemoryType.EPISODIC, |
|
MemoryType.SEMANTIC, |
|
MemoryType.WORKING |
|
], |
|
reasoning_preferences=[ |
|
ReasoningType.DEDUCTIVE, |
|
ReasoningType.CAUSAL, |
|
ReasoningType.COUNTERFACTUAL |
|
], |
|
strategic_capabilities=[ |
|
"risk_assessment", |
|
"intervention_planning", |
|
"compliance_monitoring" |
|
], |
|
memory_retention_policy={ |
|
"safety_violations_retention": 2555, |
|
"rule_updates_retention": 1825, |
|
"intervention_logs_retention": 730 |
|
}, |
|
knowledge_domains={ |
|
"safety_regulations", "risk_assessment", "compliance_frameworks", |
|
"incident_response", "legal_requirements" |
|
} |
|
) |
|
|
|
|
|
self.agent_profiles["explainability"] = AgentMemoryProfile( |
|
agent_id="explainability", |
|
agent_type="explainability", |
|
primary_memory_types=[ |
|
MemoryType.EPISODIC, |
|
MemoryType.SEMANTIC, |
|
MemoryType.META_MEMORY |
|
], |
|
reasoning_preferences=[ |
|
ReasoningType.META_COGNITIVE, |
|
ReasoningType.ANALOGICAL, |
|
ReasoningType.ABDUCTIVE |
|
], |
|
strategic_capabilities=[ |
|
"decision_tracing", |
|
"explanation_generation", |
|
"transparency_reporting" |
|
], |
|
memory_retention_policy={ |
|
"decision_traces_retention": 365, |
|
"explanation_templates_retention": 730, |
|
"transparency_logs_retention": 1095 |
|
}, |
|
knowledge_domains={ |
|
"decision_analysis", "explanation_theory", "transparency_methods", |
|
"audit_trails", "interpretability_techniques" |
|
} |
|
) |
|
|
|
self.logger.info(f"Initialized {len(self.agent_profiles)} agent profiles") |
|
|
|
async def initialize_system(self): |
|
"""Initialize the persistent multi-agent system""" |
|
|
|
try: |
|
self.system_running = True |
|
|
|
|
|
await self.cognitive_system.initialize() |
|
|
|
|
|
await self._create_base_agents() |
|
|
|
|
|
await self._enhance_agents_with_cognition() |
|
|
|
|
|
self._start_cognitive_processes() |
|
|
|
|
|
await self._initialize_cognitive_orchestrator() |
|
|
|
|
|
if self.server: |
|
asyncio.create_task(self.server.start_server()) |
|
|
|
self.logger.info("Persistent multi-agent system initialized") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error initializing system: {e}") |
|
raise |
|
|
|
async def _create_base_agents(self): |
|
"""Create base agent instances""" |
|
|
|
try: |
|
|
|
self.base_agents["recon"] = ReconAgent() |
|
self.base_agents["c2"] = C2Agent() |
|
self.base_agents["post_exploit"] = PostExploitAgent() |
|
self.base_agents["safety"] = SafetyAgent() |
|
self.base_agents["explainability"] = ExplainabilityAgent() |
|
|
|
self.logger.info(f"Created {len(self.base_agents)} base agents") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error creating base agents: {e}") |
|
raise |
|
|
|
async def _enhance_agents_with_cognition(self): |
|
"""Enhance base agents with persistent cognitive capabilities""" |
|
|
|
try: |
|
for agent_id, base_agent in self.base_agents.items(): |
|
|
|
|
|
profile = self.agent_profiles.get(agent_id) |
|
if not profile: |
|
self.logger.warning(f"No memory profile found for {agent_id}") |
|
continue |
|
|
|
|
|
cognitive_session_id = str(uuid.uuid4()) |
|
|
|
|
|
await self._initialize_agent_memories(profile) |
|
|
|
|
|
cognitive_state = CognitiveAgentState( |
|
agent_id=agent_id, |
|
base_agent=base_agent, |
|
memory_profile=profile, |
|
cognitive_session_id=cognitive_session_id, |
|
memory_consolidation_schedule=datetime.now() + timedelta(hours=6) |
|
) |
|
|
|
self.cognitive_agents[agent_id] = cognitive_state |
|
|
|
|
|
await self._inject_cognitive_methods(base_agent, cognitive_state) |
|
|
|
self.logger.info(f"Enhanced {agent_id} agent with cognitive capabilities") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error enhancing agents: {e}") |
|
raise |
|
|
|
async def _initialize_agent_memories(self, profile: AgentMemoryProfile): |
|
"""Initialize memories for an agent based on its profile""" |
|
|
|
try: |
|
|
|
for domain in profile.knowledge_domains: |
|
memory_entry = MemoryEntry( |
|
memory_type=MemoryType.SEMANTIC, |
|
content={ |
|
"domain": domain, |
|
"agent_id": profile.agent_id, |
|
"knowledge_base": f"Initialized knowledge base for {domain}", |
|
"expertise_level": "basic", |
|
"last_updated": datetime.now().isoformat() |
|
}, |
|
importance=0.8, |
|
tags={f"agent:{profile.agent_id}", f"domain:{domain}", "initialization"} |
|
) |
|
|
|
await self.cognitive_system.memory_manager.store_memory(memory_entry) |
|
|
|
|
|
for capability in profile.strategic_capabilities: |
|
memory_entry = MemoryEntry( |
|
memory_type=MemoryType.PROCEDURAL, |
|
content={ |
|
"capability": capability, |
|
"agent_id": profile.agent_id, |
|
"procedure_steps": f"Standard procedure for {capability}", |
|
"success_rate": 0.0, |
|
"last_used": None |
|
}, |
|
importance=0.7, |
|
tags={f"agent:{profile.agent_id}", f"capability:{capability}", "procedure"} |
|
) |
|
|
|
await self.cognitive_system.memory_manager.store_memory(memory_entry) |
|
|
|
self.logger.debug(f"Initialized memories for agent {profile.agent_id}") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error initializing memories for {profile.agent_id}: {e}") |
|
|
|
async def _inject_cognitive_methods(self, base_agent: Any, cognitive_state: CognitiveAgentState): |
|
"""Inject cognitive methods into base agent""" |
|
|
|
try: |
|
agent_id = cognitive_state.agent_id |
|
|
|
|
|
async def remember(content: Dict[str, Any], |
|
memory_type: MemoryType = MemoryType.EPISODIC, |
|
importance: float = 0.5, |
|
tags: Set[str] = None) -> str: |
|
"""Enhanced memory storage method""" |
|
|
|
if tags is None: |
|
tags = set() |
|
tags.add(f"agent:{agent_id}") |
|
|
|
memory_entry = MemoryEntry( |
|
memory_type=memory_type, |
|
content=content, |
|
importance=importance, |
|
tags=tags |
|
) |
|
|
|
memory_id = await self.cognitive_system.memory_manager.store_memory(memory_entry) |
|
|
|
|
|
cognitive_state.cognitive_metrics.setdefault("memories_created", 0) |
|
cognitive_state.cognitive_metrics["memories_created"] += 1 |
|
cognitive_state.last_cognitive_update = datetime.now() |
|
|
|
return memory_id |
|
|
|
async def recall(query: str, |
|
memory_types: List[MemoryType] = None, |
|
limit: int = 10) -> List[MemoryEntry]: |
|
"""Enhanced memory recall method""" |
|
|
|
if memory_types is None: |
|
memory_types = cognitive_state.memory_profile.primary_memory_types |
|
|
|
|
|
agent_query = f"{query} agent:{agent_id}" |
|
|
|
memories = await self.cognitive_system.memory_manager.search_memories( |
|
agent_query, memory_types, limit |
|
) |
|
|
|
|
|
cognitive_state.cognitive_metrics.setdefault("memory_recalls", 0) |
|
cognitive_state.cognitive_metrics["memory_recalls"] += 1 |
|
|
|
return memories |
|
|
|
async def reason(topic: str, |
|
goal: str, |
|
reasoning_type: ReasoningType = None) -> str: |
|
"""Enhanced reasoning method""" |
|
|
|
if reasoning_type is None: |
|
reasoning_type = cognitive_state.memory_profile.reasoning_preferences[0] |
|
|
|
chain_id = await self.cognitive_system.reasoning_engine.start_reasoning_chain( |
|
topic=f"{topic} (Agent: {agent_id})", |
|
goal=goal, |
|
reasoning_type=reasoning_type |
|
) |
|
|
|
|
|
cognitive_state.active_reasoning_chains.append(chain_id) |
|
|
|
|
|
cognitive_state.cognitive_metrics.setdefault("reasoning_chains_started", 0) |
|
cognitive_state.cognitive_metrics["reasoning_chains_started"] += 1 |
|
|
|
return chain_id |
|
|
|
async def plan_strategically(title: str, |
|
primary_goal: str, |
|
template_type: str = "cybersecurity_assessment") -> str: |
|
"""Enhanced strategic planning method""" |
|
|
|
plan_id = await self.cognitive_system.strategic_planner.create_strategic_plan( |
|
title=f"{title} (Agent: {agent_id})", |
|
primary_goal=primary_goal, |
|
template_type=template_type |
|
) |
|
|
|
|
|
cognitive_state.strategic_plans.append(plan_id) |
|
|
|
|
|
cognitive_state.cognitive_metrics.setdefault("strategic_plans_created", 0) |
|
cognitive_state.cognitive_metrics["strategic_plans_created"] += 1 |
|
|
|
return plan_id |
|
|
|
async def get_cognitive_status() -> Dict[str, Any]: |
|
"""Get cognitive status for agent""" |
|
|
|
return { |
|
"agent_id": agent_id, |
|
"active_reasoning_chains": len(cognitive_state.active_reasoning_chains), |
|
"strategic_plans": len(cognitive_state.strategic_plans), |
|
"last_cognitive_update": cognitive_state.last_cognitive_update.isoformat(), |
|
"cognitive_metrics": cognitive_state.cognitive_metrics, |
|
"memory_profile": { |
|
"primary_memory_types": [mt.value for mt in cognitive_state.memory_profile.primary_memory_types], |
|
"reasoning_preferences": [rt.value for rt in cognitive_state.memory_profile.reasoning_preferences], |
|
"knowledge_domains": list(cognitive_state.memory_profile.knowledge_domains) |
|
} |
|
} |
|
|
|
|
|
setattr(base_agent, 'remember', remember) |
|
setattr(base_agent, 'recall', recall) |
|
setattr(base_agent, 'reason', reason) |
|
setattr(base_agent, 'plan_strategically', plan_strategically) |
|
setattr(base_agent, 'get_cognitive_status', get_cognitive_status) |
|
setattr(base_agent, 'cognitive_state', cognitive_state) |
|
|
|
self.logger.debug(f"Injected cognitive methods into {agent_id} agent") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error injecting cognitive methods: {e}") |
|
|
|
def _start_cognitive_processes(self): |
|
"""Start background cognitive processes""" |
|
|
|
|
|
async def memory_consolidation_worker(): |
|
while self.system_running: |
|
try: |
|
await asyncio.sleep(3600) |
|
await self._consolidate_agent_memories() |
|
except Exception as e: |
|
self.logger.error(f"Memory consolidation error: {e}") |
|
|
|
|
|
async def inter_agent_reasoning_coordinator(): |
|
while self.system_running: |
|
try: |
|
await asyncio.sleep(1800) |
|
await self._coordinate_inter_agent_reasoning() |
|
except Exception as e: |
|
self.logger.error(f"Inter-agent reasoning error: {e}") |
|
|
|
|
|
async def strategic_plan_synchronizer(): |
|
while self.system_running: |
|
try: |
|
await asyncio.sleep(7200) |
|
await self._synchronize_strategic_plans() |
|
except Exception as e: |
|
self.logger.error(f"Strategic plan sync error: {e}") |
|
|
|
|
|
async def global_memory_maintenance(): |
|
while self.system_running: |
|
try: |
|
await asyncio.sleep(10800) |
|
await self._maintain_global_memory_graph() |
|
except Exception as e: |
|
self.logger.error(f"Global memory maintenance error: {e}") |
|
|
|
|
|
self.cognitive_tasks = [ |
|
asyncio.create_task(memory_consolidation_worker()), |
|
asyncio.create_task(inter_agent_reasoning_coordinator()), |
|
asyncio.create_task(strategic_plan_synchronizer()), |
|
asyncio.create_task(global_memory_maintenance()) |
|
] |
|
|
|
self.logger.info("Started cognitive background processes") |
|
|
|
async def _consolidate_agent_memories(self): |
|
"""Consolidate memories for all agents""" |
|
|
|
try: |
|
for agent_id, cognitive_state in self.cognitive_agents.items(): |
|
|
|
|
|
if (cognitive_state.memory_consolidation_schedule and |
|
datetime.now() >= cognitive_state.memory_consolidation_schedule): |
|
|
|
|
|
await self.cognitive_system.memory_manager.consolidate_memories() |
|
|
|
|
|
cognitive_state.memory_consolidation_schedule = datetime.now() + timedelta(hours=6) |
|
|
|
|
|
cognitive_state.cognitive_metrics.setdefault("memory_consolidations", 0) |
|
cognitive_state.cognitive_metrics["memory_consolidations"] += 1 |
|
|
|
self.logger.debug(f"Consolidated memories for agent {agent_id}") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error consolidating memories: {e}") |
|
|
|
async def _coordinate_inter_agent_reasoning(self): |
|
"""Coordinate reasoning between agents""" |
|
|
|
try: |
|
|
|
all_chains = [] |
|
for cognitive_state in self.cognitive_agents.values(): |
|
all_chains.extend(cognitive_state.active_reasoning_chains) |
|
|
|
if len(all_chains) < 2: |
|
return |
|
|
|
|
|
collaborative_topic = "Multi-Agent Collaborative Analysis" |
|
collaborative_goal = "Synthesize insights from multiple agent perspectives" |
|
|
|
collaborative_chain_id = await self.cognitive_system.reasoning_engine.start_reasoning_chain( |
|
topic=collaborative_topic, |
|
goal=collaborative_goal, |
|
reasoning_type=ReasoningType.META_COGNITIVE |
|
) |
|
|
|
|
|
for chain_id in all_chains[-5:]: |
|
try: |
|
chain = await self.cognitive_system.reasoning_engine.get_reasoning_chain(chain_id) |
|
if chain and chain.reasoning_steps: |
|
await self.cognitive_system.reasoning_engine.add_reasoning_step( |
|
collaborative_chain_id, |
|
premise=f"Agent reasoning from chain {chain_id}", |
|
inference_rule="collaborative_synthesis", |
|
evidence=[step.conclusion for step in chain.reasoning_steps[-3:]] |
|
) |
|
except Exception as e: |
|
self.logger.debug(f"Error processing chain {chain_id}: {e}") |
|
continue |
|
|
|
|
|
final_chain = await self.cognitive_system.reasoning_engine.complete_reasoning_chain( |
|
collaborative_chain_id |
|
) |
|
|
|
if final_chain: |
|
self.inter_agent_reasoning_chains[collaborative_chain_id] = { |
|
"created_at": datetime.now(), |
|
"participating_agents": list(self.cognitive_agents.keys()), |
|
"conclusion": final_chain.conclusion, |
|
"confidence": final_chain.confidence |
|
} |
|
|
|
self.logger.info(f"Created collaborative reasoning chain: {collaborative_chain_id}") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error coordinating inter-agent reasoning: {e}") |
|
|
|
async def _synchronize_strategic_plans(self): |
|
"""Synchronize strategic plans across agents""" |
|
|
|
try: |
|
|
|
all_plans = [] |
|
for cognitive_state in self.cognitive_agents.values(): |
|
all_plans.extend(cognitive_state.strategic_plans) |
|
|
|
if not all_plans: |
|
return |
|
|
|
|
|
master_plan_id = await self.cognitive_system.strategic_planner.create_strategic_plan( |
|
title="Multi-Agent Coordinated Strategic Plan", |
|
primary_goal="Coordinate strategic objectives across all agents", |
|
template_type="cybersecurity_assessment" |
|
) |
|
|
|
|
|
for plan_id in all_plans[-10:]: |
|
try: |
|
plan = await self.cognitive_system.strategic_planner.get_strategic_plan(plan_id) |
|
if plan: |
|
|
|
await self.cognitive_system.strategic_planner.add_goal_to_plan( |
|
master_plan_id, |
|
title=f"Sub-goal from plan {plan_id}", |
|
description=plan.primary_goal, |
|
priority=5 |
|
) |
|
except Exception as e: |
|
self.logger.debug(f"Error processing plan {plan_id}: {e}") |
|
continue |
|
|
|
self.collaborative_strategic_plans[master_plan_id] = { |
|
"created_at": datetime.now(), |
|
"participating_agents": list(self.cognitive_agents.keys()), |
|
"individual_plans": all_plans |
|
} |
|
|
|
self.logger.info(f"Created master strategic plan: {master_plan_id}") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error synchronizing strategic plans: {e}") |
|
|
|
async def _maintain_global_memory_graph(self): |
|
"""Maintain global memory graph across agents""" |
|
|
|
try: |
|
|
|
for agent_id, cognitive_state in self.cognitive_agents.items(): |
|
|
|
|
|
recent_memories = await self.cognitive_system.memory_manager.search_memories( |
|
query=f"agent:{agent_id}", |
|
limit=20 |
|
) |
|
|
|
for memory in recent_memories: |
|
|
|
for other_agent_id in self.cognitive_agents.keys(): |
|
if other_agent_id == agent_id: |
|
continue |
|
|
|
|
|
related_memories = await self.cognitive_system.memory_manager.search_memories( |
|
query=f"agent:{other_agent_id}", |
|
memory_types=[memory.memory_type], |
|
limit=5 |
|
) |
|
|
|
|
|
for related_memory in related_memories: |
|
relationship_key = f"{memory.memory_id}:{related_memory.memory_id}" |
|
|
|
if relationship_key not in self.global_memory_graph: |
|
self.global_memory_graph[relationship_key] = { |
|
"source_agent": agent_id, |
|
"target_agent": other_agent_id, |
|
"relationship_type": "cross_agent_correlation", |
|
"strength": 0.3, |
|
"created_at": datetime.now(), |
|
"access_count": 0 |
|
} |
|
|
|
self.logger.debug(f"Global memory graph has {len(self.global_memory_graph)} relationships") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error maintaining global memory graph: {e}") |
|
|
|
async def _initialize_cognitive_orchestrator(self): |
|
"""Initialize orchestrator with cognitive enhancements""" |
|
|
|
try: |
|
|
|
self.orchestrator = Orchestrator() |
|
|
|
|
|
async def coordinate_cognitive_analysis(scenario: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Coordinate cognitive analysis across all agents""" |
|
|
|
results = {} |
|
|
|
|
|
for agent_id, cognitive_state in self.cognitive_agents.items(): |
|
base_agent = cognitive_state.base_agent |
|
|
|
|
|
memory_id = await base_agent.remember( |
|
content={ |
|
"scenario": scenario, |
|
"analysis_requested": datetime.now().isoformat(), |
|
"scenario_type": scenario.get("type", "unknown") |
|
}, |
|
memory_type=MemoryType.EPISODIC, |
|
importance=0.8, |
|
tags={"scenario_analysis", "orchestrated"} |
|
) |
|
|
|
|
|
reasoning_chain_id = await base_agent.reason( |
|
topic=f"Scenario Analysis: {scenario.get('title', 'Untitled')}", |
|
goal=f"Analyze scenario from {agent_id} perspective", |
|
reasoning_type=cognitive_state.memory_profile.reasoning_preferences[0] |
|
) |
|
|
|
|
|
if agent_id in ["recon", "c2", "post_exploit"]: |
|
plan_id = await base_agent.plan_strategically( |
|
title=f"Strategic Response to Scenario ({agent_id})", |
|
primary_goal=scenario.get("primary_goal", "Address scenario requirements") |
|
) |
|
results[f"{agent_id}_strategic_plan"] = plan_id |
|
|
|
results[f"{agent_id}_memory"] = memory_id |
|
results[f"{agent_id}_reasoning"] = reasoning_chain_id |
|
|
|
|
|
collaborative_analysis = await self.cognitive_system.process_complex_scenario({ |
|
**scenario, |
|
"multi_agent_analysis": True, |
|
"participating_agents": list(self.cognitive_agents.keys()), |
|
"individual_results": results |
|
}) |
|
|
|
results["collaborative_analysis"] = collaborative_analysis |
|
|
|
return results |
|
|
|
async def get_system_cognitive_status() -> Dict[str, Any]: |
|
"""Get cognitive status of entire system""" |
|
|
|
system_status = { |
|
"timestamp": datetime.now().isoformat(), |
|
"agents": {}, |
|
"global_metrics": { |
|
"total_active_reasoning_chains": sum( |
|
len(cs.active_reasoning_chains) |
|
for cs in self.cognitive_agents.values() |
|
), |
|
"total_strategic_plans": sum( |
|
len(cs.strategic_plans) |
|
for cs in self.cognitive_agents.values() |
|
), |
|
"inter_agent_reasoning_chains": len(self.inter_agent_reasoning_chains), |
|
"collaborative_strategic_plans": len(self.collaborative_strategic_plans), |
|
"global_memory_relationships": len(self.global_memory_graph) |
|
}, |
|
"memory_stats": await self.cognitive_system.memory_manager.get_memory_stats() |
|
} |
|
|
|
|
|
for agent_id, cognitive_state in self.cognitive_agents.items(): |
|
system_status["agents"][agent_id] = await cognitive_state.base_agent.get_cognitive_status() |
|
|
|
return system_status |
|
|
|
|
|
setattr(self.orchestrator, 'coordinate_cognitive_analysis', coordinate_cognitive_analysis) |
|
setattr(self.orchestrator, 'get_system_cognitive_status', get_system_cognitive_status) |
|
setattr(self.orchestrator, 'cognitive_agents', self.cognitive_agents) |
|
|
|
self.logger.info("Initialized cognitive orchestrator") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error initializing cognitive orchestrator: {e}") |
|
|
|
async def run_cognitive_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Run a scenario through the cognitive multi-agent system""" |
|
|
|
try: |
|
self.logger.info(f"Starting cognitive scenario: {scenario.get('title', 'Untitled')}") |
|
|
|
|
|
if self.orchestrator and hasattr(self.orchestrator, 'coordinate_cognitive_analysis'): |
|
results = await self.orchestrator.coordinate_cognitive_analysis(scenario) |
|
else: |
|
|
|
results = await self.cognitive_system.process_complex_scenario(scenario) |
|
|
|
self.logger.info("Cognitive scenario completed") |
|
return results |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error running cognitive scenario: {e}") |
|
return {"status": "error", "message": str(e)} |
|
|
|
async def get_agent(self, agent_id: str) -> Optional[Any]: |
|
"""Get enhanced agent by ID""" |
|
|
|
cognitive_state = self.cognitive_agents.get(agent_id) |
|
return cognitive_state.base_agent if cognitive_state else None |
|
|
|
async def get_system_status(self) -> Dict[str, Any]: |
|
"""Get comprehensive system status""" |
|
|
|
if self.orchestrator and hasattr(self.orchestrator, 'get_system_cognitive_status'): |
|
return await self.orchestrator.get_system_cognitive_status() |
|
else: |
|
return { |
|
"status": "basic", |
|
"agents": list(self.cognitive_agents.keys()), |
|
"system_running": self.system_running |
|
} |
|
|
|
async def shutdown(self): |
|
"""Graceful shutdown of the cognitive system""" |
|
|
|
try: |
|
self.logger.info("Shutting down persistent multi-agent system...") |
|
|
|
self.system_running = False |
|
|
|
|
|
for task in self.cognitive_tasks: |
|
task.cancel() |
|
|
|
if self.cognitive_tasks: |
|
await asyncio.gather(*self.cognitive_tasks, return_exceptions=True) |
|
|
|
|
|
if self.server: |
|
await self.server.shutdown() |
|
|
|
self.logger.info("System shutdown complete") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error during shutdown: {e}") |
|
|
|
|
|
def create_persistent_multi_agent_system( |
|
cognitive_db_path: str = "data/cognitive_system.db", |
|
enable_server: bool = True, |
|
server_port: int = 8080 |
|
) -> PersistentMultiAgentSystem: |
|
"""Create persistent multi-agent system""" |
|
|
|
server_config = None |
|
if enable_server: |
|
server_config = create_server_config(port=server_port) |
|
|
|
return PersistentMultiAgentSystem( |
|
cognitive_db_path=cognitive_db_path, |
|
server_config=server_config |
|
) |
|
|
|
|
|
async def main(): |
|
"""Example usage of the persistent multi-agent system""" |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
|
|
|
system = create_persistent_multi_agent_system() |
|
|
|
try: |
|
|
|
await system.initialize_system() |
|
|
|
|
|
scenario = { |
|
"title": "Advanced Persistent Threat Analysis", |
|
"type": "cybersecurity_assessment", |
|
"primary_goal": "Identify and analyze APT indicators", |
|
"target_environment": "corporate_network", |
|
"threat_indicators": [ |
|
"suspicious_network_traffic", |
|
"unusual_authentication_patterns", |
|
"lateral_movement_attempts" |
|
], |
|
"time_constraints": "72_hours", |
|
"risk_tolerance": "low" |
|
} |
|
|
|
|
|
results = await system.run_cognitive_scenario(scenario) |
|
print(f"Scenario results: {json.dumps(results, indent=2, default=str)}") |
|
|
|
|
|
status = await system.get_system_status() |
|
print(f"System status: {json.dumps(status, indent=2, default=str)}") |
|
|
|
|
|
while True: |
|
await asyncio.sleep(60) |
|
|
|
except KeyboardInterrupt: |
|
print("Shutdown requested by user") |
|
finally: |
|
await system.shutdown() |
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |
|
|