diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,10 +1,10 @@
# Multi-Agent AI Collaboration System
-# Author: Spencer Purdy
+# Author: [Your Name]
# Description: Enterprise-grade multi-agent system with specialized AI agents collaborating
-# to solve complex problems through task decomposition and parallel processing.
+# to solve complex problems through intelligent task decomposition and parallel processing.
# Installation (uncomment for Google Colab)
-# !pip install gradio langchain langchain-openai openai networkx matplotlib asyncio aiohttp pandas numpy plotly python-dotenv pydantic
+# !pip install gradio langchain langchain-openai openai networkx matplotlib aiohttp pandas numpy plotly python-dotenv pydantic scipy
import os
import json
@@ -27,6 +27,7 @@ import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch
import plotly.graph_objects as go
+import plotly.express as px
from plotly.subplots import make_subplots
# LangChain and AI libraries
@@ -39,7 +40,7 @@ from pydantic import BaseModel, Field
# Async libraries
import aiohttp
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
# Configure logging
logging.basicConfig(
@@ -64,15 +65,33 @@ class Config:
GRAPH_UPDATE_INTERVAL = 0.5 # seconds
NODE_COLORS = {
'Researcher': '#3498db',
- 'Analyst': '#e74c3c',
+ 'Analyst': '#e74c3c',
'Critic': '#f39c12',
'Synthesizer': '#2ecc71',
'Coordinator': '#9b59b6'
}
+ # Performance settings
+ ENABLE_PERFORMANCE_TRACKING = True
+ BENCHMARK_BASELINE = {
+ "single_agent_time": 45.0,
+ "single_agent_quality": 0.72
+ }
+
# Report settings
CONFIDENCE_THRESHOLD = 0.7
MAX_REPORT_SECTIONS = 10
+ COMPANY_NAME = "Multi-Agent AI Platform"
+
+ # Demo settings
+ DEMO_MODE_ENABLED = True
+ DEMO_PROBLEMS = [
+ "Analyze the impact of remote work on team productivity and collaboration",
+ "Develop a strategy for sustainable urban transportation",
+ "Evaluate the risks and benefits of AI in healthcare",
+ "Design a framework for ethical AI development",
+ "Create a plan for digital transformation in education"
+ ]
class AgentRole(Enum):
"""Enumeration of agent roles in the system."""
@@ -89,6 +108,14 @@ class TaskStatus(Enum):
COMPLETED = "completed"
FAILED = "failed"
+class MessageType(Enum):
+ """Types of messages between agents."""
+ TASK_ASSIGNMENT = "task_assignment"
+ COLLABORATION_REQUEST = "collaboration_request"
+ INFORMATION_SHARING = "information_sharing"
+ FEEDBACK = "feedback"
+ COMPLETION_REPORT = "completion_report"
+
@dataclass
class Task:
"""Represents a task to be executed by agents."""
@@ -102,6 +129,7 @@ class Task:
created_at: datetime = field(default_factory=datetime.now)
completed_at: Optional[datetime] = None
metadata: Dict[str, Any] = field(default_factory=dict)
+ performance_metrics: Dict[str, float] = field(default_factory=dict)
@dataclass
class AgentMessage:
@@ -109,9 +137,102 @@ class AgentMessage:
sender: str
recipient: str
content: str
- message_type: str = "task"
+ message_type: MessageType
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
+ priority: int = 1 # 1 (low) to 5 (high)
+
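+# A minimal construction sketch for AgentMessage; the agent names and content
+# here are illustrative placeholders, not identifiers fixed by the system.
+def _example_message() -> AgentMessage:
+    return AgentMessage(
+        sender="Researcher-1",
+        recipient="Analyst-1",
+        content="Finding: remote teams lean on asynchronous channels.",
+        message_type=MessageType.INFORMATION_SHARING,
+        priority=4,  # >= 4 is surfaced by AgentMemory.get_high_priority_messages()
+    )
+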
+class PerformanceTracker:
+ """Tracks performance metrics for the multi-agent system."""
+
+ def __init__(self):
+ self.metrics = {
+ 'task_completion_times': [],
+ 'agent_utilization': {},
+ 'collaboration_count': 0,
+ 'total_messages': 0,
+ 'quality_scores': [],
+ 'system_start_time': None,
+ 'system_end_time': None
+ }
+
+ def start_tracking(self):
+ """Start performance tracking."""
+ self.metrics['system_start_time'] = datetime.now()
+
+ def end_tracking(self):
+ """End performance tracking."""
+ self.metrics['system_end_time'] = datetime.now()
+
+ def record_task_completion(self, task: Task):
+ """Record task completion metrics."""
+ if task.created_at and task.completed_at:
+ completion_time = (task.completed_at - task.created_at).total_seconds()
+ self.metrics['task_completion_times'].append(completion_time)
+
+ def record_agent_activity(self, agent_name: str, activity_duration: float):
+ """Record agent activity duration."""
+ if agent_name not in self.metrics['agent_utilization']:
+ self.metrics['agent_utilization'][agent_name] = 0
+ self.metrics['agent_utilization'][agent_name] += activity_duration
+
+ def record_collaboration(self):
+ """Record a collaboration event."""
+ self.metrics['collaboration_count'] += 1
+
+ def record_message(self):
+ """Record a message exchange."""
+ self.metrics['total_messages'] += 1
+
+ def get_performance_summary(self) -> Dict[str, Any]:
+ """Get performance summary statistics."""
+ total_time = 0
+ if self.metrics['system_start_time'] and self.metrics['system_end_time']:
+ total_time = (self.metrics['system_end_time'] -
+ self.metrics['system_start_time']).total_seconds()
+
+ avg_task_time = np.mean(self.metrics['task_completion_times']) if self.metrics['task_completion_times'] else 0
+
+ # Calculate improvement over baseline
+ baseline_time = Config.BENCHMARK_BASELINE['single_agent_time']
+ time_improvement = ((baseline_time - avg_task_time) / baseline_time * 100) if avg_task_time > 0 else 0
+
+ return {
+ 'total_execution_time': total_time,
+ 'average_task_completion_time': avg_task_time,
+ 'total_collaborations': self.metrics['collaboration_count'],
+ 'total_messages': self.metrics['total_messages'],
+ 'agent_utilization': self.metrics['agent_utilization'],
+ 'time_improvement_percentage': time_improvement,
+ 'efficiency_score': self._calculate_efficiency_score()
+ }
+
+ def _calculate_efficiency_score(self) -> float:
+ """Calculate overall efficiency score."""
+ # Simple efficiency calculation based on multiple factors
+ factors = []
+
+ # Task completion speed factor
+ if self.metrics['task_completion_times']:
+            avg_time = np.mean(self.metrics['task_completion_times'])
+            if avg_time > 0:  # guard against zero-duration task records
+                speed_factor = min(1.0, Config.BENCHMARK_BASELINE['single_agent_time'] / avg_time)
+                factors.append(speed_factor)
+
+ # Collaboration efficiency factor
+ if self.metrics['total_messages'] > 0:
+ collab_factor = min(1.0, self.metrics['collaboration_count'] / self.metrics['total_messages'])
+ factors.append(collab_factor)
+
+ # Agent utilization factor
+ if self.metrics['agent_utilization']:
+ utilization_values = list(self.metrics['agent_utilization'].values())
+ if utilization_values:
+ avg_utilization = np.mean(utilization_values)
+ max_utilization = max(utilization_values)
+ balance_factor = avg_utilization / max_utilization if max_utilization > 0 else 0
+ factors.append(balance_factor)
+
+ return np.mean(factors) if factors else 0.5
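+
+# A minimal usage sketch for PerformanceTracker; the duration and counts are
+# illustrative stand-ins for events the coordinator records during a real run.
+def _example_tracking() -> Dict[str, Any]:
+    tracker = PerformanceTracker()
+    tracker.start_tracking()
+    tracker.record_agent_activity("Researcher-1", 12.5)  # seconds of agent work
+    tracker.record_collaboration()
+    tracker.record_message()
+    tracker.end_tracking()
+    return tracker.get_performance_summary()
+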
class AgentMemory:
"""Manages agent conversation history and context."""
@@ -120,12 +241,16 @@ class AgentMemory:
self.messages: List[AgentMessage] = []
self.max_messages = max_messages
self.context: Dict[str, Any] = {}
+ self.knowledge_base: Dict[str, Any] = {}
def add_message(self, message: AgentMessage):
"""Add a message to memory."""
self.messages.append(message)
if len(self.messages) > self.max_messages:
self.messages.pop(0)
+
+ # Extract and store important information
+ self._extract_knowledge(message)
def get_recent_messages(self, n: int = 10) -> List[AgentMessage]:
"""Get n most recent messages."""
@@ -135,6 +260,10 @@ class AgentMemory:
"""Get all messages from a specific sender."""
return [msg for msg in self.messages if msg.sender == sender]
+ def get_high_priority_messages(self) -> List[AgentMessage]:
+ """Get high priority messages."""
+ return [msg for msg in self.messages if msg.priority >= 4]
+
def update_context(self, key: str, value: Any):
"""Update context information."""
self.context[key] = value
@@ -142,11 +271,27 @@ class AgentMemory:
def get_context(self, key: str) -> Any:
"""Get context information."""
return self.context.get(key)
+
+ def _extract_knowledge(self, message: AgentMessage):
+ """Extract and store important knowledge from messages."""
+ # Simple keyword-based knowledge extraction
+ keywords = ['finding', 'conclusion', 'recommendation', 'insight', 'pattern']
+ content_lower = message.content.lower()
+
+ for keyword in keywords:
+ if keyword in content_lower:
+ knowledge_key = f"{message.sender}_{keyword}_{len(self.knowledge_base)}"
+ self.knowledge_base[knowledge_key] = {
+ 'content': message.content,
+ 'sender': message.sender,
+ 'timestamp': message.timestamp,
+ 'type': keyword
+ }
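+
+# A small behavioral sketch of the keyword extraction above; the message values
+# are illustrative. One "finding" keyword match yields one knowledge-base entry.
+def _example_memory_extraction() -> int:
+    memory = AgentMemory(max_messages=10)
+    memory.add_message(AgentMessage(
+        sender="Researcher-1",
+        recipient="Analyst-1",
+        content="Key finding: adoption grew steadily year over year.",
+        message_type=MessageType.INFORMATION_SHARING,
+    ))
+    return len(memory.knowledge_base)  # 1, keyed 'Researcher-1_finding_0'
+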
class BaseAgent:
"""Base class for all AI agents in the system."""
- def __init__(self, name: str, role: AgentRole, llm: ChatOpenAI):
+ def __init__(self, name: str, role: AgentRole, llm: Optional[ChatOpenAI] = None):
self.name = name
self.role = role
self.llm = llm
@@ -154,6 +299,8 @@ class BaseAgent:
self.active = True
self.current_task: Optional[Task] = None
self.completed_tasks: List[Task] = []
+ self.performance_tracker = PerformanceTracker()
+ self.collaboration_partners: Set[str] = set()
async def process_task(self, task: Task) -> Task:
"""Process a task and return the result."""
@@ -161,16 +308,29 @@ class BaseAgent:
task.status = TaskStatus.IN_PROGRESS
task.assigned_to = self.name
+ # Record start time
+ start_time = datetime.now()
+
try:
# Execute task based on agent role
- result = await self._execute_task(task)
+ if self.llm:
+ result = await self._execute_task(task)
+ else:
+ # Demo mode - simulate execution
+ result = await self._simulate_task_execution(task)
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.confidence = self._calculate_confidence(result)
+ # Record performance metrics
+ execution_time = (task.completed_at - start_time).total_seconds()
+ task.performance_metrics['execution_time'] = execution_time
+ task.performance_metrics['confidence'] = task.confidence
+
self.completed_tasks.append(task)
+ self.performance_tracker.record_task_completion(task)
except Exception as e:
logger.error(f"Agent {self.name} failed to process task {task.id}: {str(e)}")
@@ -187,42 +347,115 @@ class BaseAgent:
"""Execute the task - to be implemented by subclasses."""
raise NotImplementedError("Subclasses must implement _execute_task")
+ async def _simulate_task_execution(self, task: Task) -> Any:
+ """Simulate task execution for demo mode."""
+ # Simulate processing time
+ await asyncio.sleep(np.random.uniform(1, 3))
+
+ # Generate simulated result based on role
+ simulation_templates = {
+ AgentRole.RESEARCHER: {
+ "findings": f"Research on '{task.description}' reveals multiple perspectives and data points.",
+ "sources": ["Academic studies", "Industry reports", "Expert interviews"],
+ "key_points": [
+ "Significant trends identified in the domain",
+ "Multiple stakeholder perspectives considered",
+ "Historical context provides important insights"
+ ]
+ },
+ AgentRole.ANALYST: {
+ "analysis": f"Analysis of '{task.description}' shows clear patterns and relationships.",
+ "patterns": [{"description": "Trend identified", "type": "trend"}],
+ "insights": ["Data suggests strong correlations", "Multiple factors influence outcomes"],
+ "confidence_metrics": {"overall_confidence": 0.85}
+ },
+ AgentRole.CRITIC: {
+ "evaluation": f"Critical evaluation of '{task.description}' identifies strengths and areas for improvement.",
+ "strengths": ["Comprehensive approach", "Well-structured analysis"],
+ "gaps": ["Additional data needed", "Alternative perspectives to consider"],
+ "quality_score": {"overall": 0.78}
+ },
+ AgentRole.SYNTHESIZER: {
+ "synthesis": f"Synthesis of findings on '{task.description}' provides integrated insights.",
+ "key_themes": [{"theme": "Theme 1", "description": "Central finding"}],
+ "final_recommendations": ["Strategic recommendation 1", "Strategic recommendation 2"],
+ "executive_summary": "Comprehensive analysis reveals actionable insights."
+ }
+ }
+
+ return simulation_templates.get(self.role, {"result": "Task completed"})
+
def _calculate_confidence(self, result: Any) -> float:
"""Calculate confidence score for the result."""
- # Basic confidence calculation - can be overridden by subclasses
- if result and isinstance(result, str) and len(result) > 50:
- return min(0.9, 0.5 + len(result) / 1000)
- return 0.5
+ # Enhanced confidence calculation
+ base_confidence = 0.5
+
+ if result and isinstance(result, dict):
+ # Check for quality indicators
+ if 'confidence_metrics' in result:
+ return result['confidence_metrics'].get('overall_confidence', base_confidence)
+
+ # Calculate based on result completeness
+ expected_keys = {'findings', 'analysis', 'evaluation', 'synthesis'}
+ actual_keys = set(result.keys())
+ completeness = len(actual_keys.intersection(expected_keys)) / len(expected_keys)
+
+ # Calculate based on content length
+ content_length = sum(len(str(v)) for v in result.values() if isinstance(v, (str, list)))
+ length_factor = min(1.0, content_length / 500)
+
+ confidence = base_confidence + (completeness * 0.3) + (length_factor * 0.2)
+ return min(0.95, confidence)
+
+ return base_confidence
- async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage):
+ async def collaborate(self, other_agent: 'BaseAgent', message: AgentMessage) -> AgentMessage:
"""Handle collaboration with another agent."""
self.memory.add_message(message)
+ self.collaboration_partners.add(other_agent.name)
+ self.performance_tracker.record_collaboration()
# Process collaboration request
- response = await self._process_collaboration(message)
+ response_content = await self._process_collaboration(message)
- # Send response back
+ # Create response message
response_message = AgentMessage(
sender=self.name,
recipient=other_agent.name,
- content=response,
- message_type="response"
+ content=response_content,
+ message_type=MessageType.INFORMATION_SHARING,
+ priority=message.priority
)
other_agent.memory.add_message(response_message)
+ self.performance_tracker.record_message()
return response_message
async def _process_collaboration(self, message: AgentMessage) -> str:
"""Process collaboration message - to be implemented by subclasses."""
- return f"Acknowledged message from {message.sender}"
+ return f"{self.name} acknowledges collaboration request from {message.sender}"
+
+ def get_status_summary(self) -> Dict[str, Any]:
+ """Get current status summary of the agent."""
+ return {
+ 'name': self.name,
+ 'role': self.role.value,
+ 'active': self.active,
+ 'current_task': self.current_task.description if self.current_task else None,
+ 'completed_tasks': len(self.completed_tasks),
+ 'average_confidence': np.mean([t.confidence for t in self.completed_tasks]) if self.completed_tasks else 0,
+ 'collaboration_count': len(self.collaboration_partners),
+ 'memory_size': len(self.memory.messages)
+ }
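+
+# A worked sketch of the confidence heuristic above, run in demo mode (llm=None):
+# one expected key out of four gives completeness 0.25, and ~500 characters of
+# content saturate the length factor, so 0.5 + 0.25*0.3 + 1.0*0.2 = 0.775.
+def _example_confidence() -> float:
+    agent = BaseAgent("Demo", AgentRole.RESEARCHER)
+    result = {"findings": "x" * 500}
+    return agent._calculate_confidence(result)  # 0.775
+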
class ResearcherAgent(BaseAgent):
"""Agent specialized in researching and gathering information."""
- def __init__(self, name: str, llm: ChatOpenAI):
+ def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.RESEARCHER, llm)
self.research_sources: List[str] = []
+ self.research_methods = ["literature_review", "data_collection", "expert_consultation", "field_research"]
async def _execute_task(self, task: Task) -> Any:
"""Execute research task."""
@@ -232,68 +465,119 @@ class ResearcherAgent(BaseAgent):
1. Break down complex topics into research questions
2. Identify key information sources and data points
3. Provide detailed, factual information with citations where possible
- 4. Flag areas requiring further investigation"""),
+ 4. Flag areas requiring further investigation
+ 5. Maintain objectivity and consider multiple perspectives"""),
HumanMessage(content=f"Research the following: {task.description}")
])
response = await self.llm.ainvoke(prompt.format_messages())
- # Extract research findings
+ # Extract and structure research findings
research_result = {
"findings": response.content,
"sources": self._extract_sources(response.content),
"key_points": self._extract_key_points(response.content),
- "areas_for_investigation": self._identify_gaps(response.content)
+ "areas_for_investigation": self._identify_gaps(response.content),
+ "research_quality_score": self._assess_research_quality(response.content)
}
+ # Update internal knowledge
+ self.memory.update_context('latest_research', research_result)
+
return research_result
def _extract_sources(self, content: str) -> List[str]:
"""Extract potential sources from research content."""
- # Simplified source extraction
sources = []
+ source_indicators = ['source:', 'reference:', 'based on:', 'according to', 'study:', 'report:']
+
lines = content.split('\n')
for line in lines:
- if any(keyword in line.lower() for keyword in ['source:', 'reference:', 'based on:', 'according to']):
- sources.append(line.strip())
- return sources[:5] # Limit to top 5 sources
+ line_lower = line.lower()
+ for indicator in source_indicators:
+ if indicator in line_lower:
+ sources.append(line.strip())
+ break
+
+ return sources[:10] # Limit to top 10 sources
def _extract_key_points(self, content: str) -> List[str]:
"""Extract key points from research."""
key_points = []
lines = content.split('\n')
+
for line in lines:
- if line.strip() and (line.strip()[0].isdigit() or line.strip().startswith('-')):
- key_points.append(line.strip())
- return key_points[:10] # Limit to top 10 points
+ line = line.strip()
+ # Check for numbered or bulleted points
+ if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
+ key_points.append(line)
+ # Check for key phrases
+ elif any(phrase in line.lower() for phrase in ['key finding:', 'important:', 'notably:']):
+ key_points.append(line)
+
+ return key_points[:15] # Limit to top 15 points
def _identify_gaps(self, content: str) -> List[str]:
"""Identify areas needing more research."""
gaps = []
- keywords = ['unclear', 'requires further', 'need more', 'investigate', 'unknown']
- lines = content.split('\n')
- for line in lines:
- if any(keyword in line.lower() for keyword in keywords):
- gaps.append(line.strip())
+ gap_indicators = ['unclear', 'requires further', 'need more', 'investigate',
+ 'unknown', 'limited data', 'insufficient evidence']
+
+ sentences = content.split('.')
+ for sentence in sentences:
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in gap_indicators):
+ gaps.append(sentence.strip() + '.')
+
return gaps[:5]
+
+ def _assess_research_quality(self, content: str) -> float:
+ """Assess the quality of research output."""
+ quality_score = 0.5 # Base score
+
+ # Check for sources
+ if self._extract_sources(content):
+ quality_score += 0.15
+
+ # Check for structured content
+ if self._extract_key_points(content):
+ quality_score += 0.15
+
+ # Check for comprehensive coverage
+ word_count = len(content.split())
+ if word_count > 300:
+ quality_score += 0.1
+
+ # Check for analytical depth
+ analytical_terms = ['analysis', 'evaluation', 'comparison', 'contrast', 'implication']
+ if any(term in content.lower() for term in analytical_terms):
+ quality_score += 0.1
+
+ return min(1.0, quality_score)
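+
+    # Worked sketch of the heuristic above: detected sources (+0.15), structured
+    # key points (+0.15), more than 300 words (+0.10) and analytical terms
+    # (+0.10) on top of the 0.5 base give min(1.0, 1.0) = 1.0.
+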
class AnalystAgent(BaseAgent):
"""Agent specialized in analyzing data and identifying patterns."""
- def __init__(self, name: str, llm: ChatOpenAI):
+ def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.ANALYST, llm)
- self.analysis_methods: List[str] = ["statistical", "comparative", "trend", "causal"]
+ self.analysis_methods = ["statistical", "comparative", "trend", "causal", "predictive"]
+ self.analysis_frameworks = ["SWOT", "PESTLE", "Porter's Five Forces", "Cost-Benefit"]
async def _execute_task(self, task: Task) -> Any:
"""Execute analysis task."""
+ # Get context from previous research if available
+ context = self._get_relevant_context(task)
+
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are an Analyst Agent specializing in data analysis and pattern recognition.
Your role is to:
1. Analyze information systematically and objectively
2. Identify patterns, trends, and correlations
3. Provide quantitative insights where possible
- 4. Draw logical conclusions based on evidence"""),
- HumanMessage(content=f"Analyze the following: {task.description}")
+ 4. Draw logical conclusions based on evidence
+ 5. Apply appropriate analytical frameworks
+ 6. Consider multiple analytical perspectives"""),
+ HumanMessage(content=f"Analyze the following: {task.description}\n\nContext: {context}")
])
response = await self.llm.ainvoke(prompt.format_messages())
@@ -304,88 +588,158 @@ class AnalystAgent(BaseAgent):
"patterns": self._identify_patterns(response.content),
"insights": self._extract_insights(response.content),
"recommendations": self._generate_recommendations(response.content),
- "confidence_metrics": self._calculate_analysis_confidence(response.content)
+ "confidence_metrics": self._calculate_analysis_confidence(response.content),
+ "analytical_framework": self._identify_framework_used(response.content)
}
+ # Store analysis in memory
+ self.memory.update_context('latest_analysis', analysis_result)
+
return analysis_result
+ def _get_relevant_context(self, task: Task) -> str:
+ """Get relevant context from memory for the task."""
+ context_items = []
+
+ # Get recent messages related to the task
+ recent_messages = self.memory.get_recent_messages(5)
+ for msg in recent_messages:
+ if task.description.lower() in msg.content.lower():
+ context_items.append(f"Previous finding: {msg.content[:200]}...")
+
+ # Get knowledge base items
+ for key, knowledge in self.memory.knowledge_base.items():
+ if 'finding' in knowledge['type'] or 'insight' in knowledge['type']:
+ context_items.append(f"Known insight: {knowledge['content'][:200]}...")
+
+ return "\n".join(context_items[:3]) # Limit context items
+
def _identify_patterns(self, content: str) -> List[Dict[str, str]]:
"""Identify patterns in the analysis."""
patterns = []
- pattern_keywords = ['pattern', 'trend', 'correlation', 'relationship', 'consistent']
+ pattern_types = {
+ 'trend': ['trend', 'increasing', 'decreasing', 'growth', 'decline'],
+ 'correlation': ['correlation', 'relationship', 'associated', 'linked'],
+ 'cyclical': ['cycle', 'periodic', 'seasonal', 'recurring'],
+ 'anomaly': ['anomaly', 'outlier', 'unusual', 'exceptional']
+ }
- lines = content.split('\n')
- for line in lines:
- if any(keyword in line.lower() for keyword in pattern_keywords):
- patterns.append({
- "description": line.strip(),
- "type": self._classify_pattern(line)
- })
+ sentences = content.split('.')
+ for sentence in sentences:
+ sentence_lower = sentence.lower()
+ for pattern_type, keywords in pattern_types.items():
+ if any(keyword in sentence_lower for keyword in keywords):
+ patterns.append({
+ "description": sentence.strip() + '.',
+ "type": pattern_type,
+ "confidence": 0.8
+ })
+ break
- return patterns[:5]
-
- def _classify_pattern(self, description: str) -> str:
- """Classify the type of pattern."""
- description_lower = description.lower()
- if 'trend' in description_lower:
- return 'trend'
- elif 'correlation' in description_lower:
- return 'correlation'
- elif 'cycle' in description_lower or 'periodic' in description_lower:
- return 'cyclical'
- else:
- return 'general'
+ return patterns[:8]
def _extract_insights(self, content: str) -> List[str]:
"""Extract key insights from analysis."""
insights = []
- insight_keywords = ['shows', 'indicates', 'suggests', 'reveals', 'demonstrates']
+ insight_indicators = ['shows', 'indicates', 'suggests', 'reveals',
+ 'demonstrates', 'implies', 'means that', 'therefore']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in insight_keywords):
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in insight_indicators):
insights.append(sentence.strip() + '.')
- return insights[:7]
+ return insights[:10]
-    def _generate_recommendations(self, content: str) -> List[str]:
+    def _generate_recommendations(self, content: str) -> List[Dict[str, str]]:
"""Generate recommendations based on analysis."""
recommendations = []
- rec_keywords = ['recommend', 'suggest', 'should', 'consider', 'advise']
+ rec_indicators = ['recommend', 'suggest', 'should', 'consider',
+ 'advise', 'propose', 'it would be beneficial']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in rec_keywords):
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in rec_indicators):
recommendations.append(sentence.strip() + '.')
- return recommendations[:5]
+ # Prioritize recommendations
+ prioritized = []
+ for rec in recommendations:
+ priority = "high" if any(word in rec.lower() for word in ['critical', 'essential', 'must']) else "medium"
+ prioritized.append({"recommendation": rec, "priority": priority})
+
+ return prioritized[:7]
def _calculate_analysis_confidence(self, content: str) -> Dict[str, float]:
"""Calculate confidence metrics for the analysis."""
- word_count = len(content.split())
- evidence_count = content.lower().count('evidence') + content.lower().count('data') + content.lower().count('shows')
- uncertainty_count = content.lower().count('may') + content.lower().count('might') + content.lower().count('possibly')
+ # Count evidence indicators
+ evidence_count = sum(content.lower().count(word) for word in ['evidence', 'data', 'shows', 'proves'])
+ uncertainty_count = sum(content.lower().count(word) for word in ['may', 'might', 'possibly', 'perhaps'])
+
+ # Calculate confidence scores
+ evidence_strength = min(1.0, evidence_count / 10)
+ certainty_level = max(0.0, 1.0 - (uncertainty_count / 10))
- confidence = min(0.95, 0.5 + (evidence_count * 0.1) - (uncertainty_count * 0.05) + (word_count / 1000))
+ # Check for quantitative analysis
+ quantitative_indicators = ['percentage', '%', 'ratio', 'correlation', 'statistical']
+ quantitative_score = 0.7 if any(ind in content.lower() for ind in quantitative_indicators) else 0.5
+
+ overall_confidence = (evidence_strength + certainty_level + quantitative_score) / 3
return {
- "overall_confidence": confidence,
- "evidence_strength": min(1.0, evidence_count / 10),
- "certainty_level": max(0.0, 1.0 - (uncertainty_count / 10))
+ "overall_confidence": overall_confidence,
+ "evidence_strength": evidence_strength,
+ "certainty_level": certainty_level,
+ "quantitative_score": quantitative_score
}
+
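+    # Worked sketch of the averaging above: five evidence hits (strength 0.5),
+    # two hedging words (certainty 0.8) and one quantitative indicator (0.7)
+    # give (0.5 + 0.8 + 0.7) / 3 ≈ 0.67 overall.
+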
+ def _identify_framework_used(self, content: str) -> Optional[str]:
+ """Identify which analytical framework was used."""
+ content_lower = content.lower()
+
+ for framework in self.analysis_frameworks:
+ if framework.lower() in content_lower:
+ return framework
+
+ # Check for implicit framework usage
+ if all(word in content_lower for word in ['strength', 'weakness', 'opportunity', 'threat']):
+ return "SWOT"
+ elif any(word in content_lower for word in ['political', 'economic', 'social', 'technological']):
+ return "PESTLE"
+
+ return None
class CriticAgent(BaseAgent):
"""Agent specialized in critical evaluation and quality assurance."""
- def __init__(self, name: str, llm: ChatOpenAI):
+ def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.CRITIC, llm)
self.evaluation_criteria = [
"accuracy", "completeness", "logic", "evidence",
- "clarity", "relevance", "consistency"
+ "clarity", "relevance", "consistency", "objectivity"
]
+ self.evaluation_rubric = self._create_evaluation_rubric()
+
+ def _create_evaluation_rubric(self) -> Dict[str, Dict[str, float]]:
+ """Create evaluation rubric with weighted criteria."""
+ return {
+ "accuracy": {"weight": 0.20, "score": 0.0},
+ "completeness": {"weight": 0.15, "score": 0.0},
+ "logic": {"weight": 0.15, "score": 0.0},
+ "evidence": {"weight": 0.15, "score": 0.0},
+ "clarity": {"weight": 0.10, "score": 0.0},
+ "relevance": {"weight": 0.10, "score": 0.0},
+ "consistency": {"weight": 0.10, "score": 0.0},
+ "objectivity": {"weight": 0.05, "score": 0.0}
+ }
async def _execute_task(self, task: Task) -> Any:
"""Execute critical evaluation task."""
+ # Get content to evaluate from context
+ evaluation_context = self._gather_evaluation_context(task)
+
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a Critic Agent specializing in rigorous evaluation and quality assurance.
Your role is to:
@@ -393,8 +747,10 @@ class CriticAgent(BaseAgent):
2. Identify weaknesses, gaps, and potential biases
3. Verify logical consistency and evidence quality
4. Suggest improvements and alternative perspectives
- 5. Ensure high standards of analysis"""),
- HumanMessage(content=f"Critically evaluate the following: {task.description}")
+ 5. Ensure high standards of analysis
+ 6. Apply systematic evaluation criteria
+ 7. Provide constructive feedback"""),
+ HumanMessage(content=f"Critically evaluate the following: {task.description}\n\nContent to evaluate: {evaluation_context}")
])
response = await self.llm.ainvoke(prompt.format_messages())
@@ -406,84 +762,260 @@ class CriticAgent(BaseAgent):
"weaknesses": self._identify_weaknesses(response.content),
"gaps": self._identify_gaps(response.content),
"improvements": self._suggest_improvements(response.content),
- "quality_score": self._calculate_quality_score(response.content)
+ "quality_score": self._calculate_quality_score(response.content),
+ "alternative_perspectives": self._identify_alternatives(response.content),
+ "final_verdict": self._generate_verdict(response.content)
}
+ # Update evaluation history
+ self.memory.update_context('evaluation_history', critique_result)
+
return critique_result
- def _identify_strengths(self, content: str) -> List[str]:
+ def _gather_evaluation_context(self, task: Task) -> str:
+ """Gather relevant context for evaluation."""
+ context_items = []
+
+ # Get recent analysis and research results
+ recent_messages = self.memory.get_recent_messages(10)
+ for msg in recent_messages:
+ if msg.message_type in [MessageType.COMPLETION_REPORT, MessageType.INFORMATION_SHARING]:
+ context_items.append(f"{msg.sender}: {msg.content[:300]}...")
+
+ # Get knowledge base insights
+ for key, knowledge in self.memory.knowledge_base.items():
+ if knowledge['type'] in ['finding', 'conclusion', 'insight']:
+ context_items.append(f"Previous {knowledge['type']}: {knowledge['content'][:200]}...")
+
+ return "\n\n".join(context_items[:5])
+
+ def _identify_strengths(self, content: str) -> List[Dict[str, str]]:
"""Identify strengths in the evaluated content."""
strengths = []
- strength_keywords = ['strong', 'excellent', 'well', 'good', 'effective', 'solid']
+ strength_indicators = ['strong', 'excellent', 'well', 'good', 'effective',
+ 'solid', 'robust', 'comprehensive', 'thorough']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in strength_keywords):
- strengths.append(sentence.strip() + '.')
+ sentence_lower = sentence.lower()
+ for indicator in strength_indicators:
+ if indicator in sentence_lower:
+ strengths.append({
+ "strength": sentence.strip() + '.',
+ "category": self._categorize_strength(sentence),
+ "impact": "high" if any(word in sentence_lower for word in ['very', 'extremely', 'highly']) else "medium"
+ })
+ break
+
+ return strengths[:6]
+
+ def _categorize_strength(self, sentence: str) -> str:
+ """Categorize the type of strength identified."""
+ sentence_lower = sentence.lower()
- return strengths[:5]
+ if any(word in sentence_lower for word in ['method', 'approach', 'framework']):
+ return "methodology"
+ elif any(word in sentence_lower for word in ['data', 'evidence', 'support']):
+ return "evidence"
+ elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
+ return "reasoning"
+ elif any(word in sentence_lower for word in ['clear', 'organized', 'structured']):
+ return "presentation"
+ else:
+ return "general"
- def _identify_weaknesses(self, content: str) -> List[str]:
+ def _identify_weaknesses(self, content: str) -> List[Dict[str, str]]:
"""Identify weaknesses in the evaluated content."""
weaknesses = []
- weakness_keywords = ['weak', 'lack', 'insufficient', 'poor', 'inadequate', 'missing']
+ weakness_indicators = ['weak', 'lack', 'insufficient', 'poor', 'inadequate',
+ 'missing', 'limited', 'unclear', 'vague']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in weakness_keywords):
- weaknesses.append(sentence.strip() + '.')
+ sentence_lower = sentence.lower()
+ for indicator in weakness_indicators:
+ if indicator in sentence_lower:
+ weaknesses.append({
+ "weakness": sentence.strip() + '.',
+ "severity": self._assess_severity(sentence),
+ "category": self._categorize_weakness(sentence)
+ })
+ break
+
+ return weaknesses[:6]
+
+ def _assess_severity(self, sentence: str) -> str:
+ """Assess the severity of a weakness."""
+ sentence_lower = sentence.lower()
- return weaknesses[:5]
+ if any(word in sentence_lower for word in ['critical', 'severe', 'major', 'significant']):
+ return "high"
+ elif any(word in sentence_lower for word in ['moderate', 'some', 'partial']):
+ return "medium"
+ else:
+ return "low"
+
+ def _categorize_weakness(self, sentence: str) -> str:
+ """Categorize the type of weakness identified."""
+ sentence_lower = sentence.lower()
+
+ if any(word in sentence_lower for word in ['data', 'evidence', 'support']):
+ return "evidence"
+ elif any(word in sentence_lower for word in ['logic', 'reasoning', 'argument']):
+ return "reasoning"
+ elif any(word in sentence_lower for word in ['bias', 'objective', 'neutral']):
+ return "objectivity"
+ elif any(word in sentence_lower for word in ['complete', 'comprehensive', 'thorough']):
+ return "completeness"
+ else:
+ return "general"
def _identify_gaps(self, content: str) -> List[str]:
"""Identify gaps in the analysis."""
gaps = []
- gap_keywords = ['gap', 'missing', 'overlook', 'fail to', 'does not address', 'ignores']
+ gap_indicators = ['gap', 'missing', 'overlook', 'fail to', 'does not address',
+ 'ignores', 'omits', 'neglects']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in gap_keywords):
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in gap_indicators):
gaps.append(sentence.strip() + '.')
return gaps[:5]
- def _suggest_improvements(self, content: str) -> List[str]:
+ def _suggest_improvements(self, content: str) -> List[Dict[str, str]]:
"""Suggest improvements based on critique."""
improvements = []
- improvement_keywords = ['could', 'should', 'improve', 'enhance', 'strengthen', 'add']
+ improvement_indicators = ['could', 'should', 'improve', 'enhance',
+ 'strengthen', 'add', 'consider', 'recommend']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in improvement_keywords):
- improvements.append(sentence.strip() + '.')
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in improvement_indicators):
+ improvements.append({
+ "suggestion": sentence.strip() + '.',
+ "priority": self._prioritize_improvement(sentence),
+ "effort": self._estimate_effort(sentence)
+ })
+
+ return improvements[:7]
+
+ def _prioritize_improvement(self, sentence: str) -> str:
+ """Prioritize improvement suggestions."""
+ sentence_lower = sentence.lower()
- return improvements[:5]
+ if any(word in sentence_lower for word in ['critical', 'essential', 'must', 'urgent']):
+ return "high"
+ elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
+ return "medium"
+ else:
+ return "low"
+
+ def _estimate_effort(self, sentence: str) -> str:
+ """Estimate effort required for improvement."""
+ sentence_lower = sentence.lower()
+
+ if any(word in sentence_lower for word in ['simple', 'easy', 'quick', 'minor']):
+ return "low"
+ elif any(word in sentence_lower for word in ['moderate', 'some', 'reasonable']):
+ return "medium"
+ elif any(word in sentence_lower for word in ['significant', 'substantial', 'major']):
+ return "high"
+ else:
+ return "medium"
def _calculate_quality_score(self, content: str) -> Dict[str, float]:
- """Calculate quality scores for different criteria."""
- scores = {}
+ """Calculate detailed quality scores."""
+        # Copy per-criterion dicts so scoring never mutates the shared rubric
+        scores = {c: dict(v) for c, v in self.evaluation_rubric.items()}
+ content_lower = content.lower()
+ # Score each criterion based on content analysis
for criterion in self.evaluation_criteria:
- # Simplified scoring based on keyword presence
- positive_count = content.lower().count(criterion) + content.lower().count('good') + content.lower().count('strong')
- negative_count = content.lower().count('poor') + content.lower().count('weak') + content.lower().count('lacking')
+ score = 0.5 # Base score
+
+ # Positive indicators
+ if criterion in content_lower and any(word in content_lower for word in ['good', 'strong', 'excellent']):
+ score += 0.3
+
+ # Negative indicators
+ if criterion in content_lower and any(word in content_lower for word in ['poor', 'weak', 'lacking']):
+ score -= 0.3
- score = min(1.0, max(0.0, 0.5 + (positive_count * 0.1) - (negative_count * 0.15)))
- scores[criterion] = score
+ scores[criterion]["score"] = max(0.0, min(1.0, score))
+
+ # Calculate overall score
+ overall = sum(scores[c]["score"] * scores[c]["weight"] for c in scores)
+
+ return {
+ "overall": overall,
+ "breakdown": {c: scores[c]["score"] for c in scores},
+ "grade": self._convert_to_grade(overall)
+ }
+
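+    # Worked sketch of the weighted aggregation above: the rubric weights sum to
+    # 1.0, so uniform criterion scores of 0.8 yield overall = 0.8, grade "B".
+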
+ def _convert_to_grade(self, score: float) -> str:
+ """Convert numeric score to letter grade."""
+ if score >= 0.9:
+ return "A"
+ elif score >= 0.8:
+ return "B"
+ elif score >= 0.7:
+ return "C"
+ elif score >= 0.6:
+ return "D"
+ else:
+ return "F"
+
+ def _identify_alternatives(self, content: str) -> List[str]:
+ """Identify alternative perspectives mentioned."""
+ alternatives = []
+ alternative_indicators = ['alternatively', 'another perspective', 'different approach',
+ 'could also', 'different view', 'alternative']
- scores['overall'] = sum(scores.values()) / len(scores)
+ sentences = content.split('.')
+ for sentence in sentences:
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in alternative_indicators):
+ alternatives.append(sentence.strip() + '.')
+
+ return alternatives[:4]
+
+ def _generate_verdict(self, content: str) -> Dict[str, str]:
+ """Generate final verdict based on evaluation."""
+ # Simple verdict generation based on content sentiment
+ positive_count = sum(content.lower().count(word) for word in ['good', 'strong', 'excellent', 'effective'])
+ negative_count = sum(content.lower().count(word) for word in ['poor', 'weak', 'lacking', 'insufficient'])
+
+ if positive_count > negative_count * 2:
+ verdict = "Approved with minor revisions"
+ confidence = "high"
+ elif positive_count > negative_count:
+ verdict = "Approved with moderate revisions"
+ confidence = "medium"
+ else:
+ verdict = "Requires significant improvements"
+ confidence = "medium"
- return scores
+ return {
+ "verdict": verdict,
+ "confidence": confidence,
+ "summary": "Based on comprehensive evaluation across multiple criteria."
+ }
class SynthesizerAgent(BaseAgent):
"""Agent specialized in synthesizing information and creating coherent narratives."""
- def __init__(self, name: str, llm: ChatOpenAI):
+ def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.SYNTHESIZER, llm)
- self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate"]
+ self.synthesis_strategies = ["integrate", "summarize", "reconcile", "consolidate", "harmonize"]
+ self.output_formats = ["executive_summary", "detailed_report", "action_plan", "strategic_recommendation"]
async def _execute_task(self, task: Task) -> Any:
"""Execute synthesis task."""
+ # Gather all relevant information from previous agents
+ synthesis_input = self._gather_synthesis_input(task)
+
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""You are a Synthesizer Agent specializing in integrating diverse information.
Your role is to:
@@ -491,8 +1023,10 @@ class SynthesizerAgent(BaseAgent):
2. Resolve contradictions and find common ground
3. Create comprehensive summaries that capture key insights
4. Generate actionable conclusions and recommendations
- 5. Ensure clarity and accessibility of complex information"""),
- HumanMessage(content=f"Synthesize the following information: {task.description}")
+ 5. Ensure clarity and accessibility of complex information
+ 6. Prioritize information based on relevance and impact
+ 7. Create structured outputs suitable for decision-making"""),
+ HumanMessage(content=f"Synthesize the following information: {task.description}\n\nInput data: {synthesis_input}")
])
response = await self.llm.ainvoke(prompt.format_messages())
@@ -504,116 +1038,452 @@ class SynthesizerAgent(BaseAgent):
"consensus_points": self._identify_consensus(response.content),
"contradictions": self._identify_contradictions(response.content),
"final_recommendations": self._generate_final_recommendations(response.content),
- "executive_summary": self._create_executive_summary(response.content)
+ "executive_summary": self._create_executive_summary(response.content),
+ "action_items": self._extract_action_items(response.content),
+ "confidence_level": self._assess_synthesis_confidence(response.content)
}
+ # Store synthesis for future reference
+ self.memory.update_context('latest_synthesis', synthesis_result)
+
return synthesis_result
- def _extract_themes(self, content: str) -> List[Dict[str, str]]:
+ def _gather_synthesis_input(self, task: Task) -> str:
+ """Gather all relevant information for synthesis."""
+ input_sections = []
+
+ # Get findings from all agents
+ agent_findings = {}
+ for msg in self.memory.get_recent_messages(20):
+ if msg.sender not in agent_findings:
+ agent_findings[msg.sender] = []
+ agent_findings[msg.sender].append(msg.content[:500])
+
+ # Structure input by agent type
+ for agent, findings in agent_findings.items():
+ if findings:
+ input_sections.append(f"\n{agent} Contributions:\n" + "\n".join(findings[:3]))
+
+ # Add knowledge base insights
+ knowledge_items = []
+ for key, knowledge in self.memory.knowledge_base.items():
+ knowledge_items.append(f"{knowledge['type'].title()}: {knowledge['content'][:200]}...")
+
+ if knowledge_items:
+ input_sections.append("\nKnowledge Base:\n" + "\n".join(knowledge_items[:5]))
+
+ return "\n".join(input_sections)
+
+ def _extract_themes(self, content: str) -> List[Dict[str, Any]]:
"""Extract major themes from synthesis."""
themes = []
- theme_keywords = ['theme', 'pattern', 'trend', 'common', 'recurring', 'central']
+ theme_indicators = ['theme', 'pattern', 'trend', 'common', 'recurring',
+ 'central', 'key finding', 'main point']
+ # Split into paragraphs and analyze
paragraphs = content.split('\n\n')
- for i, paragraph in enumerate(paragraphs[:5]): # Limit to first 5 paragraphs
- if any(keyword in paragraph.lower() for keyword in theme_keywords):
+ theme_count = 0
+
+ for paragraph in paragraphs:
+ paragraph_lower = paragraph.lower()
+ if any(indicator in paragraph_lower for indicator in theme_indicators):
+ theme_count += 1
themes.append({
- "theme": f"Theme {i+1}",
- "description": paragraph.strip()[:200] + "..." if len(paragraph) > 200 else paragraph.strip()
+ "theme": f"Theme {theme_count}",
+ "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
+ "importance": self._assess_theme_importance(paragraph),
+ "support_level": self._assess_support_level(paragraph)
})
- return themes
+ # If no explicit themes found, extract from content structure
+ if not themes and paragraphs:
+ for i, paragraph in enumerate(paragraphs[:5]):
+ if len(paragraph.strip()) > 50:
+ themes.append({
+ "theme": f"Finding {i+1}",
+ "description": paragraph.strip()[:300] + "..." if len(paragraph) > 300 else paragraph.strip(),
+ "importance": "medium",
+ "support_level": "moderate"
+ })
+
+ return themes[:6]
+
+ def _assess_theme_importance(self, content: str) -> str:
+ """Assess the importance of a theme."""
+ content_lower = content.lower()
+
+ high_importance_indicators = ['critical', 'essential', 'fundamental', 'crucial', 'vital']
+ if any(indicator in content_lower for indicator in high_importance_indicators):
+ return "high"
+
+ low_importance_indicators = ['minor', 'secondary', 'marginal', 'peripheral']
+ if any(indicator in content_lower for indicator in low_importance_indicators):
+ return "low"
+
+ return "medium"
+
+ def _assess_support_level(self, content: str) -> str:
+ """Assess the level of support for a theme."""
+ content_lower = content.lower()
+
+ strong_support = ['consensus', 'unanimous', 'clear evidence', 'strongly supported']
+ if any(indicator in content_lower for indicator in strong_support):
+ return "strong"
+
+ weak_support = ['limited evidence', 'some indication', 'preliminary', 'tentative']
+ if any(indicator in content_lower for indicator in weak_support):
+ return "weak"
+
+ return "moderate"
- def _identify_consensus(self, content: str) -> List[str]:
+ def _identify_consensus(self, content: str) -> List[Dict[str, str]]:
"""Identify points of consensus."""
- consensus = []
- consensus_keywords = ['agree', 'consensus', 'common', 'shared', 'unanimous', 'consistent']
+ consensus_points = []
+ consensus_indicators = ['agree', 'consensus', 'common', 'shared', 'unanimous',
+ 'consistent', 'alignment', 'convergence']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in consensus_keywords):
- consensus.append(sentence.strip() + '.')
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in consensus_indicators):
+ consensus_points.append({
+ "point": sentence.strip() + '.',
+ "strength": "strong" if "unanimous" in sentence_lower or "clear consensus" in sentence_lower else "moderate"
+ })
- return consensus[:5]
+ return consensus_points[:6]
- def _identify_contradictions(self, content: str) -> List[str]:
+ def _identify_contradictions(self, content: str) -> List[Dict[str, str]]:
"""Identify contradictions or conflicts."""
contradictions = []
- conflict_keywords = ['however', 'contrary', 'conflict', 'disagree', 'opposing', 'contradicts']
+ conflict_indicators = ['however', 'contrary', 'conflict', 'disagree', 'opposing',
+ 'contradicts', 'tension', 'divergent', 'inconsistent']
sentences = content.split('.')
for sentence in sentences:
- if any(keyword in sentence.lower() for keyword in conflict_keywords):
- contradictions.append(sentence.strip() + '.')
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in conflict_indicators):
+ contradictions.append({
+ "contradiction": sentence.strip() + '.',
+ "resolution_suggested": self._check_for_resolution(sentence),
+ "impact": self._assess_contradiction_impact(sentence)
+ })
- return contradictions[:3]
+ return contradictions[:4]
+
+ def _check_for_resolution(self, sentence: str) -> bool:
+ """Check if a resolution is suggested for the contradiction."""
+ resolution_indicators = ['can be resolved', 'reconcile', 'bridge', 'common ground', 'compromise']
+ return any(indicator in sentence.lower() for indicator in resolution_indicators)
+
+ def _assess_contradiction_impact(self, sentence: str) -> str:
+ """Assess the impact of a contradiction."""
+ sentence_lower = sentence.lower()
+
+ if any(word in sentence_lower for word in ['fundamental', 'major', 'significant']):
+ return "high"
+ elif any(word in sentence_lower for word in ['minor', 'small', 'slight']):
+ return "low"
+ else:
+ return "medium"
- def _generate_final_recommendations(self, content: str) -> List[str]:
+ def _generate_final_recommendations(self, content: str) -> List[Dict[str, Any]]:
"""Generate final synthesized recommendations."""
recommendations = []
- # Look for recommendation sections
- lines = content.split('\n')
- in_recommendations = False
+ # Extract recommendation sentences
+ rec_indicators = ['recommend', 'suggest', 'propose', 'advise', 'should', 'must']
+ sentences = content.split('.')
- for line in lines:
- if 'recommend' in line.lower() or 'conclusion' in line.lower():
- in_recommendations = True
- elif in_recommendations and line.strip():
- recommendations.append(line.strip())
- if len(recommendations) >= 5:
- break
+ for sentence in sentences:
+ sentence_lower = sentence.lower()
+ if any(indicator in sentence_lower for indicator in rec_indicators):
+ recommendations.append({
+ "recommendation": sentence.strip() + '.',
+ "priority": self._determine_priority(sentence),
+ "timeframe": self._determine_timeframe(sentence),
+ "category": self._categorize_recommendation(sentence)
+ })
+
+ # Sort by priority
+ priority_order = {"high": 0, "medium": 1, "low": 2}
+ recommendations.sort(key=lambda x: priority_order.get(x["priority"], 3))
+
+ return recommendations[:8]
+
+ def _determine_priority(self, sentence: str) -> str:
+ """Determine recommendation priority."""
+ sentence_lower = sentence.lower()
- return recommendations
+ if any(word in sentence_lower for word in ['urgent', 'immediate', 'critical', 'must']):
+ return "high"
+ elif any(word in sentence_lower for word in ['should', 'important', 'recommend']):
+ return "medium"
+ else:
+ return "low"
+
+ def _determine_timeframe(self, sentence: str) -> str:
+ """Determine recommendation timeframe."""
+ sentence_lower = sentence.lower()
+
+ if any(word in sentence_lower for word in ['immediate', 'now', 'urgent', 'asap']):
+ return "immediate"
+ elif any(word in sentence_lower for word in ['short-term', 'soon', 'near']):
+ return "short-term"
+ elif any(word in sentence_lower for word in ['long-term', 'future', 'eventually']):
+ return "long-term"
+ else:
+ return "medium-term"
+
+ def _categorize_recommendation(self, sentence: str) -> str:
+ """Categorize the type of recommendation."""
+ sentence_lower = sentence.lower()
+
+ if any(word in sentence_lower for word in ['strategy', 'strategic', 'plan']):
+ return "strategic"
+ elif any(word in sentence_lower for word in ['operational', 'process', 'procedure']):
+ return "operational"
+ elif any(word in sentence_lower for word in ['tactical', 'action', 'implement']):
+ return "tactical"
+ else:
+ return "general"
def _create_executive_summary(self, content: str) -> str:
"""Create an executive summary of the synthesis."""
- # Take first paragraph or create summary
+ # Extract key sentences for summary
+ summary_parts = []
+
+ # Get opening statement
paragraphs = content.split('\n\n')
if paragraphs:
- summary = paragraphs[0][:300]
- if len(paragraphs[0]) > 300:
- summary += "..."
- return summary
- return "Summary generation in progress..."
+ opening = paragraphs[0][:200]
+ if len(paragraphs[0]) > 200:
+ opening += "..."
+ summary_parts.append(opening)
+
+ # Extract key findings
+ key_finding_indicators = ['key finding', 'main conclusion', 'importantly', 'notably']
+ for paragraph in paragraphs[1:]:
+ if any(indicator in paragraph.lower() for indicator in key_finding_indicators):
+ summary_parts.append(paragraph[:150] + "..." if len(paragraph) > 150 else paragraph)
+ if len(summary_parts) >= 3:
+ break
+
+ # Add conclusion if present
+ if len(paragraphs) > 1:
+ conclusion = paragraphs[-1][:150]
+ if conclusion not in summary_parts:
+ summary_parts.append(conclusion + "..." if len(paragraphs[-1]) > 150 else conclusion)
+
+ return " ".join(summary_parts)
+
+ def _extract_action_items(self, content: str) -> List[Dict[str, str]]:
+ """Extract specific action items from synthesis."""
+ action_items = []
+ action_indicators = ['action:', 'task:', 'todo:', 'action item', 'next step', 'to do']
+
+ lines = content.split('\n')
+ for line in lines:
+ line_lower = line.lower()
+ if any(indicator in line_lower for indicator in action_indicators):
+ action_items.append({
+ "action": line.strip(),
+ "owner": "TBD",
+ "deadline": "TBD",
+ "status": "pending"
+ })
+ # Also check for numbered action items
+ elif line.strip() and line.strip()[0].isdigit() and 'action' in line_lower:
+ action_items.append({
+ "action": line.strip(),
+ "owner": "TBD",
+ "deadline": "TBD",
+ "status": "pending"
+ })
+
+ return action_items[:10]
+
+ def _assess_synthesis_confidence(self, content: str) -> Dict[str, Any]:
+ """Assess confidence in the synthesis."""
+ # Calculate various confidence indicators
+ word_count = len(content.split())
+
+ # Check for confidence language
+ high_confidence_words = ['clear', 'strong', 'definitive', 'conclusive', 'certain']
+ low_confidence_words = ['uncertain', 'unclear', 'tentative', 'preliminary', 'limited']
+
+ high_conf_count = sum(content.lower().count(word) for word in high_confidence_words)
+ low_conf_count = sum(content.lower().count(word) for word in low_confidence_words)
+
+ # Calculate confidence score
+ base_confidence = 0.7
+ confidence_adjustment = (high_conf_count * 0.05) - (low_conf_count * 0.08)
+ overall_confidence = max(0.3, min(0.95, base_confidence + confidence_adjustment))
+
+ return {
+ "overall": overall_confidence,
+ "level": "high" if overall_confidence > 0.8 else "medium" if overall_confidence > 0.6 else "low",
+ "factors": {
+ "content_depth": word_count > 500,
+ "evidence_strength": high_conf_count > low_conf_count,
+ "consensus_level": "consensus" in content.lower()
+ }
+ }
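+
+    # Worked sketch of the adjustment above: three high-confidence cues and one
+    # low-confidence cue give 0.7 + 3*0.05 - 1*0.08 = 0.77, a "medium" level.
+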
class CoordinatorAgent(BaseAgent):
"""Agent responsible for coordinating other agents and managing workflow."""
- def __init__(self, name: str, llm: ChatOpenAI):
+ def __init__(self, name: str, llm: Optional[ChatOpenAI] = None):
super().__init__(name, AgentRole.COORDINATOR, llm)
self.agents: Dict[str, BaseAgent] = {}
self.task_queue: List[Task] = []
self.completed_tasks: List[Task] = []
self.workflow_graph = nx.DiGraph()
self.execution_history: List[Dict[str, Any]] = []
+ self.workflow_templates = self._create_workflow_templates()
+
+ def _create_workflow_templates(self) -> Dict[str, List[Dict[str, Any]]]:
+ """Create predefined workflow templates for common problem types."""
+ return {
+ "research_analysis": [
+ {"role": "Researcher", "task": "Gather comprehensive information"},
+ {"role": "Analyst", "task": "Analyze findings and identify patterns"},
+ {"role": "Critic", "task": "Evaluate analysis quality"},
+ {"role": "Synthesizer", "task": "Create final recommendations"}
+ ],
+ "strategic_planning": [
+ {"role": "Researcher", "task": "Research current state and trends"},
+ {"role": "Analyst", "task": "SWOT analysis and opportunity identification"},
+ {"role": "Researcher", "task": "Benchmark best practices"},
+ {"role": "Critic", "task": "Risk assessment and gap analysis"},
+ {"role": "Synthesizer", "task": "Strategic plan synthesis"}
+ ],
+ "problem_solving": [
+ {"role": "Researcher", "task": "Define problem and gather context"},
+ {"role": "Analyst", "task": "Root cause analysis"},
+ {"role": "Researcher", "task": "Research potential solutions"},
+ {"role": "Critic", "task": "Evaluate solution feasibility"},
+ {"role": "Synthesizer", "task": "Recommend optimal solution"}
+ ]
+ }
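+
+    # Template-selection sketch: a problem containing "strategy" (e.g. the demo
+    # prompt "Develop a strategy for sustainable urban transportation") matches
+    # the five-step "strategic_planning" template in _match_problem_to_template
+    # below, yielding task_1..task_5 with suggested_role metadata.
+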
def register_agent(self, agent: BaseAgent):
"""Register an agent with the coordinator."""
self.agents[agent.name] = agent
self.workflow_graph.add_node(agent.name, role=agent.role.value)
+ logger.info(f"Registered agent: {agent.name} with role {agent.role.value}")
- async def decompose_problem(self, problem: str) -> List[Task]:
+ async def decompose_problem(self, problem: str, use_template: bool = False) -> List[Task]:
"""Decompose a complex problem into subtasks."""
- prompt = ChatPromptTemplate.from_messages([
- SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems.
- Decompose the problem into specific subtasks that can be assigned to specialized agents:
- - Researcher: For gathering information and facts
- - Analyst: For analyzing data and identifying patterns
- - Critic: For evaluating quality and identifying issues
- - Synthesizer: For combining insights and creating summaries
+ if use_template:
+ # Try to match problem to a template
+ template_tasks = self._match_problem_to_template(problem)
+ if template_tasks:
+ return template_tasks
+
+ if self.llm:
+ # Use LLM to decompose problem
+ prompt = ChatPromptTemplate.from_messages([
+ SystemMessage(content="""You are a Coordinator Agent responsible for breaking down complex problems.
+ Decompose the problem into specific subtasks that can be assigned to specialized agents:
+ - Researcher: For gathering information and facts
+ - Analyst: For analyzing data and identifying patterns
+ - Critic: For evaluating quality and identifying issues
+ - Synthesizer: For combining insights and creating summaries
+
+ Create 4-8 clear, actionable subtasks with dependencies.
+ Format each task as: [Role]: [Specific task description]"""),
+ HumanMessage(content=f"Decompose this problem into subtasks: {problem}")
+ ])
- Create clear, actionable subtasks with dependencies."""),
- HumanMessage(content=f"Decompose this problem into subtasks: {problem}")
- ])
+ response = await self.llm.ainvoke(prompt.format_messages())
+ tasks = self._parse_tasks(response.content, problem)
+ else:
+ # Demo mode - try a workflow template first, fall back to defaults
+ tasks = self._match_problem_to_template(problem) or self._create_default_tasks(problem)
- response = await self.llm.ainvoke(prompt.format_messages())
+ # Enhance tasks with metadata
+ for task in tasks:
+ task.metadata['problem_complexity'] = self._assess_problem_complexity(problem)
+ task.metadata['estimated_duration'] = self._estimate_task_duration(task)
+
+ return tasks
+
+ def _match_problem_to_template(self, problem: str) -> Optional[List[Task]]:
+ """Match problem to a workflow template."""
+ problem_lower = problem.lower()
+
+ # Check for template matches
+ if any(word in problem_lower for word in ['strategy', 'strategic', 'plan']):
+ template_name = "strategic_planning"
+ elif any(word in problem_lower for word in ['research', 'analyze', 'investigate']):
+ template_name = "research_analysis"
+ elif any(word in problem_lower for word in ['problem', 'solve', 'solution', 'fix']):
+ template_name = "problem_solving"
+ else:
+ return None
+
+ # Create tasks from template
+ template = self.workflow_templates[template_name]
+ tasks = []
- # Parse response into tasks
- tasks = self._parse_tasks(response.content, problem)
+ for i, step in enumerate(template):
+ task = Task(
+ id=f"task_{i+1}",
+ description=f"{step['task']} for: {problem}",
+ metadata={
+ "original_problem": problem,
+ "suggested_role": step['role'],
+ "template": template_name
+ }
+ )
+ tasks.append(task)
return tasks
+ def _assess_problem_complexity(self, problem: str) -> str:
+ """Assess the complexity of a problem."""
+ # Simple heuristic based on problem characteristics
+ complexity_indicators = {
+ "high": ['multiple', 'complex', 'comprehensive', 'strategic', 'long-term'],
+ "medium": ['analyze', 'evaluate', 'develop', 'assess'],
+ "low": ['simple', 'basic', 'straightforward', 'identify']
+ }
+
+ problem_lower = problem.lower()
+
+ for level, indicators in complexity_indicators.items():
+ if any(indicator in problem_lower for indicator in indicators):
+ return level
+
+ # Default based on length
+ return "high" if len(problem) > 200 else "medium"
+
+ def _estimate_task_duration(self, task: Task) -> float:
+ """Estimate task duration in seconds."""
+ # Base estimation on task characteristics
+ base_duration = 30.0
+
+ # Adjust based on role
+ role_multipliers = {
+ "Researcher": 1.2,
+ "Analyst": 1.5,
+ "Critic": 1.0,
+ "Synthesizer": 1.3
+ }
+
+ role = task.metadata.get("suggested_role", "Researcher")
+ duration = base_duration * role_multipliers.get(role, 1.0)
+
+ # Adjust based on complexity
+ complexity = task.metadata.get("problem_complexity", "medium")
+ if complexity == "high":
+ duration *= 1.5
+ elif complexity == "low":
+ duration *= 0.7
+
+ return duration
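+
+ # Worked example: an Analyst task on a "high" complexity problem is
+ # estimated at 30.0 * 1.5 * 1.5 = 67.5 seconds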
+
def _parse_tasks(self, content: str, original_problem: str) -> List[Task]:
"""Parse LLM response into Task objects."""
tasks = []
@@ -626,60 +1496,116 @@ class CoordinatorAgent(BaseAgent):
line = line.strip()
if not line:
continue
-
- # Check if line indicates a role
+
+ # Check for role indicators
+ role_found = False
for role in AgentRole:
- if role.value in line:
+ if role.value.lower() in line.lower():
current_role = role.value
+ role_found = True
break
- # If line starts with number or dash, it's likely a task
- if (line[0].isdigit() or line.startswith('-')) and current_role:
- # Extract task description
- task_desc = line.lstrip('0123456789.-').strip()
-
- task = Task(
- id=f"task_{task_id}",
- description=task_desc,
- metadata={
- "original_problem": original_problem,
- "suggested_role": current_role
- }
- )
-
- tasks.append(task)
- task_id += 1
+ # Extract task if we have a role
+ if current_role and ':' in line:
+ # Extract task description after role mention
+ task_parts = line.split(':', 1)
+ if len(task_parts) > 1:
+ task_desc = task_parts[1].strip()
+
+ task = Task(
+ id=f"task_{task_id}",
+ description=task_desc,
+ metadata={
+ "original_problem": original_problem,
+ "suggested_role": current_role,
+ "source": "llm_decomposition"
+ }
+ )
+
+ tasks.append(task)
+ task_id += 1
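+ # e.g. an LLM line "Researcher: Gather market data" yields a Task
+ # with suggested_role "Researcher" and description "Gather market data"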
- # If no tasks were parsed, create default tasks
- if not tasks:
- tasks = [
- Task(id="task_1", description=f"Research background information on: {original_problem}",
- metadata={"suggested_role": "Researcher"}),
- Task(id="task_2", description=f"Analyze key aspects of: {original_problem}",
- metadata={"suggested_role": "Analyst"}),
- Task(id="task_3", description="Critically evaluate the research and analysis",
- metadata={"suggested_role": "Critic"}),
- Task(id="task_4", description="Synthesize all findings into actionable insights",
- metadata={"suggested_role": "Synthesizer"})
- ]
+ # Pad with defaults so the four core roles are always covered
+ if len(tasks) < 4:
+ tasks.extend(self._create_default_tasks(original_problem)[len(tasks):])
return tasks
+ def _create_default_tasks(self, problem: str) -> List[Task]:
+ """Create default tasks for a problem."""
+ return [
+ Task(
+ id="task_1",
+ description=f"Research comprehensive background information on: {problem}",
+ metadata={"suggested_role": "Researcher", "source": "default"}
+ ),
+ Task(
+ id="task_2",
+ description=f"Analyze key factors and patterns related to: {problem}",
+ metadata={"suggested_role": "Analyst", "source": "default"}
+ ),
+ Task(
+ id="task_3",
+ description="Critically evaluate the research findings and analysis quality",
+ metadata={"suggested_role": "Critic", "source": "default"}
+ ),
+ Task(
+ id="task_4",
+ description="Synthesize all findings into actionable insights and recommendations",
+ metadata={"suggested_role": "Synthesizer", "source": "default"}
+ )
+ ]
+
+ def _build_dependency_graph(self, tasks: List[Task]):
+ """Build a dependency graph for tasks based on role execution order."""
+ role_order = {
+ "Researcher": 1,
+ "Analyst": 2,
+ "Critic": 3,
+ "Synthesizer": 4
+ }
+
+ # Group tasks by role level
+ tasks_by_level: Dict[int, List[Task]] = {}
+ for task in tasks:
+ level = role_order.get(task.metadata.get("suggested_role", "Researcher"), 5)
+ tasks_by_level.setdefault(level, []).append(task)
+
+ # Each task depends on every task of the preceding level, so tasks
+ # that share a role level can run in parallel while levels stay ordered
+ levels = sorted(tasks_by_level)
+ for prev_level, level in zip(levels, levels[1:]):
+ for task in tasks_by_level[level]:
+ task.dependencies.extend(t.id for t in tasks_by_level[prev_level])
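+
+ # e.g. the four default tasks form the chain task_1 (Researcher) ->
+ # task_2 (Analyst) -> task_3 (Critic) -> task_4 (Synthesizer); two
+ # Researcher tasks would both feed the Analyst and can run in parallel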
+
async def execute_workflow(self, tasks: List[Task], parallel: bool = True) -> Dict[str, Any]:
"""Execute the workflow with given tasks."""
start_time = datetime.now()
+ self.performance_tracker.start_tracking()
# Build task dependency graph
self._build_dependency_graph(tasks)
+ # Update workflow graph
+ self._update_workflow_graph(tasks)
+
# Execute tasks
- if parallel:
- results = await self._execute_parallel(tasks)
- else:
- results = await self._execute_sequential(tasks)
+ try:
+ if parallel:
+ await self._execute_parallel(tasks)
+ else:
+ await self._execute_sequential(tasks)
+ except Exception as e:
+ logger.error(f"Workflow execution error: {str(e)}")
# Compile final results
end_time = datetime.now()
+ self.performance_tracker.end_tracking()
+
execution_time = (end_time - start_time).total_seconds()
workflow_result = {
@@ -688,6 +1614,7 @@ class CoordinatorAgent(BaseAgent):
"success_rate": self._calculate_success_rate(tasks),
"agent_contributions": self._compile_agent_contributions(tasks),
"workflow_graph": self.workflow_graph,
+ "performance_metrics": self.performance_tracker.get_performance_summary(),
"timestamp": datetime.now()
}
@@ -695,68 +1622,73 @@ class CoordinatorAgent(BaseAgent):
return workflow_result
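+
+ # Usage sketch (assumes agents are registered with the coordinator):
+ # tasks = await coordinator.decompose_problem(problem)
+ # result = await coordinator.execute_workflow(tasks, parallel=True)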
- def _build_dependency_graph(self, tasks: List[Task]):
- """Build a dependency graph for tasks."""
- # For simplicity, create a linear dependency chain based on suggested roles
- role_order = ["Researcher", "Analyst", "Critic", "Synthesizer"]
+ def _update_workflow_graph(self, tasks: List[Task]):
+ """Update the workflow graph with task relationships."""
+ # Add task nodes
+ for task in tasks:
+ self.workflow_graph.add_node(
+ task.id,
+ task_description=task.description[:50] + ("..." if len(task.description) > 50 else ""),
+ status=task.status.value
+ )
- # Group tasks by role
- tasks_by_role = {}
+ # Add edges for dependencies
for task in tasks:
- role = task.metadata.get("suggested_role", "Researcher")
- if role not in tasks_by_role:
- tasks_by_role[role] = []
- tasks_by_role[role].append(task)
-
- # Create dependencies
- for i in range(len(role_order) - 1):
- current_role = role_order[i]
- next_role = role_order[i + 1]
-
- if current_role in tasks_by_role and next_role in tasks_by_role:
- for current_task in tasks_by_role[current_role]:
- for next_task in tasks_by_role[next_role]:
- next_task.dependencies.append(current_task.id)
+ for dep_id in task.dependencies:
+ self.workflow_graph.add_edge(dep_id, task.id)
async def _execute_parallel(self, tasks: List[Task]) -> List[Task]:
"""Execute tasks in parallel where possible."""
completed = set()
pending = tasks.copy()
- while pending:
- # Find tasks that can be executed (no pending dependencies)
- ready_tasks = [
- task for task in pending
- if all(dep in completed for dep in task.dependencies)
- ]
-
- if not ready_tasks:
- # Deadlock prevention - execute first pending task
- ready_tasks = [pending[0]]
-
- # Execute ready tasks in parallel
- task_futures = []
- for task in ready_tasks:
- agent_name = self._select_agent_for_task(task)
- if agent_name and agent_name in self.agents:
- agent = self.agents[agent_name]
- task_futures.append(agent.process_task(task))
-
- # Update workflow graph
- self.workflow_graph.add_edge(
- self.name, agent_name,
- task_id=task.id,
- timestamp=datetime.now()
- )
-
- # Wait for tasks to complete
- if task_futures:
- completed_tasks = await asyncio.gather(*task_futures)
+ with ThreadPoolExecutor(max_workers=max(1, len(self.agents))) as executor:
+ while pending:
+ # Find tasks ready for execution
+ ready_tasks = [
+ task for task in pending
+ if all(dep in completed for dep in task.dependencies)
+ ]
+
+ if not ready_tasks:
+ # Handle potential deadlock
+ logger.warning("No ready tasks found, executing first pending task")
+ ready_tasks = [pending[0]] if pending else []
- for task in completed_tasks:
- completed.add(task.id)
- pending.remove(task)
- self.completed_tasks.append(task)
+ if not ready_tasks:
+ break
+
+ # Submit tasks for parallel execution
+ future_to_task = {}
+ for task in ready_tasks:
+ agent_name = self._select_agent_for_task(task)
+ if agent_name and agent_name in self.agents:
+ agent = self.agents[agent_name]
+ future = executor.submit(asyncio.run, agent.process_task(task))
+ future_to_task[future] = (task, agent_name)
+
+ # Fail unassignable tasks instead of spinning forever
+ if not future_to_task:
+ for task in ready_tasks:
+ task.status = TaskStatus.FAILED
+ completed.add(task.id)
+ pending.remove(task)
+ continue
+
+ # Wait for tasks to complete
+ for future in as_completed(future_to_task):
+ task, agent_name = future_to_task[future]
+ try:
+ completed_task = future.result()
+ completed.add(task.id)
+ pending.remove(task)
+ self.completed_tasks.append(completed_task)
+
+ # Update workflow graph
+ self.workflow_graph.add_edge(
+ self.name, agent_name,
+ task_id=task.id,
+ timestamp=datetime.now().isoformat()
+ )
+
+ # Record collaboration
+ await self._facilitate_collaboration(completed_task, agent_name)
+
+ except Exception as e:
+ logger.error(f"Task {task.id} failed: {str(e)}")
+ task.status = TaskStatus.FAILED
+ completed.add(task.id) # unblock dependent tasks
+ pending.remove(task) # avoid resubmitting the failed task
return tasks
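+
+ # Design note: each coroutine runs on its own event loop via asyncio.run
+ # inside a worker thread, so the blocking as_completed() wait also blocks
+ # the caller's event loop. A pure-asyncio alternative (sketch, not the
+ # implementation used here) would gather each ready batch directly:
+ # done = await asyncio.gather(
+ # *(self.agents[self._select_agent_for_task(t)].process_task(t)
+ # for t in ready_tasks), return_exceptions=True)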
@@ -766,16 +1698,19 @@ class CoordinatorAgent(BaseAgent):
agent_name = self._select_agent_for_task(task)
if agent_name and agent_name in self.agents:
agent = self.agents[agent_name]
- await agent.process_task(task)
+ completed_task = await agent.process_task(task)
# Update workflow graph
self.workflow_graph.add_edge(
self.name, agent_name,
task_id=task.id,
- timestamp=datetime.now()
+ timestamp=datetime.now().isoformat()
)
- self.completed_tasks.append(task)
+ self.completed_tasks.append(completed_task)
+
+ # Facilitate collaboration
+ await self._facilitate_collaboration(completed_task, agent_name)
return tasks
@@ -786,13 +1721,62 @@ class CoordinatorAgent(BaseAgent):
# Find agent with matching role
for agent_name, agent in self.agents.items():
if agent.role.value == suggested_role:
+ # Check agent availability
+ if agent.active and agent.current_task is None:
+ return agent_name
+
+ # Fallback: find any available agent with the role
+ for agent_name, agent in self.agents.items():
+ if agent.role.value == suggested_role:
+ return agent_name
+
+ # Last resort: return any available agent
+ for agent_name, agent in self.agents.items():
+ if agent.active:
return agent_name
- # Fallback to first available agent
- return list(self.agents.keys())[0] if self.agents else None
+ return None
- def _calculate_success_rate(self, tasks: List[Task]) -> float:
- """Calculate the success rate of task execution."""
+ async def _facilitate_collaboration(self, task: Task, agent_name: str):
+ """Facilitate collaboration between agents after task completion."""
+ if not task.result or task.status != TaskStatus.COMPLETED:
+ return
+
+ # Create collaboration message
+ collab_message = AgentMessage(
+ sender=agent_name,
+ recipient="all",
+ content=f"Task completed: {task.description}\nKey findings: {str(task.result)[:500]}",
+ message_type=MessageType.COMPLETION_REPORT,
+ priority=3
+ )
+
+ # Share with relevant agents
+ for other_agent_name, other_agent in self.agents.items():
+ if other_agent_name != agent_name:
+ # Determine if collaboration is needed
+ if self._should_collaborate(task, other_agent):
+ await other_agent.memory.add_message(collab_message)
+ self.performance_tracker.record_collaboration()
+
+ def _should_collaborate(self, task: Task, agent: BaseAgent) -> bool:
+ """Determine if an agent should receive collaboration message."""
+ # Synthesizer should receive all completion reports
+ if agent.role == AgentRole.SYNTHESIZER:
+ return True
+
+ # Critic should receive analysis and research results
+ if agent.role == AgentRole.CRITIC and task.metadata.get("suggested_role") in ["Researcher", "Analyst"]:
+ return True
+
+ # Analyst should receive research results
+ if agent.role == AgentRole.ANALYST and task.metadata.get("suggested_role") == "Researcher":
+ return True
+
+ return False
+
+ def _calculate_success_rate(self, tasks: List[Task]) -> float:
+ """Calculate the success rate of task execution."""
if not tasks:
return 0.0
@@ -806,18 +1790,80 @@ class CoordinatorAgent(BaseAgent):
for agent_name, agent in self.agents.items():
agent_tasks = [task for task in tasks if task.assigned_to == agent_name]
- contributions[agent_name] = {
- "role": agent.role.value,
- "tasks_completed": len(agent_tasks),
- "average_confidence": np.mean([task.confidence for task in agent_tasks]) if agent_tasks else 0.0,
- "total_execution_time": sum(
- (task.completed_at - task.created_at).total_seconds()
+ if agent_tasks:
+ total_execution_time = sum(
+ task.performance_metrics.get('execution_time', 0)
for task in agent_tasks
- if task.completed_at
)
- }
+
+ avg_confidence = np.mean([task.confidence for task in agent_tasks])
+
+ contributions[agent_name] = {
+ "role": agent.role.value,
+ "tasks_completed": len(agent_tasks),
+ "average_confidence": avg_confidence,
+ "total_execution_time": total_execution_time,
+ "collaboration_count": len(agent.collaboration_partners),
+ "status": agent.get_status_summary()
+ }
+ else:
+ contributions[agent_name] = {
+ "role": agent.role.value,
+ "tasks_completed": 0,
+ "average_confidence": 0.0,
+ "total_execution_time": 0.0,
+ "collaboration_count": 0,
+ "status": agent.get_status_summary()
+ }
return contributions
+
+ def get_workflow_insights(self) -> Dict[str, Any]:
+ """Get insights about workflow execution patterns."""
+ if not self.execution_history:
+ return {"message": "No execution history available"}
+
+ # Analyze execution patterns
+ total_executions = len(self.execution_history)
+ avg_execution_time = np.mean([wf['execution_time'] for wf in self.execution_history])
+ avg_success_rate = np.mean([wf['success_rate'] for wf in self.execution_history])
+
+ # Analyze agent performance
+ agent_stats = {}
+ for workflow in self.execution_history:
+ for agent, contrib in workflow['agent_contributions'].items():
+ if agent not in agent_stats:
+ agent_stats[agent] = {
+ 'total_tasks': 0,
+ 'total_time': 0,
+ 'confidence_scores': []
+ }
+
+ agent_stats[agent]['total_tasks'] += contrib['tasks_completed']
+ agent_stats[agent]['total_time'] += contrib['total_execution_time']
+ if contrib['average_confidence'] > 0:
+ agent_stats[agent]['confidence_scores'].append(contrib['average_confidence'])
+
+ # Calculate agent efficiency
+ agent_efficiency = {}
+ for agent, stats in agent_stats.items():
+ if stats['total_tasks'] > 0:
+ agent_efficiency[agent] = {
+ 'avg_time_per_task': stats['total_time'] / stats['total_tasks'],
+ 'avg_confidence': np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0,
+ 'total_tasks': stats['total_tasks']
+ }
+
+ return {
+ 'total_workflows_executed': total_executions,
+ 'average_execution_time': avg_execution_time,
+ 'average_success_rate': avg_success_rate,
+ 'agent_efficiency': agent_efficiency,
+ 'most_efficient_agent': min(agent_efficiency.items(),
+ key=lambda x: x[1]['avg_time_per_task'])[0] if agent_efficiency else None,
+ 'highest_quality_agent': max(agent_efficiency.items(),
+ key=lambda x: x[1]['avg_confidence'])[0] if agent_efficiency else None
+ }
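+
+ # Usage sketch: after a few workflows have run,
+ # insights = coordinator.get_workflow_insights()
+ # insights['most_efficient_agent'] # agent with lowest avg time per task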
class WorkflowVisualizer:
"""Handles visualization of agent interactions and workflow."""
@@ -825,46 +1871,87 @@ class WorkflowVisualizer:
def __init__(self):
self.color_map = Config.NODE_COLORS
self.layout_cache = {}
+ self.animation_frames = []
def create_workflow_graph(self, workflow_graph: nx.DiGraph,
- active_agents: List[str] = None) -> go.Figure:
+ active_agents: Optional[List[str]] = None,
+ highlight_tasks: Optional[List[str]] = None) -> go.Figure:
"""Create an interactive workflow visualization."""
+ if len(workflow_graph.nodes()) == 0:
+ return self._create_empty_graph()
+
# Use hierarchical layout
pos = self._hierarchical_layout(workflow_graph)
- # Create edge trace
+ # Create traces
edge_trace = self._create_edge_trace(workflow_graph, pos)
+ node_trace = self._create_node_trace(workflow_graph, pos, active_agents, highlight_tasks)
+
+ # Add agent status indicators
+ status_trace = self._create_status_indicators(workflow_graph, pos, active_agents)
+
+ # Create figure with all traces
+ traces = [edge_trace, node_trace]
+ if status_trace:
+ traces.append(status_trace)
+
+ fig = go.Figure(
+ data=traces,
+ layout=go.Layout(
+ title={
+ 'text': 'Agent Collaboration Network',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ 'font': {'size': 20, 'color': '#2c3e50'}
+ },
+ showlegend=False,
+ hovermode='closest',
+ margin=dict(b=40, l=40, r=40, t=60),
+ xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
+ yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
+ plot_bgcolor='#f8f9fa',
+ paper_bgcolor='white',
+ font=dict(family='Inter, sans-serif'),
+ annotations=self._create_annotations(workflow_graph, pos)
+ )
+ )
- # Create node trace
- node_trace = self._create_node_trace(workflow_graph, pos, active_agents)
-
- # Create figure
- fig = go.Figure(data=[edge_trace, node_trace],
- layout=go.Layout(
- title='Agent Collaboration Network',
- titlefont_size=16,
- showlegend=False,
- hovermode='closest',
- margin=dict(b=20, l=5, r=5, t=40),
- xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
- yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
- plot_bgcolor='white'
- ))
+ # Add shapes for visual enhancement
+ fig.update_layout(
+ shapes=self._create_background_shapes(pos)
+ )
return fig
+ def _create_empty_graph(self) -> go.Figure:
+ """Create empty graph with message."""
+ fig = go.Figure()
+ fig.add_annotation(
+ text="No workflow data to display.
Start an analysis to see agent interactions.",
+ xref="paper", yref="paper",
+ x=0.5, y=0.5,
+ showarrow=False,
+ font=dict(size=16, color='#7f8c8d'),
+ bgcolor='#ecf0f1',
+ borderpad=20
+ )
+ fig.update_layout(
+ height=500,
+ plot_bgcolor='#f8f9fa',
+ paper_bgcolor='white'
+ )
+ return fig
+
def _hierarchical_layout(self, G: nx.DiGraph) -> Dict:
"""Create hierarchical layout for the graph."""
- if len(G) == 0:
- return {}
-
- # Try to use cached layout if graph structure hasn't changed
- graph_hash = hash(tuple(G.edges()))
+ # Check cache
+ graph_hash = hash(tuple(sorted(G.edges())))
if graph_hash in self.layout_cache:
return self.layout_cache[graph_hash]
- # Define role hierarchy
+ # Define role hierarchy with better spacing
hierarchy = {
'Coordinator': 0,
'Researcher': 1,
@@ -873,26 +1960,35 @@ class WorkflowVisualizer:
'Synthesizer': 4
}
- # Position nodes based on hierarchy
+ # Group nodes by type
+ agent_nodes = [n for n in G.nodes() if G.nodes[n].get('role')]
+ task_nodes = [n for n in G.nodes() if 'task_' in str(n)]
+
pos = {}
- role_counts = {}
- for node in G.nodes():
+ # Position agent nodes
+ role_x_positions = {}
+ for i, (role, level) in enumerate(hierarchy.items()):
+ role_x_positions[role] = i * 3 - 6 # Spread horizontally
+
+ for node in agent_nodes:
role = G.nodes[node].get('role', 'Unknown')
level = hierarchy.get(role, 5)
-
- if level not in role_counts:
- role_counts[level] = 0
-
- x = role_counts[level] * 2 - (len([n for n in G.nodes() if hierarchy.get(G.nodes[n].get('role'), 5) == level]) - 1)
- y = -level * 2
-
+ x = role_x_positions.get(role, 0)
+ y = -level * 2.5
pos[node] = (x, y)
- role_counts[level] += 1
- # Cache the layout
- self.layout_cache[graph_hash] = pos
+ # Position task nodes in a grid on the right
+ if task_nodes:
+ task_start_x = max(role_x_positions.values()) + 4
+ cols = 3
+ for i, task in enumerate(task_nodes):
+ row = i // cols
+ col = i % cols
+ pos[task] = (task_start_x + col * 2, -row * 1.5)
+ # Cache layout
+ self.layout_cache[graph_hash] = pos
return pos
def _create_edge_trace(self, G: nx.DiGraph, pos: Dict) -> go.Scatter:
@@ -901,82 +1997,214 @@ class WorkflowVisualizer:
edge_y = []
for edge in G.edges():
- x0, y0 = pos.get(edge[0], (0, 0))
- x1, y1 = pos.get(edge[1], (0, 0))
- edge_x.extend([x0, x1, None])
- edge_y.extend([y0, y1, None])
+ if edge[0] in pos and edge[1] in pos:
+ x0, y0 = pos[edge[0]]
+ x1, y1 = pos[edge[1]]
+
+ # Add bezier curve points for smoother edges
+ mid_x = (x0 + x1) / 2
+ mid_y = (y0 + y1) / 2 - 0.5 # Slight curve
+
+ # Create curved edge
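+ # Quadratic Bezier: B(t) = (1-t)^2*P0 + 2(1-t)*t*Pm + t^2*P1,
+ # sampled at 10 points through the raised midpoint Pm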
+ t = np.linspace(0, 1, 10)
+ for ti in t:
+ x = (1-ti)**2 * x0 + 2*(1-ti)*ti * mid_x + ti**2 * x1
+ y = (1-ti)**2 * y0 + 2*(1-ti)*ti * mid_y + ti**2 * y1
+ edge_x.append(x)
+ edge_y.append(y)
+ edge_x.append(None)
+ edge_y.append(None)
edge_trace = go.Scatter(
x=edge_x, y=edge_y,
- line=dict(width=2, color='#888'),
+ line=dict(width=2, color='#bdc3c7'),
hoverinfo='none',
- mode='lines'
+ mode='lines',
+ opacity=0.6
)
return edge_trace
def _create_node_trace(self, G: nx.DiGraph, pos: Dict,
- active_agents: List[str] = None) -> go.Scatter:
+ active_agents: Optional[List[str]] = None,
+ highlight_tasks: Optional[List[str]] = None) -> go.Scatter:
"""Create node trace for the graph."""
node_x = []
node_y = []
node_colors = []
node_sizes = []
node_text = []
+ node_hover = []
+ node_symbols = []
for node in G.nodes():
- x, y = pos.get(node, (0, 0))
- node_x.append(x)
- node_y.append(y)
-
- # Get node attributes
- role = G.nodes[node].get('role', 'Unknown')
- color = self.color_map.get(role, '#666')
-
- # Highlight active agents
- if active_agents and node in active_agents:
- size = 30
- color = self._brighten_color(color)
- else:
- size = 20
-
- node_colors.append(color)
- node_sizes.append(size)
-
- # Create hover text
- degree = G.degree(node)
- hover_text = f"{node}<br>Role: {role}<br>Connections: {degree}"
- node_text.append(hover_text)
+ if node in pos:
+ x, y = pos[node]
+ node_x.append(x)
+ node_y.append(y)
+
+ # Get node attributes
+ node_data = G.nodes[node]
+ role = node_data.get('role', '')
+
+ # Determine node properties
+ if role: # Agent node
+ color = self.color_map.get(role, '#95a5a6')
+ size = 35
+ symbol = 'circle'
+
+ # Highlight active agents
+ if active_agents and node in active_agents:
+ size = 45
+ color = self._brighten_color(color)
+
+ hover_text = f"{node}<br>Role: {role}<br>Status: Active"
+ display_text = node.split('-')[0] # Show role name
+
+ else: # Task node
+ status = node_data.get('status', 'pending')
+ color = self._get_status_color(status)
+ size = 25
+ symbol = 'square'
+
+ # Highlight specific tasks
+ if highlight_tasks and str(node) in highlight_tasks:
+ size = 30
+ color = self._brighten_color(color)
+
+ task_desc = node_data.get('task_description', 'Task')
+ hover_text = f"{node}<br>{task_desc}<br>Status: {status}"
+ display_text = node if len(str(node)) < 10 else f"T{node[-1]}"
+
+ node_colors.append(color)
+ node_sizes.append(size)
+ node_text.append(display_text)
+ node_hover.append(hover_text)
+ node_symbols.append(symbol)
node_trace = go.Scatter(
x=node_x, y=node_y,
mode='markers+text',
hoverinfo='text',
- text=[node for node in G.nodes()],
- textposition="top center",
- hovertext=node_text,
+ text=node_text,
+ hovertext=node_hover,
+ textposition="bottom center",
+ textfont=dict(size=10, color='#2c3e50'),
marker=dict(
showscale=False,
color=node_colors,
size=node_sizes,
+ symbol=node_symbols,
line=dict(color='white', width=2)
)
)
return node_trace
+ def _create_status_indicators(self, G: nx.DiGraph, pos: Dict,
+ active_agents: Optional[List[str]] = None) -> Optional[go.Scatter]:
+ """Create status indicators for active agents."""
+ if not active_agents:
+ return None
+
+ indicator_x = []
+ indicator_y = []
+
+ for agent in active_agents:
+ if agent in pos:
+ x, y = pos[agent]
+ # Place indicator above agent
+ indicator_x.append(x)
+ indicator_y.append(y + 0.5)
+
+ if not indicator_x:
+ return None
+
+ return go.Scatter(
+ x=indicator_x,
+ y=indicator_y,
+ mode='markers',
+ marker=dict(
+ size=8,
+ color='#2ecc71',
+ symbol='circle',
+ line=dict(width=2, color='white')
+ ),
+ hoverinfo='skip'
+ )
+
+ def _create_annotations(self, G: nx.DiGraph, pos: Dict) -> List[Dict]:
+ """Create annotations for the graph."""
+ annotations = []
+
+ # Add role labels
+ role_positions = {}
+ for node in G.nodes():
+ if G.nodes[node].get('role'):
+ role = G.nodes[node]['role']
+ if role not in role_positions:
+ x, y = pos[node]
+ role_positions[role] = (x, y - 0.8)
+
+ for role, (x, y) in role_positions.items():
+ annotations.append(dict(
+ x=x, y=y,
+ text=f"{role}",
+ showarrow=False,
+ font=dict(size=12, color='#34495e'),
+ opacity=0.8
+ ))
+
+ return annotations
+
+ def _create_background_shapes(self, pos: Dict) -> List[Dict]:
+ """Create background shapes for visual grouping."""
+ shapes = []
+
+ # Group nodes by x-coordinate regions
+ if pos:
+ x_coords = [p[0] for p in pos.values()]
+ y_coords = [p[1] for p in pos.values()]
+
+ if x_coords and y_coords:
+ # Create subtle background regions
+ padding = 1.5
+ shapes.append(dict(
+ type='rect',
+ x0=min(x_coords) - padding,
+ y0=min(y_coords) - padding,
+ x1=max(x_coords) + padding,
+ y1=max(y_coords) + padding,
+ fillcolor='#ecf0f1',
+ opacity=0.3,
+ line=dict(width=0)
+ ))
+
+ return shapes
+
+ def _get_status_color(self, status: str) -> str:
+ """Get color based on task status."""
+ status_colors = {
+ 'completed': '#2ecc71',
+ 'in_progress': '#f39c12',
+ 'pending': '#95a5a6',
+ 'failed': '#e74c3c'
+ }
+ return status_colors.get(status, '#95a5a6')
+
def _brighten_color(self, color: str) -> str:
"""Make a color brighter for highlighting."""
- # Simple brightening by mixing with white
if color.startswith('#'):
+ # Convert hex to RGB
r = int(color[1:3], 16)
g = int(color[3:5], 16)
b = int(color[5:7], 16)
- # Mix with white (255, 255, 255)
- r = int(r + (255 - r) * 0.3)
- g = int(g + (255 - g) * 0.3)
- b = int(b + (255 - b) * 0.3)
+ # Brighten by mixing with white
+ factor = 0.3
+ r = int(r + (255 - r) * factor)
+ g = int(g + (255 - g) * factor)
+ b = int(b + (255 - b) * factor)
return f"#{r:02x}{g:02x}{b:02x}"
@@ -985,64 +2213,113 @@ class WorkflowVisualizer:
def create_task_timeline(self, tasks: List[Task]) -> go.Figure:
"""Create a timeline visualization of task execution."""
- # Prepare data for timeline
+ # Prepare timeline data
timeline_data = []
for task in tasks:
- if task.created_at and task.completed_at:
+ if task.created_at:
+ # Use completed_at if available, otherwise estimate
+ end_time = task.completed_at if task.completed_at else task.created_at + timedelta(seconds=30)
+
timeline_data.append({
'Task': task.id,
'Agent': task.assigned_to or 'Unassigned',
'Start': task.created_at,
- 'Finish': task.completed_at,
+ 'Finish': end_time,
'Status': task.status.value,
'Confidence': task.confidence
})
if not timeline_data:
- # Return empty figure if no completed tasks
- fig = go.Figure()
- fig.add_annotation(
- text="No completed tasks to display",
- xref="paper", yref="paper",
- x=0.5, y=0.5,
- showarrow=False
- )
- return fig
+ return self._create_empty_timeline()
+ # Create DataFrame
df = pd.DataFrame(timeline_data)
- # Create Gantt chart
+ # Create Gantt chart with custom styling
fig = px.timeline(
df,
x_start="Start",
x_end="Finish",
y="Agent",
color="Confidence",
- hover_data=["Task", "Status", "Confidence"],
- title="Task Execution Timeline",
- color_continuous_scale="Viridis"
+ hover_data=["Task", "Status"],
+ color_continuous_scale="Viridis",
+ labels={'Confidence': 'Confidence Score'}
+ )
+
+ # Update layout for professional appearance
+ fig.update_layout(
+ title={
+ 'text': 'Task Execution Timeline',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ 'font': {'size': 18, 'color': '#2c3e50'}
+ },
+ height=400,
+ xaxis_title="Time",
+ yaxis_title="Agent",
+ plot_bgcolor='#f8f9fa',
+ paper_bgcolor='white',
+ font=dict(family='Inter, sans-serif'),
+ yaxis={'categoryorder': 'total ascending'}
)
- fig.update_yaxis(categoryorder="total ascending")
- fig.update_layout(height=400)
+ # Add grid lines
+ fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0')
+ fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#e0e0e0')
return fig
+ def _create_empty_timeline(self) -> go.Figure:
+ """Create empty timeline with message."""
+ fig = go.Figure()
+ fig.add_annotation(
+ text="No task execution data available yet.",
+ xref="paper", yref="paper",
+ x=0.5, y=0.5,
+ showarrow=False,
+ font=dict(size=14, color='#7f8c8d')
+ )
+ fig.update_layout(
+ height=400,
+ plot_bgcolor='#f8f9fa',
+ paper_bgcolor='white'
+ )
+ return fig
+
def create_confidence_heatmap(self, agent_contributions: Dict[str, Any]) -> go.Figure:
- """Create a heatmap showing agent confidence levels."""
+ """Create a heatmap showing agent performance metrics."""
+ if not agent_contributions:
+ return self._create_empty_heatmap()
+
+ # Prepare data
agents = list(agent_contributions.keys())
- metrics = ['tasks_completed', 'average_confidence', 'total_execution_time']
+ metrics = ['Tasks Completed', 'Avg Confidence', 'Time Efficiency', 'Collaboration Score']
- # Normalize data for heatmap
+ # Create data matrix
data = []
for metric in metrics:
row = []
for agent in agents:
- value = agent_contributions[agent].get(metric, 0)
- if metric == 'total_execution_time':
- value = value / 60 # Convert to minutes
+ contrib = agent_contributions[agent]
+
+ if metric == 'Tasks Completed':
+ value = min(1.0, contrib.get('tasks_completed', 0) / 10.0) # Normalize to [0, 1], assuming at most 10 tasks
+ elif metric == 'Avg Confidence':
+ value = contrib.get('average_confidence', 0)
+ elif metric == 'Time Efficiency':
+ # Inverse of average time per task, normalized
+ total_time = contrib.get('total_execution_time', 0)
+ completed_count = contrib.get('tasks_completed', 0)
+ avg_time = total_time / completed_count if completed_count > 0 else float('inf')
+ value = min(1.0, 30.0 / avg_time) if avg_time > 0 else 1.0
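+ # e.g. 3 tasks in 135s -> avg 45s -> min(1.0, 30/45) ≈ 0.67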
+ elif metric == 'Collaboration Score':
+ value = min(1.0, contrib.get('collaboration_count', 0) / 5.0)
+ else:
+ value = 0
+
row.append(value)
data.append(row)
@@ -1050,18 +2327,103 @@ class WorkflowVisualizer:
fig = go.Figure(data=go.Heatmap(
z=data,
x=agents,
- y=['Tasks Completed', 'Avg Confidence', 'Time (min)'],
+ y=metrics,
colorscale='Blues',
text=np.round(data, 2),
texttemplate='%{text}',
- textfont={"size": 10}
+ textfont={"size": 12},
+ colorbar=dict(title=dict(text="Score", side="right")),
+ hoverongaps=False
))
+ # Update layout
fig.update_layout(
- title="Agent Performance Metrics",
+ title={
+ 'text': 'Agent Performance Metrics',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ 'font': {'size': 18, 'color': '#2c3e50'}
+ },
xaxis_title="Agents",
yaxis_title="Metrics",
- height=300
+ height=350,
+ plot_bgcolor='white',
+ paper_bgcolor='white',
+ font=dict(family='Inter, sans-serif')
+ )
+
+ # Add cell borders
+ fig.update_xaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50')
+ fig.update_yaxes(showgrid=False, showline=True, linewidth=2, linecolor='#2c3e50')
+
+ return fig
+
+ def _create_empty_heatmap(self) -> go.Figure:
+ """Create empty heatmap with message."""
+ fig = go.Figure()
+ fig.add_annotation(
+ text="No agent performance data available yet.",
+ xref="paper", yref="paper",
+ x=0.5, y=0.5,
+ showarrow=False,
+ font=dict(size=14, color='#7f8c8d')
+ )
+ fig.update_layout(
+ height=350,
+ plot_bgcolor='#f8f9fa',
+ paper_bgcolor='white'
+ )
+ return fig
+
+ def create_performance_comparison(self, performance_metrics: Dict[str, Any]) -> go.Figure:
+ """Create performance comparison visualization."""
+
+ # Extract metrics
+ baseline_time = Config.BENCHMARK_BASELINE['single_agent_time']
+ actual_time = performance_metrics.get('average_task_completion_time', baseline_time)
+ time_improvement = performance_metrics.get('time_improvement_percentage', 0)
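+ # Assuming the tracker reports improvement as
+ # (baseline - actual) / baseline * 100, e.g. 45.0s -> 28.0s gives ~37.8%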
+
+ # Create bar chart comparing performance
+ categories = ['Single Agent', 'Multi-Agent System']
+ values = [baseline_time, actual_time]
+ colors = ['#e74c3c', '#2ecc71']
+
+ fig = go.Figure(data=[
+ go.Bar(
+ x=categories,
+ y=values,
+ marker_color=colors,
+ text=[f'{v:.1f}s' for v in values],
+ textposition='auto'
+ )
+ ])
+
+ # Add improvement annotation
+ fig.add_annotation(
+ x=1, y=actual_time + 5,
+ text=f"{time_improvement:.1f}% Faster",
+ showarrow=True,
+ arrowhead=2,
+ arrowsize=1,
+ arrowwidth=2,
+ arrowcolor='#2ecc71',
+ font=dict(size=14, color='#2ecc71', weight='bold')
+ )
+
+ # Update layout
+ fig.update_layout(
+ title={
+ 'text': 'Performance Comparison',
+ 'x': 0.5,
+ 'xanchor': 'center',
+ 'font': {'size': 18, 'color': '#2c3e50'}
+ },
+ yaxis_title="Average Completion Time (seconds)",
+ height=400,
+ plot_bgcolor='#f8f9fa',
+ paper_bgcolor='white',
+ font=dict(family='Inter, sans-serif'),
+ showlegend=False
)
return fig
@@ -1076,7 +2438,8 @@ class ReportGenerator:
'agent_contributions': self._generate_agent_contributions,
'key_findings': self._generate_key_findings,
'recommendations': self._generate_recommendations,
- 'confidence_analysis': self._generate_confidence_analysis
+ 'confidence_analysis': self._generate_confidence_analysis,
+ 'performance_metrics': self._generate_performance_metrics
}
def generate_report(self,
@@ -1097,21 +2460,36 @@ class ReportGenerator:
for section in include_sections:
if section in self.section_generators:
section_content = self.section_generators[section](workflow_result, problem_statement)
- report_sections.append(section_content)
+ if section_content:
+ report_sections.append(section_content)
# Footer
report_sections.append(self._generate_footer(workflow_result))
- return "\n\n".join(report_sections)
+ # Format the complete report
+ complete_report = "\n\n".join(report_sections)
+
+ # Add CSS styling for better markdown rendering
+ styled_report = f"""<div style="font-family: Inter, sans-serif; line-height: 1.6;">
+
+ {complete_report}
+
+ <p style="font-style: italic; color: #7f8c8d;">Advanced AI agents working together to solve complex problems through intelligent task decomposition and parallel processing.</p>
+ </div>"""
+
+ return styled_report