|
""" |
|
Multi-Agent Collaboration Framework for Cyber-LLM |
|
Advanced agent-to-agent communication and swarm intelligence |
|
|
|
Author: Muzan Sano <[email protected]> |
|
""" |
|
|
|
import asyncio |
|
import json |
|
import logging |
|
import uuid |
|
from datetime import datetime, timedelta |
|
from typing import Dict, List, Any, Optional, Tuple, Union, Callable |
|
from dataclasses import dataclass, field |
|
from enum import Enum |
|
import numpy as np |
|
from collections import defaultdict, deque |
|
import websockets |
|
import aiohttp |
|
|
|
from ..utils.logging_system import CyberLLMLogger, CyberLLMError, ErrorCategory |
|
from ..memory.persistent_memory import PersistentMemoryManager |
|
from ..cognitive.meta_cognitive import MetaCognitiveEngine |
|
|
|
class MessageType(Enum): |
|
"""Agent communication message types""" |
|
TASK_REQUEST = "task_request" |
|
TASK_RESPONSE = "task_response" |
|
INFORMATION_SHARE = "information_share" |
|
COORDINATION_REQUEST = "coordination_request" |
|
CONSENSUS_PROPOSAL = "consensus_proposal" |
|
CONSENSUS_VOTE = "consensus_vote" |
|
CAPABILITY_ANNOUNCEMENT = "capability_announcement" |
|
RESOURCE_REQUEST = "resource_request" |
|
RESOURCE_OFFER = "resource_offer" |
|
SWARM_DIRECTIVE = "swarm_directive" |
|
EMERGENCY_ALERT = "emergency_alert" |
|
|
|
class AgentRole(Enum): |
|
"""Agent roles in the collaboration framework""" |
|
LEADER = "leader" |
|
SPECIALIST = "specialist" |
|
COORDINATOR = "coordinator" |
|
SCOUT = "scout" |
|
ANALYZER = "analyzer" |
|
EXECUTOR = "executor" |
|
MONITOR = "monitor" |
|
|
|
class ConsensusAlgorithm(Enum): |
|
"""Consensus algorithms for decision making""" |
|
MAJORITY_VOTE = "majority_vote" |
|
WEIGHTED_VOTE = "weighted_vote" |
|
BYZANTINE_FAULT_TOLERANT = "byzantine_fault_tolerant" |
|
PROOF_OF_EXPERTISE = "proof_of_expertise" |
|
RAFT = "raft" |
|
|
|
@dataclass |
|
class AgentMessage: |
|
"""Inter-agent communication message""" |
|
message_id: str |
|
sender_id: str |
|
recipient_id: Optional[str] |
|
message_type: MessageType |
|
timestamp: datetime |
|
|
|
|
|
content: Dict[str, Any] |
|
priority: int = 5 |
|
|
|
|
|
ttl: int = 300 |
|
requires_acknowledgment: bool = False |
|
correlation_id: Optional[str] = None |
|
|
|
|
|
signature: Optional[str] = None |
|
encrypted: bool = False |
|
|
|
@dataclass |
|
class AgentCapability: |
|
"""Agent capability description""" |
|
capability_id: str |
|
name: str |
|
description: str |
|
|
|
|
|
accuracy: float |
|
speed: float |
|
resource_cost: float |
|
|
|
|
|
available: bool = True |
|
current_load: float = 0.0 |
|
max_concurrent: int = 10 |
|
|
|
|
|
required_resources: Dict[str, float] = field(default_factory=dict) |
|
dependencies: List[str] = field(default_factory=list) |
|
|
|
@dataclass |
|
class SwarmTask: |
|
"""Task for swarm execution""" |
|
task_id: str |
|
description: str |
|
task_type: str |
|
|
|
|
|
required_capabilities: List[str] |
|
estimated_complexity: float |
|
deadline: Optional[datetime] = None |
|
|
|
|
|
subtasks: List['SwarmTask'] = field(default_factory=list) |
|
dependencies: List[str] = field(default_factory=list) |
|
|
|
|
|
assigned_agents: List[str] = field(default_factory=list) |
|
status: str = "pending" |
|
|
|
|
|
results: Dict[str, Any] = field(default_factory=dict) |
|
completion_time: Optional[datetime] = None |
|
|
|
class AgentCommunicationProtocol: |
|
"""Standardized protocol for agent communication""" |
|
|
|
def __init__(self, agent_id: str, logger: Optional[CyberLLMLogger] = None): |
|
self.agent_id = agent_id |
|
self.logger = logger or CyberLLMLogger(name="agent_protocol") |
|
|
|
|
|
self.message_queue = asyncio.Queue() |
|
self.active_connections = {} |
|
self.message_handlers = {} |
|
self.acknowledgments = {} |
|
|
|
|
|
self.capabilities = {} |
|
self.peer_agents = {} |
|
self.conversation_contexts = {} |
|
|
|
|
|
self.trusted_agents = set() |
|
self.encryption_keys = {} |
|
|
|
self.logger.info("Agent Communication Protocol initialized", agent_id=agent_id) |
|
|
|
async def send_message(self, message: AgentMessage) -> bool: |
|
"""Send message to another agent or broadcast""" |
|
|
|
try: |
|
|
|
if not self._validate_message(message): |
|
self.logger.error("Invalid message", message_id=message.message_id) |
|
return False |
|
|
|
|
|
message.timestamp = datetime.now() |
|
message.sender_id = self.agent_id |
|
|
|
|
|
if message.encrypted or message.signature: |
|
message = await self._secure_message(message) |
|
|
|
|
|
if message.recipient_id: |
|
|
|
success = await self._send_direct_message(message) |
|
else: |
|
|
|
success = await self._broadcast_message(message) |
|
|
|
|
|
if message.requires_acknowledgment and success: |
|
asyncio.create_task(self._wait_for_acknowledgment(message)) |
|
|
|
self.logger.info("Message sent", |
|
message_id=message.message_id, |
|
recipient=message.recipient_id or "broadcast", |
|
type=message.message_type.value) |
|
|
|
return success |
|
|
|
except Exception as e: |
|
self.logger.error("Failed to send message", error=str(e)) |
|
return False |
|
|
|
async def receive_message(self) -> Optional[AgentMessage]: |
|
"""Receive next message from queue""" |
|
|
|
try: |
|
|
|
message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0) |
|
|
|
|
|
if self._validate_received_message(message): |
|
await self._process_received_message(message) |
|
return message |
|
|
|
return None |
|
|
|
except asyncio.TimeoutError: |
|
return None |
|
except Exception as e: |
|
self.logger.error("Failed to receive message", error=str(e)) |
|
return None |
|
|
|
async def register_capability(self, capability: AgentCapability): |
|
"""Register agent capability""" |
|
|
|
self.capabilities[capability.capability_id] = capability |
|
|
|
|
|
announcement = AgentMessage( |
|
message_id=str(uuid.uuid4()), |
|
sender_id=self.agent_id, |
|
recipient_id=None, |
|
message_type=MessageType.CAPABILITY_ANNOUNCEMENT, |
|
timestamp=datetime.now(), |
|
content={ |
|
"capability": { |
|
"id": capability.capability_id, |
|
"name": capability.name, |
|
"description": capability.description, |
|
"accuracy": capability.accuracy, |
|
"speed": capability.speed, |
|
"available": capability.available |
|
} |
|
} |
|
) |
|
|
|
await self.send_message(announcement) |
|
|
|
self.logger.info("Capability registered and announced", |
|
capability_id=capability.capability_id, |
|
name=capability.name) |
|
|
|
class DistributedConsensus: |
|
"""Distributed consensus mechanisms for multi-agent decisions""" |
|
|
|
def __init__(self, |
|
agent_id: str, |
|
communication_protocol: AgentCommunicationProtocol, |
|
logger: Optional[CyberLLMLogger] = None): |
|
|
|
self.agent_id = agent_id |
|
self.protocol = communication_protocol |
|
self.logger = logger or CyberLLMLogger(name="consensus") |
|
|
|
|
|
self.active_proposals = {} |
|
self.voting_history = deque(maxlen=1000) |
|
self.consensus_results = {} |
|
|
|
|
|
self.agent_weights = {} |
|
|
|
self.logger.info("Distributed Consensus initialized", agent_id=agent_id) |
|
|
|
async def propose_consensus(self, |
|
proposal_id: str, |
|
proposal_content: Dict[str, Any], |
|
algorithm: ConsensusAlgorithm = ConsensusAlgorithm.MAJORITY_VOTE, |
|
timeout: int = 300) -> Dict[str, Any]: |
|
"""Propose a decision for consensus""" |
|
|
|
try: |
|
proposal = { |
|
"proposal_id": proposal_id, |
|
"proposer": self.agent_id, |
|
"content": proposal_content, |
|
"algorithm": algorithm.value, |
|
"created_at": datetime.now().isoformat(), |
|
"timeout": timeout, |
|
"votes": {}, |
|
"status": "active" |
|
} |
|
|
|
self.active_proposals[proposal_id] = proposal |
|
|
|
|
|
message = AgentMessage( |
|
message_id=str(uuid.uuid4()), |
|
sender_id=self.agent_id, |
|
recipient_id=None, |
|
message_type=MessageType.CONSENSUS_PROPOSAL, |
|
timestamp=datetime.now(), |
|
content=proposal, |
|
ttl=timeout |
|
) |
|
|
|
await self.protocol.send_message(message) |
|
|
|
|
|
result = await self._wait_for_consensus(proposal_id, timeout) |
|
|
|
self.logger.info("Consensus proposal completed", |
|
proposal_id=proposal_id, |
|
result=result.get("decision"), |
|
votes_received=len(result.get("votes", {}))) |
|
|
|
return result |
|
|
|
except Exception as e: |
|
self.logger.error("Consensus proposal failed", error=str(e)) |
|
return {"decision": "failed", "error": str(e)} |
|
|
|
async def vote_on_proposal(self, |
|
proposal_id: str, |
|
vote: Union[bool, float, str], |
|
justification: Optional[str] = None) -> bool: |
|
"""Vote on an active proposal""" |
|
|
|
try: |
|
if proposal_id not in self.active_proposals: |
|
self.logger.warning("Unknown proposal", proposal_id=proposal_id) |
|
return False |
|
|
|
proposal = self.active_proposals[proposal_id] |
|
|
|
|
|
vote_content = { |
|
"proposal_id": proposal_id, |
|
"vote": vote, |
|
"voter": self.agent_id, |
|
"timestamp": datetime.now().isoformat(), |
|
"justification": justification |
|
} |
|
|
|
message = AgentMessage( |
|
message_id=str(uuid.uuid4()), |
|
sender_id=self.agent_id, |
|
recipient_id=proposal["proposer"], |
|
message_type=MessageType.CONSENSUS_VOTE, |
|
timestamp=datetime.now(), |
|
content=vote_content |
|
) |
|
|
|
await self.protocol.send_message(message) |
|
|
|
|
|
self.voting_history.append((datetime.now(), proposal_id, vote)) |
|
|
|
self.logger.info("Vote submitted", |
|
proposal_id=proposal_id, |
|
vote=vote) |
|
|
|
return True |
|
|
|
except Exception as e: |
|
self.logger.error("Failed to vote on proposal", error=str(e)) |
|
return False |
|
|
|
class SwarmIntelligence: |
|
"""Swarm intelligence capabilities for emergent behavior""" |
|
|
|
def __init__(self, |
|
agent_id: str, |
|
communication_protocol: AgentCommunicationProtocol, |
|
memory_manager: PersistentMemoryManager, |
|
logger: Optional[CyberLLMLogger] = None): |
|
|
|
self.agent_id = agent_id |
|
self.protocol = communication_protocol |
|
self.memory_manager = memory_manager |
|
self.logger = logger or CyberLLMLogger(name="swarm_intelligence") |
|
|
|
|
|
self.swarm_members = set() |
|
self.role = AgentRole.SPECIALIST |
|
self.current_tasks = {} |
|
|
|
|
|
self.pheromone_trails = defaultdict(float) |
|
self.collective_knowledge = {} |
|
self.emergence_patterns = {} |
|
|
|
|
|
self.task_queue = asyncio.Queue() |
|
self.completed_tasks = deque(maxlen=1000) |
|
|
|
self.logger.info("Swarm Intelligence initialized", agent_id=agent_id) |
|
|
|
async def join_swarm(self, swarm_id: str, role: AgentRole = AgentRole.SPECIALIST): |
|
"""Join a swarm with specified role""" |
|
|
|
try: |
|
self.role = role |
|
self.swarm_members.add(self.agent_id) |
|
|
|
|
|
message = AgentMessage( |
|
message_id=str(uuid.uuid4()), |
|
sender_id=self.agent_id, |
|
recipient_id=None, |
|
message_type=MessageType.INFORMATION_SHARE, |
|
timestamp=datetime.now(), |
|
content={ |
|
"action": "join_swarm", |
|
"swarm_id": swarm_id, |
|
"role": role.value, |
|
"agent_capabilities": list(self.protocol.capabilities.keys()) |
|
} |
|
) |
|
|
|
await self.protocol.send_message(message) |
|
|
|
|
|
asyncio.create_task(self._run_swarm_behaviors()) |
|
|
|
self.logger.info("Joined swarm", |
|
swarm_id=swarm_id, |
|
role=role.value) |
|
|
|
except Exception as e: |
|
self.logger.error("Failed to join swarm", error=str(e)) |
|
|
|
async def distribute_task(self, task: SwarmTask) -> str: |
|
"""Distribute task across swarm members""" |
|
|
|
try: |
|
|
|
task_requirements = await self._analyze_task_requirements(task) |
|
|
|
|
|
suitable_agents = await self._find_suitable_agents(task_requirements) |
|
|
|
if not suitable_agents: |
|
self.logger.warning("No suitable agents found for task", task_id=task.task_id) |
|
return "failed" |
|
|
|
|
|
if len(task.required_capabilities) > 1 or task.estimated_complexity > 0.7: |
|
subtasks = await self._decompose_task(task) |
|
if subtasks: |
|
|
|
for subtask in subtasks: |
|
await self.distribute_task(subtask) |
|
return "distributed" |
|
|
|
|
|
best_agent = await self._select_best_agent(suitable_agents, task_requirements) |
|
|
|
|
|
task_message = AgentMessage( |
|
message_id=str(uuid.uuid4()), |
|
sender_id=self.agent_id, |
|
recipient_id=best_agent, |
|
message_type=MessageType.TASK_REQUEST, |
|
timestamp=datetime.now(), |
|
content={ |
|
"task": { |
|
"id": task.task_id, |
|
"description": task.description, |
|
"type": task.task_type, |
|
"complexity": task.estimated_complexity, |
|
"deadline": task.deadline.isoformat() if task.deadline else None, |
|
"requirements": task_requirements |
|
} |
|
}, |
|
requires_acknowledgment=True |
|
) |
|
|
|
await self.protocol.send_message(task_message) |
|
|
|
|
|
task.assigned_agents = [best_agent] |
|
task.status = "assigned" |
|
self.current_tasks[task.task_id] = task |
|
|
|
self.logger.info("Task distributed", |
|
task_id=task.task_id, |
|
assigned_agent=best_agent) |
|
|
|
return "assigned" |
|
|
|
except Exception as e: |
|
self.logger.error("Task distribution failed", error=str(e)) |
|
return "failed" |
|
|
|
async def execute_collective_problem_solving(self, |
|
problem: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Execute collective problem solving using swarm intelligence""" |
|
|
|
try: |
|
problem_id = problem.get("id", str(uuid.uuid4())) |
|
|
|
self.logger.info("Starting collective problem solving", problem_id=problem_id) |
|
|
|
|
|
subproblems = await self._decompose_problem(problem) |
|
|
|
|
|
partial_solutions = [] |
|
for subproblem in subproblems: |
|
solution = await self._solve_subproblem_collectively(subproblem) |
|
partial_solutions.append(solution) |
|
|
|
|
|
final_solution = await self._synthesize_solutions(partial_solutions, problem) |
|
|
|
|
|
validation_result = await self._validate_solution_collectively( |
|
final_solution, problem) |
|
|
|
|
|
self.collective_knowledge[problem_id] = { |
|
"problem": problem, |
|
"solution": final_solution, |
|
"validation": validation_result, |
|
"timestamp": datetime.now().isoformat(), |
|
"participating_agents": list(self.swarm_members) |
|
} |
|
|
|
|
|
if validation_result.get("valid", False): |
|
await self._update_pheromone_trails(problem, final_solution) |
|
|
|
self.logger.info("Collective problem solving completed", |
|
problem_id=problem_id, |
|
solution_quality=validation_result.get("quality", 0.0)) |
|
|
|
return { |
|
"problem_id": problem_id, |
|
"solution": final_solution, |
|
"validation": validation_result, |
|
"collective_intelligence_applied": True |
|
} |
|
|
|
except Exception as e: |
|
self.logger.error("Collective problem solving failed", error=str(e)) |
|
return {"problem_id": problem_id, "error": str(e)} |
|
|
|
class TaskDistributionEngine: |
|
"""Advanced task distribution and load balancing""" |
|
|
|
def __init__(self, logger: Optional[CyberLLMLogger] = None): |
|
self.logger = logger or CyberLLMLogger(name="task_distribution") |
|
self.agent_loads = defaultdict(float) |
|
self.task_history = deque(maxlen=10000) |
|
self.performance_metrics = defaultdict(dict) |
|
|
|
async def distribute_workload(self, |
|
tasks: List[SwarmTask], |
|
available_agents: Dict[str, AgentCapability]) -> Dict[str, List[str]]: |
|
"""Distribute workload optimally across agents""" |
|
|
|
try: |
|
|
|
task_assignments = {} |
|
|
|
for task in tasks: |
|
best_agent = await self._find_optimal_agent(task, available_agents) |
|
if best_agent: |
|
if best_agent not in task_assignments: |
|
task_assignments[best_agent] = [] |
|
task_assignments[best_agent].append(task.task_id) |
|
|
|
|
|
self.agent_loads[best_agent] += task.estimated_complexity |
|
|
|
self.logger.info("Workload distributed", |
|
tasks_count=len(tasks), |
|
agents_used=len(task_assignments)) |
|
|
|
return task_assignments |
|
|
|
except Exception as e: |
|
self.logger.error("Workload distribution failed", error=str(e)) |
|
return {} |
|
|
|
|
|
def create_communication_protocol(agent_id: str, **kwargs) -> AgentCommunicationProtocol: |
|
"""Create agent communication protocol""" |
|
return AgentCommunicationProtocol(agent_id, **kwargs) |
|
|
|
def create_distributed_consensus(agent_id: str, |
|
protocol: AgentCommunicationProtocol, |
|
**kwargs) -> DistributedConsensus: |
|
"""Create distributed consensus manager""" |
|
return DistributedConsensus(agent_id, protocol, **kwargs) |
|
|
|
def create_swarm_intelligence(agent_id: str, |
|
protocol: AgentCommunicationProtocol, |
|
memory_manager: PersistentMemoryManager, |
|
**kwargs) -> SwarmIntelligence: |
|
"""Create swarm intelligence engine""" |
|
return SwarmIntelligence(agent_id, protocol, memory_manager, **kwargs) |
|
|