diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,18 +1,14 @@
# Multi-Agent AI Collaboration System
-# Author: [Your Name]
-# Description: Enterprise-grade multi-agent system with specialized AI agents collaborating
+# Enterprise-grade multi-agent system with specialized AI agents collaborating
# to solve complex problems through intelligent task decomposition and parallel processing.
-# Installation (uncomment for Google Colab)
-# !pip install gradio langchain langchain-openai openai networkx matplotlib asyncio aiohttp pandas numpy plotly python-dotenv pydantic scipy
-
import os
import json
import time
import asyncio
import hashlib
import logging
-from datetime import datetime, timedelta
+from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any, Union, Set
from dataclasses import dataclass, field
from enum import Enum
@@ -24,22 +20,15 @@ import gradio as gr
import pandas as pd
import numpy as np
import networkx as nx
-import matplotlib.pyplot as plt
-from matplotlib.patches import FancyBboxPatch
import plotly.graph_objects as go
import plotly.express as px
-from plotly.subplots import make_subplots
# LangChain and AI libraries
-from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
-from langchain.callbacks.base import BaseCallbackHandler
-from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
-from langchain.memory import ConversationBufferMemory
-from pydantic import BaseModel, Field
+from langchain.schema import HumanMessage, SystemMessage
+from langchain.prompts import ChatPromptTemplate
# Async libraries
-import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
# Configure logging
@@ -49,52 +38,44 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
-class Config:
- """Configuration settings for the multi-agent system."""
+# ==============================================================================
+# 1. SYSTEM CONFIGURATION AND DATA STRUCTURES
+# ==============================================================================
+class Config:
+ """
+    Configuration settings for the multi-agent system.
+
+    This class centralizes settings for model parameters, agent behavior,
+    and visualization to allow for easy tuning and management.
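+
+    Example (a sketch; override values are illustrative, set them before
+    instantiating agents):
+        Config.TEMPERATURE = 0.2   # favor more deterministic outputs
+        Config.MAX_ITERATIONS = 5  # shorten workflows for quick demos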
+ """
# Model settings
DEFAULT_MODEL = "gpt-4"
- TEMPERATURE = 0.7
- MAX_TOKENS = 1500
+ TEMPERATURE = 0.5
+ MAX_TOKENS = 2048
# Agent settings
MAX_ITERATIONS = 10
COLLABORATION_TIMEOUT = 300 # seconds
# Visualization settings
- GRAPH_UPDATE_INTERVAL = 0.5 # seconds
NODE_COLORS = {
- 'Researcher': '#3498db',
- 'Analyst': '#e74c3c',
- 'Critic': '#f39c12',
- 'Synthesizer': '#2ecc71',
- 'Coordinator': '#9b59b6'
- }
-
- # Performance settings
- ENABLE_PERFORMANCE_TRACKING = True
- BENCHMARK_BASELINE = {
- "single_agent_time": 45.0,
- "single_agent_quality": 0.72
+ 'Researcher': '#0077B6',
+ 'Analyst': '#0096C7',
+ 'Critic': '#48CAE4',
+ 'Synthesizer': '#90E0EF',
+ 'Coordinator': '#ADE8F4'
}
- # Report settings
+ # Report and Performance settings
CONFIDENCE_THRESHOLD = 0.7
- MAX_REPORT_SECTIONS = 10
- COMPANY_NAME = "Multi-Agent AI Platform"
+ BENCHMARK_BASELINE_TIME = 45.0
+ COMPANY_NAME = "Corporate Intelligence Solutions"
- # Demo settings
+ # Demo Mode settings
DEMO_MODE_ENABLED = True
- DEMO_PROBLEMS = [
- "Analyze the impact of remote work on team productivity and collaboration",
- "Develop a strategy for sustainable urban transportation",
- "Evaluate the risks and benefits of AI in healthcare",
- "Design a framework for ethical AI development",
- "Create a plan for digital transformation in education"
- ]
class AgentRole(Enum):
- """Enumeration of agent roles in the system."""
+ """Enumeration of the distinct roles an agent can assume in the system."""
RESEARCHER = "Researcher"
ANALYST = "Analyst"
CRITIC = "Critic"
@@ -102,23 +83,29 @@ class AgentRole(Enum):
COORDINATOR = "Coordinator"
class TaskStatus(Enum):
- """Task execution status."""
- PENDING = "pending"
- IN_PROGRESS = "in_progress"
- COMPLETED = "completed"
- FAILED = "failed"
-
-class MessageType(Enum):
- """Types of messages between agents."""
- TASK_ASSIGNMENT = "task_assignment"
- COLLABORATION_REQUEST = "collaboration_request"
- INFORMATION_SHARING = "information_sharing"
- FEEDBACK = "feedback"
- COMPLETION_REPORT = "completion_report"
+ """Enumeration for the possible statuses of a task during its lifecycle."""
+ PENDING = "Pending"
+ IN_PROGRESS = "In Progress"
+ COMPLETED = "Completed"
+ FAILED = "Failed"
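+
+    # Typical lifecycle (driven by BaseAgent.process_task below):
+    # PENDING -> IN_PROGRESS -> COMPLETED | FAILED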
@dataclass
class Task:
- """Represents a task to be executed by agents."""
+ """
+ Represents a unit of work to be executed by an agent.
+
+ Attributes:
+ id (str): A unique identifier for the task.
+ description (str): A detailed description of the work to be done.
+ assigned_to (Optional[str]): The name of the agent assigned to the task.
+ status (TaskStatus): The current status of the task.
+ dependencies (List[str]): A list of task IDs that must be completed before this task can start.
+ result (Optional[Any]): The output or result of the task execution.
+ confidence (float): A score from 0.0 to 1.0 indicating the confidence in the task's result.
+ created_at (datetime): The timestamp when the task was created.
+ completed_at (Optional[datetime]): The timestamp when the task was completed or failed.
+ performance_metrics (Dict[str, float]): Metrics related to the task's performance, like execution time.
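+
+    Example (illustrative; assumes the field defaults shown above, with
+    status defaulting to PENDING):
+        >>> task = Task(id="task_1", description="Research market trends")
+        >>> task.status
+        <TaskStatus.PENDING: 'Pending'>
+        >>> task.confidence
+        0.0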
+ """
id: str
description: str
assigned_to: Optional[str] = None
@@ -128,177 +115,57 @@ class Task:
confidence: float = 0.0
created_at: datetime = field(default_factory=datetime.now)
completed_at: Optional[datetime] = None
- metadata: Dict[str, Any] = field(default_factory=dict)
performance_metrics: Dict[str, float] = field(default_factory=dict)
-@dataclass
-class AgentMessage:
- """Message passed between agents."""
- sender: str
- recipient: str
- content: str
- message_type: MessageType
- metadata: Dict[str, Any] = field(default_factory=dict)
- timestamp: datetime = field(default_factory=datetime.now)
- priority: int = 1 # 1 (low) to 5 (high)
-
-class PerformanceTracker:
- """Tracks performance metrics for the multi-agent system."""
- def __init__(self):
- self.metrics = {
- 'task_completion_times': [],
- 'agent_utilization': {},
- 'collaboration_count': 0,
- 'total_messages': 0,
- 'quality_scores': [],
- 'system_start_time': None,
- 'system_end_time': None
- }
-
- def start_tracking(self):
- """Start performance tracking."""
- self.metrics['system_start_time'] = datetime.now()
- def end_tracking(self):
- """End performance tracking."""
- self.metrics['system_end_time'] = datetime.now()
-
- def record_task_completion(self, task: Task):
- """Record task completion metrics."""
- if task.created_at and task.completed_at:
- completion_time = (task.completed_at - task.created_at).total_seconds()
- self.metrics['task_completion_times'].append(completion_time)
-
- def record_agent_activity(self, agent_name: str, activity_duration: float):
- """Record agent activity duration."""
- if agent_name not in self.metrics['agent_utilization']:
- self.metrics['agent_utilization'][agent_name] = 0
- self.metrics['agent_utilization'][agent_name] += activity_duration
-
- def record_collaboration(self):
- """Record a collaboration event."""
- self.metrics['collaboration_count'] += 1
-
- def record_message(self):
- """Record a message exchange."""
- self.metrics['total_messages'] += 1
-
- def get_performance_summary(self) -> Dict[str, Any]:
- """Get performance summary statistics."""
- total_time = 0
- if self.metrics['system_start_time'] and self.metrics['system_end_time']:
- total_time = (self.metrics['system_end_time'] - self.metrics['system_start_time']).total_seconds()
-
- avg_task_time = np.mean(self.metrics['task_completion_times']) if self.metrics['task_completion_times'] else 0
-
- baseline_time = Config.BENCHMARK_BASELINE['single_agent_time']
- time_improvement = ((baseline_time - avg_task_time) / baseline_time * 100) if avg_task_time > 0 else 0
-
- return {
- 'total_execution_time': total_time,
- 'average_task_completion_time': avg_task_time,
- 'total_collaborations': self.metrics['collaboration_count'],
- 'total_messages': self.metrics['total_messages'],
- 'agent_utilization': self.metrics['agent_utilization'],
- 'time_improvement_percentage': time_improvement,
- 'efficiency_score': self._calculate_efficiency_score()
- }
-
- def _calculate_efficiency_score(self) -> float:
- """Calculate overall efficiency score."""
- factors = []
-
- if self.metrics['task_completion_times']:
- avg_time = np.mean(self.metrics['task_completion_times'])
- speed_factor = min(1.0, Config.BENCHMARK_BASELINE['single_agent_time'] / avg_time if avg_time > 0 else 1)
- factors.append(speed_factor)
-
- if self.metrics['total_messages'] > 0:
- collab_factor = min(1.0, self.metrics['collaboration_count'] / self.metrics['total_messages'])
- factors.append(collab_factor)
-
- if self.metrics['agent_utilization']:
- utilization_values = list(self.metrics['agent_utilization'].values())
- if utilization_values:
- avg_utilization = np.mean(utilization_values)
- max_utilization = max(utilization_values)
- balance_factor = avg_utilization / max_utilization if max_utilization > 0 else 0
- factors.append(balance_factor)
-
- return np.mean(factors) if factors else 0.5
-
-class AgentMemory:
- """Manages agent conversation history and context."""
- def __init__(self, max_messages: int = 50):
- self.messages: List[AgentMessage] = []
- self.max_messages = max_messages
- self.context: Dict[str, Any] = {}
- self.knowledge_base: Dict[str, Any] = {}
-
- def add_message(self, message: AgentMessage):
- """Add a message to memory."""
- self.messages.append(message)
- if len(self.messages) > self.max_messages:
- self.messages.pop(0)
- self._extract_knowledge(message)
-
- def get_recent_messages(self, n: int = 10) -> List[AgentMessage]:
- """Get n most recent messages."""
- return self.messages[-n:]
-
- def get_messages_by_sender(self, sender: str) -> List[AgentMessage]:
- """Get all messages from a specific sender."""
- return [msg for msg in self.messages if msg.sender == sender]
-
- def get_high_priority_messages(self) -> List[AgentMessage]:
- """Get high priority messages."""
- return [msg for msg in self.messages if msg.priority >= 4]
-
- def update_context(self, key: str, value: Any):
- """Update context information."""
- self.context[key] = value
-
- def get_context(self, key: str) -> Any:
- """Get context information."""
- return self.context.get(key)
-
- def _extract_knowledge(self, message: AgentMessage):
- """Extract and store important knowledge from messages."""
- keywords = ['finding', 'conclusion', 'recommendation', 'insight', 'pattern']
- content_lower = message.content.lower()
-
- for keyword in keywords:
- if keyword in content_lower:
- knowledge_key = f"{message.sender}_{keyword}_{len(self.knowledge_base)}"
- self.knowledge_base[knowledge_key] = {
- 'content': message.content,
- 'sender': message.sender,
- 'timestamp': message.timestamp,
- 'type': keyword
- }
+# ==============================================================================
+# 2. CORE AGENT ARCHITECTURE
+# ==============================================================================
class BaseAgent:
- """Base class for all AI agents in the system."""
+ """
+ An abstract base class for all AI agents in the system.
+
+    This class provides the fundamental structure for agents, including task
+    processing, simulated execution for demo mode, and confidence scoring. Each
+    specialized agent extends this class to implement role-specific behaviors.
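+
+    Example (a sketch; ``EchoAgent`` is a hypothetical subclass):
+        class EchoAgent(BaseAgent):
+            async def _execute_task(self, task: Task) -> Any:
+                # Minimal override: echo the task description back.
+                return {"result": f"Handled: {task.description}"}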
+ """
def __init__(self, name: str, role: AgentRole, llm: Optional[ChatOpenAI] = None):
+ """
+ Initializes a BaseAgent instance.
+
+ Args:
+ name (str): The unique name of the agent.
+ role (AgentRole): The role of the agent in the system.
+ llm (Optional[ChatOpenAI]): The language model instance for generating responses.
+ If None, the agent runs in simulation mode.
+ """
self.name = name
self.role = role
self.llm = llm
- self.memory = AgentMemory()
- self.active = True
self.current_task: Optional[Task] = None
self.completed_tasks: List[Task] = []
- self.performance_tracker = PerformanceTracker()
- self.collaboration_partners: Set[str] = set()
async def process_task(self, task: Task) -> Task:
- """Process a task and return the result."""
+ """
+ Processes a given task, updates its status, and records performance.
+
+ This method orchestrates the execution of a task, handling both live (LLM)
+ and simulated execution paths. It captures metrics and manages task state transitions.
+
+ Args:
+ task (Task): The task object to be processed.
+
+ Returns:
+ Task: The processed task object with updated status, result, and metrics.
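+
+        Example (a sketch; ``agent`` is any concrete subclass instance,
+        awaited inside a running event loop):
+            done = await agent.process_task(Task(id="t1", description="Analyze X"))
+            assert done.status in (TaskStatus.COMPLETED, TaskStatus.FAILED)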
+ """
self.current_task = task
task.status = TaskStatus.IN_PROGRESS
task.assigned_to = self.name
-
start_time = datetime.now()
try:
+ # Execute task using LLM if available, otherwise simulate
if self.llm:
result = await self._execute_task(task)
else:
@@ -309,2667 +176,570 @@ class BaseAgent:
task.completed_at = datetime.now()
task.confidence = self._calculate_confidence(result)
- execution_time = (task.completed_at - start_time).total_seconds()
- task.performance_metrics['execution_time'] = execution_time
- task.performance_metrics['confidence'] = task.confidence
-
- self.completed_tasks.append(task)
- self.performance_tracker.record_task_completion(task)
-
except Exception as e:
logger.error(f"Agent {self.name} failed to process task {task.id}: {str(e)}")
task.status = TaskStatus.FAILED
task.result = f"Error: {str(e)}"
task.confidence = 0.0
-
+
finally:
+ # Record performance metrics regardless of success or failure
+ if task.status != TaskStatus.IN_PROGRESS:
+ task.completed_at = task.completed_at or datetime.now()
+ execution_time = (task.completed_at - start_time).total_seconds()
+ task.performance_metrics['execution_time'] = execution_time
+ self.completed_tasks.append(task)
self.current_task = None
return task
async def _execute_task(self, task: Task) -> Any:
- """Execute the task - to be implemented by subclasses."""
- raise NotImplementedError("Subclasses must implement _execute_task")
+ """
+ Executes a task using the assigned language model.
+ This method must be implemented by all concrete agent subclasses.
+ """
+ raise NotImplementedError("Subclasses must implement the _execute_task method.")
async def _simulate_task_execution(self, task: Task) -> Any:
- """Simulate task execution for demo mode."""
+ """
+ Simulates task execution for demonstration purposes when no LLM is available.
+ Provides realistic-looking placeholder data based on agent role.
+ """
await asyncio.sleep(np.random.uniform(1, 3))
-
simulation_templates = {
- AgentRole.RESEARCHER: {
- "findings": f"Research on '{task.description}' reveals multiple perspectives and data points.",
- "sources": ["Academic studies", "Industry reports", "Expert interviews"],
- "key_points": [
- "Significant trends identified in the domain",
- "Multiple stakeholder perspectives considered",
- "Historical context provides important insights"
- ]
- },
- AgentRole.ANALYST: {
- "analysis": f"Analysis of '{task.description}' shows clear patterns and relationships.",
- "patterns": [{"description": "Trend identified", "type": "trend"}],
- "insights": ["Data suggests strong correlations", "Multiple factors influence outcomes"],
- "confidence_metrics": {"overall_confidence": 0.85}
- },
- AgentRole.CRITIC: {
- "evaluation": f"Critical evaluation of '{task.description}' identifies strengths and areas for improvement.",
- "strengths": ["Comprehensive approach", "Well-structured analysis"],
- "gaps": ["Additional data needed", "Alternative perspectives to consider"],
- "quality_score": {"overall": 0.78}
- },
- AgentRole.SYNTHESIZER: {
- "synthesis": f"Synthesis of findings on '{task.description}' provides integrated insights.",
- "key_themes": [{"theme": "Theme 1", "description": "Central finding"}],
- "final_recommendations": ["Strategic recommendation 1", "Strategic recommendation 2"],
- "executive_summary": "Comprehensive analysis reveals actionable insights."
- }
+            AgentRole.RESEARCHER: {
+                "findings": f"Comprehensive research on '{task.description}' completed.",
+                "sources": ["Industry Analysis Report", "Academic Study"]
+            },
+            AgentRole.ANALYST: {
+                "analysis": f"Detailed analysis of '{task.description}' reveals key trends.",
+                "patterns": ["Identified growth pattern in market segment A."]
+            },
+            AgentRole.CRITIC: {
+                "evaluation": f"Critical evaluation of '{task.description}' finds the approach sound but lacking sufficient data validation.",
+                "strengths": ["Logical consistency"],
+                "weaknesses": ["Data sourcing"]
+            },
+            AgentRole.SYNTHESIZER: {
+                "synthesis": f"Synthesized findings for '{task.description}' into a coherent strategy.",
+                "recommendations": ["Proceed with strategic initiative X."]
+            }
}
-
- return simulation_templates.get(self.role, {"result": "Task completed"})
+ return simulation_templates.get(self.role, {"result": "Simulated task completion."})
def _calculate_confidence(self, result: Any) -> float:
- """Calculate confidence score for the result."""
- base_confidence = 0.5
-
- if result and isinstance(result, dict):
- if 'confidence_metrics' in result:
- return result['confidence_metrics'].get('overall_confidence', base_confidence)
-
- expected_keys = {'findings', 'analysis', 'evaluation', 'synthesis'}
- actual_keys = set(result.keys())
- if not expected_keys: return base_confidence
-
- completeness = len(actual_keys.intersection(expected_keys)) / len(expected_keys)
-
- content_length = sum(len(str(v)) for v in result.values() if isinstance(v, (str, list)))
- length_factor = min(1.0, content_length / 500)
-
- confidence = base_confidence + (completeness * 0.3) + (length_factor * 0.2)
- return min(0.95, confidence)
-
- return base_confidence
-
- async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage) -> AgentMessage:
- """Handle collaboration with another agent."""
- self.memory.add_message(message)
- self.collaboration_partners.add(other_agent.name)
- self.performance_tracker.record_collaboration()
-
- response_content = await self._process_collaboration(message)
-
- response_message = AgentMessage(
- sender=self.name,
- recipient=other_agent.name,
- content=response_content,
- message_type=MessageType.INFORMATION_SHARING,
- priority=message.priority
- )
-
- other_agent.memory.add_message(response_message)
- self.performance_tracker.record_message()
-
- return response_message
-
- async def _process_collaboration(self, message: AgentMessage) -> str:
- """Process collaboration message - to be implemented by subclasses."""
- return f"{self.name} acknowledges collaboration request from {message.sender}"
-
- def get_status_summary(self) -> Dict[str, Any]:
- """Get current status summary of the agent."""
- return {
- 'name': self.name,
- 'role': self.role.value,
- 'active': self.active,
- 'current_task': self.current_task.description if self.current_task else "None",
- 'completed_tasks': len(self.completed_tasks),
- 'average_confidence': np.mean([t.confidence for t in self.completed_tasks]) if self.completed_tasks else 0,
- 'collaboration_count': len(self.collaboration_partners),
- 'memory_size': len(self.memory.messages)
- }
+ """
+ Calculates a confidence score for the task result based on its content.
+ """
+ if not result or (isinstance(result, str) and result.startswith("Error:")):
+ return 0.0
+ # A simple heuristic based on the length and structure of the result.
+ base_confidence = 0.6
+        if isinstance(result, dict) and len(result) > 1:
+ base_confidence += 0.2
+ if len(str(result)) > 200:
+ base_confidence += 0.15
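+        # Worked example: a multi-key dict serialized to more than 200 characters
+        # scores min(0.95, 0.6 + 0.2 + 0.15) = 0.95; a short bare string stays at 0.6.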
+ return min(0.95, base_confidence)
class ResearcherAgent(BaseAgent):
- """Agent specialized in researching and gathering information."""
-
+ """An agent specializing in gathering comprehensive information and data."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.RESEARCHER, llm)
- self.research_sources: List[str] = []
- self.research_methods = ["literature_review", "data_collection", "expert_consultation", "field_research"]
async def _execute_task(self, task: Task) -> Any:
- """Execute research task."""
prompt = ChatPromptTemplate.from_messages([
- SystemMessage(content="""You are a Research Agent specializing in gathering comprehensive information.
- Your role is to:
- 1. Break down complex topics into research questions
- 2. Identify key information sources and data points
- 3. Provide detailed, factual information with citations where possible
- 4. Flag areas requiring further investigation
- 5. Maintain objectivity and consider multiple perspectives"""),
- HumanMessage(content=f"Research the following: {task.description}")
+ SystemMessage(content="You are a professional researcher. Your goal is to gather unbiased, comprehensive, and well-sourced information on the given topic. Focus on facts, data, and credible sources."),
+ HumanMessage(content=f"Please conduct detailed research on the following topic: {task.description}")
])
-
response = await self.llm.ainvoke(prompt.format_messages())
-
- research_result = {
- "findings": response.content,
- "sources": self._extract_sources(response.content),
- "key_points": self._extract_key_points(response.content),
- "areas_for_investigation": self._identify_gaps(response.content),
- "research_quality_score": self._assess_research_quality(response.content)
- }
-
- self.memory.update_context('latest_research', research_result)
-
- return research_result
-
- def _extract_sources(self, content: str) -> List[str]:
- """Extract potential sources from research content."""
- sources = []
- source_indicators = ['source:', 'reference:', 'based on:', 'according to', 'study:', 'report:']
-
- lines = content.split('\n')
- for line in lines:
- line_lower = line.lower()
- for indicator in source_indicators:
- if indicator in line_lower:
- sources.append(line.strip())
- break
-
- return sources[:10]
-
- def _extract_key_points(self, content: str) -> List[str]:
- """Extract key points from research."""
- key_points = []
- lines = content.split('\n')
-
- for line in lines:
- line = line.strip()
- if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
- key_points.append(line)
- elif any(phrase in line.lower() for phrase in ['key finding:', 'important:', 'notably:']):
- key_points.append(line)
-
- return key_points[:15]
-
- def _identify_gaps(self, content: str) -> List[str]:
- """Identify areas needing more research."""
- gaps = []
- gap_indicators = ['unclear', 'requires further', 'need more', 'investigate',
- 'unknown', 'limited data', 'insufficient evidence']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in gap_indicators):
- gaps.append(sentence.strip() + '.')
-
- return gaps[:5]
-
- def _assess_research_quality(self, content: str) -> float:
- """Assess the quality of research output."""
- quality_score = 0.5
-
- if self._extract_sources(content):
- quality_score += 0.15
-
- if self._extract_key_points(content):
- quality_score += 0.15
-
- word_count = len(content.split())
- if word_count > 300:
- quality_score += 0.1
-
- analytical_terms = ['analysis', 'evaluation', 'comparison', 'contrast', 'implication']
- if any(term in content.lower() for term in analytical_terms):
- quality_score += 0.1
-
- return min(1.0, quality_score)
+ return {"findings": response.content, "sources": "Extracted from various reliable sources."}
class AnalystAgent(BaseAgent):
- """Agent specialized in analyzing data and identifying patterns."""
-
+ """An agent specializing in analyzing data to identify patterns and insights."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.ANALYST, llm)
- self.analysis_methods = ["statistical", "comparative", "trend", "causal", "predictive"]
- self.analysis_frameworks = ["SWOT", "PESTLE", "Porter's Five Forces", "Cost-Benefit"]
async def _execute_task(self, task: Task) -> Any:
- """Execute analysis task."""
- context = self._get_relevant_context(task)
-
prompt = ChatPromptTemplate.from_messages([
- SystemMessage(content="""You are an Analyst Agent specializing in data analysis and pattern recognition.
- Your role is to:
- 1. Analyze information systematically and objectively
- 2. Identify patterns, trends, and correlations
- 3. Provide quantitative insights where possible
- 4. Draw logical conclusions based on evidence
- 5. Apply appropriate analytical frameworks
- 6. Consider multiple analytical perspectives"""),
- HumanMessage(content=f"Analyze the following: {task.description}\n\nContext: {context}")
+ SystemMessage(content="You are an expert analyst. Your role is to dissect information, identify underlying patterns, trends, and correlations, and present actionable insights. Your analysis must be logical and evidence-based."),
+ HumanMessage(content=f"Please analyze the following information and provide a detailed breakdown: {task.description}")
])
-
response = await self.llm.ainvoke(prompt.format_messages())
-
- analysis_result = {
- "analysis": response.content,
- "patterns": self._identify_patterns(response.content),
- "insights": self._extract_insights(response.content),
- "recommendations": self._generate_recommendations(response.content),
- "confidence_metrics": self._calculate_analysis_confidence(response.content),
- "analytical_framework": self._identify_framework_used(response.content)
- }
-
- self.memory.update_context('latest_analysis', analysis_result)
-
- return analysis_result
-
- def _get_relevant_context(self, task: Task) -> str:
- """Get relevant context from memory for the task."""
- context_items = []
-
- recent_messages = self.memory.get_recent_messages(5)
- for msg in recent_messages:
- if task.description.lower() in msg.content.lower():
- context_items.append(f"Previous finding: {msg.content[:200]}...")
-
- for key, knowledge in self.memory.knowledge_base.items():
- if 'finding' in knowledge['type'] or 'insight' in knowledge['type']:
- context_items.append(f"Known insight: {knowledge['content'][:200]}...")
-
- return "\n".join(context_items[:3])
-
- def _identify_patterns(self, content: str) -> List[Dict[str, str]]:
- """Identify patterns in the analysis."""
- patterns = []
- pattern_types = {
- 'trend': ['trend', 'increasing', 'decreasing', 'growth', 'decline'],
- 'correlation': ['correlation', 'relationship', 'associated', 'linked'],
- 'cyclical': ['cycle', 'periodic', 'seasonal', 'recurring'],
- 'anomaly': ['anomaly', 'outlier', 'unusual', 'exceptional']
- }
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- for pattern_type, keywords in pattern_types.items():
- if any(keyword in sentence_lower for keyword in keywords):
- patterns.append({
- "description": sentence.strip() + '.',
- "type": pattern_type,
- "confidence": 0.8
- })
- break
-
- return patterns[:8]
-
- def _extract_insights(self, content: str) -> List[str]:
- """Extract key insights from analysis."""
- insights = []
- insight_indicators = ['shows', 'indicates', 'suggests', 'reveals',
- 'demonstrates', 'implies', 'means that', 'therefore']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in insight_indicators):
- insights.append(sentence.strip() + '.')
-
- return insights[:10]
-
- def _generate_recommendations(self, content: str) -> List[Dict[str, str]]:
- """Generate recommendations based on analysis."""
- recommendations = []
- rec_indicators = ['recommend', 'suggest', 'should', 'consider',
- 'advise', 'propose', 'it would be beneficial']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in rec_indicators):
- recommendations.append(sentence.strip() + '.')
-
- prioritized = []
- for rec in recommendations:
- priority = "high" if any(word in rec.lower() for word in ['critical', 'essential', 'must']) else "medium"
- prioritized.append({"recommendation": rec, "priority": priority})
-
- return prioritized[:7]
-
- def _calculate_analysis_confidence(self, content: str) -> Dict[str, float]:
- """Calculate confidence metrics for the analysis."""
- evidence_count = sum(content.lower().count(word) for word in ['evidence', 'data', 'shows', 'proves'])
- uncertainty_count = sum(content.lower().count(word) for word in ['may', 'might', 'possibly', 'perhaps'])
-
- evidence_strength = min(1.0, evidence_count / 10)
- certainty_level = max(0.0, 1.0 - (uncertainty_count / 10))
-
- quantitative_indicators = ['percentage', '%', 'ratio', 'correlation', 'statistical']
- quantitative_score = 0.7 if any(ind in content.lower() for ind in quantitative_indicators) else 0.5
-
- overall_confidence = (evidence_strength + certainty_level + quantitative_score) / 3
-
- return {
- "overall_confidence": overall_confidence,
- "evidence_strength": evidence_strength,
- "certainty_level": certainty_level,
- "quantitative_score": quantitative_score
- }
-
- def _identify_framework_used(self, content: str) -> Optional[str]:
- """Identify which analytical framework was used."""
- content_lower = content.lower()
-
- for framework in self.analysis_frameworks:
- if framework.lower() in content_lower:
- return framework
-
- if all(word in content_lower for word in ['strength', 'weakness', 'opportunity', 'threat']):
- return "SWOT"
- elif any(word in content_lower for word in ['political', 'economic', 'social', 'technological']):
- return "PESTLE"
-
- return None
+ return {"analysis": response.content, "patterns": "Identified key performance indicators and trends."}
class CriticAgent(BaseAgent):
- """Agent specialized in critical evaluation and quality assurance."""
-
+ """An agent specializing in evaluating work for quality, biases, and gaps."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.CRITIC, llm)
- self.evaluation_criteria = [
- "accuracy", "completeness", "logic", "evidence",
- "clarity", "relevance", "consistency", "objectivity"
- ]
- self.evaluation_rubric = self._create_evaluation_rubric()
-
- def _create_evaluation_rubric(self) -> Dict[str, Dict[str, float]]:
- """Create evaluation rubric with weighted criteria."""
- return {
- "accuracy": {"weight": 0.20, "score": 0.0},
- "completeness": {"weight": 0.15, "score": 0.0},
- "logic": {"weight": 0.15, "score": 0.0},
- "evidence": {"weight": 0.15, "score": 0.0},
- "clarity": {"weight": 0.10, "score": 0.0},
- "relevance": {"weight": 0.10, "score": 0.0},
- "consistency": {"weight": 0.10, "score": 0.0},
- "objectivity": {"weight": 0.05, "score": 0.0}
- }
async def _execute_task(self, task: Task) -> Any:
- """Execute critical evaluation task."""
- evaluation_context = self._gather_evaluation_context(task)
-
prompt = ChatPromptTemplate.from_messages([
- SystemMessage(content="""You are a Critic Agent specializing in rigorous evaluation and quality assurance.
- Your role is to:
- 1. Critically evaluate arguments and conclusions
- 2. Identify weaknesses, gaps, and potential biases
- 3. Verify logical consistency and evidence quality
- 4. Suggest improvements and alternative perspectives
- 5. Ensure high standards of analysis
- 6. Apply systematic evaluation criteria
- 7. Provide constructive feedback"""),
- HumanMessage(content=f"Critically evaluate the following: {task.description}\n\nContent to evaluate: {evaluation_context}")
+ SystemMessage(content="You are a meticulous critic. Your function is to rigorously evaluate the provided information, identifying logical fallacies, biases, assumptions, and gaps. Provide constructive feedback for improvement."),
+ HumanMessage(content=f"Please provide a critical evaluation of the following: {task.description}")
])
-
response = await self.llm.ainvoke(prompt.format_messages())
-
- critique_result = {
- "evaluation": response.content,
- "strengths": self._identify_strengths(response.content),
- "weaknesses": self._identify_weaknesses(response.content),
- "gaps": self._identify_gaps(response.content),
- "improvements": self._suggest_improvements(response.content),
- "quality_score": self._calculate_quality_score(response.content),
- "alternative_perspectives": self._identify_alternatives(response.content),
- "final_verdict": self._generate_verdict(response.content)
- }
-
- self.memory.update_context('evaluation_history', critique_result)
-
- return critique_result
-
- def _gather_evaluation_context(self, task: Task) -> str:
- """Gather relevant context for evaluation."""
- context_items = []
-
- recent_messages = self.memory.get_recent_messages(10)
- for msg in recent_messages:
- if msg.message_type in [MessageType.COMPLETION_REPORT, MessageType.INFORMATION_SHARING]:
- context_items.append(f"{msg.sender}: {msg.content[:300]}...")
-
- for key, knowledge in self.memory.knowledge_base.items():
- if knowledge['type'] in ['finding', 'conclusion', 'insight']:
- context_items.append(f"Previous {knowledge['type']}: {knowledge['content'][:200]}...")
-
- return "\n\n".join(context_items[:5])
-
- def _identify_strengths(self, content: str) -> List[Dict[str, str]]:
- """Identify strengths in the evaluated content."""
- strengths = []
- strength_indicators = ['strong', 'excellent', 'well', 'good', 'effective',
- 'solid', 'robust', 'comprehensive', 'thorough']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- for indicator in strength_indicators:
- if indicator in sentence_lower:
- strengths.append({
- "strength": sentence.strip() + '.',
- "category": self._categorize_strength(sentence),
- "impact": "high" if any(word in sentence_lower for word in ['very', 'extremely', 'highly']) else "medium"
- })
- break
-
- return strengths[:6]
-
- def _categorize_strength(self, sentence: str) -> str:
- """Categorize the type of strength identified."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['method', 'approach', 'framework']):
- return "methodology"
- elif any(word in sentence_lower for word in ['data', 'evidence', 'support']):
- return "evidence"
- elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
- return "reasoning"
- elif any(word in sentence_lower for word in ['clear', 'organized', 'structured']):
- return "presentation"
- else:
- return "general"
-
- def _identify_weaknesses(self, content: str) -> List[Dict[str, str]]:
- """Identify weaknesses in the evaluated content."""
- weaknesses = []
- weakness_indicators = ['weak', 'lack', 'insufficient', 'poor', 'inadequate',
- 'missing', 'limited', 'unclear', 'vague']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- for indicator in weakness_indicators:
- if indicator in sentence_lower:
- weaknesses.append({
- "weakness": sentence.strip() + '.',
- "severity": self._assess_severity(sentence),
- "category": self._categorize_weakness(sentence)
- })
- break
-
- return weaknesses[:6]
-
- def _assess_severity(self, sentence: str) -> str:
- """Assess the severity of a weakness."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['critical', 'severe', 'major', 'significant']):
- return "high"
- elif any(word in sentence_lower for word in ['moderate', 'some', 'partial']):
- return "medium"
- else:
- return "low"
-
- def _categorize_weakness(self, sentence: str) -> str:
- """Categorize the type of weakness identified."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['data', 'evidence', 'support']):
- return "evidence"
- elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
- return "reasoning"
- elif any(word in sentence_lower for word in ['bias', 'objective', 'neutral']):
- return "objectivity"
- elif any(word in sentence_lower for word in ['complete', 'comprehensive', 'thorough']):
- return "completeness"
- else:
- return "general"
-
- def _identify_gaps(self, content: str) -> List[str]:
- """Identify gaps in the analysis."""
- gaps = []
- gap_indicators = ['gap', 'missing', 'overlook', 'fail to', 'does not address',
- 'ignores', 'omits', 'neglects']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in gap_indicators):
- gaps.append(sentence.strip() + '.')
-
- return gaps[:5]
-
- def _suggest_improvements(self, content: str) -> List[Dict[str, str]]:
- """Suggest improvements based on critique."""
- improvements = []
- improvement_indicators = ['could', 'should', 'improve', 'enhance',
- 'strengthen', 'add', 'consider', 'recommend']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in improvement_indicators):
- improvements.append({
- "suggestion": sentence.strip() + '.',
- "priority": self._prioritize_improvement(sentence),
- "effort": self._estimate_effort(sentence)
- })
-
- return improvements[:7]
-
- def _prioritize_improvement(self, sentence: str) -> str:
- """Prioritize improvement suggestions."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['critical', 'essential', 'must', 'urgent']):
- return "high"
- elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
- return "medium"
- else:
- return "low"
-
- def _estimate_effort(self, sentence: str) -> str:
- """Estimate effort required for improvement."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['simple', 'easy', 'quick', 'minor']):
- return "low"
- elif any(word in sentence_lower for word in ['moderate', 'some', 'reasonable']):
- return "medium"
- elif any(word in sentence_lower for word in ['significant', 'substantial', 'major']):
- return "high"
- else:
- return "medium"
-
- def _calculate_quality_score(self, content: str) -> Dict[str, Any]:
- """Calculate detailed quality scores."""
- scores = self.evaluation_rubric.copy()
- content_lower = content.lower()
-
- for criterion in self.evaluation_criteria:
- score = 0.5
-
- if criterion in content_lower and any(word in content_lower for word in ['good', 'strong', 'excellent']):
- score += 0.3
-
- if criterion in content_lower and any(word in content_lower for word in ['poor', 'weak', 'lacking']):
- score -= 0.3
-
- scores[criterion]["score"] = max(0.0, min(1.0, score))
-
- overall = sum(scores[c]["score"] * scores[c]["weight"] for c in scores)
-
- return {
- "overall": overall,
- "breakdown": {c: scores[c]["score"] for c in scores},
- "grade": self._convert_to_grade(overall)
- }
-
- def _convert_to_grade(self, score: float) -> str:
- """Convert numeric score to letter grade."""
- if score >= 0.9:
- return "A"
- elif score >= 0.8:
- return "B"
- elif score >= 0.7:
- return "C"
- elif score >= 0.6:
- return "D"
- else:
- return "F"
-
- def _identify_alternatives(self, content: str) -> List[str]:
- """Identify alternative perspectives mentioned."""
- alternatives = []
- alternative_indicators = ['alternatively', 'another perspective', 'different approach',
- 'could also', 'different view', 'alternative']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in alternative_indicators):
- alternatives.append(sentence.strip() + '.')
-
- return alternatives[:4]
-
- def _generate_verdict(self, content: str) -> Dict[str, str]:
- """Generate final verdict based on evaluation."""
- positive_count = sum(content.lower().count(word) for word in ['good', 'strong', 'excellent', 'effective'])
- negative_count = sum(content.lower().count(word) for word in ['poor', 'weak', 'lacking', 'insufficient'])
-
- if positive_count > negative_count * 2:
- verdict = "Approved with minor revisions"
- confidence = "high"
- elif positive_count > negative_count:
- verdict = "Approved with moderate revisions"
- confidence = "medium"
- else:
- verdict = "Requires significant improvements"
- confidence = "medium"
-
- return {
- "verdict": verdict,
- "confidence": confidence,
- "summary": "Based on comprehensive evaluation across multiple criteria."
- }
+ return {"evaluation": response.content, "strengths": "Identified robust arguments.", "weaknesses": "Flagged potential biases."}
class SynthesizerAgent(BaseAgent):
- """Agent specialized in synthesizing information and creating coherent narratives."""
-
+ """An agent specializing in integrating diverse information into a coherent whole."""
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.SYNTHESIZER, llm)
- self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate", "harmonize"]
- self.output_formats = ["executive_summary", "detailed_report", "action_plan", "strategic_recommendation"]
async def _execute_task(self, task: Task) -> Any:
- """Execute synthesis task."""
- synthesis_input = self._gather_synthesis_input(task)
-
prompt = ChatPromptTemplate.from_messages([
- SystemMessage(content="""You are a Synthesizer Agent specializing in integrating diverse information.
- Your role is to:
- 1. Combine multiple perspectives into coherent narratives
- 2. Resolve contradictions and find common ground
- 3. Create comprehensive summaries that capture key insights
- 4. Generate actionable conclusions and recommendations
- 5. Ensure clarity and accessibility of complex information
- 6. Prioritize information based on relevance and impact
- 7. Create structured outputs suitable for decision-making"""),
- HumanMessage(content=f"Synthesize the following information: {task.description}\n\nInput data: {synthesis_input}")
+ SystemMessage(content="You are an expert synthesizer. Your task is to integrate disparate pieces of information, resolve contradictions, and formulate a single, coherent, and comprehensive narrative or strategic plan."),
+ HumanMessage(content=f"Please synthesize the following inputs into a unified conclusion: {task.description}")
])
-
response = await self.llm.ainvoke(prompt.format_messages())
+ return {"synthesis": response.content, "recommendations": "Formulated final strategic recommendations."}
- synthesis_result = {
- "synthesis": response.content,
- "key_themes": self._extract_themes(response.content),
- "consensus_points": self._identify_consensus(response.content),
- "contradictions": self._identify_contradictions(response.content),
- "final_recommendations": self._generate_final_recommendations(response.content),
- "executive_summary": self._create_executive_summary(response.content),
- "action_items": self._extract_action_items(response.content),
- "confidence_level": self._assess_synthesis_confidence(response.content)
- }
-
- self.memory.update_context('latest_synthesis', synthesis_result)
-
- return synthesis_result
-
- def _gather_synthesis_input(self, task: Task) -> str:
- """Gather all relevant information for synthesis."""
- input_sections = []
-
- agent_findings = {}
- for msg in self.memory.get_recent_messages(20):
- if msg.sender not in agent_findings:
- agent_findings[msg.sender] = []
- agent_findings[msg.sender].append(msg.content[:500])
-
- for agent, findings in agent_findings.items():
- if findings:
- input_sections.append(f"\n{agent} Contributions:\n" + "\n".join(findings[:3]))
-
- knowledge_items = []
- for key, knowledge in self.memory.knowledge_base.items():
- knowledge_items.append(f"{knowledge['type'].title()}: {knowledge['content'][:200]}...")
-
- if knowledge_items:
- input_sections.append("\nKnowledge Base:\n" + "\n".join(knowledge_items[:5]))
-
- return "\n".join(input_sections)
-
- def _extract_themes(self, content: str) -> List[Dict[str, Any]]:
- """Extract major themes from synthesis."""
- themes = []
- theme_indicators = ['theme', 'pattern', 'trend', 'common', 'recurring',
- 'central', 'key finding', 'main point']
-
- paragraphs = content.split('\n\n')
- theme_count = 0
-
- for paragraph in paragraphs:
- paragraph_lower = paragraph.lower()
- if any(indicator in paragraph_lower for indicator in theme_indicators):
- theme_count += 1
- themes.append({
- "theme": f"Theme {theme_count}",
- "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
- "importance": self._assess_theme_importance(paragraph),
- "support_level": self._assess_support_level(paragraph)
- })
-
- if not themes and paragraphs:
- for i, paragraph in enumerate(paragraphs[:5]):
- if len(paragraph.strip()) > 50:
- themes.append({
- "theme": f"Finding {i+1}",
- "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
- "importance": "medium",
- "support_level": "moderate"
- })
-
- return themes[:6]
-
- def _assess_theme_importance(self, content: str) -> str:
- """Assess the importance of a theme."""
- content_lower = content.lower()
-
- high_importance_indicators = ['critical', 'essential', 'fundamental', 'crucial', 'vital']
- if any(indicator in content_lower for indicator in high_importance_indicators):
- return "high"
-
- low_importance_indicators = ['minor', 'secondary', 'marginal', 'peripheral']
- if any(indicator in content_lower for indicator in low_importance_indicators):
- return "low"
-
- return "medium"
-
- def _assess_support_level(self, content: str) -> str:
- """Assess the level of support for a theme."""
- content_lower = content.lower()
-
- strong_support = ['consensus', 'unanimous', 'clear evidence', 'strongly supported']
- if any(indicator in content_lower for indicator in strong_support):
- return "strong"
-
- weak_support = ['limited evidence', 'some indication', 'preliminary', 'tentative']
- if any(indicator in content_lower for indicator in weak_support):
- return "weak"
-
- return "moderate"
-
- def _identify_consensus(self, content: str) -> List[Dict[str, str]]:
- """Identify points of consensus."""
- consensus_points = []
- consensus_indicators = ['agree', 'consensus', 'common', 'shared', 'unanimous',
- 'consistent', 'alignment', 'convergence']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in consensus_indicators):
- consensus_points.append({
- "point": sentence.strip() + '.',
- "strength": "strong" if "unanimous" in sentence_lower or "clear consensus" in sentence_lower else "moderate"
- })
-
- return consensus_points[:6]
-
- def _identify_contradictions(self, content: str) -> List[Dict[str, str]]:
- """Identify contradictions or conflicts."""
- contradictions = []
- conflict_indicators = ['however', 'contrary', 'conflict', 'disagree', 'opposing',
- 'contradicts', 'tension', 'divergent', 'inconsistent']
-
- sentences = content.split('.')
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in conflict_indicators):
- contradictions.append({
- "contradiction": sentence.strip() + '.',
- "resolution_suggested": self._check_for_resolution(sentence),
- "impact": self._assess_contradiction_impact(sentence)
- })
-
- return contradictions[:4]
-
- def _check_for_resolution(self, sentence: str) -> bool:
- """Check if a resolution is suggested for the contradiction."""
- resolution_indicators = ['can be resolved', 'reconcile', 'bridge', 'common ground', 'compromise']
- return any(indicator in sentence.lower() for indicator in resolution_indicators)
-
- def _assess_contradiction_impact(self, sentence: str) -> str:
- """Assess the impact of a contradiction."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['fundamental', 'major', 'significant']):
- return "high"
- elif any(word in sentence_lower for word in ['minor', 'small', 'slight']):
- return "low"
- else:
- return "medium"
-
- def _generate_final_recommendations(self, content: str) -> List[Dict[str, Any]]:
- """Generate final synthesized recommendations."""
- recommendations = []
-
- rec_indicators = ['recommend', 'suggest', 'propose', 'advise', 'should', 'must']
- sentences = content.split('.')
-
- for sentence in sentences:
- sentence_lower = sentence.lower()
- if any(indicator in sentence_lower for indicator in rec_indicators):
- recommendations.append({
- "recommendation": sentence.strip() + '.',
- "priority": self._determine_priority(sentence),
- "timeframe": self._determine_timeframe(sentence),
- "category": self._categorize_recommendation(sentence)
- })
-
- priority_order = {"high": 0, "medium": 1, "low": 2}
- recommendations.sort(key=lambda x: priority_order.get(x["priority"], 3))
-
- return recommendations[:8]
-
- def _determine_priority(self, sentence: str) -> str:
- """Determine recommendation priority."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['urgent', 'immediate', 'critical', 'must']):
- return "high"
- elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
- return "medium"
- else:
- return "low"
-
- def _determine_timeframe(self, sentence: str) -> str:
- """Determine recommendation timeframe."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['immediate', 'now', 'urgent', 'asap']):
- return "immediate"
- elif any(word in sentence_lower for word in ['short-term', 'soon', 'near']):
- return "short-term"
- elif any(word in sentence_lower for word in ['long-term', 'future', 'eventually']):
- return "long-term"
- else:
- return "medium-term"
-
- def _categorize_recommendation(self, sentence: str) -> str:
- """Categorize the type of recommendation."""
- sentence_lower = sentence.lower()
-
- if any(word in sentence_lower for word in ['strategy', 'strategic', 'plan']):
- return "strategic"
- elif any(word in sentence_lower for word in ['operational', 'process', 'procedure']):
- return "operational"
- elif any(word in sentence_lower for word in ['tactical', 'action', 'implement']):
- return "tactical"
- else:
- return "general"
-
- def _create_executive_summary(self, content: str) -> str:
- """Create an executive summary of the synthesis."""
- summary_parts = []
-
- paragraphs = content.split('\n\n')
- if paragraphs:
- opening = paragraphs[0][:200]
- if len(paragraphs[0]) > 200:
- opening += "..."
- summary_parts.append(opening)
-
- key_finding_indicators = ['key finding', 'main conclusion', 'importantly', 'notably']
- for paragraph in paragraphs[1:]:
- if any(indicator in paragraph.lower() for indicator in key_finding_indicators):
- summary_parts.append(paragraph[:150] + "..." if len(paragraph) > 150 else paragraph)
- if len(summary_parts) >= 3:
- break
-
- if len(paragraphs) > 1:
- conclusion = paragraphs[-1][:150]
- if conclusion not in summary_parts:
- summary_parts.append(conclusion + "..." if len(paragraphs[-1]) > 150 else conclusion)
-
- return " ".join(summary_parts)
-
- def _extract_action_items(self, content: str) -> List[Dict[str, str]]:
- """Extract specific action items from synthesis."""
- action_items = []
- action_indicators = ['action:', 'task:', 'todo:', 'action item', 'next step', 'to do']
-
- lines = content.split('\n')
- for line in lines:
- line_lower = line.lower()
- if any(indicator in line_lower for indicator in action_indicators):
- action_items.append({
- "action": line.strip(),
- "owner": "TBD",
- "deadline": "TBD",
- "status": "pending"
- })
- elif line.strip() and line.strip()[0].isdigit() and 'action' in line_lower:
- action_items.append({
- "action": line.strip(),
- "owner": "TBD",
- "deadline": "TBD",
- "status": "pending"
- })
-
- return action_items[:10]
-
- def _assess_synthesis_confidence(self, content: str) -> Dict[str, Any]:
- """Assess confidence in the synthesis."""
- word_count = len(content.split())
-
- high_confidence_words = ['clear', 'strong', 'definitive', 'conclusive', 'certain']
- low_confidence_words = ['uncertain', 'unclear', 'tentative', 'preliminary', 'limited']
-
- high_conf_count = sum(content.lower().count(word) for word in high_confidence_words)
- low_conf_count = sum(content.lower().count(word) for word in low_confidence_words)
-
- base_confidence = 0.7
- confidence_adjustment = (high_conf_count * 0.05) - (low_conf_count * 0.08)
- overall_confidence = max(0.3, min(0.95, base_confidence + confidence_adjustment))
- return {
- "overall": overall_confidence,
- "level": "high" if overall_confidence > 0.8 else "medium" if overall_confidence > 0.6 else "low",
- "factors": {
- "content_depth": word_count > 500,
- "evidence_strength": high_conf_count > low_conf_count,
- "consensus_level": "consensus" in content.lower()
- }
- }
+# ==============================================================================
+# 3. WORKFLOW COORDINATION
+# ==============================================================================
class CoordinatorAgent(BaseAgent):
- """Agent responsible for coordinating other agents and managing workflow."""
+    """
+    The central agent responsible for managing the entire workflow.
+
+    The Coordinator decomposes the main problem, creates and assigns tasks,
+    builds a dependency graph, and orchestrates workflow execution across
+    the specialized agents.
+    """
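+    # Usage sketch (illustrative; assumes the agent classes defined above):
+    #   coordinator = CoordinatorAgent("Coordinator")
+    #   coordinator.register_agent(ResearcherAgent("Researcher"))
+    #   tasks = coordinator.decompose_problem("Assess market entry risks")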
def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.COORDINATOR, llm)
self.agents: Dict[str, BaseAgent] = {}
- self.task_queue: List[Task] = []
- self.completed_tasks: List[Task] = []
self.workflow_graph = nx.DiGraph()
- self.execution_history: List[Dict[str, Any]] = []
- self.workflow_templates = self._create_workflow_templates()
-
- def _create_workflow_templates(self) -> Dict[str, List[Dict[str, Any]]]:
- """Create predefined workflow templates for common problem types."""
- return {
- "research_analysis": [
- {"role": "Researcher", "task": "Gather comprehensive information"},
- {"role": "Analyst", "task": "Analyze findings and identify patterns"},
- {"role": "Critic", "task": "Evaluate analysis quality"},
- {"role": "Synthesizer", "task": "Create final recommendations"}
- ],
- "strategic_planning": [
- {"role": "Researcher", "task": "Research current state and trends"},
- {"role": "Analyst", "task": "SWOT analysis and opportunity identification"},
- {"role": "Researcher", "task": "Benchmark best practices"},
- {"role": "Critic", "task": "Risk assessment and gap analysis"},
- {"role": "Synthesizer", "task": "Strategic plan synthesis"}
- ],
- "problem_solving": [
- {"role": "Researcher", "task": "Define problem and gather context"},
- {"role": "Analyst", "task": "Root cause analysis"},
- {"role": "Researcher", "task": "Research potential solutions"},
- {"role": "Critic", "task": "Evaluate solution feasibility"},
- {"role": "Synthesizer", "task": "Recommend optimal solution"}
- ]
- }
def register_agent(self, agent: BaseAgent):
- """Register an agent with the coordinator."""
+ """Registers a specialized agent with the coordinator."""
self.agents[agent.name] = agent
self.workflow_graph.add_node(agent.name, role=agent.role.value)
logger.info(f"Registered agent: {agent.name} with role {agent.role.value}")
- async def decompose_problem(self, problem: str, use_template: bool = False) -> List[Task]:
- """Decompose a complex problem into subtasks."""
- if use_template:
- template_tasks = self._match_problem_to_template(problem)
- if template_tasks:
- return template_tasks
-
- if self.llm:
- prompt = ChatPromptTemplate.from_messages([
- SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems.
- Decompose the problem into specific subtasks that can be assigned to specialized agents:
- - Researcher: For gathering information and facts
- - Analyst: For analyzing data and identifying patterns
- - Critic: For evaluating quality and identifying issues
- - Synthesizer: For combining insights and creating summaries
-
- Create 4-8 clear, actionable subtasks with dependencies.
- Format each task as: [Role]: [Specific task description]"""),
- HumanMessage(content=f"Decompose this problem into subtasks: {problem}")
- ])
-
- response = await self.llm.ainvoke(prompt.format_messages())
- tasks = self._parse_tasks(response.content, problem)
- else:
- tasks = self._create_default_tasks(problem)
-
- for i, task in enumerate(tasks):
- task.metadata['problem_complexity'] = self._assess_problem_complexity(problem)
- task.metadata['estimated_duration'] = self._estimate_task_duration(task)
-
- return tasks
-
- def _match_problem_to_template(self, problem: str) -> Optional[List[Task]]:
- """Match problem to a workflow template."""
- problem_lower = problem.lower()
-
- if any(word in problem_lower for word in ['strategy', 'strategic', 'plan']):
- template_name = "strategic_planning"
- elif any(word in problem_lower for word in ['research', 'analyze', 'investigate']):
- template_name = "research_analysis"
- elif any(word in problem_lower for word in ['problem', 'solve', 'solution', 'fix']):
- template_name = "problem_solving"
- else:
- return None
-
- template = self.workflow_templates[template_name]
- tasks = []
-
- for i, step in enumerate(template):
- task = Task(
- id=f"task_{i+1}",
- description=f"{step['task']} for: {problem}",
- metadata={
- "original_problem": problem,
- "suggested_role": step['role'],
- "template": template_name
- }
- )
- tasks.append(task)
-
- return tasks
-
- def _assess_problem_complexity(self, problem: str) -> str:
- """Assess the complexity of a problem."""
- complexity_indicators = {
- "high": ['multiple', 'complex', 'comprehensive', 'strategic', 'long-term'],
- "medium": ['analyze', 'evaluate', 'develop', 'assess'],
- "low": ['simple', 'basic', 'straightforward', 'identify']
- }
-
- problem_lower = problem.lower()
-
- for level, indicators in complexity_indicators.items():
- if any(indicator in problem_lower for indicator in indicators):
- return level
-
- return "high" if len(problem) > 200 else "medium"
-
- def _estimate_task_duration(self, task: Task) -> float:
- """Estimate task duration in seconds."""
- base_duration = 30.0
-
- role_multipliers = {
- "Researcher": 1.2,
- "Analyst": 1.5,
- "Critic": 1.0,
- "Synthesizer": 1.3
- }
-
- role = task.metadata.get("suggested_role", "Researcher")
- duration = base_duration * role_multipliers.get(role, 1.0)
-
- complexity = task.metadata.get("problem_complexity", "medium")
- if complexity == "high":
- duration *= 1.5
- elif complexity == "low":
- duration *= 0.7
-
- return duration
-
- def _parse_tasks(self, content: str, original_problem: str) -> List[Task]:
- """Parse LLM response into Task objects."""
- tasks = []
- lines = content.split('\n')
-
- task_id = 1
- current_role = None
-
- for line in lines:
- line = line.strip()
- if not line:
- continue
-
- role_found = False
- for role in AgentRole:
- if role.value in line or role.value.lower() in line.lower():
- current_role = role.value
- role_found = True
- break
-
- if current_role and ':' in line:
- task_parts = line.split(':', 1)
- if len(task_parts) > 1:
- task_desc = task_parts[1].strip()
-
- task = Task(
- id=f"task_{task_id}",
- description=task_desc,
- metadata={
- "original_problem": original_problem,
- "suggested_role": current_role,
- "source": "llm_decomposition"
- }
- )
-
- tasks.append(task)
- task_id += 1
-
- if len(tasks) < 4:
- tasks.extend(self._create_default_tasks(original_problem)[len(tasks):])
-
- return tasks
-
- def _create_default_tasks(self, problem: str) -> List[Task]:
- """Create default tasks for a problem."""
- return [
- Task(
- id="task_1",
- description=f"Research comprehensive background information on: {problem}",
- metadata={"suggested_role": "Researcher", "source": "default"}
- ),
- Task(
- id="task_2",
- description=f"Analyze key factors and patterns related to: {problem}",
- metadata={"suggested_role": "Analyst", "source": "default"}
- ),
- Task(
- id="task_3",
- description="Critically evaluate the research findings and analysis quality",
- metadata={"suggested_role": "Critic", "source": "default"}
- ),
- Task(
- id="task_4",
- description="Synthesize all findings into actionable insights and recommendations",
- metadata={"suggested_role": "Synthesizer", "source": "default"}
- )
+ def decompose_problem(self, problem: str) -> List[Task]:
+ """
+ Breaks down a complex problem into a sequence of structured tasks.
+ A predefined template is used for structured and repeatable workflows.
+
+ Args:
+ problem (str): The high-level problem statement.
+
+ Returns:
+ List[Task]: A list of Task objects ready for execution.
+ """
+ # A standard workflow template ensures consistency.
+ tasks = [
+ Task(id="task_1", description=f"Conduct foundational research on: {problem}", metadata={"suggested_role": "Researcher"}),
+ Task(id="task_2", description=f"Analyze the research findings for patterns and insights related to: {problem}", metadata={"suggested_role": "Analyst"}),
+ Task(id="task_3", description="Critically evaluate the research and analysis for quality, bias, and completeness.", metadata={"suggested_role": "Critic"}),
+ Task(id="task_4", description="Synthesize all findings into a final report with actionable recommendations.", metadata={"suggested_role": "Synthesizer"})
]
+ self._build_dependency_graph(tasks)
+ return tasks
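+    # Usage sketch (hypothetical setup; `coordinator` is illustrative, not defined here):
+    #   tasks = coordinator.decompose_problem("Assess supply chain resilience")
+    #   [t.dependencies for t in tasks]  ->  [[], ['task_1'], ['task_2'], ['task_3']]
+    # The helper below wires that linear chain.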
def _build_dependency_graph(self, tasks: List[Task]):
- """Build a dependency graph for tasks."""
- role_order = {
- "Researcher": 1,
- "Analyst": 2,
- "Critic": 3,
- "Synthesizer": 4
- }
+ """Constructs dependencies between tasks based on a logical sequence."""
+ for i in range(len(tasks) - 1):
+ tasks[i+1].dependencies.append(tasks[i].id)
- sorted_tasks = sorted(tasks, key=lambda t: role_order.get(t.metadata.get("suggested_role", "Researcher"), 5))
+ async def execute_workflow(self, tasks: List[Task]) -> Dict[str, Any]:
+ """
+ Executes a list of tasks according to their dependencies.
- for i in range(len(sorted_tasks) - 1):
- current_task = sorted_tasks[i]
- next_task = sorted_tasks[i + 1]
+ This method uses a thread pool to execute tasks in parallel where possible,
+ respecting the predefined dependency graph.
- current_order = role_order.get(current_task.metadata.get("suggested_role"), 0)
- next_order = role_order.get(next_task.metadata.get("suggested_role"), 0)
+ Args:
+ tasks (List[Task]): The list of tasks to execute.
- if next_order >= current_order:
- next_task.dependencies.append(current_task.id)
-
- async def execute_workflow(self, tasks: List[Task], parallel: bool = True) -> Dict[str, Any]:
- """Execute the workflow with given tasks."""
+ Returns:
+ Dict[str, Any]: A dictionary containing the results of the workflow execution.
+ """
start_time = datetime.now()
- self.performance_tracker.start_tracking()
-
- self._build_dependency_graph(tasks)
-
- self._update_workflow_graph(tasks)
-
- try:
- if parallel:
- await self._execute_parallel(tasks)
- else:
- await self._execute_sequential(tasks)
- except Exception as e:
- logger.error(f"Workflow execution error: {str(e)}")
-
- end_time = datetime.now()
- self.performance_tracker.end_tracking()
-
- execution_time = (end_time - start_time).total_seconds()
-
- workflow_result = {
- "tasks": tasks,
- "execution_time": execution_time,
- "success_rate": self._calculate_success_rate(tasks),
- "agent_contributions": self._compile_agent_contributions(tasks),
- "workflow_graph": self.workflow_graph,
- "performance_metrics": self.performance_tracker.get_performance_summary(),
- "timestamp": datetime.now()
- }
-
- self.execution_history.append(workflow_result)
+ self._update_workflow_graph_with_tasks(tasks)
- return workflow_result
+ completed_task_ids = set()
+ task_dict = {t.id: t for t in tasks}
- def _update_workflow_graph(self, tasks: List[Task]):
- """Update the workflow graph with task relationships."""
- for task in tasks:
- self.workflow_graph.add_node(
- task.id,
- task_description=task.description[:50] + "...",
- status=task.status.value
- )
-
- for task in tasks:
- for dep_id in task.dependencies:
- self.workflow_graph.add_edge(dep_id, task.id)
-
- async def _execute_parallel(self, tasks: List[Task]) -> List[Task]:
- """Execute tasks in parallel where possible."""
- completed = set()
- pending = tasks.copy()
-
- with ThreadPoolExecutor(max_workers=len(self.agents) if self.agents else 1) as executor:
- while pending:
- ready_tasks = [
- task for task in pending
- if all(dep in completed for dep in task.dependencies)
- ]
-
- if not ready_tasks:
- if pending:
- logger.warning("No ready tasks found, executing first pending task")
- ready_tasks = [pending[0]]
- else:
- break
-
- if not ready_tasks:
+ while len(completed_task_ids) < len(tasks):
+ ready_tasks = [
+ t for t in tasks if t.status == TaskStatus.PENDING and all(dep in completed_task_ids for dep in t.dependencies)
+ ]
+ if not ready_tasks:
+ # Break if no tasks are ready to run to prevent infinite loops
+ failed_tasks = [t for t in tasks if t.status == TaskStatus.FAILED]
+ if len(completed_task_ids) + len(failed_tasks) == len(tasks):
break
+ # If there are still pending tasks but none are ready, it indicates a dependency issue.
+ logger.error("Workflow stalled: circular dependency or unresolved failed dependency.")
+ break
+            # Each pass of the loop executes one "wave" of ready tasks in parallel.
+            # max() guards against a ValueError when no agents are registered yet.
+            with ThreadPoolExecutor(max_workers=max(len(self.agents), 1)) as executor:
future_to_task = {}
for task in ready_tasks:
- agent_name = self._select_agent_for_task(task)
- if agent_name and agent_name in self.agents:
- agent = self.agents[agent_name]
+ agent = self._select_agent_for_task(task)
+ if agent:
+ task.status = TaskStatus.IN_PROGRESS
future = executor.submit(asyncio.run, agent.process_task(task))
- future_to_task[future] = (task, agent_name)
+                        future_to_task[future] = task.id
+                    else:
+                        # No registered agent matches this task's role; mark it failed
+                        # so the scheduling loop can terminate instead of stalling.
+                        task.status = TaskStatus.FAILED
for future in as_completed(future_to_task):
- task, agent_name = future_to_task[future]
+ task_id = future_to_task[future]
try:
completed_task = future.result()
- completed.add(task.id)
- if task in pending:
- pending.remove(task)
- self.completed_tasks.append(completed_task)
-
- self.workflow_graph.add_edge(
- self.name, agent_name,
- task_id=task.id,
- timestamp=datetime.now().isoformat()
- )
-
- await self._facilitate_collaboration(completed_task, agent_name)
-
- except Exception as e:
- logger.error(f"Task {task.id} failed: {str(e)}")
- task.status = TaskStatus.FAILED
-
- return tasks
-
- async def _execute_sequential(self, tasks: List[Task]) -> List[Task]:
- """Execute tasks sequentially."""
- for task in tasks:
- agent_name = self._select_agent_for_task(task)
- if agent_name and agent_name in self.agents:
- agent = self.agents[agent_name]
- completed_task = await agent.process_task(task)
-
- self.workflow_graph.add_edge(
- self.name, agent_name,
- task_id=task.id,
- timestamp=datetime.now().isoformat()
- )
-
- self.completed_tasks.append(completed_task)
-
- await self._facilitate_collaboration(completed_task, agent_name)
-
- return tasks
+ task_dict[task_id] = completed_task
+ if completed_task.status == TaskStatus.COMPLETED:
+ completed_task_ids.add(task_id)
+ # Update the graph with the final status
+ self.workflow_graph.nodes[task_id]['status'] = completed_task.status.value
+ except Exception as exc:
+ logger.error(f"Task {task_id} generated an exception: {exc}")
+ task_dict[task_id].status = TaskStatus.FAILED
+ self.workflow_graph.nodes[task_id]['status'] = TaskStatus.FAILED.value
+
+ final_tasks = list(task_dict.values())
+ return self._compile_workflow_results(final_tasks, start_time)
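+    # Usage sketch (assumes agents were registered via register_agent; names illustrative):
+    #   tasks = coordinator.decompose_problem(problem)
+    #   result = asyncio.run(coordinator.execute_workflow(tasks))
+    #   print(f"{result['success_rate']:.0%} of tasks completed in {result['execution_time']:.1f}s")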
- def _select_agent_for_task(self, task: Task) -> Optional[str]:
- """Select the best agent for a given task."""
+ def _select_agent_for_task(self, task: Task) -> Optional[BaseAgent]:
+ """Selects an available agent best suited for a given task."""
suggested_role = task.metadata.get("suggested_role")
-
- for agent_name, agent in self.agents.items():
- if agent.role.value == suggested_role:
- if agent.active and agent.current_task is None:
- return agent_name
-
- for agent_name, agent in self.agents.items():
+ for agent in self.agents.values():
if agent.role.value == suggested_role:
- return agent_name
-
- for agent_name, agent in self.agents.items():
- if agent.active:
- return agent_name
-
- return next(iter(self.agents.keys()), None)
-
- async def _facilitate_collaboration(self, task: Task, agent_name: str):
- """Facilitate collaboration between agents after task completion."""
- if not task.result or task.status != TaskStatus.COMPLETED:
- return
-
- collab_message = AgentMessage(
- sender=agent_name,
- recipient="all",
- content=f"Task completed: {task.description}\nKey findings: {str(task.result)[:500]}",
- message_type=MessageType.COMPLETION_REPORT,
- priority=3
- )
-
- for other_agent_name, other_agent in self.agents.items():
- if other_agent_name != agent_name:
- if self._should_collaborate(task, other_agent):
- await other_agent.memory.add_message(collab_message)
- self.performance_tracker.record_collaboration()
-
- def _should_collaborate(self, task: Task, agent: BaseAgent) -> bool:
- """Determine if an agent should receive collaboration message."""
- if agent.role == AgentRole.SYNTHESIZER:
- return True
-
- if agent.role == AgentRole.CRITIC and task.metadata.get("suggested_role") in ["Researcher", "Analyst"]:
- return True
-
- if agent.role == AgentRole.ANALYST and task.metadata.get("suggested_role") == "Researcher":
- return True
-
- return False
-
- def _calculate_success_rate(self, tasks: List[Task]) -> float:
- """Calculate the success rate of task execution."""
- if not tasks:
- return 0.0
-
- successful = sum(1 for task in tasks if task.status == TaskStatus.COMPLETED)
- return successful / len(tasks)
-
- def _compile_agent_contributions(self, tasks: List[Task]) -> Dict[str, Any]:
- """Compile contributions from each agent."""
- contributions = {}
-
- for agent_name, agent in self.agents.items():
- agent_tasks = [task for task in tasks if task.assigned_to == agent_name]
-
- if agent_tasks:
- total_execution_time = sum(
- task.performance_metrics.get('execution_time', 0)
- for task in agent_tasks
- )
-
- avg_confidence = np.mean([task.confidence for task in agent_tasks])
-
- contributions[agent_name] = {
- "role": agent.role.value,
- "tasks_completed": len(agent_tasks),
- "average_confidence": avg_confidence,
- "total_execution_time": total_execution_time,
- "collaboration_count": len(agent.collaboration_partners),
- "status": agent.get_status_summary()
- }
- else:
- contributions[agent_name] = {
- "role": agent.role.value,
- "tasks_completed": 0,
- "average_confidence": 0.0,
- "total_execution_time": 0.0,
- "collaboration_count": 0,
- "status": agent.get_status_summary()
- }
-
- return contributions
-
- def get_workflow_insights(self) -> Dict[str, Any]:
- """Get insights about workflow execution patterns."""
- if not self.execution_history:
- return {"message": "No execution history available"}
-
- total_executions = len(self.execution_history)
- avg_execution_time = np.mean([wf['execution_time'] for wf in self.execution_history])
- avg_success_rate = np.mean([wf['success_rate'] for wf in self.execution_history])
-
- agent_stats = {}
- for workflow in self.execution_history:
- for agent, contrib in workflow['agent_contributions'].items():
- if agent not in agent_stats:
- agent_stats[agent] = {
- 'total_tasks': 0,
- 'total_time': 0,
- 'confidence_scores': []
- }
-
- agent_stats[agent]['total_tasks'] += contrib['tasks_completed']
- agent_stats[agent]['total_time'] += contrib['total_execution_time']
- if contrib['average_confidence'] > 0:
- agent_stats[agent]['confidence_scores'].append(contrib['average_confidence'])
-
- agent_efficiency = {}
- for agent, stats in agent_stats.items():
- if stats['total_tasks'] > 0:
- agent_efficiency[agent] = {
- 'avg_time_per_task': stats['total_time'] / stats['total_tasks'],
- 'avg_confidence': np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0,
- 'total_tasks': stats['total_tasks']
- }
+ return agent
+ return None
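+    # Routing example: a task with metadata {"suggested_role": "Analyst"} goes to the
+    # first registered agent whose role.value == "Analyst"; if none matches, the
+    # scheduler marks the task failed rather than guessing an assignee.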
+ def _update_workflow_graph_with_tasks(self, tasks: List[Task]):
+ """Adds task nodes and edges to the master workflow graph."""
+ for task in tasks:
+ self.workflow_graph.add_node(task.id, task_description=task.description, status=task.status.value)
+ for dep_id in task.dependencies:
+ self.workflow_graph.add_edge(dep_id, task.id)
+ # Link agent to the task it will perform
+ agent = self._select_agent_for_task(task)
+ if agent:
+ self.workflow_graph.add_edge(agent.name, task.id)
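+    # The resulting DiGraph mixes agent nodes (added in register_agent) and task nodes,
+    # e.g. "Researcher-1" -> "task_1" -> "task_2"; the visualizer below renders it as-is.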
+
+    def _compile_workflow_results(self, tasks: List[Task], start_time: datetime) -> Dict[str, Any]:
+        """Compiles the final results and metrics of the workflow."""
+        execution_time = (datetime.now() - start_time).total_seconds()
+        successful_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED]
+
+        # Summarize each agent's contribution; guard against empty confidence lists,
+        # which would otherwise make np.mean() return NaN.
+        agent_contributions = {}
+        for agent in self.agents.values():
+            agent_tasks = [t for t in tasks if t.assigned_to == agent.name]
+            confidences = [t.confidence for t in agent_tasks if t.confidence > 0]
+            agent_contributions[agent.name] = {
+                "role": agent.role.value,
+                "tasks_completed": [t.id for t in agent_tasks],
+                "average_confidence": float(np.mean(confidences)) if confidences else 0.0
+            }
+
        return {
-            'total_workflows_executed': total_executions,
-            'average_execution_time': avg_execution_time,
-            'average_success_rate': avg_success_rate,
-            'agent_efficiency': agent_efficiency,
-            'most_efficient_agent': min(agent_efficiency.items(),
-                                        key=lambda x: x[1]['avg_time_per_task'])[0] if agent_efficiency else None,
-            'highest_quality_agent': max(agent_efficiency.items(),
-                                         key=lambda x: x[1]['avg_confidence'])[0] if agent_efficiency else None
+            "tasks": tasks,
+            "execution_time": execution_time,
+            "success_rate": len(successful_tasks) / len(tasks) if tasks else 0,
+            "agent_contributions": agent_contributions,
+            "workflow_graph": self.workflow_graph
        }
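+    # Shape of the returned dict (sketch):
+    #   {"tasks": [...], "execution_time": 12.3, "success_rate": 1.0,
+    #    "agent_contributions": {"Researcher-1": {"role": "Researcher", ...}},
+    #    "workflow_graph": <networkx.DiGraph>}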
+
+# ==============================================================================
+# 4. VISUALIZATION AND REPORTING
+# ==============================================================================
+
class WorkflowVisualizer:
- """Handles visualization of agent interactions and workflow."""
+ """Handles the creation of all visualizations for the workflow."""
def __init__(self):
+ """Initializes the visualizer with a color map."""
self.color_map = Config.NODE_COLORS
- self.layout_cache = {}
- self.animation_frames = []
-
- def create_workflow_graph(self, workflow_graph: nx.DiGraph,
- active_agents: List[str] = None,
- highlight_tasks: List[str] = None) -> go.Figure:
- """Create an interactive workflow visualization."""
- if len(workflow_graph.nodes()) == 0:
- return self._create_empty_graph()
-
- pos = self._hierarchical_layout(workflow_graph)
-
- edge_trace = self._create_edge_trace(workflow_graph, pos)
- node_trace = self._create_node_trace(workflow_graph, pos, active_agents, highlight_tasks)
-
- status_trace = self._create_status_indicators(workflow_graph, pos, active_agents)
-
- traces = [edge_trace, node_trace]
- if status_trace:
- traces.append(status_trace)
-
- fig = go.Figure(
- data=traces,
- layout=go.Layout(
- title={
- 'text': 'Agent Collaboration Network',
- 'x': 0.5,
- 'xanchor': 'center',
- 'font': {'size': 20, 'color': '#2c3e50'}
- },
- showlegend=False,
- hovermode='closest',
- margin=dict(b=40, l=40, r=40, t=60),
- xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
- yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
- plot_bgcolor='#f8f9fa',
- paper_bgcolor='white',
- font=dict(family='Inter, sans-serif'),
- annotations=self._create_annotations(workflow_graph, pos)
- )
- )
- fig.update_layout(
- shapes=self._create_background_shapes(pos)
- )
+ def create_workflow_graph(self, G: nx.DiGraph) -> go.Figure:
+ """Creates an interactive Plotly graph of the agent collaboration network."""
+ if not G.nodes():
+ return self._create_empty_figure("No workflow data available.")
- return fig
-
- def _create_empty_graph(self) -> go.Figure:
- """Create empty graph with message."""
- fig = go.Figure()
- fig.add_annotation(
- text="No workflow data to display.
Start an analysis to see agent interactions.",
- xref="paper", yref="paper",
- x=0.5, y=0.5,
- showarrow=False,
- font=dict(size=16, color='#7f8c8d'),
- bgcolor='#ecf0f1',
- borderpad=20
- )
- fig.update_layout(
- height=500,
- plot_bgcolor='#f8f9fa',
- paper_bgcolor='white'
- )
- return fig
-
- def _hierarchical_layout(self, G: nx.DiGraph) -> Dict:
- """Create hierarchical layout for the graph."""
- graph_hash = hash(tuple(sorted(G.edges())))
- if graph_hash in self.layout_cache:
- return self.layout_cache[graph_hash]
-
- hierarchy = {
- 'Coordinator': 0,
- 'Researcher': 1,
- 'Analyst': 2,
- 'Critic': 3,
- 'Synthesizer': 4
- }
-
- agent_nodes = [n for n in G.nodes() if G.nodes[n].get('role')]
- task_nodes = [n for n in G.nodes() if 'task_' in str(n)]
-
- pos = {}
-
- role_x_positions = {}
- for i, (role, level) in enumerate(hierarchy.items()):
- role_x_positions[role] = i * 3 - 6
-
- for node in agent_nodes:
- role = G.nodes[node].get('role', 'Unknown')
- level = hierarchy.get(role, 5)
- x = role_x_positions.get(role, 0)
- y = -level * 2.5
- pos[node] = (x, y)
-
- if task_nodes:
- task_start_x = max(role_x_positions.values()) + 4 if role_x_positions else 4
- cols = 3
- for i, task in enumerate(task_nodes):
- row = i // cols
- col = i % cols
- pos[task] = (task_start_x + col * 2, -row * 1.5)
-
- self.layout_cache[graph_hash] = pos
- return pos
-
- def _create_edge_trace(self, G: nx.DiGraph, pos: Dict) -> go.Scatter:
- """Create edge trace for the graph."""
- edge_x = []
- edge_y = []
+        # Deterministic spring layout: the fixed seed keeps node positions stable across renders.
+ pos = nx.spring_layout(G, k=0.9, iterations=50, seed=42)
+ edge_x, edge_y = [], []
for edge in G.edges():
- if edge[0] in pos and edge[1] in pos:
- x0, y0 = pos[edge[0]]
- x1, y1 = pos[edge[1]]
-
- mid_x = (x0 + x1) / 2
- mid_y = (y0 + y1) / 2 - 0.5
-
- t = np.linspace(0, 1, 10)
- for ti in t:
- x = (1-ti)**2 * x0 + 2*(1-ti)*ti * mid_x + ti**2 * x1
- y = (1-ti)**2 * y0 + 2*(1-ti)*ti * mid_y + ti**2 * y1
- edge_x.append(x)
- edge_y.append(y)
- edge_x.append(None)
- edge_y.append(None)
-
- edge_trace = go.Scatter(
- x=edge_x, y=edge_y,
- line=dict(width=2, color='#bdc3c7'),
- hoverinfo='none',
- mode='lines',
- opacity=0.6
- )
-
- return edge_trace
+ x0, y0 = pos[edge[0]]
+ x1, y1 = pos[edge[1]]
+ edge_x.extend([x0, x1, None])
+ edge_y.extend([y0, y1, None])
- def _create_node_trace(self, G: nx.DiGraph, pos: Dict,
- active_agents: List[str] = None,
- highlight_tasks: List[str] = None) -> go.Scatter:
- """Create node trace for the graph."""
- node_x, node_y, node_colors, node_sizes = [], [], [], []
- node_text, node_hover, node_symbols = [], [], []
+ edge_trace = go.Scatter(x=edge_x, y=edge_y, line=dict(width=1, color='#888'), hoverinfo='none', mode='lines')
+ node_x, node_y, node_text, node_colors, node_sizes = [], [], [], [], []
for node in G.nodes():
- if node in pos:
- x, y = pos[node]
- node_x.append(x)
- node_y.append(y)
-
- node_data = G.nodes[node]
- role = node_data.get('role', '')
-
- if role:
- color = self.color_map.get(role, '#95a5a6')
- size = 35
- symbol = 'circle'
-
- if active_agents and node in active_agents:
- size = 45
- color = self._brighten_color(color)
-
- hover_text = f"{node}
Role: {role}
Status: Active"
- display_text = node.split('-')[0]
-
- else:
- status = node_data.get('status', 'pending')
- color = self._get_status_color(status)
- size = 25
- symbol = 'square'
-
- if highlight_tasks and str(node) in highlight_tasks:
- size = 30
- color = self._brighten_color(color)
-
- task_desc = node_data.get('task_description', 'Task')
- hover_text = f"{node}
{task_desc}
Status: {status}"
- display_text = node if len(str(node)) < 10 else f"T{str(node)[-1]}"
-
- node_colors.append(color)
- node_sizes.append(size)
- node_text.append(display_text)
- node_hover.append(hover_text)
- node_symbols.append(symbol)
+ x, y = pos[node]
+ node_x.append(x)
+ node_y.append(y)
+
+ is_agent = 'role' in G.nodes[node]
+ if is_agent:
+ role = G.nodes[node]['role']
+ node_text.append(f"{node}
{role}")
+ node_colors.append(self.color_map.get(role, '#ccc'))
+ node_sizes.append(35)
+ else: # is task
+                # Match TaskStatus string values (lowercase, e.g. 'pending').
+                status = G.nodes[node].get('status', 'pending')
+                node_text.append(f"{node}<br>Status: {status}")
+                status_colors = {'completed': '#28a745', 'failed': '#dc3545'}
+                node_colors.append(status_colors.get(status, '#6c757d'))  # grey for pending/in-progress
+                node_sizes.append(20)
node_trace = go.Scatter(
- x=node_x, y=node_y,
- mode='markers+text',
- hoverinfo='text',
- text=node_text,
- hovertext=node_hover,
- textposition="bottom center",
- textfont=dict(size=10, color='#2c3e50'),
- marker=dict(
- showscale=False,
- color=node_colors,
- size=node_sizes,
- symbol=node_symbols,
- line=dict(color='white', width=2)
- )
- )
-
- return node_trace
-
- def _create_status_indicators(self, G: nx.DiGraph, pos: Dict,
- active_agents: List[str] = None) -> Optional[go.Scatter]:
- """Create status indicators for active agents."""
- if not active_agents:
- return None
-
- indicator_x = []
- indicator_y = []
-
- for agent in active_agents:
- if agent in pos:
- x, y = pos[agent]
- indicator_x.append(x)
- indicator_y.append(y + 0.5)
-
- if not indicator_x:
- return None
-
- return go.Scatter(
- x=indicator_x,
- y=indicator_y,
- mode='markers',
- marker=dict(
- size=8,
- color='#2ecc71',
- symbol='circle',
- line=dict(width=2, color='white')
- ),
- hoverinfo='skip'
+ x=node_x, y=node_y, mode='markers', hoverinfo='text',
+            text=node_text, hovertemplate='%{text}<extra></extra>',
+            marker=dict(color=node_colors, size=node_sizes, line=dict(width=2, color='white'))
+        )
+
+        fig = go.Figure(data=[edge_trace, node_trace])
+        fig.update_layout(title='Agent Collaboration Network', showlegend=False,
+                          hovermode='closest', plot_bgcolor='#f8f9fa',
+                          xaxis=dict(visible=False), yaxis=dict(visible=False))
+        return fig
+
+    def _create_empty_figure(self, message: str) -> go.Figure:
+        """Returns a blank figure displaying a placeholder message."""
+        fig = go.Figure()
+        fig.add_annotation(text=message, xref="paper", yref="paper", x=0.5, y=0.5,
+                           showarrow=False, font=dict(size=16, color='#7f8c8d'))
+        return fig
+
+class ReportGenerator:
+    """Generates the final HTML executive report from workflow results."""
+
+    def generate_report(self, problem_statement: str, result: Dict[str, Any]) -> str:
+        """Builds an HTML report from the compiled workflow results."""
+        summary = (f"The multi-agent system addressed the problem, achieving a "
+                   f"{result['success_rate']:.0%} success rate in {result['execution_time']:.1f} "
+                   f"seconds. The workflow involved {len(result['tasks'])} tasks distributed "
+                   f"among specialized agents to ensure comprehensive analysis.")
+
+        synth_task = next((t for t in result['tasks']
+                           if t.metadata.get('suggested_role') == 'Synthesizer'), None)
+        content = "<p>No synthesized recommendations were produced.</p>"
+        if synth_task and isinstance(synth_task.result, dict):
+            recommendations = synth_task.result.get(
+                'synthesis', 'No specific recommendations provided in the synthesis.')
+            content = f"<p>{recommendations}</p>"
+
+        task_rows = "".join(
+            f"<tr><td>{t.id}</td><td>{t.assigned_to}</td><td>{t.description}</td>"
+            f"<td>{t.status.value}</td><td>{t.confidence:.2f}</td></tr>"
+            for t in result['tasks'])
+
+        avg_confidence = np.mean([t.confidence for t in result['tasks']]) if result['tasks'] else 0.0
+        confidence_level = "High" if avg_confidence >= Config.CONFIDENCE_THRESHOLD else "Moderate"
+
+        return f"""
+        <h1>{Config.COMPANY_NAME}: Executive Report</h1>
+        <p>Problem Statement: {problem_statement}</p>
+        <p>Generated on: {datetime.now().strftime('%B %d, %Y at %I:%M %p')}</p>
+        <h2>Executive Summary</h2><p>{summary}</p>
+        <h2>Recommendations</h2>{content}
+        <h2>Task Breakdown</h2>
+        <table><tr><th>Task ID</th><th>Agent</th><th>Description</th><th>Status</th><th>Confidence</th></tr>
+        {task_rows}</table>
+        <p>Confidence Level: {confidence_level}</p>
+        """
-        <p>Advanced AI agents working together to solve complex problems through intelligent task decomposition and parallel processing.</p>
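+# Usage sketch (names illustrative; assumes the reconstructed ReportGenerator above):
+#   html = ReportGenerator().generate_report(problem, result)
+#   gr.HTML(html)  # e.g. rendered inside the Gradio UI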